# Enhanced PKL Processing Test Notebook

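This notebook exercises the enhanced PKL processing pipeline: it loads the aethalometer datasets, runs `process_pkl_data_enhanced` with the optional Ethiopia pneumatic pump fix, and validates the fix by comparing the BCc-ATN1 correlation before and after correction.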

In [1]:
# %%
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.getcwd()), 'src'))

# Import the new enhanced PKL processing module
from data.qc.enhanced_pkl_processing import process_pkl_data_enhanced, EnhancedPKLProcessor
from config.notebook_config import NotebookConfig
from notebook_utils.pkl_cleaning_integration import create_enhanced_setup

# Notebook configuration (site, wavelength, QC thresholds, plot settings)
config = NotebookConfig(
    site_code='ETAD',
    wavelength='Red',
    quality_threshold=10,
    output_format='jpl',
    min_samples_for_analysis=30,
    confidence_level=0.95,
    outlier_threshold=3.0,
    figure_size=(12, 8),
    font_size=10,
    dpi=300
)

# Data paths (machine-specific; adjust base_data_path for your environment)
base_data_path = "/Users/ahzs645/Library/CloudStorage/GoogleDrive-ahzs645@gmail.com/My Drive/University/Research/Grad/UC Davis Ann/NASA MAIA/Data"

config.aethalometer_files = {
    'pkl_data': os.path.join(
        base_data_path,
        "Aethelometry Data/Kyan Data/Mergedcleaned and uncleaned MA350 data20250707030704",
        "df_uncleaned_Jacros_API_and_OG.pkl"
    ),
    'csv_data': os.path.join(
        base_data_path,
        "Aethelometry Data/Raw",
        "Jacros_MA350_1-min_2022-2024_Cleaned.csv"
    )
}

config.ftir_db_path = os.path.join(
    base_data_path,
    "EC-HIPS-Aeth Comparison/Data/Original Data/Combined Database",
    "spartan_ftir_hips.db"
)

# Create enhanced setup
setup = create_enhanced_setup(config)

✅ Advanced plotting style configured
🚀 Aethalometer-FTIR/HIPS Pipeline with Simplified Setup
📊 Configuration Summary:
   Site: ETAD
   Wavelength: Red
   Output format: jpl
   Quality threshold: 10 minutes
   Output directory: outputs

📁 File paths:
   pkl_data: ✅ df_uncleaned_Jacros_API_and_OG.pkl
   csv_data: ✅ Jacros_MA350_1-min_2022-2024_Cleaned.csv
   FTIR DB: ✅ spartan_ftir_hips.db
🧹 Enhanced setup with PKL cleaning capabilities loaded
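
The setup output above reports one status mark per configured file. A minimal sketch of that kind of pre-flight check is shown here, assuming a hypothetical helper named `check_paths` (it is not part of `NotebookConfig` or `create_enhanced_setup`) and a temporary directory standing in for the real data folder:

```python
import tempfile
from pathlib import Path


def check_paths(paths: dict) -> dict:
    """Map each label to True if its file exists, False otherwise."""
    return {label: Path(p).is_file() for label, p in paths.items()}


# Stand-in for base_data_path so the sketch runs on any machine.
demo_dir = Path(tempfile.mkdtemp())

# Create one placeholder file so the demo shows both outcomes.
(demo_dir / "example.pkl").write_text("")

status = check_paths({
    "pkl_data": demo_dir / "example.pkl",
    "csv_data": demo_dir / "missing.csv",
})
for label, ok in status.items():
    print(f"{label}: {'OK' if ok else 'MISSING'}")
```

Running a check like this before `load_all_data()` surfaces a wrong path immediately instead of partway through a multi-gigabyte load.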


In [2]:
# %%
print("📁 Loading datasets...")
datasets = setup.load_all_data()

# Get PKL data
pkl_data_original = setup.get_dataset('pkl_data')

# The loader sets 'datetime_local' as the DatetimeIndex (the load log reports this),
# but the cells that follow access it as a regular column, so restore it here.
if 'datetime_local' not in pkl_data_original.columns:
    if pkl_data_original.index.name == 'datetime_local':
        print("✅ Converting datetime_local from index to column...")
        pkl_data_original = pkl_data_original.reset_index()
    elif hasattr(pkl_data_original.index, 'tz'):  # a DatetimeIndex exposes .tz
        print("✅ Creating datetime_local column from datetime index...")
        pkl_data_original['datetime_local'] = pkl_data_original.index
        pkl_data_original = pkl_data_original.reset_index(drop=True)

print(f"📊 PKL data ready: {pkl_data_original.shape}")
print(f"📅 Date range: {pkl_data_original['datetime_local'].min()} to {pkl_data_original['datetime_local'].max()}")

📁 Loading datasets...
📦 Setting up modular system...
✅ Aethalometer loaders imported
✅ Database loader imported
✅ Plotting utilities imported
✅ Plotting style configured
✅ Successfully imported 5 modular components

📁 LOADING DATASETS
📁 Loading all datasets...

📊 Loading pkl_data
📁 Loading pkl_data: df_uncleaned_Jacros_API_and_OG.pkl
Detected format: standard
Set 'datetime_local' as DatetimeIndex for time series operations
Converted 17 columns to JPL format
✅ Modular load: 1,665,156 rows × 238 columns
📊 Method: modular
📊 Format: jpl
📊 Memory: 7443.05 MB
🧮 BC columns: 30
📈 ATN columns: 25
📅 Time range: 2021-01-09 16:38:00 to 2025-06-26 23:18:00
✅ pkl_data loaded successfully

📊 Loading csv_data
📁 Loading csv_data: Jacros_MA350_1-min_2022-2024_Cleaned.csv
Set 'Time (Local)' as DatetimeIndex for time series operations
Converted 5 columns to JPL format
✅ Modular load: 1,095,086 rows × 77 columns
📊 Method: modular
📊 Format: jpl
📊 Memory: 884.83 MB
🧮 BC columns: 15
📈 ATN columns: 10
📅 Time r
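
The `datetime_local` fix in the cell above could be wrapped in a reusable function. The following is a sketch under assumptions: `ensure_datetime_column` is a hypothetical name, the demo frame is synthetic, and, unlike the cell above, it raises an error when no datetime information can be found rather than continuing silently.

```python
import pandas as pd


def ensure_datetime_column(df: pd.DataFrame, name: str = "datetime_local") -> pd.DataFrame:
    """Return a frame where `name` is a regular column, not the index."""
    if name in df.columns:
        return df
    if df.index.name == name:
        # The index already carries the right name: move it into a column.
        return df.reset_index()
    if isinstance(df.index, pd.DatetimeIndex):
        # Unnamed (or differently named) datetime index: copy it into `name`.
        out = df.copy()
        out[name] = out.index
        return out.reset_index(drop=True)
    raise ValueError(f"No '{name}' column and the index is not datetime-like")


# Tiny synthetic frame indexed the way the loader leaves the real data.
idx = pd.date_range("2022-01-01", periods=3, freq="min", name="datetime_local")
demo = pd.DataFrame({"IR BCc": [1.0, 2.0, 3.0]}, index=idx)

fixed = ensure_datetime_column(demo)
print(list(fixed.columns))
```

Failing loudly in the last branch is a design choice: a later `KeyError` on `'datetime_local'` is harder to trace back to the loader.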

In [9]:
# %%
# ENHANCED PKL PROCESSING with optional Ethiopia fix

# 🎛️ Configuration: Toggle Ethiopia fix here
APPLY_ETHIOPIA_FIX = True  # Set to True to enable Ethiopia pneumatic pump fix

print(f"🚀 Enhanced PKL Processing {'WITH' if APPLY_ETHIOPIA_FIX else 'WITHOUT'} Ethiopia Fix")
print("=" * 60)

pkl_data_cleaned = process_pkl_data_enhanced(
    pkl_data_original,
    wavelengths_to_filter=['IR', 'Blue'],
    export_path=f'pkl_data_cleaned_{"ethiopia" if APPLY_ETHIOPIA_FIX else "standard"}',
    apply_ethiopia_fix=APPLY_ETHIOPIA_FIX,  # 🔧 Ethiopia fix toggle
    site_code='ETAD' if APPLY_ETHIOPIA_FIX else None,
    verbose=True
)

print(f"\n✅ Processing complete: {pkl_data_cleaned.shape}")

# Show what Ethiopia corrections were added (if any)
if APPLY_ETHIOPIA_FIX:
    ethiopia_cols = [col for col in pkl_data_cleaned.columns if any(x in col for x in ['corrected', 'manual', 'optimized', 'denominator'])]
    if ethiopia_cols:
        print(f"\n🔧 Ethiopia correction columns added ({len(ethiopia_cols)}):")
        for col in sorted(ethiopia_cols)[:10]:  # Show first 10
            print(f"  • {col}")
        if len(ethiopia_cols) > 10:
            print(f"  ... and {len(ethiopia_cols)-10} more")
    else:
        print("\n⚠️ No Ethiopia correction columns found")
else:
    print("\n📊 Standard processing - no Ethiopia corrections applied")

🚀 Enhanced PKL Processing WITH Ethiopia Fix
🚀 Enhanced PKL Data Processing Pipeline
🔧 Comprehensive Preprocessing Pipeline
Step 1: Processing datetime...

Step 2: Fixing column names...
✅ Renamed 16 columns

Step 3: Converting data types...
Converted IR ATN1 to float.
Converted UV ATN1 to float.
Converted Blue ATN1 to float.
Converted Green ATN1 to float.
Converted Red ATN1 to float.
✅ Applied calibration.convert_to_float()

Step 4: Adding Session ID...

Step 5: Adding delta calculations...
✅ Applied calibration.add_deltas()

Step 6: Final adjustments...
✅ Filtered to 2022+: 1,665,156 -> 1,627,058 rows
🔄 Applying DEMA Smoothing...

Processing IR wavelength...
  Available BC columns: ['IR BC1', 'IR BC2', 'IR BCc']
  ✅ Created IR BC1 smoothed
  ✅ Created IR BC2 smoothed
  ✅ Created IR BCc smoothed

Processing Blue wavelength...
  Available BC columns: ['Blue BC1', 'Blue BC2', 'Blue BCc']
  ✅ Created Blue BC1 smoothed
  ✅ Created Blue BC2 smoothed
  ✅ Created Blue BCc smoothed

🔧 Applying
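
The log above reports DEMA smoothing for each BC column but does not show how the pipeline computes it or which span it uses. As an illustration only, the standard double exponential moving average is `2 * EMA(x) - EMA(EMA(x))`; the sketch below implements that definition in plain Python, with the span of 5 and the sample readings chosen arbitrarily.

```python
def ema(values, span):
    """Exponential moving average with smoothing factor alpha = 2 / (span + 1)."""
    alpha = 2.0 / (span + 1)
    out = []
    for x in values:
        prev = out[-1] if out else x  # seed with the first observation
        out.append(alpha * x + (1 - alpha) * prev)
    return out


def dema(values, span):
    """Double EMA: 2 * EMA(x) - EMA(EMA(x)), which reduces the lag of a single EMA."""
    first = ema(values, span)
    second = ema(first, span)
    return [2 * a - b for a, b in zip(first, second)]


# Hypothetical noisy BC readings (arbitrary units) and an assumed span of 5.
bc = [10.0, 12.0, 9.0, 11.0, 30.0, 10.0, 11.0, 9.5]
smoothed = dema(bc, span=5)
print([round(v, 2) for v in smoothed])
```

With pandas, the same recurrence corresponds to applying `Series.ewm(span=..., adjust=False).mean()` twice; whether the pipeline's smoothed columns use these settings is not stated in the log.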

In [14]:
# %%
# VALIDATION: Ethiopia fix validation (only runs if fix was applied)

if APPLY_ETHIOPIA_FIX:
    print("📊 Ethiopia Fix Validation:")
    print("=" * 50)
    
    # Use the validation functions from the site corrections module
    from src.data.processors.site_corrections import SiteCorrections, apply_ethiopia_fix
    
    # Create a small sample for comparison (to demonstrate the fix)
    sample_size = 10000
    sample_original = pkl_data_original.head(sample_size)
    
    print(f"\n🔬 Running validation on sample data ({sample_size:,} rows)...")
    
    # Apply just the Ethiopia fix (without full processing) for comparison
    sample_with_fix = apply_ethiopia_fix(sample_original, verbose=True)
    
    # Create corrector for validation
    corrector = SiteCorrections(site_code='ETAD', verbose=False)
    
    # Validate the fix
    validation_results = corrector.validate_corrections(
        sample_original,  # Original
        sample_with_fix,  # With Ethiopia fix only
        wavelength='IR'
    )
    
    print("\n📈 Validation Results:")
    for key, value in validation_results.items():
        if isinstance(value, dict):
            print(f"  {key}:")
            for subkey, subval in value.items():
                if isinstance(subval, float):
                    print(f"    {subkey}: {subval:.6f}")
                else:
                    print(f"    {subkey}: {subval}")
        else:
            if isinstance(value, float):
                print(f"  {key}: {value:.6f}")
            else:
                print(f"  {key}: {value}")
    
    # Check correlation improvements
    if 'original_atn_correlation' in validation_results and 'corrected_atn_correlation' in validation_results:
        orig_corr = validation_results['original_atn_correlation'] 
        corr_corr = validation_results['corrected_atn_correlation']
        improvement = abs(orig_corr) - abs(corr_corr)
        
        print(f"\n🎯 Key Improvement Metric:")
        print(f"  Original BCc-ATN1 correlation: {orig_corr:.6f}")
        print(f"  Corrected BCc-ATN1 correlation: {corr_corr:.6f}")
        print(f"  Improvement: {improvement:.6f} ({'✅ Better!' if improvement > 0 else '⚠️ Check data'})")
        
        if improvement > 0:
            print(f"  🎉 Ethiopia fix successfully reduced correlation by {improvement:.6f}")
else:
    print("📊 Ethiopia Fix Validation: SKIPPED")
    print("=" * 50)
    print("Set APPLY_ETHIOPIA_FIX = True to run validation")

print(f"\n📊 Final Data Summary:")
print("=" * 50)
print(f"Shape: {pkl_data_cleaned.shape}")
print(f"Date range: {pkl_data_cleaned['datetime_local'].min()} to {pkl_data_cleaned['datetime_local'].max()}")

# Check key columns
key_cols = ['datetime_local', 'IR ATN1', 'IR BCc', 'Blue ATN1', 'Blue BCc', 'Flow total (mL/min)']
for col in key_cols:
    status = "✅" if col in pkl_data_cleaned.columns else "❌"
    print(f"  {status} {col}")

# Show Ethiopia-specific columns if present
ethiopia_specific = [col for col in pkl_data_cleaned.columns if any(x in col for x in ['corrected', 'manual', 'optimized', 'denominator'])]
if ethiopia_specific:
    print(f"  🔧 Ethiopia corrections: {len(ethiopia_specific)} columns")

memory_mb = pkl_data_cleaned.memory_usage(deep=True).sum() / 1024 / 1024
print(f"  💾 Memory usage: {memory_mb:.1f} MB")

print(f"\n✅ {'Ethiopia-enhanced' if APPLY_ETHIOPIA_FIX else 'Standard'} processing complete!")

📊 Ethiopia Fix Validation:


ModuleNotFoundError: No module named 'src'
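
The traceback above is consistent with how the first cell sets up `sys.path`: it adds the `src` directory itself, so packages inside it are importable under their own name (as in `from data.qc.enhanced_pkl_processing import ...`), while a `src.`-prefixed import needs the parent of `src` on the path. The sketch below reproduces this with a throwaway directory; `demo_site_pkg` and `can_import` are hypothetical names used only for the demonstration.

```python
import importlib
import importlib.util
import sys
import tempfile
from pathlib import Path


def can_import(module_name: str) -> bool:
    """Return True if module_name resolves on the current sys.path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (here 'src') cannot be imported.
        return False


# Build a throwaway layout mirroring <project>/src/<package>/<module>.py.
project_root = Path(tempfile.mkdtemp())
package_dir = project_root / "src" / "demo_site_pkg"
package_dir.mkdir(parents=True)
(package_dir / "__init__.py").write_text("")
(package_dir / "corrections.py").write_text("FIX_NAME = 'demo'\n")

# Same pattern as the first cell: the src directory itself goes on sys.path.
sys.path.insert(0, str(project_root / "src"))
importlib.invalidate_caches()

resolves_without_prefix = can_import("demo_site_pkg.corrections")
resolves_with_prefix = can_import("src.demo_site_pkg.corrections")
print(resolves_without_prefix, resolves_with_prefix)
```

If the real project follows the same layout, dropping the `src.` prefix from the failing import would be the likely fix, though the actual location of `site_corrections` is not shown in this notebook.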

In [14]:
# %%
# VALIDATION: Compare original uncleaned data vs Ethiopia-enhanced processed data

print("📊 Ethiopia Fix Validation:")
print("=" * 50)

# Load both original uncleaned data and Ethiopia-enhanced processed data for comparison
try:
    import pandas as pd
    import os
    
    # Use the same paths as your config
    base_data_path = "/Users/ahzs645/Library/CloudStorage/GoogleDrive-ahzs645@gmail.com/My Drive/University/Research/Grad/UC Davis Ann/NASA MAIA/Data"
    
    original_file = os.path.join(
        base_data_path,
        "Aethelometry Data/Kyan Data/Mergedcleaned and uncleaned MA350 data20250707030704",
        "df_uncleaned_Jacros_API_and_OG.pkl"
    )
    ethiopia_file = 'pkl_data_cleaned_ethiopia.pkl'
    
    print("📁 Loading datasets for comparison...")
    print(f"  🔍 Original uncleaned: {os.path.basename(original_file)}")
    print(f"  🔧 Ethiopia processed: {os.path.basename(ethiopia_file)}")
    
    # Load datasets
    datasets = {}
    
    # Load original uncleaned data
    try:
        datasets["Original"] = pd.read_pickle(original_file)
        print(f"  ✅ Loaded Original: {datasets['Original'].shape}")
    except FileNotFoundError:
        print(f"  ❌ Original file not found: {original_file}")
    except Exception as e:
        print(f"  ⚠️ Error loading original: {e}")
    
    # Load Ethiopia-enhanced processed data
    try:
        datasets["Ethiopia"] = pd.read_pickle(ethiopia_file)
        print(f"  ✅ Loaded Ethiopia: {datasets['Ethiopia'].shape}")
    except FileNotFoundError:
        print(f"  ❌ Ethiopia file not found: {ethiopia_file}")
        print("  💡 Make sure you've run processing with APPLY_ETHIOPIA_FIX = True")
    except Exception as e:
        print(f"  ⚠️ Error loading Ethiopia data: {e}")
    
    # Validation if we have both datasets
    if "Original" in datasets and "Ethiopia" in datasets:
        print(f"\n🔬 Comparing Original Uncleaned vs Ethiopia-Enhanced Processed:")
        print("=" * 70)
        
        original_data = datasets["Original"]
        ethiopia_data = datasets["Ethiopia"]
        
        # Basic comparison
        print(f"📊 Dataset transformation:")
        print(f"  Original uncleaned: {original_data.shape} ({original_data.shape[1]} columns)")
        print(f"  Ethiopia processed: {ethiopia_data.shape} ({ethiopia_data.shape[1]} columns)")
        
        # Check Ethiopia-specific columns in processed data
        ethiopia_cols = [col for col in ethiopia_data.columns if any(x in col for x in ['corrected', 'manual', 'optimized', 'denominator'])]
        if ethiopia_cols:
            print(f"  🔧 Ethiopia corrections added: {len(ethiopia_cols)} columns")
            
            # Group by wavelength
            wavelengths = ['IR', 'Blue', 'Red', 'Green', 'UV']
            correction_summary = []
            for wl in wavelengths:
                wl_cols = [col for col in ethiopia_cols if wl in col]
                if wl_cols:
                    correction_summary.append(f"{wl}({len(wl_cols)})")
            
            print(f"  📈 Corrections by wavelength: {', '.join(correction_summary)}")
        
        # Detailed validation for key wavelengths
        print(f"\n🎯 Ethiopia Fix Impact Analysis:")
        print("-" * 50)
        
        for wl in ['IR', 'Blue']:
            print(f"\n{wl} Wavelength Analysis:")
            
            # Column names
            original_bcc = f'{wl} BCc'  # or might be BC1 in original
            if original_bcc not in original_data.columns and f'{wl} BC1' in original_data.columns:
                original_bcc = f'{wl} BC1'
            
            corrected_bcc = f'{wl} BCc_corrected'
            atn_col = f'{wl} ATN1'
            
            # Check if required columns exist
            has_original_bcc = original_bcc in original_data.columns
            has_corrected_bcc = corrected_bcc in ethiopia_data.columns
            has_atn_original = atn_col in original_data.columns
            has_atn_ethiopia = atn_col in ethiopia_data.columns
            
            print(f"  📋 Column availability:")
            print(f"    Original {original_bcc}: {'✅' if has_original_bcc else '❌'}")
            print(f"    Ethiopia {corrected_bcc}: {'✅' if has_corrected_bcc else '❌'}")
            print(f"    {atn_col}: {'✅' if has_atn_original and has_atn_ethiopia else '❌'}")
            
            if has_original_bcc and has_corrected_bcc and has_atn_original and has_atn_ethiopia:
                try:
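                    # Sample the first rows of each frame for speed. Note that head() is positional:
                    # the load log shows the original starting in 2021, while processing filters to
                    # 2022+, so the two samples may not cover the same time window.
                    sample_size = min(50000, len(original_data), len(ethiopia_data))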
                    
                    # Original data analysis
                    orig_sample = original_data.head(sample_size)
                    orig_bcc_data = orig_sample[original_bcc].dropna()
                    orig_atn_data = orig_sample[atn_col].dropna()
                    common_orig = orig_bcc_data.index.intersection(orig_atn_data.index)
                    
                    # Ethiopia data analysis
                    eth_sample = ethiopia_data.head(sample_size)
                    eth_bcc_data = eth_sample[corrected_bcc].dropna()
                    eth_atn_data = eth_sample[atn_col].dropna()
                    common_eth = eth_bcc_data.index.intersection(eth_atn_data.index)
                    
                    if len(common_orig) > 100 and len(common_eth) > 100:
                        # Calculate correlations
                        orig_corr = orig_sample.loc[common_orig, original_bcc].corr(orig_sample.loc[common_orig, atn_col])
                        corr_corr = eth_sample.loc[common_eth, corrected_bcc].corr(eth_sample.loc[common_eth, atn_col])
                        
                        # Calculate basic statistics
                        orig_mean = orig_sample[original_bcc].mean()
                        corr_mean = eth_sample[corrected_bcc].mean()
                        
                        print(f"  📊 Statistical comparison:")
                        print(f"    Original {original_bcc} mean: {orig_mean:.3f}")
                        print(f"    Corrected BCc mean: {corr_mean:.3f}")
                        print(f"    Mean difference: {abs(corr_mean - orig_mean):.3f}")
                        
                        print(f"  🎯 Correlation with {atn_col}:")
                        print(f"    Original correlation: {orig_corr:.6f}")
                        print(f"    Ethiopia corrected: {corr_corr:.6f}")
                        
                        if not (pd.isna(orig_corr) or pd.isna(corr_corr)):
                            improvement = abs(orig_corr) - abs(corr_corr)
                            print(f"    Improvement: {improvement:.6f} ({'✅ Better!' if improvement > 0 else '⚠️ Check data'})")
                            
                            if improvement > 0:
                                improvement_pct = (improvement / abs(orig_corr)) * 100
                                print(f"    🎉 {improvement_pct:.1f}% correlation reduction!")
                                
                                # Ethiopia fix effectiveness
                                if abs(corr_corr) < 0.1:  # Near zero correlation
                                    print(f"    ✨ Excellent: Near-zero correlation achieved!")
                                elif improvement > 0.1:  # Significant improvement
                                    print(f"    👍 Good: Significant correlation reduction")
                                else:
                                    print(f"    📈 Moderate improvement")
                        else:
                            print(f"    ⚠️ Could not calculate correlation improvement")
                    else:
                        print(f"    ⚠️ Insufficient data for analysis (orig: {len(common_orig)}, eth: {len(common_eth)})")
                        
                except Exception as e:
                    print(f"    ⚠️ Error in analysis: {e}")
            else:
                missing_cols = []
                if not has_original_bcc:
                    missing_cols.append(f"Original: {original_bcc}")
                if not has_corrected_bcc:
                    missing_cols.append(f"Ethiopia: {corrected_bcc}")
                if not (has_atn_original and has_atn_ethiopia):
                    missing_cols.append(f"ATN: {atn_col}")
                print(f"    ❌ Missing columns: {', '.join(missing_cols)}")
        
        # Summary of Ethiopia fix benefits
        print(f"\n✅ Ethiopia Fix Summary:")
        print("=" * 30)
        print(f"📈 Data processing: {original_data.shape[0]:,} → {ethiopia_data.shape[0]:,} rows")
        print(f"🔧 Columns added: {ethiopia_data.shape[1] - original_data.shape[1]} (net, across all processing steps)")
        print(f"🎯 Primary benefit: Reduced BCc-ATN1 correlation (pneumatic pump fix)")
        print(f"📊 Quality control: Applied comprehensive cleaning pipeline")
        print(f"🧹 DEMA smoothing: Added for noise reduction")
        
    elif "Ethiopia" in datasets:
        # Only Ethiopia data available - validate it has corrections
        ethiopia_data = datasets["Ethiopia"]
        ethiopia_cols = [col for col in ethiopia_data.columns if any(x in col for x in ['corrected', 'manual', 'optimized', 'denominator'])]
        
        print(f"\n✅ Ethiopia-enhanced data loaded successfully!")
        print(f"🔧 Found {len(ethiopia_cols)} correction columns")
        print("⚠️ Original uncleaned data not available for comparison")
        
        if ethiopia_cols:
            wavelengths = ['IR', 'Blue', 'Red', 'Green', 'UV']
            for wl in wavelengths:
                wl_cols = [col for col in ethiopia_cols if wl in col]
                if wl_cols:
                    print(f"  {wl}: {len(wl_cols)} corrections")
    
    else:
        print(f"\n⚠️ Could not load required files for comparison")
        if "Original" not in datasets:
            print("❌ Original uncleaned data not found")
        if "Ethiopia" not in datasets:
            print("❌ Ethiopia processed data not found - run processing with APPLY_ETHIOPIA_FIX = True")

except Exception as e:
    print(f"❌ Error during validation: {e}")

print(f"\n📊 Current Session Data:")
print("=" * 50)
if 'pkl_data_cleaned' in locals():
    print(f"Shape: {pkl_data_cleaned.shape}")
    print(f"Date range: {pkl_data_cleaned['datetime_local'].min()} to {pkl_data_cleaned['datetime_local'].max()}")
    
    # Show Ethiopia-specific columns if present
    ethiopia_specific = [col for col in pkl_data_cleaned.columns if any(x in col for x in ['corrected', 'manual', 'optimized', 'denominator'])]
    if ethiopia_specific:
        print(f"🔧 Ethiopia corrections in current data: {len(ethiopia_specific)} columns")
        has_ethiopia_fix = True
    else:
        has_ethiopia_fix = False
    
    print(f"✅ Current data: {'Ethiopia-enhanced' if has_ethiopia_fix else 'Standard'} processing")
else:
    print("No current data available - run the processing cell first")

📊 Ethiopia Fix Validation:
📁 Loading datasets for comparison...
  🔍 Original uncleaned: df_uncleaned_Jacros_API_and_OG.pkl
  🔧 Ethiopia processed: pkl_data_cleaned_ethiopia.pkl
  ✅ Loaded Original: (1665156, 239)
  ✅ Loaded Ethiopia: (1477783, 318)

🔬 Comparing Original Uncleaned vs Ethiopia-Enhanced Processed:
📊 Dataset transformation:
  Original uncleaned: (1665156, 239) (239 columns)
  Ethiopia processed: (1477783, 318) (318 columns)
  🔧 Ethiopia corrections added: 27 columns
  📈 Corrections by wavelength: IR(6), Blue(6), Red(5), Green(5), UV(5)

🎯 Ethiopia Fix Impact Analysis:
--------------------------------------------------

IR Wavelength Analysis:
  📋 Column availability:
    Original IR BCc: ✅
    Ethiopia IR BCc_corrected: ✅
    IR ATN1: ✅
  📊 Statistical comparison:
    Original IR BCc mean: 2790.828
    Corrected BCc mean: 6093.255
    Mean difference: 3302.426
  🎯 Correlation with IR ATN1:
    Original correlation: 0.417843
    Ethiopia corrected: -0.005505
    Improvement
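
The validation cell scores the fix as the drop in absolute BCc-ATN1 correlation and then grades it against fixed thresholds. The following sketch restates that logic as a standalone function, assuming the two IR correlations printed above as inputs; `correlation_reduction` is a hypothetical name, and unlike the cell it always computes the percentage rather than only when the improvement is positive.

```python
def correlation_reduction(orig_corr: float, corrected_corr: float) -> dict:
    """Summarize the drop in |correlation| using the thresholds from the validation cell."""
    improvement = abs(orig_corr) - abs(corrected_corr)
    if improvement <= 0:
        verdict = "check data"
    elif abs(corrected_corr) < 0.1:
        verdict = "excellent"  # near-zero correlation after correction
    elif improvement > 0.1:
        verdict = "good"
    else:
        verdict = "moderate"
    # Guard against division by zero when the original correlation is exactly 0.
    pct = improvement / abs(orig_corr) * 100 if orig_corr else float("nan")
    return {"improvement": improvement, "percent": pct, "verdict": verdict}


# IR correlations taken from the printed output: original and Ethiopia-corrected.
ir = correlation_reduction(0.417843, -0.005505)
print(f"{ir['improvement']:.6f} {ir['percent']:.1f}% {ir['verdict']}")
```

Because the metric uses absolute values, a correction that flips the sign of the correlation without shrinking its magnitude would not count as an improvement.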