<h1><center>Hourly Regression-Kriging EAQI Model for Sofia, Bulgaria</center></h1>

This Jupyter Note Book goes through the methodology of collecting data, processing data, computing the regression coefficients, and generating the final prediction map in a step-by-step process.  
Steps 1-4 are run one time in the beginning. Step 5 and 6-? are two scripts that run hourly in the data platform

<center>Step 1: Initialize requirements</center>

In [3]:
# Set file paths
data_folder = r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Spatial_Data\VF'
# Independant variable rasters
f_buildheight = data_folder + r'\vf_buildheight.tif'
f_elevation = data_folder + r'\vf_elevation.tif'
f_landuse = data_folder + r'\vf_landuse.tif'
f_ndvi = data_folder + r'\vf_ndvi.tif'
f_pop = data_folder + r'\vf_pop.tif'
f_motor_trunk = data_folder + r'\vf_motorway_trunk.tif'
f_primary = data_folder + r'\vf_primary.tif'
f_residential = data_folder + r'\vf_residential.tif'
f_secondary = data_folder + r'\vf_secondary.tif'
f_tertiary = data_folder + r'\vf_tertiary.tif'
# Independent variable vectors
f_busstop = data_folder + r'\vf_busstop.gpkg'
f_mtp = data_folder + r'\vf_mtp.gpkg'
# Precalculated Rasters
f_roaddis_precalc = data_folder + r'\precalc\mtp50.tif'
f_busstop_precalc = data_folder + r'\precalc\busstop50.tif'
f_elevation_precalc = data_folder + r'\precalc\elevation50.tif'
f_elevation_precalc_100 = data_folder + r'\precalc\elevation.tif'
# Boundary file
f_boundary = data_folder + r'\boundary.gpkg'

In [5]:
# Install dependencies

# API Collection
import requests
from requests.auth import HTTPBasicAuth
# Data manipulation
import pandas as pd
import numpy as np
# Spatial data processing
import pyproj
from pyproj import Transformer
import geopandas as gpd
from shapely.geometry import Point
from shapely.strtree import STRtree
import rasterio
from rasterio.crs import CRS
from rasterio.transform import rowcol
from rasterio.io import MemoryFile
from pykrige.ok import OrdinaryKriging
# Statistics
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import KFold
from sklearn.linear_model import ElasticNetCV
from sklearn.exceptions import ConvergenceWarning
from scipy.stats import directional_stats
from scipy.stats import circmean, linregress
# Visualization
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter
import seaborn as sns
import plotly.graph_objects as go
# Debugging
import logging
from tqdm import tqdm
# Processing / Computation
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import multiprocessing
from joblib import Parallel, delayed
# Core python functionality
import os
import io
from io import StringIO
import glob
import time
from datetime import datetime, timedelta
import math
from collections import defaultdict
import warnings
import csv
import json
import pickle
import argparse
import ast
import threading
import itertools
# Database
import psycopg2
import psycopg2.extras
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from psycopg2.extensions import Binary

warnings.filterwarnings("error")
warnings.simplefilter("ignore", ConvergenceWarning)

In [None]:
# Set API credentials
username = 
password = 

In [None]:
# Set Coordinate Reference Objects #TODO: maybe delete

wkt_7801 = """PROJCS["BGS2005 / CCS2005",
GEOGCS["BGS2005",
DATUM["Bulgaria_Geodetic_System_2005",
    SPHEROID["GRS 1980",6378137,298.257222101],
    TOWGS84[0,0,0,0,0,0,0]],
PRIMEM["Greenwich",0,
    AUTHORITY["EPSG","8901"]],
UNIT["degree",0.0174532925199433,
    AUTHORITY["EPSG","9122"]],
AUTHORITY["EPSG","7798"]],
PROJECTION["Lambert_Conformal_Conic_2SP"],
PARAMETER["latitude_of_origin",42.6678756833333],
PARAMETER["central_meridian",25.5],
PARAMETER["standard_parallel_1",42],
PARAMETER["standard_parallel_2",43.3333333333333],
PARAMETER["false_easting",500000],
PARAMETER["false_northing",4725824.3591],
UNIT["metre",1,
AUTHORITY["EPSG","9001"]],
AUTHORITY["EPSG","7801"]]"""
project_crs = CRS.from_wkt(wkt_7801)
os.environ["GTIFF_SRS_SOURCE"] = "EPSG"
os.environ["PROJ_LIB"] = pyproj.datadir.get_data_dir()

<center>Step 2: Set-up PostGreSQL database and tables<center>

In [16]:
# Set up sql database and tables
 
# Database parameters
dbname='platform_db'
host='localhost'
user='postgres'
password='postgres'
port='5432'
# SQL file to create tables
sql_file = 'setup_tables.sql'

# Function to set up the database
def setup_database(dbname):
    # Connect to server
    conn = psycopg2.connect(host=host, user=user, password=password)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) # Set isolation level needed for db creation
    cursor = conn.cursor()
    # Create database (check if it exists first)
    cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", (dbname,))
    exists = cursor.fetchone()
    if not exists:
        cursor.execute(f"CREATE DATABASE {dbname}")
        print(f'Database {dbname} created')
    else:
        print(f"Database {dbname} already exists")
    cursor.close()
    conn.close()

# Function to set up the tables in the db, runs the prepared sql script
def setup_tables(sql_file):
    # Connect to the new db
    conn = psycopg2.connect(dbname=dbname, host=host, user=user, password=password)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    # Load and execute SQL script
    with open(sql_file, 'r') as f:
        sql_script = f.read()
    try:
        cursor.execute(sql_script)
        print('Tables created')
    except psycopg2.Error as e:
        print(f"Error executing SQL script: {e}")
        # You might want to log the error or handle it more specifically
    finally:
        cursor.close()
        conn.close()

# Set up database
#setup_database(dbname)
# Set up tables
#setup_tables(sql_file)

<center>Step 3: Collect Historical Sensor Data</center>

In [7]:
# Collect air pollution station information
# Can merge this with below one after testing

station_url = 'https://citylab.gate-ai.eu/sofiasensors/api/stations/'
# Use requests to request the API
response = requests.get(station_url, auth=HTTPBasicAuth(username,password))
response.raise_for_status()
stations_data = response.json()
# Save the important information to a pandas dataframe
all_stations_df = pd.DataFrame(stations_data, columns=['id','name', 'longitude', 'latitude', 'operator'])
# Clean up the operator names
all_stations_df['operator'] = all_stations_df['operator'].replace('GATE Institute', 'GATE')
all_stations_df['operator'] = all_stations_df['operator'].replace('Executive environmental agency (ExEA)', 'ExEA')
all_stations_df['operator'] = all_stations_df['operator'].replace('Sofia municipality', 'AirThings')
# Convert Lat/Lon values into EPSG7801
transformer = Transformer.from_crs("EPSG:4326", "EPSG:7801", always_xy=True)
def reproject_coords(row):
    longitude, latitude = row['longitude'], row['latitude']
    x, y = transformer.transform(longitude, latitude)
    # Round the coordinates to decimeters
    x, y = round(x, ndigits=1), round(y, ndigits=1)
    return (x,y)
all_stations_df['location'] = all_stations_df.apply(reproject_coords, axis=1)
all_stations_df.drop(['latitude', 'longitude'], axis=1, inplace=True)

print(f'Station information collected. {len(all_stations_df)} total stations')

Station information collected. 39 total stations


In [12]:
# Read air pollution info (includes removing invalid stations)
all_stations_df = pd.read_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\station_data.csv')
invalid_stations = [36,38,32,9,27,7,19]
station_info = all_stations_df[~all_stations_df['id'].isin(invalid_stations)].copy()
station_info['location'] = station_info['location'].apply(ast.literal_eval)
#print(station_info)

In [None]:
# Collect historical hourly measurement data
# takes ~24 min

# Creates an empty dataframe of all of the hours and stations for the year: 
def create_empty_df(station_df, start_date, end_date):
    # Generate hourly timestamps with pandas.daterange
    timestamps = pd.date_range(start=start_date, end=end_date, freq='h')
    # Repeat each timestamp for the amount of stations
    timestamps_repeated = np.repeat(timestamps, len(station_df))
    # Tile the station names and ids over each timestamp
    stations_repeated = np.tile(station_df['name'], len(timestamps))
    ids_repeated = np.tile(station_df['id'], len(timestamps))
    # Construct the DataFrame with organized entries for each hour and station
    empty_df = pd.DataFrame({'time': timestamps_repeated, 'id': ids_repeated, 'name': stations_repeated})
    # Add parameter columns as NaN values to station_data
    for param in total_params:
        empty_df[param] = pd.NA
    # Set a multi-index for efficient updating
    empty_df.set_index(['time','id'], inplace=True)
    print(f'Dataframe for {start_date} to {end_date} created')
    return empty_df   

# Fills the empty dataframe with measurement values from one year
def station_data_collection(station_df, start_date, end_date):
    # Create an empty dataframe to fill in with station data
    # Hours without measurements will not be returned in API
    year_df = create_empty_df(station_df, start_date, end_date)
    # Iterate over each station
    for _,station in station_df.iterrows():
        station_id = station['id']
        station_name = station['name']
        station_operator = station['operator']
        # Skip first two years of collection for GATE sensors
        if (('2020' in start_date) or ('2021' in start_date)) and station_operator == 'GATE':
            continue
        # Set collected parameters based on station to avoid empty queries
        collected_params = [
            param for param in total_params
            if station_id not in missing_params.get(param, [])]
        # Iterate over each parameter to collect one in each request
        for param in collected_params:
            # Reformat space in param string to fit API parameters
            param_str = param.replace(' ','%20')
            # URL for the 'chart' endpoint which collects serial data the quickest
            yearly_url = rf'https://citylab.gate-ai.eu/sofiasensors/api/aggregated/chart/measurements/?station_name={station_name}&parameter_name={param_str}&start_date={start_date}%2019%3A00%3A00&end_date={end_date}%2019%3A00%3A00'
            # Try API request
            try:
                response = requests.get(yearly_url, auth=HTTPBasicAuth(username,password), timeout=30)
                data = response.json()
                response.raise_for_status() # Raises HTTP error that I can catch and interpret
            except requests.exceptions.RequestException as e:
                print(f"Request failed for station {station_name}: {e}")
                continue # If request fails, move on to next parameter
            # If the data is empty, move on to the next parameter
            if not data:
                print(f'No data for {station_name}, {param}')
                continue
            print(f'{len(data)} measured values for {station_name},{param}')
            # Create a df to format the response json with the station name and param
            st_param_df = pd.DataFrame([{'time': list(d.keys())[0], 
                                         'id': station_id,
                                         'name': station_name, 
                                         param : list(d.values())[0]}
                                         for d in data])
            st_param_df['time'] = pd.to_datetime(st_param_df['time'])
            # Set the multi-index for this df to align it with year_df
            st_param_df.set_index(['time','id'], inplace=True)
            # Update the empty df values with the new station and parameter
            year_df.update(st_param_df)
        print(f'{station_name} data collected...')
    # Reset station_data index to restore name and time columns
    year_df.reset_index(inplace=True)
    # Optimize column dtypes for storage
    year_df['name'] = year_df['name'].astype('category')
    year_df['id'] = year_df['id'].astype(pd.Int8Dtype())
    for param in total_params:
        if param in ['Temperature', 'Relative humidity']: # extra conditional due to super high erroneous values that dont fit into float 16
            year_df[param] = pd.to_numeric(year_df[param], errors='coerce').astype(np.float32)
        else:
            year_df[param] = pd.to_numeric(year_df[param], errors='coerce').astype(np.float16)
    return year_df 

# Define parameters to be collected
total_params = ['Ozone','Nitrogen dioxide', 'Sulphur dioxide', 'Particulate matter 10', 'Particulate matter 2.5', 'Temperature', 'Pressure', 'Relative humidity', 'Wind direction', 'Wind speed']
# Based on stations/measure/parameter endpoint of the API
missing_params = {
    'Ozone': [4],
    'Particulate matter 2.5': [1,3,4,5],
    'Pressure':[3,4],
    'Wind direction': [i for i in range(6,28)], # No airthings for wind
    'Wind speed': [i for i in range(6,28)]
}

all_years_dfs = []
# Collect from when all stations are operational
for year in range(2022,2025):
    start_date = f'{year}-09-01'
    # Set the end date a year after unless it's this year
    if year < 2024:
        end_date = f'{year+1}-09-01'
    else:
        end_date = '2025-06-01'
    # Run yearly collection function for this particular year
    # One year at a time due to memory and HTTP request size constraints
    year_df = station_data_collection(all_stations_df, start_date, end_date)
    all_years_dfs.append(year_df)

# Concatenate all year dfs into one total dataframe
final_df = pd.concat(all_years_dfs, ignore_index=True)
final_df.sort_values(by=['time', 'id'], inplace=True)
final_df.info(memory_usage='deep')

# Save to csv for now
final_df.to_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\cleaned_data.csv')

In [47]:
# Additional script to read csv in the same datatypes
# Shrinks memory space to 25% of just reading the data

file_path = r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\raw_data_s.csv'

target_numerical_dtypes = {
    'Ozone': np.float16,
    'Nitrogen dioxide': np.float16,
    'Sulphur dioxide': np.float16,
    'Particulate matter 10': np.float16,
    'Particulate matter 2.5': np.float16,
    'Temperature': np.float32,
    'Pressure': np.float16, 
    'Relative humidity': np.float32,
    'Wind direction': np.float16,
    'Wind speed': np.float16,
    'id': pd.Int8Dtype()
    }

raw_data = pd.read_csv(file_path, parse_dates=['time'])

for col, target_dtype in target_numerical_dtypes.items():
    raw_data[col] = pd.to_numeric(raw_data[col], errors='coerce').astype(target_dtype)
raw_data['name'] = raw_data['name'].astype('category')

In [53]:
# Clean sensor data

# Don't need for final version
total_params = ['Ozone','Nitrogen dioxide', 'Sulphur dioxide', 'Particulate matter 10', 'Particulate matter 2.5', 'Temperature', 'Pressure', 'Relative humidity', 'Wind direction', 'Wind speed']

# Function to set unrealistic values and outliers to nan values
def nan_erroneous(raw_data):
    cleaned_data = raw_data.copy()
    cleaned_data = cleaned_data.sort_values(by=['id', 'time'])
    # First remove impossible values and error values found by examining the data
    cleaned_data.loc[cleaned_data['Temperature'] > 50, 'Temperature'] = np.nan
    cleaned_data.loc[cleaned_data['Relative humidity'] > 105, 'Relative humidity'] = np.nan
    cols_to_check_positive = ['Ozone', 'Nitrogen dioxide', 'Sulphur dioxide', 
        'Particulate matter 10', 'Particulate matter 2.5', 
        'Pressure', 'Relative humidity']
    for col in cols_to_check_positive:
        cleaned_data.loc[cleaned_data[col] <= 0, col] = np.nan
    # Remove O3 data before February 2024 #TODO: Add visualizer to the notebook
    cutoff_date = pd.to_datetime('2024-02-01')
    cleaned_data.loc[cleaned_data['time'] < cutoff_date, 'Ozone'] = np.nan
    # Use rolling average 3 st.dev to find strong outliers
    cleaned_data = cleaned_data.set_index('time')
    for col in total_params:
        mean = cleaned_data.groupby('id')[col].rolling(window='7D', min_periods=1).mean()
        std = cleaned_data.groupby('id')[col].rolling(window='7D', min_periods=1).std()
        # Drop id index level to align to original index
        mean = mean.reset_index(level=0, drop=True)
        std = std.reset_index(level=0, drop=True)
        upper_bound = mean + (3 * std)
        lower_bound = mean - (3 * std)
        cleaned_data[col] = cleaned_data[col].where((cleaned_data[col] <= upper_bound) & (cleaned_data[col] >= lower_bound))
    cleaned_data = cleaned_data.sort_values(by=['time', 'id'])
    cleaned_data = cleaned_data.reset_index()
    return cleaned_data

cleaned_data = nan_erroneous(raw_data)

In [56]:
# Remove invalid stations
# Plot overall data completeness of stations per month

