In [None]:
import requests
import os
import json
import time
import asyncio
import aiohttp
from datetime import datetime, timedelta
import pandas as pd
import logging
from tqdm.notebook import tqdm
import nest_asyncio

nest_asyncio.apply()

logging.basicConfig(
    level=logging.WARNING,  
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("airnow_scrapping.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger()

base_url = "https://files.airnowtech.org/airnow/"
filtered_dir = "FilteredData"
stations_dir = "StationsData"
checkpoint_file = "checkpoint.json"

 
os.makedirs(filtered_dir, exist_ok=True)
os.makedirs(stations_dir, exist_ok=True)

 
TARGET_STATIONS = [
    "Los Angeles - N. Main",  
    "Los Angeles-North Main",  
    "Long Beach",
    "Santa Monica"
]
 
TARGET_STATION_SUBSTRINGS = [
    "Los Angeles",
    "Long Beach",
    "Santa Monica"
]


TARGET_PARAMETERS = {"PM2.5", "PM10"}  
TARGET_HOURS = range(8, 16)  
MAX_RETRIES = 3
RETRY_DELAY = 2   
MAX_WORKERS = 10   
BATCH_SIZE = 30   

def is_weekend(date):
    """Check if date is a weekend (Saturday=5, Sunday=6)"""
    return date.weekday() in [5, 6]

def is_summer_vacation(date):
    """Check if date is during summer vacation (June 15 - August 15)"""
    return (date.month == 6 and date.day >= 15) or (date.month == 7) or (date.month == 8 and date.day <= 15)

def generate_lausd_holidays(start_year, end_year):
    """Generate approximate LAUSD holidays for filtering"""
    holidays = []
    for year in range(start_year, end_year + 1):
        holidays += [
            f"{year}-01-01",  # New Year's Day
            f"{year}-01-20",  # MLK Day
            f"{year}-02-17",  # Presidents' Day
            f"{year}-03-31",  # Cesar Chavez Day
            f"{year}-05-25",  # Memorial Day
            f"{year}-09-01",  # Labor Day
            f"{year}-10-14",  # Indigenous Peoples' Day
            f"{year}-11-11",  # Veterans Day
            f"{year}-11-28",  # Thanksgiving Thurs
            f"{year}-11-29",  # Thanksgiving Fri
            f"{year}-12-25",  # Christmas Day
        ]
        holidays += [f"{year}-03-25", f"{year}-03-26", f"{year}-03-27", f"{year}-03-28", f"{year}-03-29"]
        holidays += [f"{year}-12-{day}" for day in range(23, 32)]
    return pd.to_datetime(holidays)

LAUSD_HOLIDAYS = generate_lausd_holidays(2014, 2024)

def is_school_day(date):
    """Check if a date is a LAUSD school day"""
    if is_weekend(date) or is_summer_vacation(date):
        return False
    
    date_only = pd.Timestamp(date.year, date.month, date.day)
    return date_only not in LAUSD_HOLIDAYS

class Checkpoint:
    """Minimalist checkpoint handling"""
    @staticmethod
    def save(date):
        with open(checkpoint_file, 'w') as f:
            json.dump({'last_processed_date': date.strftime('%Y-%m-%d')}, f)
    
    @staticmethod
    def load():
        if os.path.exists(checkpoint_file):
            try:
                with open(checkpoint_file, 'r') as f:
                    data = json.load(f)
                    return datetime.strptime(data['last_processed_date'], '%Y-%m-%d')
            except:
                return None
        return None

async def download_hour(session, date, hour):
    """Download data for a specific hour using aiohttp"""
    year = date.strftime("%Y")
    date_str = date.strftime("%Y%m%d")
    hour_str = f"{hour:02d}"
    file_name = f"HourlyData_{date_str}{hour_str}.dat"
    file_url = f"{base_url}{year}/{date_str}/{file_name}"
    
    data_entries = []
    
    for attempt in range(MAX_RETRIES):
        try:
            async with session.get(file_url, timeout=15) as response:
                if response.status == 200:
                    content = await response.read()
                    
                    for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
                        try:
                            text = content.decode(encoding)
                            lines = text.strip().split('\n')
                            
                            for line in lines:
                                parts = line.strip().split('|')
                                if len(parts) >= 9:
                                    date_val, time, sitecode, location, timezone, parameter, unit, value, agency = [p.strip() for p in parts]
                                    
                                    is_target_station = (
                                        location in TARGET_STATIONS or
                                        any(substring in location for substring in TARGET_STATION_SUBSTRINGS)
                                    )
                                    
                                    if is_target_station and parameter.upper() in TARGET_PARAMETERS:
                                        try:
                                            numeric_value = float(value)                                                
                                            data_entries.append({
                                                "Date": date_val,
                                                "Time": time,
                                                "SiteCode": sitecode,
                                                "Location": location,
                                                "Timezone": timezone,
                                                "Parameter": parameter.upper(),
                                                "Unit": unit,
                                                "Value": numeric_value,
                                                "Agency": agency
                                            })
                                        except ValueError:
                                            pass
                            
                            break
                        except UnicodeDecodeError:
                            if encoding == 'iso-8859-1': 
                                logger.warning(f"Failed to decode {file_url} with any encoding")
                    
                    break  
                elif response.status == 404:
                    break  
                else:
                    await asyncio.sleep(RETRY_DELAY * (attempt + 1))
        except (asyncio.TimeoutError, aiohttp.ClientError) as e:
            logger.warning(f"Error downloading {file_url}: {str(e)}")
            await asyncio.sleep(RETRY_DELAY * (attempt + 1))
            continue
    
    return data_entries

async def process_day(session, date):
    """Process a single day's data across target hours"""
    tasks = [download_hour(session, date, hour) for hour in TARGET_HOURS]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    all_entries = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            hour = TARGET_HOURS[i]
            logger.warning(f"Error processing {date.strftime('%Y-%m-%d')} hour {hour}: {str(result)}")
            continue
        all_entries.extend(result)
    
    station_data = {}
    for entry in all_entries:
        station_name = entry["Location"]
        if station_name not in station_data:
            station_data[station_name] = []
        station_data[station_name].append(entry)
    
    for station_name, entries in station_data.items():
        data_dict = {}
        for entry in entries:
            key = (entry["Date"], entry["Time"])
            if key not in data_dict:
                data_dict[key] = {
                    "Date": entry["Date"],
                    "Time": entry["Time"],
                    "SiteCode": entry.get("SiteCode", ""),
                    "Location": entry.get("Location", ""),
                    "Timezone": entry.get("Timezone", ""),
                    "Unit": entry.get("Unit", ""),
                    "Agency": entry.get("Agency", ""),
                    "PM2.5": None,
                    "PM10": None
                }
            
            param = entry["Parameter"]
            if param in TARGET_PARAMETERS:
                data_dict[key][param] = entry["Value"]
        
        data_list = list(data_dict.values())
        if data_list:
            safe_station_name = station_name.replace(" ", "_").replace("-", "_")
            date_str = date.strftime("%Y%m%d")
            station_dir = os.path.join(stations_dir, safe_station_name)
            os.makedirs(station_dir, exist_ok=True)
            
            file_path = os.path.join(station_dir, f"PM_Data_{date_str}.csv")
            
            pd.DataFrame(data_list).to_csv(file_path, index=False)
    
    all_data_list = []
    for station_entries in station_data.values():
        for entry in station_entries:
            all_data_list.append(entry)
    
    if all_data_list:
        date_str = date.strftime("%Y%m%d")
        file_path = os.path.join(filtered_dir, f"All_Stations_Data_{date_str}.csv")
        pd.DataFrame(all_data_list).to_csv(file_path, index=False)
    
    return all_data_list

async def process_batch(dates):
    """Process a batch of dates in parallel"""
    try:
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=MAX_WORKERS)) as session:
            tasks = [process_day(session, date) for date in dates]
            return await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        logger.error(f"Error in batch processing: {str(e)}")
        return [[] for _ in dates]  

