In [1]:
import duckdb as ddb
import pandas as pd

Install and load DuckDB's httpfs extension, which lets DuckDB read remote files over HTTP(S) and from S3 directly in SQL.
Here it is used to query the OpenAQ air quality archive stored in S3.

In [2]:
ddb.sql("INSTALL httpfs; LOAD httpfs")

Create a connection to a persistent DuckDB database file. DuckDB creates `../air_quality.db` if it does not already exist.

In [3]:
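con = ddb.connect("../air_quality.db")
# LOAD is per database instance: the LOAD in cell 2 ran on DuckDB's default
# in-memory connection, so load httpfs on this connection as well
con.execute("LOAD httpfs")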

Create a `raw` schema to hold the raw ingestion tables. `IF NOT EXISTS` makes the cell safe to re-run.

In [4]:
con.execute("CREATE SCHEMA IF NOT EXISTS raw")

<_duckdb.DuckDBPyConnection at 0x2c495e371f0>

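Configure the three S3 settings DuckDB uses to connect to the S3 bucket: access key ID, secret access key, and region. They are left blank here. Supply real values if the bucket requires authentication.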

In [5]:
con.sql(
    """
    SET s3_access_key_id='';
    SET s3_secret_access_key='';
    SET s3_region='';
    """
)
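Hard-coding credentials in a notebook makes them easy to leak. One option is to build the same `SET` statements at runtime from environment variables. This sketch assumes the credentials live in the standard `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_REGION` variables. `s3_settings_sql` is a hypothetical helper, not part of DuckDB.

```python
import os


def s3_settings_sql(env=None):
    """Build DuckDB SET statements for S3 credentials (hypothetical helper).

    Assumes the standard AWS environment variable names; missing values
    become empty strings, matching the blank settings in the notebook.
    """
    env = os.environ if env is None else env
    settings = {
        "s3_access_key_id": env.get("AWS_ACCESS_KEY_ID", ""),
        "s3_secret_access_key": env.get("AWS_SECRET_ACCESS_KEY", ""),
        "s3_region": env.get("AWS_REGION", ""),
    }
    statements = []
    for name, value in settings.items():
        # Double any single quotes so the value stays a valid SQL string literal.
        escaped = value.replace("'", "''")
        statements.append(f"SET {name}='{escaped}';")
    return "\n".join(statements)


print(s3_settings_sql({"AWS_REGION": "us-east-1"}))
```

The returned string could then be passed to `con.sql(s3_settings_sql())` in place of the literal SQL above.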

Initialize the raw air quality staging table.
This table serves as the raw ingestion layer for OpenAQ air quality data,
preserving source fields and adding an ingestion timestamp for auditing.

In [6]:
con.execute("""
    CREATE TABLE IF NOT EXISTS raw.air_quality (
        location_id BIGINT,
        sensors_id BIGINT,
        "location" VARCHAR,
        "datetime" TIMESTAMP,
        lat DOUBLE,
        lon DOUBLE,
        "parameter" VARCHAR,
        units VARCHAR,
        "value" DOUBLE,
        "month" VARCHAR,
        "year" BIGINT,
        ingestion_datetime TIMESTAMP
    );

""")

<_duckdb.DuckDBPyConnection at 0x2c495e371f0>

Populate the raw OpenAQ air quality staging table by reading monthly
partitioned CSV files from the OpenAQ S3 archive. This step represents
the raw ingestion layer of the pipeline and records the ingestion time
(via `current_timestamp`) for traceability.
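The archive path encodes the partition in `key=value` folders (`locationid`, `year`, `month`), so loading another month means changing those segments. The helper below is hypothetical and assumes the same layout as the S3 path in the next cell. It keeps the zero-padded month consistent.

```python
def openaq_partition_glob(location_id: int, year: int, month: int) -> str:
    """Build the S3 glob for one location-month (hypothetical helper).

    Assumes the layout used in the notebook:
    s3://openaq-data-archive/records/csv.gz/locationid=<id>/year=<yyyy>/month=<mm>/*.csv.gz
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return (
        "s3://openaq-data-archive/records/csv.gz/"
        f"locationid={location_id}/year={year}/month={month:02d}/*.csv.gz"
    )


# Globs for the first quarter of 2024 at the notebook's location.
for m in range(1, 4):
    print(openaq_partition_glob(225405, 2024, m))
```

Each returned string can go inside the `read_csv(...)` call. DuckDB's `read_csv` also accepts a list of paths, so several months could be read in one statement.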

In [7]:
con.execute("""
INSERT INTO raw.air_quality
SELECT 
    location_id, 
    sensors_id, 
    "location", 
    "datetime", 
    lat, 
    lon, 
    "parameter", 
    units, 
    "value",
    "month", 
    "year",
    current_timestamp AS ingestion_datetime
FROM read_csv('s3://openaq-data-archive/records/csv.gz/locationid=225405/year=2024/month=03/*.csv.gz');
""")

<_duckdb.DuckDBPyConnection at 0x2c495e371f0>

In [8]:
con.close()