In [1]:
import re
from datetime import datetime
from pathlib import Path

import duckdb
import httpx
import marimo as mo
import pandas as pd
import sqlglot
from loguru import logger
from sqlglot import exp

In [None]:
SQL_PATH = Path.cwd().parent / "sql"
DATA_PATH = Path.cwd().parent / "data"

# PAPAL_DUCKDB = DATA_PATH / "papal_data.duckdb"

In [3]:
def is_sql_safe(sql: str) -> bool:
    """
    Checks SQL for dangerous statements using SQLGlot.
    Flags:
      - DROP TABLE
      - DROP DATABASE
      - TRUNCATE
      - ALTER USER
      - UPDATE or DELETE without WHERE clause
    Returns:
      True if SQL is considered safe, False otherwise.
    """
    try:
        statements = sqlglot.parse(sql)
    except Exception as e:
        print(f"SQL parsing error: {e}")
        return False

    for stmt in statements:
        # DROP TABLE or DROP DATABASE
        if isinstance(stmt, exp.Drop):
            drop_type = stmt.args.get("kind")
            if drop_type and drop_type.upper() in {"TABLE", "DATABASE"}:
                print(f"Warning: Detected DROP {drop_type.upper()} statement.")
                return False
        # TRUNCATE
        if isinstance(stmt, exp.Command):
            command = stmt.args.get("this")
            if command and str(command).upper() == "TRUNCATE":
                print("Warning: Detected TRUNCATE statement.")
                return False
        # ALTER USER
        if isinstance(stmt, exp.Alter):
            kind = stmt.args.get("kind")
            if kind and str(kind).upper() == "USER":
                print("Warning: Detected ALTER USER statement.")
                return False
        # UPDATE/DELETE without WHERE
        if isinstance(stmt, (exp.Update, exp.Delete)):
            if not stmt.args.get("where"):
                print(f"Warning: {stmt.key.upper()} statement without WHERE clause detected.")
                return False

    return True


In [4]:
def run_sql_file(con, sql_path, params=None, check_safety=True, from_loader=False):
    """
    Executes SQL statements from a file on the given DuckDB connection.

    Args:
        con: DuckDB connection object.
        sql_path: Path to the .sql file.
        params: Optional dict for parameter substitution (using {key} syntax).
        check_safety: If True, checks for dangerous SQL statements.
        from_loader: If True, disables safety checks since `load_popes_to_duckdb` manages it.
    """
    sql_path = Path(sql_path)
    sql = sql_path.read_text(encoding="utf-8")
    if params:
        sql = sql.format(**params)

    if from_loader:
        logger.info("Skipping safety check as run from load_popes_to_duckdb")
    elif check_safety and not is_sql_safe(sql):
        raise ValueError("Aborting: potentially dangerous SQL detected.")

    try:
        con.execute(sql)
        logger.info(f"Successfully executed SQL from {sql_path}")
    except Exception as e:
        logger.error(f"SQL execution error: {e}")
        raise



In [5]:
def get_popes_csv_snapshot(force_refresh=False, snapshot_dir=DATA_PATH / "snapshots"):
    url = "https://raw.githubusercontent.com/ksreyes/popes/master/popes.csv"
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d_%H%M")
    snapshot_dir = Path(snapshot_dir)
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    snapshot_file = snapshot_dir / f"popes_{timestamp}.csv"    # CSV name format: popes_YYYY-MM-DD_HHMM.csv

    # Find latest snapshot if not force_refresh
    if not force_refresh:
        candidates = sorted(snapshot_dir.glob("popes_*.csv"), reverse=True)
        if candidates:
            logger.info(f"Using cached snapshot: {candidates[0]}")
            return candidates[0]
    
    # Download and save snapshot
    logger.info(f"Downloading latest CSV from {url}")
    resp = httpx.get(url)
    resp.raise_for_status()
    snapshot_file.write_bytes(resp.content)
    logger.info(f"Saved snapshot: {snapshot_file}")
    return snapshot_file


In [6]:
def load_popes_to_duckdb(
    con: duckdb.DuckDBPyConnection,
    csv_path: Path,
    sql_path: Path = SQL_PATH / "create_popes_table.sql",
    force_refresh: bool = False
) -> tuple[int, pd.DataFrame]:
    """
    Loads popes data into DuckDB using a SQL file template with safety checks.
    
    Args:
        con: DuckDB connection
        csv_path: Path to CSV snapshot
        sql_path: Path to SQL template file (default: create_popes_table.sql)
        force_refresh: If True, re-creates sequence and table
    
    Returns:
        Tuple of (row_count, dataframe)
    """
    logger.info(f"Starting load_popes_to_duckdb with csv_path={csv_path}, force_refresh={force_refresh}")
    
    if force_refresh:
        logger.info("force_refresh is True: dropping sequence and table")
        con.execute("DROP SEQUENCE IF EXISTS popes_id_seq;")
        con.execute("DROP TABLE IF EXISTS popes;")  # Drop the table if refresh
    
    # Parameter substitution for SQL template
    params = {"csv_path": csv_path.as_posix()}
    
    # Execute SQL file to CREATE the table
    logger.info(f"Executing SQL file: {sql_path}")
    run_sql_file(con, sql_path, params=params, check_safety=False, from_loader=True)
    
    # Return results
    row_count_df = con.sql("SELECT COUNT(*) FROM popes").df()
    row_count = row_count_df.iloc[0, 0]
    logger.info(f"Loaded {row_count} rows into popes table")
    
    df = con.sql("SELECT * FROM popes").df()
    logger.info(f"Returning dataframe with shape: {df.shape}")
    
    return (row_count, df)