def combine_data_for_station(station_name, start_date, end_date):
    """Combine daily files for a specific station into a single dataset and calculate averages"""
    safe_station_name = station_name.replace(" ", "_").replace("-", "_")
    station_dir = os.path.join(stations_dir, safe_station_name)
    
    all_files = []
    current = start_date
    while current <= end_date:
        date_str = current.strftime("%Y%m%d")
        file_path = os.path.join(station_dir, f"PM_Data_{date_str}.csv")
        if os.path.exists(file_path):
            all_files.append(file_path)
        current += timedelta(days=1)
    
    if not all_files:
        print(f"No data files found to combine for station {station_name}")
        return None
    
    dfs = []
    for file in all_files:
        try:
            df = pd.read_csv(file)
            dfs.append(df)
        except Exception as e:
            logger.warning(f"Error reading {file}: {str(e)}")
            continue
    
    if not dfs:
        print(f"No valid data found in files for station {station_name}")
        return None
    
    combined_df = pd.concat(dfs, ignore_index=True)
    
    combined_df['Datetime'] = pd.to_datetime(combined_df['Date'] + ' ' + combined_df['Time'])
    combined_df['Year'] = combined_df['Datetime'].dt.year
    combined_df['Month'] = combined_df['Datetime'].dt.month
    combined_df['Week'] = combined_df['Datetime'].dt.isocalendar().week
    
    combined_df['YearWeek'] = combined_df['Year'].astype(str) + '-' + combined_df['Week'].astype(str)
    combined_df['YearMonth'] = combined_df['Year'].astype(str) + '-' + combined_df['Month'].astype(str)
    
    weekly_avg = combined_df.groupby('YearWeek')[['PM2.5', 'PM10']].mean().reset_index()
    
    monthly_avg = combined_df.groupby('YearMonth')[['PM2.5', 'PM10']].mean().reset_index()
    
    pm25_weekly_avg_dict = dict(zip(weekly_avg['YearWeek'], weekly_avg['PM2.5']))
    pm10_weekly_avg_dict = dict(zip(weekly_avg['YearWeek'], weekly_avg['PM10']))
    
    pm25_monthly_avg_dict = dict(zip(monthly_avg['YearMonth'], monthly_avg['PM2.5']))
    pm10_monthly_avg_dict = dict(zip(monthly_avg['YearMonth'], monthly_avg['PM10']))
    
    combined_df['PM2.5_Weekly_Avg'] = combined_df['YearWeek'].map(pm25_weekly_avg_dict)
    combined_df['PM10_Weekly_Avg'] = combined_df['YearWeek'].map(pm10_weekly_avg_dict)
    combined_df['PM2.5_Monthly_Avg'] = combined_df['YearMonth'].map(pm25_monthly_avg_dict)
    combined_df['PM10_Monthly_Avg'] = combined_df['YearMonth'].map(pm10_monthly_avg_dict)
    
    final_columns = [
        'Date', 'Time', 'SiteCode', 'Location', 'Timezone', 'Unit', 'Agency',
        'PM2.5', 'PM10', 'Datetime', 'Week', 'Month', 
        'PM2.5_Weekly_Avg', 'PM10_Weekly_Avg', 'PM2.5_Monthly_Avg', 'PM10_Monthly_Avg'
    ]
    
    combined_df = combined_df[final_columns]
    weekly_avg.to_csv(os.path.join(station_dir, f"{safe_station_name}_Weekly_Averages_8am-3pm.csv"), index=False)
    monthly_avg.to_csv(os.path.join(station_dir, f"{safe_station_name}_Monthly_Averages_8am-3pm.csv"), index=False)
    combined_df.to_csv(os.path.join(station_dir, f"{safe_station_name}_Combined_Data_8am-3pm.csv"), index=False)
    
    print(f"Processed {len(all_files)} days of data for station {station_name}")
    print(f"Total records: {len(combined_df)}")
    
    return combined_df

