In [1]:
# Cell 2: Imports and (optional) Google Drive mount
# This cell imports the libraries used by the pipeline. The Google Drive mount is currently
# commented out (see the end of this cell); re-enable it when running on Colab.
print("--- Importing libraries and mounting Google Drive... ---")

# Standard libraries
import os
import re
import shutil
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import soundfile as sf
import warnings
import subprocess # For running external scripts

# For file management and Google Drive
# from google.colab import drive

# For audio and signal processing
import librosa
import mne
from xml.etree import ElementTree as ET

# --- Import your custom XML parser ---
import sys
sys.path.append('helper_scripts') # Add the directory to Python's path
from working_with_xml import extract_apnea_events # Your specific function

# Suppress minor warnings from libraries to keep output clean
warnings.filterwarnings("ignore", category=RuntimeWarning)

# Mount Google Drive
# try:
#     drive.mount('/content/drive')
#     print("Google Drive mounted successfully.")
# except Exception as e:
#     print(f"Error mounting Google Drive: {e}")

--- Importing libraries and mounting Google Drive... ---


In [2]:
# Cell 3: Configuration - The User Control Panel
# This is the main control panel for the entire script.
print("--- Configuring the data processing pipeline... ---")

# --- FILE & PATH SETTINGS ---
# Base directory that holds the pre-downloaded patient folders.
# An empty string makes os.path.join() resolve patient folders relative to the current working directory.
RAW_PATIENT_DATA_BASE_DIR = '' 
NOISE_AUDIO_DIR = "noise_audio" # Folder holding the noise files (for combining_audio.py to find)
DENOISER_SCRIPTS_DIR = os.path.abspath("../src") # Absolute path of ../src; the denoiser scripts and combining_audio.py are looked up here
DRIVE_SAVE_PATH = "/content/drive/MyDrive/ApneaResearch" # Google Drive folder for the final CSV (Colab-style path; NOTE(review): confirm it is still used for local runs)

# --- VIRTUAL ENVIRONMENT PYTHON PATH ---
# Auto-detect the virtual environment Python executable
def find_venv_python(search_root=None, venv_names=('venv', '.venv', 'env', 'virtualenv')):
    """Locate the Python executable of a virtual environment.

    Args:
        search_root: Directory expected to contain the virtual-environment
            folder. Defaults to the parent of the current working directory,
            which is what the original hard-coded lookup used.
        venv_names: Candidate virtual-environment folder names, tried in order.

    Returns:
        The absolute path of the first interpreter found, or None when none of
        the candidate folders contains one.
    """
    root = os.path.abspath('..' if search_root is None else search_root)

    # The interpreter lives in a platform-specific sub-folder of the venv.
    if os.name == 'nt':  # Windows
        relative_exe = ('Scripts', 'python.exe')
    else:  # Linux/Mac
        relative_exe = ('bin', 'python')

    for venv_name in venv_names:
        candidate = os.path.join(root, venv_name, *relative_exe)
        # isfile() (rather than exists()) so a directory that merely shares the
        # interpreter's name is not mistaken for an executable.
        if os.path.isfile(candidate):
            return candidate

    return None

# Interpreter used to launch the external helper scripts (denoisers, combining_audio.py).
VENV_PYTHON = find_venv_python()
if VENV_PYTHON:
    print(f"Found venv Python: {VENV_PYTHON}")
else:
    # Fall back to the interpreter running this kernel instead of whatever
    # "python" happens to be on PATH: it is guaranteed to exist and has the
    # packages this notebook imported. "python" remains the last resort for the
    # rare case where sys.executable is empty.
    VENV_PYTHON = sys.executable or "python"
    print(f"No venv found, using current interpreter: {VENV_PYTHON}")

# List the specific patient folders you want to process.
# These names must match the folder names inside RAW_PATIENT_DATA_BASE_DIR exactly (e.g., 'patient_01').
# NOTE(review): the active entry uses an underscore ('patient_01') while the commented-out entries
# below use hyphens ('patient-02', ...) - confirm the on-disk folder naming before enabling them.
# NOTE(review): auto-discovery of all patient folders for an empty list is not visible in this
# section - confirm it exists before relying on an empty list.
PATIENT_FOLDERS_TO_PROCESS = ['patient_01']
                            #   , 'patient-02', 'patient-03', 'patient-04', 'patient-05',
                            #    'patient-06', 'patient-07', 'patient-08'] # For a full run, list all 20+ patients here


