In [None]:
import os
import sqlite3
import pandas as pd
from datetime import datetime
import json

In [None]:
# Pipeline configuration: every path the rest of the notebook reads or checks.
CONFIG = dict(
    data_dir='data',
    database='data/analysis.db',
    expected_files=[
        'data/air.csv',
        'data/hosp_data.csv',
        'data/air_data.csv',
    ],
)

print("Pipeline Configuration:")
print(json.dumps(CONFIG, indent=2))

In [None]:
# validation functions
def validate_csv_data(filepath, expected_min_rows=100):
    """Check that a CSV file exists, parses, and has enough data rows.

    Prints a one-line status message prefixed with "No", "Warning", "Yes"
    or "Error" describing the outcome.

    Args:
        filepath: Path to the CSV file.
        expected_min_rows: Minimum number of data rows (header line
            excluded) required for the file to count as valid.

    Returns:
        True if the file exists, its first rows parse with pandas and it has
        at least ``expected_min_rows`` data rows; False otherwise.
    """
    if not os.path.exists(filepath):
        print(f"No - {filepath}: File not found")
        return False

    try:
        # Only a small sample is parsed; it is used to report the column count.
        df = pd.read_csv(filepath, nrows=10)

        # Count physical lines inside a context manager so the handle is closed
        # deterministically; subtract one for the header line.
        # NOTE: quoted fields that contain newlines are counted as extra rows.
        with open(filepath) as fh:
            total_rows = sum(1 for _ in fh) - 1

        if total_rows < expected_min_rows:
            print(f"Warning - {filepath}: Only {total_rows:,} rows")
            return False

        print(f"Yes - {filepath}: {total_rows:,} rows, {len(df.columns)} columns")
        return True
    except Exception as e:
        print(f"Error - {filepath}: {e}")
        return False

def validate_database(db_path):
    """Check that a SQLite database exists and contains at least one table.

    Prints a status line and, on success, the row count of every table.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        True if the file exists, can be queried and lists at least one table
        in ``sqlite_master``; False otherwise (including on any query error).
    """
    if not os.path.exists(db_path):
        print(f"No - Database not found: {db_path}")
        return False

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = cursor.fetchall()

        if not tables:
            print("No - Database has no tables")
            return False

        print(f"Yes - Database: {len(tables)} tables found")
        for (table_name,) in tables:
            # Identifiers cannot be bound as SQL parameters, so quote them:
            # names with spaces, reserved words or embedded quotes stay valid.
            quoted_name = '"' + table_name.replace('"', '""') + '"'
            cursor.execute(f"SELECT COUNT(*) FROM {quoted_name}")
            count = cursor.fetchone()[0]
            print(f"  - {table_name}: {count:,} records")

        return True
    except Exception as e:
        print(f"Error - Database validation: {e}")
        return False
    finally:
        # Close on every path, including when a query above raised.
        if conn is not None:
            conn.close()

In [None]:
# STEP 1: validate every expected data file and report an overall verdict.
print("=" * 60, "STEP 1: Validating Data Files", "=" * 60, sep="\n")

# Build a list (not a generator) so all() cannot short-circuit:
# every file must still be validated and reported individually.
file_results = [validate_csv_data(path) for path in CONFIG['expected_files']]
all_valid = all(file_results)

print(
    "\nAll data files validated"
    if all_valid
    else "\nWarning: Some data files are missing or invalid"
)

In [None]:
# STEP 2: validate the SQLite database configured in CONFIG.
print("\n" + "=" * 60, "STEP 2: Validating Database", "=" * 60, sep="\n")

db_valid = validate_database(CONFIG['database'])

# Per-table details were already printed by validate_database; add a summary warning.
if not db_valid:
    print("\nWarning: Database not found or invalid")

