1. Load Your Raw (Silver) Data

In [2]:
import duckdb

con = duckdb.connect()

con.execute("""
    CREATE OR REPLACE VIEW weather_raw AS
    SELECT *
    FROM 'data/**/*.parquet'
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

2. Inspect the Data

In [3]:
con.execute("DESCRIBE weather_raw").fetchdf()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,city,VARCHAR,YES,,,
1,latitude,DOUBLE,YES,,,
2,longitude,DOUBLE,YES,,,
3,temperature,DOUBLE,YES,,,
4,windspeed,DOUBLE,YES,,,
5,winddirection,DOUBLE,YES,,,
6,weathercode,BIGINT,YES,,,
7,observation_time,VARCHAR,YES,,,
8,ingestion_time,TIMESTAMP,YES,,,
9,date,DATE,YES,,,


In [4]:
con.execute("SELECT * FROM weather_raw LIMIT 5").fetchdf()

Unnamed: 0,city,latitude,longitude,temperature,windspeed,winddirection,weathercode,observation_time,ingestion_time,date
0,Delhi,28.6139,77.209,23.7,7.3,290.0,3,2026-02-13T09:30,2026-02-13 09:38:53.510745,2026-02-13
1,Delhi,28.6139,77.209,20.1,3.3,264.0,0,2026-02-14T14:45,2026-02-14 20:26:08.386360,2026-02-14
2,London,51.5074,-0.1278,6.9,4.3,66.0,3,2026-02-13T09:30,2026-02-13 09:38:53.510313,2026-02-13
3,London,51.5074,-0.1278,6.4,7.1,345.0,2,2026-02-14T14:45,2026-02-14 20:26:08.385740,2026-02-14
4,NewYork,40.7128,-74.006,-4.7,5.1,321.0,0,2026-02-13T09:30,2026-02-13 09:38:53.510607,2026-02-13


3. Common Data Cleaning Tasks

In [5]:
# Remove Nulls
con.execute("""
    CREATE OR REPLACE VIEW weather_clean AS
    SELECT *
    FROM weather_raw
    WHERE temperature IS NOT NULL
      AND windspeed IS NOT NULL
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

In [6]:
# Fix Data Types
con.execute("""
    CREATE OR REPLACE VIEW weather_clean AS
    SELECT 
        city,
        CAST(date AS DATE) AS date,
        CAST(temperature AS DOUBLE) AS temperature,
        CAST(windspeed AS DOUBLE) AS windspeed,
        CAST(winddirection AS DOUBLE) AS winddirection,
        CAST(weathercode AS INTEGER) AS weathercode,
        CAST(observation_time AS TIMESTAMP) AS observation_time,
        ingestion_time
    FROM weather_raw
    WHERE temperature IS NOT NULL
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

In [7]:
# Remove Duplicates
con.execute("""
    CREATE OR REPLACE VIEW weather_clean AS
    SELECT DISTINCT *
    FROM weather_raw
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

In [8]:
# deduplicate by city + time
con.execute("""
    CREATE OR REPLACE VIEW weather_clean AS
    SELECT *
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY city, observation_time
                   ORDER BY ingestion_time DESC
               ) AS rn
        FROM weather_raw
    )
    WHERE rn = 1
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

In [None]:
# Remove Outliers - temperature outside realistic range
con.execute("""
    CREATE OR REPLACE VIEW weather_clean AS
    SELECT *
    FROM weather_raw
    WHERE temperature BETWEEN -50 AND 60
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

In [None]:
# Add Derived Columns - wind category
con.execute("""
    CREATE OR REPLACE VIEW weather_clean AS
    SELECT *,
        CASE
            WHEN windspeed < 10 THEN 'Low'
            WHEN windspeed < 25 THEN 'Moderate'
            ELSE 'High'
        END AS wind_category
    FROM weather_raw
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

4. Validate Cleaned Data

In [11]:
con.execute("""
    SELECT city, COUNT(*) 
    FROM weather_clean
    GROUP BY city
""").fetchdf()

Unnamed: 0,city,count_star()
0,Tokyo,2
1,NewYork,2
2,London,2
3,Delhi,2


5. Save Clean Data

In [12]:
# save clean data with partitions
con.execute("""
    COPY (
        SELECT * FROM weather_clean
    )
    TO 'silver/'
    (FORMAT PARQUET, PARTITION_BY (city, date))
""")


<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

6. Create Gold Aggregation

In [15]:
# Daily aggregates
con.execute("""
    CREATE OR REPLACE TABLE weather_gold AS
    SELECT 
        city,
        date,
        AVG(temperature) AS avg_temp,
        MAX(temperature) AS max_temp,
        MIN(temperature) AS min_temp,
        AVG(windspeed) AS avg_wind
    FROM weather_clean
    GROUP BY city, date
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>

In [18]:
# Never write empty gold tables in production
con.execute("SELECT COUNT(*) FROM weather_gold").fetchall()

[(8,)]

In [16]:
# write Gold to Parquet (Partitioned)
con.execute("""
    COPY weather_gold
    TO 'gold/'
    (FORMAT PARQUET, PARTITION_BY (city, date))
""")

<_duckdb.DuckDBPyConnection at 0x78731a7866f0>