## OpenAQ - Filtrage du dataset via SQL (DuckDB)

In [2]:
# ================================================================
# INPUTS: change depending on the data you're working with

import os
from openaq_anomaly_prediction.config import Configuration as cfg

PARQUET_FILEPATH = os.path.join(cfg.DATA_EXPORT_PATH, "seoul_complete.int.parquet")
CITY_TIMEZONE = "Asia/Seoul"  # available for each row/location

### Imports

In [3]:
# ================================================================
# IMPORTS: python packages and settings (don't change anything)

import duckdb
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

import matplotlib.pyplot as plt
import seaborn as sns

from pprint import pprint

# CONFIG: to display all rows/columns of a dataframe (debug)
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.width", 1000)
sep_length = 64  # Only so we can print separators of a fixed length

# PyArrow
parquet_file = pq.ParquetFile(PARQUET_FILEPATH)

# DuckDB:
con = duckdb.connect()
# con.execute(f"SET TimeZone='{CITY_TIMEZONE}';")  # available for each row/location, no need to set globally
con.execute(f"CREATE OR REPLACE VIEW measurements AS SELECT * FROM read_parquet('{PARQUET_FILEPATH}')")

<_duckdb.DuckDBPyConnection at 0x745af0aafc30>

In [4]:
# ================================================================
# UTILS: helper functions (don't change anything)

def print_summary(df: pd.DataFrame) -> None:
    """Print a summary of the useful stats of a dataframe."""
    # Summary
    print(f"{'='*sep_length}\nSUMMARY: Custom DataFrame\n{'-'*sep_length}")
    print(f"Total rows: {len(df)}")
    print(f"From: {df["period.datetimeTo.local"].min()}")
    print(f"To  : {df["period.datetimeTo.local"].max()}")
    print(f"{'-'*int(sep_length/3)*2}")
    print(f"Unique locations: {df["location_id"].nunique()}")
    print(f"Unique pollutants: {df["name"].nunique()}")
    print(f"{'-'*int(sep_length/3)*2}")
    for pollutant in df["name"].unique():
        print(f"> {pollutant}")

def print_summary_timeseries(df: pd.DataFrame) -> None:
    """Print a summary of the useful stats of a TIME-SERIES dataframe."""

    memory_usage = df.memory_usage(index=True, deep=True).sum() / (1024**2)

    # Summary
    print(f"{'='*sep_length}\nSUMMARY: Time-series DataFrame\n{'-'*sep_length}")
    print(f"Total rows  : {len(df)}")
    print(f"Memory usage: {memory_usage:.2f} MB")
    print(f"From: {df["datetimeTo_local"].min()}")
    print(f"To  : {df["datetimeTo_local"].max()}")
    print(f"{'-'*int(sep_length/3)*2}")
    print(f"Unique locations: {df["location_id"].nunique()}")
    print(f"Unique pollutants: {df["name"].nunique()}")
    print(f"{'-'*int(sep_length/3)*1}")
    for pollutant in df["name"].unique():
        print(f"> {pollutant}")

### Résumé du fichier de base (parquet)

In [5]:
# ================================================================
# SUMMARY: for information only (don't change anything)

# DuckDB
db_start_date = con.execute("""
    SELECT min("period.datetimeTo.local")
    FROM measurements
""", {}).fetchone()[0]

db_end_date = con.execute("""
    SELECT max("period.datetimeTo.local")
    FROM measurements
""", {}).fetchone()[0]

db_unique_locations_count = con.execute("""
    SELECT count(DISTINCT "location_id")
    FROM measurements
""", {}).fetchone()[0]

db_unique_pollutants_count = con.execute("""
    SELECT count(DISTINCT "name")
    FROM measurements
""", {}).fetchone()[0]

db_unique_pollutants = con.execute("""
    SELECT DISTINCT "name"
    FROM measurements
""", {}).fetchnumpy()["name"].tolist()  # could use a list comprehension too
#""", {}).df().values.flatten()

# Summary
print(f"{'='*sep_length}\nSUMMARY: {PARQUET_FILEPATH}\n{'-'*sep_length}")
print(f"Total rows: {parquet_file.metadata.num_rows}")
print(f"From: {db_start_date}")
print(f"To  : {db_end_date}")
print(f"{'-'*int(sep_length/3)*2}")
print(f"Unique locations: {db_unique_locations_count}")
print(f"Unique pollutants: {db_unique_pollutants_count}")
print(f"{'-'*int(sep_length/3)*2}")
for pollutant in db_unique_pollutants:
    print(f"> {pollutant}")
