# Setup, Paths, and Display Helper

This cell initializes a reproducible environment for the ETL. It sets file paths, creates a SQLAlchemy engine for a SQLite database with foreign keys enabled, and defines a small helper (show_df) to preview tables after loading.

- Paths are defined once, relative to the working directory, so runs are portable across machines.
- SQLite is lightweight, file-based, and well suited to iterative analytics.
- Foreign keys are enabled to enforce relational integrity throughout the pipeline.
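SQLite only enforces foreign keys on connections where `PRAGMA foreign_keys = ON` has been issued. The sketch below illustrates that behavior with the standard-library `sqlite3` module and an in-memory database; both are stand-ins for the notebook's engine, and `orphan_insert_rejected` is a hypothetical helper name.

```python
import sqlite3

def orphan_insert_rejected(enable_fk: bool) -> bool:
    """Return True if inserting a fertility row without a parent key is refused."""
    conn = sqlite3.connect(":memory:")
    # The pragma is set explicitly in both cases so the result does not
    # depend on how the local SQLite library was compiled.
    conn.execute(f"PRAGMA foreign_keys = {'ON' if enable_fk else 'OFF'}")
    conn.execute(
        "CREATE TABLE CountryYear (iso_code TEXT NOT NULL, year INTEGER NOT NULL, "
        "PRIMARY KEY (iso_code, year))"
    )
    conn.execute(
        "CREATE TABLE fertility (iso_code TEXT NOT NULL, year INTEGER NOT NULL, "
        "fertility_rate REAL NOT NULL, PRIMARY KEY (iso_code, year), "
        "FOREIGN KEY (iso_code, year) REFERENCES CountryYear(iso_code, year))"
    )
    try:
        # No matching row exists in CountryYear, so this is an orphan.
        conn.execute("INSERT INTO fertility VALUES ('ABW', 1960, 4.567)")
        return False
    except sqlite3.IntegrityError:
        return True
    finally:
        conn.close()

print(orphan_insert_rejected(True), orphan_insert_rejected(False))
```

Because the setting lives on the connection, a pipeline that opens several connections needs the pragma applied to each of them.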

In [1]:
# This notebook builds a SQLite database with a star layout centered on fertility,
# and loads/cleans each table step by step.
from pathlib import Path
import re
import json
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime

# Pretty display helper (replaces caas_jupyter_tools)
from IPython.display import display
def show_df(title, df, n=200):
    print(f"\n=== {title} (showing up to {n} rows) ===")
    display(df.head(n))

# Point to the directory where your CSVs live
DATA_DIR = Path("RAW data")
DB_PATH = Path("./analytics_panel.sqlite")

# File paths (as uploaded)
FERTILITY_PATH       = DATA_DIR / "WB_FertilityRates19602023.csv"
OWID_MARRIED_PATH    = DATA_DIR / "OWID_married_percentage.csv"
GDP_PC_PATH          = DATA_DIR / "GDP per capita.csv"            # NY.GDP.PCAP.CD
GDP_PC_GROWTH_PATH   = DATA_DIR / "GDP annual growth.csv"         # NY.GDP.PCAP.KD.ZG
GDP_DEFL_GROWTH_PATH = DATA_DIR / "GDP deflated.csv"              # NY.GDP.DEFL.KD.ZG
FEMALE_LFP_PATH      = DATA_DIR / "WB_Femaleworkforce19902024.csv"   # SL.TLF.CACT.FE.ZS
UNEMPLOY_PATH        = DATA_DIR / "WB_Unemployement19912024.csv"     # SL.UEM.TOTL.ZS
WGI_PATH             = DATA_DIR / "WB_WGI19962023.csv"               # Political Stability (WB WGI)
CPI_PATH             = DATA_DIR / "Transparency_CPI20002024.csv"     # Transparency Intl export

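# Create DB engine and enable foreign keys.
# SQLite applies PRAGMA foreign_keys per connection, so set it on every new
# connection instead of once on a single pooled connection.
from sqlalchemy import event
engine = create_engine(f"sqlite:///{DB_PATH}")

@event.listens_for(engine, "connect")
def _enable_foreign_keys(dbapi_conn, connection_record):
    dbapi_conn.execute("PRAGMA foreign_keys = ON")

print("Database ready at:", DB_PATH.resolve())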

# Helper for SQLAlchemy 2.x bulk inserts (executemany when list of dicts passed)
def exec_many(conn, sql, rows):
    if not rows:
        return 0
    conn.execute(text(sql), rows)
    return len(rows)


Database ready at: /Users/macbookpro/Capstone/analytics_panel.sqlite


# Shared Parsers & Utilities

This cell defines the shared readers and lookup helpers used by the loading cells that follow.

### read_worldbank_bulk: 
handles metadata rows and header quirks, and produces tidy long tables (iso_code, year, value).

### read_owid_married: 
prefer Estimate and fall back to Projection; filter to years 1970–2024.

### build_wb_country_lookup: 
ISO code mapping utilities using a WB lookup + an ALIASES dictionary to normalize tricky names (e.g., Türkiye, Kyrgyzstan).

### read_cpi_country_then_split: 
CPI reader that accepts wide or long formats and splits scores into CPI10 (2000–2011) and CPI100 (2012–2024).

### read_wgi_political_stability: 
WGI extraction for “Political Stability & Absence of Violence/Terrorism.”

Notes 

CPI10 vs CPI100: Transparency International changed the CPI scale in 2012. We store them separately to avoid mixing scales; this preserves interpretability and lets us analyze each period correctly.

Name-to-ISO mapping: Transparency’s CPI file did not ship with ISO3 codes, so we map country names to ISO codes using WB names + ALIASES (e.g., “Hong Kong, China”→HKG, “Taipei,China”→TWN).
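The lookup order described above (alias first, then exact World Bank name, then a relaxed comparison) can be sketched as follows. The two dictionaries are hypothetical miniatures, and unlike the notebook's function this version precomputes the normalized names and returns `None` rather than NaN when nothing matches.

```python
import re

# Hypothetical miniature lookup tables; the notebook builds the real ones from data.
wb_names = {"Aruba": "ABW", "Cote d'Ivoire": "CIV", "Hong Kong SAR, China": "HKG"}
aliases = {"Hong Kong, China": "HKG", "Taipei,China": "TWN"}

def _norm(name):
    """Lowercase and drop everything except ASCII letters, digits, and spaces."""
    return re.sub(r"[^A-Za-z0-9 ]+", "", name).strip().lower()

# Normalize the World Bank names once so the relaxed match is a dict lookup.
wb_norm = {_norm(n): iso for n, iso in wb_names.items()}

def name_to_iso(name):
    if name in aliases:          # 1. explicit alias override
        return aliases[name]
    if name in wb_names:         # 2. exact World Bank name
        return wb_names[name]
    return wb_norm.get(_norm(name))  # 3. relaxed match, None if unknown

print(name_to_iso("Taipei,China"), name_to_iso("Cote dIvoire"), name_to_iso("Atlantis"))
```

Names that come back unmatched are the ones to add to the alias dictionary by hand.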

In [2]:
def find_header_row(csv_path, header_keys=("Country Name","Country Code","Indicator Name","Indicator Code","Entity","Code","Year")):
    """Detect the first line that looks like a header in World Bank/OWID-like CSVs."""
    with open(csv_path, "r", encoding="utf-8", errors="ignore") as f:
        for i, line in enumerate(f):
            if i > 200:
                break
            low = line.lower()
            if any(k.lower() in low for k in header_keys):
                return i
    return 0

def read_worldbank_bulk(csv_path):
    """World Bank bulk CSV -> long format with iso_code/year/value and indicator metadata."""
    hdr = find_header_row(csv_path)
    df = pd.read_csv(csv_path, skiprows=hdr, low_memory=False)
    expected = {"Country Name","Country Code","Indicator Name","Indicator Code"}
    if not expected.issubset(df.columns):
        raise ValueError(f"Unexpected columns in {csv_path}: {df.columns[:10].tolist()}")
    keep = ["Country Name","Country Code","Indicator Name","Indicator Code"]
    year_cols = [c for c in df.columns if re.fullmatch(r"\d{4}", str(c))]
    if not year_cols:
        raise ValueError("No YYYY year columns found.")
    df = df[keep + year_cols]
    long = df.melt(id_vars=keep, value_vars=year_cols, var_name="year", value_name="value")
    long["year"] = pd.to_numeric(long["year"], errors="coerce")
    long = long.dropna(subset=["year"])
    long["year"] = long["year"].astype(int)
    long = long.rename(columns={
        "Country Name":"country_name",
        "Country Code":"iso_code",
        "Indicator Name":"indicator_name",
        "Indicator Code":"indicator_code"
    })
    long["value"] = pd.to_numeric(long["value"], errors="coerce")
    long = long.dropna(subset=["iso_code","year","value"])
    return long

def read_owid_married(csv_path, min_year=1970, max_year=2024):
    """OWID 'share of women 15-49 married or in union' -> married_percentage.
       Prefers (Estimate), falls back to (Projection). Filters years [1970, 2024]."""
    hdr = find_header_row(csv_path, header_keys=("Entity","Code","Year","Share of women"))
    df = pd.read_csv(csv_path, skiprows=hdr, low_memory=False)
    df = df.rename(columns={c: c.strip() for c in df.columns})
    proj_col = [c for c in df.columns if "(Projection)" in c]
    est_col  = [c for c in df.columns if "(Estimate)" in c]
    if proj_col or est_col:
        # Build aligned Series even when one column is absent, so the fallback
        # never calls .where() on a bare float.
        nan_series = pd.Series(np.nan, index=df.index, dtype="float64")
        est_vals  = pd.to_numeric(df[est_col[0]], errors="coerce") if est_col else nan_series
        proj_vals = pd.to_numeric(df[proj_col[0]], errors="coerce") if proj_col else nan_series
        df["married_percentage"] = est_vals.fillna(proj_vals)
    else:
        # fallback schema
        val_col = "Value" if "Value" in df.columns else None
        if val_col is None: raise ValueError("OWID file missing expected columns.")
        df["married_percentage"] = pd.to_numeric(df[val_col], errors="coerce")
    df = df.rename(columns={"Entity":"country_name","Code":"iso_code","Year":"year"})
    df = df[df["iso_code"].str.fullmatch(r"[A-Z]{3}", na=False)]
    df["year"] = pd.to_numeric(df["year"], errors="coerce")
    df = df.dropna(subset=["year","married_percentage"])
    df["year"] = df["year"].astype(int)
    # Restrict to the requested year window
    df = df[(df["year"] >= min_year) & (df["year"] <= max_year)]
    # Plausibility
    df = df[(df["married_percentage"] >= 0) & (df["married_percentage"] <= 100)]
    return df[["iso_code","year","married_percentage"]]

# CPI and WGI helpers (with name->ISO mapping)
def build_wb_country_lookup(wb_any_long_df):
    """Builds a mapping of World Bank country_name -> iso_code from any WB long dataframe."""
    look = (wb_any_long_df[["country_name","iso_code"]]
            .dropna().drop_duplicates().sort_values("country_name"))
    return dict(zip(look["country_name"], look["iso_code"]))

ALIASES = {
    "Türkiye":"TUR", "Turkey":"TUR",
    "Kyrgyzstan":"KGZ", "Kyrgyz Republic":"KGZ",
    "Czechia":"CZE", "Czech Republic":"CZE",
    "Côte d'Ivoire":"CIV", "Ivory Coast":"CIV", "Cote dIvoire":"CIV",
    "Eswatini":"SWZ", "Swaziland":"SWZ",
    "North Macedonia":"MKD", "Macedonia":"MKD",
    "Russia":"RUS", "Russian Federation":"RUS",
    "Syria":"SYR", "Syrian Arab Republic":"SYR",
    "Vietnam":"VNM", "Viet Nam":"VNM",
    "Gambia":"GMB", "The Gambia":"GMB", "Gambia, The":"GMB",
    "Bahamas":"BHS", "Bahamas, The":"BHS",
    "Democratic Republic of the Congo":"COD", "Congo (Kinshasa)":"COD",
    "Republic of the Congo":"COG", "Congo (Brazzaville)":"COG",
    "Hong Kong":"HKG", "Hong Kong SAR, China":"HKG",
    "Macau":"MAC", "Macao SAR, China":"MAC",
    "Palestine":"PSE", "West Bank and Gaza":"PSE",
    "Laos":"LAO", "Lao PDR":"LAO",
    "Cape Verde":"CPV", "Cabo Verde":"CPV",
    "Micronesia":"FSM", "Micronesia, Fed. Sts.":"FSM",
    "Brunei":"BRN", "Brunei Darussalam":"BRN",
    "Egypt":"EGY", "Egypt, Arab Rep.":"EGY",
    "Iran":"IRN", "Iran, Islamic Rep.":"IRN",
    "Korea, South":"KOR", "Korea, Rep.":"KOR",
    "Korea, North":"PRK", "Korea, Dem. People’s Rep.":"PRK",
    "Yemen":"YEM", "Yemen, Rep.":"YEM",
    "Bolivia":"BOL", "Bolivia (Plurinational State of)":"BOL",
    "Venezuela":"VEN", "Venezuela, RB":"VEN",
    "Tanzania":"TZA", "Tanzania, United Rep.":"TZA",
}

# add these alias fixes
ALIASES.update({
    "China, People's Republic of": "CHN",
    "Hong Kong, China": "HKG",
    "Korea, Republic of": "KOR",
    "Lao People's Democratic Republic": "LAO",
    "Taipei,China": "TWN",
})

def make_name_to_iso_fn(wb_lookup_map):
    def name_to_iso(country_name):
        if pd.isna(country_name): return np.nan
        # alias override
        if country_name in ALIASES:
            return ALIASES[country_name]
        # exact WB name
        if country_name in wb_lookup_map:
            return wb_lookup_map[country_name]
        # relaxed: strip punctuation/accents (approx) and compare
        key = re.sub(r"[^A-Za-z0-9 ]+", "", country_name).strip().lower()
        for nm, iso in wb_lookup_map.items():
            nm_key = re.sub(r"[^A-Za-z0-9 ]+", "", nm).strip().lower()
            if nm_key == key:
                return iso
        return np.nan
    return name_to_iso

def read_cpi_country_then_split(csv_path, name_to_iso):
    """
    Reads Transparency International CPI from either:
      - WIDE: 'Economy' + year columns (2000..2025), OR
      - LONG: country + Year + score column.
    Returns columns: iso_code, year, CPI10, CPI100, country_name
    CPI10 is kept for 2000–2011; CPI100 for 2012–2024 (capped at 2024).
    """
    df = pd.read_csv(csv_path)
    df.columns = [c.strip() for c in df.columns]
    cols = set(df.columns)

    # detect WIDE format: "Economy" plus 4-digit year columns
    year_cols = [c for c in df.columns if re.fullmatch(r"\d{4}", str(c))]
    if "Economy" in cols and year_cols:
        economy_col = "Economy"
        long = df.melt(
            id_vars=[economy_col],
            value_vars=year_cols,
            var_name="year",
            value_name="CPI_raw"
        )
        long = long.rename(columns={economy_col: "country_name"})
        long["year"] = pd.to_numeric(long["year"], errors="coerce")
        long["CPI_raw"] = pd.to_numeric(long["CPI_raw"], errors="coerce")
        long = long.dropna(subset=["country_name", "year", "CPI_raw"])
        long["year"] = long["year"].astype(int)

    else:
        # fallback: LONG format (country/year/value)
        cname = next((c for c in ["Country","country","Entity","Country Name","Name","Economy"] if c in cols), None)
        ycol  = next((c for c in ["Year","year","YEAR"] if c in cols), None)
        vcol  = next((c for c in ["CPI","score","Score","CPI score","CPI_Score","Value","value"] if c in cols), None)
        if not (cname and ycol and vcol):
            raise ValueError(f"CPI file must have country+year+score or 'Economy' + year columns. Found: {df.columns.tolist()}")
        long = df[[cname, ycol, vcol]].rename(columns={cname:"country_name", ycol:"year", vcol:"CPI_raw"})
        long["year"] = pd.to_numeric(long["year"], errors="coerce")
        long["CPI_raw"] = pd.to_numeric(long["CPI_raw"], errors="coerce")
        long = long.dropna(subset=["country_name","year","CPI_raw"])
        long["year"] = long["year"].astype(int)

    # map to ISO codes
    long["iso_code"] = long["country_name"].apply(name_to_iso)

    # cap at 2024 (file contains a 2025 column)
    long = long[(long["year"] >= 2000) & (long["year"] <= 2024)]

    # split into CPI10 (2000–2011) vs CPI100 (2012–2024)
    long["CPI10"]  = np.where((long["year"] >= 2000) & (long["year"] <= 2011), long["CPI_raw"], np.nan)
    long["CPI100"] = np.where((long["year"] >= 2012) & (long["year"] <= 2024), long["CPI_raw"], np.nan)

    return long[["iso_code","year","CPI10","CPI100","country_name"]]

def read_wgi_political_stability(csv_path):
    """Extracts Political Stability & Absence of Violence/Terrorism from WGI export."""
    wb = read_worldbank_bulk(csv_path)
    # Match on the indicator name. If the export carries several "Political Stability"
    # series (e.g. estimate and percentile rank), filter on indicator_code
    # (e.g. 'PV.EST' for the point estimate) instead.
    wgi = wb[wb["indicator_name"].str.contains("Political Stability", case=False, na=False)]
    if wgi.empty:
        wgi = wb[wb["indicator_code"].str.startswith("PV", na=False)].copy()
    wgi = wgi.rename(columns={"value":"WGI"})
    # keep plausible range
    wgi = wgi[(wgi["WGI"] >= -5) & (wgi["WGI"] <= 5)]
    return wgi[["iso_code","year","WGI"]]

***Fertility Table (Fact Table)***

Source: World Bank bulk CSV — filter to SP.DYN.TFRT.IN (total fertility rate, births per woman).

***Processing Rules***

Apply plausibility bounds: 0 to 10.

Create or refresh CountryYear dimension with (iso_code, year) as composite key.

Load fertility values into the central table.

***Why Fertility is Central***

The analysis compares which factors correlate most strongly with fertility.

Positioning fertility as the “hub” ensures clean joins and provides a single, authoritative source for the dependent variable.
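The "create or refresh" step relies on key registration being idempotent: each loader can push its (iso_code, year) pairs into CountryYear without worrying about pairs that are already there. A minimal sketch with the standard-library `sqlite3` module and an in-memory database (the helper name `register_keys` is hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE CountryYear (
        iso_code TEXT NOT NULL,
        year INTEGER NOT NULL,
        PRIMARY KEY (iso_code, year)
    )
""")

def register_keys(conn, keys):
    """Insert (iso_code, year) pairs, skipping ones already present; return the row count."""
    conn.executemany(
        "INSERT OR IGNORE INTO CountryYear (iso_code, year) VALUES (:iso_code, :year)",
        keys,
    )
    return conn.execute("SELECT COUNT(*) FROM CountryYear").fetchone()[0]

first = register_keys(conn, [{"iso_code": "ABW", "year": 1960},
                             {"iso_code": "ABW", "year": 1961}])
# The second batch overlaps on 1961; only 1962 is new.
second = register_keys(conn, [{"iso_code": "ABW", "year": 1961},
                              {"iso_code": "ABW", "year": 1962}])
print(first, second)
```

The composite primary key is what makes `INSERT OR IGNORE` skip the duplicate pair.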

In [3]:
# Read WB long
wb_any = read_worldbank_bulk(FERTILITY_PATH)

# Filter fertility indicator
fertility_df = (
    wb_any.query("indicator_code == 'SP.DYN.TFRT.IN'")
          .rename(columns={"value":"fertility_rate"})
          [["iso_code","year","fertility_rate"]]
)
# Plausibility bounds
fertility_df = fertility_df[(fertility_df["fertility_rate"] >= 0) & (fertility_df["fertility_rate"] <= 10)]

# Create/refresh CountryYear based on fertility (will expand later)
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS CountryYear;"))
    conn.execute(text("""
        CREATE TABLE CountryYear (
            iso_code TEXT NOT NULL,
            year INTEGER NOT NULL,
            PRIMARY KEY (iso_code, year)
        );
    """))
    # Insert keys from fertility
    keys = fertility_df[["iso_code","year"]].drop_duplicates().to_dict(orient="records")
    exec_many(conn, "INSERT OR IGNORE INTO CountryYear (iso_code, year) VALUES (:iso_code, :year)", keys)

# Create fertility table & insert
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS fertility;"))
    conn.execute(text("""
        CREATE TABLE fertility (
            iso_code TEXT NOT NULL,
            year INTEGER NOT NULL,
            fertility_rate REAL NOT NULL,
            PRIMARY KEY (iso_code, year),
            FOREIGN KEY (iso_code, year) REFERENCES CountryYear(iso_code, year)
              ON DELETE CASCADE ON UPDATE CASCADE
        );
    """))
    rows = fertility_df.to_dict(orient="records")
    exec_many(conn, "INSERT OR REPLACE INTO fertility (iso_code, year, fertility_rate) VALUES (:iso_code, :year, :fertility_rate)", rows)
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_fert_ccy ON fertility(iso_code, year);"))

# Display fertility from DB
fert_view = pd.read_sql("SELECT * FROM fertility ORDER BY iso_code, year LIMIT 200;", con=engine)
show_df("Fertility_table_preview", fert_view)



=== Fertility_table_preview (showing up to 200 rows) ===


|  | iso_code | year | fertility_rate |
|---|---|---|---|
| 0 | ABW | 1960 | 4.567000 |
| 1 | ABW | 1961 | 4.422000 |
| 2 | ABW | 1962 | 4.262000 |
| 3 | ABW | 1963 | 4.107000 |
| 4 | ABW | 1964 | 3.940000 |
| ... | ... | ... | ... |
| 195 | AFW | 1963 | 6.500229 |
| 196 | AFW | 1964 | 6.516739 |
| 197 | AFW | 1965 | 6.532766 |
| 198 | AFW | 1966 | 6.556550 |


***Married Status Table***

Source: OWID series on the share of women aged 15–49 who are married or in a union, stored as married_percentage.

***Cleaning Rules***

Use Estimates; fall back to Projections if missing.

Restrict to 1970–2024 (project scope).

Enforce plausible bounds: 0–100.

***Rationale for OWID***

Raw national surveys are inconsistent (counts vs. rates vs. %; many gaps).

OWID harmonizes these sources into a more complete series that is comparable across countries and years.

Best suited for reliable cross-national correlation analysis.
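The "prefer Estimate, fall back to Projection" rule can be sketched on a toy frame. Column names and values here are hypothetical, and `pick` is an illustrative helper, not part of the notebook.

```python
import numpy as np
import pandas as pd

# Toy frame with hypothetical column names and values
toy = pd.DataFrame({
    "Year": [2019, 2020, 2021],
    "Share (Estimate)": [50.1, np.nan, np.nan],
    "Share (Projection)": [np.nan, 49.8, np.nan],
})

def pick(df, marker):
    """Return the first column whose name contains `marker`, or an all-NaN Series."""
    cols = [c for c in df.columns if marker in c]
    if cols:
        return pd.to_numeric(df[cols[0]], errors="coerce")
    return pd.Series(np.nan, index=df.index, dtype="float64")

# Estimate wins where present; projection fills the remaining gaps.
married = pick(toy, "(Estimate)").fillna(pick(toy, "(Projection)"))
print(married.tolist())
```

Rows where both columns are missing stay NaN and are removed by the loader's later `dropna`.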

In [4]:
# Clean OWID with years limited to [1970, 2024]
married_df = read_owid_married(OWID_MARRIED_PATH, min_year=1970, max_year=2024)

# Ensure CountryYear has all keys
with engine.begin() as conn:
    keys = married_df[["iso_code","year"]].drop_duplicates().to_dict(orient="records")
    exec_many(conn, "INSERT OR IGNORE INTO CountryYear (iso_code, year) VALUES (:iso_code, :year)", keys)

# Create table & insert
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS married_status;"))
    conn.execute(text("""
        CREATE TABLE married_status (
            iso_code TEXT NOT NULL,
            year INTEGER NOT NULL,
            married_percentage REAL NOT NULL,
            PRIMARY KEY (iso_code, year),
            FOREIGN KEY (iso_code, year) REFERENCES CountryYear(iso_code, year)
              ON DELETE CASCADE ON UPDATE CASCADE
        );
    """))
    rows = married_df.to_dict(orient="records")
    exec_many(conn, "INSERT OR REPLACE INTO married_status (iso_code, year, married_percentage) VALUES (:iso_code, :year, :married_percentage)", rows)
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_mar_ccy ON married_status(iso_code, year);"))

# Display married_status from DB
married_view = pd.read_sql("SELECT * FROM married_status ORDER BY iso_code, year LIMIT 200;", con=engine)
show_df("MarriedStatus_table_preview", married_view)



=== MarriedStatus_table_preview (showing up to 200 rows) ===


|  | iso_code | year | married_percentage |
|---|---|---|---|
| 0 | ABW | 1970 | 48.828510 |
| 1 | ABW | 1971 | 48.856102 |
| 2 | ABW | 1972 | 48.783600 |
| 3 | ABW | 1973 | 48.768890 |
| 4 | ABW | 1974 | 48.848003 |
| ... | ... | ... | ... |
| 195 | AIA | 2000 | 42.820680 |
| 196 | AIA | 2001 | 42.635544 |
| 197 | AIA | 2002 | 42.320374 |
| 198 | AIA | 2003 | 41.998540 |


***GDP Table (Three Economic Views)***

Source: World Bank data.

***Measures captured:***

current_usd — GDP per capita (current US$) for income level.

deflation — GDP deflator growth (%) for price environment.

annual_rate — GDP per capita annual growth (%) for economic momentum.

***Why Three Measures***

Current US$ reflects overall living standards.

Deflator growth captures inflation or deflation pressures.

Per-capita growth highlights sharp growth in emerging economies versus stability in mature economies like the U.S.—important for understanding fertility shifts.
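Since the three measures do not cover the same country-years, combining them with an outer merge on (iso_code, year) keeps every observation and leaves gaps as NaN. A small sketch with two of the measures and illustrative values:

```python
import pandas as pd

# Illustrative values; only the shape of the data matters here.
pc = pd.DataFrame({"iso_code": ["ABW", "ABW"], "year": [1986, 1987],
                   "current_usd": [6767.6, 8244.0]})
growth = pd.DataFrame({"iso_code": ["ABW"], "year": [1987],
                       "annual_rate": [17.6]})

# validate="one_to_one" raises if either side has duplicate (iso_code, year) keys.
gdp = pc.merge(growth, on=["iso_code", "year"], how="outer", validate="one_to_one")
gdp = gdp.sort_values(["iso_code", "year"]).reset_index(drop=True)
print(gdp)
```

The 1986 row survives with a missing `annual_rate`, which ends up as NULL once written to SQLite.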

In [5]:
# Build three components and outer-merge on (iso_code, year)
wb_gdp_pc    = read_worldbank_bulk(GDP_PC_PATH)
wb_gdp_grow  = read_worldbank_bulk(GDP_PC_GROWTH_PATH)
wb_gdp_defl  = read_worldbank_bulk(GDP_DEFL_GROWTH_PATH)

gdp_pc = (wb_gdp_pc.query("indicator_code == 'NY.GDP.PCAP.CD'")
                     .rename(columns={"value":"current_usd"})
                     [["iso_code","year","current_usd"]])
gdp_pc = gdp_pc[gdp_pc["current_usd"] >= 0]

gdp_defl = (wb_gdp_defl.query("indicator_code == 'NY.GDP.DEFL.KD.ZG'")
                       .rename(columns={"value":"deflation"})
                       [["iso_code","year","deflation"]])

gdp_growth = (wb_gdp_grow.query("indicator_code == 'NY.GDP.PCAP.KD.ZG'")
                         .rename(columns={"value":"annual_rate"})
                         [["iso_code","year","annual_rate"]])

# Merge components
gdp_df = gdp_pc.merge(gdp_defl, on=["iso_code","year"], how="outer")\
               .merge(gdp_growth, on=["iso_code","year"], how="outer")

# Ensure CountryYear has keys
with engine.begin() as conn:
    keys = gdp_df[["iso_code","year"]].dropna().drop_duplicates().to_dict(orient="records")
    exec_many(conn, "INSERT OR IGNORE INTO CountryYear (iso_code, year) VALUES (:iso_code, :year)", keys)

# Create GDP table & insert
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS gdp;"))
    conn.execute(text("""
        CREATE TABLE gdp (
            iso_code TEXT NOT NULL,
            year INTEGER NOT NULL,
            current_usd REAL,
            deflation REAL,
            annual_rate REAL,
            PRIMARY KEY (iso_code, year),
            FOREIGN KEY (iso_code, year) REFERENCES CountryYear(iso_code, year)
              ON DELETE CASCADE ON UPDATE CASCADE
        );
    """))
    rows = gdp_df.dropna(subset=["iso_code","year"]).to_dict(orient="records")
    exec_many(conn,
        "INSERT OR REPLACE INTO gdp (iso_code, year, current_usd, deflation, annual_rate) "
        "VALUES (:iso_code, :year, :current_usd, :deflation, :annual_rate)",
        rows)
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_gdp_ccy ON gdp(iso_code, year);"))

# Display GDP from DB
gdp_view = pd.read_sql("SELECT * FROM gdp ORDER BY iso_code, year LIMIT 200;", con=engine)
show_df("GDP_table_preview", gdp_view)


=== GDP_table_preview (showing up to 200 rows) ===


|  | iso_code | year | current_usd | deflation | annual_rate |
|---|---|---|---|---|---|
| 0 | ABW | 1986 | 6767.559229 |  |  |
| 1 | ABW | 1987 | 8244.045660 | 3.591970 | 17.593206 |
| 2 | ABW | 1988 | 10056.261393 | 3.108439 | 18.304687 |
| 3 | ABW | 1989 | 11507.217151 | 3.962543 | 10.066932 |
| 4 | ABW | 1990 | 12187.536361 | 5.769870 | 0.134480 |
| ... | ... | ... | ... | ... | ... |
| 195 | AGO | 1983 | 636.832811 | 0.012764 | 0.398926 |
| 196 | AGO | 1984 | 650.491094 | 0.001188 | 2.143507 |
| 197 | AGO | 1985 | 772.468833 | 19.035206 | -0.238221 |
| 198 | AGO | 1986 | 697.526602 | -9.013062 | -0.756800 |


***Employment Table***

Source: World Bank data.

***Measures captured:***

female_participation — Female labor force participation (%), bounded 0–100.

unemployment — Unemployment rate (%), bounded 0–100.

***Why Both Indicators***

Female participation reflects education, opportunity cost, and social norms linked to fertility choices.

Unemployment measures economic stress that can delay or reduce marriage and fertility.
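The 0-100 bounds on both percentage indicators amount to a range filter. One compact way to express it, assuming a hypothetical helper named `keep_in_range` and toy values:

```python
import pandas as pd

def keep_in_range(df, column, low, high):
    """Keep rows whose `column` lies in [low, high]; rows with NaN are dropped too."""
    return df[df[column].between(low, high)]

# Toy rows: one plausible value, one above 100, one negative
toy = pd.DataFrame({"iso_code": ["AAA", "BBB", "CCC"],
                    "unemployment": [8.2, 130.0, -1.0]})
clean = keep_in_range(toy, "unemployment", 0, 100)
print(clean)
```

`Series.between` is inclusive on both ends by default, so values of exactly 0 or 100 are kept.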

In [6]:
# Female LFP and Unemployment
wb_lfp = read_worldbank_bulk(FEMALE_LFP_PATH)
lfp_df = (wb_lfp.query("indicator_code == 'SL.TLF.CACT.FE.ZS'")
                 .rename(columns={"value":"female_participation"})
                 [["iso_code","year","female_participation"]])
lfp_df = lfp_df[(lfp_df["female_participation"] >= 0) & (lfp_df["female_participation"] <= 100)]

wb_un = read_worldbank_bulk(UNEMPLOY_PATH)
unemp_df = (wb_un.query("indicator_code == 'SL.UEM.TOTL.ZS'")
                  .rename(columns={"value":"unemployment"})
                  [["iso_code","year","unemployment"]])
unemp_df = unemp_df[(unemp_df["unemployment"] >= 0) & (unemp_df["unemployment"] <= 100)]

employment_df = lfp_df.merge(unemp_df, on=["iso_code","year"], how="outer")

# Ensure CountryYear has keys
with engine.begin() as conn:
    keys = employment_df[["iso_code","year"]].dropna().drop_duplicates().to_dict(orient="records")
    exec_many(conn, "INSERT OR IGNORE INTO CountryYear (iso_code, year) VALUES (:iso_code, :year)", keys)

# Create employment table & insert
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS employment;"))
    conn.execute(text("""
        CREATE TABLE employment (
            iso_code TEXT NOT NULL,
            year INTEGER NOT NULL,
            female_participation REAL,
            unemployment REAL,
            PRIMARY KEY (iso_code, year),
            FOREIGN KEY (iso_code, year) REFERENCES CountryYear(iso_code, year)
              ON DELETE CASCADE ON UPDATE CASCADE
        );
    """))
    rows = employment_df.dropna(subset=["iso_code","year"]).to_dict(orient="records")
    exec_many(conn,
        "INSERT OR REPLACE INTO employment (iso_code, year, female_participation, unemployment) "
        "VALUES (:iso_code, :year, :female_participation, :unemployment)",
        rows)
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_emp_ccy ON employment(iso_code, year);"))

# Display Employment from DB
emp_view = pd.read_sql("SELECT * FROM employment ORDER BY iso_code, year LIMIT 200;", con=engine)
show_df("Employment_table_preview", emp_view)



=== Employment_table_preview (showing up to 200 rows) ===


|  | iso_code | year | female_participation | unemployment |
|---|---|---|---|---|
| 0 | AFE | 1990 | 65.537699 |  |
| 1 | AFE | 1991 | 65.692338 | 8.179629 |
| 2 | AFE | 1992 | 65.851327 | 8.270724 |
| 3 | AFE | 1993 | 65.930883 | 8.266327 |
| 4 | AFE | 1994 | 66.137332 | 8.138291 |
| ... | ... | ... | ... | ... |
| 195 | ARB | 2010 | 20.714920 | 9.577474 |
| 196 | ARB | 2011 | 20.749832 | 10.693296 |
| 197 | ARB | 2012 | 20.523912 | 11.074942 |
| 198 | ARB | 2013 | 20.641849 | 11.043728 |


***Politics Table (WGI + CPI10/CPI100)***

Sources: Worldwide Governance Indicators (WGI, published by the World Bank), Transparency International (CPI).

***Measures captured:***

WGI — Political Stability & Absence of Violence/Terrorism (range: −2.5 to 2.5).

CPI10 — Corruption Perceptions Index, 2000–2011 (0–10 scale).

CPI100 — Corruption Perceptions Index, 2012–2024 (0–100 scale).

***Processing Rules***

Reshape CPI data from wide (economy + years) to long (country, year, value).

Map country names to ISO codes using the World Bank lookup and aliases (e.g., “China, People's Republic of” → CHN; “Taipei,China” → TWN, spelled without a space as in the source file).

Split CPI series at 2012 to respect the scale change.
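The reshape and split rules above can be sketched on a one-row wide extract. The economy name and scores are made up for illustration.

```python
import pandas as pd

# Hypothetical wide extract: one row per economy, one column per year
wide = pd.DataFrame({"Economy": ["Exampleland"], "2011": [3.6], "2012": [39.0]})

# Wide -> long: every year column becomes a (year, CPI_raw) row
long = wide.melt(id_vars=["Economy"], var_name="year", value_name="CPI_raw")
long["year"] = long["year"].astype(int)

# Route each score to the column that matches its scale; the other stays NaN
long["CPI10"] = long["CPI_raw"].where(long["year"] <= 2011)
long["CPI100"] = long["CPI_raw"].where(long["year"] >= 2012)
print(long)
```

Each row ends up with exactly one of the two CPI columns populated, so the scales cannot be mixed by accident.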

***Why Separate CPI10 and CPI100***

The CPI scale and methodology changed in 2012.

Storing them as distinct columns preserves historical accuracy and prevents accidental mixing.

Normalization (e.g., multiplying CPI10 by 10) can be applied later during analysis, not in storage.
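If a single 0-100 series is wanted at analysis time, the rescaling mentioned above is a one-liner. The column name `cpi_0_100` and the values are hypothetical, and because the methodology also changed in 2012, the combined series is only roughly comparable across the break.

```python
import pandas as pd

panel = pd.DataFrame({
    "year": [2011, 2012],
    "CPI10": [3.6, None],    # 0-10 scale, present through 2011
    "CPI100": [None, 39.0],  # 0-100 scale, present from 2012
})

# Rescale the older series to 0-100, then fill the later years from CPI100
panel["cpi_0_100"] = (panel["CPI10"] * 10).fillna(panel["CPI100"])
print(panel)
```

Keeping this step in analysis code leaves the stored columns on their original scales.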

In [7]:
# Build a WB country lookup from any WB long DF (we already have wb_any from fertility)
wb_lookup_map = build_wb_country_lookup(wb_any)
name_to_iso = make_name_to_iso_fn(wb_lookup_map)

# Load CPI by country name and split
cpi_split = read_cpi_country_then_split(CPI_PATH, name_to_iso=name_to_iso)

# Show any unmatched names for manual alias extension
unmatched = cpi_split[cpi_split["iso_code"].isna()]
unmatched_countries = sorted(unmatched["country_name"].unique().tolist())
print("Unmatched CPI country names (extend ALIASES if needed):", unmatched_countries[:25])

# Drop rows without iso_code
cpi_split = cpi_split.dropna(subset=["iso_code"])

# Load WGI
wgi_df = read_wgi_political_stability(WGI_PATH)

# Merge to politics
politics_df = pd.merge(wgi_df, cpi_split.drop(columns=["country_name"]), on=["iso_code","year"], how="outer")

# Ensure CountryYear has keys
with engine.begin() as conn:
    keys = politics_df[["iso_code","year"]].dropna().drop_duplicates().to_dict(orient="records")
    exec_many(conn, "INSERT OR IGNORE INTO CountryYear (iso_code, year) VALUES (:iso_code, :year)", keys)

# Create politics table & insert
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS politics;"))
    conn.execute(text("""
        CREATE TABLE politics (
            iso_code TEXT NOT NULL,
            year INTEGER NOT NULL,
            WGI REAL,
            CPI10 REAL,
            CPI100 REAL,
            PRIMARY KEY (iso_code, year),
            FOREIGN KEY (iso_code, year) REFERENCES CountryYear(iso_code, year)
              ON DELETE CASCADE ON UPDATE CASCADE
        );
    """))
    rows = politics_df.dropna(subset=["iso_code","year"]).to_dict(orient="records")
    exec_many(conn,
        "INSERT OR REPLACE INTO politics (iso_code, year, WGI, CPI10, CPI100) "
        "VALUES (:iso_code, :year, :WGI, :CPI10, :CPI100)",
        rows)
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pol_ccy ON politics(iso_code, year);"))

# Display Politics from DB
pol_view = pd.read_sql("SELECT * FROM politics ORDER BY iso_code, year LIMIT 200;", con=engine)
show_df("Politics_table_preview", pol_view)

Unmatched CPI country names (extend ALIASES if needed): []

=== Politics_table_preview (showing up to 200 rows) ===


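|  | iso_code | year | WGI | CPI10 | CPI100 |
|---|---|---|---|---|---|
| 0 | AFG | 1996 | 2.127660 |  |  |
| 1 | AFG | 1998 | 0.531915 |  |  |
| 2 | AFG | 2000 | 0.529101 |  |  |
| 3 | AFG | 2002 | 1.587302 |  |  |
| 4 | AFG | 2003 | 2.010050 |  |  |
| ... | ... | ... | ... | ... | ... |
| 195 | CHN | 2021 |  |  | 45.0 |
| 196 | CHN | 2022 |  |  | 45.0 |
| 197 | CHN | 2023 |  |  | 42.0 |
| 198 | CHN | 2024 |  |  | 43.0 |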


***Create the Unified Panel View (v_panel_all)***

Purpose: Provide a single, queryable panel for EDA and modeling after cleaning is validated.

***Join Logic***

Start from CountryYear.

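Left join to fertility (central fact), as the view's SQL does. Country-years without a fertility value appear with a NULL fertility_rate, so filter on fertility_rate IS NOT NULL when only the fertility sample is wanted.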

Left join to spoke tables:

gdp (current_usd, deflation, annual_rate)

employment (female_participation, unemployment)

married_status (married_percentage)

politics (WGI, CPI10, CPI100)

***Why a View***

Keeps storage normalized and governance clean.

Centralizes join logic so analysts can SELECT from one place without rewriting joins.

Enables quick iteration in EDA and smooth handoff to modeling.

***Workflow Note***

This view is for verification first; analysis (correlations/regressions) comes after confirming correctness and completeness.
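A completeness check of the kind the workflow note calls for can lean on the fact that `COUNT(column)` skips NULLs. The sketch below uses a two-table stand-in schema in an in-memory database (the table contents and the view name `v_panel` are made up), joined with LEFT JOIN so that missing measures surface as NULL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE CountryYear (iso_code TEXT, year INTEGER, PRIMARY KEY (iso_code, year));
    CREATE TABLE fertility (iso_code TEXT, year INTEGER, fertility_rate REAL);
    CREATE TABLE gdp (iso_code TEXT, year INTEGER, current_usd REAL);
    INSERT INTO CountryYear VALUES ('AAA', 2000), ('AAA', 2001);
    INSERT INTO fertility VALUES ('AAA', 2000, 2.1);
    INSERT INTO gdp VALUES ('AAA', 2000, 1500.0), ('AAA', 2001, 1600.0);
    CREATE VIEW v_panel AS
    SELECT cy.iso_code, cy.year, f.fertility_rate, g.current_usd
    FROM CountryYear cy
    LEFT JOIN fertility f ON f.iso_code = cy.iso_code AND f.year = cy.year
    LEFT JOIN gdp g       ON g.iso_code = cy.iso_code AND g.year = cy.year;
""")

# COUNT(*) counts rows; COUNT(col) counts only rows where col is not NULL
coverage = conn.execute(
    "SELECT COUNT(*), COUNT(fertility_rate), COUNT(current_usd) FROM v_panel"
).fetchone()
print(coverage)
```

Comparing the per-column counts against the total row count shows at a glance which measures are sparse.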

In [8]:
with engine.begin() as conn:
    conn.execute(text("DROP VIEW IF EXISTS v_panel_all;"))
    conn.execute(text("""
        CREATE VIEW v_panel_all AS
        SELECT
          cy.iso_code, cy.year,
          f.fertility_rate,
          g.current_usd, g.deflation, g.annual_rate,
          e.female_participation, e.unemployment,
          m.married_percentage,
          p.WGI, p.CPI10, p.CPI100
        FROM CountryYear cy
        LEFT JOIN fertility f      ON f.iso_code=cy.iso_code AND f.year=cy.year
        LEFT JOIN gdp g            ON g.iso_code=cy.iso_code AND g.year=cy.year
        LEFT JOIN employment e     ON e.iso_code=cy.iso_code AND e.year=cy.year
        LEFT JOIN married_status m ON m.iso_code=cy.iso_code AND m.year=cy.year
        LEFT JOIN politics p       ON p.iso_code=cy.iso_code AND p.year=cy.year;
    """))
print("All tables loaded and view v_panel_all created. Ready for analysis.")


All tables loaded and view v_panel_all created. Ready for analysis.


In [9]:
import sqlite3

# connect to the database created above; reuse DB_PATH rather than a machine-specific absolute path
conn = sqlite3.connect(DB_PATH)

# list all tables
tables = pd.read_sql("SELECT name FROM sqlite_master WHERE type='table';", conn)
tables


|  | name |
|---|---|
| 0 | CountryYear |
| 1 | fertility |
| 2 | married_status |
| 3 | gdp |
| 4 | employment |
| 5 | politics |


In [10]:
# get column names for each table
def get_columns(table_name):
    return pd.read_sql(f"PRAGMA table_info({table_name});", conn)

for table in tables["name"]:
    print(f"--- {table} ---")
    display(get_columns(table))


--- CountryYear ---


|  | cid | name | type | notnull | dflt_value | pk |
|---|---|---|---|---|---|---|
| 0 | 0 | iso_code | TEXT | 1 |  | 1 |
| 1 | 1 | year | INTEGER | 1 |  | 2 |


--- fertility ---


|  | cid | name | type | notnull | dflt_value | pk |
|---|---|---|---|---|---|---|
| 0 | 0 | iso_code | TEXT | 1 |  | 1 |
| 1 | 1 | year | INTEGER | 1 |  | 2 |
| 2 | 2 | fertility_rate | REAL | 1 |  | 0 |


--- married_status ---


|  | cid | name | type | notnull | dflt_value | pk |
|---|---|---|---|---|---|---|
| 0 | 0 | iso_code | TEXT | 1 |  | 1 |
| 1 | 1 | year | INTEGER | 1 |  | 2 |
| 2 | 2 | married_percentage | REAL | 1 |  | 0 |


--- gdp ---


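|  | cid | name | type | notnull | dflt_value | pk |
|---|---|---|---|---|---|---|
| 0 | 0 | iso_code | TEXT | 1 |  | 1 |
| 1 | 1 | year | INTEGER | 1 |  | 2 |
| 2 | 2 | current_usd | REAL | 0 |  | 0 |
| 3 | 3 | deflation | REAL | 0 |  | 0 |
| 4 | 4 | annual_rate | REAL | 0 |  | 0 |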


--- employment ---


|  | cid | name | type | notnull | dflt_value | pk |
|---|---|---|---|---|---|---|
| 0 | 0 | iso_code | TEXT | 1 |  | 1 |
| 1 | 1 | year | INTEGER | 1 |  | 2 |
| 2 | 2 | female_participation | REAL | 0 |  | 0 |
| 3 | 3 | unemployment | REAL | 0 |  | 0 |


--- politics ---


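|  | cid | name | type | notnull | dflt_value | pk |
|---|---|---|---|---|---|---|
| 0 | 0 | iso_code | TEXT | 1 |  | 1 |
| 1 | 1 | year | INTEGER | 1 |  | 2 |
| 2 | 2 | WGI | REAL | 0 |  | 0 |
| 3 | 3 | CPI10 | REAL | 0 |  | 0 |
| 4 | 4 | CPI100 | REAL | 0 |  | 0 |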


***Schema & Analytical Design***

Fertility = Fact Table (center): outcome of interest.

Dimensions (GDP, Employment, Marriage, Politics): domain-specific measures.

Country-Year table: enforces alignment, simplifies joins.

Benefit: supports quick correlation checks + panel regression with minimal rework.

***Data Sources***

World Bank: Fertility, GDP, labor force, unemployment, political stability.

OWID: Women 15–49 married/in union (preferred for coverage & consistency).

Transparency Int’l: Corruption Perceptions Index (split CPI10 vs CPI100 for scale change).

***Governance & Quality***

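Cleaning: plausible ranges and three-letter codes are enforced. World Bank regional aggregates (e.g., AFE, AFW, ARB) are not dropped by the loaders and still appear in the previews, so exclude them before cross-country analysis.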

Reproducibility: normalized schema, explicit codes, versioned SQLite.

Integrity: composite keys + foreign keys keep data consistent.

Ethics: public macro indicators only; official sources cited.
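The table previews include codes such as AFE, AFW, and ARB, which are World Bank regional aggregates rather than countries. One way to exclude them before analysis is a simple code filter. The set below is an illustrative subset only, and `drop_aggregates` is a hypothetical helper; a real run would need the full list of aggregate codes, for example from the World Bank's country metadata.

```python
import pandas as pd

# Illustrative subset only: a real run needs the full list of aggregate codes
AGGREGATE_CODES = {"AFE", "AFW", "ARB"}

def drop_aggregates(df, codes=AGGREGATE_CODES):
    """Return only rows whose iso_code is not a known aggregate code."""
    return df[~df["iso_code"].isin(codes)]

toy = pd.DataFrame({"iso_code": ["ABW", "AFE", "AGO", "ARB"], "year": [1990] * 4})
countries_only = drop_aggregates(toy)
print(countries_only["iso_code"].tolist())
```

Applying the filter at analysis time keeps the stored tables unchanged, so aggregate series remain available if they are needed later.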