In [2]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import os
import gzip
import shutil
import requests
from io import BytesIO, TextIOWrapper

In [3]:
# Config / Environment
# -----------------------
# Root of the Wikimedia hourly pageview dumps.
BASE_URL = "https://dumps.wikimedia.org/other/pageviews"

# Working area on the worker's local disk. The tasks hand files to each other
# through these directories, so every task of a run must see the same path —
# pick shared or ephemeral storage to suit your infra.
BASE_WORKDIR = "/tmp/wikipedia_pipeline"
RAW_DIR, EXTRACTED_DIR, OUTPUT_DIR = (
    os.path.join(BASE_WORKDIR, sub) for sub in ("raw", "extracted", "output")
)

# Page titles to track. They are compared verbatim against the dump's title
# column (underscores instead of spaces, case-sensitive).
COMPANIES = ["Apple_Inc.", "Microsoft", "Tesla,_Inc.", "Amazon.com", "Meta_Platforms"]

# Airflow-side settings (Admin -> Connections / Variables in the Airflow UI).
POSTGRES_CONN_ID = "postgres_core_sentiment"  # id of an Airflow connection that must exist
ALERT_EMAILS = ["data-team@example.com"]      # recipients list; adjust as needed

# Create the working directories up front; exist_ok makes re-running this cell harmless.
for d in (RAW_DIR, EXTRACTED_DIR, OUTPUT_DIR):
    os.makedirs(d, exist_ok=True)

In [4]:
# Utility functions
# -----------------------
def _format_hour_filename(dt: datetime):
    """
    Wikimedia file pattern: pageviews-{YYYY}{MM}{DD}-{HH}0000.gz
    Example: pageviews-20251001-000000.gz for 2025-10-01 00:00:00 -> covers 23:00-00:00? (follow docs)
    We'll use dt.hour formatted as two digits and append 0000.
    """
    return f"pageviews-{dt.strftime('%Y%m%d')}-{dt.strftime('%H')}0000.gz"

def _download_url(url: str, timeout: float = 60) -> bytes:
    """
    Fetch ``url`` and return the complete response body as bytes.

    The whole body is held in memory, which is fine for one hourly dump file
    but not for arbitrarily large downloads.

    Parameters
    ----------
    url : str
        Address to fetch.
    timeout : float, optional
        Seconds passed to ``requests.get`` as its connect/read timeout
        (default 60, as before).

    Returns
    -------
    bytes
        The response body.

    Raises
    ------
    AirflowFailException
        If the server answers with any status other than 200. Per the Airflow
        docs this fails the task *without* retrying.
    requests.RequestException
        On connection errors or timeouts; these honour the task's retries.
    """
    # The body is read in full anyway, so streaming buys nothing. The ``with``
    # block releases the connection on every path, including the error path,
    # where the previously streamed (and never consumed) response leaked.
    with requests.get(url, timeout=timeout) as resp:
        if resp.status_code != 200:
            raise AirflowFailException(f"Failed to download {url} (status {resp.status_code})")
        return resp.content