print()

# Display a list of all columns available in the dataframe
# print("Available Columns:")
# pprint(parquet_file.schema.names)

SUMMARY: /home/deniscck/code/denis-cck/openaq_anomaly_prediction/data/export/seoul_complete.int.parquet
----------------------------------------------------------------
Total rows: 4918548
From: 2024-04-30 17:00:00+02:00
To  : 2025-12-16 03:00:00+01:00
------------------------------------------
Unique locations: 58
Unique pollutants: 6
------------------------------------------
> o3 ppm
> co ppm
> pm25 µg/m³
> pm10 µg/m³
> no2 ppm
> so2 ppm



### Exemples de filtres

In [6]:
# ================================================================
# SELECT the columns (use -- to comment out a column or /* */ for multiline)

query = """
    CREATE OR REPLACE VIEW bronze_measurements AS
    SELECT
        location_id,
        sensor_id,
        name,
        value,

        "period.datetimeFrom.local",
        "period.datetimeFrom.local",
        "period.datetimeTo.local",
        "period.datetimeTo.utc",
        "datetimeWeather.local",
        timezone,
        "coordinates.latitude",
        "coordinates.longitude",

        -- parameter.id,
        -- parameter.name,
        -- parameter.units,
        "parameter.displayName",

        -- period.datetimeFrom.utc,
        -- location.datetimeFirst.utc,
        -- location.datetimeLast.utc,
        -- timestampTo,

        -- location_name,
        -- country.code,
        -- owner.name,
        -- provider.name,
        -- datetimeWeather.utc,
        -- weather_latitude,
        -- weather_longitude,

        elevation,
        temperature_2m,
        relative_humidity_2m,
        dew_point_2m,
        apparent_temperature,
        precipitation,
        rain,
        snowfall,
        snow_depth,
        shortwave_radiation,
        direct_radiation,
        diffuse_radiation,
        global_tilted_irradiance,
        direct_normal_irradiance,
        terrestrial_radiation,
        weather_code,
        pressure_msl,
        surface_pressure,
        cloud_cover,
        cloud_cover_low,
        cloud_cover_mid,
        cloud_cover_high,
        vapour_pressure_deficit,
        et0_fao_evapotranspiration,
        wind_speed_100m,
        wind_speed_10m,
        wind_direction_10m,
        wind_direction_100m,
        wind_gusts_10m,
        is_day,

    FROM measurements
"""

custom_df = con.execute(query, {}).df()

In [7]:
# ================================================================
# FILTERS: create filters corresponding to SQL syntax (DuckDB)
# Here are examples of what you can do, take it in your own hands
# or get help with your favourite AI friend.

def dont_run_its_an_example() -> None:

    # -------------------------------------------------------------
    # Filter by DATE (be mindful of timezones)
    query = """
        SELECT
            *
        FROM bronze_measurements
        WHERE "period.datetimeTo.local" >= $start_date
          AND "period.datetimeTo.local" <= $end_date
    """
    custom_df = con.execute(query, {
        "start_date": "2025-01-01 00:00:00+09:00",
        "end_date": "2025-12-31 23:59:59+09:00"
    }).df()

    # -------------------------------------------------------------
    # Filter by LOCATION_ID

    # This works perfectly fine in modern DuckDB
    query = """
        SELECT
            *
        FROM bronze_measurements
        WHERE location_id IN $location_ids
    """
    # This is "better" because it turns the list in a temporary table
    query = """
        SELECT
            *
        FROM bronze_measurements
        WHERE location_id IN (SELECT unnest($location_ids))
    """

    custom_df = con.execute(query, {
        "location_ids": [2622825, 2623440, 2623080],
    }).df()

    # -------------------------------------------------------------
    # Filter by POLLUTANT ("name")

    # Same as LOCATION_ID
    query = """
        SELECT
            *
        FROM bronze_measurements
        WHERE name IN (SELECT unnest($pollutants))
    """
    custom_df = con.execute(query, {
        "pollutants": ["pm25 µg/m³", "pm10 µg/m³"],
    }).df()


### Création du dataframe personnalisé

In [8]:
#  # -------------------------------------------------------------
# # Query Example

