# Data Transformation with DuckDB

This notebook loads raw GDELT data into DuckDB and performs basic SQL
transformations to create a clean dataset for analysis.


In [1]:
import duckdb
import pandas as pd
import os
import requests
from io import BytesIO



In [15]:
# Date tag embedded in the raw and processed file names.
DATE_STR = "20250101"

# Data locations, relative to the notebook's working directory.
RAW_DATA_PATH = "../data/raw"
PROCESSED_DATA_PATH = "../data/processed"
DB_PATH = "../data/gdelt.duckdb"

# Adjust this if the raw file is named differently.
RAW_FILE = f"{RAW_DATA_PATH}/gdelt_events_{DATE_STR}.csv"

# Make sure the output directory exists before anything is written to it.
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)

print("Using raw file:", RAW_FILE)


Using raw file: ../data/raw/gdelt_events_20250101.csv


In [16]:
# Open the DuckDB connection at DB_PATH; the cells below all reuse `con`.
con = duckdb.connect(database=DB_PATH)
print("Connected to DuckDB at:", DB_PATH)

Connected to DuckDB at: ../data/gdelt.duckdb


In [17]:
# Rebuild raw_events from the raw CSV so re-running this cell starts clean.
con.execute("DROP TABLE IF EXISTS raw_events")

# The file path is bound as a query parameter rather than formatted into the SQL.
load_raw_sql = """
    CREATE TABLE raw_events AS
    SELECT * FROM read_csv_auto(?, header=true)
"""
con.execute(load_raw_sql, [RAW_FILE])

# Row count of the freshly loaded table.
con.execute("SELECT COUNT(*) AS total_rows FROM raw_events").df()


Unnamed: 0,total_rows
0,70082


In [18]:
# Profile the raw table: total rows versus non-null counts for the columns
# that the cleaning step filters on.
# The connection is deliberately left open here: the following cells keep
# querying `con`, and closing it would make them fail on a fresh
# Restart & Run All. Ending the cell on the query also lets the result display.
con.execute("""
    SELECT
        COUNT(*) AS total_rows,
        COUNT(GLOBALEVENTID) AS event_id_not_null,
        COUNT(SQLDATE) AS sqldate_not_null,
        COUNT(EventCode) AS event_code_not_null,
        COUNT(Actor1CountryCode) AS country_not_null
    FROM raw_events
""").df()

In [26]:
# Fetch the raw_events schema and print just the column names.
cols = con.execute("DESCRIBE raw_events").df()
column_names = cols['column_name'].tolist()
print("Columns in raw_events table:", column_names)