def combine_all_stations_data(start_date, end_date):
    """Combine data from all stations into a single comprehensive dataset"""
    all_stations_dfs = []
    discovered_stations = set()
    
    for item in os.listdir(stations_dir):
        dir_path = os.path.join(stations_dir, item)
        if os.path.isdir(dir_path):
            station_name = item.replace("_", " ")
            discovered_stations.add(station_name)
    
    for station_name in discovered_stations:
        print(f"Combining data for station: {station_name}")
        station_df = combine_data_for_station(station_name, start_date, end_date)
        if station_df is not None:
            all_stations_dfs.append(station_df)
    
    if not all_stations_dfs:
        print("No data available from any station")
        return
    
    all_data_df = pd.concat(all_stations_dfs, ignore_index=True)
    
    all_data_df.to_csv(os.path.join(filtered_dir, "All_Stations_Combined_Data.csv"), index=False)
    
    print("Applying LAUSD school day filtering...")
    filtered_df = filter_for_school_days(all_data_df)
    filtered_df.to_csv(os.path.join(filtered_dir, "LAUSD_School_Days_Data.csv"), index=False)
    
    daily_avg = all_data_df.groupby(['Date'])[['PM2.5', 'PM10']].mean().reset_index()
    daily_avg.to_csv(os.path.join(filtered_dir, "Overall_Daily_Averages.csv"), index=False)
    
    weekly_avg = all_data_df.groupby(['YearWeek'])[['PM2.5', 'PM10']].mean().reset_index()
    weekly_avg.to_csv(os.path.join(filtered_dir, "Overall_Weekly_Averages.csv"), index=False)
    
    monthly_avg = all_data_df.groupby(['YearMonth'])[['PM2.5', 'PM10']].mean().reset_index()
    monthly_avg.to_csv(os.path.join(filtered_dir, "Overall_Monthly_Averages.csv"), index=False)
    
    stations_df = pd.DataFrame({
        "Station": list(discovered_stations)
    })
    stations_df.to_csv(os.path.join(stations_dir, "available_stations.csv"), index=False)
    
    print(f"Combined data from {len(all_stations_dfs)} stations")
    print(f"Total records in combined dataset: {len(all_data_df)}")
    print(f"Total records in LAUSD school days dataset: {len(filtered_df)}")
    print(f"Overall daily averages: {len(daily_avg)}")
    print(f"Overall weekly averages: {len(weekly_avg)}")
    print(f"Overall monthly averages: {len(monthly_avg)}")