# Papal Data Pipeline Notebook

This notebook documents and executes the process of loading, cleaning, and previewing papal data tables into a persistent DuckDB database.

**Data sources are cited with each table.**

> **Last updated:** 2025-04-21

## 1. Popes Table

**Source:** [ksreyes/popes (GitHub)](https://github.com/ksreyes/popes)
**Raw CSV:** https://raw.githubusercontent.com/ksreyes/popes/master/popes.csv

*Note: This table includes all popes from St. Peter to Pope Francis.*

In [7]:
# Get snapshot (set force_refresh=True to fetch a new one)

logger.info((DATA_PATH / "snapshots"))

assert (DATA_PATH / "snapshots").exists()

csv_file = get_popes_csv_snapshot(force_refresh=False, snapshot_dir=DATA_PATH / "snapshots")

[32m2025-04-22 13:33:27.376[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m3[0m - [1m/Users/mjboothaus/code/github/databooth/conclave/data/snapshots[0m
[32m2025-04-22 13:33:27.378[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_popes_csv_snapshot[0m:[36m13[0m - [1mUsing cached snapshot: /Users/mjboothaus/code/github/databooth/conclave/data/snapshots/popes_2025-04-22_1231.csv[0m


In [None]:
#con = duckdb.connect(str(PAPAL_DUCKDB))
con = duckdb.connect()   # Memory database

In [9]:
n_pope, pope_df = load_popes_to_duckdb(con, csv_file)
logger.info(f"popes table: {n_pope} rows")

[32m2025-04-22 13:33:27.401[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_popes_to_duckdb[0m:[36m19[0m - [1mStarting load_popes_to_duckdb with csv_path=/Users/mjboothaus/code/github/databooth/conclave/data/snapshots/popes_2025-04-22_1231.csv, force_refresh=False[0m
[32m2025-04-22 13:33:27.402[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_popes_to_duckdb[0m:[36m30[0m - [1mExecuting SQL file: /Users/mjboothaus/code/github/databooth/conclave/sql/create_popes_table.sql[0m
[32m2025-04-22 13:33:27.402[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_sql_file[0m:[36m18[0m - [1mSkipping safety check as run from load_popes_to_duckdb[0m
[32m2025-04-22 13:33:27.429[0m | [31m[1mERROR   [0m | [36m__main__[0m:[36mrun_sql_file[0m:[36m26[0m - [31m[1mSQL execution error: Catalog Error: an index with that name already exists for this table: PRIMARY_popes_id[0m


CatalogException: Catalog Error: an index with that name already exists for this table: PRIMARY_popes_id

In [None]:
# Preview (latest Popes first)

con.sql("SELECT * FROM popes ORDER BY number DESC LIMIT 5").df()

### `popes` Table pipeline

This DuckDB pipeline robustly ingests and cleans the popes dataset in a single step, handling reserved keywords, missing values, and date conversions automatically during import, resulting in a reliable and analysis-ready table.

The query creates (or replaces) the `popes` table in DuckDB by reading directly from a remote CSV file and performing several key data cleaning and transformation steps "on-the-fly":

```sql
CREATE OR REPLACE TABLE popes AS
SELECT
number,
name_full,
name,
suffix,
canonization,
CAST(birth AS DATE) AS birth_date,
CAST(start AS DATE) AS reign_start,
CAST("end" AS DATE) AS reign_end,
age_start,
age_end,
tenure
FROM read_csv(
'https://raw.githubusercontent.com/ksreyes/popes/master/popes.csv',
nullstr=['NA']
);
```

#### Issues addressed in this SQL (DuckDB) pipeline

- **Reserved Keyword Handling:**
  The original CSV uses `end` as a column name, which is a reserved SQL keyword. By quoting it as `"end"` in the query, DuckDB is able to correctly reference and process this column without syntax errors.

- **Missing Value Normalisation:**
  The CSV uses the string `"NA"` to represent missing values. By specifying `nullstr=['NA']`, DuckDB automatically converts all `"NA"` entries to SQL `NULL`, preventing type conversion errors and ensuring missing data is handled consistently.

- **Date Type Conversion:**
  The `birth`, `start`, and `end` columns are stored as ISO 8601 timestamp strings in the CSV. The query uses `CAST(... AS DATE)` to convert these columns to DuckDB's native `DATE` type, enabling accurate date-based calculations and queries.

- **On-the-fly Schema Definition:**
  By selecting and casting columns within the `SELECT` statement, the pipeline defines a clean schema for the `popes` table, ensuring that each column has the correct type and name upon import.

- **Idempotent Table Creation:**
  The use of `CREATE OR REPLACE TABLE` ensures that the pipeline can be rerun safely: if the table already exists, it will be replaced, keeping the workflow reproducible and up-to-date.

In [None]:
pope_df

### Custom update data for recent death of  Pope Francis' End of reign date (21 April 2025)

In [None]:
def update_pope_francis_end_of_reign(con, sql_path=SQL_PATH / "update_pope_francis_details.sql"):
    logger.info("Starting update_pope_francis_end_of_reign")
    # Find Francis's unique id
    logger.info("Looking up Pope Francis' unique ID")
    df = con.execute("SELECT id FROM popes WHERE name = 'Francis'").fetchdf()
    if len(df) != 1:
        raise ValueError(f"Expected one Pope Francis, found {len(df)} rows.")
    pope_id = int(df.iloc[0]['id'])
    logger.info(f"Pope Francis' ID: {pope_id}")
    logger.info(f"Executing SQL from {sql_path} with pope_id={pope_id}")
    run_sql_file(con, sql_path, params={"pope_francis_id": pope_id})
    logger.info("Successfully updated Pope Francis' end of reign details")
    return None


In [None]:
update_pope_francis_end_of_reign(con)


In [None]:
pope_francis_row = con.execute("SELECT * FROM popes WHERE name = 'Francis'").df()

In [None]:
pope_francis_row

## 2. Conclaves Table

**Source:** [Wikipedia: List of Papal Conclaves](https://en.wikipedia.org/wiki/List_of_papal_conclaves)
*(You may need to pre-process or find a CSV version of this data. For demonstration, a placeholder URL is used below.)*

In [None]:
conclaves_csv_url = "https://raw.githubusercontent.com/YOUR-REPO/conclaves.csv"  # Replace with actual
try:
    con.execute(f"""
        CREATE OR REPLACE TABLE conclaves AS
        FROM '{conclaves_csv_url}'
    """)
    conclaves_df = con.execute("SELECT * FROM conclaves LIMIT 5").fetchdf()
except Exception as e:
    conclaves_df = f"Could not load conclaves table: {e}"

In [None]:
conclaves_df

## 3. Cardinals Table

**Source:** [Wikipedia: List of current cardinals](https://en.wikipedia.org/wiki/List_of_current_cardinals)
*(You may need to pre-process or find a CSV version of this data. For demonstration, a placeholder URL is used below.)*

In [None]:
cardinals_csv_url = "https://raw.githubusercontent.com/YOUR-REPO/cardinals.csv"  # Replace with actual
try:
    con.execute(f"""
        CREATE OR REPLACE TABLE cardinals AS
        FROM '{cardinals_csv_url}'
    """)
    cardinals_df = con.execute("SELECT * FROM cardinals LIMIT 5").fetchdf()
except Exception as e:
    cardinals_df = f"Could not load cardinals table: {e}"

In [None]:
cardinals_df

## 4. Papal Documents Table

**Source:** [Vatican.va](https://www.vatican.va/content/vatican/en.html)
*(You may need to pre-process or find a CSV version of this data. For demonstration, a placeholder URL is used below.)*

In [None]:
documents_csv_url = "https://raw.githubusercontent.com/YOUR-REPO/papal_documents.csv"  # Replace with actual
try:
    con.execute(f"""
        CREATE OR REPLACE TABLE papal_documents AS
        FROM '{documents_csv_url}'
    """)
    documents_df = con.execute("SELECT * FROM papal_documents LIMIT 5").fetchdf()
except Exception as e:
    documents_df = f"Could not load papal_documents table: {e}"

In [None]:
documents_df

## Data Sources and References

- **Popes:** [ksreyes/popes (GitHub)](https://github.com/ksreyes/popes)
- **Conclaves:** [Wikipedia: List of Papal Conclaves](https://en.wikipedia.org/wiki/List_of_papal_conclaves) *(CSV needed)*
- **Cardinals:** [Wikipedia: List of current cardinals](https://en.wikipedia.org/wiki/List_of_current_cardinals) *(CSV needed)*
- **Papal Documents:** [Vatican.va](https://www.vatican.va/content/vatican/en.html) *(CSV needed)*

> Please ensure all data sources are cited and that you have permission to use and share these datasets.

In [None]:
con.close()