<a href="https://colab.research.google.com/github/fishee82oo/nfs-oil-price-prediction/blob/training/GDELT_Model_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q google-cloud-storage pyarrow pycountry tqdm

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/6.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/6.3 MB[0m [31m9.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/6.3 MB[0m [31m36.5 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━[0m [32m6.1/6.3 MB[0m [31m55.3 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m6.3/6.3 MB[0m [31m42.5 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.3/6.3 MB[0m [31m30.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import importlib.util, sys, os, io, json, gzip
from datetime import datetime, timedelta
import pandas as pd, numpy as np
from tqdm import tqdm

In [3]:
try:
    from google.colab import auth
    auth.authenticate_user()
except Exception:
    pass
import google.auth
from google.cloud import storage
credentials, default_project = google.auth.default()
client = storage.Client(project=default_project, credentials=credentials)

# Import data from GCS

In [4]:
spec = importlib.util.spec_from_file_location("gdelt_module", "/mnt/data/gdelt_data_engineering_clean.ipynb")
gdelt_module = None
if spec and spec.loader:
    gm = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(gm)
    gdelt_module = gm
if gdelt_module and hasattr(gdelt_module, "GCS_BUCKET_NAME"):
    BUCKET_NAME = getattr(gdelt_module, "GCS_BUCKET_NAME")
else:
    BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME", "gdelt_raw_3_years")
if gdelt_module and hasattr(gdelt_module, "GCS_PROCESSED_PATH"):
    PROCESSED_PREFIX = getattr(gdelt_module, "GCS_PROCESSED_PATH")
else:
    PROCESSED_PREFIX = os.environ.get("GCS_PROCESSED_PATH", "processed_data/")
bucket = client.bucket(BUCKET_NAME)
blobs = list(client.list_blobs(BUCKET_NAME, prefix=PROCESSED_PREFIX))
final_blobs = [b for b in blobs if b.name.startswith(f"{PROCESSED_PREFIX}final_aligned_data_") and b.name.endswith(".json.gz")]
final_blobs_sorted = sorted(final_blobs, key=lambda b: b.name, reverse=True)
if len(final_blobs_sorted)==0:
    raise SystemExit("No final_aligned_data_*.json.gz found under the processed prefix")
latest_blob = final_blobs_sorted[0]
local_download_path = "/tmp/latest_final_aligned_data.json.gz"
with open(local_download_path, "wb") as f:
    f.write(latest_blob.download_as_bytes())

# Forming Graph

In [5]:
import os

if os.path.exists(local_download_path):
    file_size = os.path.getsize(local_download_path) / (1024*1024)
    print(f"File downloaded: {local_download_path}")
    print(f"Size: {file_size:.2f} MB")

else:
    print("File NOT found locally")

print(f"Downloaded from GCS: {latest_blob.name}")
print(f"Bucket: {BUCKET_NAME}")
print(f"Last modified: {latest_blob.updated}")

with gzip.open(local_download_path, "rt", encoding="utf-8") as f:
    sample = f.read(500)
    print(f"Data preview:\n{sample[:200]}...")

File downloaded: /tmp/latest_final_aligned_data.json.gz
Size: 2.37 MB
Downloaded from GCS: processed_data/final_aligned_data_20250908.json.gz
Bucket: gdelt_raw_3_years
Last modified: 2025-09-08 01:30:57.993000+00:00
Data preview:
[{"date": "20220825", "country": "US", "event_count": 2301, "avg_sentiment": -0.023938528465884453, "unique_sources": 728, "wti_price": 93.33, "brent_price": 98.81, "theme_energy": 45, "theme_conflict...


In [21]:
with gzip.open(local_download_path, "rt", encoding="utf-8") as f:
    raw = f.read()
records = json.loads(raw)
df = pd.DataFrame.from_records(records)
df["date"] = pd.to_datetime(df["date"])
df = df.sort_values("date").reset_index(drop=True)
df.shape

for column in df.columns:
    print(f"Column: {column}")
    print(df[column])
    print("-" * 30)

Column: date
0        2022-08-25
1        2022-08-25
2        2022-08-25
3        2022-08-25
4        2022-08-25
            ...    
143396   2025-08-25
143397   2025-08-25
143398   2025-08-25
143399   2025-08-25
143400   2025-08-25
Name: date, Length: 143401, dtype: datetime64[ns]
------------------------------
Column: country
0         US
1         LU
2         NU
3         ID
4         DO
          ..
143396    MV
143397    LY
143398    MU
143399    BC
143400    GI
Name: country, Length: 143401, dtype: object
------------------------------
Column: event_count
0         2301
1            7
2            2
3           35
4            1
          ... 
143396       6
143397       8
143398       8
143399       3
143400       1
Name: event_count, Length: 143401, dtype: int64
------------------------------
Column: avg_sentiment
0        -0.023939
1        -0.039387
2        -0.069690
3        -0.038237
4         0.000000
            ...   
143396   -0.031522
143397   -0.022810
143398   -0.0

In [23]:
import pycountry
def to_iso3(name):
    try:
        c = pycountry.countries.lookup(name)
        return c.alpha_3
    except Exception:
        s = str(name).upper()
        s2 = "".join([c for c in s if c.isalpha() or c==" "]).strip().replace(" ", "_")
        return s2
df["country_iso3"] = df["country"].fillna("UNKNOWN").apply(to_iso3)
df["node_id"] = df["country_iso3"].astype(str) + "_" + df["date"].dt.strftime("%Y%m%d")
df.shape

(143401, 15)

In [24]:
price_by_date = df[["date","wti_price"]].drop_duplicates().set_index("date").sort_index()
price_by_date["wti_price"] = pd.to_numeric(price_by_date["wti_price"], errors="coerce")
price_by_date = price_by_date.sort_index()
price_by_date["wti_price_next"] = price_by_date["wti_price"].shift(-1)
price_by_date["wti_delta_next"] = price_by_date["wti_price_next"] - price_by_date["wti_price"]
price_by_date["wti_ret_next"] = price_by_date["wti_delta_next"] / price_by_date["wti_price"]
price_map = price_by_date.to_dict(orient="index")
def attach_targets(row):
    pdx = row["date"]
    v = price_map.get(pdx)
    if v is None:
        return pd.Series([np.nan,np.nan])
    return pd.Series([v.get("wti_delta_next"), v.get("wti_ret_next")])
df[["wti_delta_next","wti_ret_next"]] = df.apply(attach_targets, axis=1)

for column in df.columns:
    print(f"Column: {column}")
    print(df[column])
    print("-" * 30)

Column: date
0        2022-08-25
1        2022-08-25
2        2022-08-25
3        2022-08-25
4        2022-08-25
            ...    
143396   2025-08-25
143397   2025-08-25
143398   2025-08-25
143399   2025-08-25
143400   2025-08-25
Name: date, Length: 143401, dtype: datetime64[ns]
------------------------------
Column: country
0         US
1         LU
2         NU
3         ID
4         DO
          ..
143396    MV
143397    LY
143398    MU
143399    BC
143400    GI
Name: country, Length: 143401, dtype: object
------------------------------
Column: event_count
0         2301
1            7
2            2
3           35
4            1
          ... 
143396       6
143397       8
143398       8
143399       3
143400       1
Name: event_count, Length: 143401, dtype: int64
------------------------------
Column: avg_sentiment
0        -0.023939
1        -0.039387
2        -0.069690
3        -0.038237
4         0.000000
            ...   
143396   -0.031522
143397   -0.022810
143398   -0.0

In [9]:
feature_cols = [c for c in df.columns if c not in ["country","date","node_id","country_iso3","wti_delta_next","wti_ret_next"]]
feature_cols = [c for c in feature_cols if df[c].dtype != "object" or c.startswith("theme_")]
node_features = df[["node_id","country","country_iso3","date","wti_price","brent_price","wti_delta_next","wti_ret_next"] + [c for c in df.columns if c in feature_cols]]
node_features = node_features.fillna(0)

In [10]:
opec_members = ["Venezuela","Saudi Arabia","Iran","Iraq","Kuwait","UAE","Qatar","Algeria","Angola","Libya","Nigeria","Ecuador","Gabon","Republic of the Congo","Equatorial Guinea"]
iso_map = {}
for name in opec_members:
    try:
        iso_map[name] = pycountry.countries.lookup(name).alpha_3
    except Exception:
        iso_map[name] = name.upper().replace(" ","_")
opec_iso = list(iso_map.values())
from itertools import combinations
static_edges = []
for a,b in combinations(opec_iso,2):
    static_edges.append({"source":a,"target":b,"edge_type":"opec_member"})
static_edges_df = pd.DataFrame(static_edges)

In [11]:
dyn_edges_df = pd.DataFrame(columns=["source","target","edge_type","timestamp","date"])
potential_actor_cols = [c for c in df.columns if "actor" in c.lower() or "actor1" in c.lower() or "actor2" in c.lower()]
if len(potential_actor_cols)>0:
    actors = []
    for idx,row in df.iterrows():
        for k in potential_actor_cols:
            v = row.get(k)
        if row.get("actor1") and row.get("actor2"):
            s = str(row.get("actor1"))
            t = str(row.get("actor2"))
            dyn_edges_df.loc[len(dyn_edges_df)] = [s,t,"gdelt_event",row.get("date"),row.get("date")]


In [12]:
out_dir_local = "/tmp/graph_export"
os.makedirs(out_dir_local, exist_ok=True)
nodes_out = os.path.join(out_dir_local, "nodes.parquet")
static_edges_out = os.path.join(out_dir_local, "edges_static.parquet")
dyn_edges_out = os.path.join(out_dir_local, "edges_dynamic.parquet")
node_features.to_parquet(nodes_out, index=False)
static_edges_df.to_parquet(static_edges_out, index=False)
dyn_edges_df.to_parquet(dyn_edges_out, index=False)


ValueError: Duplicate column names found: ['node_id', 'country', 'country_iso3', 'date', 'wti_price', 'brent_price', 'wti_delta_next', 'wti_ret_next', 'event_count', 'avg_sentiment', 'unique_sources', 'wti_price', 'brent_price', 'theme_energy', 'theme_conflict', 'theme_sanctions', 'theme_trade', 'theme_economy', 'theme_policy']

In [13]:
gcs_prefix = PROCESSED_PREFIX + "graph_dataset/"
for p in [nodes_out, static_edges_out, dyn_edges_out]:
    bn = os.path.basename(p)
    blob = bucket.blob(f"{gcs_prefix}{bn}")
    with open(p,"rb") as f:
        blob.upload_from_file(f)
meta = {"nodes":nodes_out,"edges_static":static_edges_out,"edges_dynamic":dyn_edges_out,"uploaded_at":datetime.utcnow().isoformat()}
meta_blob = bucket.blob(f"{gcs_prefix}metadata.json")
meta_blob.upload_from_string(json.dumps(meta), content_type="application/json")

FileNotFoundError: [Errno 2] No such file or directory: '/tmp/graph_export/nodes.parquet'

In [23]:
df = []


In [None]:
unique_dates = sorted(df["date"].dt.date.unique())
for d in tqdm(unique_dates):
    sub = node_features[node_features["date"].dt.date==d].copy()
    if sub.shape[0]==0:
        continue
    fn = f"node_features_{d.strftime('%Y%m%d')}.parquet"
    localp = os.path.join(out_dir_local,fn)
    sub.to_parquet(localp, index=False)
    blob = bucket.blob(f"{gcs_prefix}{fn}")
    with open(localp,"rb") as f:
        blob.upload_from_file(f)

In [None]:
print("done")
print("bucket",BUCKET_NAME)
print("processed prefix",PROCESSED_PREFIX)
print("latest aligned blob", latest_blob.name)
print("graph files uploaded to", PROCESSED_PREFIX + "graph_dataset/")