def filter_for_school_days(df):
    """Filter dataset to include only LAUSD school days"""
    print("Starting LAUSD school day filtering...")
    print(f"Original dataset size: {len(df)}")
    
    if 'Datetime' not in df.columns:
        df['Datetime'] = pd.to_datetime(df['Date'] + ' ' + df['Time'])
    
    df = df[(df['Datetime'].dt.hour >= 8) & (df['Datetime'].dt.hour <= 15)]
    
    df = df[~df['Datetime'].dt.weekday.isin([5, 6])]
    
    df = df[~df['Datetime'].apply(is_summer_vacation)]
    
    df = df[~df['Datetime'].dt.normalize().isin(LAUSD_HOLIDAYS)]
    
    print(f"Filtered dataset size: {len(df)}")
    return df

def filter_all_existing_data():
    """Apply school day filtering to already collected data"""
    combined_file = os.path.join(filtered_dir, "All_Stations_Combined_Data.csv")
    
    if not os.path.exists(combined_file):
        print(f"Combined data file not found: {combined_file}")
        return
    
    print(f"Reading combined data file: {combined_file}")
    df = pd.read_csv(combined_file, parse_dates=['Datetime'])
    
    filtered_df = filter_for_school_days(df)
    
    output_file = os.path.join(filtered_dir, "LAUSD_School_Days_Data.csv")
    filtered_df.to_csv(output_file, index=False)
    print(f"Filtered data saved to: {output_file}")

