In [0]:
import os
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
from pyspark.sql.functions import current_timestamp, lit

# --- Date Range Setup (Next 5 Days) ---
# Query window: today's UTC calendar date through five days later. These two
# dates are what the request parameters send as `start.gte` / `start.lte`.
# datetime.utcnow() is deprecated (Python 3.12+) and returns a naive datetime;
# taking the date from a timezone-aware "now" gives the same UTC date.
start_date = datetime.now(timezone.utc).date()
end_date = start_date + timedelta(days=5)
print(f"📅 Getting PredictHQ events from {start_date} to {end_date}")

# --- API Setup ---
# The PredictHQ access token is a credential, so it is read from the
# environment rather than being embedded in the source file.
# NOTE(review): a token used to be hard-coded on this line. Treat it as
# compromised: revoke it in PredictHQ and issue a new one.
ACCESS_TOKEN = os.environ.get("PREDICTHQ_ACCESS_TOKEN", "").strip()
if not ACCESS_TOKEN:
    # Fail before any request is sent instead of surfacing later as an
    # authentication error from the API.
    raise RuntimeError(
        "❌ PREDICTHQ_ACCESS_TOKEN is not set; provide the PredictHQ access "
        "token through that environment variable before running this cell."
    )
headers = {
    "Authorization": f"Bearer {ACCESS_TOKEN}",
    "Accept": "application/json",
}
url = "https://api.predicthq.com/v1/events/"
# Upper bound (seconds) for each HTTP call; without a timeout `requests` can
# block indefinitely on an unresponsive connection.
REQUEST_TIMEOUT_SECONDS = 30

# --- Params & Pagination ---
# Events in Singapore whose start falls inside the date window, requested
# 100 at a time beginning at offset 0.
params = {
    "country": "SG",
    "start.gte": start_date.isoformat(),
    "start.lte": end_date.isoformat(),
    "limit": 100,
    "offset": 0,
}
all_events = []

# --- Loop through paginated responses ---
# Each iteration fetches one page, appends its "results" to `all_events`, and
# advances the offset by the page size until the response has no "next" link.
while True:
    response = requests.get(
        url,
        headers=headers,
        params=params,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        # Any non-200 status aborts the run; the response body is included to
        # make the failure diagnosable from the notebook output.
        raise RuntimeError(f"❌ Request Failed: {response.status_code} - {response.text}")

    data = response.json()
    events = data.get("results", [])
    all_events.extend(events)

    # Stop on the last page. An empty page also ends the loop so that a
    # response that keeps advertising "next" cannot spin forever.
    if not events or not data.get("next"):
        break
    params["offset"] += params["limit"]

# --- Load into pandas DataFrame ---
def _coordinate(location, index):
    """Return ``location[index]``, or None when that element is unavailable.

    Values that are not lists, and lists too short to contain ``index``, both
    yield None so that a single malformed record cannot abort the run with an
    IndexError.
    """
    if isinstance(location, list) and len(location) > index:
        return location[index]
    return None


df_pd = pd.DataFrame(all_events)

if df_pd.empty:
    # Nothing is written on this path, so the target table is left untouched.
    print("⚠️ No events found in the API response.")
else:
    # Extract lon/lat from the 'location' field: element 0 becomes `lon` and
    # element 1 becomes `lat`.
    df_pd["lon"] = df_pd["location"].apply(lambda loc: _coordinate(loc, 0))
    df_pd["lat"] = df_pd["location"].apply(lambda loc: _coordinate(loc, 1))

    # Drop unused columns (ignored when a column is not present).
    df_pd = df_pd.drop(columns=["location", "geo"], errors="ignore")

    # --- Convert to Spark DataFrame ---
    # NOTE(review): `spark` and `display` are not defined in this file;
    # presumably they are globals supplied by the notebook runtime.
    df_spark = spark.createDataFrame(df_pd)

    # Add metadata columns: a constant source label and the ingestion time.
    df_spark = (
        df_spark
        .withColumn("source", lit("PredictHQ"))
        .withColumn("ingestion_timestamp", current_timestamp())
    )

    # --- Preview Data ---
    display(df_spark)

    # --- Write to Delta Table in Unity Catalog ---
    # "overwrite" mode with overwriteSchema=true replaces both the rows and
    # the schema of the target table on every run.
    (
        df_spark.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable("civAI.raw.predict_hq_events")
    )

    print("✅ Data written to table: civAI.raw.predict_hq_events")
