In [7]:

import os
import requests
import duckdb
import warnings
from datetime import datetime, timedelta

warnings.filterwarnings("ignore", message="Unverified HTTPS request")

# Static URLs for other files
files_info = {
    "complete_donor": "https://data.kijang.net/dea/donations/historical.parquet",
    "daily_retention": "https://data.kijang.net/dea/retention/data.parquet",
    "daily_donor_rates": "https://data.kijang.net/dea/donorrate/data.parquet"
}

# Base URL for daily donor files
base_url = "https://data.kijang.net/dea/donations"

# Folder for downloaded files
folder = "downloaded_parquet"
os.makedirs(folder, exist_ok=True)

# Paths
complete_donor_path = os.path.join(folder, "complete_donor.parquet")
daily_donor_path = os.path.join(folder, "daily_donor.parquet")

# Function to find latest available daily donor file

def get_latest_daily_donor(base_url, max_days=30):
    today = datetime.today()
    for i in range(max_days):
        date_to_try = today - timedelta(days=i)
        date_str = date_to_try.strftime("%Y-%m-%d")
        url = f"{base_url}/{date_str}.parquet"
        try:
            response = requests.get(url, verify=False)
            if response.status_code == 200:
                return url, date_str
        except:
            pass
    return None, None

# Step 1: Download static files (complete_donor only if not exists)
if not os.path.exists(complete_donor_path):
    print(f"Downloading initial complete_donor from {files_info['complete_donor']}...")
    r = requests.get(files_info['complete_donor'], verify=False)
    r.raise_for_status()
    with open(complete_donor_path, "wb") as f:
        f.write(r.content)
    print("✅ Initial complete_donor downloaded")
else:
    print("✅ complete_donor already exists, will update incrementally")

# Always refresh daily_retention and daily_donor_rates
for name in ["daily_retention", "daily_donor_rates"]:
    path = os.path.join(folder, f"{name}.parquet")
    if os.path.exists(path):
        os.remove(path)
    print(f"Downloading {name}...")
    r = requests.get(files_info[name], verify=False)
    r.raise_for_status()
    with open(path, "wb") as f:
        f.write(r.content)
    print(f"✅ {name} downloaded")

# Step 2: Connect to DuckDB and load complete_donor
conn = duckdb.connect("donations.duckdb")
conn.execute("DROP TABLE IF EXISTS complete_donor")
conn.execute(f"CREATE TABLE complete_donor AS SELECT * FROM read_parquet('{complete_donor_path}')")

# Find latest date in complete_donor
latest_complete_date = conn.execute("SELECT MAX(visit_date) FROM complete_donor").fetchone()[0]
print(f"Latest date in complete_donor: {latest_complete_date}")

# Step 3: Incrementally update complete_donor with daily files
start_date = latest_complete_date + timedelta(days=1)
today = datetime.today().date()
new_rows_added = 0

while start_date <= today:
    date_str = start_date.strftime("%Y-%m-%d")
    url = f"{base_url}/{date_str}.parquet"
    try:
        r = requests.get(url, verify=False)
        if r.status_code == 200:
            # Save temporary daily file
            with open(daily_donor_path, "wb") as f:
                f.write(r.content)
            # Append to complete_donor
            conn.execute(f"INSERT INTO complete_donor SELECT * FROM read_parquet('{daily_donor_path}')")
            count = conn.execute("SELECT COUNT(*) FROM read_parquet(?)", [daily_donor_path]).fetchone()[0]
            new_rows_added += count
            print(f"✅ Added {count} rows for {date_str}")
        else:
            print(f"❌ No data for {date_str}")
    except Exception as e:
        print(f"❌ Error for {date_str}: {e}")
    start_date += timedelta(days=1)

# Step 4: Persist updated complete_donor back to Parquet
conn.execute(f"COPY complete_donor TO '{complete_donor_path}' (FORMAT 'parquet')")
print(f"✅ Updated complete_donor saved with {new_rows_added} new rows")

# Step 5: Load other tables
for name in ["daily_retention", "daily_donor_rates"]:
    path = os.path.join(folder, f"{name}.parquet")
    conn.execute(f"DROP TABLE IF EXISTS {name}")
    conn.execute(f"CREATE TABLE {name} AS SELECT * FROM read_parquet('{path}')")
    count = conn.execute(f"SELECT COUNT(*) FROM {name}").fetchone()[0]
    print(f"✅ Loaded {name} with {count} rows")

# Step 6: Load latest daily_donor (fallback logic)
latest_url, latest_date = get_latest_daily_donor(base_url)
if latest_url:
    r = requests.get(latest_url, verify=False)
    r.raise_for_status()
    with open(daily_donor_path, "wb") as f:
        f.write(r.content)
    conn.execute("DROP TABLE IF EXISTS daily_donor")
    conn.execute(f"CREATE TABLE daily_donor AS SELECT * FROM read_parquet('{daily_donor_path}')")
    count = conn.execute("SELECT COUNT(*) FROM daily_donor").fetchone()[0]
    print(f"✅ Loaded daily_donor ({latest_date}) with {count} rows")
else:
    print("❌ Could not find any daily donor file in last 30 days")

# Show tables
print("Tables in database:", conn.execute("SHOW TABLES").fetchall())
conn.close()


✅ complete_donor already exists, will update incrementally
Downloading daily_retention...
✅ daily_retention downloaded
Downloading daily_donor_rates...
✅ daily_donor_rates downloaded
Latest date in complete_donor: 2025-11-26
❌ No data for 2025-11-27
✅ Updated complete_donor saved with 0 new rows
✅ Loaded daily_retention with 6914184 rows
✅ Loaded daily_donor_rates with 643803 rows
✅ Loaded daily_donor (2025-11-26) with 1446 rows
Tables in database: [('complete_donor',), ('daily_donor',), ('daily_donor_rates',), ('daily_retention',)]


In [5]:
conn.close()