# --- DEBUG MODE SETTINGS ---
# Set this to True to run for only a small subset of patients for testing and get detailed diagnostic outputs.
# Set to False for the full overnight run on all patients in PATIENT_FOLDERS_TO_PROCESS.
DEBUG_MODE = True
DEBUG_PATIENT_COUNT = 1 # In debug mode, we'll only process this many patients from the list


# --- DATA PROCESSING SETTINGS ---
AUDIO_CHANNEL_NAME = 'Mic' # The confirmed microphone channel name from your EDF files
FRAME_DURATION_SEC = 1.0 # Duration of each frame for feature extraction
CHUNK_DURATION_MIN = 10 # Process audio in 10-minute chunks to manage RAM
SUB_CHUNK_DURATION_SEC = 15 # For DL denoisers, process the chunk in 15-second sub-chunks

# --- NOISE INJECTION SETTINGS ---
# These are the categories passed to combining_audio.py through its -cat flag.
# NOTE(review): the script is expected to resolve them against the ESC-50 database internally - confirm the names match.
NOISE_CATEGORIES = ['vacuum_cleaner', 'cat', 'door_wood_creaks'] # Must match categories in ESC-50 that combining_audio knows
NOISE_LEVEL_RMS_RATIO = 0.3333 # Noise RMS as a fraction of signal RMS: 1/3 gives a signal:noise power ratio of ~9:1 (~9.54 dB SNR)

# --- SANITY CHECK SETTINGS (only active in DEBUG_MODE) ---
SAVE_SNIPPETS = True # Saves temporary audio snippets for manual verification
SNIPPET_DURATION_SEC = 5 # Duration of saved audio snippets

# --- DENOISER SCRIPT MAPPING ---
# Maps a short denoiser name to the filename of its script.
# Script filenames are resolved relative to DENOISER_SCRIPTS_DIR, so every script must live in that folder.
# IMPORTANT: Adjust these filenames to match the scripts you actually have there!
DENOISER_SCRIPT_MAP = {
    "spectral": "spec_subtraction_same_file.py",
    "wiener": "wiener_filtering.py",
    "logmmse": "log_mmse.py",
    # "speechbrain": "neural_1_speechbrain.py",  # Temporarily commented out due to installation issues
    # "deepfilternet": "denoise_with_deepfilternet.py"  # Temporarily commented out until Rust/deepfilternet is installed
}

# --- Create necessary directories ---
# Directory creation is currently disabled; the calls below are kept for reference only.
# os.makedirs(os.path.join('/content', NOISE_AUDIO_DIR), exist_ok=True) # Ensure noise audio dir exists
# os.makedirs(DENOISER_SCRIPTS_DIR, exist_ok=True) # Ensure denoiser scripts dir exists
# os.makedirs('helper_scripts', exist_ok=True) # Ensure helper scripts dir exists
# os.makedirs(DRIVE_SAVE_PATH, exist_ok=True) # Ensure Google Drive save path exists

print("Configuration set.")

--- Configuring the data processing pipeline... ---
Found venv Python: c:\Users\solom\Documents\Evaluating-Noise-Reduction-Techniques\venv\Scripts\python.exe
Configuration set.


In [3]:
# Cell 4: Helper Functions - Audio Processing, Feature Extraction, and External Script Wrappers
# This cell contains all reusable helper functions for the processing pipeline.
print("--- Defining audio processing and external script wrapper functions... ---")

# --- Wrapper for your external RML parser ---
def parse_respironics_rml(rml_path):
    """
    Return the apnea events of an RML file as a list of (start, end) float pairs.

    Delegates parsing to extract_apnea_events (called with output_csv=None) and
    drops the event-type field of each (event_type, start, end) record.
    """
    time_spans = []
    for _event_type, onset, offset in extract_apnea_events(rml_path, output_csv=None):
        # Coerce both bounds to float so callers always get numeric times.
        time_spans.append((float(onset), float(offset)))
    return time_spans

