# Product Docs Pipeline

In [None]:
import os
import re
import sys
from pathlib import Path

import pandas as pd
import psycopg2
from dotenv import load_dotenv
from IPython.display import display

# Resolve project paths from the current working directory; the relative
# "src" and ".env" lookups below assume the kernel runs in the project root.
ROOT = Path.cwd()
SRC = ROOT / "src"
# Put src/ first on sys.path so later cells can import the task packages
# (task1, task2, pipeline, task7, task8) by name.
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))

# Load settings from the project's .env file into the process environment.
load_dotenv(ROOT / ".env")

# Echo the resolved root and target database as a quick sanity check.
print("ROOT:", ROOT)
print("DB_HOST:", os.getenv("DB_HOST"))
print("DB_NAME:", os.getenv("DB_NAME"))


ROOT: c:\Users\stlp\product-docs-pipeline
DB_HOST: localhost
DB_NAME: takehome


In [2]:
def connect_db():
    """Open a new psycopg2 connection configured from the DB_* environment variables.

    DB_PORT falls back to 5432 when unset. The remaining settings are passed
    through exactly as read from the environment (``None`` when unset).
    """
    settings = {
        "host": os.environ.get("DB_HOST"),
        "port": int(os.environ.get("DB_PORT", "5432")),
        "dbname": os.environ.get("DB_NAME"),
        "user": os.environ.get("DB_USER"),
        "password": os.environ.get("DB_PASSWORD"),
    }
    return psycopg2.connect(**settings)

def execute_sql(sql: str):
    """Execute ``sql`` on a fresh connection, commit, and close the connection.

    Args:
        sql: One or more SQL statements, sent in a single ``cursor.execute`` call.

    Raises:
        psycopg2.Error: If connecting or executing fails. The transaction is
            rolled back before the connection is closed.
    """
    conn = connect_db()
    try:
        # psycopg2's connection context manager commits on success and rolls
        # back on error, but it does NOT close the connection -- hence the
        # explicit close() in the finally clause.
        with conn, conn.cursor() as cur:
            cur.execute(sql)
    finally:
        conn.close()

def query_df(sql: str) -> pd.DataFrame:
    """Run ``sql`` on a fresh connection and return the result as a DataFrame.

    Args:
        sql: A single SQL query that returns rows.

    Returns:
        The query result as produced by ``pd.read_sql``.
    """
    conn = connect_db()
    try:
        # ``with conn`` only ends the transaction (commit/rollback); the
        # connection itself must be closed explicitly or it leaks per call.
        with conn:
            return pd.read_sql(sql, conn)
    finally:
        conn.close()

def load_named_queries(path: Path) -> dict:
    """Parse a SQL file into a ``{name: sql}`` mapping.

    A section starts at a line whose stripped text begins with ``-- name:``;
    the text after the first colon (stripped) is the query name, and the lines
    that follow, up to the next marker or end of file, form the query body.
    Lines before the first marker are ignored.

    Sections whose body is empty after stripping are skipped, since an empty
    statement cannot be executed. If a name occurs more than once, the last
    non-empty section wins.

    Args:
        path: Location of the UTF-8 encoded SQL file.

    Returns:
        Mapping of query name to stripped SQL text, in file order.
    """
    queries = {}

    def _store(section_name, section_lines):
        # Register a finished section unless it has no name or no SQL text.
        body = "\n".join(section_lines).strip()
        if section_name and body:
            queries[section_name] = body

    name = None
    lines = []
    for raw in path.read_text(encoding="utf-8").splitlines():
        stripped = raw.strip()
        if stripped.startswith("-- name:"):
            _store(name, lines)
            name = stripped.split(":", 1)[1].strip()
            lines = []
            continue
        if name:
            # Keep the raw line so indentation inside the query is preserved.
            lines.append(raw)
    _store(name, lines)
    return queries

print("Helpers loaded")


Helpers loaded


## Step 1: Create Base Tables (Optionally Reset Data for Re-runs)

In [None]:
BASE_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS candidate_rk_sitemap_staging (
  url TEXT PRIMARY KEY,
  source TEXT,
  lastmod TIMESTAMPTZ,
  discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_candidate_rk_sitemap_staging_source
  ON candidate_rk_sitemap_staging (source);

CREATE INDEX IF NOT EXISTS idx_candidate_rk_sitemap_staging_lastmod
  ON candidate_rk_sitemap_staging (lastmod DESC);

CREATE TABLE IF NOT EXISTS candidate_rk_docs_master (
  url TEXT PRIMARY KEY,
  sources TEXT[] NOT NULL DEFAULT '{}',
  first_seen_at TIMESTAMPTZ,
  last_seen_at TIMESTAMPTZ
);

CREATE INDEX IF NOT EXISTS idx_candidate_rk_docs_master_last_seen
  ON candidate_rk_docs_master (last_seen_at DESC);

CREATE TABLE IF NOT EXISTS candidate_rk_document_content (
  url TEXT PRIMARY KEY,
  etag TEXT,
  last_modified TEXT,
  content_hash TEXT,
  content TEXT,
  content_bytes INTEGER,
  content_type TEXT,
  status_code INTEGER,
  fetched_at TIMESTAMPTZ,
  last_checked_at TIMESTAMPTZ,
  error_message TEXT,
  was_truncated BOOLEAN NOT NULL DEFAULT FALSE,
  is_too_large BOOLEAN NOT NULL DEFAULT FALSE
);

CREATE INDEX IF NOT EXISTS idx_candidate_rk_document_content_status
  ON candidate_rk_document_content (status_code);

CREATE INDEX IF NOT EXISTS idx_candidate_rk_document_content_last_checked
  ON candidate_rk_document_content (last_checked_at DESC);

CREATE INDEX IF NOT EXISTS idx_candidate_rk_document_content_hash
  ON candidate_rk_document_content (content_hash)
  WHERE content_hash IS NOT NULL;
"""

# Set to True to truncate every pipeline table (and restart identity
# sequences) before re-running the notebook from scratch. Leave False to keep
# existing data.
RESET_TABLE_DATA = False

# Both schema scripts are applied on every run: the base tables defined above,
# then the Task 7 SQL file. The truncate below names `pipeline_metrics` and
# `alerts`, which are presumably created by that file -- confirm there.
execute_sql(BASE_SCHEMA_SQL)
execute_sql((ROOT / "src" / "task7" / "task7_create_tables.sql").read_text(encoding="utf-8"))

if RESET_TABLE_DATA:
    # All five tables are truncated in one statement.
    execute_sql(
        """
        TRUNCATE TABLE
          alerts,
          pipeline_metrics,
          candidate_rk_document_content,
          candidate_rk_docs_master,
          candidate_rk_sitemap_staging
        RESTART IDENTITY;
        """
    )
    print("Existing data truncated.")

print("Schema ready.")


Schema ready.


## Step 2: Task 1 - Sitemap Extract -> `candidate_rk_sitemap_staging`


In [4]:
from task1.sitemap_extract import main as run_task1

# Run the Task 1 entry point (imported from task1.sitemap_extract), then
# verify its effect by reading back from the staging table.
run_task1()
print("Task 1 completed")

# Row count after the run.
t1_count = query_df("SELECT COUNT(*) AS n FROM candidate_rk_sitemap_staging;")
print("candidate_rk_sitemap_staging rows:", int(t1_count.loc[0, "n"]))

# Preview of the 20 most recently discovered URLs.
t1_preview = query_df("""
    SELECT url, source, lastmod, discovered_at
    FROM candidate_rk_sitemap_staging
    ORDER BY discovered_at DESC
    LIMIT 20;
""")
display(t1_preview)


Processing: https://docs.snowflake.com/en/sitemap.xml
Processing: https://other-docs.snowflake.com/en/sitemap.xml
Done. Visited 2 sitemap files.
Task 1 completed


  return pd.read_sql(sql, conn)


candidate_rk_sitemap_staging rows: 6620


Unnamed: 0,url,source,lastmod,discovered_at
0,https://docs.snowflake.com/en/appendices,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
1,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
2,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
3,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
4,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
5,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
6,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
7,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
8,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226
9,https://docs.snowflake.com/en/release-notes/20...,https://docs.snowflake.com/en/sitemap.xml,,2026-02-07 17:15:00.867226


## Step 3: Task 2 - Consolidate -> `candidate_rk_docs_master`


In [5]:
from task2.consolidate_docs_master import run_task2_consolidation

# Run the Task 2 consolidation step, then read back from the master table.
run_task2_consolidation()
print("Task 2 completed")

# Row count after consolidation.
t2_count = query_df("SELECT COUNT(*) AS n FROM candidate_rk_docs_master;")
print("candidate_rk_docs_master rows:", int(t2_count.loc[0, "n"]))

# Preview of the 20 most recently seen URLs; rows without last_seen_at sort last.
t2_preview = query_df("""
    SELECT url, sources, first_seen_at, last_seen_at
    FROM candidate_rk_docs_master
    ORDER BY last_seen_at DESC NULLS LAST
    LIMIT 20;
""")
display(t2_preview)


Task 2 completed


  return pd.read_sql(sql, conn)


candidate_rk_docs_master rows: 6620


Unnamed: 0,url,sources,first_seen_at,last_seen_at
0,https://docs.snowflake.com/en/collaboration/gu...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
1,https://docs.snowflake.com/en/collaboration/ma...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
2,https://docs.snowflake.com/en/collaboration/po...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
3,https://docs.snowflake.com/en/collaboration/pr...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
4,https://docs.snowflake.com/en/collaboration/pr...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
5,https://docs.snowflake.com/en/collaboration/pr...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
6,https://docs.snowflake.com/en/collaboration/pr...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
7,https://docs.snowflake.com/en/collaboration/pr...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
8,https://docs.snowflake.com/en/collaboration/pr...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226
9,https://docs.snowflake.com/en/collaboration/pr...,[https://docs.snowflake.com/en/sitemap.xml],2026-02-07 17:15:00.867226,2026-02-07 17:15:00.867226


## Step 4: Task 3 - Content Ingest -> `candidate_rk_document_content`


In [12]:
from pipeline.ingest import run_content_ingest

# Run the content ingest and keep the stats it returns.
# NOTE(review): host_delay is presumably seconds between requests to one host
# and max_bytes a per-document size cap -- confirm in pipeline.ingest.
task3_stats = run_content_ingest(batch_size=200, host_delay=0.30, max_bytes=1_000_000)
print("Task 3 stats:", task3_stats)

# Count of stored documents per HTTP status code, most frequent first.
t3_status_breakdown = query_df("""
    SELECT status_code, COUNT(*) AS n
    FROM candidate_rk_document_content
    GROUP BY status_code
    ORDER BY n DESC;
""")
print("candidate_rk_document_content status breakdown")
display(t3_status_breakdown)

# The 20 most recently checked rows. SELECT * also transfers the full `content`
# column for each row, even though the display truncates it.
t3_preview = query_df("""
    SELECT *
    FROM candidate_rk_document_content
    ORDER BY last_checked_at DESC NULLS LAST
    LIMIT 20;
""")
display(t3_preview)


Task 3 stats: {'processed': 0, 'ok200': 0, 'ok304': 0, 'err': 0}
candidate_rk_document_content status breakdown


  return pd.read_sql(sql, conn)


Unnamed: 0,status_code,n
0,200,6620


  return pd.read_sql(sql, conn)


Unnamed: 0,url,etag,last_modified,content_hash,content,content_bytes,content_type,status_code,fetched_at,last_checked_at,error_message,was_truncated,is_too_large
0,https://docs.snowflake.com/en/user-guide/table...,"""wxrfhqysiceen2""",,e3b1734a34e7e7fe7fa30af0d2db2be0b42eb0aac85979...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",672274,text/html; charset=utf-8,200,2026-02-08 02:11:36.955064,2026-02-08 02:11:36.955064,,False,False
1,https://docs.snowflake.com/en/user-guide/snowf...,"""117cm83eih1ci0u""",,eb28bc3b01163cf9c60dc54e34c01edac404caaceb4cb0...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",583284,text/html; charset=utf-8,200,2026-02-08 02:11:36.650760,2026-02-08 02:11:36.650760,,False,False
2,https://docs.snowflake.com/en/user-guide/tutor...,"""avfn0syq4t201q""",,96c39562236d6388e479ab27147c123e10f8d741eb2e51...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",93416,text/html; charset=utf-8,200,2026-02-08 02:11:36.347400,2026-02-08 02:11:36.347400,,False,False
3,https://docs.snowflake.com/en/user-guide/searc...,"""11ratjb6v8kc2e4""",,35a138a9040ec72f65601962c912de45b9c55a4d858351...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",563022,text/html; charset=utf-8,200,2026-02-08 02:11:36.044624,2026-02-08 02:11:36.044624,,False,False
4,https://docs.snowflake.com/en/user-guide/table...,"""c7vlgvwx8kdupo""",,b75a2124d6c3bb9a80f8743c4291c4ff565390766c92e0...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",646440,text/html; charset=utf-8,200,2026-02-08 02:11:35.743604,2026-02-08 02:11:35.743604,,False,False
5,https://docs.snowflake.com/en/user-guide/ui-sn...,"""kts2b5gdb3cuiq""",,816af7de5016d083a806fec72f9c95e0df5a5431034cc5...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",599548,text/html; charset=utf-8,200,2026-02-08 02:11:35.440501,2026-02-08 02:11:35.440501,,False,False
6,https://docs.snowflake.com/en/user-guide/snowf...,"""my7loj2m80c6ll""",,25049c138277e886f6e011be49f55e89e448ff1be0e6a9...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",568487,text/html; charset=utf-8,200,2026-02-08 02:11:35.139441,2026-02-08 02:11:35.139441,,False,False
7,https://docs.snowflake.com/en/user-guide/views...,"""csqsig9vj7ddcj""",,4cbba84393bad6b9b25d58b6448e19294ad90aca0ff93a...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",623921,text/html; charset=utf-8,200,2026-02-08 02:11:34.835424,2026-02-08 02:11:34.835424,,False,False
8,https://docs.snowflake.com/en/user-guide/snowp...,"""cdnn9wl9xtcoa3""",,f41be7699f1108188920c2350018f537ac137925b95010...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",591451,text/html; charset=utf-8,200,2026-02-08 02:11:34.500366,2026-02-08 02:11:34.500366,,False,False
9,https://docs.snowflake.com/en/user-guide/stora...,"""odsiv66g96butd""",,91c1acf77773cb2270b728d385e4db52f4c6036329daf5...,"<!DOCTYPE html><html lang=""en""><head><meta cha...",553211,text/html; charset=utf-8,200,2026-02-08 02:11:34.050876,2026-02-08 02:11:34.050876,,False,False


## Step 5: Task 4 - Analytics Queries


In [7]:
# Load the analytics queries (one per "-- name:" section of the SQL file).
task4_path = ROOT / "src" / "task4" / "task4_analytics_queries.sql"
task4_queries = load_named_queries(task4_path)

# Run each query and keep the full result keyed by query name; the Task 8
# export cell writes these frames to Excel, one sheet per name.
task4_results = {}
for name, sql in task4_queries.items():
    df = query_df(sql)
    task4_results[name] = df
    print(f"\n[{name}] rows={len(df)}")
    # Show at most the first 20 rows of each result.
    display(df.head(20))



[source_counts] rows=2


  return pd.read_sql(sql, conn)


Unnamed: 0,source,doc_count
0,https://docs.snowflake.com/en/sitemap.xml,6617
1,https://other-docs.snowflake.com/en/sitemap.xml,3



[monthly_distribution] rows=1


  return pd.read_sql(sql, conn)


Unnamed: 0,month,doc_count
0,2026-02-01,6620


  return pd.read_sql(sql, conn)



[success_rate] rows=2


Unnamed: 0,source,success_rate
0,https://docs.snowflake.com/en/sitemap.xml,1.0
1,https://other-docs.snowflake.com/en/sitemap.xml,1.0


  return pd.read_sql(sql, conn)



[top_paths] rows=10


Unnamed: 0,path_segment,freq
0,sql-reference,2152
1,user-guide,1605
2,release-notes,1351
3,developer-guide,847
4,migrations,459
5,connectors,108
6,collaboration,60
7,progaccess,3
8,index,2
9,search,2



[stale_docs] rows=1


  return pd.read_sql(sql, conn)


Unnamed: 0,stale_count,stale_percentage
0,0,0.0


## Step 6: Task 5 - Optimization Scenario Queries (Runnable Set)


In [8]:
task5_queries = {
    "scenario1_docs_modified_last_7d": """
        WITH lastmod_per_url AS (
          SELECT url, MAX(lastmod) AS lastmod
          FROM candidate_rk_sitemap_staging
          GROUP BY url
        ),
        effective_ts AS (
          SELECT dm.url, COALESCE(lpu.lastmod, dm.last_seen_at) AS effective_modified_ts
          FROM candidate_rk_docs_master dm
          LEFT JOIN lastmod_per_url lpu ON lpu.url = dm.url
        )
        SELECT COUNT(*) AS docs_modified_last_7d
        FROM effective_ts
        WHERE effective_modified_ts >= (NOW() - INTERVAL '7 days');
    """,
    "scenario2_source_url_count_and_mean_bytes": """
        WITH content_per_url AS (
          SELECT
            url,
            MAX(content_bytes) FILTER (WHERE status_code = 200) AS content_bytes_ok
          FROM candidate_rk_document_content
          GROUP BY url
        ),
        url_facts AS (
          SELECT dm.url, dm.sources, cpu.content_bytes_ok
          FROM candidate_rk_docs_master dm
          LEFT JOIN content_per_url cpu ON cpu.url = dm.url
        )
        SELECT
          s.source_identifier,
          COUNT(DISTINCT uf.url) AS unique_url_count,
          AVG(uf.content_bytes_ok)::numeric(18,2) AS mean_content_bytes
        FROM url_facts uf
        CROSS JOIN LATERAL UNNEST(uf.sources) AS s(source_identifier)
        GROUP BY s.source_identifier
        ORDER BY unique_url_count DESC, s.source_identifier;
    """,
    "scenario3_duplicate_hashes": """
        SELECT
          content_hash,
          COUNT(*) AS url_rows,
          COUNT(DISTINCT url) AS distinct_urls
        FROM candidate_rk_document_content
        WHERE status_code = 200
          AND content_hash IS NOT NULL
        GROUP BY content_hash
        HAVING COUNT(DISTINCT url) > 1
        ORDER BY distinct_urls DESC, content_hash
        LIMIT 50;
    """,
}

# Run every scenario query and show up to 20 rows of each result. Results are
# only displayed here; they are not stored for later cells.
for name, sql in task5_queries.items():
    df = query_df(sql)
    print(f"\n[{name}] rows={len(df)}")
    display(df.head(20))



[scenario1_docs_modified_last_7d] rows=1


  return pd.read_sql(sql, conn)


Unnamed: 0,docs_modified_last_7d
0,6620


  return pd.read_sql(sql, conn)



[scenario2_source_url_count_and_mean_bytes] rows=2


Unnamed: 0,source_identifier,unique_url_count,mean_content_bytes
0,https://docs.snowflake.com/en/sitemap.xml,6617,600953.22
1,https://other-docs.snowflake.com/en/sitemap.xml,3,431383.0


  return pd.read_sql(sql, conn)



[scenario3_duplicate_hashes] rows=5


Unnamed: 0,content_hash,url_rows,distinct_urls
0,01d798f0aa05b0953bfc8310d2fcb8345ae8713a9c0a9a...,5,5
1,7246af3bc707a423ca3f3af71c01847593995fa42c265d...,2,2
2,a1012897adffdf90854b6299992275d4b8215eaad9a90e...,2,2
3,df68b73a6e3b298f6ba3e4fd317e2e9443827d7e180875...,2,2
4,f47c00bae5be44e167631b0b4f528367e5a9b5ff1cb04d...,2,2


## Step 7: Task 6 - Tests


In [9]:
import subprocess

# Run the Task 6 test suite in a child process using the kernel's own
# interpreter, so pytest runs in the same environment as the notebook. The
# relative path "src/task6" is resolved against the kernel's working directory.
result = subprocess.run(
    [sys.executable, "-m", "pytest", "src/task6", "-q"],
    capture_output=True,
    text=True,
)

# Surface the captured pytest output in the notebook.
print(result.stdout)
if result.stderr:
    print("STDERR:\n", result.stderr)
print("pytest exit code:", result.returncode)

# A failing run is reported but does not raise, so later cells still execute.
if result.returncode == 0:
    print("Task 6 tests passed")
else:
    print("Task 6 tests failed - inspect output above")


[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                   [100%][0m

pytest exit code: 0
Task 6 tests passed


## Step 8: Task 7 - Observability Wrapper + Alerts


In [10]:
from task7.run_ingest_with_observability import main as run_task7

# Run the observability wrapper, then read back the newest rows from the
# pipeline_metrics and alerts tables.
run_task7()
print("Task 7 completed")

# Most recent metrics row (highest metric_id).
t7_latest_metric = query_df("""
    SELECT metric_id, pipeline_name, run_started_at, run_finished_at,
           processed_count, ok200_count, ok304_count, error_count, error_rate, run_duration_ms
    FROM pipeline_metrics
    ORDER BY metric_id DESC
    LIMIT 1;
""")
print("Latest pipeline_metrics row")
display(t7_latest_metric)

# Ten most recent alerts across all runs, newest first.
t7_latest_alerts = query_df("""
    SELECT alert_id, metric_id, pipeline_name, alert_type, severity, message, triggered_at
    FROM alerts
    ORDER BY alert_id DESC
    LIMIT 10;
""")
print("Latest alerts")
display(t7_latest_alerts)


Run stats: {'processed': 0, 'ok200': 0, 'ok304': 0, 'err': 0}
Run duration (ms): 97
Metric row saved with metric_id=2
Alerts created: 1
Task 7 completed
Latest pipeline_metrics row


  return pd.read_sql(sql, conn)


Unnamed: 0,metric_id,pipeline_name,run_started_at,run_finished_at,processed_count,ok200_count,ok304_count,error_count,error_rate,run_duration_ms
0,2,content_ingest,2026-02-08 22:55:17.245735,2026-02-08 22:55:17.342536,0,0,0,0,0.0,97


Latest alerts


  return pd.read_sql(sql, conn)


Unnamed: 0,alert_id,metric_id,pipeline_name,alert_type,severity,message,triggered_at
0,2,2,content_ingest,empty_result_set,warning,Pipeline run processed 0 URLs.,2026-02-08 14:55:17.412890
1,1,1,content_ingest,empty_result_set,warning,Pipeline run processed 0 URLs.,2026-02-08 00:43:29.582357


## Step 9: Task 8 - Export Outputs


In [None]:
# Write the Task 4 results (built in the Step 5 cell) to a local workbook,
# one sheet per named query.
out_xlsx = ROOT / "src" / "task8" / "rk_pipeline_analytics_rerun.xlsx"
with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
    for tab_name, df in task4_results.items():
        # Excel sheet names are limited to 31 characters, so replace anything
        # outside [A-Za-z0-9_] and truncate. Two names that sanitize to the
        # same string would map to the same sheet name.
        safe_name = re.sub(r"[^A-Za-z0-9_]+", "_", tab_name)[:31] or "sheet1"
        df.to_excel(writer, sheet_name=safe_name, index=False)

print("Local Excel written:", out_xlsx)
if out_xlsx.exists():
    print("Excel size (bytes):", out_xlsx.stat().st_size)

# The Google Sheets export is opt-in; the import is deferred so the module is
# only loaded when the flag is enabled.
RUN_TASK8_GOOGLE_SHEETS = False
if RUN_TASK8_GOOGLE_SHEETS:
    from task8.export_to_sheets import main as run_task8
    run_task8()
    print("Task 8 Google Sheets export completed")
else:
    print("Skipped Google Sheets export (set RUN_TASK8_GOOGLE_SHEETS=True to enable).")

print("Task 8 sheets generated from query names:", list(task4_results.keys()))


## Step 10: Final Sanity Checks


In [11]:
# Final read-only checks: row counts for the three pipeline tables plus the
# most recent metrics and alert rows.
checks = {
    "sitemap_staging_count": "SELECT COUNT(*) AS n FROM candidate_rk_sitemap_staging;",
    "docs_master_count": "SELECT COUNT(*) AS n FROM candidate_rk_docs_master;",
    "document_content_count": "SELECT COUNT(*) AS n FROM candidate_rk_document_content;",
    "latest_metrics": "SELECT * FROM pipeline_metrics ORDER BY metric_id DESC LIMIT 5;",
    "latest_alerts": "SELECT * FROM alerts ORDER BY alert_id DESC LIMIT 10;",
}

# Each check result is small (a single count or at most 10 rows), so it is
# displayed in full.
for name, sql in checks.items():
    df = query_df(sql)
    print(f"\n[{name}] rows={len(df)}")
    display(df)



[sitemap_staging_count] rows=1


  return pd.read_sql(sql, conn)


Unnamed: 0,n
0,6620



[docs_master_count] rows=1


  return pd.read_sql(sql, conn)


Unnamed: 0,n
0,6620


  return pd.read_sql(sql, conn)



[document_content_count] rows=1


Unnamed: 0,n
0,6620



[latest_metrics] rows=2


  return pd.read_sql(sql, conn)


Unnamed: 0,metric_id,pipeline_name,run_started_at,run_finished_at,processed_count,ok200_count,ok304_count,error_count,error_rate,run_duration_ms,created_at
0,2,content_ingest,2026-02-08 22:55:17.245735,2026-02-08 22:55:17.342536,0,0,0,0,0.0,97,2026-02-08 14:55:17.412890
1,1,content_ingest,2026-02-08 08:43:29.071210,2026-02-08 08:43:29.516639,0,0,0,0,0.0,442,2026-02-08 00:43:29.582357



[latest_alerts] rows=2


  return pd.read_sql(sql, conn)


Unnamed: 0,alert_id,metric_id,pipeline_name,alert_type,severity,message,details,triggered_at
0,2,2,content_ingest,empty_result_set,warning,Pipeline run processed 0 URLs.,{'processed_count': 0},2026-02-08 14:55:17.412890
1,1,1,content_ingest,empty_result_set,warning,Pipeline run processed 0 URLs.,{'processed_count': 0},2026-02-08 00:43:29.582357


---
## Project 2: GitHub Contributor Pipeline (Task-wise Rerun)
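Run each cell below in order, with the kernel's working directory set to the GitHub contributor pipeline project root: the setup cell resolves `ROOT` from the current working directory and loads that project's `.env`. The Product Docs cells above are unchanged.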


In [6]:
import os
import sys
from pathlib import Path

import pandas as pd
import psycopg2
from dotenv import load_dotenv

# Rebind ROOT/SRC from the current working directory. When this runs in the
# same kernel as the Product Docs cells, it replaces the ROOT/SRC set there.
ROOT = Path.cwd()
SRC = ROOT / "src"
# Put src/ first on sys.path so the ingestion and export packages can be
# imported by name in later cells.
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))

# Load settings from this project's .env file into the process environment.
load_dotenv(ROOT / ".env")

def p2_env_required(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: If the variable is unset or set to an empty string.
    """
    value = os.environ.get(name)
    if value:
        return value
    raise RuntimeError(f"Missing required env var: {name}")

def p2_db_conn():
    """Open a new psycopg2 connection for Project 2.

    DB_HOST, DB_NAME, DB_USER and DB_PASSWORD are mandatory (a RuntimeError is
    raised by ``p2_env_required`` when one is missing). DB_PORT defaults to
    5432, and the connection attempt times out after 8 seconds.
    """
    settings = {
        "host": p2_env_required("DB_HOST"),
        "port": int(os.getenv("DB_PORT", "5432")),
        "dbname": p2_env_required("DB_NAME"),
        "user": p2_env_required("DB_USER"),
        "password": p2_env_required("DB_PASSWORD"),
        "connect_timeout": 8,
    }
    return psycopg2.connect(**settings)

def p2_query_df(sql: str) -> pd.DataFrame:
    """Run ``sql`` on a fresh Project 2 connection and return a DataFrame.

    Args:
        sql: A single SQL query that returns rows.

    Returns:
        The query result as produced by ``pd.read_sql``.
    """
    conn = p2_db_conn()
    try:
        # ``with conn`` only ends the transaction (commit/rollback); psycopg2
        # leaves the connection open, so close it explicitly to avoid a leak.
        with conn:
            return pd.read_sql(sql, conn)
    finally:
        conn.close()

def p2_run_sql_file(path: Path):
    """Execute the whole SQL file at ``path`` in one transaction.

    The file is read (UTF-8) before a connection is opened, so a missing or
    unreadable file fails without touching the database.

    Args:
        path: Location of the SQL script.

    Raises:
        OSError: If the file cannot be read.
        psycopg2.Error: If connecting or executing fails. The transaction is
            rolled back before the connection is closed.
    """
    sql_text = path.read_text(encoding="utf-8")
    conn = p2_db_conn()
    try:
        # The connection context manager commits on success and rolls back on
        # error, but does not close the connection -- hence the finally.
        with conn, conn.cursor() as cur:
            cur.execute(sql_text)
    finally:
        conn.close()

# Echo the resolved root, database and target repository so it is obvious
# which project and .env this kernel is pointed at.
print("Project 2 setup loaded")
print("ROOT:", ROOT)
print("DB_HOST:", os.getenv("DB_HOST"))
print("DB_NAME:", os.getenv("DB_NAME"))
print("GH_OWNER:", os.getenv("GH_OWNER"))
print("GH_REPO:", os.getenv("GH_REPO"))


Project 2 setup loaded
ROOT: c:\Users\stlp\Desktop\github-contributor-pipeline
DB_HOST: localhost
DB_NAME: github_pipeline
GH_OWNER: apache
GH_REPO: airflow


### Project 2 - Pre-Step: Create Raw Tables (if missing)


In [7]:
P2_BASE_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS commits_raw (
  sha TEXT PRIMARY KEY,
  author_login TEXT,
  commit_date TIMESTAMPTZ,
  raw JSONB NOT NULL
);

CREATE TABLE IF NOT EXISTS pulls_raw (
  id BIGINT PRIMARY KEY,
  number INTEGER NOT NULL,
  author_login TEXT,
  created_at TIMESTAMPTZ,
  raw JSONB NOT NULL
);

CREATE TABLE IF NOT EXISTS comments_raw (
  id BIGINT PRIMARY KEY,
  author_login TEXT,
  created_at TIMESTAMPTZ,
  raw JSONB NOT NULL
);

CREATE TABLE IF NOT EXISTS reviews_raw (
  id BIGINT PRIMARY KEY,
  pull_number INTEGER NOT NULL,
  reviewer_login TEXT,
  submitted_at TIMESTAMPTZ,
  raw JSONB NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_commits_raw_author ON commits_raw(author_login);
CREATE INDEX IF NOT EXISTS idx_pulls_raw_author ON pulls_raw(author_login);
CREATE INDEX IF NOT EXISTS idx_comments_raw_author ON comments_raw(author_login);
CREATE INDEX IF NOT EXISTS idx_reviews_raw_reviewer ON reviews_raw(reviewer_login);
"""

# Create the raw tables and indexes. Every statement uses IF NOT EXISTS, so
# this cell is safe to re-run.
conn = p2_db_conn()
try:
    # The connection context manager commits on success and rolls back on
    # error, but does not close the connection -- hence the finally.
    with conn, conn.cursor() as cur:
        cur.execute(P2_BASE_SCHEMA_SQL)
finally:
    conn.close()

print("Project 2 raw tables are ready.")


Project 2 raw tables are ready.


### Project 2 - Task 1: Ingest GitHub Data -> Raw Tables


In [8]:
import time
import requests
import ingestion.ingest_github as ig

def p2_request_with_retry(url: str, params: dict):
    """Issue a GET against the GitHub API, retrying transient failures.

    Up to 8 attempts are made in total. An attempt is retried on:
      * connection errors and timeouts, with exponential backoff capped at 60s;
      * HTTP 500/502/503/504, with the same backoff;
      * rate limiting -- HTTP 429, or HTTP 403 whose body mentions
        "rate limit" -- waiting ``Retry-After`` seconds when that header is
        present, otherwise until ``X-RateLimit-Reset``.

    Any other response with status >= 400 raises immediately. So does a
    rate-limit response with no usable wait hint, or one received on the final
    attempt (there is no point sleeping when no retry would follow).

    Args:
        url: Full request URL.
        params: Query-string parameters for the request.

    Returns:
        The ``requests.Response`` of the first attempt with status < 400.

    Raises:
        RuntimeError: When the request cannot be completed.
    """
    headers = ig.gh_headers()
    max_tries = 8
    backoff = 2.0

    for attempt in range(1, max_tries + 1):
        is_last_attempt = attempt == max_tries

        try:
            resp = requests.get(url, headers=headers, params=params, timeout=(15, 180))
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            if is_last_attempt:
                raise RuntimeError(f"Network error after retries for {url}: {e}") from e
            sleep_s = min(60, backoff)
            print(f"[network-retry] {type(e).__name__} attempt {attempt}/{max_tries}; sleeping {sleep_s}s")
            time.sleep(sleep_s)
            backoff *= 1.8
            continue

        # Primary rate limits arrive as 403 with a "rate limit" message;
        # secondary limits may also use 429 and carry a Retry-After header.
        rate_limited = resp.status_code == 429 or (
            resp.status_code == 403 and "rate limit" in resp.text.lower()
        )
        if rate_limited and not is_last_attempt:
            retry_after = (resp.headers.get("Retry-After") or "").strip()
            reset = (resp.headers.get("X-RateLimit-Reset") or "").strip()
            sleep_s = None
            if retry_after.isdigit():
                sleep_s = max(1, int(retry_after))
            elif reset.isdigit():
                # Reset is an epoch timestamp; add 2s of slack for clock skew.
                sleep_s = max(1, int(reset) - int(time.time()) + 2)
            if sleep_s is not None:
                print(f"[rate-limit] sleeping {sleep_s}s")
                time.sleep(sleep_s)
                continue
            # No usable hint: fall through to the generic error below.

        if resp.status_code in (500, 502, 503, 504):
            if is_last_attempt:
                raise RuntimeError(f"GitHub {resp.status_code} after retries for {url}")
            sleep_s = min(60, backoff)
            print(f"[http-retry] {resp.status_code} attempt {attempt}/{max_tries}; sleeping {sleep_s}s")
            time.sleep(sleep_s)
            backoff *= 1.8
            continue

        if resp.status_code >= 400:
            raise RuntimeError(
                f"GitHub API error {resp.status_code} for {url} params={params}. "
                f"Response: {resp.text[:500]}"
            )

        return resp

    # Safety net; every path through the final attempt returns or raises above.
    raise RuntimeError(f"Failed after retries for {url}")

# Replace the ingestion module's request helper with the retrying version
# defined above. This only affects the current kernel session.
# NOTE(review): assumes the ingest_* functions look up `request_with_retry`
# on the module at call time -- confirm in ingestion.ingest_github.
ig.request_with_retry = p2_request_with_retry

owner = ig.env_required("GH_OWNER")
repo = ig.env_required("GH_REPO")

# Run the ingestion stages in order: commits, pulls, pull comments, reviews.
print("Task 1 started: staged ingestion with robust retries")
ig.ingest_commits(owner, repo)
ig.ingest_pulls(owner, repo)
ig.ingest_pull_comments(owner, repo)
ig.ingest_reviews_for_all_prs(owner, repo)
ig.print_row_counts()

# Independent row-count check straight from the database, one row per raw table.
task1_counts = p2_query_df("""
SELECT 'commits_raw' AS table_name, COUNT(*)::bigint AS row_count FROM commits_raw
UNION ALL
SELECT 'pulls_raw', COUNT(*)::bigint FROM pulls_raw
UNION ALL
SELECT 'comments_raw', COUNT(*)::bigint FROM comments_raw
UNION ALL
SELECT 'reviews_raw', COUNT(*)::bigint FROM reviews_raw
ORDER BY table_name;
""")

print("Task 1 completed")
display(task1_counts)


Task 1 started: staged ingestion with robust retries
Ingesting commits...
commits_raw upserted: 35784
Ingesting pulls (state=all)...
[http-retry] 500 attempt 1/8; sleeping 2.0s
[http-retry] 500 attempt 1/8; sleeping 2.0s
pulls_raw upserted (deduped): 42465
Ingesting pull request review comments...
[http-retry] 502 attempt 1/8; sleeping 2.0s
[http-retry] 502 attempt 1/8; sleeping 2.0s
[http-retry] 502 attempt 1/8; sleeping 2.0s
[http-retry] 502 attempt 2/8; sleeping 3.6s
[network-retry] ConnectionError attempt 1/8; sleeping 2.0s
[http-retry] 502 attempt 1/8; sleeping 2.0s
[network-retry] ConnectionError attempt 1/8; sleeping 2.0s
comments_raw upserted: 117296
Ingesting reviews per PR...
  inserted reviews batch at PR 200/42465
  inserted reviews batch at PR 400/42465
  inserted reviews batch at PR 600/42465
  inserted reviews batch at PR 800/42465
  inserted reviews batch at PR 1000/42465
  inserted reviews batch at PR 1200/42465
  inserted reviews batch at PR 1400/42465
  inserted revi

  return pd.read_sql(sql, conn)


Task 1 completed


Unnamed: 0,table_name,row_count
0,comments_raw,117296
1,commits_raw,35784
2,pulls_raw,42465
3,reviews_raw,149681


### Project 2 - Task 2: Transform Raw Data -> `contributor_analytics`


In [10]:
# Execute the transform script, then preview the `contributor_analytics`
# table it is expected to populate.
p2_run_sql_file(ROOT / "src" / "transform" / "transform.sql")

print("Task 2 completed")
# Top 10 by score; ties are broken alphabetically by author.
preview_df = p2_query_df("""
SELECT author, commits, prs, comments, reviews, score, tier, overall_rank, tier_rank, percentile
FROM contributor_analytics
ORDER BY score DESC, author ASC
LIMIT 10;
""")
display(preview_df)


Task 2 completed


  return pd.read_sql(sql, conn)


Unnamed: 0,author,commits,prs,comments,reviews,score,tier,overall_rank,tier_rank,percentile
0,0ex-d,0,0,11,29,100,observer,1,1,12.18
1,0lai0,22,31,7,7,100,core,2,1,12.18
2,22quinn,13,13,17,14,100,core,3,2,12.18
3,aa3pankaj,4,6,6,6,100,active,4,1,12.18
4,Aakcht,55,55,27,28,100,core,5,3,12.18
5,aaltay,1,1,67,31,100,contributor,6,1,12.18
6,aaron-wolmutt,7,8,23,23,100,active,7,2,12.18
7,Abdur-rahmaanJ,7,7,5,5,100,active,8,3,12.18
8,abhijeets25012-tech,2,10,11,12,100,active,9,4,12.18
9,Abhishek-kumar-ISM,6,12,10,10,100,active,10,5,12.18


### Project 2 - Required Output Cells (for proof)


In [11]:
# 1) Highest-scoring contributors; ties are broken alphabetically by author.
print("1) Top 10 contributors (by score)")
top10_df = p2_query_df("""
SELECT author, commits, prs, comments, reviews, score, tier
FROM contributor_analytics
ORDER BY score DESC, author ASC
LIMIT 10;
""")
display(top10_df)

# 2) Contributor count per tier, listed in a fixed order (core first,
#    observer last) rather than alphabetically.
print("2) Tier distribution (contributor count per tier)")
tier_df = p2_query_df("""
SELECT tier, COUNT(*) AS contributor_count
FROM contributor_analytics
GROUP BY tier
ORDER BY
  CASE tier
    WHEN 'core' THEN 1
    WHEN 'active' THEN 2
    WHEN 'contributor' THEN 3
    WHEN 'observer' THEN 4
  END;
""")
display(tier_df)

# 3) One-row summary: table size, score range, and how many contributors
#    share the maximum score.
print("3) Summary: total contributors, min/max score, count achieving max score")
summary_df = p2_query_df("""
WITH stats AS (
  SELECT
    COUNT(*) AS total_contributors,
    MIN(score) AS min_score,
    MAX(score) AS max_score
  FROM contributor_analytics
)
SELECT
  s.total_contributors,
  s.min_score,
  s.max_score,
  (SELECT COUNT(*) FROM contributor_analytics WHERE score = s.max_score) AS count_achieving_max_score
FROM stats s;
""")
display(summary_df)


1) Top 10 contributors (by score)


  return pd.read_sql(sql, conn)


Unnamed: 0,author,commits,prs,comments,reviews,score,tier
0,0ex-d,0,0,11,29,100,observer
1,0lai0,22,31,7,7,100,core
2,22quinn,13,13,17,14,100,core
3,aa3pankaj,4,6,6,6,100,active
4,Aakcht,55,55,27,28,100,core
5,aaltay,1,1,67,31,100,contributor
6,aaron-wolmutt,7,8,23,23,100,active
7,Abdur-rahmaanJ,7,7,5,5,100,active
8,abhijeets25012-tech,2,10,11,12,100,active
9,Abhishek-kumar-ISM,6,12,10,10,100,active


2) Tier distribution (contributor count per tier)


  return pd.read_sql(sql, conn)


Unnamed: 0,tier,contributor_count
0,core,279
1,active,844
2,contributor,4063
3,observer,392


  return pd.read_sql(sql, conn)


3) Summary: total contributors, min/max score, count achieving max score


Unnamed: 0,total_contributors,min_score,max_score,count_achieving_max_score
0,5578,2,100,680


In [13]:
import os
from pathlib import Path
# Point GOOGLE_SERVICE_ACCOUNT_JSON at service_account.json in the current
# working directory. This unconditionally overrides any value already set
# (e.g. from .env). Presumably read by the Sheets export -- confirm.
os.environ["GOOGLE_SERVICE_ACCOUNT_JSON"] = str(Path.cwd() / "service_account.json")


### Project 2 - Task 3: Export to Google Sheets


In [14]:
from export.export_to_sheets import run_export

# Run the Google Sheets export entry point from export.export_to_sheets.
run_export()
print("Task 3 completed")


Exported 5578 rows to Google Sheet tab: contributor_analytics
Task 3 completed