Columns in raw_events table: ['GLOBALEVENTID', 'SQLDATE', 'MonthYear', 'Year', 'FractionDate', 'Actor1Code', 'Actor1Name', 'Actor1CountryCode', 'Actor1KnownGroupCode', 'Actor1EthnicCode', 'Actor1Religion1Code', 'Actor1Religion2Code', 'Actor1Type1Code', 'Actor1Type2Code', 'Actor1Type3Code', 'Actor2Code', 'Actor2Name', 'Actor2CountryCode', 'Actor2KnownGroupCode', 'Actor2EthnicCode', 'Actor2Religion1Code', 'Actor2Religion2Code', 'Actor2Type1Code', 'Actor2Type2Code', 'Actor2Type3Code', 'IsRootEvent', 'EventCode', 'EventBaseCode', 'EventRootCode', 'QuadClass', 'GoldsteinScale', 'NumMentions', 'NumSources', 'NumArticles', 'AvgTone', 'Actor1Geo_Type', 'Actor1Geo_Fullname', 'Actor1Geo_CountryCode', 'Actor1Geo_ADM1Code', 'Actor1Geo_ADM2Code', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor1Geo_FeatureID', 'Actor2Geo_Type', 'Actor2Geo_Fullname', 'Actor2Geo_CountryCode', 'Actor2Geo_ADM1Code', 'Actor2Geo_ADM2Code', 'Actor2Geo_Lat', 'Actor2Geo_Long', 'Actor2Geo_FeatureID', 'ActionGeo_Type', 'ActionGeo_Fu

In [29]:
# Set of column names present in raw_events; pick() and the WHERE-clause
# builder below check membership against it.
raw_describe = con.execute("DESCRIBE raw_events").df()
raw_cols = set(raw_describe["column_name"].tolist())

def pick(col, alias=None, cast=None):
    """Build a SELECT-list expression for one raw column.

    Args:
        col: Column name to select; looked up in the module-level `raw_cols`.
        alias: Optional output name; when truthy, ` AS <alias>` is appended.
        cast: Optional SQL type; when truthy, the column is wrapped in
            `TRY_CAST(<col> AS <cast>)`.

    Returns:
        The SQL expression string, or None when `col` is not in `raw_cols`.
    """
    # Missing columns are signalled with None so the caller can filter them out.
    if col not in raw_cols:
        return None
    source = f"TRY_CAST({col} AS {cast})" if cast else col
    return f"{source} AS {alias}" if alias else source

# (raw column, output alias, TRY_CAST target type or None) for every column
# wanted in events_clean.
column_specs = [
    ("GLOBALEVENTID", "event_id", "BIGINT"),
    ("SQLDATE", "event_date", "INTEGER"),
    ("Actor1CountryCode", "country", None),
    ("EventCode", "event_code", None),
    ("AvgTone", "avg_tone", "DOUBLE"),
    ("GoldsteinScale", "goldstein_scale", "DOUBLE"),
    ("Actor1Name", "actor1_name", None),
    ("Actor2Name", "actor2_name", None),
    ("DATEADDED", "date_added", "INTEGER"),
    ("SOURCEURL", "source_url", None),
]

# pick() returns None for columns missing from raw_events; keep only the rest.
select_exprs = [
    expr
    for expr in (pick(col, alias, cast) for col, alias, cast in column_specs)
    if expr is not None
]

# Join the SELECT list with plain string joining (no nested f-string logic).
select_sql = ",\n    ".join(select_exprs)

# Require the essential columns to be non-null, but only those that exist.
where_clauses = [
    f"{name} IS NOT NULL"
    for name in ("GLOBALEVENTID", "SQLDATE", "Actor1CountryCode", "EventCode")
    if name in raw_cols
]
where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""

query = f"""
CREATE OR REPLACE TABLE events_clean AS
SELECT
    {select_sql}
FROM raw_events
{where_sql}
"""

print(query)  # debug: show the final SQL
con.execute(query)

# Row count of the cleaned table.
con.execute("SELECT COUNT(*) AS rows_clean FROM events_clean").df()



CREATE OR REPLACE TABLE events_clean AS
SELECT
    TRY_CAST(GLOBALEVENTID AS BIGINT) AS event_id,
    TRY_CAST(SQLDATE AS INTEGER) AS event_date,
    Actor1CountryCode AS country,
    EventCode AS event_code,
    TRY_CAST(AvgTone AS DOUBLE) AS avg_tone,
    TRY_CAST(GoldsteinScale AS DOUBLE) AS goldstein_scale,
    Actor1Name AS actor1_name,
    Actor2Name AS actor2_name
FROM raw_events
WHERE GLOBALEVENTID IS NOT NULL AND SQLDATE IS NOT NULL AND Actor1CountryCode IS NOT NULL AND EventCode IS NOT NULL



Unnamed: 0,rows_clean
0,39153


In [33]:
# Keep one row per event_id (the one with the greatest event_date).
# `rn` only exists to pick that row, so it is excluded from the stored table
# and therefore from the exported CSV.
con.execute("""
CREATE OR REPLACE TABLE events_clean_dedup AS
SELECT * EXCLUDE (rn)
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY event_id
            ORDER BY event_date DESC
        ) AS rn
    FROM events_clean
)
WHERE rn = 1
""")

# Write the deduplicated table to the processed-data directory set in the
# config cell, instead of repeating the literal path here.
OUT_FILE = f"{PROCESSED_DATA_PATH}/events_clean_{DATE_STR}.csv"
con.execute("COPY events_clean_dedup TO ? (HEADER, DELIMITER ',')", [OUT_FILE])
print("Saved:", OUT_FILE)


Saved: ../data/processed/events_clean_20250101.csv


In [34]:
# Show the column names and types of raw_events.
raw_schema = con.execute("DESCRIBE raw_events").df()
raw_schema


Unnamed: 0,column_name,column_type,null,key,default,extra
0,GLOBALEVENTID,BIGINT,YES,,,
1,SQLDATE,BIGINT,YES,,,
2,MonthYear,BIGINT,YES,,,
3,Year,BIGINT,YES,,,
4,FractionDate,DOUBLE,YES,,,
5,Actor1Code,VARCHAR,YES,,,
6,Actor1Name,VARCHAR,YES,,,
7,Actor1CountryCode,VARCHAR,YES,,,
8,Actor1KnownGroupCode,VARCHAR,YES,,,
9,Actor1EthnicCode,VARCHAR,YES,,,


In [35]:
# Rebuild events_clean with an explicit column list that also carries the
# ActionGeo fields. Rows missing any of the four essential raw columns are
# dropped.
# CREATE OR REPLACE swaps the table in a single statement (the form the
# events_clean_dedup cells use), so a failed rebuild does not leave the
# notebook without an events_clean table.
# The trailing semicolon suppresses the connection repr as cell output.
con.execute("""
CREATE OR REPLACE TABLE events_clean AS
SELECT
    CAST(GLOBALEVENTID AS BIGINT) AS event_id,
    CAST(SQLDATE AS INTEGER)      AS event_date,
    Actor1CountryCode             AS country,
    EventCode                     AS event_code,
    AvgTone                       AS avg_tone,
    GoldsteinScale                AS goldstein_scale,
    Actor1Name                    AS actor1_name,
    Actor2Name                    AS actor2_name,
    ActionGeo_Fullname            AS action_geo_fullname,
    ActionGeo_CountryCode         AS action_geo_countrycode
FROM raw_events
WHERE
    GLOBALEVENTID IS NOT NULL
    AND SQLDATE IS NOT NULL
    AND EventCode IS NOT NULL
    AND Actor1CountryCode IS NOT NULL
""");


<_duckdb.DuckDBPyConnection at 0x111212c70>

In [36]:
# Keep one row per event_id (the one with the greatest event_date).
# `rn` only exists to pick that row, so it is excluded from the stored table
# rather than leaking into events_clean_dedup and anything exported from it.
# The trailing semicolon suppresses the connection repr as cell output.
con.execute("""
CREATE OR REPLACE TABLE events_clean_dedup AS
SELECT * EXCLUDE (rn)
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY event_id
            ORDER BY event_date DESC
        ) AS rn
    FROM events_clean
)
WHERE rn = 1
""");


<_duckdb.DuckDBPyConnection at 0x111212c70>

In [37]:
# Export the deduplicated table as CSV into the processed-data directory set
# in the config cell, instead of repeating the literal path here.
OUT_FILE = f"{PROCESSED_DATA_PATH}/events_clean_{DATE_STR}.csv"
con.execute("COPY events_clean_dedup TO ? (HEADER, DELIMITER ',')", [OUT_FILE])
print("Saved:", OUT_FILE)


Saved: ../data/processed/events_clean_20250101.csv
