In [9]:
import pandas as pd
import numpy as np
import os
from os.path import join
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial

def num_gaps(df, features):
    """
    Analyze gaps in time series data and categorize them by duration.

    A gap is a maximal run of consecutive rows in which a feature is NaN.
    Gap length is measured in rows, so it equals a duration in hours only
    when `df` holds exactly one row per hour (the bin labels assume this).

    Parameters:
    df: DataFrame with datetime index (the index values are not read here;
        only the row order matters)
    features: list of column names to analyze; names that are not columns
        of `df` are skipped silently

    Returns:
    Dictionary with gap statistics per feature. Each value is a dict with:
        'total_gaps': number of gaps (int)
        'total_missing': number of NaN rows (int)
        'pct_missing': NaN rows as a percentage of len(df) (float)
        'gap_distribution': {bin label: gap count} with one entry per bin,
            or an empty dict when the feature has no gaps at all
    """
    # Bin edges in hours; with right=False every bin is the interval [low, high).
    gap_bins = [0, 1, 4, 8, 24, 72, 168, 336, 720, 2160, 8760, np.inf]
    gap_labels = ['<1hr', '1-4hr', '4-8hr', '8hr-1day', '1-3days', '3days-1week',
                  '1-2weeks', '2weeks-1month', '1-3months', '3months-1year', '>1year']
    # NOTE: run lengths are whole row counts >= 1, so the '<1hr' bin [0, 1)
    # can never receive a gap. It is kept so the set of labels stays stable.

    gap_stats = {}

    for feature in features:
        if feature not in df.columns:
            continue

        # Boolean mask of missing rows as a plain NumPy array
        is_missing = df[feature].isna().to_numpy()

        # Pad with False on both ends so that every run of True has both a
        # False->True and a True->False transition, including runs touching
        # the first or last row. Transitions then alternate start, end, ...
        padded = np.concatenate(([False], is_missing, [False]))
        edges = np.flatnonzero(padded[1:] != padded[:-1])
        gap_lengths = edges[1::2] - edges[::2]

        if gap_lengths.size == 0:
            gap_stats[feature] = {
                'total_gaps': 0,
                'total_missing': 0,
                'pct_missing': 0.0,
                'gap_distribution': {}
            }
            continue

        # Plain Python numbers keep both branches type-consistent
        total_missing = int(is_missing.sum())

        # Categorize gaps into bins (zero-count bins are included)
        gap_counts = pd.cut(gap_lengths, bins=gap_bins, labels=gap_labels,
                            right=False).value_counts().sort_index()
        gap_stats[feature] = {
            'total_gaps': int(gap_lengths.size),
            'total_missing': total_missing,
            'pct_missing': (total_missing / len(df)) * 100,
            'gap_distribution': gap_counts.to_dict()
        }

    return gap_stats


def process_single_file(file, folder, features,
                        start='2023-01-01 00:00:00',
                        end='2025-12-31 23:00:00',
                        freq='1h'):
    """
    Process a single file - worker function for parallel processing.

    Reads the CSV, indexes it by its 'Timestamp' column, reindexes it onto a
    regular time grid and runs num_gaps on the result. Grid timestamps that
    are absent from the file become all-NaN rows; file rows whose timestamp
    is not on the grid are dropped by the reindex.

    Parameters:
    file: filename to process
    folder: path to folder containing the file
    features: list of features to analyze
    start: first timestamp of the grid (default: start of 2023)
    end: last timestamp of the grid (default: last hour of 2025)
    freq: pandas frequency string of the grid (default: hourly). num_gaps
        counts gap lengths in rows, so its hour-based bin labels are only
        meaningful for an hourly grid.

    Returns:
    Tuple of (filename, gap_stats_dict) or (filename, None) on error
    """
    try:
        df = pd.read_csv(join(folder, file))
        df = df.set_index('Timestamp')
        df.index = pd.to_datetime(df.index)

        # Reindex to the full regular range so missing timestamps show up as NaN rows
        full_range = pd.date_range(start=start, end=end, freq=freq)
        df = df.reindex(full_range)

        # Analyze gaps
        gaps = num_gaps(df, features)
        return (file, gaps)

    except Exception as e:
        # Deliberate best-effort: one unreadable or malformed file must not
        # abort the whole batch. The failure is reported and signalled by None.
        print(f"Error processing {file}: {e}")
        return (file, None)


