In [1]:
#Importing the dependencies
import duckdb as ddb

In [2]:
#Interface(Http file system) that will allow data download from Http connections or HDFS
ddb.sql("INSTALL httpfs; LOAD httpfs")

In [3]:
#Create a connection to DuckDB database
conn = ddb.connect(r"..\nakuru_air_quality.db")

In [4]:
#Create a schema
conn.execute("CREATE schema IF NOT EXISTS nakuru;")

<duckdb.duckdb.DuckDBPyConnection at 0x1b6c7304630>

In [5]:
#Configuring DuckDB to access S3 storage
conn.sql("""
    SET s3_access_key_id='';
    SET s3_secret_access_key='';
    SET s3_region='';
""")

In [6]:
#Create a table inside the schema to store air quality data
conn.execute("""
    CREATE TABLE IF NOT EXISTS nakuru.air_quality_data (
        location_id BIGINT,
        sensors_id BIGINT,
        "location" VARCHAR,
        "datetime" TIMESTAMP,
        lat DOUBLE,
        lon DOUBLE,
        "parameter" VARCHAR,
        units VARCHAR,
        "value" VARCHAR,
        "month" VARCHAR,
        "year" BIGINT,
        ingenstion_datetime TIMESTAMP
    );
    
""")

<duckdb.duckdb.DuckDBPyConnection at 0x1b6c7304630>

In [7]:
#Test the file before inserting
location_id = 1894637
year = 2025
month = "11"
day = "03"

file_path = f"s3://openaq-data-archive/records/csv.gz/locationid={location_id}/year={year}/month={month}/location-{location_id}-{year}{month}{day}.csv.gz"

conn.execute(f"""
SELECT *
FROM read_csv('{file_path}')
LIMIT 5
""").df()

Unnamed: 0,location_id,sensors_id,location,datetime,lat,lon,parameter,units,value,locationid,month,year
0,1894637,7466385,Nakuru-1864771,2025-11-03 06:00:27,-0.2674,36.0218,pm25,µg/m³,9.03,1894637,11,2025
1,1894637,7466385,Nakuru-1864771,2025-11-03 06:17:38,-0.2674,36.0218,pm25,µg/m³,11.44,1894637,11,2025
2,1894637,7466385,Nakuru-1864771,2025-11-03 06:34:49,-0.2674,36.0218,pm25,µg/m³,9.48,1894637,11,2025
3,1894637,7466385,Nakuru-1864771,2025-11-03 06:52:01,-0.2674,36.0218,pm25,µg/m³,7.42,1894637,11,2025
4,1894637,7466385,Nakuru-1864771,2025-11-03 07:09:12,-0.2674,36.0218,pm25,µg/m³,7.82,1894637,11,2025


In [8]:
#Ingest all months data from CSV files stored on S3 directly into DuckDB table
location_id = 1894637
year = 2025

file_path = f"s3://openaq-data-archive/records/csv.gz/locationid={location_id}/year={year}/month=*/location-{location_id}-{year}*.csv.gz"

conn.execute(f"""
INSERT INTO nakuru.air_quality_data
SELECT
    location_id, 
    sensors_id, 
    "location", 
    "datetime", 
    lat, 
    lon, 
    "parameter", 
    units, 
    "value",
    "month", 
    "year",
    current_timestamp AS ingestion_datetime
FROM read_csv('{file_path}');
""")


<duckdb.duckdb.DuckDBPyConnection at 0x1b6c7304630>

In [9]:
#Close the connection
conn.close()