# Can remove
missing_params = {
    'Ozone': [4],
    'Particulate matter 2.5': [1,3,4,5],
    'Pressure':[3,4],
    'Wind direction': [i for i in range(6,28)], # No airthings for wind
    'Wind speed': [i for i in range(6,28)]
}

def plot_completeness(cleaned_data):
    df_processed = cleaned_data.copy()
    unique_stations = all_stations_df[['name', 'id']]

    # Collect overall completeness of each station
    station_overall_completeness = {}
    for _, station_row in unique_stations.iterrows():
        station_name = station_row['name']
        station_id = station_row['id']
        # Determine the parameters this station collects
        params_collected = [p for p in total_params 
                            if p not in missing_params or station_id not in missing_params.get(p, [])]
        # Filter the df for only the current station's data
        station_data = df_processed[df_processed['name'] == station_name]
        total_expected = len(station_data) * len(params_collected)
        # Calculate the total number of non-null measurements
        total_actual = station_data[params_collected].notnull().sum().sum()
        completeness_pct = (total_actual / total_expected) * 100
        station_overall_completeness[station_name] = completeness_pct
    # Convert to a Series and sort it to get the ranked order
    ranked_stations = pd.Series(station_overall_completeness).sort_values()

    # Calculate the monthly percentages of non-null values
    station_expected_counts = {}
    total_param_count = len(total_params)
    for _, station_row in unique_stations.iterrows():
        num_missing = sum(1 for param, missing_ids in missing_params.items() 
                          if param in total_params and station_row['id'] in missing_ids)
        station_expected_counts[station_row['name']] = total_param_count - num_missing
    df_processed['expected_param_count'] = df_processed['name'].map(station_expected_counts)
    df_processed['expected_param_count'] = df_processed['expected_param_count'].fillna(total_param_count)
    non_null_actuals = df_processed[total_params].notnull().sum(axis=1)
    df_processed['non_null_percentage'] = 100 * (non_null_actuals / df_processed['expected_param_count']).replace([np.inf, -np.inf], 0)
    df_processed['month_year'] = df_processed['time'].dt.to_period('M')
    monthly_station_completeness = df_processed.groupby(['month_year', 'name'], observed=True)['non_null_percentage'].mean().reset_index()
    pivot_df = monthly_station_completeness.pivot(index='month_year', columns='name', values='non_null_percentage')
    pivot_df.index = pivot_df.index.astype(str)
    monthly_median = pivot_df.median(axis=1)
    monthly_mean = pivot_df.mean(axis=1)

    # Plot figure
    fig = go.Figure()
    # Add mean and median lines
    fig.add_trace(go.Scatter(
        x=monthly_mean.index, y=monthly_mean.values, name='Mean (All Stations)',
        mode='lines', line=dict(color='firebrick', width=3, dash='dash'), hoverinfo='x+y'
    ))
    fig.add_trace(go.Scatter(
        x=monthly_median.index, y=monthly_median.values, name='Median (All Stations)',
        mode='lines', line=dict(color='black', width=4), hoverinfo='x+y'
    ))
    # Add individual station lines IN RANKED ORDER
    for station_name, overall_pct in ranked_stations.items():
        if station_name in pivot_df.columns:
            fig.add_trace(go.Scatter(
                x=pivot_df.index, y=pivot_df[station_name],
                name=f"{station_name} ({overall_pct:.1f}%)", # New ranked & labeled name
                mode='lines', line=dict(width=1.5),
                visible='legendonly', # Hidden by default
                hoverinfo='x+y+name'
            ))
    # Customize layout
    fig.update_layout(
        title_text='Interactive Monthly Data Completeness',
        xaxis_title='Month-Year',
        yaxis_title='Non-Null Percentage (%)',
        yaxis_range=[-5, 105],
        legend_title_text='Stations (Ranked by Overall Completeness)',
        hovermode='x unified',
        template='plotly_white',
        xaxis=dict(
            dtick="M1",  
            tickformat="%b\n%Y",  
            tickangle=0  
        )
    )
    fig.show()

plot_completeness(cleaned_data)

# Remove identified stations from cleaned_data
invalid_stations = [36,38,32,9,27,7,19]
mask = ~cleaned_data['id'].isin(invalid_stations)
processed_data = cleaned_data[mask].copy()

# Save to csv
#processed_data.to_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\processed_data.csv')

In [57]:
# Prepare and upload processed data to sensor_data db

# Rename measurements for ease and matching db schema
processed_data.rename({
    'time':'measured_time',
    'id':'station_id',
    'Ozone':'O3',
    'Nitrogen dioxide':'NO2',
    'Sulphur dioxide':'SO2',
    'Particulate matter 10':'PM10',
    'Particulate matter 2.5':'PM25',
    'Temperature':'T',
    'Pressure':'P',
    'Relative humidity':'RH',
    'Wind direction':'WD',
    'Wind speed':'WS'
}, axis=1, inplace=True)
total_params = ['O3','NO2','SO2','PM10','PM25','T','P','RH','WD','WS']
# Remove station name column as its uncessary
f_processed_data = processed_data.drop('name', axis='columns')
# Add timezone to measured_time to match schema
f_processed_data['measured_time'] = f_processed_data['measured_time'].dt.tz_localize('UTC')

# Transform dataframe to 'long' format turning measurement columns to rows
df_long = f_processed_data.melt(
    id_vars=['measured_time', 'station_id'],
    value_vars=total_params,
    var_name='measurement',
    value_name='reading_value'
)
# Change measurement to categorical to save some memory
df_long['measurement'] = pd.Categorical(df_long['measurement'], categories=total_params, ordered=False)

#TODO: could change wind direction to int16 for more saving

# Remove all nan values
df_long.dropna(subset=['reading_value'], inplace=True)
df_long.reset_index(drop=True, inplace=True)
# Reorder columns for schema
df_final = df_long[['measured_time','station_id','measurement','reading_value']]
print('Final Dataframe Created')
print(df_final.info(memory_usage='deep'))
df_final.to_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\long_sensor_measurements.csv', index=False)

def upload_df_to_db(df_final):
    # Copy dataframe to database
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )
        cursor = conn.cursor()
        print('Connected to Database')
        print('Copying df to database expect ~1 min of processsing')

        # Create in-memory buffer
        buffer = io.StringIO()
        # Write df to the buffer as csv
        df_final.to_csv(buffer, index=False, header=False)
        buffer.seek(0)
        sql_copy_command = """
            COPY sensor_data(measured_time, station_id, measurement, reading_value)
            FROM STDIN
            WITH (FORMAT CSV)
        """
        # Execute command
        cursor.copy_expert(sql=sql_copy_command, file=buffer)
        conn.commit()
        print(f"{cursor.rowcount} rows inserted successfully.")
    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error: {error}")
        if conn:
            conn.rollback()
    finally:
        if conn:
            cursor.close()
            conn.close()
            print("Database connection closed.")

# Run function to upload to database
#upload_df_to_db(df_final=df_final)

Final Dataframe Created
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5115761 entries, 0 to 5115760
Data columns (total 4 columns):
 #   Column         Dtype              
---  ------         -----              
 0   measured_time  datetime64[ns, UTC]
 1   station_id     Int8               
 2   measurement    category           
 3   reading_value  float32            
dtypes: Int8(1), category(1), datetime64[ns, UTC](1), float32(1)
memory usage: 73.2 MB
None


<center>Step 4: Calculate Static Independant Variable Values<center>

In [None]:
# Evaluate wind direction consistency