def analyze_all_sites(folder, max_workers=None):
    """
    Analyze gaps for all site files in the folder using concurrent.futures.

    Every regular file in `folder` whose name ends with '.csv' is handed to
    process_single_file in a worker process. Files for which the worker
    returns None (it failed on them) are left out of the result.

    Parameters:
    folder: path to folder containing site CSV files
    max_workers: maximum number of worker processes (default: CPU count - 1,
        but never fewer than 1)

    Returns:
    Dictionary with gap analysis for each site, keyed by filename and
    sorted by filename
    """
    features = ["PM2.5 (µg/m³)", "PM10 (µg/m³)", "NO2 (µg/m³)", "SO2 (µg/m³)",
                "CO (mg/m³)", "Ozone (µg/m³)", "NH3 (µg/m³)", "AT (°C)", "RH (%)"]

    # Get all CSV files in folder
    files = [file for file in os.listdir(folder)
             if os.path.isfile(join(folder, file)) and file.endswith('.csv')]

    print(f"Found {len(files)} files to process")

    # Determine number of workers
    if max_workers is None:
        # os.cpu_count() returns None when the count cannot be determined;
        # treat that as a single CPU instead of failing on `None - 1`.
        max_workers = max(1, (os.cpu_count() or 1) - 1)

    print(f"Using {max_workers} worker processes")

    # Create partial function with fixed folder and features
    worker = partial(process_single_file, folder=folder, features=features)

    # Process files in parallel
    site_dict = {}
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_file = {executor.submit(worker, file): file for file in files}

        # Collect results as they complete (completion order, not submission order)
        completed = 0
        for future in as_completed(future_to_file):
            file, gaps = future.result()
            if gaps is not None:
                site_dict[file] = gaps
            completed += 1
            # Report every 10th file and the final one
            if completed % 10 == 0 or completed == len(files):
                print(f"Progress: {completed}/{len(files)} files processed")

    print(f"\nSuccessfully processed {len(site_dict)} out of {len(files)} files")

    # Sort site_dict by site name so the result does not depend on completion order
    site_dict = dict(sorted(site_dict.items()))

    return site_dict


# Run the analysis
# `results` maps each CSV filename to its per-feature gap statistics and is
# read by the summary cells further down.
# NOTE(review): this launches a process pool from top-level code; if this is
# ever run as a plain script on a platform that spawns worker processes, it
# presumably needs an `if __name__ == "__main__":` guard - confirm.
folder = r"/home/rishi/ML Projects/Air Pollution/CPCB/sites_comb"
results = analyze_all_sites(folder)

Found 564 files to process
Using 23 worker processes
Progress: 10/564 files processed
Progress: 20/564 files processed
Progress: 30/564 files processed
Progress: 40/564 files processed
Progress: 50/564 files processed
Progress: 60/564 files processed
Progress: 70/564 files processed
Progress: 80/564 files processed
Progress: 90/564 files processed
Progress: 100/564 files processed
Progress: 110/564 files processed
Progress: 120/564 files processed
Progress: 130/564 files processed
Progress: 140/564 files processed
Progress: 150/564 files processed
Progress: 160/564 files processed
Progress: 170/564 files processed
Progress: 180/564 files processed
Progress: 190/564 files processed
Progress: 200/564 files processed
Progress: 210/564 files processed
Progress: 220/564 files processed
Progress: 230/564 files processed
Progress: 240/564 files processed
Progress: 250/564 files processed
Progress: 260/564 files processed
Progress: 270/564 files processed
Progress: 280/564 files processed
Prog

In [10]:
# Flatten the nested results mapping into a table with one row per
# (site, feature) pair and one count column per gap-duration bin.

# Bin labels, in increasing duration order; each becomes a 'Gaps_<label>' column
bin_labels = ['<1hr', '1-4hr', '4-8hr', '8hr-1day', '1-3days', '3days-1week',
              '1-2weeks', '2weeks-1month', '1-3months', '3months-1year', '>1year']

summary_data = [
    {
        'Site': site_name,
        'Feature': feature,
        'Total Gaps': stats['total_gaps'],
        'Total Missing Hours': stats['total_missing'],
        'Percent Missing': round(stats['pct_missing'], 2),
        # Bins absent from the distribution (e.g. a feature with no gaps) count as 0
        **{f'Gaps_{bin_label}': stats['gap_distribution'].get(bin_label, 0)
           for bin_label in bin_labels},
    }
    for site_name, site_data in results.items()
    for feature, stats in site_data.items()
]

