In [None]:
# Cell 1: Full Processing Pipeline Setup
import os
import subprocess
import shutil
from concurrent.futures import ThreadPoolExecutor
from tqdm.notebook import tqdm
from Bio import SeqIO
import warnings
warnings.filterwarnings('ignore')

# Configuration
# Default thread count for process_all_files (passed to ThreadPoolExecutor).
# Each worker runs its own `docker run` with --memory=8g, so the number of
# concurrent containers equals this value.
MAX_WORKERS = 4  
# Modulus used by process_all_files to throttle how often the progress-bar
# postfix (Success/Failed counters) is refreshed.
BATCH_SIZE = 10  

In [5]:
# Cell 2: Enhanced Processing Functions
def process_single_file(mgb_file, source_dir, dest_dir):
    """Convert one ``.mgb`` file to FASTQ via Docker and move it to ``dest_dir``.

    The ``muefab/genie:latest`` image is run with ``source_dir`` bind-mounted
    at ``/data``; the container writes ``<base>.fastq`` next to the input,
    and the result is then moved into ``dest_dir``.

    Args:
        mgb_file: File name (not a path) of the input inside ``source_dir``.
        source_dir: Host directory that contains ``mgb_file``.
        dest_dir: Host directory that receives ``<base>.fastq``; created if
            it does not exist.

    Returns:
        A ``(success, mgb_file, error)`` tuple. ``error`` is ``None`` on
        success, otherwise a message (container stderr, or the text of the
        exception that was caught). This function never raises for ordinary
        errors so it can be used directly as a thread-pool task.
    """
    try:
        base_name = os.path.splitext(mgb_file)[0]
        temp_fastq = os.path.join(source_dir, f"{base_name}.fastq")

        # Docker only bind-mounts absolute host paths; a relative source would
        # be interpreted as a named volume and the container would see no input.
        host_dir = os.path.abspath(source_dir)
        if os.name == "nt":
            # Windows path conversion: forward slashes avoid escaping issues.
            host_dir = host_dir.replace('\\', '/')

        command = [
            "docker", "run", "--rm",
            "--memory=8g", "--memory-swap=12g",
            "-v", f"{host_dir}:/data",
            "muefab/genie:latest", "run",
            "-f",
            "-i", f"/data/{mgb_file}",
            "-o", f"/data/{base_name}.fastq"
        ]

        # errors="replace": undecodable bytes in the container's output must
        # not turn an otherwise successful conversion into an exception.
        result = subprocess.run(
            command, capture_output=True, text=True, errors="replace"
        )
        if result.returncode != 0:
            message = result.stderr or f"docker exited with code {result.returncode}"
            return (False, mgb_file, message)

        # Verify and move output
        if not os.path.exists(temp_fastq):
            return (False, mgb_file, "Output file not created")

        # Make the function usable on its own, not only via process_all_files.
        os.makedirs(dest_dir, exist_ok=True)
        final_path = os.path.join(dest_dir, f"{base_name}.fastq")
        shutil.move(temp_fastq, final_path)
        return (True, mgb_file, None)

    except Exception as e:
        # Deliberate catch-all: failures are reported per file to the caller
        # instead of aborting the whole batch.
        return (False, mgb_file, str(e))

def process_all_files(source_dir, dest_dir, max_workers=MAX_WORKERS):
    """Convert every ``.mgb`` file in ``source_dir`` using a thread pool.

    Each file is handed to ``process_single_file``; a tqdm progress bar is
    advanced as conversions finish.

    Args:
        source_dir: Directory scanned (non-recursively) for ``*.mgb`` files.
        dest_dir: Directory that receives the converted files; created if
            missing.
        max_workers: Number of worker threads (concurrent conversions).

    Returns:
        dict with keys ``'success'`` (int), ``'failed'`` (int) and
        ``'errors'`` (list of ``(filename, error)`` tuples, ordered by the
        sorted file listing so repeated runs are comparable).
    """
    # Local import keeps this cell runnable without touching the import cell.
    from concurrent.futures import as_completed

    os.makedirs(dest_dir, exist_ok=True)
    # Sorted: os.listdir order is arbitrary and differs between platforms.
    mgb_files = sorted(f for f in os.listdir(source_dir) if f.endswith(".mgb"))
    total = len(mgb_files)

    results = {'success': 0, 'failed': 0, 'errors': []}
    failures = []  # (submission index, filename, error)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its position/name so failures can be
        # reported in a stable order even though completion order varies.
        pending = {
            executor.submit(process_single_file, mgb_file, source_dir, dest_dir): (index, mgb_file)
            for index, mgb_file in enumerate(mgb_files)
        }

        with tqdm(total=total, desc="Processing Files") as pbar:
            # as_completed: the bar moves whenever *any* file finishes rather
            # than stalling behind the oldest still-running submission.
            for done, future in enumerate(as_completed(pending), start=1):
                index, submitted_name = pending[future]
                try:
                    success, filename, error = future.result()
                except Exception as exc:
                    # A crashing task is counted as a failure, not a batch abort.
                    success, filename, error = False, submitted_name, str(exc)

                if success:
                    results['success'] += 1
                else:
                    results['failed'] += 1
                    failures.append((index, filename, error))
                pbar.update(1)

                # Refresh the counters every BATCH_SIZE processed files and
                # once more at the end so the final totals are always shown.
                if done % BATCH_SIZE == 0 or done == total:
                    pbar.set_postfix({
                        'Success': results['success'],
                        'Failed': results['failed']
                    })

    failures.sort(key=lambda item: item[0])
    results['errors'] = [(filename, error) for _, filename, error in failures]
    return results

In [None]:
# Cell 3: Main Execution
# NOTE: these are machine-specific kagglehub cache locations; adjust them
# when running the notebook on another machine.
train_dir = r"C:\Users\divin\.cache\kagglehub\datasets\maestroalert\trainfiles\versions\1\TrainFiles"
test_dir = r"C:\Users\divin\.cache\kagglehub\datasets\maestroalert\testfiles\versions\1\TestFiles"
output_train = "processed_data/train"
output_test = "processed_data/test"

# Process training files
print("Starting training files processing...")
train_results = process_all_files(train_dir, output_train)

# Process test files
print("\nStarting test files processing...")
test_results = process_all_files(test_dir, output_test)

# Print summary
print("\nProcessing Complete!")
print(f"Training Files - Success: {train_results['success']}, Failed: {train_results['failed']}")
print(f"Test Files - Success: {test_results['success']}, Failed: {test_results['failed']}")

# Save error log if any failures
if train_results['failed'] > 0 or test_results['failed'] > 0:
    # Explicit UTF-8: captured stderr may contain characters the platform's
    # default encoding (e.g. cp1252 on Windows) cannot represent, which would
    # otherwise raise while writing the log.
    with open("processing_errors.log", "w", encoding="utf-8") as f:
        log_sections = (
            ("Training File Errors:\n", train_results),
            ("\nTest File Errors:\n", test_results),
        )
        for heading, outcome in log_sections:
            f.write(heading)
            for failed_name, message in outcome['errors']:
                f.write(f"{failed_name}: {message}\n")
    print("Error log saved to processing_errors.log")

Starting training files processing...


Processing Files:   0%|          | 0/2901 [00:00<?, ?it/s]


Starting test files processing...


Processing Files:   0%|          | 0/1068 [00:00<?, ?it/s]


Processing Complete!
Training Files - Success: 2901, Failed: 0
Test Files - Success: 1068, Failed: 0