# --- Feature extraction and noise mixing helpers ---
def extract_features(frame, sr):
    """
    Compute summary features for one audio frame with librosa.

    Returns a dict whose keys are, in insertion order: 'rms', 'zcr',
    'centroid', 'mfcc_1' .. 'mfcc_13', 'bandwidth', 'rolloff'. Each value is
    the mean of the corresponding librosa feature over the frame.
    """
    spectral = librosa.feature

    energy = spectral.rms(y=frame).mean()
    crossings = spectral.zero_crossing_rate(y=frame).mean()
    try:
        brightness = spectral.spectral_centroid(y=frame, sr=sr).mean()
    except ZeroDivisionError:
        # Fall back to 0 when the centroid computation divides by zero.
        brightness = 0
    cepstral_means = spectral.mfcc(y=frame, sr=sr, n_mfcc=13).mean(axis=1)

    features = dict(rms=energy, zcr=crossings, centroid=brightness)
    # MFCC columns are numbered from 1.
    features.update(
        (f'mfcc_{idx}', coeff) for idx, coeff in enumerate(cepstral_means, start=1)
    )
    # Bandwidth and rolloff go last to keep the established column order.
    features['bandwidth'] = spectral.spectral_bandwidth(y=frame, sr=sr).mean()
    features['rolloff'] = spectral.spectral_rolloff(y=frame, sr=sr).mean()
    return features

def add_noise(clean_signal, noise_signal, sr, noise_level_rms_ratio):
    """Mix a clean signal with noise scaled to a target noise/signal RMS ratio.

    The noise is tiled (if shorter) and truncated (if longer) to the length of
    the clean signal, rescaled so that its RMS equals
    ``noise_level_rms_ratio`` times the RMS of the clean signal, and added.

    Args:
        clean_signal: 1-D array-like with the clean samples.
        noise_signal: 1-D array-like with the noise samples.
        sr: Sample rate. Not used by the computation; kept so the signature
            stays unchanged for existing callers.
        noise_level_rms_ratio: Desired noise RMS divided by signal RMS.

    Returns:
        A new array with the mixture. When there is nothing to mix (either
        input is empty, or the noise is silent) a copy of the clean signal is
        returned, so the caller's array is never aliased.
    """
    clean_signal = np.asarray(clean_signal)
    noise_signal = np.asarray(noise_signal)

    # An empty noise array used to raise ZeroDivisionError when computing the
    # tile count; an empty clean signal has nothing to add noise to.
    if clean_signal.size == 0 or noise_signal.size == 0:
        return clean_signal.copy()

    n_samples = len(clean_signal)
    signal_power = np.sum(clean_signal ** 2) / n_samples

    # Loop the noise until it covers the clean signal, then cut to length.
    if len(noise_signal) < n_samples:
        repeats = int(np.ceil(n_samples / len(noise_signal)))
        noise_signal = np.tile(noise_signal, repeats)
    noise_signal = noise_signal[:n_samples]

    noise_power = np.sum(noise_signal ** 2) / len(noise_signal)

    if not noise_power > 0:  # Silent noise: avoid dividing by zero
        return clean_signal.copy()

    # sqrt(signal_power / noise_power) equalises the two RMS levels; the ratio
    # then sets the noise level relative to the signal.
    scaling_factor = np.sqrt(signal_power / noise_power) * noise_level_rms_ratio
    return clean_signal + (noise_signal * scaling_factor)

# --- Wrapper for calling external denoiser scripts ---
def run_denoiser_script(script_name, input_wav_path, output_wav_path, denoiser_script_map, sr, current_temp_dir, timeout_sec=300):
    """Run an external denoiser script on a WAV file and load its output.

    The script is launched as
    ``VENV_PYTHON <script> --input <input_wav_path> --output <output_wav_path>``.

    Args:
        script_name: Key into ``denoiser_script_map``.
        input_wav_path: WAV file handed to the script. It is deleted once the
            script has been attempted, whether or not it succeeded.
        output_wav_path: WAV file the script is expected to write. It is left
            on disk for the caller to clean up.
        denoiser_script_map: Mapping of denoiser name to script filename inside
            DENOISER_SCRIPTS_DIR.
        sr: Sample rate the returned audio must have; the output is resampled
            when the script wrote a different rate.
        current_temp_dir: Not used by this function; kept so the signature
            stays unchanged for existing callers.
        timeout_sec: Maximum run time of the script in seconds.

    Returns:
        The denoised audio as loaded by librosa, or None on any failure
        (unknown name, missing script, non-zero exit, timeout, missing or empty
        output, or an unexpected error). Failures are reported with print().
    """
    script_filename = denoiser_script_map.get(script_name)
    if script_filename is None:
        # Previously an unknown name raised KeyError instead of being skipped.
        print(f"  > ERROR: Unknown denoiser '{script_name}' (no entry in the script map). Skipping.")
        return None

    script_path = os.path.join(DENOISER_SCRIPTS_DIR, script_filename)

    if not os.path.exists(script_path):
        print(f"  > ERROR: Denoiser script '{script_name}' not found at '{script_path}'. Skipping.")
        return None

    # Use venv Python instead of system python
    command = [VENV_PYTHON, script_path, "--input", input_wav_path, "--output", output_wav_path]

    try:
        # No check=True: the exit code is inspected explicitly below so the
        # failure report can include it together with the captured output.
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout_sec)

        if result.returncode != 0:
            print(f"  > ERROR: {script_name} failed with exit code {result.returncode}.")
            print(f"    STDOUT: {result.stdout}")
            print(f"    STDERR: {result.stderr}")
            return None

        if not os.path.exists(output_wav_path) or os.path.getsize(output_wav_path) == 0:
            print(f"  > ERROR: {script_name} did not produce a valid output file at {output_wav_path}.")
            return None

        # Load at the file's native rate, then resample only if it differs
        # from the rate the caller expects.
        denoised_audio, loaded_sr = librosa.load(output_wav_path, sr=None)
        if loaded_sr != sr:
            denoised_audio = librosa.resample(denoised_audio, orig_sr=loaded_sr, target_sr=sr)

        return denoised_audio

    except subprocess.TimeoutExpired:
        print(f"  > ERROR: {script_name} timed out after {timeout_sec} seconds.")
        return None
    except Exception as e:
        # Best-effort wrapper: report and skip rather than abort the pipeline.
        print(f"  > ERROR: Unexpected error running {script_name}: {e}")
        return None
    finally:
        # The input file is a temporary hand-off to the subprocess; remove it.
        # output_wav_path is intentionally kept (see docstring).
        if os.path.exists(input_wav_path):
            os.remove(input_wav_path)