In [None]:
# data quality report
def _print_summary_stats(series, title, precision):
    """Print a titled block with the mean, median, min and max of ``series``.

    Args:
        series: pandas Series of numeric values.
        title: Heading printed (after a blank line) above the statistics.
        precision: Number of decimal places used for every statistic.
    """
    print(f"\n{title}:")
    for label, value in (
        ("Mean", series.mean()),
        ("Median", series.median()),
        ("Min", series.min()),
        ("Max", series.max()),
    ):
        print(f"  {label}: {value:.{precision}f}")


print("\n" + "="*60)
print("DATA QUALITY REPORT")
print("="*60)
print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*60)

# air quality data
print("\n1. AIR QUALITY DATA (air.csv)")
print("-"*60)
if os.path.exists('data/air.csv'):
    air_df = pd.read_csv('data/air.csv', low_memory=False)
    print(f"Total records: {len(air_df):,}")
    # Guard 'date_local' like the other optional columns so a schema change
    # yields a partial report instead of a KeyError mid-cell.
    if 'date_local' in air_df.columns:
        print(f"Date range: {air_df['date_local'].min()} to {air_df['date_local'].max()}")
    print(f"Unique cities: {air_df['city'].nunique() if 'city' in air_df.columns else 'N/A'}")
    print(f"Unique parameters: {air_df['parameter'].nunique() if 'parameter' in air_df.columns else 'N/A'}")
    print(f"Missing values: {air_df.isnull().sum().sum():,}")

    if 'arithmetic_mean' in air_df.columns:
        _print_summary_stats(air_df['arithmetic_mean'], "Arithmetic Mean Statistics", 2)
else:
    print("File not found")

# hospital data
print("\n2. HOSPITAL VISIT DATA (hosp_data.csv)")
print("-"*60)
if os.path.exists('data/hosp_data.csv'):
    hosp_df = pd.read_csv('data/hosp_data.csv')
    print(f"Total records: {len(hosp_df):,}")
    if 'week_start' in hosp_df.columns:
        print(f"Date range: {hosp_df['week_start'].min()} to {hosp_df['week_start'].max()}")
    print(f"Respiratory categories: {hosp_df['respiratory_category'].nunique() if 'respiratory_category' in hosp_df.columns else 'N/A'}")
    print(f"Demographic groups: {hosp_df['demographic_group'].nunique() if 'demographic_group' in hosp_df.columns else 'N/A'}")
    print(f"Missing values: {hosp_df.isnull().sum().sum():,}")

    if 'percent' in hosp_df.columns:
        _print_summary_stats(hosp_df['percent'], "Percent Visit Statistics", 4)
else:
    print("File not found")

# database analysis
print("\n3. DATABASE ANALYSIS (analysis.db)")
print("-"*60)
if os.path.exists(CONFIG['database']):
    conn = sqlite3.connect(CONFIG['database'])
    try:
        monthly_df = pd.read_sql('SELECT * FROM monthly_analysis', conn)
        print(f"Monthly analysis records: {len(monthly_df):,}")

        if len(monthly_df) > 0:
            print("\nMonthly Analysis Summary:")
            print(f"  Unique months: {monthly_df['month'].nunique()}")
            print(f"  Respiratory categories: {monthly_df['respiratory_category'].nunique()}")
            print(f"  Pollutant parameters: {monthly_df['parameter'].nunique()}")

            _print_summary_stats(monthly_df['avg_pollution'], "Pollution Statistics", 2)
    except Exception as e:
        print(f"Warning: Could not read monthly_analysis table: {e}")
    finally:
        # Always release the connection, even if reading or summarising failed.
        conn.close()
else:
    print("Database not found")

print("\n" + "="*60)

In [None]:
# Reproducibility checklist: confirm each notebook, data artifact and doc exists.
print("\n" + "=" * 60, "REPRODUCIBILITY CHECKLIST", "=" * 60, sep="\n")

checklist = [
    (f"{path} exists", os.path.exists(path))
    for path in (
        'getData.ipynb',
        'dataIntegration.ipynb',
        'databaseSetup.ipynb',
        'workflowAutomation.ipynb',
        'data/air.csv',
        'data/hosp_data.csv',
        'data/analysis.db',
        'README.md',
        'ProjectPlan.md',
        'StatusReport.md',
    )
]