In [5]:
# Task implementations
def download_data(ds, **context):
    """
    Download the hourly Wikimedia pageviews dump for this run into ``RAW_DIR``.

    The target hour is ``dag_run.conf["hour"]`` (0-23) when the run was
    triggered with a config, otherwise 0 — the same rule ``transform_data``
    uses, so both tasks agree on the hour.

    Parameters
    ----------
    ds : str
        Logical date of the run, ``YYYY-MM-DD``.
    **context
        Airflow task context; only ``dag_run`` is read.

    Returns
    -------
    str
        Local path of the downloaded ``.gz`` file.

    Raises
    ------
    ValueError
        If ``ds`` is not ``YYYY-MM-DD`` or the hour is not an integer in 0-23.
    Exception
        Whatever ``_download_url`` raises for a failed download; it is not
        caught here, so the task fails.
    """
    dag_run = context.get("dag_run")
    conf = getattr(dag_run, "conf", {}) or {}
    hour = int(conf.get("hour", 0))  # allow a runtime override via dag_run.conf
    dt = datetime.strptime(ds, "%Y-%m-%d").replace(hour=hour)
    filename = _format_hour_filename(dt)
    # Dumps are laid out as {BASE_URL}/{YYYY}/{YYYY-MM}/{filename}.
    url = f"{BASE_URL}/{dt.strftime('%Y')}/{dt.strftime('%Y-%m')}/{filename}"
    local_path = os.path.join(RAW_DIR, filename)

    # Idempotency: reuse a file downloaded by an earlier attempt. This is only
    # safe because the file is published atomically below, so an existing
    # local_path is always a complete download.
    if os.path.exists(local_path):
        print(f"[download_data] File already exists, skipping download: {local_path}")
        return local_path

    print(f"[download_data] Downloading {url}")
    # Errors propagate so the task is marked failed.
    # NOTE(review): _download_url raises AirflowFailException for non-200
    # responses, which Airflow does not retry; a not-yet-published hour (404)
    # therefore fails immediately — confirm that is intended.
    content = _download_url(url)

    # Write to a sibling temp file and rename it into place. A crash mid-write
    # must not leave a truncated file that the existence check above would
    # accept on every later attempt.
    tmp_path = f"{local_path}.part"
    try:
        with open(tmp_path, "wb") as f:
            f.write(content)
        os.replace(tmp_path, local_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    print(f"[download_data] Saved to {local_path}")
    return local_path

In [6]:
def transform_data(ds, **context):
    """
    Aggregate pageviews for ``COMPANIES`` from the files in ``EXTRACTED_DIR``.

    Each input file is read as whitespace-separated columns
    ``domain_code page_title view_count response_size``. Rows whose title is
    in ``COMPANIES`` are summed per page and stamped with the run's date/hour.

    Parameters
    ----------
    ds : str
        Logical date of the run, ``YYYY-MM-DD``.
    **context
        Airflow task context; only ``dag_run`` is read, for the optional
        ``conf["hour"]`` override (default 0), matching ``download_data``.

    Returns
    -------
    str
        Path to ``OUTPUT_DIR/aggregated_{YYYYMMDD}_{HH}.csv`` with columns
        ``page, date_hour (YYYY-MM-DD HH:00:00), views``. When nothing matched,
        the file holds only that header.
    """
    import csv  # local import: only needed for csv.QUOTE_NONE below

    dag_run = context.get("dag_run")
    conf = getattr(dag_run, "conf", {}) or {}
    hour = int(conf.get("hour", 0))
    date_hour_str = (
        datetime.strptime(ds, "%Y-%m-%d").replace(hour=hour).strftime("%Y-%m-%d %H:00:00")
    )
    out_file = os.path.join(OUTPUT_DIR, f"aggregated_{ds.replace('-', '')}_{hour:02d}.csv")

    # NOTE(review): every file in EXTRACTED_DIR is attributed to this run's
    # date_hour. Nothing here clears that directory, so files left over from
    # earlier hours would be summed into this one — confirm the extract step
    # empties it (or filter by filename). TODO
    # NOTE(review): rows are matched on page title only, so counts are summed
    # across every domain_code in the file — confirm that is intended.
    items = []
    for fname in sorted(os.listdir(EXTRACTED_DIR)):  # sorted for a deterministic order
        file_path = os.path.join(EXTRACTED_DIR, fname)
        if not os.path.isfile(file_path):
            continue
        print(f"[transform_data] Processing {file_path}")
        try:
            df = pd.read_csv(
                file_path,
                sep=r"\s+",  # handled natively (and fast) by the default C parser
                header=None,
                names=["domain", "page", "views", "bytes"],
                usecols=["domain", "page", "views"],  # response size is never used
                dtype={"domain": str, "page": str},
                # Titles may contain literal '"' characters; with default
                # quoting one such title corrupts the parse of the whole file.
                quoting=csv.QUOTE_NONE,
                keep_default_na=False,  # keep titles such as "NA" / "null" as text
            )
        except Exception as e:
            # Best effort: an unreadable file is logged and skipped so the
            # remaining files are still aggregated.
            print(f"[transform_data] Failed to parse {file_path}: {e}")
            continue

        matches = df[df["page"].isin(COMPANIES)]
        if matches.empty:
            continue
        # Coerce defensively: a malformed row elsewhere in the file can leave
        # "views" as text, and summing text would concatenate strings.
        matches = matches.assign(views=pd.to_numeric(matches["views"], errors="coerce"))
        per_file = matches.groupby("page", as_index=False)["views"].sum()
        per_file["date_hour"] = date_hour_str
        items.append(per_file)

    if not items:
        # Still write a (header-only) file so downstream tasks see the run completed.
        pd.DataFrame(columns=["page", "date_hour", "views"]).to_csv(out_file, index=False)
        print(f"[transform_data] No matching pageviews found. Created empty output: {out_file}")
        return out_file

    # Several extracted files may contribute to the same page; sum them.
    final = (
        pd.concat(items, ignore_index=True)
        .groupby(["page", "date_hour"], as_index=False)["views"]
        .sum()
    )
    final.to_csv(out_file, index=False)
    print(f"[transform_data] Aggregation complete. Wrote {out_file}")
    return out_file


In [7]:
def load_data_to_postgres(ti, **context):
    """
    Upsert the aggregated CSV produced by ``transform_data`` into Postgres.

    Table schema expected:
      CREATE TABLE IF NOT EXISTS pageviews_aggregated (
        page TEXT NOT NULL,
        date_hour TIMESTAMP NOT NULL,
        views BIGINT NOT NULL,
        PRIMARY KEY (page, date_hour)
      );
    Upsert uses ON CONFLICT (page, date_hour) DO UPDATE SET views = EXCLUDED.views,
    so re-loading an hour overwrites its previous counts.

    Parameters
    ----------
    ti : TaskInstance
        Used to pull the CSV path returned by the ``transform_data`` task (XCom).
    **context
        Remaining Airflow context; unused.

    Raises
    ------
    AirflowFailException
        If no CSV path was pulled or the file does not exist.
    Exception
        Database errors are re-raised after the transaction is rolled back.
    """
    from contextlib import closing  # local import keeps the notebook's import cell unchanged

    csv_path = ti.xcom_pull(task_ids="transform_data")
    if not csv_path or not os.path.exists(csv_path):
        raise AirflowFailException(f"No aggregated CSV available at {csv_path}")

    # Read the text columns as text so titles such as "NA" or "1.50" are not
    # turned into NaN / floats before being written to the database.
    df = pd.read_csv(csv_path, dtype={"page": str, "date_hour": str}, keep_default_na=False)
    if df.empty:
        print("[load_data_to_postgres] No rows to insert.")
        return

    # Convert to plain Python scalars: the DB-API driver (psycopg2) cannot adapt
    # numpy types such as numpy.int64. date_hour stays a 'YYYY-MM-DD HH:00:00'
    # string for Postgres to cast to TIMESTAMP.
    rows = [
        (str(page), str(date_hour), int(views))
        for page, date_hour, views in zip(df["page"], df["date_hour"], df["views"])
    ]

    upsert_sql = """
    INSERT INTO pageviews_aggregated (page, date_hour, views)
    VALUES (%s, %s, %s)
    ON CONFLICT (page, date_hour)
    DO UPDATE SET views = EXCLUDED.views;
    """

    pg = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    # closing() guarantees the cursor and the connection are both closed, even
    # when creating the cursor itself fails.
    with closing(pg.get_conn()) as conn, closing(conn.cursor()) as cur:
        try:
            cur.executemany(upsert_sql, rows)
            conn.commit()
        except Exception:
            conn.rollback()
            raise
    print(f"[load_data_to_postgres] Upserted {len(rows)} rows into pageviews_aggregated")