print("Helper functions defined.")

--- Defining audio processing and external script wrapper functions... ---
Helper functions defined.


In [4]:
# Cell 5: Test Module - Single Chunk Processing (for testing individual modules)
print("--- Testing Module: Individual Processing Functions ---")

# Function to process a single test chunk (for debugging/testing specific modules)
def test_single_chunk_processing(patient_folder_name, chunk_duration_min=2):
    """
    Process just one chunk for testing purposes
    """
    patient_id_formatted = "test_patient_01"
    print(f"Testing with {patient_folder_name}")
    
    # Create temp directory for testing
    current_temp_process_dir = os.path.join(RAW_PATIENT_DATA_BASE_DIR, f"temp_test_{patient_id_formatted}")
    os.makedirs(current_temp_process_dir, exist_ok=True)
    
    try:
        # Load patient data
        patient_local_dir = os.path.join(RAW_PATIENT_DATA_BASE_DIR, patient_folder_name)
        edf_files = sorted([f for f in os.listdir(patient_local_dir) if f.endswith('.edf')])
        
        if not edf_files:
            print(f"No EDF files found in {patient_local_dir}")
            return None
            
        # Process first EDF file, first chunk only
        edf_path = os.path.join(patient_local_dir, edf_files[0])
        raw = mne.io.read_raw_edf(edf_path, preload=False, verbose=False)
        fs = int(raw.info['sfreq'])
        raw.pick_channels([AUDIO_CHANNEL_NAME])
        
        # Extract just first chunk (for testing)
        chunk_size_samples = chunk_duration_min * 60 * fs
        audio_chunk_raw, _ = raw[:, 0:min(chunk_size_samples, raw.n_times)]
        audio_chunk_raw = audio_chunk_raw.flatten()
        
        # Save test chunk
        test_chunk_path = os.path.join(current_temp_process_dir, "test_chunk_raw.wav")
        sf.write(test_chunk_path, audio_chunk_raw, fs)
        
        print(f"Test chunk saved: {test_chunk_path}")
        print(f"Chunk duration: {len(audio_chunk_raw)/fs:.1f} seconds")
        print(f"Sample rate: {fs} Hz")
        
        return {
            'chunk_path': test_chunk_path,
            'temp_dir': current_temp_process_dir,
            'sample_rate': fs,
            'audio_data': audio_chunk_raw
        }
        
    except Exception as e:
        print(f"Error creating test chunk: {e}")
        return None

# Build the test chunk from the first configured patient folder (None when the list is empty)
test_data = test_single_chunk_processing(
    PATIENT_FOLDERS_TO_PROCESS[0] if PATIENT_FOLDERS_TO_PROCESS else None
)
# Report the outcome; a falsy result means the chunk could not be created
print("✅ Test chunk created successfully!" if test_data else "❌ Failed to create test chunk")

--- Testing Module: Individual Processing Functions ---
Testing with patient_01
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
Test chunk saved: temp_test_test_patient_01\test_chunk_raw.wav
Chunk duration: 120.0 seconds
Sample rate: 48000 Hz
✅ Test chunk created successfully!