total_checks = len(checklist)

for label, ok in checklist:
    print(f"{'Yes' if ok else 'No'} - {label}")

passed_checks = sum(1 for _, ok in checklist if ok)

print("\n" + "=" * 60)
print(f"SCORE: {passed_checks}/{total_checks} checks passed ({passed_checks/total_checks*100:.0f}%)")
print("=" * 60)

In [None]:
# correlation analysis
def _print_group_correlations(frame, group_col, min_n=3):
    """Print the Pearson r of avg_pollution vs avg_visit_percent for each group.

    Groups are visited in order of first appearance in ``frame[group_col]``;
    groups with fewer than ``min_n`` rows are skipped.

    Args:
        frame: DataFrame with ``group_col``, ``avg_pollution`` and
            ``avg_visit_percent`` columns.
        group_col: Column whose distinct values define the groups.
        min_n: Minimum number of rows a group needs to be reported.
    """
    for key in frame[group_col].unique():
        subset = frame[frame[group_col] == key]
        if len(subset) >= min_n:
            r = subset[['avg_pollution', 'avg_visit_percent']].corr().iloc[0, 1]
            print(f"{key:30} r = {r:>7.4f} (n={len(subset)})")


print("\n" + "="*60)
print("CORRELATION ANALYSIS")
print("="*60)

if not os.path.exists(CONFIG['database']):
    print("Database not found")
else:
    conn = sqlite3.connect(CONFIG['database'])

    try:
        df = pd.read_sql('SELECT * FROM monthly_analysis', conn)

        if len(df) == 0:
            print("No data in monthly_analysis table")
        else:
            print(f"\nAnalyzing {len(df):,} monthly records...")

            # overall correlation
            corr = df[['avg_pollution', 'avg_visit_percent']].corr()
            print("\nOverall Correlation:")
            print(f"Pollution vs Visit Percent: {corr.iloc[0, 1]:.4f}")

            # correlation by respiratory category
            print("\nCorrelation by Respiratory Category:")
            print("-"*60)
            _print_group_correlations(df, 'respiratory_category')

            # correlation by parameter
            print("\nCorrelation by Pollutant Parameter:")
            print("-"*60)
            _print_group_correlations(df, 'parameter')

    except Exception as e:
        print(f"Analysis error: {e}")
    finally:
        conn.close()

    print("\n" + "="*60)

In [None]:
# export pipeline status
status = {
    'timestamp': datetime.now().isoformat(),
    'files': {},
    'database': {}
}

# check files
for filepath in CONFIG['expected_files']:
    if os.path.exists(filepath):
        file_size = os.path.getsize(filepath)
        status['files'][filepath] = {
            'exists': True,
            'size_bytes': file_size,
            'size_mb': round(file_size / (1024*1024), 2)
        }
    else:
        status['files'][filepath] = {'exists': False}

# check database
if os.path.exists(CONFIG['database']):
    status['database']['exists'] = True
    status['database']['tables'] = {}

    conn = sqlite3.connect(CONFIG['database'])
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        for (table_name,) in cursor.fetchall():
            # Identifiers cannot be bound as SQL parameters, so quote them:
            # names with spaces, reserved words or embedded quotes stay valid.
            quoted_name = '"' + table_name.replace('"', '""') + '"'
            cursor.execute(f"SELECT COUNT(*) FROM {quoted_name}")
            status['database']['tables'][table_name] = cursor.fetchone()[0]
    except sqlite3.Error as e:
        # Record the failure in the exported status instead of aborting the
        # cell before pipeline_status.json is written.
        status['database']['error'] = str(e)
    finally:
        conn.close()
else:
    status['database']['exists'] = False

# save to file
output_file = 'pipeline_status.json'
with open(output_file, 'w') as f:
    json.dump(status, f, indent=2)

print(f"Pipeline status exported to: {output_file}")