# query = """
#     SELECT
#         "period.datetimeTo.local",
#         "datetimeWeather.local"
#     FROM bronze_measurements
#     WHERE value < 10000
#       AND "period.datetimeTo.local" >= $start_date
#       AND "period.datetimeTo.local" <= $end_date
#       -- AND location_id IN (SELECT unnest($location_ids))
#       -- AND name IN (SELECT unnest($pollutants))
# """
# custom_df = con.execute(query, {
#     "start_date": "2025-12-01 00:00:00+09:00",
#     "end_date": "2025-12-31 23:59:59+09:00",
#     # "location_ids": [2622825, 2623440, 2623080],
#     # "pollutants": ["pm25 µg/m³"],
# }).df()

# # Stats
# memory_usage = custom_df.memory_usage(index=True, deep=True).sum() / (1024**2)
# print(f"Number of rows: {len(custom_df)}")
# print(f"Memory usage: {memory_usage:.2f} MB")
# # print_summary(custom_df)


In [9]:
# # Check if the two datetime columns are equal (measurements vs weather)
# test_df = custom_df["period.datetimeTo.local"] != custom_df["datetimeWeather.local"]
# test_df.sum()

In [10]:
# sns.lineplot(data=custom_df, x="period.datetimeTo.local", y="value")
# plt.show()

### DuckDB Views

In [11]:
# ================================================================
# TIME-SERIES: BRONZE

query = """
    CREATE OR REPLACE VIEW bronze_timeseries AS
    SELECT
        location_id,
        sensor_id,
        "parameter.displayName" AS displayName,
        name,
        value,

        ("period.datetimeTo.utc" AT TIME ZONE 'UTC')::TIMESTAMP AS datetimeTo_utc,
        ("period.datetimeTo.local" AT TIME ZONE timezone)::TIMESTAMP AS datetimeTo_local,
        ("datetimeWeather.local" AT TIME ZONE timezone)::TIMESTAMP AS datetimeWeather_local,
        -- timezone(timezone, "period.datetimeTo.local") AS datetimeTo_local,
        -- timezone('UTC', "period.datetimeTo.utc") AS datetimeTo_utc,
        -- timezone(timezone, "datetimeWeather.local") AS datetimeWeather_local,
        -- timezone,

        "coordinates.latitude" AS latitude,
        "coordinates.longitude" AS longitude,

        -- parameter.id,
        -- parameter.name,
        -- parameter.units,

        -- period.datetimeFrom.utc,
        -- location.datetimeFirst.utc,
        -- location.datetimeLast.utc,
        -- timestampTo,

        -- location_name,
        -- country.code,
        -- owner.name,
        -- provider.name,
        -- datetimeWeather.utc,
        -- weather_latitude,
        -- weather_longitude,

        elevation,
        temperature_2m,
        relative_humidity_2m,
        dew_point_2m,
        apparent_temperature,
        precipitation,
        rain,
        snowfall,
        snow_depth,
        shortwave_radiation,
        direct_radiation,
        diffuse_radiation,
        global_tilted_irradiance,
        direct_normal_irradiance,
        terrestrial_radiation,
        weather_code,
        pressure_msl,
        surface_pressure,
        cloud_cover,
        cloud_cover_low,
        cloud_cover_mid,
        cloud_cover_high,
        vapour_pressure_deficit,
        et0_fao_evapotranspiration,
        wind_speed_100m,
        wind_speed_10m,
        wind_direction_10m,
        wind_direction_100m,
        wind_gusts_10m,
        is_day,

    FROM measurements
"""
custom_df = con.execute(query, {}).df()

In [14]:
# ================================================================
# TIME-SERIES: SILVER

query = """
    CREATE OR REPLACE VIEW silver_timeseries AS
    SELECT

    -- MEASUREMENTS ==========================

        /* SENSOR METADATA                  */
            location_id,
            sensor_id,
            -- displayName,
            name,

        /* POLLUTANT FEATURES               */
            -- Set values >= 10,000 to NULL for imputation later
            CAST(CASE WHEN value >= 10000 THEN NULL ELSE value END AS DOUBLE) AS value,

        /* TIME FEATURES                    */
            datetimeTo_utc,
            datetimeTo_local,
            datetimeWeather_local,
        
        /* LOCATION FEATURES                */
            latitude,
            longitude,
            elevation,

    -- WEATHER ===============================
        /* ATMOSPHERIC FEATURES             */
            temperature_2m,
            relative_humidity_2m,
            dew_point_2m,
            apparent_temperature,
            pressure_msl,
            surface_pressure,
            weather_code,

        /* PRECIPITATION FEATURES           */
            precipitation,
            rain,
            snowfall,
            snow_depth,

        /* RADIATION/DAYLIGHT FEATURES      */
            shortwave_radiation,
            direct_radiation,
            diffuse_radiation,
            global_tilted_irradiance,
            direct_normal_irradiance,
            terrestrial_radiation,
            is_day,

        /* CLOUD & MOISTURE FEATURES        */
            cloud_cover,
            cloud_cover_low,
            cloud_cover_mid,
            cloud_cover_high,
            vapour_pressure_deficit,
            et0_fao_evapotranspiration,

        /* WIND FEATURES                    */
            wind_speed_100m,
            wind_speed_10m,
            wind_direction_10m,
            wind_direction_100m,
            wind_gusts_10m,

    FROM bronze_timeseries
"""
custom_df = con.execute(query, {}).df()