In [5]:
# Cell 6: Test Module - Noise Injection (combining_audio.py)
print("--- Testing Module: Noise Injection ---")

def test_noise_injection(test_data, noise_category='vacuum_cleaner'):
    """
    Test the combining_audio.py script with a single noise category
    """
    if not test_data:
        print("❌ No test data available. Run the previous cell first.")
        return None
        
    print(f"Testing noise injection with category: {noise_category}")
    
    # Prepare paths
    input_path = test_data['chunk_path']
    output_path = os.path.join(test_data['temp_dir'], f"test_noisy_{noise_category}.wav")
    
    # Build command
    combine_command = [
        VENV_PYTHON, os.path.join(DENOISER_SCRIPTS_DIR, "combining_audio.py"),
        "-cl", input_path,
        "-out", output_path,
        "-cat", noise_category,
        "-nl", str(NOISE_LEVEL_RMS_RATIO)
    ]
    
    print(f"Command: {' '.join(combine_command)}")
    
    try:
        # Run the command
        result = subprocess.run(combine_command, capture_output=True, text=True, check=True, timeout=60)
        
        # Check if output file was created
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            # Load and analyze the result
            noisy_audio, sr = librosa.load(output_path, sr=None)
            print(f"✅ Noise injection successful!")
            print(f"   Output file: {output_path}")
            print(f"   Output duration: {len(noisy_audio)/sr:.1f} seconds")
            print(f"   Output sample rate: {sr} Hz")
            
            # Calculate RMS to verify noise was added
            original_rms = np.sqrt(np.mean(test_data['audio_data']**2))
            noisy_rms = np.sqrt(np.mean(noisy_audio**2))
            print(f"   Original RMS: {original_rms:.6f}")
            print(f"   Noisy RMS: {noisy_rms:.6f}")
            print(f"   RMS ratio: {noisy_rms/original_rms:.3f}")
            
            return {'output_path': output_path, 'noisy_audio': noisy_audio, 'sample_rate': sr}
        else:
            print(f"❌ Output file not created or is empty: {output_path}")
            print(f"   STDOUT: {result.stdout}")
            print(f"   STDERR: {result.stderr}")
            return None
            
    except subprocess.CalledProcessError as e:
        print(f"❌ combining_audio failed with exit code {e.returncode}")
        print(f"   STDOUT: {e.stdout}")
        print(f"   STDERR: {e.stderr}")
        return None
    except subprocess.TimeoutExpired:
        print("❌ combining_audio timed out")
        return None
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        return None

# Run the noise-injection check once per configured category, keeping each outcome by name
noise_test_results = {}
for noise_category in NOISE_CATEGORIES:
    print(f"\n--- Testing {noise_category} ---")
    result = test_noise_injection(test_data, noise_category)
    noise_test_results[noise_category] = result

# Summarise: a category succeeded when its run returned something other than None
print(f"\n--- Noise Injection Test Summary ---")
successful_categories = [
    cat for cat, outcome in noise_test_results.items() if outcome is not None
]
failed_categories = [cat for cat in NOISE_CATEGORIES if cat not in successful_categories]
print(f"✅ Successful: {successful_categories}")
print(f"❌ Failed: {failed_categories}")

--- Testing Module: Noise Injection ---

--- Testing vacuum_cleaner ---
Testing noise injection with category: vacuum_cleaner
Command: c:\Users\solom\Documents\Evaluating-Noise-Reduction-Techniques\venv\Scripts\python.exe c:\Users\solom\Documents\Evaluating-Noise-Reduction-Techniques\src\combining_audio.py -cl temp_test_test_patient_01\test_chunk_raw.wav -out temp_test_test_patient_01\test_noisy_vacuum_cleaner.wav -cat vacuum_cleaner -nl 0.3333
✅ Noise injection successful!
   Output file: temp_test_test_patient_01\test_noisy_vacuum_cleaner.wav
   Output duration: 120.0 seconds
   Output sample rate: 16000 Hz
   Original RMS: 0.005164
   Noisy RMS: 0.030688
   RMS ratio: 5.943

--- Testing cat ---
Testing noise injection with category: cat
Command: c:\Users\solom\Documents\Evaluating-Noise-Reduction-Techniques\venv\Scripts\python.exe c:\Users\solom\Documents\Evaluating-Noise-Reduction-Techniques\src\combining_audio.py -cl temp_test_test_patient_01\test_chunk_raw.wav -out temp_test_test