async def main(start_date, end_date):
    """Main function with batch processing for speed and school day filtering"""
    school_days = []
    current = start_date
    
    print("Generating list of school days to process...")
    while current <= end_date:
        if is_school_day(current):
            school_days.append(current)
        current += timedelta(days=1)
    
    print(f"Found {len(school_days)} school days out of {(end_date - start_date).days + 1} total days")
    
    checkpoint = Checkpoint.load()
    if checkpoint and checkpoint >= start_date:
        school_days = [day for day in school_days if day > checkpoint]
        if school_days:
            print(f"Resuming from {school_days[0].date()}")
        else:
            print(f"Already completed processing to end date")
            return
    
    total_days = len(school_days)
    print(f"Processing {total_days} school days between {start_date.date()} and {end_date.date()}")
    
    with tqdm(total=total_days, desc="Processing") as pbar:
        for i in range(0, total_days, BATCH_SIZE):
            try:
                batch = school_days[i:i+BATCH_SIZE]
                results = await process_batch(batch)
                
                error_count = sum(1 for result in results if isinstance(result, Exception))
                if error_count > 0:
                    logger.warning(f"Batch had {error_count} errors out of {len(batch)} days")
                
                if batch:
                    Checkpoint.save(batch[-1])
                
                pbar.update(len(batch))
            except Exception as e:
                logger.error(f"Failed to process batch starting at {school_days[i].date()}: {str(e)}")
                if i > 0:
                    Checkpoint.save(school_days[i-1])
                pbar.update(min(BATCH_SIZE, total_days - i))
    
    print("All data downloaded, combining files...")
    combine_all_stations_data(start_date, end_date)
    print("Processing complete!")

async def debug_stations(date, hour):
    """Debug function to check which stations are available on a specific date and hour"""
    async with aiohttp.ClientSession() as session:
        year = date.strftime("%Y")
        date_str = date.strftime("%Y%m%d")
        hour_str = f"{hour:02d}"
        file_name = f"HourlyData_{date_str}{hour_str}.dat"
        file_url = f"{base_url}{year}/{date_str}/{file_name}"
        
        found_stations = set()
        stations_with_pm = set()
        
        try:
            async with session.get(file_url, timeout=15) as response:
                if response.status == 200:
                    content = await response.read()
                    
                    # Try different encodings
                    for encoding in ['utf-8', 'latin-1', 'windows-1252', 'iso-8859-1']:
                        try:
                            text = content.decode(encoding)
                            lines = text.strip().split('\n')
                            
                            for line in lines:
                                parts = line.strip().split('|')
                                if len(parts) >= 9:
                                    _, _, _, location, _, parameter, _, _, _ = [p.strip() for p in parts]
                                    found_stations.add(location)
                                    
                                    if parameter.upper() in TARGET_PARAMETERS:
                                        stations_with_pm.add(location)
                            
                            break
                        except UnicodeDecodeError:
                            continue
        except Exception as e:
            print(f"Error: {str(e)}")
            return None
            
        print(f"\nFound {len(found_stations)} total stations on {date.strftime('%Y-%m-%d')} at hour {hour}")
        print(f"Found {len(stations_with_pm)} stations with PM2.5/PM10 data")
        
        found_target_stations = []
        not_found_target_stations = []
        
        for station in TARGET_STATIONS:
            if station in found_stations:
                found_target_stations.append(station)
            else:
                matching_stations = [s for s in found_stations if station in s]
                if matching_stations:
                    found_target_stations.append(f"{station} (matched as {matching_stations})")
                else:
                    not_found_target_stations.append(station)
        
        print("\nTarget stations found:")
        for station in found_target_stations:
            print(f"- {station}")
            
        print("\nTarget stations NOT found:")
        for station in not_found_target_stations:
            print(f"- {station}")
            
        potential_matches = {}
        for target in TARGET_STATION_SUBSTRINGS:
            matches = [s for s in stations_with_pm if target.lower() in s.lower()]
            if matches:
                potential_matches[target] = matches
        
        if potential_matches:
            print("\nPotential station matches with PM data:")
            for target, matches in potential_matches.items():
                print(f"\nFor '{target}':")
                for match in matches:
                    print(f"- {match}")
        
        return found_stations, stations_with_pm

