# Chapter 13: Advanced File Processing and Data Handling 📊

## 🎯 **Learning Objectives**
Master advanced Python file processing for VLSI automation:

### **Advanced File Operations**
- Directory operations and batch processing
- Large file handling and streaming
- File monitoring and change detection
- Backup and versioning strategies

### **Data Processing Techniques**
- **Log Analysis**: EDA tool log parsing and error extraction
- **Report Aggregation**: Combining data from multiple sources
- **Format Conversion**: Converting between tool formats
- **Data Validation**: Content verification and integrity checking

### **Automation Workflows**
- Batch file processing pipelines
- Automated report generation
- File system monitoring
- Configuration management

---

## 🔧 **Why Advanced File Processing Matters**
Complex VLSI projects require sophisticated file handling:
- **Scale**: Processing thousands of design files
- **Integration**: Connecting multiple EDA tools
- **Quality**: Ensuring data integrity across tool flows
- **Efficiency**: Automating repetitive file operations
- **Reliability**: Robust error handling and recovery

In [None]:
# DIRECTORY OPERATIONS AND FILE MANAGEMENT
# ========================================
# Advanced directory handling and batch file operations

print("📂 DIRECTORY OPERATIONS AND FILE MANAGEMENT")
print("=" * 45)

import shutil
import glob
from datetime import datetime
import logging
import os
import tempfile
from pathlib import Path
import re

# =============================================================================
# DIRECTORY OPERATIONS FOR VLSI PROJECTS
# =============================================================================

print("\n📁 DIRECTORY OPERATIONS FOR VLSI PROJECTS:")

def create_vlsi_project_structure(project_path, project_name):
    """Create standard VLSI project directory structure."""
    base_path = Path(project_path) / project_name

    # Define standard directory structure
    directories = [
        'src/rtl',
        'src/constraints',
        'src/testbench',
        'scripts/synthesis',
        'scripts/place_route',
        'scripts/verification',
        'results/synthesis',
        'results/place_route',
        'results/timing',
        'results/power',
        'reports',
        'docs',
        'libs',
        'work'
    ]

    created_dirs = []
    try:
        for dir_path in directories:
            full_path = base_path / dir_path
            full_path.mkdir(parents=True, exist_ok=True)
            created_dirs.append(str(full_path.relative_to(base_path)))

        print(f"     ✅ Created project structure for '{project_name}'")
        print(f"     📁 Base path: {base_path}")
        print(f"     📂 Created {len(created_dirs)} directories")
        return str(base_path), created_dirs

    except Exception as e:
        print(f"     ❌ Error creating project structure: {e}")
        return None, []

def find_files_by_pattern(directory, pattern, recursive=True):
    """Find files matching pattern in directory."""
    search_path = Path(directory)

    if not search_path.exists():
        print(f"     ❌ Directory not found: {directory}")
        return []

    try:
        if recursive:
            # Use ** for recursive search
            files = list(search_path.glob(f"**/{pattern}"))
        else:
            files = list(search_path.glob(pattern))

        # Convert to relative paths for better display
        relative_files = [f.relative_to(search_path) for f in files]

        print(f"     🔍 Found {len(files)} files matching '{pattern}'")
        return [str(f) for f in relative_files]

    except Exception as e:
        print(f"     ❌ Error searching files: {e}")
        return []

def copy_files_by_type(source_dir, dest_dir, file_extensions):
    """Copy files with specific extensions from source to destination."""
    source_path = Path(source_dir)
    dest_path = Path(dest_dir)

    # Create destination directory
    dest_path.mkdir(parents=True, exist_ok=True)

    copied_files = []

    try:
        for ext in file_extensions:
            pattern = f"*.{ext}"
            files = list(source_path.glob(f"**/{pattern}"))

            for file_path in files:
                # Maintain relative directory structure
                relative_path = file_path.relative_to(source_path)
                dest_file = dest_path / relative_path

                # Create parent directories if needed
                dest_file.parent.mkdir(parents=True, exist_ok=True)

                # Copy file
                shutil.copy2(file_path, dest_file)
                copied_files.append(str(relative_path))

        print(f"     ✅ Copied {len(copied_files)} files ({', '.join(file_extensions)})")
        return copied_files

    except Exception as e:
        print(f"     ❌ Error copying files: {e}")
        return []
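
# =============================================================================
# SKETCH: TIMESTAMPED BACKUP WITH CHECKSUM VERIFICATION
# =============================================================================
# copy_files_by_type() above copies files but does not confirm that the copies
# are intact. The sketch below shows one possible way to combine the "backup
# and versioning" and "integrity checking" objectives. The helper names
# (sha256_of_file, backup_with_verification) and the backup_YYYYmmdd_HHMMSS
# folder naming are illustrative assumptions, not an established convention.
# Note that two backups started within the same second would collide.

import hashlib
import shutil
import tempfile
from datetime import datetime
from pathlib import Path

def sha256_of_file(file_path, block_size=65536):
    """Return the SHA-256 hex digest of a file, read in fixed-size blocks."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for block in iter(lambda: f.read(block_size), b''):
            digest.update(block)
    return digest.hexdigest()

def backup_with_verification(source_dir, backup_root):
    """Copy source_dir into a timestamped folder and verify every copy."""
    source_path = Path(source_dir)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    dest_path = Path(backup_root) / f"backup_{stamp}"
    shutil.copytree(source_path, dest_path)  # fails if dest_path already exists

    # Compare checksums of every source file against its copy
    mismatches = []
    for src_file in source_path.rglob('*'):
        if src_file.is_file():
            copy = dest_path / src_file.relative_to(source_path)
            if sha256_of_file(src_file) != sha256_of_file(copy):
                mismatches.append(str(src_file.relative_to(source_path)))
    return dest_path, mismatches

# Usage on throwaway data
with tempfile.TemporaryDirectory() as backup_demo_dir:
    demo_src = Path(backup_demo_dir) / "src"
    demo_src.mkdir()
    (demo_src / "alu.v").write_text("module alu(); endmodule")
    backup_path, backup_mismatches = backup_with_verification(
        demo_src, Path(backup_demo_dir) / "backups")
    backup_copy_exists = (backup_path / "alu.v").exists()
    print(f"     Backup folder: {backup_path.name}, mismatches: {len(backup_mismatches)}")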

# Test directory operations
with tempfile.TemporaryDirectory() as temp_dir:
    temp_path = Path(temp_dir)

    print("   Testing directory operations:")

    # Create project structure
    project_base, dirs = create_vlsi_project_structure(temp_dir, "cpu_design")

    if project_base:
        project_path = Path(project_base)

        # Create some sample files
        sample_files = {
            'src/rtl/cpu_core.v': 'module cpu_core(); endmodule',
            'src/rtl/alu.v': 'module alu(); endmodule',
            'src/constraints/timing.sdc': 'create_clock -period 10 clk',
            'scripts/synthesis/syn_script.tcl': 'read_verilog cpu_core.v',
            'results/synthesis/area.rpt': 'Area Report\n============',
            'docs/readme.txt': 'CPU Design Documentation'
        }

        for file_path, content in sample_files.items():
            full_file_path = project_path / file_path
            full_file_path.write_text(content)

        print(f"     📝 Created {len(sample_files)} sample files")

        # Test file finding
        print("\n   Testing file search:")
        verilog_files = find_files_by_pattern(project_path, "*.v")
        sdc_files = find_files_by_pattern(project_path, "*.sdc")
        report_files = find_files_by_pattern(project_path, "*.rpt")

        for files, file_type in [(verilog_files, "Verilog"), (sdc_files, "SDC"), (report_files, "Report")]:
            if files:
                print(f"     {file_type} files: {files}")

        # Test file copying
        print("\n   Testing file copying:")
        backup_dir = project_path / "backup"
        copied = copy_files_by_type(project_path / "src", backup_dir, ["v", "sdc"])

# =============================================================================
# BATCH FILE PROCESSING FUNCTIONS
# =============================================================================

print(f"\n⚙️ BATCH FILE PROCESSING FUNCTIONS:")

def process_verilog_files(file_list, output_dir):
    """Process multiple Verilog files and extract module information."""
    results = []
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    for file_path in file_list:
        try:
            with open(file_path, 'r') as f:
                content = f.read()

            # Extract module information
            module_pattern = r'module\s+(\w+)\s*\('
            modules = re.findall(module_pattern, content)

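            # Count constructs as whole words; a plain substring count would
            # also match 'reg' inside 'register' or 'wire' inside 'wired'
            always_blocks = len(re.findall(r'\balways(?:_ff|_comb|_latch)?\b', content))
            wire_count = len(re.findall(r'\bwire\b', content))
            reg_count = len(re.findall(r'\breg\b', content))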

            file_info = {
                'file': Path(file_path).name,
                'modules': modules,
                'module_count': len(modules),
                'always_blocks': always_blocks,
                'wire_declarations': wire_count,
                'reg_declarations': reg_count,
                'line_count': len(content.splitlines()),
                'char_count': len(content)
            }

            results.append(file_info)

            # Create summary file
            summary_file = output_path / f"{Path(file_path).stem}_summary.txt"
            with open(summary_file, 'w') as f:
                f.write(f"Verilog File Analysis: {file_info['file']}\n")
                f.write(f"{'='*50}\n")
                f.write(f"Modules: {', '.join(file_info['modules'])}\n")
                f.write(f"Module count: {file_info['module_count']}\n")
                f.write(f"Always blocks: {file_info['always_blocks']}\n")
                f.write(f"Wire declarations: {file_info['wire_declarations']}\n")
                f.write(f"Reg declarations: {file_info['reg_declarations']}\n")
                f.write(f"Lines: {file_info['line_count']}\n")
                f.write(f"Characters: {file_info['char_count']}\n")

        except Exception as e:
            print(f"     ❌ Error processing {file_path}: {e}")
            continue

    return results
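
# =============================================================================
# SKETCH: STRIPPING COMMENTS BEFORE COUNTING KEYWORDS
# =============================================================================
# The counts in process_verilog_files() are taken on the raw file text, so a
# keyword that appears inside a comment is counted as well. One possible
# refinement is to remove comments first. strip_verilog_comments is a
# hypothetical helper name, and the regex approach is an approximation that
# does not handle comment markers inside string literals.

import re

def strip_verilog_comments(source):
    """Remove /* block */ and // line comments from Verilog source text."""
    without_block = re.sub(r'/\*.*?\*/', '', source, flags=re.DOTALL)
    return re.sub(r'//[^\n]*', '', without_block)

commented_verilog = """// wire up the reg file
module demo (input wire a, output reg b);
    /* always block
       follows */
    always @(*) b = a;
endmodule
"""
clean_verilog = strip_verilog_comments(commented_verilog)
raw_wire_count = len(re.findall(r'\bwire\b', commented_verilog))
clean_wire_count = len(re.findall(r'\bwire\b', clean_verilog))
print(f"     'wire' matches: {raw_wire_count} raw, {clean_wire_count} after stripping comments")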

def generate_project_report(project_path, output_file):
    """Generate comprehensive project report."""
    project_path = Path(project_path)

    if not project_path.exists():
        print(f"     ❌ Project path not found: {project_path}")
        return False

    try:
        # Collect file statistics
        file_stats = {
            'verilog': len(list(project_path.glob("**/*.v"))),
            'sdc': len(list(project_path.glob("**/*.sdc"))),
            'tcl': len(list(project_path.glob("**/*.tcl"))),
            'reports': len(list(project_path.glob("**/*.rpt"))),
            'logs': len(list(project_path.glob("**/*.log")))
        }
        # Add the total last so that it is not included in its own sum
        file_stats['total'] = sum(file_stats.values())

        # Calculate total size
        total_size = 0
        all_files = list(project_path.glob("**/*"))
        for file_path in all_files:
            if file_path.is_file():
                total_size += file_path.stat().st_size

        # Generate report
        with open(output_file, 'w') as f:
            f.write(f"VLSI Project Report\n")
            f.write(f"{'='*50}\n")
            f.write(f"Project Path: {project_path}\n")
            f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            f.write(f"File Statistics:\n")
            f.write(f"{'='*20}\n")
            for file_type, count in file_stats.items():
                f.write(f"{file_type.upper():15s}: {count:6d} files\n")

            f.write(f"\nProject Size:\n")
            f.write(f"{'='*15}\n")
            f.write(f"Total size: {total_size:,} bytes ({total_size/1024:.1f} KB)\n")
            f.write(f"Total files: {len([f for f in all_files if f.is_file()])} files\n")
            f.write(f"Total directories: {len([f for f in all_files if f.is_dir()])} directories\n")

        print(f"     ✅ Project report generated: {output_file}")
        return True

    except Exception as e:
        print(f"     ❌ Error generating report: {e}")
        return False

# Test batch processing
with tempfile.TemporaryDirectory() as temp_dir:
    temp_path = Path(temp_dir)

    print("   Testing batch file processing:")

    # Create test Verilog files
    test_verilog_files = {
        'cpu.v': '''module cpu (
    input wire clk,
    input wire reset,
    output reg [31:0] data
);
    reg [31:0] counter;
    wire clock_enable;

    always @(posedge clk) begin
        if (reset)
            counter <= 0;
        else
            counter <= counter + 1;
    end
endmodule''',

        'memory.v': '''module memory (
    input wire clk,
    input wire [7:0] addr,
    output reg [31:0] data
);
    reg [31:0] mem_array [0:255];

    always @(posedge clk) begin
        data <= mem_array[addr];
    end
endmodule'''
    }

    # Write test files
    verilog_files = []
    for filename, content in test_verilog_files.items():
        file_path = temp_path / filename
        file_path.write_text(content)
        verilog_files.append(str(file_path))

    # Process Verilog files
    output_dir = temp_path / "analysis"
    results = process_verilog_files(verilog_files, output_dir)

    print(f"     📊 Processed {len(results)} Verilog files")
    for result in results:
        print(f"       {result['file']}: {result['module_count']} modules, {result['line_count']} lines")

    # Generate project report
    report_file = temp_path / "project_report.txt"
    success = generate_project_report(temp_path, report_file)

print("\n🏆 DIRECTORY AND BATCH OPERATION BENEFITS:")
print("✅ **Project Organization**: Automated directory structure creation")
print("✅ **File Discovery**: Pattern-based file searching and filtering")
print("✅ **Batch Processing**: Efficient handling of multiple files")
print("✅ **Reporting**: Automated project analysis and documentation")
print("✅ **Maintenance**: File copying, backup, and cleanup operations")
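
# =============================================================================
# SKETCH: POLLING-BASED CHANGE DETECTION
# =============================================================================
# File monitoring can be approximated with the standard library alone by
# comparing two snapshots of (size, modification time) per file. This is a
# minimal sketch; take_snapshot and diff_snapshots are hypothetical helper
# names, and a production flow might prefer an event-based watcher instead.

import tempfile
from pathlib import Path

def take_snapshot(directory, pattern="*"):
    """Map each matching file (relative path) to its (size, mtime_ns)."""
    root = Path(directory)
    snapshot = {}
    for path in root.rglob(pattern):
        if path.is_file():
            info = path.stat()
            snapshot[str(path.relative_to(root))] = (info.st_size, info.st_mtime_ns)
    return snapshot

def diff_snapshots(old, new):
    """Return sorted lists of added, removed, and modified files."""
    common = set(old) & set(new)
    return {
        'added': sorted(set(new) - set(old)),
        'removed': sorted(set(old) - set(new)),
        'modified': sorted(name for name in common if old[name] != new[name]),
    }

with tempfile.TemporaryDirectory() as watch_dir:
    watch_path = Path(watch_dir)
    (watch_path / "run.log").write_text("Info: start\n")
    (watch_path / "old.rpt").write_text("stale")
    snapshot_before = take_snapshot(watch_path)

    (watch_path / "run.log").write_text("Info: start\nInfo: done\n")  # file grows
    (watch_path / "old.rpt").unlink()                                 # file removed
    (watch_path / "new.rpt").write_text("Area Report")                # file added
    snapshot_after = take_snapshot(watch_path)

    snapshot_changes = diff_snapshots(snapshot_before, snapshot_after)
    print(f"     Detected changes: {snapshot_changes}")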

In [None]:
# LOG ANALYSIS AND ERROR EXTRACTION
# =================================
# Processing EDA tool logs for insights

print("📊 LOG ANALYSIS AND ERROR EXTRACTION")
print("=" * 40)

from collections import defaultdict
import json

# =============================================================================
# EDA TOOL LOG PARSING
# =============================================================================

print("\n📝 EDA TOOL LOG PARSING:")

# Sample EDA tool log content
synthesis_log = """
Info: Starting synthesis flow
Info: Reading design files...
Info: Reading /home/project/src/cpu_core.v
Info: Reading /home/project/src/alu.v
Info: Reading /home/project/src/memory.v
Warning: Implicit wire declaration for signal 'clk_enable' in module cpu_core
Info: Elaborating design...
Info: Elaborated module cpu_core with 1250 gates
Info: Elaborated module alu with 456 gates
Info: Elaborated module memory with 2048 gates
Warning: Unconnected output port 'overflow' in module alu
Info: Starting synthesis optimization...
Info: Area optimization: 15% reduction
Info: Timing optimization: setup slack improved by 0.5ns
Error: Setup timing violation on path cpu_core/reg1 -> alu/out
Error: Hold timing violation on path memory/data -> cpu_core/reg2
Info: Power optimization: 8% reduction
Warning: High fanout on signal 'clk' (fanout = 1024)
Info: Synthesis completed
Info: Final area: 1250.5 um^2
Info: Final power: 825.3 mW
Info: Critical path delay: 9.85 ns
"""

def parse_eda_log(log_content, tool_name="Unknown"):
    """Parse EDA tool log and extract key information."""
    lines = log_content.strip().split('\n')

    log_data = {
        'tool': tool_name,
        'total_lines': len(lines),
        'messages': {
            'info': [],
            'warning': [],
            'error': []
        },
        'metrics': {},
        'files_processed': [],
        'summary': {}
    }

    for line_num, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            continue

        # Extract message type and content
        if line.startswith('Info:'):
            message = line[5:].strip()
            log_data['messages']['info'].append({
                'line': line_num,
                'message': message
            })

            # Extract metrics from info messages
            if 'Final area:' in message:
                area_match = re.search(r'Final area:\s*([\d.]+)', message)
                if area_match:
                    log_data['metrics']['area'] = float(area_match.group(1))
            elif 'Final power:' in message:
                power_match = re.search(r'Final power:\s*([\d.]+)', message)
                if power_match:
                    log_data['metrics']['power'] = float(power_match.group(1))
            elif 'Critical path delay:' in message:
                delay_match = re.search(r'Critical path delay:\s*([\d.]+)', message)
                if delay_match:
                    log_data['metrics']['delay'] = float(delay_match.group(1))
            elif 'Reading /' in message:
                file_match = re.search(r'Reading\s+(/[^\s]+)', message)
                if file_match:
                    log_data['files_processed'].append(file_match.group(1))

        elif line.startswith('Warning:'):
            message = line[8:].strip()
            log_data['messages']['warning'].append({
                'line': line_num,
                'message': message
            })

        elif line.startswith('Error:'):
            message = line[6:].strip()
            log_data['messages']['error'].append({
                'line': line_num,
                'message': message
            })

    # Generate summary
    log_data['summary'] = {
        'info_count': len(log_data['messages']['info']),
        'warning_count': len(log_data['messages']['warning']),
        'error_count': len(log_data['messages']['error']),
        'files_count': len(log_data['files_processed']),
        'has_errors': len(log_data['messages']['error']) > 0,
        'status': 'FAIL' if len(log_data['messages']['error']) > 0 else 'PASS'
    }

    return log_data
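
# =============================================================================
# SKETCH: TABLE-DRIVEN METRIC EXTRACTION
# =============================================================================
# parse_eda_log() above uses one if/elif branch per metric. When the list of
# metrics grows, a pattern table can keep the parsing loop short. This is a
# sketch under the assumption that each metric line looks like
# "<label>: <number> <unit>", as in the sample log; METRIC_PATTERNS and
# extract_metrics are hypothetical names, and real tool logs will differ.

import re

METRIC_PATTERNS = {
    'area': re.compile(r'Final area:\s*([\d.]+)\s*(\S*)'),
    'power': re.compile(r'Final power:\s*([\d.]+)\s*(\S*)'),
    'delay': re.compile(r'Critical path delay:\s*([\d.]+)\s*(\S*)'),
}

def extract_metrics(log_text):
    """Return {metric: (value, unit)} for every pattern found in log_text."""
    found = {}
    for name, pattern in METRIC_PATTERNS.items():
        match = pattern.search(log_text)
        if match:
            found[name] = (float(match.group(1)), match.group(2))
    return found

metrics_demo_log = (
    "Info: Final area: 1250.5 um^2\n"
    "Info: Final power: 825.3 mW\n"
    "Info: Critical path delay: 9.85 ns\n"
)
metrics_with_units = extract_metrics(metrics_demo_log)
for metric_name, (metric_value, metric_unit) in metrics_with_units.items():
    print(f"     {metric_name}: {metric_value} {metric_unit}")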

def categorize_log_messages(log_data):
    """Categorize log messages by type for better analysis."""
    categories = {
        'timing_violations': [],
        'unconnected_signals': [],
        'implicit_declarations': [],
        'optimization_results': [],
        'file_operations': []
    }

    all_messages = (
        log_data['messages']['info'] +
        log_data['messages']['warning'] +
        log_data['messages']['error']
    )

    for msg in all_messages:
        message = msg['message'].lower()

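        # Match explicit violations only: a bare 'slack' keyword would also
        # catch optimization messages such as "setup slack improved by 0.5ns"
        if any(keyword in message for keyword in ['timing violation', 'setup violation', 'hold violation', 'negative slack']):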
            categories['timing_violations'].append(msg)
        elif any(keyword in message for keyword in ['unconnected', 'floating']):
            categories['unconnected_signals'].append(msg)
        elif any(keyword in message for keyword in ['implicit', 'undeclared']):
            categories['implicit_declarations'].append(msg)
        elif any(keyword in message for keyword in ['optimization', 'reduction', 'improved']):
            categories['optimization_results'].append(msg)
        elif any(keyword in message for keyword in ['reading', 'writing', 'file']):
            categories['file_operations'].append(msg)

    return categories

def generate_log_summary_report(log_data, output_file):
    """Generate comprehensive log summary report."""
    try:
        with open(output_file, 'w') as f:
            f.write(f"EDA Tool Log Analysis Report\n")
            f.write(f"{'='*50}\n")
            f.write(f"Tool: {log_data['tool']}\n")
            f.write(f"Total lines processed: {log_data['total_lines']}\n")
            f.write(f"Analysis status: {log_data['summary']['status']}\n\n")

            # Message summary
            f.write(f"Message Summary:\n")
            f.write(f"{'='*20}\n")
            f.write(f"Info messages: {log_data['summary']['info_count']}\n")
            f.write(f"Warnings: {log_data['summary']['warning_count']}\n")
            f.write(f"Errors: {log_data['summary']['error_count']}\n\n")

            # Metrics
            if log_data['metrics']:
                f.write(f"Design Metrics:\n")
                f.write(f"{'='*15}\n")
                for metric, value in log_data['metrics'].items():
                    f.write(f"{metric.capitalize()}: {value}\n")
                f.write("\n")

            # Files processed
            if log_data['files_processed']:
                f.write(f"Files Processed ({len(log_data['files_processed'])}):\n")
                f.write(f"{'='*25}\n")
                for file_path in log_data['files_processed']:
                    f.write(f"  {file_path}\n")
                f.write("\n")

            # Errors (if any)
            if log_data['messages']['error']:
                f.write(f"Errors ({len(log_data['messages']['error'])}):\n")
                f.write(f"{'='*15}\n")
                for error in log_data['messages']['error']:
                    f.write(f"  Line {error['line']}: {error['message']}\n")
                f.write("\n")

            # Warnings (if any)
            if log_data['messages']['warning']:
                f.write(f"Warnings ({len(log_data['messages']['warning'])}):\n")
                f.write(f"{'='*18}\n")
                for warning in log_data['messages']['warning']:
                    f.write(f"  Line {warning['line']}: {warning['message']}\n")

        return True
    except Exception as e:
        print(f"     ❌ Error generating report: {e}")
        return False
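
# =============================================================================
# SKETCH: EXPORTING PARSED LOG DATA AS JSON
# =============================================================================
# The text report above is meant for people. For downstream scripts, the same
# dictionary can be written as JSON. This is a minimal sketch using a small
# hand-written dictionary shaped like the output of parse_eda_log();
# save_log_data_as_json is a hypothetical helper name.

import json
import tempfile
from pathlib import Path

def save_log_data_as_json(log_data, output_file):
    """Write parsed log data to a JSON file and return the output path."""
    output_path = Path(output_file)
    with open(output_path, 'w', encoding='utf-8') as f:
        # default=str converts values JSON cannot encode (e.g. Path) to text
        json.dump(log_data, f, indent=2, default=str)
    return output_path

json_demo_data = {
    'tool': 'Design Compiler',
    'messages': {
        'info': [],
        'warning': [],
        'error': [{'line': 3, 'message': 'Setup violation found'}],
    },
    'metrics': {'area': 1250.5},
    'summary': {'error_count': 1, 'status': 'FAIL'},
}

with tempfile.TemporaryDirectory() as json_demo_dir:
    json_file = save_log_data_as_json(json_demo_data, Path(json_demo_dir) / "log_analysis.json")
    # Read the file back to confirm that the data survives the round trip
    json_round_trip = json.loads(json_file.read_text(encoding='utf-8'))
    print(f"     JSON export status: {json_round_trip['summary']['status']}")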

# Test log analysis
print("   Analyzing synthesis log:")
parsed_log = parse_eda_log(synthesis_log, "Design Compiler")

print(f"     ✅ Parsed {parsed_log['total_lines']} log lines")
print(f"     Status: {parsed_log['summary']['status']}")
print(f"     Messages: {parsed_log['summary']['info_count']} info, {parsed_log['summary']['warning_count']} warnings, {parsed_log['summary']['error_count']} errors")

if parsed_log['metrics']:
    print(f"     Metrics extracted:")
    for metric, value in parsed_log['metrics'].items():
        print(f"       {metric.capitalize()}: {value}")

# Categorize messages
categories = categorize_log_messages(parsed_log)
print(f"\n   Message categorization:")
for category, messages in categories.items():
    if messages:
        print(f"     {category.replace('_', ' ').title()}: {len(messages)} messages")

# Generate report
with tempfile.TemporaryDirectory() as temp_dir:
    report_file = Path(temp_dir) / "log_analysis.txt"
    success = generate_log_summary_report(parsed_log, report_file)
    if success:
        print(f"     ✅ Log analysis report generated")

# =============================================================================
# MULTI-LOG AGGREGATION
# =============================================================================

print(f"\n📋 MULTI-LOG AGGREGATION:")

def aggregate_multiple_logs(log_files_data):
    """Aggregate data from multiple log files."""
    aggregated = {
        'total_logs': len(log_files_data),
        'overall_status': 'PASS',
        'combined_metrics': {},
        'message_totals': {
            'info': 0,
            'warning': 0,
            'error': 0
        },
        'tool_summary': {},
        'error_patterns': defaultdict(int),
        'warning_patterns': defaultdict(int)
    }

    for log_data in log_files_data:
        # Aggregate message counts
        for msg_type in ['info', 'warning', 'error']:
            aggregated['message_totals'][msg_type] += len(log_data['messages'][msg_type])

        # Check overall status
        if log_data['summary']['status'] == 'FAIL':
            aggregated['overall_status'] = 'FAIL'

        # Aggregate metrics
        for metric, value in log_data['metrics'].items():
            if metric not in aggregated['combined_metrics']:
                aggregated['combined_metrics'][metric] = []
            aggregated['combined_metrics'][metric].append(value)

        # Tool summary
        tool = log_data['tool']
        if tool not in aggregated['tool_summary']:
            aggregated['tool_summary'][tool] = {
                'logs': 0,
                'errors': 0,
                'warnings': 0
            }
        aggregated['tool_summary'][tool]['logs'] += 1
        aggregated['tool_summary'][tool]['errors'] += len(log_data['messages']['error'])
        aggregated['tool_summary'][tool]['warnings'] += len(log_data['messages']['warning'])

        # Analyze error patterns
        for error in log_data['messages']['error']:
            # Extract key words from error message
            words = error['message'].lower().split()
            key_words = [w for w in words if len(w) > 3 and w not in ['this', 'that', 'with', 'from', 'module']]
            if key_words:
                pattern = key_words[0]  # Use first significant word as pattern
                aggregated['error_patterns'][pattern] += 1

        # Analyze warning patterns
        for warning in log_data['messages']['warning']:
            words = warning['message'].lower().split()
            key_words = [w for w in words if len(w) > 3 and w not in ['this', 'that', 'with', 'from', 'module']]
            if key_words:
                pattern = key_words[0]
                aggregated['warning_patterns'][pattern] += 1

    return aggregated
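
# =============================================================================
# SKETCH: SUMMARIZING COMBINED METRICS
# =============================================================================
# aggregate_multiple_logs() collects each metric into a list of per-run
# values. A possible next step is to reduce each list to min/max/mean so that
# runs can be compared at a glance. summarize_combined_metrics is a
# hypothetical helper, and the numbers below are made-up demo values.

from statistics import mean

def summarize_combined_metrics(combined_metrics):
    """Return {metric: {'min', 'max', 'mean', 'runs'}} for non-empty lists."""
    summary = {}
    for metric, values in combined_metrics.items():
        if values:
            summary[metric] = {
                'min': min(values),
                'max': max(values),
                'mean': mean(values),
                'runs': len(values),
            }
    return summary

metric_demo = {'area': [1250.5, 1300.0, 1275.5], 'delay': [9.85, 10.15], 'power': []}
metric_summary = summarize_combined_metrics(metric_demo)
for metric_name, stats_row in metric_summary.items():
    print(f"     {metric_name}: min={stats_row['min']}, max={stats_row['max']}, "
          f"mean={stats_row['mean']:.2f} over {stats_row['runs']} runs")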

# Test multi-log aggregation
sample_logs = [
    parse_eda_log(synthesis_log, "Design Compiler"),
    parse_eda_log("Info: Starting place and route\nWarning: High congestion in region\nInfo: Completed successfully", "Innovus"),
    parse_eda_log("Info: Starting timing analysis\nError: Setup violation found\nInfo: Analysis complete", "PrimeTime")
]

aggregated_data = aggregate_multiple_logs(sample_logs)

print("   Multi-log aggregation results:")
print(f"     Total logs analyzed: {aggregated_data['total_logs']}")
print(f"     Overall status: {aggregated_data['overall_status']}")
print(f"     Combined messages: {aggregated_data['message_totals']['info']} info, {aggregated_data['message_totals']['warning']} warnings, {aggregated_data['message_totals']['error']} errors")

print(f"\n   Tool summary:")
for tool, summary in aggregated_data['tool_summary'].items():
    print(f"     {tool}: {summary['logs']} logs, {summary['errors']} errors, {summary['warnings']} warnings")

if aggregated_data['error_patterns']:
    print(f"\n   Common error patterns:")
    for pattern, count in sorted(aggregated_data['error_patterns'].items(), key=lambda x: x[1], reverse=True)[:3]:
        print(f"     '{pattern}': {count} occurrences")

print("\n🏆 LOG ANALYSIS BENEFITS:")
print("✅ **Automated Monitoring**: Parse tool logs without manual review")
print("✅ **Error Detection**: Quickly identify and categorize issues")
print("✅ **Trend Analysis**: Track error patterns across runs")
print("✅ **Quality Metrics**: Extract performance data from logs")
print("✅ **Reporting**: Generate comprehensive analysis reports")

In [None]:
# LARGE FILE PROCESSING AND STREAMING
# ===================================
# Efficient handling of large VLSI files

print("💾 LARGE FILE PROCESSING AND STREAMING")
print("=" * 40)

import mmap
import itertools

# =============================================================================
# STREAMING FILE PROCESSING
# =============================================================================

print("\n🌊 STREAMING FILE PROCESSING:")

def process_large_netlist_streaming(file_path, chunk_size=8192):
    """Process large netlist file using streaming to manage memory."""
    statistics = {
        'total_lines': 0,
        'module_count': 0,
        'instance_count': 0,
        'wire_count': 0,
        'assign_count': 0,
        'file_size': 0
    }

    try:
        # Get file size
        file_path = Path(file_path)
        statistics['file_size'] = file_path.stat().st_size

        print(f"     📁 Processing file: {file_path.name} ({statistics['file_size']:,} bytes)")

        def classify_line(line):
            """Update the counters for a single line of the netlist."""
            line = line.strip()
            if not line:
                return
            statistics['total_lines'] += 1

            # Count different constructs
            if line.startswith('module '):
                statistics['module_count'] += 1
            elif 'wire ' in line:
                statistics['wire_count'] += 1
            elif 'assign ' in line:
                statistics['assign_count'] += 1
            elif re.match(r'\w+\s+\w+\s*\(', line):  # Instance pattern
                statistics['instance_count'] += 1

        # Process file in chunks
        with open(file_path, 'r', encoding='utf-8') as f:
            buffer = ""

            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break

                buffer += chunk

                # Split off the complete lines; the last element is a partial
                # line (or '') that is carried over into the next chunk
                *complete_lines, buffer = buffer.split('\n')
                for line in complete_lines:
                    classify_line(line)

            # Classify the final line when the file lacks a trailing newline
            classify_line(buffer)

        print(f"     ✅ Streaming processing completed")
        print(f"       Lines: {statistics['total_lines']:,}")
        print(f"       Modules: {statistics['module_count']}")
        print(f"       Instances: {statistics['instance_count']}")
        print(f"       Wires: {statistics['wire_count']}")
        print(f"       Assigns: {statistics['assign_count']}")

        return statistics

    except Exception as e:
        print(f"     ❌ Error in streaming processing: {e}")
        return None
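
# =============================================================================
# SKETCH: LINE-BY-LINE ITERATION AS A SIMPLER STREAMING ALTERNATIVE
# =============================================================================
# Manual chunk buffering is useful when records span lines, but for
# line-oriented formats a Python file object already yields lines lazily, so
# memory use stays bounded by the longest line. This is a minimal sketch;
# count_keyword_lines is a hypothetical helper and only counts lines whose
# first token is one of the given keywords.

import tempfile
from pathlib import Path

def count_keyword_lines(file_path, keywords):
    """Count lines starting with each keyword, reading one line at a time."""
    counts = {keyword: 0 for keyword in keywords}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:  # the file object reads ahead in small buffers only
            stripped = line.lstrip()
            for keyword in keywords:
                if stripped.startswith(keyword + ' '):
                    counts[keyword] += 1
    return counts

with tempfile.TemporaryDirectory() as line_demo_dir:
    line_demo_file = Path(line_demo_dir) / "tiny.v"
    line_demo_file.write_text(
        "module a ();\n  wire x;\n  assign x = 1'b0;\nendmodule\nmodule b ();\nendmodule",
        encoding='utf-8')
    keyword_line_counts = count_keyword_lines(line_demo_file, ['module', 'wire', 'assign'])
    print(f"     Keyword line counts: {keyword_line_counts}")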

def search_large_file_patterns(file_path, patterns, max_matches=100):
    """Search for patterns in large files efficiently."""
    matches = {pattern: [] for pattern in patterns}

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()

                for pattern in patterns:
                    if pattern in line and len(matches[pattern]) < max_matches:
                        matches[pattern].append({
                            'line_number': line_num,
                            'content': line
                        })

                # Stop if all patterns have enough matches
                if all(len(matches[p]) >= max_matches for p in patterns):
                    break

        print(f"     🔍 Pattern search completed:")
        for pattern, match_list in matches.items():
            print(f"       '{pattern}': {len(match_list)} matches")

        return matches

    except Exception as e:
        print(f"     ❌ Error in pattern search: {e}")
        return None
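
# =============================================================================
# SKETCH: READING A LARGE FILE IN FIXED-SIZE LINE BATCHES
# =============================================================================
# Some analyses are easier on small groups of lines than on single lines.
# itertools.islice can pull a bounded number of lines from an open file
# without reading the rest. This is a minimal sketch; iter_line_batches is a
# hypothetical helper and the report content is made-up demo data.

import itertools
import tempfile
from pathlib import Path

def iter_line_batches(file_path, batch_size=1000):
    """Yield lists of up to batch_size lines without loading the whole file."""
    with open(file_path, 'r', encoding='utf-8') as f:
        while True:
            batch = list(itertools.islice(f, batch_size))
            if not batch:
                break
            yield batch

with tempfile.TemporaryDirectory() as batch_demo_dir:
    batch_demo_file = Path(batch_demo_dir) / "timing.rpt"
    batch_demo_file.write_text(
        "".join(f"path_{i} slack {i * 0.1:.1f}\n" for i in range(25)),
        encoding='utf-8')
    batch_sizes = [len(batch) for batch in iter_line_batches(batch_demo_file, batch_size=10)]
    print(f"     Batch sizes: {batch_sizes}")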

# Create sample large netlist for testing
def create_sample_large_netlist(file_path, num_modules=100):
    """Create a sample large netlist file for testing."""
    with open(file_path, 'w') as f:
        f.write("// Large Netlist Example\n")
        f.write("// Generated for testing large file processing\n\n")

        for i in range(num_modules):
            f.write(f"module test_module_{i} (\n")
            f.write(f"    input wire clk,\n")
            f.write(f"    input wire reset,\n")
            f.write(f"    input wire [31:0] data_in,\n")
            f.write(f"    output wire [31:0] data_out\n")
            f.write(f");\n\n")

            # Add some internal wires and instances
            for j in range(5):
                f.write(f"    wire internal_sig_{j};\n")

            for j in range(3):
                f.write(f"    sub_module_inst inst_{j} (\n")
                f.write(f"        .clk(clk),\n")
                f.write(f"        .reset(reset),\n")
                f.write(f"        .data(internal_sig_{j})\n")
                f.write(f"    );\n\n")

            # Add assign statements
            for j in range(2):
                f.write(f"    assign internal_sig_{j} = data_in[{j*8+7}:{j*8}];\n")

            f.write(f"\nendmodule\n\n")

# Test large file processing
with tempfile.TemporaryDirectory() as temp_dir:
    temp_path = Path(temp_dir)
    large_netlist = temp_path / "large_design.v"

    print("   Creating sample large netlist:")
    create_sample_large_netlist(large_netlist, num_modules=50)

    # Test streaming processing
    print("\n   Testing streaming processing:")
    stats = process_large_netlist_streaming(large_netlist)

    # Test pattern searching
    print("\n   Testing pattern search:")
    patterns = ['module', 'wire', 'assign', 'input', 'output']
    matches = search_large_file_patterns(large_netlist, patterns, max_matches=10)

# =============================================================================
# MEMORY-MAPPED FILE PROCESSING
# =============================================================================

print(f"\n🗺️ MEMORY-MAPPED FILE PROCESSING:")

def process_with_memory_mapping(file_path):
    """Process file using memory mapping for efficiency."""
    try:
        # mmap works on raw bytes, so open the file in binary mode
        with open(file_path, 'rb') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                # Scan the mapping in place with bytes patterns; decoding it
                # to str would copy the entire file into memory
                file_size = len(mm)
                line_count = sum(1 for _ in re.finditer(rb'\n', mm))
                module_count = sum(1 for _ in re.finditer(rb'\bmodule\s', mm))
                # Approximate: this pattern also matches module headers
                instance_count = sum(1 for _ in re.finditer(rb'\w+\s+\w+\s*\(', mm))

                print(f"     📊 Memory-mapped processing results:")
                print(f"       File size: {file_size:,} bytes")
                print(f"       Lines: {line_count:,}")
                print(f"       Modules: {module_count}")
                print(f"       Instances: {instance_count}")

                return {
                    'size': file_size,
                    'lines': line_count,
                    'modules': module_count,
                    'instances': instance_count
                }

    except Exception as e:
        print(f"     ❌ Error in memory-mapped processing: {e}")
        return None

# =============================================================================
# CHUNKED FILE PROCESSING FOR REPORTS
# =============================================================================

print("\n📊 CHUNKED FILE PROCESSING FOR REPORTS:")

def process_large_timing_report(file_path, chunk_lines=1000):
    """Process large timing report in chunks.

    chunk_lines is a soft limit: a chunk is only flushed at the next
    'Startpoint:' line, so every path is parsed whole and exactly once.
    """
    path_data = []

    try:
        with open(file_path, 'r') as f:
            line_buffer = []

            for line in f:
                line = line.strip()

                # Flush at a path boundary once the buffer is full. Keeping a
                # fixed overlap instead would parse paths in the overlap twice.
                if len(line_buffer) >= chunk_lines and line.startswith('Startpoint:'):
                    path_data.extend(extract_timing_paths_from_chunk(line_buffer))
                    line_buffer = []

                line_buffer.append(line)

        # Process remaining lines
        if line_buffer:
            path_data.extend(extract_timing_paths_from_chunk(line_buffer))

        print(f"     ✅ Processed timing report with {len(path_data)} paths")
        return path_data

    except Exception as e:
        print(f"     ❌ Error processing timing report: {e}")
        return []

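def extract_timing_paths_from_chunk(lines):
    """Extract timing paths from a chunk of lines.

    Expects 'Startpoint: <name>' and 'Endpoint: <name>' lines, plus a slack
    line in which the value precedes the word 'slack'.
    """
    paths = []
    current_path = None

    for line in lines:
        if line.startswith('Startpoint:'):
            # A new startpoint closes the path collected so far
            if current_path:
                paths.append(current_path)

            # Split on the first colon only, and tolerate an empty name
            fields = line.split(':', 1)[1].split()
            current_path = {
                'startpoint': fields[0] if fields else '',
                'endpoint': '',
                'slack': 0.0
            }
        elif line.startswith('Endpoint:') and current_path:
            fields = line.split(':', 1)[1].split()
            current_path['endpoint'] = fields[0] if fields else ''
        elif 'slack' in line and current_path:
            # Optional sign, then a decimal number, e.g. "-0.125 slack"
            slack_match = re.search(r'(-?\d*\.?\d+)\s+slack', line)
            if slack_match:
                current_path['slack'] = float(slack_match.group(1))

    if current_path:
        paths.append(current_path)

    return paths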

# Test memory-mapped processing
with tempfile.TemporaryDirectory() as temp_dir:
    temp_path = Path(temp_dir)
    test_file = temp_path / "test_design.v"

    # Create test file
    create_sample_large_netlist(test_file, num_modules=20)

    print("   Testing memory-mapped processing:")
    mm_stats = process_with_memory_mapping(test_file)

print("\n🏆 LARGE FILE PROCESSING BENEFITS:")
print("✅ **Memory Efficiency**: Handle files larger than available RAM")
print("✅ **Performance**: Fast processing through streaming and mapping")
print("✅ **Scalability**: Process multi-gigabyte VLSI files")
print("✅ **Pattern Search**: Efficient searching in large datasets")
print("✅ **Chunk Processing**: Break large tasks into manageable pieces")

## 💪 **Practice Exercises: Advanced File Processing**

### **🎯 Exercise 1: Complete EDA Log Analysis System**
Create a comprehensive log analysis framework:
- Parse logs from multiple EDA tools (synthesis, P&R, timing, power)
- Extract and categorize errors, warnings, and performance metrics
- Generate trend analysis across multiple design iterations
- Create automated alerts for critical issues and quality gates
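
One possible starting point for this exercise is sketched below: it walks an iterable of log lines and buckets them by severity. The `Error`/`Warning` prefixes, the sample log lines, and the names `SEVERITY_PATTERNS` and `categorize_log_lines` are assumptions made for illustration; each EDA tool has its own message format, so the patterns would need adapting.

```python
import re
from collections import Counter

# Hypothetical severity patterns; real tools use their own message formats.
SEVERITY_PATTERNS = {
    'error': re.compile(r'^\s*Error\b', re.IGNORECASE),
    'warning': re.compile(r'^\s*Warning\b', re.IGNORECASE),
}

def categorize_log_lines(lines):
    """Count lines per severity and keep the matching messages."""
    counts = Counter()
    messages = {name: [] for name in SEVERITY_PATTERNS}
    for line in lines:
        for name, pattern in SEVERITY_PATTERNS.items():
            if pattern.search(line):
                counts[name] += 1
                messages[name].append(line.strip())
                break  # assign each line to at most one category
    return counts, messages

# Made-up sample lines, not output from any real tool
sample_log = [
    "Info: Reading design top.v",
    "Warning: Unconnected port 'scan_en'",
    "Error: Cannot resolve reference 'sub_module_inst'",
    "Warning: Clock 'clk' has no source",
]
counts, messages = categorize_log_lines(sample_log)
print(dict(counts))
```

Because the function accepts any iterable of strings, an open file handle can be passed in directly, so a large log is streamed rather than loaded whole.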

### **🎯 Exercise 2: Large Design File Processor**
Build a system for processing massive VLSI design files:
- Stream process multi-gigabyte netlist files efficiently
- Extract hierarchical design information and statistics
- Implement parallel processing for multiple files
- Generate comprehensive design analysis reports
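
For the parallel-processing part of this exercise, a minimal sketch using the standard library's `concurrent.futures` might look like the following. The helper names `count_modules` and `count_modules_in_files` and the tiny generated netlists are hypothetical. A thread pool is used here because the work is mostly file I/O; for CPU-heavy parsing, `ProcessPoolExecutor` is the usual alternative.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def count_modules(file_path):
    """Stream one netlist and count the lines that open a module."""
    count = 0
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.lstrip().startswith('module '):
                count += 1
    return Path(file_path).name, count

def count_modules_in_files(file_paths, max_workers=4):
    """Run count_modules over several files concurrently."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields (name, count) pairs in input order
        return dict(pool.map(count_modules, file_paths))

with tempfile.TemporaryDirectory() as temp_dir:
    files = []
    for i in range(3):
        path = Path(temp_dir) / f"block_{i}.v"
        # File i holds i + 1 empty modules
        text = "".join(f"module m{j} ();\nendmodule\n" for j in range(i + 1))
        path.write_text(text, encoding='utf-8')
        files.append(path)

    results = count_modules_in_files(files)
    print(results)
```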

### **🎯 Exercise 3: Automated Report Aggregation Pipeline**
Implement a complete report processing pipeline:
- Collect timing, power, and area reports from multiple corners
- Aggregate data across process variations and operating conditions
- Generate executive summary reports with trend analysis
- Create HTML dashboards with interactive charts
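
The aggregation step could start from something like the sketch below, which reduces per-corner path lists to a worst slack and a violation count. It assumes path dictionaries with the `startpoint`, `endpoint`, and `slack` keys produced by `extract_timing_paths_from_chunk`; the corner names, slack values, and the function name `summarize_corners` are made up for illustration.

```python
def summarize_corners(paths_by_corner):
    """Return path count, worst slack, and violation count per corner."""
    summary = {}
    for corner, paths in paths_by_corner.items():
        slacks = [p['slack'] for p in paths]
        summary[corner] = {
            'paths': len(slacks),
            'worst_slack': min(slacks) if slacks else None,
            'violations': sum(1 for s in slacks if s < 0),
        }
    return summary

# Illustrative data only; real input would come from parsed timing reports
paths_by_corner = {
    'slow': [
        {'startpoint': 'reg_a', 'endpoint': 'reg_b', 'slack': -0.12},
        {'startpoint': 'reg_c', 'endpoint': 'reg_d', 'slack': 0.05},
    ],
    'fast': [
        {'startpoint': 'reg_a', 'endpoint': 'reg_b', 'slack': 0.31},
    ],
}

summary = summarize_corners(paths_by_corner)
for corner, row in summary.items():
    print(f"{corner}: worst slack {row['worst_slack']:+.2f}, "
          f"{row['violations']} violating of {row['paths']} paths")
```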

### **🎯 Exercise 4: Configuration and Environment Manager**
Create a robust configuration management system:
- Handle complex tool configuration hierarchies
- Support environment-specific overrides and templates
- Implement configuration validation and dependency checking
- Generate configuration diffs and change tracking
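
As a hedged starting point for the override and diff requirements, the sketch below layers one nested dictionary over another and then lists the changed leaves. The function names `merge_config` and `diff_config` and the configuration keys are hypothetical; a real system would load these dictionaries from the project's configuration files.

```python
import copy

def merge_config(base, override):
    """Recursively overlay override onto base without mutating either."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged

def diff_config(old, new, prefix=''):
    """List (dotted_key, old_value, new_value) for every changed leaf."""
    changes = []
    for key in sorted(set(old) | set(new)):
        path = f"{prefix}{key}"
        a, b = old.get(key), new.get(key)
        if isinstance(a, dict) and isinstance(b, dict):
            changes.extend(diff_config(a, b, prefix=f"{path}."))
        elif a != b:
            changes.append((path, a, b))
    return changes

# Hypothetical base settings and an environment-specific override
base = {
    'synthesis': {'effort': 'medium', 'clock_period_ns': 2.0},
    'output_dir': 'results',
}
override = {'synthesis': {'effort': 'high'}, 'corner': 'slow'}

merged = merge_config(base, override)
changes = diff_config(base, merged)
for key, old_value, new_value in changes:
    print(f"{key}: {old_value!r} -> {new_value!r}")
```

A key that is missing on one side shows up with `None` for that side, which keeps additions and removals visible in the same change list.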

---

## 🏆 **Chapter Summary: Advanced File Processing Mastery**

### **✅ Directory and Batch Operations**
- **Project Structure**: Automated VLSI project organization
- **File Discovery**: Pattern-based searching and filtering
- **Batch Processing**: Efficient handling of multiple design files
- **Maintenance**: Copying, backup, and cleanup operations

### **✅ Log Analysis and Monitoring**
- **EDA Tool Logs**: Automated parsing and error extraction
- **Pattern Recognition**: Categorization of issues and metrics
- **Multi-Log Aggregation**: Cross-tool analysis and reporting
- **Quality Monitoring**: Automated error detection and alerting

### **✅ Large File Processing**
- **Streaming**: Memory-efficient processing of huge files
- **Memory Mapping**: Fast access to large datasets
- **Chunked Processing**: Breaking large tasks into manageable pieces
- **Performance**: Streaming, mapping, and chunking chosen to suit multi-gigabyte VLSI files

### **✅ Professional Applications**
- **Scalability**: Handle enterprise-scale VLSI projects
- **Automation**: Reduce manual file processing tasks
- **Integration**: Connect multiple EDA tools seamlessly
- **Quality Assurance**: Comprehensive validation and monitoring

**🚀 Next**: Ready for advanced Python topics including testing, debugging, performance optimization, and deployment strategies for production VLSI automation systems!