# Helper function to convert to wind sector
def get_wind_sector(direction_deg):
    return int(direction_deg // 45)

def calculate_wind_direction_alignment(df):
    # Ensure time is in datetime
    df['time'] = pd.to_datetime(df['time'])
    # Drop rows without wind direction
    df_cleaned = df.dropna(subset=['Wind direction']).copy()
    # Convert wind directions to radians for analysis
    df_cleaned['WD_rad'] = np.deg2rad(df_cleaned['Wind direction'])

    # Helper function to calculate statistics for each group
    def calculate_group_stats(group):
        directions_rad = group['WD_rad']
        directions_deg_og = group['Wind direction']
        n = len(directions_rad)
        
        # Convert angles to unit circle coordinates for scipy
        x_coords = np.cos(directions_rad)
        y_coords = np.sin(directions_rad)
        vectors = np.column_stack((x_coords,y_coords))
        # Calculate mean resultant vector and length (R-bar)
        stats_result = directional_stats(vectors, axis=0, normalize=False)
        mean_dir_vec = stats_result.mean_direction
        r_bar = stats_result.mean_resultant_length
        # Convert mean vector back to angle
        mean_dir_rad = np.arctan2(mean_dir_vec[1], mean_dir_vec[0])
        mean_dir_deg = np.rad2deg(mean_dir_rad)
        # Ensure the mean direction is within the standard [0, 360) degree range
        mean_dir_deg = mean_dir_deg % 360
        if mean_dir_deg < 0:
            mean_dir_deg += 360

        # Calculate p-value for Rayleigh test for uniformity
        rayleigh_p_value = np.exp(-n * r_bar**2)

        # Calculate strict alignment percentage
        sectors = directions_deg_og.apply(get_wind_sector)
        strict_alignment_count = sectors.value_counts().max()
        strict_align_per = strict_alignment_count / n

        # Calculate loose alignment percentage (3 sectors)
        sector_counts = sectors.value_counts().reindex(range(8), fill_value=0).sort_index()
        # Wrapped array
        wrapped_counts = np.concatenate((sector_counts.values, sector_counts.values[:2]))
        # Rolling sums
        rolling_sums = [wrapped_counts[i:i+3].sum() for i in range(8)]
        max_loose_count = np.max(rolling_sums)
        loose_align_per = max_loose_count / n

        return pd.Series({
            'num_measurements': n,
            'r_bar': r_bar,
            'rayleigh_p_value': rayleigh_p_value,
            'mean_direction': mean_dir_deg,
            'strict_align_per': strict_align_per,
            'loose_align_per': loose_align_per
        })
    
    # Group hourly dataset by hour and apply helper function
    hourly_stats_df = df_cleaned.groupby(df_cleaned['time'].dt.floor('h')).apply(calculate_group_stats)
    # Reset index to return time as a column
    hourly_stats_df = hourly_stats_df.reset_index()

    return hourly_stats_df

hourly_stats_df = calculate_wind_direction_alignment(processed_data)
hourly_stats_df.to_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\wind_alignment.csv', index=False)

# Print overall statistics
avg_r_bar = hourly_stats_df['r_bar'].mean()
avg_p_value = hourly_stats_df['rayleigh_p_value'].mean()
avg_strict = hourly_stats_df['strict_align_per'].mean()
avg_loose = hourly_stats_df['loose_align_per'].mean()

print(f"Average R-bar: {avg_r_bar:.4f}")
print(f"Average Rayleigh p-value: {avg_p_value:.4f}")
print(f"Average Strict Alignment Percentage: {avg_strict:.2%}")
print(f"Average Loose Alignment Percentage: {avg_loose:.2%}")

Average R-bar: 0.4579
Average Rayleigh p-value: 0.1513
Average Strict Alignment Percentage: 33.60%
Average Loose Alignment Percentage: 68.64%


In [None]:
# Calculate all variable values

# Function to create all masks needed for processing
def create_masks(mask_config):
    masks = {}
    directions = {'E': 0, 'NE': 45, 'N': 90, 'NW': 135,'W': 180, 'SW': 225, 'S': 270, 'SE': 315}
    sector_half_angle_rad = np.deg2rad(22.5)

    for res, shapes in mask_config.items():
        masks[res] = {'circle':{}, 'sector':{}}
        # Create circular masks
        circle_radii = shapes.get('circle', [])
        for buffer_size in circle_radii:
            radius_px = int(np.round(buffer_size / res))
            size = 2 * radius_px + 1
            center = radius_px
            y, x = np.ogrid[:size, :size]
            # Distance from the center of the mask to the center of each pixel
            distance_squared = (x - center)**2 + (y - center)**2
            masks[res]['circle'][buffer_size] = distance_squared <= radius_px**2
        # Create 8 Sector Masks
        sector_radii = shapes.get('sector', [])
        for buffer_size in sector_radii:
            masks[res]['sector'][buffer_size] = {}
            radius_px = int(np.round(buffer_size / res))
            size = 2 * radius_px + 1
            center = radius_px
            y, x = np.ogrid[:size, :size]
            distance_squared = (x - center)**2 + (y - center)**2
            radius_check = distance_squared <= radius_px**2
            # Angle of pixels center relative to mask's center
            pixel_angles_rad = np.arctan2(center - y, x - center)
            for dir_name, dir_angle_deg in directions.items():
                dir_angle_rad = np.deg2rad(dir_angle_deg)
                relative_angles = np.arctan2(np.sin(pixel_angles_rad - dir_angle_rad), 
                                             np.cos(pixel_angles_rad - dir_angle_rad))
                angle_check = np.abs(relative_angles) <= sector_half_angle_rad
                # Final mask is where both radius and angle conditions are met
                final_mask = radius_check & angle_check
                masks[res]['sector'][buffer_size][dir_name] = final_mask
    return masks

# Function to get the index of a raster for a point
def get_cell_indices(point, raster_info): 
    xmin = raster_info['xmin']
    ymax = raster_info['ymax']
    resolution = raster_info['resolution']
    pointx, pointy = point.x, point.y
    col = ((pointx - xmin) / resolution)
    row = ((ymax - pointy) / resolution)
    return round(row), round(col)

# Function which collects cells from the mask
def get_mask_cells(point_index, array, mask):
    rows, cols = array.shape
    point_r, point_c = point_index
    mask_size = mask.shape[0]
    radius = mask_size // 2
    # Calculate bounds for the slice in the main array
    row_start = max(0, point_r - radius)
    row_end = min(rows, point_r + radius + 1)
    col_start = max(0, point_c - radius)
    col_end = min(cols, point_c + radius + 1)
    # Bounds for the mask slice to align with array slice
    mask_row_start = max(0, radius - point_r)
    mask_row_end = mask_size - max(0, (point_r + radius + 1) - rows)
    mask_col_start = max(0, radius - point_c)
    mask_col_end = mask_size - max(0, (point_c + radius + 1) - cols)
    # Extract the slices
    array_slice = array[row_start:row_end, col_start:col_end]
    mask_slice = mask[mask_row_start:mask_row_end, mask_col_start:mask_col_end]
    return array_slice[mask_slice]

# Function to calculate the value of a buffer
def calculate_buffer_value(point, var, raster_info, mask):
    # Convert point to cell coordinate in raster
    point_index = get_cell_indices(point, raster_info)
    # Extract the cells in 1d array 
    buffer_cells = get_mask_cells(point_index, raster_info['array'], mask)
    # Filter nodata values
    nodata_val = raster_info['nodata']
    valid_cells = buffer_cells[buffer_cells != nodata_val]
    
    # Calculation functions depending on variable
    def calculate_land_use(land_use_code):
        return np.isin(valid_cells, land_use_code).sum()
    def calculate_build_std(): # Standard deviation of building heights
        if len(valid_cells) >= 2:
            return np.std(valid_cells)
        else:
            return 0.0
    
    # Dictionary mapping variables to their calculation functions
    calculation_map = {
        'lu_hdr': lambda: calculate_land_use(1),
        'lu_ldr': lambda: calculate_land_use(2),
        'lu_ind': lambda: calculate_land_use(3),
        'lu_ug': lambda: calculate_land_use(4),
        'lu_art': lambda: calculate_land_use(5),
        'lu_for': lambda: calculate_land_use(6),
        'lu_rur': lambda: calculate_land_use(7),
        'build_fp': lambda: len(valid_cells),
        'build_vol': lambda: np.sum(valid_cells),
        'build_var': calculate_build_std,
        'ndvi': lambda: float(np.mean(valid_cells)),
        'pop': lambda: float(np.mean(valid_cells)),
        'rd_mt': lambda: np.count_nonzero(valid_cells),
        'rd_prim': lambda: np.count_nonzero(valid_cells),
        'rd_sec': lambda: np.count_nonzero(valid_cells),
        'rd_ter': lambda: np.count_nonzero(valid_cells),
        'rd_res': lambda: np.count_nonzero(valid_cells)
    }

    # Return the value from the function based on the variable
    return calculation_map[var]()

# Function to collect value of the cell point
def cell_value(point, raster_info):
    # Convert point to cell coordinate in raster
    i,j = get_cell_indices(point, raster_info)
    # Extract cell value
    return raster_info['array'][i,j]

# Function to do distance calculations (for major road and busstop)
def distance_calculations(point, tree, geoms):
    # Find nearest linestring in the tree
    nearest_index = tree.nearest(point)
    nearest_geom = geoms[nearest_index]
    return point.distance(nearest_geom)

# Function to parallel compute the function with input points #TODO: processpool executor is probably better
def parallel_compute(func, input_points):
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        results = list(executor.map(func, input_points))
    return results

# Main calculation function
def calculate_points(var_info, points, success_list, skip_config=None):
    # Initialize list to store variable calculations
    var_values = []
    # Define variables from input calculation
    var_name = var_info['name']
    data_file = var_info['file']
    calc_type = var_info['calculation']
    # Check if the whole variable should be skipped
    if skip_config and var_name in skip_config and skip_config[var_name].get('all'):
        print(f'{var_name} skipped')
        return var_values
    # Process most raster data
    if calc_type == 'buffer':
        with rasterio.open(data_file) as src:
            # Save important info of the raster to dictionary
            raster_info = {'name': var_name,
                           'array':src.read(1),
                           'resolution':src.res[0],
                           'xmin':src.bounds.left,
                           'ymax':src.bounds.top,
                           'nodata':src.nodata}
            # Get masks for the raster resolution
            res_masks = masks.get(raster_info['resolution'])
            # Iterate over circle masks
            for radius, mask in res_masks.get('circle', {}).items():
                # Check skip_config before calculating
                if skip_config and var_name in skip_config and 'circle' in skip_config.get(var_name, {}) and radius in skip_config[var_name]['circle']:
                    print(f'{var_name} circle {radius}m skipped')
                    continue
                calc_name = f'{var_name}_{radius}m_cir'
                # Define the partial function pre-filled with arguments
                func = partial(calculate_buffer_value, var=var_name, raster_info=raster_info, mask=mask)
                try: # Catch errors if they happen and add to a success list to not calculate again
                    start_time = time.time()
                    # Calculate and save the values of all of the points calculated for the buffer value
                    var_values.append(pd.Series(parallel_compute(func,points), name=calc_name))
                    end_time = time.time()
                    duration = end_time - start_time
                    print(f'{calc_name} calculated in {duration:.2f} seconds')
                    success_list.append(calc_name)
                except Exception as e:
                    print(f'Calculation error: {e}')
            # Iterate over sector masks
            for radius, dir_masks in res_masks.get('sector', {}).items():
                # Check skip_config before calculating
                if skip_config and var_name in skip_config and 'sector' in skip_config.get(var_name, {}) and radius in skip_config[var_name]['sector']:
                    print(f'{var_name} sector {radius}m skipped')
                    continue
                for direction, mask in dir_masks.items():
                    calc_name = f'{var_name}_{radius}m_{direction}'
                    func = partial(calculate_buffer_value, var=var_name, raster_info=raster_info, mask=mask)
                    try:
                        start_time = time.time()
                        # Save the values
                        var_values.append(pd.Series(parallel_compute(func,points), name=calc_name))
                        end_time = time.time()
                        duration = end_time - start_time
                        print(f'{calc_name} calculated in {duration:.2f} seconds')
                        success_list.append(calc_name)
                    except Exception as e:
                        print(f'Calculation error: {e}')
    # Process elevation and other value
    elif calc_type == 'value':
        with rasterio.open(data_file) as src:
            # Save important info of the raster to dictionary
            raster_info = {'name': var_name,
                           'array':src.read(1),
                           'resolution':src.res[0],
                           'xmin':src.bounds.left,
                           'ymax':src.bounds.top,
                           'nodata':src.nodata}
            func = partial(cell_value, raster_info=raster_info)
            start_time = time.time()
            # Calculate and save values    
            var_values.append(pd.Series(parallel_compute(func,points), name=var_name))
            end_time = time.time()
            duration = end_time - start_time
            print(f'{var_name} calculated in {duration:.2f} seconds')
    # Process distance variables
    else:
        vector = gpd.read_file(data_file)
        # Create STRtree spatial index from the gdf
        geoms = vector.geometry.values
        tree = STRtree(geoms)
        # Set the name and parallel compute the distance calculations
        func = partial(distance_calculations, tree=tree, geoms=geoms)
        start_time = time.time()
        # Calculate and save values    
        var_values.append(pd.Series(parallel_compute(func,points), name=var_name))
        end_time = time.time()
        duration = end_time - start_time
        print(f'{var_name} calculated in {duration:.2f} seconds')
    return var_values

buffer_sizes = [25,100,500,1500,4000]
indep_vars = [
    {'name':'lu_hdr','file':f_landuse,'calculation':'buffer'},
    {'name':'lu_ldr','file':f_landuse,'calculation':'buffer'},
    {'name':'lu_ind','file':f_landuse,'calculation':'buffer'},
    {'name':'lu_ug','file':f_landuse,'calculation':'buffer'},
    {'name':'lu_art','file':f_landuse,'calculation':'buffer'},
    {'name':'lu_for','file':f_landuse,'calculation':'buffer'},
    {'name':'lu_rur','file':f_landuse,'calculation':'buffer'},
    {'name':'build_fp','file':f_buildheight,'calculation':'buffer'},
    {'name':'build_vol','file':f_buildheight,'calculation':'buffer'},
    {'name':'build_var','file':f_buildheight,'calculation':'buffer'},
    {'name':'ndvi','file':f_ndvi,'calculation':'buffer'},
    {'name':'pop','file':f_pop,'calculation':'buffer'},
    {'name':'rd_mt','file':f_motor_trunk,'calculation':'buffer'},
    {'name':'rd_prim','file':f_primary,'calculation':'buffer'},
    {'name':'rd_res','file':f_residential,'calculation':'buffer'},
    {'name':'rd_sec','file':f_secondary,'calculation':'buffer'},
    {'name':'rd_ter','file':f_tertiary,'calculation':'buffer'},
    {'name':'elevation','file':f_elevation,'calculation':'value'},
    {'name':'rd_dis','file':f_mtp,'calculation':'vector_dis'},
    {'name':'bus_dis','file':f_busstop,'calculation':'vector_dis'}
]

# First pre-calculate the masks to be used for buffer sector extraction
mask_config = {10: {'circle':[25,100,500,1500,4000], 
                    'sector':[100,500,1500,4000]},
            100: {'circle':[25,100,500,1500,4000],
                'sector':[500,1500,4000] }}
masks = create_masks(mask_config)
print('Buffer masks created')

def calculate_ivs():
    # Calculate independant variable station values
    station_points = [Point(loc) for loc in station_info['location']]
    station_ids = [id for id in station_info['id']]
    # Initialize df to store all iv values
    iv_values = [pd.Series(station_ids, name='id')]
    for indep_var in indep_vars:
        print(f'Calculating {indep_var["name"]}...')
        # Calculate list of dicts of all calculations and corresponding values
        list_of_calc_series = calculate_points(indep_var, station_points)
        # Iterate over calculations and add to df
        iv_values.extend(list_of_calc_series)
    iv_values_df = pd.concat(iv_values, axis=1)
    return iv_values_df

#iv_values_df = calculate_ivs()
#iv_values_df.to_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\iv_values.csv', index=False)

Buffer masks created


In [14]:
# Process variable values (clean + normalize)
iv_values_df = pd.read_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\iv_values_raw.csv')

# Filter out variables which are the mostly the same across all stations
def clean_consistent_variables(df, threshold):
    cols_to_remove = set()
    directional_suffixes = ['_E', '_NE', '_N', '_NW', '_W', '_SW', '_S', '_SE']
    
    directional_groups = {}
    non_directional_cols = []

    for col in df.columns:
        is_directional = False
        # Treat columns with '_cir' as non-directional.
        if '_cir' in col:
            non_directional_cols.append(col)
            continue

        for suffix in directional_suffixes:
            if col.endswith(suffix):
                base_name = col[:-len(suffix)]
                if base_name not in directional_groups:
                    directional_groups[base_name] = []
                directional_groups[base_name].append(col)
                is_directional = True
                break
        
        if not is_directional:
            non_directional_cols.append(col)

    # A group is removed only if ALL its columns meet the 80% criteria.
    for base_name, group_cols in directional_groups.items():
        all_in_group_meet_criteria = True
        for col in group_cols:
            if df[col].value_counts(normalize=True).max() < threshold:
                all_in_group_meet_criteria = False
                break
        # If all columns met the criteria, add the entire group to the removal list.
        if all_in_group_meet_criteria:
            cols_to_remove.update(group_cols)

    # Check circle buffers
    for col in non_directional_cols:
        if df[col].value_counts(normalize=True).max() >= threshold:
            cols_to_remove.add(col)

    # Remove the identified columns
    df_cleaned = df.drop(columns=list(cols_to_remove))
    return df_cleaned, sorted(list(cols_to_remove))

df_cleaned, removed_vars = clean_consistent_variables(iv_values_df, 0.8)
print(f'Removed {(len(iv_values_df.columns) - len(df_cleaned.columns))}/{len(iv_values_df.columns)} columns: {len(df_cleaned.columns)} remaining')

# Standardize variables and save information
def standardize_variables(df):
    df_transformed = df.copy()
    variable_info_dict = {}
    for col in df.columns:
        # Skip id
        if col == 'id':
            continue
        # Parse column name
        parts = col.split('_')
        radius = None
        buffer_type = None
        base_name = None
        # Check if the name likely follows the convention
        if len(parts) >= 3 and parts[-2].endswith('m'):
            try:
                radius = int(parts[-2][:-1])
                buffer_type = parts[-1]
                base_name = '_'.join(parts[:-2])
            except ValueError:
                base_name = col 
                pass
        else:
            base_name = col

        log_transformed_col = np.log1p(df_transformed[col])
        mean_val = log_transformed_col.mean()
        std_val = log_transformed_col.std()
        # Standardize log-transform
        if std_val > 0:
            df_transformed[col] = (log_transformed_col - mean_val) / std_val
        else:
            # If std is 0, the standardized value is 0
            df_transformed[col] = 0

        variable_info_dict[col] = {
        'base_var_name': base_name,
        'radius': radius,
        'type': buffer_type,
        'mean': mean_val,
        'st_dev': std_val
        }
    return df_transformed, variable_info_dict

st_ivs, iv_info_dict = standardize_variables(df_cleaned)
#st_ivs.to_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\iv_values_process.csv', index=False)

# Store info of removed variables into dict
def create_skip_config(removed_vars):
    skip_config = {}
    directional_suffixes = ['E', 'NE', 'N', 'NW', 'W', 'SW', 'S', 'SE']

    for var in removed_vars:
        parts = var.split('_')
        base_name = var
        buffer_type = None
        radius = None

        # Check for directional or circular buffer patterns
        if len(parts) > 2 and parts[-2].endswith('m'):
            try:
                radius = int(parts[-2][:-1])
                if parts[-1] in directional_suffixes:
                    buffer_type = 'sector'
                    base_name = '_'.join(parts[:-2])
                elif parts[-1] == 'cir':
                    buffer_type = 'circle'
                    base_name = '_'.join(parts[:-2])
            except ValueError:
                # Not a valid buffer format, treat as a single variable name
                pass

        if base_name not in skip_config:
            skip_config[base_name] = {}

        if buffer_type and radius is not None:
            if buffer_type not in skip_config[base_name]:
                skip_config[base_name][buffer_type] = []
            if radius not in skip_config[base_name][buffer_type]:
                skip_config[base_name][buffer_type].append(radius)
        else:
            # If no buffer type, it's a non-buffered var or a group to be skipped
            skip_config[base_name]['all'] = True      
    return skip_config

skip_config = create_skip_config(removed_vars)

Removed 149/625 columns: 476 remaining


In [25]:
# Create precalculated independant variable maps

# Create the input points array of the map within the boundary
def create_input_points():
    # Read a precalculated raster to serve as format
    with rasterio.open(f_elevation_precalc) as src:
        # Read relevant data for raster
        raster_data = src.read(1)
        nodata_val = src.nodata
        transform = src.transform
        shape = src.shape
        crs = src.crs
        # Get indices of non-NaN values
        non_nan_indices = np.where(raster_data != nodata_val)
        rows, cols = non_nan_indices
        # Get coordinates using vectorized operation
        xs, ys = rasterio.transform.xy(transform, rows, cols)
        # Save raster metadata for writing
        raster_metadata = {
            "shape": shape,
            "nodata_val": nodata_val,
            "transform": transform,
            "crs": crs,
            "dtype": raster_data.dtype
        }
        # Convert to numpy arrays
        x_coords = np.array(xs)
        y_coords = np.array(ys)
        # Create array of coordinate tuples
        points_array = np.array([Point(x, y) for x, y in zip(x_coords, y_coords)])
    return points_array, non_nan_indices, raster_metadata 

def store_array(array, variable_info, raster_metadata):
    # Connect to database
    conn = None
    try:
        # Create an in-memory GeoTIFF file from the NumPy array
        with MemoryFile() as memfile:
            with memfile.open(
                driver='GTiff',
                height=array.shape[0],
                width=array.shape[1],
                count=1, 
                dtype=str(array.dtype),
                crs=raster_metadata['crs'],
                transform=raster_metadata['transform'],
                nodata=raster_metadata['nodata_val']
            ) as dataset:
                dataset.write(array, 1) 
            # Read the raw bytes from the in-memory file
            raster_bytes = memfile.read()

        # Connect to the database
        conn = psycopg2.connect(host=host, user=user, password=password, dbname=dbname)
        cur = conn.cursor()

        sql_insert = """
                    INSERT INTO precalc_ivs (indep_var, buffer_type, radius, raster_data)
                    VALUES (%s, %s, %s, ST_SetSRID(ST_FromGDALRaster(%s), %s));
                """
        data_to_insert = (
            variable_info['base_var_name'],
            variable_info['type'],
            variable_info['radius'],
            Binary(raster_bytes),
            7801  # Bulgarian SRID
        )
        
        cur.execute(sql_insert, data_to_insert)
        conn.commit()
    except (Exception, psycopg2.Error) as e:
        print(f"Database error: {e}")
        if conn:
            conn.rollback()
    finally:
        if conn:
            cur.close()
            conn.close()

# Calculate points and transform for pre-calculated maps
map_points, non_nan_indices, raster_metadata = create_input_points()
print(f'Mapping {len(map_points)} points')
# Maps for elevation and distance variables are pre-calculated
map_vars = indep_vars[:-3]
# Initialize list to store completed calculations
success_list = []
# Could remove variables (w/ buffer combinations) from calculation set to do less, but not totally necessary (no time right now)

# Iterate over each variable
for map_var in tqdm(map_vars):
    print(f'Computing maps for {map_var["name"]}...')
    # Run calculate points for the whole variable
    list_of_var_maps = calculate_points(map_var, map_points, success_list, skip_config=skip_config)
    # Iterate over the list saved in memory and save to postgres
    print(f'Maps Computed. Uploading to Postgres...')
    for var_map in list_of_var_maps:
        calc_name = var_map.name
        # Convert series to numpy array
        var_array = var_map.to_numpy()
        # Log transform and standardize maps using station values
        variable_info = iv_info_dict[calc_name]
        var_mean = variable_info['mean']
        var_std = variable_info['st_dev']
        log_t_array = np.log1p(var_array)
        # Apply the standardization using the stored mean and st_dev
        if var_std > 0:
            standardized_array = (log_t_array - var_mean) / var_std
        else:
            # Handle case with no variance
            standardized_array = log_t_array - var_mean
        # Reconstruct data to array shape
        final_raster = np.full(raster_metadata['shape'], raster_metadata['nodata_val'], dtype=np.float32)
        final_raster[non_nan_indices] = standardized_array
        # Save final raster in postgres
        store_array(final_raster, variable_info, raster_metadata)
    print('-' * 20)

# Save success list after all variables are calculated
with open(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\success_list.txt', "w") as file:
    for item in success_list:
        file.write(str(item) + "\n")

Mapping 85591 points


  0%|          | 0/17 [00:00<?, ?it/s]

Computing maps for lu_hdr...
lu_hdr circle 25m skipped
lu_hdr circle 100m skipped
lu_hdr circle 500m skipped
lu_hdr_1500m_cir calculated in 26.41 seconds
lu_hdr_4000m_cir calculated in 56.49 seconds
lu_hdr sector 100m skipped
lu_hdr sector 500m skipped
lu_hdr_1500m_E calculated in 21.57 seconds
lu_hdr_1500m_NE calculated in 21.47 seconds
lu_hdr_1500m_N calculated in 21.27 seconds
lu_hdr_1500m_NW calculated in 22.02 seconds
lu_hdr_1500m_W calculated in 21.69 seconds
lu_hdr_1500m_SW calculated in 21.53 seconds
lu_hdr_1500m_S calculated in 21.48 seconds
lu_hdr_1500m_SE calculated in 21.90 seconds
lu_hdr_4000m_E calculated in 27.22 seconds
lu_hdr_4000m_NE calculated in 27.32 seconds
lu_hdr_4000m_N calculated in 28.27 seconds
lu_hdr_4000m_NW calculated in 27.85 seconds
lu_hdr_4000m_W calculated in 27.75 seconds
lu_hdr_4000m_SW calculated in 27.14 seconds
lu_hdr_4000m_S calculated in 27.49 seconds
lu_hdr_4000m_SE calculated in 27.46 seconds
Maps Computed. Uploading to Postgres...


  6%|▌         | 1/17 [07:59<2:07:46, 479.13s/it]

--------------------
Computing maps for lu_ldr...
lu_ldr_25m_cir calculated in 13.43 seconds
lu_ldr_100m_cir calculated in 13.98 seconds
lu_ldr_500m_cir calculated in 23.92 seconds
lu_ldr_1500m_cir calculated in 32.14 seconds
lu_ldr_4000m_cir calculated in 75.27 seconds
lu_ldr_100m_E calculated in 13.57 seconds
lu_ldr_100m_NE calculated in 13.47 seconds
lu_ldr_100m_N calculated in 14.08 seconds
lu_ldr_100m_NW calculated in 13.45 seconds
lu_ldr_100m_W calculated in 13.95 seconds
lu_ldr_100m_SW calculated in 14.20 seconds
lu_ldr_100m_S calculated in 14.45 seconds
lu_ldr_100m_SE calculated in 13.91 seconds
lu_ldr_500m_E calculated in 19.93 seconds
lu_ldr_500m_NE calculated in 20.25 seconds
lu_ldr_500m_N calculated in 20.18 seconds
lu_ldr_500m_NW calculated in 20.42 seconds
lu_ldr_500m_W calculated in 20.74 seconds
lu_ldr_500m_SW calculated in 20.27 seconds
lu_ldr_500m_S calculated in 20.83 seconds
lu_ldr_500m_SE calculated in 20.69 seconds
lu_ldr_1500m_E calculated in 24.00 seconds
lu_ldr

 12%|█▏        | 2/17 [22:50<3:00:28, 721.91s/it]

--------------------
Computing maps for lu_ind...
lu_ind circle 25m skipped
lu_ind_100m_cir calculated in 14.13 seconds
lu_ind_500m_cir calculated in 22.37 seconds
lu_ind_1500m_cir calculated in 31.52 seconds
lu_ind_4000m_cir calculated in 66.99 seconds
lu_ind sector 100m skipped
lu_ind_500m_E calculated in 19.85 seconds
lu_ind_500m_NE calculated in 19.64 seconds
lu_ind_500m_N calculated in 19.83 seconds
lu_ind_500m_NW calculated in 19.87 seconds
lu_ind_500m_W calculated in 19.76 seconds
lu_ind_500m_SW calculated in 19.69 seconds
lu_ind_500m_S calculated in 19.80 seconds
lu_ind_500m_SE calculated in 20.53 seconds
lu_ind_1500m_E calculated in 23.01 seconds
lu_ind_1500m_NE calculated in 23.09 seconds
lu_ind_1500m_N calculated in 23.44 seconds
lu_ind_1500m_NW calculated in 23.33 seconds
lu_ind_1500m_W calculated in 23.03 seconds
lu_ind_1500m_SW calculated in 23.05 seconds
lu_ind_1500m_S calculated in 23.07 seconds
lu_ind_1500m_SE calculated in 23.08 seconds
lu_ind_4000m_E calculated in 31

 18%|█▊        | 3/17 [35:04<2:49:38, 727.01s/it]

--------------------
Computing maps for lu_ug...
lu_ug circle 25m skipped
lu_ug_100m_cir calculated in 13.96 seconds
lu_ug_500m_cir calculated in 22.02 seconds
lu_ug_1500m_cir calculated in 30.89 seconds
lu_ug_4000m_cir calculated in 64.47 seconds
lu_ug_100m_E calculated in 13.65 seconds
lu_ug_100m_NE calculated in 13.85 seconds
lu_ug_100m_N calculated in 13.35 seconds
lu_ug_100m_NW calculated in 12.99 seconds
lu_ug_100m_W calculated in 13.74 seconds
lu_ug_100m_SW calculated in 13.51 seconds
lu_ug_100m_S calculated in 13.80 seconds
lu_ug_100m_SE calculated in 12.85 seconds
lu_ug_500m_E calculated in 19.54 seconds
lu_ug_500m_NE calculated in 19.52 seconds
lu_ug_500m_N calculated in 19.43 seconds
lu_ug_500m_NW calculated in 19.56 seconds
lu_ug_500m_W calculated in 19.46 seconds
lu_ug_500m_SW calculated in 19.36 seconds
lu_ug_500m_S calculated in 19.36 seconds
lu_ug_500m_SE calculated in 19.60 seconds
lu_ug_1500m_E calculated in 24.21 seconds
lu_ug_1500m_NE calculated in 22.98 seconds
lu_

 24%|██▎       | 4/17 [48:53<2:46:14, 767.30s/it]

--------------------
Computing maps for lu_art...
lu_art circle 25m skipped
lu_art circle 100m skipped
lu_art_500m_cir calculated in 21.51 seconds
lu_art_1500m_cir calculated in 28.96 seconds
lu_art_4000m_cir calculated in 61.81 seconds
lu_art sector 100m skipped
lu_art sector 500m skipped
lu_art_1500m_E calculated in 22.28 seconds
lu_art_1500m_NE calculated in 22.13 seconds
lu_art_1500m_N calculated in 22.16 seconds
lu_art_1500m_NW calculated in 22.16 seconds
lu_art_1500m_W calculated in 21.73 seconds
lu_art_1500m_SW calculated in 22.00 seconds
lu_art_1500m_S calculated in 22.18 seconds
lu_art_1500m_SE calculated in 22.31 seconds
lu_art_4000m_E calculated in 29.72 seconds
lu_art_4000m_NE calculated in 30.41 seconds
lu_art_4000m_N calculated in 28.98 seconds
lu_art_4000m_NW calculated in 29.29 seconds
lu_art_4000m_W calculated in 28.51 seconds
lu_art_4000m_SW calculated in 29.80 seconds
lu_art_4000m_S calculated in 29.36 seconds
lu_art_4000m_SE calculated in 29.69 seconds
Maps Computed

 29%|██▉       | 5/17 [57:40<2:16:11, 680.95s/it]

--------------------
Computing maps for lu_for...
lu_for circle 25m skipped
lu_for circle 100m skipped
lu_for circle 500m skipped
lu_for circle 1500m skipped
lu_for_4000m_cir calculated in 61.84 seconds
lu_for sector 100m skipped
lu_for sector 500m skipped
lu_for sector 1500m skipped
lu_for_4000m_E calculated in 27.83 seconds
lu_for_4000m_NE calculated in 27.97 seconds
lu_for_4000m_N calculated in 28.29 seconds
lu_for_4000m_NW calculated in 28.30 seconds
lu_for_4000m_W calculated in 28.75 seconds
lu_for_4000m_SW calculated in 29.81 seconds
lu_for_4000m_S calculated in 29.73 seconds
lu_for_4000m_SE calculated in 28.89 seconds
Maps Computed. Uploading to Postgres...


 35%|███▌      | 6/17 [1:02:33<1:40:38, 549.00s/it]

--------------------
Computing maps for lu_rur...
lu_rur circle 25m skipped
lu_rur circle 100m skipped
lu_rur circle 500m skipped
lu_rur_1500m_cir calculated in 31.41 seconds
lu_rur_4000m_cir calculated in 71.06 seconds
lu_rur sector 100m skipped
lu_rur sector 500m skipped
lu_rur_1500m_E calculated in 23.28 seconds
lu_rur_1500m_NE calculated in 23.29 seconds
lu_rur_1500m_N calculated in 23.12 seconds
lu_rur_1500m_NW calculated in 23.39 seconds
lu_rur_1500m_W calculated in 24.82 seconds
lu_rur_1500m_SW calculated in 23.14 seconds
lu_rur_1500m_S calculated in 22.85 seconds
lu_rur_1500m_SE calculated in 23.25 seconds
lu_rur_4000m_E calculated in 31.46 seconds
lu_rur_4000m_NE calculated in 31.68 seconds
lu_rur_4000m_N calculated in 31.75 seconds
lu_rur_4000m_NW calculated in 31.66 seconds
lu_rur_4000m_W calculated in 31.21 seconds
lu_rur_4000m_SW calculated in 31.05 seconds
lu_rur_4000m_S calculated in 30.87 seconds
lu_rur_4000m_SE calculated in 31.51 seconds
Maps Computed. Uploading to Po

 41%|████      | 7/17 [1:11:37<1:31:12, 547.22s/it]

--------------------
Computing maps for build_fp...
build_fp_25m_cir calculated in 6.97 seconds
build_fp_100m_cir calculated in 6.66 seconds
build_fp_500m_cir calculated in 9.87 seconds
build_fp_1500m_cir calculated in 13.33 seconds
build_fp_4000m_cir calculated in 36.99 seconds
build_fp_100m_E calculated in 6.98 seconds
build_fp_100m_NE calculated in 7.10 seconds
build_fp_100m_N calculated in 7.20 seconds
build_fp_100m_NW calculated in 7.21 seconds
build_fp_100m_W calculated in 7.85 seconds
build_fp_100m_SW calculated in 6.78 seconds
build_fp_100m_S calculated in 6.81 seconds
build_fp_100m_SE calculated in 6.71 seconds
build_fp_500m_E calculated in 8.71 seconds
build_fp_500m_NE calculated in 8.68 seconds
build_fp_500m_N calculated in 8.37 seconds
build_fp_500m_NW calculated in 8.89 seconds
build_fp_500m_W calculated in 8.68 seconds
build_fp_500m_SW calculated in 8.58 seconds
build_fp_500m_S calculated in 8.50 seconds
build_fp_500m_SE calculated in 8.72 seconds
build_fp_1500m_E calcula

 47%|████▋     | 8/17 [1:18:14<1:14:55, 499.49s/it]

--------------------
Computing maps for build_vol...
build_vol_25m_cir calculated in 8.27 seconds
build_vol_100m_cir calculated in 8.45 seconds
build_vol_500m_cir calculated in 11.39 seconds
build_vol_1500m_cir calculated in 15.42 seconds
build_vol_4000m_cir calculated in 40.37 seconds
build_vol_100m_E calculated in 7.68 seconds
build_vol_100m_NE calculated in 8.04 seconds
build_vol_100m_N calculated in 8.33 seconds
build_vol_100m_NW calculated in 7.36 seconds
build_vol_100m_W calculated in 7.97 seconds
build_vol_100m_SW calculated in 7.54 seconds
build_vol_100m_S calculated in 7.56 seconds
build_vol_100m_SE calculated in 8.28 seconds
build_vol_500m_E calculated in 9.94 seconds
build_vol_500m_NE calculated in 9.97 seconds
build_vol_500m_N calculated in 9.89 seconds
build_vol_500m_NW calculated in 9.71 seconds
build_vol_500m_W calculated in 9.79 seconds
build_vol_500m_SW calculated in 9.85 seconds
build_vol_500m_S calculated in 9.66 seconds
build_vol_500m_SE calculated in 9.79 seconds
b

 53%|█████▎    | 9/17 [1:25:47<1:04:40, 485.04s/it]

--------------------
Computing maps for build_var...
build_var_25m_cir calculated in 8.61 seconds
build_var_100m_cir calculated in 9.35 seconds
build_var_500m_cir calculated in 15.64 seconds
build_var_1500m_cir calculated in 21.93 seconds
build_var_4000m_cir calculated in 50.87 seconds
build_var_100m_E calculated in 9.55 seconds
build_var_100m_NE calculated in 8.82 seconds
build_var_100m_N calculated in 9.67 seconds
build_var_100m_NW calculated in 9.26 seconds
build_var_100m_W calculated in 9.41 seconds
build_var_100m_SW calculated in 9.33 seconds
build_var_100m_S calculated in 8.83 seconds
build_var_100m_SE calculated in 9.31 seconds
build_var_500m_E calculated in 11.99 seconds
build_var_500m_NE calculated in 12.04 seconds
build_var_500m_N calculated in 12.22 seconds
build_var_500m_NW calculated in 12.18 seconds
build_var_500m_W calculated in 12.29 seconds
build_var_500m_SW calculated in 12.00 seconds
build_var_500m_S calculated in 12.09 seconds
build_var_500m_SE calculated in 12.13 s

 59%|█████▉    | 10/17 [1:35:35<1:00:16, 516.60s/it]

--------------------
Computing maps for ndvi...
ndvi_25m_cir calculated in 8.43 seconds
ndvi_100m_cir calculated in 9.82 seconds
ndvi_500m_cir calculated in 31.43 seconds
ndvi_1500m_cir calculated in 190.20 seconds
ndvi_4000m_cir calculated in 1268.75 seconds
ndvi_100m_E calculated in 8.25 seconds
ndvi_100m_NE calculated in 8.12 seconds
ndvi_100m_N calculated in 8.41 seconds
ndvi_100m_NW calculated in 8.37 seconds
ndvi_100m_W calculated in 7.36 seconds
ndvi_100m_SW calculated in 7.57 seconds
ndvi_100m_S calculated in 8.91 seconds
ndvi_100m_SE calculated in 7.64 seconds
ndvi_500m_E calculated in 13.38 seconds
ndvi_500m_NE calculated in 13.41 seconds
ndvi_500m_N calculated in 13.75 seconds
ndvi_500m_NW calculated in 13.95 seconds
ndvi_500m_W calculated in 13.48 seconds
ndvi_500m_SW calculated in 13.21 seconds
ndvi_500m_S calculated in 13.63 seconds
ndvi_500m_SE calculated in 13.34 seconds
ndvi_1500m_E calculated in 33.43 seconds
ndvi_1500m_NE calculated in 33.82 seconds
ndvi_1500m_N calc

 65%|██████▍   | 11/17 [2:31:27<2:18:27, 1384.60s/it]

--------------------
Computing maps for pop...
pop_25m_cir calculated in 7.76 seconds
pop_100m_cir calculated in 8.15 seconds
pop_500m_cir calculated in 7.66 seconds
pop_1500m_cir calculated in 10.11 seconds
pop_4000m_cir calculated in 11.32 seconds
pop_500m_E calculated in 8.26 seconds
pop_500m_NE calculated in 8.34 seconds
pop_500m_N calculated in 7.79 seconds
pop_500m_NW calculated in 7.95 seconds
pop_500m_W calculated in 8.31 seconds
pop_500m_SW calculated in 8.88 seconds
pop_500m_S calculated in 8.04 seconds
pop_500m_SE calculated in 7.79 seconds
pop_1500m_E calculated in 8.55 seconds
pop_1500m_NE calculated in 9.18 seconds
pop_1500m_N calculated in 9.15 seconds
pop_1500m_NW calculated in 8.91 seconds
pop_1500m_W calculated in 8.63 seconds
pop_1500m_SW calculated in 9.03 seconds
pop_1500m_S calculated in 9.07 seconds
pop_1500m_SE calculated in 8.98 seconds
pop_4000m_E calculated in 10.29 seconds
pop_4000m_NE calculated in 10.21 seconds
pop_4000m_N calculated in 10.19 seconds
pop_4

 71%|███████   | 12/17 [2:35:55<1:27:04, 1044.87s/it]

--------------------
Computing maps for rd_mt...
rd_mt circle 25m skipped
rd_mt circle 100m skipped
rd_mt circle 500m skipped
rd_mt_1500m_cir calculated in 11.46 seconds
rd_mt_4000m_cir calculated in 23.86 seconds
rd_mt sector 100m skipped
rd_mt sector 500m skipped
rd_mt sector 1500m skipped
rd_mt_4000m_E calculated in 12.43 seconds
rd_mt_4000m_NE calculated in 12.38 seconds
rd_mt_4000m_N calculated in 12.55 seconds
rd_mt_4000m_NW calculated in 12.48 seconds
rd_mt_4000m_W calculated in 12.40 seconds
rd_mt_4000m_SW calculated in 12.35 seconds
rd_mt_4000m_S calculated in 12.38 seconds
rd_mt_4000m_SE calculated in 12.43 seconds
Maps Computed. Uploading to Postgres...


 76%|███████▋  | 13/17 [2:38:12<51:18, 769.64s/it]   

--------------------
Computing maps for rd_prim...
rd_prim circle 25m skipped
rd_prim_100m_cir calculated in 7.21 seconds
rd_prim_500m_cir calculated in 9.49 seconds
rd_prim_1500m_cir calculated in 11.78 seconds
rd_prim_4000m_cir calculated in 24.25 seconds
rd_prim sector 100m skipped
rd_prim_500m_E calculated in 8.38 seconds
rd_prim_500m_NE calculated in 8.47 seconds
rd_prim_500m_N calculated in 8.21 seconds
rd_prim_500m_NW calculated in 8.93 seconds
rd_prim_500m_W calculated in 8.42 seconds
rd_prim_500m_SW calculated in 8.54 seconds
rd_prim_500m_S calculated in 8.22 seconds
rd_prim_500m_SE calculated in 8.43 seconds
rd_prim_1500m_E calculated in 9.61 seconds
rd_prim_1500m_NE calculated in 9.74 seconds
rd_prim_1500m_N calculated in 9.57 seconds
rd_prim_1500m_NW calculated in 9.75 seconds
rd_prim_1500m_W calculated in 9.74 seconds
rd_prim_1500m_SW calculated in 9.46 seconds
rd_prim_1500m_S calculated in 9.86 seconds
rd_prim_1500m_SE calculated in 9.55 seconds
rd_prim_4000m_E calculated

 82%|████████▏ | 14/17 [2:43:16<31:26, 628.99s/it]

--------------------
Computing maps for rd_res...
rd_res_25m_cir calculated in 7.36 seconds
rd_res_100m_cir calculated in 6.89 seconds
rd_res_500m_cir calculated in 10.11 seconds
rd_res_1500m_cir calculated in 13.00 seconds
rd_res_4000m_cir calculated in 31.72 seconds
rd_res_100m_E calculated in 6.86 seconds
rd_res_100m_NE calculated in 6.85 seconds
rd_res_100m_N calculated in 6.55 seconds
rd_res_100m_NW calculated in 6.94 seconds
rd_res_100m_W calculated in 6.86 seconds
rd_res_100m_SW calculated in 6.29 seconds
rd_res_100m_S calculated in 7.01 seconds
rd_res_100m_SE calculated in 7.17 seconds
rd_res_500m_E calculated in 8.75 seconds
rd_res_500m_NE calculated in 8.68 seconds
rd_res_500m_N calculated in 8.50 seconds
rd_res_500m_NW calculated in 8.94 seconds
rd_res_500m_W calculated in 8.57 seconds
rd_res_500m_SW calculated in 8.93 seconds
rd_res_500m_S calculated in 8.79 seconds
rd_res_500m_SE calculated in 8.63 seconds
rd_res_1500m_E calculated in 10.22 seconds
rd_res_1500m_NE calculat

 88%|████████▊ | 15/17 [2:49:55<18:39, 559.76s/it]

--------------------
Computing maps for rd_sec...
rd_sec circle 25m skipped
rd_sec circle 100m skipped
rd_sec_500m_cir calculated in 9.71 seconds
rd_sec_1500m_cir calculated in 12.44 seconds
rd_sec_4000m_cir calculated in 25.49 seconds
rd_sec sector 100m skipped
rd_sec_500m_E calculated in 8.73 seconds
rd_sec_500m_NE calculated in 8.43 seconds
rd_sec_500m_N calculated in 8.75 seconds
rd_sec_500m_NW calculated in 8.63 seconds
rd_sec_500m_W calculated in 8.75 seconds
rd_sec_500m_SW calculated in 8.58 seconds
rd_sec_500m_S calculated in 8.42 seconds
rd_sec_500m_SE calculated in 8.66 seconds
rd_sec_1500m_E calculated in 9.72 seconds
rd_sec_1500m_NE calculated in 9.90 seconds
rd_sec_1500m_N calculated in 9.61 seconds
rd_sec_1500m_NW calculated in 10.01 seconds
rd_sec_1500m_W calculated in 9.88 seconds
rd_sec_1500m_SW calculated in 9.76 seconds
rd_sec_1500m_S calculated in 9.74 seconds
rd_sec_1500m_SE calculated in 9.88 seconds
rd_sec_4000m_E calculated in 13.11 seconds
rd_sec_4000m_NE calcu

 94%|█████████▍| 16/17 [2:54:59<08:02, 482.91s/it]

--------------------
Computing maps for rd_ter...
rd_ter circle 25m skipped
rd_ter_100m_cir calculated in 7.35 seconds
rd_ter_500m_cir calculated in 9.98 seconds
rd_ter_1500m_cir calculated in 12.21 seconds
rd_ter_4000m_cir calculated in 25.87 seconds
rd_ter sector 100m skipped
rd_ter_500m_E calculated in 8.73 seconds
rd_ter_500m_NE calculated in 9.14 seconds
rd_ter_500m_N calculated in 8.62 seconds
rd_ter_500m_NW calculated in 8.57 seconds
rd_ter_500m_W calculated in 8.70 seconds
rd_ter_500m_SW calculated in 8.64 seconds
rd_ter_500m_S calculated in 8.42 seconds
rd_ter_500m_SE calculated in 8.54 seconds
rd_ter_1500m_E calculated in 10.02 seconds
rd_ter_1500m_NE calculated in 10.13 seconds
rd_ter_1500m_N calculated in 9.94 seconds
rd_ter_1500m_NW calculated in 9.84 seconds
rd_ter_1500m_W calculated in 10.06 seconds
rd_ter_1500m_SW calculated in 10.05 seconds
rd_ter_1500m_S calculated in 10.07 seconds
rd_ter_1500m_SE calculated in 9.85 seconds
rd_ter_4000m_E calculated in 13.15 seconds
r

100%|██████████| 17/17 [3:00:14<00:00, 636.15s/it]

--------------------





In [27]:
# Read and convert success list to skip config if it isn't complete
success_list = []
with open(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\success_list.txt', "r") as file:
    for line in file:
        success_list.append(line.strip())
if len(success_list) <= 471:
    print('Missed calculations')
    calculated_config = create_skip_config(success_list)
else:
    print(f'All {len(success_list)} calculations')

All 472 calculations


<center>Step 6: Define optimal parameters for historical data enhancement<center>

In [None]:
# Data selector optimization (postgres) #TODO: test, need for real-time
# Analysis period
START_DATE = datetime(2024, 6, 1, 0, 0)
END_DATE = datetime(2025, 6, 1, 0, 0)

# Pollutants and meteorological variables to analyze
TARGET_POLLUTANTS = ['o3', 'no2', 'so2', 'pm10', 'pm25']
MET_VARS = ['t', 'p', 'rh', 'ws']
WIND_VAR = 'wd'

# Deviation percentages to test (as fractions)
DEVIATION_LEVELS = [0.05, 0.10, 0.15, 0.20, 0.25]
MAX_DEVIATION = max(DEVIATION_LEVELS)

# File paths
CHECKPOINT_FILE = r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\processed_hours_checkpoint.log'
OUTPUT_FILE = r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\data_selector_evaluation_results.csv'

# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- HELPER FUNCTIONS ---

def get_wind_sector(angle_deg):
    """Converts a wind direction in degrees to one of 8 cardinal sectors."""
    if angle_deg is None or np.isnan(angle_deg):
        return None
    sectors = {
        "N": (337.5, 22.5), "NE": (22.5, 67.5), "E": (67.5, 112.5),
        "SE": (112.5, 157.5), "S": (157.5, 202.5), "SW": (202.5, 247.5),
        "W": (247.5, 292.5), "NW": (292.5, 337.5)
    }
    for sector, (lower, upper) in sectors.items():
        if lower > upper:
            if angle_deg >= lower or angle_deg < upper: return sector
        else:
            if lower <= angle_deg < upper: return sector
    return None

def get_adjacent_sectors(sector):
    """Gets a list of a sector and its two adjacent neighbors."""
    if sector is None: return []
    sector_order = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
    idx = sector_order.index(sector)
    return [sector_order[(idx - 1) % 8], sector_order[idx], sector_order[(idx + 1) % 8]]

def load_processed_hours():
    """Loads the set of already processed hours from the checkpoint file."""
    if not os.path.exists(CHECKPOINT_FILE): return set()
    with open(CHECKPOINT_FILE, 'r') as f:
        return {line.strip() for line in f}

def save_checkpoint(hour_iso_str):
    """Appends a successfully processed hour to the checkpoint file."""
    with open(CHECKPOINT_FILE, 'a') as f:
        f.write(f"{hour_iso_str}\n")

def aggregate_hourly_data(df):
    """
    Aggregates raw station data to hourly averages.
    Calculates mean for regular variables and circular mean for wind direction.
    """
    if df.empty:
        return pd.DataFrame()

    # Separate wind data for special circular mean calculation
    wind_df = df[df['measurement'] == WIND_VAR]
    other_df = df[df['measurement'] != WIND_VAR]

    # Aggregate non-wind variables by simple mean
    agg_other = other_df.groupby(['measured_time', 'measurement']).agg(
        reading_value=('reading_value', 'mean')
    ).reset_index()

    # Aggregate wind variable using circular mean
    agg_wind = pd.DataFrame()
    if not wind_df.empty:
        # Convert degrees to radians for circmean, then convert back
        wind_rad = np.deg2rad(wind_df['reading_value'])
        # Group by time and apply circmean
        circ_mean_rad = wind_rad.groupby(wind_df['measured_time']).apply(circmean, high=np.pi*2, low=0)
        circ_mean_deg = np.rad2deg(circ_mean_rad)
        
        agg_wind = circ_mean_deg.reset_index(name='reading_value')
        agg_wind['measurement'] = WIND_VAR

    # Combine aggregated data and pivot to wide format
    combined_agg = pd.concat([agg_other, agg_wind])
    
    if combined_agg.empty:
        return pd.DataFrame()

    pivoted_df = combined_agg.pivot_table(
        index='measured_time', columns='measurement', values='reading_value',
    ).reset_index()
    
    return pivoted_df

# --- MAIN PROCESSING LOGIC ---

def run_optimization():
    """Main function to execute the data selector optimization process."""
    logging.info("Starting data selector optimization process.")

    processed_hours = load_processed_hours()
    if processed_hours:
        logging.info(f"Loaded {len(processed_hours)} processed hours from checkpoint.")

    results_aggregator = {}
    dev_combinations = list(itertools.product(DEVIATION_LEVELS, repeat=len(MET_VARS)))
    for pollutant in TARGET_POLLUTANTS:
        for combo in dev_combinations:
            key = (pollutant,) + combo
            results_aggregator[key] = {'actual': [], 'predicted': [], 'n_hours_used': []}

    conn = None
    try:
        conn = psycopg2.connect(host=host, user=user, password=password)
        logging.info("Successfully connected to the database.")

        logging.info("Fetching historical min/max ranges for meteorological variables.")
        met_ranges = {}
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            for var in MET_VARS:
                cur.execute("SELECT MIN(reading_value) as min_val, MAX(reading_value) as max_val FROM sensor_data WHERE measurement = %s;", (var,))
                row = cur.fetchone()
                if row and row['min_val'] is not None:
                    met_ranges[var] = {'min': row['min_val'], 'max': row['max_val'], 'range': row['max_val'] - row['min_val']}
                    logging.info(f"  - {var.upper()}: Range is {met_ranges[var]['range']:.2f}")
                else:
                    logging.error(f"Could not determine range for '{var}'. Exiting."); conn.close(); return

        total_hours = [START_DATE + timedelta(hours=i) for i in range((END_DATE - START_DATE).days * 24)]
        num_total_hours = len(total_hours)
        logging.info(f"Starting to process {num_total_hours} total hours from {START_DATE} to {END_DATE}.")

        for i, current_hour in enumerate(total_hours):
            hour_iso = current_hour.isoformat()
            if hour_iso in processed_hours:
                continue
            logging.info(f"Processing hour {i+1}/{num_total_hours}: {hour_iso}")

            # 1. Get and aggregate "ground truth" data for the current hour
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                all_measurements = TARGET_POLLUTANTS + MET_VARS + [WIND_VAR]
                cur.execute("SELECT measurement, reading_value FROM sensor_data WHERE measured_time = %s AND measurement = ANY(%s);", (current_hour, all_measurements))
                rows = cur.fetchall()
            
            if not rows:
                logging.warning(f"Skipping hour {hour_iso} due to no ground truth data."); continue
            
            current_hour_df = aggregate_hourly_data(pd.DataFrame(rows))
            if current_hour_df.empty or not all(col in current_hour_df.columns for col in all_measurements):
                logging.warning(f"Skipping hour {hour_iso} due to missing aggregated data."); continue
            
            current_hour_data = current_hour_df.iloc[0].to_dict()

            # 2. Define search criteria
            hour_window = [(current_hour.hour - 1 + 24) % 24, current_hour.hour, (current_hour.hour + 1) % 24]
            is_weekday = current_hour.isoweekday() < 6
            month_window = [(current_hour.month - 2 + 12) % 12 + 1, current_hour.month, (current_hour.month % 12) + 1]
            predominant_sector = get_wind_sector(current_hour_data[WIND_VAR])
            allowed_sectors = get_adjacent_sectors(predominant_sector)
            if not allowed_sectors:
                logging.warning(f"Could not determine wind sector for {hour_iso}. Skipping."); continue

            # 3. Query historical data (raw, to be aggregated)
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                sql_query = """
                SELECT measured_time, measurement, reading_value FROM sensor_data
                WHERE EXTRACT(hour FROM measured_time) IN %s
                  AND EXTRACT(month FROM measured_time) IN %s
                  AND (CASE WHEN %s THEN EXTRACT(isodow FROM measured_time) < 6 ELSE EXTRACT(isodow FROM measured_time) >= 6 END)
                  AND measured_time != %s AND measurement = ANY(%s)
                """
                cur.execute(sql_query, (tuple(hour_window), tuple(month_window), is_weekday, current_hour, all_measurements))
                historical_rows = cur.fetchall()

            if not historical_rows:
                logging.warning(f"No historical data found for {hour_iso}."); continue

            # 4. Aggregate historical data to hourly averages
            historical_agg_df = aggregate_hourly_data(pd.DataFrame(historical_rows))
            historical_agg_df.dropna(subset=all_measurements, inplace=True)
            if historical_agg_df.empty:
                logging.warning(f"Historical data for {hour_iso} was empty after aggregation."); continue

            # 5. In-memory filtering for the superset
            historical_agg_df['wind_sector'] = historical_agg_df[WIND_VAR].apply(get_wind_sector)
            superset_df = historical_agg_df[historical_agg_df['wind_sector'].isin(allowed_sectors)].copy()
            
            for var in MET_VARS:
                total_range = met_ranges[var]['range']
                deviation_amount = MAX_DEVIATION * total_range
                center_val = current_hour_data[var]
                low = center_val - (deviation_amount / 2)
                high = center_val + (deviation_amount / 2)
                superset_df = superset_df[superset_df[var].between(low, high)]

            if superset_df.empty:
                logging.warning(f"Superset empty for {hour_iso} after wind/max deviation filters."); continue

            # 6. Iterate through pollutants and deviation combinations
            for pollutant in TARGET_POLLUTANTS:
                actual_value = current_hour_data[pollutant]
                for combo in dev_combinations:
                    key = (pollutant,) + combo
                    temp_df = superset_df.copy()
                    is_valid = True
                    for i, var in enumerate(MET_VARS):
                        deviation = combo[i]
                        total_range = met_ranges[var]['range']
                        deviation_amount = deviation * total_range
                        center_val = current_hour_data[var]
                        low_bound = center_val - (deviation_amount / 2)
                        high_bound = center_val + (deviation_amount / 2)
                        temp_df = temp_df[temp_df[var].between(low_bound, high_bound)]
                        if temp_df.empty: is_valid = False; break
                    
                    predicted_value, n_hours = (np.nan, 0) if not is_valid else (temp_df[pollutant].mean(), len(temp_df))
                    results_aggregator[key]['actual'].append(actual_value)
                    results_aggregator[key]['predicted'].append(predicted_value)
                    results_aggregator[key]['n_hours_used'].append(n_hours)

            save_checkpoint(hour_iso)

        logging.info("Finished processing all hours. Calculating final statistics.")
        final_results = []
        for key, data in results_aggregator.items():
            pollutant, dev_combo = key[0], key[1:]
            combo_df = pd.DataFrame({'actual': data['actual'], 'predicted': data['predicted']}).dropna()
            r_value, r2_value = (np.nan, np.nan)
            if len(combo_df) > 1:
                _, _, r_val, _, _ = linregress(combo_df['actual'], combo_df['predicted'])
                r_value, r2_value = r_val, r_val**2
            
            final_results.append({
                'pollutant': pollutant, 'dev_T': dev_combo[0], 'dev_P': dev_combo[1],
                'dev_RH': dev_combo[2], 'dev_WS': dev_combo[3], 'r': r_value, 'r2': r2_value,
                'num_predictions': len(combo_df), 'total_historical_hours_used': sum(data['n_hours_used'])
            })

        pd.DataFrame(final_results).to_csv(OUTPUT_FILE, index=False)
        logging.info(f"Successfully saved final results to {OUTPUT_FILE}")

    except (Exception, psycopg2.DatabaseError) as error:
        logging.error(f"An error occurred: {error}", exc_info=True)
    finally:
        if conn is not None: conn.close(); logging.info("Database connection closed.")

run_optimization()

In [None]:
# Data selector optimization (all pandas)

# --- CONFIGURATION (Constants) ---
# Use os.path.join for cross-platform compatible file paths.
BASE_DIR = r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method'
RAW_DATA_CSV = os.path.join(BASE_DIR, 'long_sensor_measurements.csv')

# Analysis period
START_DATE = datetime(2024, 6, 1)
END_DATE = datetime(2025, 6, 1)

# Set the number of parallel workers. Set to None to use all available cores.
MAX_WORKERS = 4

# Pollutants and meteorological variables to analyze (using user's specified case)
TARGET_POLLUTANTS = ['O3', 'NO2', 'SO2', 'PM10', 'PM25']
MET_VARS = ['T', 'P', 'RH', 'WS']
WIND_VAR = 'WD'

# Deviation percentages to test (as fractions)
DEVIATION_LEVELS = [0.05, 0.10, 0.15, 0.20, 0.25]
MAX_DEVIATION = max(DEVIATION_LEVELS)

# File paths for outputs 
HOURLY_CHECKPOINT_FILE = os.path.join(BASE_DIR, 'processed_hours_checkpoint.log')
HOURLY_OUTPUT_FILE = os.path.join(BASE_DIR, 'hourly_optimization_results.csv')
DAILY_CHECKPOINT_FILE = os.path.join(BASE_DIR, 'processed_days_checkpoint.log')
DAILY_OUTPUT_FILE = os.path.join(BASE_DIR, 'daily_optimization_results.csv')

# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Thread lock to prevent race conditions when writing to the CSV file from the main thread
csv_writer_lock = threading.Lock()

# --- HELPER FUNCTIONS ---

def get_wind_sector(angle_deg):
    """Converts a wind direction in degrees to one of 8 cardinal sectors."""
    if angle_deg is None or np.isnan(angle_deg): return None
    sectors = {
        "N": (337.5, 22.5), "NE": (22.5, 67.5), "E": (67.5, 112.5),
        "SE": (112.5, 157.5), "S": (157.5, 202.5), "SW": (202.5, 247.5),
        "W": (247.5, 292.5), "NW": (292.5, 337.5)
    }
    for sector, (lower, upper) in sectors.items():
        if lower > upper:
            if angle_deg >= lower or angle_deg < upper: return sector
        else:
            if lower <= angle_deg < upper: return sector
    return None

def get_adjacent_sectors(sector):
    """Gets a list of a sector and its two adjacent neighbors."""
    if sector is None: return []
    sector_order = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
    try:
        idx = sector_order.index(sector)
        return [sector_order[(idx - 1) % 8], sector_order[idx], sector_order[(idx + 1) % 8]]
    except ValueError:
        return []

def load_processed_items(checkpoint_file):
    """Loads the set of already processed items from a checkpoint file."""
    if not os.path.exists(checkpoint_file): return set()
    with open(checkpoint_file, 'r') as f:
        return {line.strip() for line in f}

def save_checkpoint(item_iso_str, checkpoint_file):
    """Appends a successfully processed item to the checkpoint file."""
    with open(checkpoint_file, 'a') as f:
        f.write(f"{item_iso_str}\n")

def aggregate_data(df, mode='hourly'):
    """Aggregates raw station data to hourly or daily averages."""
    if df.empty: return pd.DataFrame()
    group_key = df['measured_time'].dt.floor('h') if mode == 'hourly' else df['measured_time'].dt.normalize()
    
    wind_df = df[df['measurement'] == WIND_VAR]
    other_df = df[df['measurement'] != WIND_VAR]

    agg_other = other_df.groupby([group_key, 'measurement'], observed=False).agg(reading_value=('reading_value', 'mean')).reset_index()
    
    agg_wind = pd.DataFrame()
    if not wind_df.empty:
        wind_rad = np.deg2rad(wind_df['reading_value'])
        wind_rad.index = group_key[wind_rad.index]
        circ_mean_rad = wind_rad.groupby(level=0, observed=False).apply(circmean, high=np.pi*2, low=0)
        agg_wind = np.rad2deg(circ_mean_rad).reset_index(name='reading_value')
        agg_wind['measurement'] = WIND_VAR

    combined_agg = pd.concat([agg_other, agg_wind])
    if combined_agg.empty: return pd.DataFrame()
    
    return combined_agg.pivot_table(index='measured_time', columns='measurement', values='reading_value', observed=False).reset_index()

# --- NEW HELPER FUNCTION FOR INCREMENTAL SAVING ---

def save_step_results_to_csv(timestamp, step_results, output_file, mode):
    """Formats results from a single time step and appends them to the output CSV."""
    if not step_results:
        return

    # Convert the results dictionary to a list of records
    records = []
    for key, (actual, predicted, n_units) in step_results.items():
        pollutant, dev_combo = key[0], key[1:]
        record = {
            'timestamp': timestamp,
            'pollutant': pollutant,
            'actual': actual,
            'predicted': predicted,
            f'n_units_used': n_units
        }
        for i, var in enumerate(MET_VARS):
            record[f'dev_{var.upper()}'] = dev_combo[i]
        records.append(record)

    # Convert to DataFrame
    results_df = pd.DataFrame(records)

    # Use a lock to ensure file writing is atomic and prevent corruption
    with csv_writer_lock:
        # Write header only if the file doesn't exist, otherwise append
        header = not os.path.exists(output_file)
        results_df.to_csv(output_file, mode='a', header=header, index=False)

# --- PARALLEL WORKER FUNCTION ---

def process_time_step(current_step_row, historical_df, met_ranges, dev_combinations, mode):
    """Processes a single time step. Returns the raw results for that step."""
    current_step_data = current_step_row.to_dict()
    month_window = [(current_step_data['month'] - 2 + 12) % 12 + 1, current_step_data['month'], (current_step_data['month'] % 12) + 1]

    if mode == 'hourly':
        hour_window = [(current_step_data['hour'] - 1 + 24) % 24, current_step_data['hour'], (current_step_data['hour'] + 1) % 24]
        temporal_mask = (
            (historical_df['hour'].isin(hour_window)) &
            (historical_df['month'].isin(month_window)) &
            (historical_df['is_weekday'] == current_step_data['is_weekday']) &
            (historical_df['measured_time'] != current_step_data['measured_time'])
        )
    elif mode == 'daily':
        temporal_mask = (
            (historical_df['month'].isin(month_window)) &
            (historical_df['is_weekday'] == current_step_data['is_weekday']) &
            (historical_df['measured_time'] != current_step_data['measured_time'])
        )
    else:
        return None, None

    allowed_sectors = get_adjacent_sectors(current_step_data['wind_sector'])
    wind_mask = historical_df['wind_sector'].isin(allowed_sectors)
    candidate_df = historical_df[temporal_mask & wind_mask]

    if candidate_df.empty: return current_step_data['measured_time'], {}

    superset_df = candidate_df.copy()
    for var in MET_VARS:
        deviation_amount = MAX_DEVIATION * met_ranges[var]['range']
        center_val = current_step_data[var]
        if pd.notna(center_val):
            superset_df = superset_df[superset_df[var].between(center_val - deviation_amount, center_val + deviation_amount)]

    if superset_df.empty: return current_step_data['measured_time'], {}

    step_results = {}
    for pollutant in TARGET_POLLUTANTS:
        actual_value = current_step_data.get(pollutant)
        if pd.isna(actual_value): continue

        for combo in dev_combinations:
            key = (pollutant,) + combo
            temp_df = superset_df.copy()
            is_valid = True
            for i, var in enumerate(MET_VARS):
                deviation_amount = combo[i] * met_ranges[var]['range']
                center_val = current_step_data[var]
                if pd.notna(center_val):
                    temp_df = temp_df[temp_df[var].between(center_val - deviation_amount, center_val + deviation_amount)]
                    if temp_df.empty:
                        is_valid = False
                        break
            
            predicted_value, n_units = (np.nan, 0)
            if is_valid and not temp_df.empty:
                predicted_value = temp_df[pollutant].mean()
                n_units = len(temp_df)
                
            step_results[key] = (actual_value, predicted_value, n_units)
            
    return current_step_data['measured_time'], step_results


# --- ANALYSIS FUNCTION ---

def run_analysis(raw_df, mode):
    """Runs the full optimization, saving results incrementally."""
    logging.info("="*50)
    logging.info(f"STARTING {mode.upper()} ANALYSIS")
    logging.info("="*50)

    checkpoint_file = HOURLY_CHECKPOINT_FILE if mode == 'hourly' else DAILY_CHECKPOINT_FILE
    output_file = HOURLY_OUTPUT_FILE if mode == 'hourly' else DAILY_OUTPUT_FILE

    logging.info(f"Aggregating raw data to {mode} averages...")
    all_measurements = TARGET_POLLUTANTS + MET_VARS + [WIND_VAR]
    agg_df = aggregate_data(raw_df, mode=mode)
    # Ensure columns from config are present, otherwise add them with NaN
    for col in all_measurements:
        if col not in agg_df.columns:
            agg_df[col] = np.nan
    agg_df.dropna(subset=all_measurements, how='any', inplace=True)
    logging.info(f"{mode.capitalize()} aggregation complete. Resulted in {len(agg_df)} records.")

    if agg_df.empty:
        logging.warning(f"No data left after aggregation for {mode} mode. Skipping analysis.")
        return

    logging.info(f"Pre-calculating {mode} temporal and wind features...")
    if mode == 'hourly':
        agg_df['hour'] = agg_df['measured_time'].dt.hour
    agg_df['month'] = agg_df['measured_time'].dt.month
    agg_df['is_weekday'] = agg_df['measured_time'].dt.dayofweek < 5 # Corrected from day_of_week
    agg_df['wind_sector'] = agg_df[WIND_VAR].apply(get_wind_sector)
    agg_df.dropna(subset=['wind_sector'], inplace=True)

    met_ranges = {var: {'range': agg_df[var].max() - agg_df[var].min()} for var in MET_VARS}

    processed_items = load_processed_items(checkpoint_file)
    dev_combinations = list(itertools.product(DEVIATION_LEVELS, repeat=len(MET_VARS)))

    evaluation_df = agg_df[(agg_df['measured_time'] >= START_DATE) & (agg_df['measured_time'] < END_DATE)].copy()
    
    if not evaluation_df.empty:
        if mode == 'hourly':
            evaluation_df = evaluation_df[~evaluation_df['measured_time'].apply(lambda x: x.isoformat()).isin(processed_items)]
        else:
            evaluation_df = evaluation_df[~evaluation_df['measured_time'].dt.date.astype(str).isin(processed_items)]

    num_total_items = len(evaluation_df)
    if num_total_items == 0:
        logging.info(f"No new {mode}s to process. Analysis may be complete.")
        return
        
    logging.info(f"Starting to process {num_total_items} total {mode}s. Results will be saved to {output_file}")

    # Use ProcessPoolExecutor for CPU-bound tasks
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_time_step, row, agg_df, met_ranges, dev_combinations, mode): row for _, row in evaluation_df.iterrows()}
        
        for i, future in enumerate(as_completed(futures)):
            try:
                timestamp, step_results = future.result()
                if timestamp is None: continue
                
                # Save results for this step to CSV instead of aggregating in memory
                save_step_results_to_csv(timestamp, step_results, output_file, mode)

                # Save checkpoint
                item_iso = timestamp.isoformat() if mode == 'hourly' else timestamp.date().isoformat()
                save_checkpoint(item_iso, checkpoint_file)
                logging.info(f"Completed and saved {mode} {i+1}/{num_total_items}: {item_iso}")
            except Exception as e:
                logging.error(f"An error occurred processing a time step: {e}", exc_info=True)

    logging.info(f"Finished processing all {mode}s.")
    logging.info(f"Detailed results saved in {output_file}.")
    logging.info("Run a separate analysis on this file to calculate final statistics (e.g., R-squared).")


def run_optimization():
    """Main function to load data and run both hourly and daily analyses."""
    logging.info("Starting data selector optimization process.")
    try:
        logging.info(f"Loading raw data from {RAW_DATA_CSV}...")
        raw_df = pd.read_csv(RAW_DATA_CSV)
        raw_df.columns = raw_df.columns.str.upper() # Standardize column names to upper case
        
        raw_df['MEASURED_TIME'] = pd.to_datetime(raw_df['MEASURED_TIME'])
        if hasattr(raw_df['MEASURED_TIME'].dt, 'tz') and raw_df['MEASURED_TIME'].dt.tz is not None:
            raw_df['MEASURED_TIME'] = raw_df['MEASURED_TIME'].dt.tz_localize(None)
        
        # Rename for consistency within the script
        raw_df.rename(columns={'MEASURED_TIME': 'measured_time', 'MEASUREMENT': 'measurement', 'READING_VALUE': 'reading_value'}, inplace=True)

        logging.info(f"Successfully loaded {len(raw_df)} raw measurements into memory.")
    except FileNotFoundError:
        logging.error(f"ERROR: The data file '{RAW_DATA_CSV}' was not found.")
        return
    except Exception as e:
        logging.error(f"An error occurred while loading or parsing the CSV file: {e}", exc_info=True)
        return

    # Run hour and then daily unless it breaks then just daily
    run_analysis(raw_df.copy(), mode='hourly')
    run_analysis(raw_df.copy(), mode='daily')

    logging.info("="*50)
    logging.info("ALL ANALYSES COMPLETE.")
    logging.info("="*50)


if __name__ == '__main__':
    run_optimization()


In [7]:
# Read results
hourly_csv = r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\hourly_optimization_results.csv'
daily_csv = r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\final_method\daily_optimization_results.csv'
hourly_df = pd.read_csv(hourly_csv)
daily_df = pd.read_csv(daily_csv)

In [26]:
# Analyze optimization results
pollutants = ['O3', 'NO2', 'SO2', 'PM10', 'PM25']
parameter_cols = ['dev_T', 'dev_P', 'dev_RH', 'dev_WS']
def analyze_results(df):
    # Drop rows with missing values
    #df.dropna(subset=['actual', 'predicted'], inplace=True)
    # To store the figures
    figures = {}
    for pollutant in pollutants:
        print(f"--- Processing data for {pollutant} ---")
        pollutant_df = df[df['pollutant'] == pollutant].copy()
        grouped = pollutant_df.groupby(parameter_cols)
        
        # --- Calculate Metrics (Mean, Median, Min, and Custom Stat) ---
        # These stats are calculated on all days, including those with 0 samples.
        stats = grouped['n_units_used'].agg([
            'mean', 
            'median', 
            'min', 
            ('percent_under_15_samples', lambda x: (x < 20).sum() / x.size * 100 if x.size > 0 else 0)
        ]).reset_index()
        
        stats.rename(columns={
            'mean': 'average_sample_size',
            'median': 'median_sample_size',
            'min': 'min_sample_size'
        }, inplace=True)
        
        # --- Calculate Correlation ---
        # This function now internally handles days with no predictions.
        def calculate_r(group):
            # Only use rows with valid actual and predicted values for correlation
            valid_group = group.dropna(subset=['actual', 'predicted'])
            if valid_group.shape[0] < 2 or valid_group['actual'].nunique() < 2 or valid_group['predicted'].nunique() < 2:
                return np.nan
            return valid_group['actual'].corr(valid_group['predicted'])

        correlations = grouped.apply(calculate_r, include_groups=False).reset_index(name='r')
        
        # Merge stats and correlations.
        final_df = pd.merge(stats, correlations, on=parameter_cols)
        
        # For plotting, we can only use rows where a correlation was calculated.
        final_df.dropna(subset=['r'], inplace=True)
        
        if final_df.empty:
            print(f"No valid data to plot for {pollutant}. Skipping.")
            continue
            
        final_df['r2'] = final_df['r']**2
        final_df['combination'] = final_df.apply(
            lambda row: f"T{row['dev_T']}_P{row['dev_P']}_RH{row['dev_RH']}_WS{row['dev_WS']}",
            axis=1
        )
        
        # --- Create Detailed Hover Text ---
        final_df['hover_text'] = final_df.apply(
            lambda row: (
                f"<b>{row['combination']}</b><br><br>"
                f"Correlation (r): {row['r']:.3f}<br>"
                f"R-squared: {row['r2']:.3f}<br>"
                f"Avg Samples: {row['average_sample_size']:.2f}<br>"
                f"% Days < 15 Samples: {row['percent_under_15_samples']:.1f}%"
            ),
            axis=1
        )

        # --- Create the Interactive Plot ---
        fig = go.Figure()

        # Add initial trace (will be updated by dropdown)
        fig.add_trace(
            go.Scatter(
                x=final_df['average_sample_size'],
                y=final_df['r'],
                mode='markers',
                marker=dict(
                    size=12,
                    color=final_df['r'],
                    colorscale='viridis',
                    showscale=True,
                    colorbar=dict(title='Correlation (r)'),
                    cmin=-1, cmax=1
                ),
                text=final_df['hover_text'], # Use the new detailed hover text
                hoverinfo='text', # Show only the custom text on hover
            )
        )
        
        # --- Add Dropdown Menu for X-axis ---
        fig.update_layout(
            updatemenus=[
                dict(
                    active=0,
                    buttons=list([
                        dict(label="Average Sample Size",
                             method="update",
                             args=[{"x": [final_df['average_sample_size']]},
                                   {"xaxis.title": "Average Sample Size (n_units_used)"}]),
                        dict(label="Median Sample Size",
                             method="update",
                             args=[{"x": [final_df['median_sample_size']]},
                                   {"xaxis.title": "Median Sample Size (n_units_used)"}]),
                        dict(label="Minimum Sample Size",
                             method="update",
                             args=[{"x": [final_df['min_sample_size']]},
                                   {"xaxis.title": "Minimum Sample Size (n_units_used)"}]),
                        dict(label="percent under 15",
                             method="update",
                             args=[{"x": [final_df['percent_under_15_samples']]},
                                   {"xaxis.title": "Minimum Sample Size (n_units_used)"}]),
                    ]),
                    direction="down",
                    pad={"r": 10, "t": 10},
                    showactive=True,
                    x=0.01,
                    xanchor="left",
                    y=1.14,
                    yanchor="top"
                )
            ]
        )
        
        # --- Update Layout and Autoscale Y-axis ---
        # Calculate y-axis range with a 5% buffer
        min_r = final_df['r'].min()
        max_r = final_df['r'].max()
        buffer = (max_r - min_r) * 0.05 if (max_r - min_r) > 0 else 0.1
        yaxis_range = [min_r - buffer, max_r + buffer]

        fig.update_layout(
            title=f'Correlation vs. Sample Size for {pollutant}',
            xaxis_title='Average Sample Size (n_units_used)', # Initial title
            yaxis_title='Correlation Coefficient (r)',
            yaxis_range=yaxis_range, # Apply autoscaled range
            plot_bgcolor='white',
            showlegend=False,
            hovermode='closest',
            title_x=0.5
        )
        
        figures[pollutant] = fig
        print(f"Analysis for {pollutant} complete.")

    return figures

# --- Execute the Analysis and Display the Plots ---
pollutant_plots = analyze_results(hourly_df)

for pollutant_name, plot in pollutant_plots.items():
    print(f"\nDisplaying plot for {pollutant_name}:")
    plot.show()  

--- Processing data for O3 ---
Analysis for O3 complete.
--- Processing data for NO2 ---
Analysis for NO2 complete.
--- Processing data for SO2 ---
Analysis for SO2 complete.
--- Processing data for PM10 ---
Analysis for PM10 complete.
--- Processing data for PM25 ---
Analysis for PM25 complete.

Displaying plot for O3:



Displaying plot for NO2:



Displaying plot for SO2:



Displaying plot for PM10:



Displaying plot for PM25:


<center>Step 7: Calculate Model Coefficients<center>

In [None]:
# yearly model = 1
# monthly models = 12
# daily models = 365
# hourly models = 8760

# for each model type:
    # iterate over each time step
        # LOOCV: iterate over stations to remove (and do one with all?)   
            # if yearly or monthly
                # Skip data selector, Average dep var values
            # if daily or hourly
                # Use optimal data selector parameters to calculate dep variable values
            # Model calibration to find indep vars and coefs
            # Predict all stations 
        

In [None]:
# Initialize sign contraints

# Sign-constrained directions (for buffre calc) #TODO: update with new variables
sign_constraints = {'O3':{'lu_urban':-1,
                    'lu_grass':1,
                    'lu_forest':1,
                    'build_cover': 0,
                    'build_vol':0,
                    'build_std':0,
                    'ndvi':1,
                    'elevation':0,
                    'pop':0,
                    'mj_road_ln':-1,
                    'mi_road_ln':-1,
                    'parking':0,
                    'bus_stops':0,
                    'pow_plant':0,
                    'airport':0,
                    'mj_road_dis':0,
                    'T':0,
                    'RH':0,
                    'pressure':0,
                    'WS':-1},
                'NO2':{'lu_urban':1,
                    'lu_grass':-1,
                    'lu_forest':-1,
                    'build_cover': 1,
                    'build_vol':1,
                    'build_std':-1,
                    'ndvi':-1,
                    'elevation':-1,
                    'pop':1,
                    'mj_road_ln':1,
                    'mi_road_ln':1,
                    'parking':1,
                    'bus_stops':1,
                    'pow_plant':-1,
                    'airport':-1,
                    'mj_road_dis':-1,
                    'T':-1,
                    'RH':1,
                    'pressure':1,
                    'WS':-1},
                'PM':{'lu_urban':1,
                    'lu_grass':-1,
                    'lu_forest':-1,
                    'build_cover': 1,
                    'build_vol':1,
                    'build_std':-1,
                    'ndvi':-1,
                    'elevation':-1,
                    'pop':1,
                    'mj_road_ln':1,
                    'mi_road_ln':1,
                    'parking':1,
                    'bus_stops':1,
                    'pow_plant':-1,
                    'airport':-1,
                    'mj_road_dis':-1,
                    'T':-1,
                    'RH':-1,
                    'pressure':0,
                    'WS':-1},
                'SO2':{'lu_urban':0,
                    'lu_grass':-1,
                    'lu_forest':-1,
                    'build_cover': 1,
                    'build_vol':1,
                    'build_std':-1,
                    'ndvi':-1,
                    'elevation':0,
                    'pop':1,
                    'mj_road_ln':1,
                    'mi_road_ln':1,
                    'parking':1,
                    'bus_stops':1,
                    'pow_plant':-1,
                    'airport':0,
                    'mj_road_dis':-1,
                    'T':-1,
                    'RH':0,
                    'pressure':0,
                    'WS':-1}}

In [5]:
# Calculating model coefs and setting up SQLAlchemy engine

# Function to return a new SQLAlchemy engine with appropriate connection pool settings
# Replacement for API
def get_engine():
    return create_engine(
        "postgresql://thesis:thesis@localhost:5432/hist_data",
        pool_pre_ping=True,     # Verify connections before using them
        pool_recycle=3600,      # Recycle connections after an hour
        pool_size=5,            # Limit pool size
        max_overflow=10         # Allow extra connections if needed
    )

# Using input time, query the database for historical data
def create_training_set(date_string):
    date_obj = datetime.strptime(date_string, '%Y-%m-%d %H:%M')
    reformatted_string = date_obj.strftime('%Y-%m-%d %H:%M:%S')
    # Connect to database
    engine = get_engine()
    # First collect current data
    # Will be an API but I'm going to use the postgresql db
    query = f"""
    SELECT * FROM sensor_data
    WHERE time = '{reformatted_string}'
    """
    current_data = pd.read_sql_query(text(query), engine)
    return current_data

'''
'''

# 20 predictors
predictors = [
    'lu_urban', 'lu_grass', 'lu_forest', 'build_cover', 'build_vol',
    'build_std', 'ndvi', 'elevation', 'pop', 'mj_road_ln',
    'mi_road_ln', 'parking', 'bus_stops', 'pow_plant', 'airport',
    'mj_road_dis', 'T', 'RH', 'pressure', 'WS'
]
# 17 predictors
predictors = [
    'lu_urban', 'lu_grass', 'lu_forest', 'build_cover', 'build_vol',
    'build_std', 'ndvi', 'elevation', 'pop', 'mj_road_ln',
    'mi_road_ln', 'parking', 'bus_stops', 'pow_plant', 'airport',
    'mj_road_dis'
]
# predictors with buffer size (so that the stored coefs can be used in mapping function)


# Generate pollutant data with different relationships to predictors
pollutants = ['O3', 'NO2', 'SO2', 'PM10', 'PM2_5']
# Defined sign constraints ahead with buffer calculation
sign_constraints = {'O3':{'lu_urban':-1,
                    'lu_grass':1,
                    'lu_forest':1,
                    'build_cover': 0,
                    'build_vol':0,
                    'build_std':0,
                    'ndvi':1,
                    'elevation':0,
                    'pop':0,
                    'mj_road_ln':-1,
                    'mi_road_ln':-1,
                    'parking':0,
                    'bus_stops':0,
                    'pow_plant':0,
                    'airport':0,
                    'mj_road_dis':0,
                    'T':0,
                    'RH':0,
                    'pressure':0,
                    'WS':-1},
                'NO2':{'lu_urban':1,
                    'lu_grass':-1,
                    'lu_forest':-1,
                    'build_cover': 1,
                    'build_vol':1,
                    'build_std':-1,
                    'ndvi':-1,
                    'elevation':-1,
                    'pop':1,
                    'mj_road_ln':1,
                    'mi_road_ln':1,
                    'parking':1,
                    'bus_stops':1,
                    'pow_plant':-1,
                    'airport':-1,
                    'mj_road_dis':-1,
                    'T':-1,
                    'RH':1,
                    'pressure':1,
                    'WS':-1},
                'PM':{'lu_urban':1,
                    'lu_grass':-1,
                    'lu_forest':-1,
                    'build_cover': 1,
                    'build_vol':1,
                    'build_std':-1,
                    'ndvi':-1,
                    'elevation':-1,
                    'pop':1,
                    'mj_road_ln':1,
                    'mi_road_ln':1,
                    'parking':1,
                    'bus_stops':1,
                    'pow_plant':-1,
                    'airport':-1,
                    'mj_road_dis':-1,
                    'T':-1,
                    'RH':-1,
                    'pressure':0,
                    'WS':-1},
                'SO2':{'lu_urban':0,
                    'lu_grass':-1,
                    'lu_forest':-1,
                    'build_cover': 1,
                    'build_vol':1,
                    'build_std':-1,
                    'ndvi':-1,
                    'elevation':0,
                    'pop':1,
                    'mj_road_ln':1,
                    'mi_road_ln':1,
                    'parking':1,
                    'bus_stops':1,
                    'pow_plant':-1,
                    'airport':0,
                    'mj_road_dis':-1,
                    'T':-1,
                    'RH':0,
                    'pressure':0,
                    'WS':-1,}}

# Function to run to turn a time into model coefficients
# Depending on how I want to initiate model calculation, might change from being a timestamp. Currently good for demo
def calc_model_coefs(input_datetime):
    # Create the training set
    current_data = create_training_set(input_datetime)
    print(f'Modelling for {input_datetime}')

    if current_data.shape[0] < 10:
        print('Insufficient station data')
        return None
    else:
        print(f'Queried training set, {current_data.shape[0]} samples')

    # Run models for all pollutants
    model_coefs, _ = run_multi_pollutant_models(
        data=current_data, # Also each model has its own training data (can make data a dict with the training data inside) (not optimally efficient though, have this function include a filter_by_pollutant(copy of filter by deviation)
        predictors=predictors, # Will have to fix this in the function to include the buffers (maybe)
        pollutants=pollutants,
        sign_constraints=sign_constraints,
        alphas=np.logspace(-3, 0, 4),  # [0.001, 0.01, 0.1, 1.0]
        l1_ratios = [0.1, 0.5, 0.9],
        cv=5,
        verbose=True
    )
    
    # Flatten coefficients for all the pollutants into one dictionary
    row_idcs = model_coefs.index.astype(str)
    col_names = model_coefs.columns
    m_flat = {'time':input_datetime}
    for row_idx in row_idcs:
        for col_name in col_names:
            new_col_name = f'{row_idx}_{col_name}'
            value = model_coefs.loc[row_idx, col_name]
            m_flat[new_col_name] = value
    return m_flat

In [None]:
# Calculate and store n days of model coefficients in db (for now just do the last 24 hours)

# Create the table
def create_coef_table(sample_dict):
    table_name = 'coef_data'
    # Create engine
    engine = create_engine('postgresql://thesis:thesis@localhost:5432/hist_data')
    # Drop the table if it exists already
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS coef_data CASCADE"))
        conn.commit()
    # Create tables using the modern pattern
    Base = declarative_base()
    # Define column types mapping
    dtype_mapping = {
        'int64': Integer,
        'float64': Float,
        'object': String,
        'datetime64[ns]': DateTime,
    }
    # Create attributes dictionary
    attrs = {
        '__tablename__': table_name,
        'id': Column(Integer, primary_key=True) # Primary key indicates that its an id maker
    }
    # Add each column from the sample dictionary
    for column, value in sample_dict.items():
        # Additional handling for NumPy types
        value_type = type(value).__name__
        
        if 'float' in value_type:
            sql_type = Float
        elif 'int' in value_type:
            sql_type = Integer
        elif 'str' in value_type or 'object' in value_type:
            sql_type = String
        elif 'datetime' in value_type or 'Timestamp' in value_type:
            sql_type = DateTime
        else:
            # Default to standard Python type mapping or String as fallback
            sql_type = dtype_mapping.get(type(value), String)

        attrs[column] = Column(sql_type)
    # Create the model class
    TableClass = type(table_name.capitalize(), (Base,), attrs)
    # Create table
    Base.metadata.create_all(engine)
    return TableClass

# Fill table with data / add new rows
def populate_table(TableClass, coefs):
    # Create engine
    engine = create_engine('postgresql://thesis:thesis@localhost:5432/hist_data')

    # Populate the table with data
    with Session(engine) as session:
        # Initialize list for converted python native types (not necessary if I format it correctly in the beginning)
        processed_records = []
        for record in coefs:
            processed_record = {}
            for key, value in record.items():
                if 'numpy' in str(type(value)):
                    if 'float' in str(type(value)):
                        processed_record[key] = float(value)
                    elif 'int' in str(type(value)):
                        processed_record[key] = int(value)
                    elif 'datetime' in str(type(value)) or 'Timestamp' in str(type(value)):
                        processed_record[key] = value.to_pydatetime() if hasattr(value, 'to_pydatetime') else value
                    else:
                        processed_record[key] = str(value)
                else:
                    processed_record[key] = value
            processed_records.append(processed_record)
        
        for record in processed_records:
            # Create a new instance of the model with the record data
            model_instance = TableClass(**record)
            session.add(model_instance)
        
        # Commit the session to save all records
        session.commit()
        print(f"Added {len(coefs)} records to {TableClass.__tablename__} table")

input_date = '2024-12-04 22:00'
prev_hours = 6
# Generate strings for the last 24 hours in a list
base_date = datetime.strptime(input_date, '%Y-%m-%d %H:%M')
input_dates = []
for i in range(prev_hours):
    new_dt = base_date - timedelta(hours=i)
    new_dt_format = new_dt.strftime('%Y-%m-%d %H:%M')
    input_dates.append(new_dt_format)

# Create the table using a sample (input date)
sample_dict = calc_model_coefs(input_date) # TODO: in this process, you calculate the first date twice, not really necessary
coef_table = create_coef_table(sample_dict)

# Iterate over each date, calculate model coefficients, and add to the db
coefs_ls = []
for date in input_dates:
    model_coefs = calc_model_coefs(date)
    if model_coefs: # Checking if there is data or not
        coefs_ls.append(model_coefs)
populate_table(coef_table, coefs_ls)

<center>Step 8: Regression Prediction Mapping<center>

In [None]:
# Use current wind direction data to calculate 100m wind field (3 bit data storage?)
# Create wind-direction base maps with wind field + indep var base maps
# Use coefs + intercept to calculate the prediction map

In [None]:
# Step 2: Use model coefficients to calculate the regression prediction maps
# Prediction maps are going to be saved in memory for now

def pollutant_preds(pollutant, timestamp):
    print(f'Building Prediction Map for {pollutant} at {timestamp}...')
    
    # Set up db connection:
    engine = get_engine()
    # Query coefs for the timestamp
    query = f"SELECT * FROM coef_data WHERE time = '{timestamp}'"
    coefs_df = pd.read_sql_query(text(query), engine)
    # Filter the coefs df to only include pollutant values
    pol_coefs = coefs_df.loc[:,coefs_df.columns.str.contains(pollutant)]
    # Filter coefficients that equal zero
    filter_coefs = pol_coefs.loc[:,pol_coefs.iloc[0] != 0]

    # Get scaling params to rescale maps (could pickle the scaler but won't for now)
    x_values = pd.read_csv(r'C:\Users\Austin\Documents\DATABANK\Masters\Thesis\Code\V5\model_indep_vars2.csv')
    filtered_cols = [col1 for col1 in x_values.columns if any(col2 in col1 for col2 in filter_coefs)]
    
    if len(filtered_cols) < 4:
        print('Insufficient predictors')
        return None
    else:
        print(f'Independant Variables Used in Map Creation: {filtered_cols}')

    filtered_x_vals = x_values[filtered_cols]
    scaler = StandardScaler()
    _ = scaler.fit_transform(filtered_x_vals)
    # Get scaling parameters for x values
    feature_means = scaler.mean_
    feature_stds = scaler.scale_
    # Dictionary to map feature names to their scaling params
    scaling_params = {}
    for i, feature in enumerate(filtered_x_vals.columns):
        scaling_params[feature] = {
            'mean':feature_means[i],
            'std':feature_stds[i]}
    
    # Apply scaling params to maps
    # Iterate over each basemap / x value
    for variable in filtered_x_vals.columns:
        dist_var = False
        
        # Need to reformat the variable for query so that it doesn't contain pollutant for distance calcs or buffer size for buffer calcs
        # TODO: Really need to clean up this code and make PM2_5 PM25 instead or its so confusing
        if any(precalc in variable for precalc in precalcs):
            dist_var = True
            # Remove the pollutant before
            if pollutant == 'PM2_5': # Have to do seperate one bc I'm a dummy
                q_variable = variable.split('_',2)[2]
            else:
                q_variable = variable.split('_',1)[1]
        else:
            # Remove meters after for non-distance variables
            q_variable = variable.rsplit('_',1)[0]
        
        query = f"SELECT values FROM base_maps WHERE array_id = '{q_variable}'"
        df = pd.read_sql_query(query, engine)
        array = np.array(df['values'].iloc[0])

        # Multiply precalcs by 100 TODO: need to redo this when I save the maps to postgres
        if any(precalc in variable for precalc in precalcs):
            array = array * 100

        # Scale array
        scaled_array = (array - scaling_params[variable]['mean']) / scaling_params[variable]['std']
        # Multiply coefficient
        # Horrible way to do this but I need to set a condition for the distance calcs
        if dist_var:
            contribution_array = scaled_array * filter_coefs[variable][0]
        else:
            contribution_array = scaled_array * filter_coefs[q_variable][0]
        # Add to prediction array
        if 'prediction_array' not in locals():
            prediction_array = contribution_array
        else:
            prediction_array += contribution_array
    # Finally add intercept
    intercept_val = filter_coefs[f'{pollutant}_intercept'][0]
    print(f'Adding intercept: {intercept_val}')
    prediction_array += intercept_val
    return prediction_array

'''
'''

# Calculate prediction
input_time = '2024-12-04 22:00'
# Set PM rolling average count
rolling_avg_hours = 6
precalcs = ['pow_plant', 'airport', 'mj_road_dis']

predictions = {}
pollutants = ['O3','NO2','SO2','PM10','PM2_5']
for pollutant in pollutants:
    
    if pollutant in ['PM10', 'PM2_5']:
        # Create list of timestamps for rolling average calculation
        timestamps = []
        dt_ts = datetime.strptime(input_time, '%Y-%m-%d %H:%M')
        for i in range(rolling_avg_hours):
            new_dt = dt_ts - timedelta(hours=i)
            new_dt_format = new_dt.strftime('%Y-%m-%d %H:%M')
            timestamps.append(new_dt_format)
        # Initialize list of arrays to store all of the past values
        past_arrays = []
        for timestamp in timestamps:
            past_arr = pollutant_preds(pollutant, timestamp)
            # Don't add the array if the model 'fails'
            if past_arr is not None:
                past_arrays.append(past_arr)
            else:
                continue
        # Stack arrays into 2d and average
        if len(past_arr) > 1:
            stacked = np.vstack(past_arrays)
            predictions[pollutant] = np.mean(stacked, axis=0)
        else:
            predictions[pollutant] = None
    # Normal calculation for other variables
    else:
        predictions[pollutant] = pollutant_preds(pollutant, input_time)
        # Will save none value for model failure

<center>Step 9: Regression Kriging<center>

In [None]:
# Set station information in map

# Creates dictionaries of the indices in both 1d and 2d arrays for each station
def create_closest_point_dict_vectorized(df, raster_path):
    # Open the raster file
    with rasterio.open(raster_path) as src:
        raster_data = src.read(1)
        transform = src.transform
        
        # Get indices of non-NaN values (valid points)
        non_nan_indices = np.where(raster_data != 0)
        valid_rows, valid_cols = non_nan_indices
        
        # Create arrays to store row,col for each location in df
        df_rows = np.zeros(len(df), dtype=int)
        df_cols = np.zeros(len(df), dtype=int)
        
        # Convert all df locations to row,col in one go
        for i, (x, y) in enumerate(df['location']):
            r, c = rowcol(transform, x, y)
            df_rows[i] = r
            df_cols[i] = c
        
        # Create result dictionaries
        closest_point_dict = {}  # 1D flattened index
        indices_2d_dict = {}     # 2D array indices (row, col)
        
        # Process each df point
        for i, name in enumerate(df['name']):
            r, c = df_rows[i], df_cols[i]
            
            # Check if this point is valid in the raster
            if 0 <= r < raster_data.shape[0] and 0 <= c < raster_data.shape[1] and raster_data[r, c] != 0:
                # Find position in the non_nan_indices
                flat_idx = np.where((valid_rows == r) & (valid_cols == c))[0][0]
                closest_r, closest_c = r, c
            else:
                # Calculate distances to all valid points
                distances = np.sqrt((valid_rows - r)**2 + (valid_cols - c)**2)
                # Find the closest valid point
                min_idx = np.argmin(distances)
                flat_idx = min_idx
                closest_r, closest_c = valid_rows[min_idx], valid_cols[min_idx]
            
            closest_point_dict[name] = flat_idx
            indices_2d_dict[name] = (closest_r, closest_c)
            
        return closest_point_dict, indices_2d_dict

station_locs_1d, station_locs_2d = create_closest_point_dict_vectorized(station_info, f_boundary_r)
print(station_locs_1d)
print(len(station_locs_1d))

In [None]:
# Experiment with kriging variables

current_measures = create_training_set(input_time)
pollutant = 'SO2'
current_pol_vals = {name: pol for name, pol in zip(current_measures['name'], current_measures[pollutant])}
print(current_pol_vals)

def save_raster(map, output_path):
    # Open the boundary raster as a template
    with rasterio.open(f_boundary_r) as src:
        profile = src.profile.copy()
        raster_shape = src.shape
        # Create empty raster of zeroes
        output_raster = np.zeros(raster_shape, dtype=map.dtype)
        # Get row and col indices
        rows = non_nan_indices[0]
        cols = non_nan_indices[1]
        # Place 1d values back into 2d raster
        output_raster[rows,cols] = map

        # Write the new raster to the output path
        with rasterio.open(output_path, 'w', **profile) as dst:
            dst.write(output_raster, 1)

def krig(values):
    # TODO: define all of this stuff beforehand, doesn't need to be done multiple times
    # Define the coordinates (indexes) of the stations
    names = list(station_info['name'])
    y_coords = np.array([float(station_locs_2d[name][0]) for name in names])
    x_coords = np.array([float(station_locs_2d[name][1]) for name in names])

    values_arr = np.array([values[name] for name in names])
    # Define the grid over which to do interpolation
    grid_x = np.arange(188, dtype=float)
    grid_y = np.arange(177, dtype=float)

    OK = OrdinaryKriging(
        x_coords, y_coords, values_arr,
        variogram_model='linear',
        variogram_parameters={'slope': 1.0, 'nugget': 0.1},
        verbose = False,
        enable_plotting=True,
        nlags=10
    )
    # Krige the grid with the data values
    z, ss = OK.execute('grid', grid_x, grid_y)
    # Save krig into 1d array within boundary
    rows = non_nan_indices[0]
    cols = non_nan_indices[1]
    oneD_krig = z[rows,cols]
    return np.array(oneD_krig)

so2_krig = krig(current_pol_vals)
save_raster(so2_krig, r'rasters\so2_krig3.tif')



In [None]:
# Create RK postgres table

def save_raster(map, output_path):
    # Open the boundary raster as a template
    with rasterio.open(f_boundary_r) as src:
        profile = src.profile.copy()
        raster_shape = src.shape
        # Create empty raster of zeroes
        output_raster = np.zeros(raster_shape, dtype=map.dtype)
        # Get row and col indices
        rows = non_nan_indices[0]
        cols = non_nan_indices[1]
        # Place 1d values back into 2d raster
        output_raster[rows,cols] = map

        # Write the new raster to the output path
        with rasterio.open(output_path, 'w', **profile) as dst:
            dst.write(output_raster, 1)

# Function to krig station points and return 1d array
# Values have to be in the right order! 
def krig(values):
    # TODO: define all of this stuff beforehand, doesn't need to be done multiple times
    # Define the coordinates (indexes) of the stations
    names = list(station_info['name'])
    y_coords = np.array([float(station_locs_2d[name][0]) for name in names])
    x_coords = np.array([float(station_locs_2d[name][1]) for name in names])

    values_arr = np.array([values[name] for name in names])
    # Define the grid over which to do interpolation
    grid_x = np.arange(188, dtype=float)
    grid_y = np.arange(177, dtype=float)

    OK = OrdinaryKriging(
        x_coords, y_coords, values_arr,
        variogram_model='linear',
        verbose = False,
        enable_plotting=False
    )
    # Krige the grid with the data values
    z, ss = OK.execute('grid', grid_x, grid_y)
    
    # Save krig into 1d array within boundary
    rows = non_nan_indices[0]
    cols = non_nan_indices[1]
    oneD_krig = z[rows,cols]
    return np.array(oneD_krig)

def regkrig(values, prediction): 
    names = list(station_info['name'])
    # Calculate predicted values at each station from the maps
    station_predictions = np.array([prediction[station_locs_1d[station]] for station in names])
    values_arr = np.array([values[name] for name in names]) #TODO: Not calculating average value for PM in this
    # Calculate residuals
    residuals = values_arr - station_predictions
    residual_dict = {name:residual for name,residual in zip(names,residuals)}
    # Krig residuals
    res_krig = krig(residual_dict)
    # save residual krig for inspection
    save_raster(res_krig, rf'rasters\{}')
    
    # Add back to the prediction
    return prediction + res_krig


# Discrete and continuous to visualize
# Just do continuous for now
aqi_levels = [1,2,3,4,5,6]
breakpoints = [
[0,50,100,130,240,380], #O3
[0,40,90,120,230,340], #NO2
[0,100,200,350,500,750], #SO2
[0,20,40,50,100,150], #PM10
[0,10,20,25,50,75]  #PM2.5
]

# Needs a bit more documentation ngl
def calc_aqi(pollutant_arrays):
    def aqi_bin(pollutant_arr, pol_breakpoint):
        return np.interp(pollutant_arr, pol_breakpoint, aqi_levels)
    all_pollutants = np.stack(list(pollutant_arrays.values()), axis=0)
    aqi_maps = np.array([aqi_bin(all_pollutants[i], breakpoints[i]) for i in range(len(breakpoints))])
    aqi_array = np.max(aqi_maps, axis=0)
    return aqi_array

'''
'''


# Get current measurements using previous function
# Should make a new function that also properly formats PM values for kriging (just not going to bother with it now)
current_measures = create_training_set(input_time)

pollutant_maps = {}
# Calculate map for each pollutant
for pollutant in pollutants:
    print(f'Calculating final map for {pollutant}')
    # Get pollutant values
    # TODO: Need to set up stronger contingency for when not all of the measurements are there

    current_pol_vals = {name: pol for name, pol in zip(current_measures['name'], current_measures[pollutant])}
    # Get prediction from previous step
    prediction = predictions[pollutant]
    
    # If the model failed
    # For now, the reason is it regularized too many variables, later I may just test if its better or not (if calculating the OK everytime, may need to move krig)
    if prediction is None:
        print('No prediction, kriging values')
        map = krig(current_pol_vals)
    else:
        print('Regression kriging')
        map = regkrig(current_pol_vals, prediction)

    # Save map 
    #save_map(map)
    output_path = rf'rasters\rk_{pollutant}.tif'
    save_raster(map, output_path)
    # Save to dictionary for AQI calc
    pollutant_maps[pollutant] = map

# Calculate AQI map from all of the arrays
#aqi_map = calc_aqi(pollutant_maps)

# Save AQI to table
output_path = rf'rasters\rk_aqi.tif'
save_raster(aqi_map, output_path)
#save_map(aqi_map)

<center>Step 10: Validation / Mapping Confidence<center>

In [None]:
# Test LOOCV accuracy of current hour
# Go through each station
    # Use training set from step 6 saved in memory to take out the station
        # technically would want to re-query removing the station from the avg calc used in the data selector
    # Calculate coefs based on this
    # Predict station points (not the removed station)
    # Krig residuals but only calculating the removed station
# Calculate MAE / r2 for all the predicted and actual values combined into one set

# Will be adapted to do every hour for final validation assessment