# 02 - Data Cleaning

This notebook cleans the raw air quality data:
- Parse timestamps and set timezone to America/New_York
- Normalize units
- Flag outliers
- Spatial join to boroughs
- Save cleaned parquet partitioned by year/month


In [35]:
import pandas as pd
import numpy as np
from pathlib import Path
import json

# Data directories
PROJECT_ROOT = Path().resolve().parent
DATA_RAW = PROJECT_ROOT / "data" / "raw"
DATA_PROCESSED = PROJECT_ROOT / "data" / "processed"
DATA_PROCESSED.mkdir(parents=True, exist_ok=True)

print(f"Raw data: {DATA_RAW}")
print(f"Processed data: {DATA_PROCESSED}")


Raw data: /Users/shohruz/Air-Quality-Analysis/data/raw
Processed data: /Users/shohruz/Air-Quality-Analysis/data/processed


In [36]:
# Load the most recent raw JSON file
raw_files = sorted(DATA_RAW.glob("air_quality_raw_*.json"))
if not raw_files:
    raise FileNotFoundError("No raw JSON files found. Run 01-ingest.ipynb first.")

latest_file = raw_files[-1]
print(f"Loading: {latest_file.name}")

with open(latest_file, 'r') as f:
    data = json.load(f)

df = pd.DataFrame(data)
print(f"Loaded {len(df)} records")
df.head()


Loading: air_quality_raw_20251205_152721.json
Loaded 18862 records