summary_df = pd.DataFrame(summary_data)
print(f"Total sites analyzed: {len(results)}")
print(f"\nSummary of missing data:")
print(summary_df.head(20))

Total sites analyzed: 564

Summary of missing data:
                                              Site        Feature  Total Gaps  \
0   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv  PM2.5 (µg/m³)         128   
1   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv   PM10 (µg/m³)         132   
2   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv    NO2 (µg/m³)         119   
3   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv    SO2 (µg/m³)           1   
4   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv     CO (mg/m³)         111   
5   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv  Ozone (µg/m³)         126   
6   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv    NH3 (µg/m³)           1   
7   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv        AT (°C)           1   
8   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv         RH (%)           1   
9     site_104_Burari_Crossing_Delhi_IMD_15Min.csv  PM2.5 (µg/m³)         166   
10    site_104_Burari_Crossing_Delhi_IMD_15Min.csv   PM10

In [11]:
# Write the summary table to disk (row index omitted) and confirm on stdout
summary_csv_path = '/home/rishi/ML Projects/Air Pollution/CPCB/gap_analysis_summary.csv'
summary_df.to_csv(summary_csv_path, index=False)
print("Summary saved to gap_analysis_summary.csv")

# Display gap distribution for a specific site and feature
def show_gap_distribution(site_name, feature_name, site_results=None):
    """
    Display detailed gap distribution for a specific site and feature.

    Parameters:
    site_name: key of the site in the results mapping (the CSV filename)
    feature_name: feature/column name to report on
    site_results: mapping {site: {feature: stats}} to read from; when None
        (the default) the module-level `results` is used

    Returns:
    None. The report, or a "Data not found" message, is printed to stdout.
    """
    if site_results is None:
        site_results = results

    if site_name not in site_results or feature_name not in site_results[site_name]:
        print(f"Data not found for {site_name} - {feature_name}")
        return

    # Bin labels from shortest to longest duration. Sorting the labels as
    # plain strings would interleave them ('1-2weeks' before '1-4hr', '<1hr'
    # near the end), so an explicit rank is used instead.
    duration_order = ['<1hr', '1-4hr', '4-8hr', '8hr-1day', '1-3days', '3days-1week',
                      '1-2weeks', '2weeks-1month', '1-3months', '3months-1year', '>1year']
    rank = {label: position for position, label in enumerate(duration_order)}

    stats = site_results[site_name][feature_name]
    print(f"\nGap Analysis for {site_name} - {feature_name}")
    print(f"Total Gaps: {stats['total_gaps']}")
    print(f"Total Missing Hours: {stats['total_missing']}")
    print(f"Percent Missing: {stats['pct_missing']:.2f}%")
    print("\nGap Distribution by Duration:")
    distribution = stats['gap_distribution']
    # Unknown labels sort after the known ones; sorted() is stable, so they
    # keep their original relative order.
    for duration in sorted(distribution, key=lambda label: rank.get(label, len(rank))):
        print(f"  {duration}: {distribution[duration]} gaps")

# Example usage - uncomment and modify to view specific site/feature
# show_gap_distribution('site_113_Shadipur_Delhi_CPCB_15Min.csv', 'PM2.5 (µg/m³)')

Summary saved to gap_analysis_summary.csv


In [13]:
# Column-wise totals over the whole summary table. No numeric-only filter is
# applied, so the string columns (Site, Feature) come out as concatenated
# text in the result.
# NOTE(review): the 'Percent Missing' total is a sum of per-row percentages,
# not an overall percentage - confirm that is what is wanted.
summary_df.sum()

Site                   site_103_CRRI_Mathura_Road_Delhi_IMD_15Min.csv...
Feature                PM2.5 (µg/m³)PM10 (µg/m³)NO2 (µg/m³)SO2 (µg/m³...
Total Gaps                                                       1312718
Total Missing Hours                                             36626907
Percent Missing                                                139245.03
Gaps_<1hr                                                              0
Gaps_1-4hr                                                        834741
Gaps_4-8hr                                                        194011
Gaps_8hr-1day                                                     200041
Gaps_1-3days                                                       60566
Gaps_3days-1week                                                   13320
Gaps_1-2weeks                                                       4027
Gaps_2weeks-1month                                                  2166
Gaps_1-3months                                     