start_date = datetime(2014, 1, 1)
end_date = datetime(2024, 12, 31)

In [19]:
await main(start_date, end_date)

Generating list of school days to process...
Found 2194 school days out of 4018 total days
Processing 2194 school days between 2014-01-01 and 2024-12-31


Processing:   0%|          | 0/2194 [00:00<?, ?it/s]



All data downloaded, combining files...
Combining data for station: Los Angeles   N. Mai


  combined_df['Datetime'] = pd.to_datetime(combined_df['Date'] + ' ' + combined_df['Time'])


Processed 1646 days of data for station Los Angeles   N. Mai
Total records: 8930
Combining data for station: Long Beach Signal Hi


  combined_df['Datetime'] = pd.to_datetime(combined_df['Date'] + ' ' + combined_df['Time'])


Processed 404 days of data for station Long Beach Signal Hi
Total records: 1880
Combining data for station: South Long Beach


  combined_df['Datetime'] = pd.to_datetime(combined_df['Date'] + ' ' + combined_df['Time'])


Processed 1094 days of data for station South Long Beach
Total records: 5852
Applying LAUSD school day filtering...
Starting LAUSD school day filtering...
Original dataset size: 16662
Filtered dataset size: 16662


KeyError: 'YearWeek'

In [None]:
import pandas as pd

df = pd.read_csv('FilteredData/All_Stations_Combined_Data.csv') 

df['Location'] = df['Location'].replace('Long Beach Signal Hi', 'South Long Beach')

df.to_csv('updated_file.csv', index=False)

print("Replacement done and file saved as 'updated_file.csv'")


Replacement done and file saved as 'updated_file.csv'


In [None]:
import pandas as pd

df = pd.read_csv("updated_file.csv")

station_coordinates = {
    "Los Angeles - N. Mai": (34.0655, -118.2356),
    "South Long Beach": (33.7701, -118.1937),
}

df["Latitude"] = df["Location"].apply(lambda loc: station_coordinates.get(loc, (None, None))[0])
df["Longitude"] = df["Location"].apply(lambda loc: station_coordinates.get(loc, (None, None))[1])

df.to_csv("pollution_data.csv", index=False)

df.head()


Unnamed: 0,Date,Time,SiteCode,Location,Timezone,Unit,Agency,PM2.5,PM10,Datetime,Week,Month,PM2.5_Weekly_Avg,PM10_Weekly_Avg,PM2.5_Monthly_Avg,PM10_Monthly_Avg,Latitude,Longitude
0,01/02/14,08:00,60371103,Los Angeles - N. Mai,-8,UG/M3,South Coast AQMD,27.0,54.0,2014-01-02 08:00:00,1,1,14.0,38.375,20.895522,44.253731,34.0655,-118.2356
1,01/02/14,09:00,60371103,Los Angeles - N. Mai,-8,UG/M3,South Coast AQMD,17.0,43.0,2014-01-02 09:00:00,1,1,14.0,38.375,20.895522,44.253731,34.0655,-118.2356
2,01/02/14,10:00,60371103,Los Angeles - N. Mai,-8,UG/M3,South Coast AQMD,13.0,41.0,2014-01-02 10:00:00,1,1,14.0,38.375,20.895522,44.253731,34.0655,-118.2356
3,01/02/14,11:00,60371103,Los Angeles - N. Mai,-8,UG/M3,South Coast AQMD,12.0,34.0,2014-01-02 11:00:00,1,1,14.0,38.375,20.895522,44.253731,34.0655,-118.2356
4,01/02/14,12:00,60371103,Los Angeles - N. Mai,-8,UG/M3,South Coast AQMD,9.0,28.0,2014-01-02 12:00:00,1,1,14.0,38.375,20.895522,44.253731,34.0655,-118.2356