Unnamed: 0,unique_id,indicator_id,name,measure,measure_info,geo_type_name,geo_join_id,geo_place_name,time_period,start_date,data_value
0,154596,643,Annual vehicle miles traveled,Million miles,per square mile,CD,313,Coney Island (CD13),2005,2005-01-01T00:00:00.000,31.85136
1,131251,657,Asthma emergency department visits due to PM2.5,Estimated annual rate (age 18+),"per 100,000 adults",UHF42,405,Ridgewood - Forest Hills,2005-2007,2005-01-01T00:00:00.000,19.1
2,151656,643,Annual vehicle miles traveled,Million miles,per square mile,UHF42,406,Fresh Meadows,2005,2005-01-01T00:00:00.000,61.967759
3,131253,657,Asthma emergency department visits due to PM2.5,Estimated annual rate (age 18+),"per 100,000 adults",UHF42,407,Southwest Queens,2005-2007,2005-01-01T00:00:00.000,30.6
4,130915,650,Respiratory hospitalizations due to PM2.5 (age...,Estimated annual rate,"per 100,000 adults",UHF42,405,Ridgewood - Forest Hills,2005-2007,2005-01-01T00:00:00.000,18.3


## Step 1: Parse Timestamps


In [37]:
# Parse timestamps and set timezone to America/New_York
# Note: start_date represents the start of the time_period (season), not an hourly measurement
df['timestamp'] = pd.to_datetime(df['start_date'], errors='coerce')

# Set timezone (assume UTC if no timezone, then convert to Eastern)
if df['timestamp'].dt.tz is None:
    df['timestamp'] = df['timestamp'].dt.tz_localize('UTC')
df['timestamp'] = df['timestamp'].dt.tz_convert('America/New_York')

# Derive date, month, year from timestamp
df['date'] = df['timestamp'].dt.date
df['month'] = df['timestamp'].dt.month
df['year'] = df['timestamp'].dt.year

# Extract season from time_period (e.g., "Summer 2023" -> "Summer")
def extract_season(time_period):
    """Extract season from time_period string."""
    if pd.isna(time_period):
        return None
    parts = str(time_period).split()
    if len(parts) >= 1:
        return parts[0]  # "Summer", "Winter", etc.
    return None

df['season'] = df['time_period'].apply(extract_season)

# Note: Since this is seasonal aggregate data, we don't have hour/weekday
# Set to None for consistency with schema
df['hour'] = None
df['weekday'] = None

print(f"Date range: {df['timestamp'].min()} to {df['timestamp'].max()}")
print(f"Timezone: {df['timestamp'].dt.tz}")
print(f"\nSeasons: {df['season'].value_counts()}")
print(f"\nTime periods: {df['time_period'].value_counts().head(10)}")
df[['start_date', 'timestamp', 'date', 'time_period', 'season', 'year', 'month']].head()


Date range: 2004-12-31 19:00:00-05:00 to 2023-05-31 20:00:00-04:00
Timezone: America/New_York

Seasons: season
Summer       6345
Winter       4230
Annual       4230
2005-2007     480
2009-2011     480
2012-2014     480
2015-2017     480
2017-2019     480
2005          417
2010          321
2019          321
2011          214
2013          144
2015          144
2014           96
Name: count, dtype: int64

Time periods: time_period
2017-2019      480
2005-2007      480
2009-2011      480
2012-2014      480
2015-2017      480
Summer 2023    423
Summer 2012    423
Summer 2015    423
Summer 2011    423
Summer 2016    423
Name: count, dtype: int64


Unnamed: 0,start_date,timestamp,date,time_period,season,year,month
0,2005-01-01T00:00:00.000,2004-12-31 19:00:00-05:00,2004-12-31,2005,2005,2004,12
1,2005-01-01T00:00:00.000,2004-12-31 19:00:00-05:00,2004-12-31,2005-2007,2005-2007,2004,12
2,2005-01-01T00:00:00.000,2004-12-31 19:00:00-05:00,2004-12-31,2005,2005,2004,12
3,2005-01-01T00:00:00.000,2004-12-31 19:00:00-05:00,2004-12-31,2005-2007,2005-2007,2004,12
4,2005-01-01T00:00:00.000,2004-12-31 19:00:00-05:00,2004-12-31,2005-2007,2005-2007,2004,12


## Step 2: Normalize Units


In [38]:
# Normalize units
# Standardize to µg/m³ for particulates, ppb for gases
df['unit'] = df['measure_info'].str.lower().str.strip()

# Unit mapping
unit_mapping = {
    'mcg/m3': 'µg/m³',
    'ug/m3': 'µg/m³',
    'µg/m³': 'µg/m³',
    'micrograms/m3': 'µg/m³',
    'ppb': 'ppb',
    'parts per billion': 'ppb',
    'ppm': 'ppb',  # Will convert values
}

df['unit_normalized'] = df['unit'].map(unit_mapping).fillna(df['unit'])

# Convert ppm to ppb (multiply by 1000)
ppm_mask = df['unit'].str.contains('ppm', case=False, na=False)
if ppm_mask.any():
    df.loc[ppm_mask, 'data_value'] = df.loc[ppm_mask, 'data_value'] * 1000
    print(f"Converted {ppm_mask.sum()} records from ppm to ppb")

print("Unit normalization:")
print(df['unit'].value_counts())
print("\nNormalized units:")
print(df['unit_normalized'].value_counts())


Unit normalization:
unit
ppb                     8460
mcg/m3                  6345
per 100,000 adults      1440
per square mile          963
per 100,000 children     720
âµg/m3                   406
number                   288
per 100,000              240
Name: count, dtype: int64

Normalized units:
unit_normalized
ppb                     8460
µg/m³                   6345
per 100,000 adults      1440
per square mile          963
per 100,000 children     720
âµg/m3                   406
number                   288
per 100,000              240
Name: count, dtype: int64


## Step 3: Flag Outliers


In [39]:
# Flag outliers using z-score method (grouped by pollutant and station)
# Ensure data_value is numeric before outlier detection
if df['data_value'].dtype == 'object':
    df['data_value'] = pd.to_numeric(df['data_value'], errors='coerce')

df['is_outlier'] = False

# Only process groups with valid numeric data
for (pollutant, station), group in df.groupby(['name', 'geo_join_id']):
    # Filter out NaN values for this group
    group_clean = group[group['data_value'].notna()]
    
    if len(group_clean) < 2:  # Need at least 2 values to calculate std
        continue
    
    mean = group_clean['data_value'].mean()
    std = group_clean['data_value'].std()
    
    if std > 0:
        z_scores = np.abs((group_clean['data_value'] - mean) / std)
        outlier_mask = z_scores > 3.0
        df.loc[outlier_mask.index, 'is_outlier'] = outlier_mask.values

outlier_count = df['is_outlier'].sum()
print(f"Flagged {outlier_count} outliers ({outlier_count/len(df)*100:.2f}%)")

# Show some outliers
if outlier_count > 0:
    print("\nSample outliers:")
    print(df[df['is_outlier']][['name', 'geo_place_name', 'data_value', 'timestamp']].head(10))
else:
    print("No outliers detected.")


Flagged 20 outliers (0.11%)

Sample outliers:
                         name                            geo_place_name  \
923   Fine particles (PM 2.5)  South Ozone Park and Howard Beach (CD10)   
934   Fine particles (PM 2.5)           Kew Gardens and Woodhaven (CD9)   
1153  Fine particles (PM 2.5)            Highbridge and Concourse (CD4)   
1156  Fine particles (PM 2.5)            Belmont and East Tremont (CD6)   
1227  Fine particles (PM 2.5)             Riverdale and Fieldston (CD8)   
1254   Nitrogen dioxide (NO2)                 Clinton and Chelsea (CD4)   
1305   Nitrogen dioxide (NO2)          Greenwich Village and Soho (CD2)   
1412   Nitrogen dioxide (NO2)                  Financial District (CD1)   
1439  Fine particles (PM 2.5)      Fordham and University Heights (CD5)   
1443  Fine particles (PM 2.5)     Kingsbridge Heights and Bedford (CD7)   

      data_value                 timestamp  
923        12.60 2008-11-30 19:00:00-05:00  
934        13.24 2008-11-30 19:00:00-0

## Step 4: Extract Station Metadata and Boroughs


In [40]:
# Extract borough from location name and geo_type_name
def extract_borough(location_name, geo_type_name):
    """Extract borough from location name and geo_type."""
    # If geo_type is Borough, the location_name is the borough
    if pd.notna(geo_type_name) and str(geo_type_name).lower() == 'borough':
        return str(location_name).title()
    
    if pd.isna(location_name):
        return "Unknown"
    
    location_lower = str(location_name).lower()
    
    # Direct borough mentions
    boroughs = {
        'manhattan': 'Manhattan',
        'brooklyn': 'Brooklyn',
        'queens': 'Queens',
        'bronx': 'Bronx',
        'staten island': 'Staten Island',
        'staten': 'Staten Island',
        'si': 'Staten Island'
    }
    
    for key, value in boroughs.items():
        if key in location_lower:
            return value
    
    # Heuristics based on location patterns
    if any(x in location_lower for x in ['harlem', 'midtown', 'upper east', 'upper west', 'chelsea', 'soho']):
        return 'Manhattan'
    if any(x in location_lower for x in ['flatbush', 'crown heights', 'sunset park', 'bay ridge', 'bensonhurst']):
        return 'Brooklyn'
    if any(x in location_lower for x in ['astoria', 'flushing', 'bayside', 'jamaica', 'queens']):
        return 'Queens'
    if any(x in location_lower for x in ['bronx', 'throgs neck', 'fordham', 'pelham']):
        return 'Bronx'
    if any(x in location_lower for x in ['tottenville', 'great kills', 'staten']):
        return 'Staten Island'
    
    return "Unknown"

# Create station metadata
df['station_id'] = df['geo_join_id'].astype(str)  # Keep as string to handle composite IDs
df['station_name'] = df['geo_place_name']
df['geo_type'] = df['geo_type_name']  # Keep geo_type for reference
df['borough'] = df.apply(lambda row: extract_borough(row['geo_place_name'], row['geo_type_name']), axis=1)

print("Borough distribution:")
print(df['borough'].value_counts())
print(f"\nGeo types: {df['geo_type_name'].value_counts()}")
print(f"\nUnknown boroughs: {df[df['borough'] == 'Unknown']['geo_place_name'].unique()[:10]}")


Borough distribution:
borough
Unknown          7990
Staten Island    3187
Queens           2326
Brooklyn         2228
Manhattan        1775
Bronx            1356
Name: count, dtype: int64

Geo types: geo_type_name
UHF42       7392
CD          6844
UHF34       3570
Borough      880
Citywide     176
Name: count, dtype: int64

Unknown boroughs: ['Coney Island (CD13)' 'Ridgewood - Forest Hills' 'Fresh Meadows'
 'Willowbrook' 'Stapleton - St. George'
 'Hillcrest and Fresh Meadows (CD8)' 'Port Richmond'
 'High Bridge - Morrisania' 'Hunts Point - Mott Haven'
 'Rego Park and Forest Hills (CD6)']


## Step 5: Create Final Tidy Dataset


In [41]:
# Select and rename columns to match required schema
# Required: timestamp, date, hour, pollutant, value, unit, station_id, station_name, lat, lon, borough, year, month

df_clean = df[[
    'timestamp', 'date', 'hour', 'weekday', 'month', 'year', 'season', 'time_period',
    'name', 'data_value', 'unit_normalized',
    'station_id', 'station_name', 'geo_type', 'borough',
    'is_outlier'
]].copy()

# Rename columns
df_clean = df_clean.rename(columns={
    'name': 'pollutant',
    'data_value': 'value',
    'unit_normalized': 'unit'
})

# Clean pollutant names (simplify)
df_clean['pollutant'] = df_clean['pollutant'].str.replace('Fine particles (PM 2.5)', 'PM2.5', regex=False)
df_clean['pollutant'] = df_clean['pollutant'].str.replace('Nitrogen dioxide (NO2)', 'NO2', regex=False)
df_clean['pollutant'] = df_clean['pollutant'].str.replace('Ozone (O3)', 'O3', regex=False)

# Note: lat/lon not in original dataset - would need spatial join with station locations
# For now, set to None
df_clean['lat'] = None
df_clean['lon'] = None

# Reorder columns
df_clean = df_clean[[
    'timestamp', 'date', 'hour', 'weekday', 'year', 'month', 'season', 'time_period',
    'pollutant', 'value', 'unit',
    'station_id', 'station_name', 'geo_type', 'lat', 'lon', 'borough',
    'is_outlier'
]]

print(f"Cleaned dataset shape: {df_clean.shape}")
print(f"\nColumns: {list(df_clean.columns)}")
print(f"\nPollutants: {df_clean['pollutant'].unique()}")
print(f"\nSample data:")
df_clean.head()


Cleaned dataset shape: (18862, 18)

Columns: ['timestamp', 'date', 'hour', 'weekday', 'year', 'month', 'season', 'time_period', 'pollutant', 'value', 'unit', 'station_id', 'station_name', 'geo_type', 'lat', 'lon', 'borough', 'is_outlier']

Pollutants: ['Annual vehicle miles traveled'
 'Asthma emergency department visits due to PM2.5'
 'Respiratory hospitalizations due to PM2.5 (age 20+)'
 'Asthma hospitalizations due to Ozone'
 'Outdoor Air Toxics - Formaldehyde'
 'Asthma emergency departments visits due to Ozone'
 'Annual vehicle miles traveled (cars)' 'Outdoor Air Toxics - Benzene'
 'Annual vehicle miles traveled (trucks)' 'Deaths due to PM2.5'
 'Cardiac and respiratory deaths due to Ozone'
 'Cardiovascular hospitalizations due to PM2.5 (age 40+)' 'NO2' 'PM2.5'
 'O3' 'Boiler Emissions- Total SO2 Emissions'
 'Boiler Emissions- Total PM2.5 Emissions'
 'Boiler Emissions- Total NOx Emissions']

Sample data:


Unnamed: 0,timestamp,date,hour,weekday,year,month,season,time_period,pollutant,value,unit,station_id,station_name,geo_type,lat,lon,borough,is_outlier
0,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005,2005,Annual vehicle miles traveled,31.85136,per square mile,313,Coney Island (CD13),CD,,,Unknown,False
1,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005-2007,2005-2007,Asthma emergency department visits due to PM2.5,19.1,"per 100,000 adults",405,Ridgewood - Forest Hills,UHF42,,,Unknown,False
2,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005,2005,Annual vehicle miles traveled,61.967759,per square mile,406,Fresh Meadows,UHF42,,,Unknown,False
3,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005-2007,2005-2007,Asthma emergency department visits due to PM2.5,30.6,"per 100,000 adults",407,Southwest Queens,UHF42,,,Queens,False
4,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005-2007,2005-2007,Respiratory hospitalizations due to PM2.5 (age...,18.3,"per 100,000 adults",405,Ridgewood - Forest Hills,UHF42,,,Unknown,False


In [42]:
# Check for missing values
print("Missing values:")
print(df_clean.isnull().sum())
print(f"\nTotal rows: {len(df_clean)}")
print(f"Rows with missing critical values: {df_clean[['timestamp', 'value', 'pollutant']].isnull().any(axis=1).sum()}")

# Remove rows with missing critical values
df_clean = df_clean.dropna(subset=['timestamp', 'value', 'pollutant'])
print(f"Rows after removing missing critical values: {len(df_clean)}")


Missing values:
timestamp           0
date                0
hour            18862
weekday         18862
year                0
month               0
season              0
time_period         0
pollutant           0
value               0
unit                0
station_id          0
station_name        0
geo_type            0
lat             18862
lon             18862
borough             0
is_outlier          0
dtype: int64

Total rows: 18862
Rows with missing critical values: 0
Rows after removing missing critical values: 18862


## Step 6: Save Processed Parquet


In [43]:
# Save as single parquet file
parquet_path = DATA_PROCESSED / "measurements.parquet"
df_clean.to_parquet(parquet_path, index=False, engine='pyarrow')
print(f"Saved to: {parquet_path}")
print(f"File size: {parquet_path.stat().st_size / 1024 / 1024:.2f} MB")

# Also save partitioned by year/month for large datasets
partition_dir = DATA_PROCESSED / "measurements_partitioned"
df_clean.to_parquet(
    partition_dir,
    partition_cols=['year', 'month'],
    engine='pyarrow',
    index=False
)
print(f"Saved partitioned data to: {partition_dir}")

# Verify it loads correctly
df_test = pd.read_parquet(parquet_path)
print(f"\nVerification: Loaded {len(df_test)} records")
print(f"Columns: {list(df_test.columns)}")
df_test.head()


Saved to: /Users/shohruz/Air-Quality-Analysis/data/processed/measurements.parquet
File size: 0.15 MB
Saved partitioned data to: /Users/shohruz/Air-Quality-Analysis/data/processed/measurements_partitioned

Verification: Loaded 18862 records
Columns: ['timestamp', 'date', 'hour', 'weekday', 'year', 'month', 'season', 'time_period', 'pollutant', 'value', 'unit', 'station_id', 'station_name', 'geo_type', 'lat', 'lon', 'borough', 'is_outlier']


Unnamed: 0,timestamp,date,hour,weekday,year,month,season,time_period,pollutant,value,unit,station_id,station_name,geo_type,lat,lon,borough,is_outlier
0,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005,2005,Annual vehicle miles traveled,31.85136,per square mile,313,Coney Island (CD13),CD,,,Unknown,False
1,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005-2007,2005-2007,Asthma emergency department visits due to PM2.5,19.1,"per 100,000 adults",405,Ridgewood - Forest Hills,UHF42,,,Unknown,False
2,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005,2005,Annual vehicle miles traveled,61.967759,per square mile,406,Fresh Meadows,UHF42,,,Unknown,False
3,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005-2007,2005-2007,Asthma emergency department visits due to PM2.5,30.6,"per 100,000 adults",407,Southwest Queens,UHF42,,,Queens,False
4,2004-12-31 19:00:00-05:00,2004-12-31,,,2004,12,2005-2007,2005-2007,Respiratory hospitalizations due to PM2.5 (age...,18.3,"per 100,000 adults",405,Ridgewood - Forest Hills,UHF42,,,Unknown,False