In [15]:
# ================================================================
# Query last view

query = """
    SELECT
        *
    FROM silver_timeseries
    -- WHERE value < 1000000 -- don't do that, instead put the 10k values to NaN for imputing
    WHERE datetimeTo_local >= $start_date
      AND datetimeTo_local <= $end_date
      -- AND location_id IN (SELECT unnest($location_ids))
      AND name IN (SELECT unnest($pollutants))
"""
timeseries_df = con.execute(query, {
    "start_date": "2024-01-01 00:00:00+09:00",
    "end_date": "2025-12-31 23:59:59+09:00",
    # "location_ids": [2622825, 2623440, 2623080],
    "pollutants": ["pm25 µg/m³"],
}).df()

# Stats
print_summary_timeseries(timeseries_df)
timeseries_df.info()

SUMMARY: Time-series DataFrame
----------------------------------------------------------------
Total rows  : 819758
Memory usage: 296.30 MB
From: 2024-05-01 00:00:00
To  : 2025-12-16 11:00:00
------------------------------------------
Unique locations: 58
Unique pollutants: 1
---------------------
> pm25 µg/m³
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 819758 entries, 0 to 819757
Data columns (total 39 columns):
 #   Column                      Non-Null Count   Dtype         
---  ------                      --------------   -----         
 0   location_id                 819758 non-null  int64         
 1   sensor_id                   819758 non-null  int64         
 2   name                        819758 non-null  object        
 3   value                       779644 non-null  float64       
 4   datetimeTo_utc              819758 non-null  datetime64[us]
 5   datetimeTo_local            819758 non-null  datetime64[us]
 6   datetimeWeather_local       819758 non-null  dateti

In [16]:
total_count = len(timeseries_df)
outliers_count = len(timeseries_df[timeseries_df["value"] == 10000])
print(f"Outliers (value == 10000): {outliers_count/total_count:.2%} ({outliers_count}/{total_count})")
print(f"NaN values: {timeseries_df['value'].isna().sum()/total_count:.2%} ({timeseries_df['value'].isna().sum()}/{total_count})")

Outliers (value == 10000): 0.00% (0/819758)
NaN values: 4.89% (40114/819758)


In [17]:
# timeseries_df[timeseries_df['value'].isna()].head()

In [18]:
# custom_df.info()
# display(custom_df.tail())

### Baseline: Linear Regression

In [53]:
from sklearn.model_selection import train_test_split

# Just for the baseline we remove everything NaN
removed_nan_indices = timeseries_df.dropna()

X = removed_nan_indices.drop(columns=["value"])
y = removed_nan_indices["value"].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

In [54]:
print(f"X_train shape: {X_train.shape} (NaN: {X_train.isna().sum().sum()})")
print(f"X_test shape: {X_test.shape} (NaN: {X_test.isna().sum().sum()})")
print(f"y_train shape: {y_train.shape} (NaN: {np.isnan(y_train).sum()})")
print(f"y_test shape: {y_test.shape} (NaN: {np.isnan(y_test).sum()})")


X_train shape: (623715, 38) (NaN: 0)
X_test shape: (155929, 38) (NaN: 0)
y_train shape: (623715,) (NaN: 0)
y_test shape: (155929,) (NaN: 0)


In [57]:
num_columns = X.select_dtypes(include=['number'])
cat_columns = X.select_dtypes(include=['object'])

print(num_columns)
print(cat_columns)

KeyboardInterrupt: 

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer

num_pipeline = Pipeline([
    ('scaler', StandardScaler())
])

cat_pipeline = Pipeline([
    ('ohe', OneHotEncoder())
])