In [3]:
# ============================================================
# Pipeline A — Mobility Data
# Block A1 — Mobility Ingestion & Integrity Checks
# ============================================================
# Conversation: Portuguese | Code/comments: English
# NEVER delete original columns. Only add.
# Streaming / chunk-based ingestion (hardware-agnostic).
# Real-time progress bars and logs are MANDATORY.
# ============================================================

import os
import json
import time
from datetime import datetime
from typing import Dict, List

import numpy as np
import pandas as pd
from tqdm.auto import tqdm


# -----------------------------
# 0) Configuration
# -----------------------------
T0 = time.time()

# INPUT: reassembled gzip (DO NOT point to split parts)
INPUT_FILE = os.path.expanduser(
    "~/Desktop/Splited Mobility Data/Mobility_Data.csv.gz"
)

# OUTPUTS: official folder
OUTPUT_DIR = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A1"
)
os.makedirs(OUTPUT_DIR, exist_ok=True)

OUT_CONCAT = os.path.join(
    OUTPUT_DIR, "mobility_raw_aug2024_concat.csv.gz"
)
OUT_QC = os.path.join(
    OUTPUT_DIR, "mobility_qc_report.json"
)

# IO / performance (safe on Mac M1/M2)
CHUNKSIZE = 500_000

# Canonical identifiers (DO NOT rename here)
KEY_COL = "store_id"
STATE_COL = "State"

# Temporal components (existence check only)
WEEK_COLS = [
    "unique_q1", "unique_q2", "unique_q3", "unique_q4",
    "visits_q1", "visits_q2", "visits_q3", "visits_q4"
]

# Metrics for range summaries (only if present)
RANGE_METRICS = [
    "unique", "visits", "repeat_visitors",
    "new_visitors", "dwell_time_mins"
]


# -----------------------------
# 1) Sanity checks
# -----------------------------
if not os.path.exists(INPUT_FILE):
    raise FileNotFoundError(
        f"Input file not found: {INPUT_FILE}"
    )

print("[INFO] Using input file:")
print(" -", INPUT_FILE)


# -----------------------------
# 2) Pass 1 — Schema union (headers only)
# -----------------------------
print("\n[STEP 1/4] Reading header to build schema...")

# Read header only (fast)
header_df = pd.read_csv(
    INPUT_FILE,
    compression="gzip",
    nrows=0
)
union_cols: List[str] = header_df.columns.tolist()

print(f"[INFO] Columns detected: {len(union_cols)}")


# -----------------------------
# 3) Pass 2 — Streaming ingestion & write canonical raw
# -----------------------------
print("\n[STEP 2/4] Streaming ingestion and concatenation...")

rows_total = 0

# Ensure fresh write
if os.path.exists(OUT_CONCAT):
    os.remove(OUT_CONCAT)

write_header = True

for chunk in tqdm(
    pd.read_csv(
        INPUT_FILE,
        compression="gzip",
        chunksize=CHUNKSIZE,
        low_memory=False
    ),
    desc="Reading chunks",
    unit="chunk"
):
    # Enforce schema order (no deletions)
    for c in union_cols:
        if c not in chunk.columns:
            chunk[c] = np.nan
    chunk = chunk[union_cols]

    chunk.to_csv(
        OUT_CONCAT,
        mode="a",
        header=write_header,
        index=False,
        compression="gzip"
    )
    write_header = False

    n = len(chunk)
    rows_total += n

tqdm.write(f"[INFO] TOTAL rows written: {rows_total:,}")


# -----------------------------
# 4) Pass 3 — QC computations
# -----------------------------
print("\n[STEP 3/4] Computing QC statistics...")

qc = {
    "block": "A1",
    "pipeline": "A",
    "created_at": datetime.now().isoformat(),
    "input_file": os.path.basename(INPUT_FILE),
    "rows_total": int(rows_total),
    "columns_total": len(union_cols),
}

unique_store_ids = set()
state_counts: Dict[str, int] = {}
missing_counts = {c: 0 for c in union_cols}
value_ranges = {
    m: {"min": None, "max": None}
    for m in RANGE_METRICS if m in union_cols
}

for chunk in tqdm(
    pd.read_csv(
        OUT_CONCAT,
        compression="gzip",
        chunksize=CHUNKSIZE
    ),
    desc="QC pass (chunks)",
    unit="chunk"
):
    # Unique census tracts
    if KEY_COL in chunk.columns:
        unique_store_ids.update(
            chunk[KEY_COL]
            .dropna()
            .astype(str)
            .unique()
        )

    # State coverage
    if STATE_COL in chunk.columns:
        vc = chunk[STATE_COL].value_counts(dropna=True)
        for k, v in vc.items():
            state_counts[k] = state_counts.get(k, 0) + int(v)

    # Missingness
    for c in union_cols:
        missing_counts[c] += int(chunk[c].isna().sum())

    # Ranges
    for m in value_ranges:
        s = pd.to_numeric(chunk[m], errors="coerce")
        if s.notna().any():
            mn, mx = s.min(), s.max()
            if (
                value_ranges[m]["min"] is None
                or mn < value_ranges[m]["min"]
            ):
                value_ranges[m]["min"] = float(mn)
            if (
                value_ranges[m]["max"] is None
                or mx > value_ranges[m]["max"]
            ):
                value_ranges[m]["max"] = float(mx)

qc["unique_census_tracts"] = len(unique_store_ids)
qc["coverage_by_state"] = state_counts
qc["missing_values"] = {
    k: int(v) for k, v in missing_counts.items()
}
qc["ranges"] = value_ranges
qc["temporal_components_present"] = {
    c: (c in union_cols) for c in WEEK_COLS
}
qc["runtime_seconds"] = round(
    time.time() - T0, 2
)


# -----------------------------
# 5) Save QC report
# -----------------------------
print("\n[STEP 4/4] Writing QC report...")

with open(OUT_QC, "w", encoding="utf-8") as f:
    json.dump(qc, f, indent=2)

print("\n[DONE] Block A1 completed successfully.")
print(f" - Output data: {OUT_CONCAT}")
print(f" - QC report : {OUT_QC}")
print(f" - Runtime   : {qc['runtime_seconds']} seconds")

[INFO] Using input file:
 - /Users/rafaelalbuquerque/Desktop/Splited Mobility Data/Mobility_Data.csv.gz

[STEP 1/4] Reading header to build schema...
[INFO] Columns detected: 61

[STEP 2/4] Streaming ingestion and concatenation...


Reading chunks: 0chunk [00:00, ?chunk/s]

[INFO] TOTAL rows written: 34,949,440

[STEP 3/4] Computing QC statistics...


QC pass (chunks): 0chunk [00:00, ?chunk/s]


[STEP 4/4] Writing QC report...

[DONE] Block A1 completed successfully.
 - Output data: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A1/mobility_raw_aug2024_concat.csv.gz
 - QC report : /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A1/mobility_qc_report.json
 - Runtime   : 3335.53 seconds


In [4]:
# ============================================================
# Pipeline A — Mobility Data
# Block A2 — Mobility Canonicalization
# ============================================================
# Conversation: Portuguese | Code/comments: English
# NEVER delete original columns. Only add.
# Create canonical column aliases + derived variables for the paper.
# Produce a data dictionary for Method (JM-proof).
# Real-time progress bars and logs are MANDATORY.
# ============================================================

import os
import re
import time
from datetime import datetime
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from tqdm.auto import tqdm


# -----------------------------
# 0) Configuration
# -----------------------------
T0 = time.time()

# INPUT from A1
IN_A1 = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A1/mobility_raw_aug2024_concat.csv.gz"
)

# OUTPUT folder for A2
OUT_DIR = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A2"
)
os.makedirs(OUT_DIR, exist_ok=True)

OUT_CANON = os.path.join(OUT_DIR, "mobility_aug2024_canonical.csv.gz")
OUT_DICT = os.path.join(OUT_DIR, "mobility_data_dictionary.csv")

CHUNKSIZE = 500_000

# Core identifiers (original names)
ORIG_KEY_COL = "store_id"   # do not rename; create alias
ORIG_STATE_COL = "State"    # do not rename; create alias

# Metrics to log-transform (if present)
LOG_TARGETS = [
    "unique", "visits", "repeat_visitors", "new_visitors",
    "dwell_time_mins",
    "unique_q1", "unique_q2", "unique_q3", "unique_q4",
    "visits_q1", "visits_q2", "visits_q3", "visits_q4",
    "repeat_q1", "repeat_q2", "repeat_q3", "repeat_q4",
    "new_visitor_q1", "new_visitor_q2", "new_visitor_q3", "new_visitor_q4",
]

# Paper-first canonical alias map (ONLY ADD NEW COLUMNS, keep originals)
# Notes:
# - We do not drop or rename original columns.
# - We create canonical alias columns with consistent snake_case naming.
ALIAS_MAP = {
    "store_id": "ct_id",                 # census tract id (alias)
    "month_part": "month_ref",
    "State": "state_uf",
    "demographics_gender": "demo_gender",
    "demographics_age_range": "demo_age_range",
    "demographics_class": "demo_class",
}

# If your dataset sometimes has "Store_id" (capital S), we handle it safely
ALT_KEY_CANDIDATES = ["Store_id", "store_id", "Store_ID", "STORE_ID"]


# -----------------------------
# 1) Sanity checks + read schema
# -----------------------------
if not os.path.exists(IN_A1):
    raise FileNotFoundError(f"A1 input not found: {IN_A1}")

print("[INFO] Block A2 starting...")
print(" - Input:", IN_A1)
print(" - Output:", OUT_CANON)
print(" - Dict  :", OUT_DICT)

# Read header to get columns
cols = pd.read_csv(IN_A1, compression="gzip", nrows=0).columns.tolist()
col_set = set(cols)
print(f"[INFO] Columns detected in A1: {len(cols)}")


# -----------------------------
# 2) Resolve key column robustly (without renaming)
# -----------------------------
def resolve_key_column(columns: List[str]) -> str:
    for c in ALT_KEY_CANDIDATES:
        if c in columns:
            return c
    raise KeyError(
        f"Could not find a census-tract identifier column among: {ALT_KEY_CANDIDATES}"
    )

KEY_COL = resolve_key_column(cols)

# Ensure alias map uses the resolved key name
if KEY_COL != "store_id":
    # Keep map for 'store_id' but also map actual found key
    ALIAS_MAP[KEY_COL] = "ct_id"

print(f"[INFO] Using key column: {KEY_COL} (alias will be ct_id)")


# -----------------------------
# 3) Helper: safe numeric conversion
# -----------------------------
def to_numeric_series(s: pd.Series) -> pd.Series:
    # Fast, tolerant numeric conversion
    return pd.to_numeric(s, errors="coerce")


# -----------------------------
# 4) Streaming transform → write canonical dataset
# -----------------------------
print("\n[STEP 1/2] Writing canonical dataset with aliases + derived vars...")

if os.path.exists(OUT_CANON):
    os.remove(OUT_CANON)

write_header = True
rows_total = 0

# Keep a running schema record (original + added)
added_columns: List[str] = []

# Precompute which alias columns are applicable
applicable_aliases = {
    orig: alias for orig, alias in ALIAS_MAP.items() if orig in col_set
}
# Make sure key alias exists
if KEY_COL in col_set:
    applicable_aliases[KEY_COL] = "ct_id"

# Determine which log targets exist
existing_log_targets = [c for c in LOG_TARGETS if c in col_set]

# Derived columns we will add
# - ct_id: alias for tract id (string)
# - state_uf: alias for State
# - log1p_<metric>: log(1 + x) for selected metrics (numeric, coerced)
derived_log_cols = [f"log1p_{c}" for c in existing_log_targets]
added_columns.extend(sorted(set(applicable_aliases.values())))
added_columns.extend(derived_log_cols)

# Stream through dataset
for chunk in tqdm(
    pd.read_csv(IN_A1, compression="gzip", chunksize=CHUNKSIZE, low_memory=False),
    desc="Canonicalization pass (chunks)",
    unit="chunk"
):
    # 4.1) Add alias columns (do not drop originals)
    for orig, alias in applicable_aliases.items():
        if alias in chunk.columns:
            continue
        # Preserve IDs as string to avoid scientific notation / rounding issues
        if orig == KEY_COL:
            chunk[alias] = chunk[orig].astype("Int64", errors="ignore").astype(str)
        else:
            chunk[alias] = chunk[orig]

    # 4.2) Add log1p derived columns (paper-friendly)
    for c in existing_log_targets:
        outc = f"log1p_{c}"
        if outc in chunk.columns:
            continue
        x = to_numeric_series(chunk[c])
        # log1p is stable for zeros and small values; negative values become NaN
        x = x.where(x >= 0, np.nan)
        chunk[outc] = np.log1p(x)

    # 4.3) Add a lightweight QA flag column (paper-first, no cleaning)
    # Example: rows with missing tract id are flagged (not removed).
    if "qa_missing_ct_id" not in chunk.columns:
        chunk["qa_missing_ct_id"] = chunk["ct_id"].isna() if "ct_id" in chunk.columns else True

    # Write
    chunk.to_csv(
        OUT_CANON,
        mode="a",
        header=write_header,
        index=False,
        compression="gzip"
    )
    write_header = False

    rows_total += len(chunk)

tqdm.write(f"[INFO] Rows written to canonical dataset: {rows_total:,}")


# -----------------------------
# 5) Build data dictionary (Method-ready)
# -----------------------------
print("\n[STEP 2/2] Building data dictionary...")

# Reload header from canonical file to capture added columns
canon_cols = pd.read_csv(OUT_CANON, compression="gzip", nrows=0).columns.tolist()

# Type inference on a small sample (fast)
sample = pd.read_csv(OUT_CANON, compression="gzip", nrows=50_000, low_memory=False)

def infer_format(series: pd.Series) -> str:
    dt = series.dtype
    if pd.api.types.is_integer_dtype(dt):
        return "INTEGER"
    if pd.api.types.is_float_dtype(dt):
        return "NUMERIC"
    if pd.api.types.is_bool_dtype(dt):
        return "BOOLEAN"
    if pd.api.types.is_datetime64_any_dtype(dt):
        return "DATETIME"
    return "VARCHAR"

def label_for(col: str) -> str:
    # Human-readable labels (paper-first)
    if col == "ct_id":
        return "Census tract ID (alias)"
    if col == "state_uf":
        return "Brazilian state (alias)"
    if col.startswith("log1p_"):
        base = col.replace("log1p_", "")
        return f"Log(1 + {base})"
    if col.startswith("qa_"):
        return "Quality flag"
    # Fall back to original column name
    return col

def description_for(col: str) -> str:
    # Crisp, defensible descriptions
    if col == "ct_id":
        return "Alias for census tract code (original identifier preserved); stored as string to avoid rounding."
    if col.startswith("log1p_"):
        base = col.replace("log1p_", "")
        return f"Derived variable: natural log of (1 + {base}), with negatives treated as missing."
    if col == "qa_missing_ct_id":
        return "Flag indicating missing census tract ID (no row removal; used for auditing)."
    if col in applicable_aliases.values():
        return "Alias column created for standardized naming; original column retained unchanged."
    return ""

def comment_for(col: str) -> str:
    if col in col_set:
        return "Original column (unchanged)"
    return "Derived/alias column (added)"

dict_rows = []
for c in canon_cols:
    dict_rows.append({
        "variable_name": c,
        "variable_label": label_for(c),
        "format": infer_format(sample[c]) if c in sample.columns else "",
        "description": description_for(c),
        "comments": comment_for(c),
    })

dd = pd.DataFrame(dict_rows)

# Save dictionary
dd.to_csv(OUT_DICT, index=False)

print("\n[DONE] Block A2 completed successfully.")
print(f" - Canonical data: {OUT_CANON}")
print(f" - Data dictionary: {OUT_DICT}")
print(f" - Runtime: {round(time.time() - T0, 2)} seconds")
print(f"[INFO] Added columns (count={len(set(added_columns))}): {sorted(set(added_columns))[:12]}{' ...' if len(set(added_columns))>12 else ''}")

[INFO] Block A2 starting...
 - Input: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A1/mobility_raw_aug2024_concat.csv.gz
 - Output: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A2/mobility_aug2024_canonical.csv.gz
 - Dict  : /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A2/mobility_data_dictionary.csv
[INFO] Columns detected in A1: 61
[INFO] Using key column: store_id (alias will be ct_id)

[STEP 1/2] Writing canonical dataset with aliases + derived vars...


Canonicalization pass (chunks): 0chunk [00:00, ?chunk/s]

[INFO] Rows written to canonical dataset: 34,949,440

[STEP 2/2] Building data dictionary...

[DONE] Block A2 completed successfully.
 - Canonical data: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A2/mobility_aug2024_canonical.csv.gz
 - Data dictionary: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A2/mobility_data_dictionary.csv
 - Runtime: 4012.9 seconds
[INFO] Added columns (count=26): ['ct_id', 'demo_age_range', 'demo_class', 'demo_gender', 'log1p_dwell_time_mins', 'log1p_new_visitor_q1', 'log1p_new_visitor_q2', 'log1p_new_visitor_q3', 'log1p_new_visitor_q4', 'log1p_new_visitors', 'log1p_repeat_q1', 'log1p_repeat_q2'] ...


In [5]:
# ============================================================
# Pipeline A — Mobility Data
# Block A3 — Mobility Aggregation to Base Spatial Unit (Census Tract)
# ============================================================
# Conversation: Portuguese | Code/comments: English
# NEVER delete original columns. Only add.
# Aggregate to one row per census tract (ct_id).
# Real-time progress bars and logs are MANDATORY.
# ============================================================

import os
import time
from typing import Dict, List

import numpy as np
import pandas as pd
from tqdm.auto import tqdm


# -----------------------------
# 0) Configuration
# -----------------------------
T0 = time.time()

IN_A2 = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A2/mobility_aug2024_canonical.csv.gz"
)

OUT_DIR = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A3"
)
os.makedirs(OUT_DIR, exist_ok=True)

OUT_TRACT = os.path.join(OUT_DIR, "mobility_by_tract_aug2024.csv.gz")
OUT_COV = os.path.join(OUT_DIR, "mobility_coverage_by_state.csv.gz")

CHUNKSIZE = 500_000

# Canonical columns created in A2
CT_COL = "ct_id"
STATE_COL = "state_uf"  # alias may exist; fallback handled below

# Core mobility fields (if present)
SUM_VARS = [
    "unique", "visits", "repeat_visitors", "new_visitors",
    "unique_q1", "unique_q2", "unique_q3", "unique_q4",
    "visits_q1", "visits_q2", "visits_q3", "visits_q4",
    "repeat_q1", "repeat_q2", "repeat_q3", "repeat_q4",
    "new_visitor_q1", "new_visitor_q2", "new_visitor_q3", "new_visitor_q4",
]
MEAN_VARS = ["dwell_time_mins"]

# Stability checks
WEEK_UNIQUE = ["unique_q1", "unique_q2", "unique_q3", "unique_q4"]
WEEK_VISITS = ["visits_q1", "visits_q2", "visits_q3", "visits_q4"]


# -----------------------------
# 1) Sanity checks
# -----------------------------
if not os.path.exists(IN_A2):
    raise FileNotFoundError(f"A2 input not found: {IN_A2}")

print("[INFO] Block A3 starting...")
print(" - Input:", IN_A2)
print(" - Output tract:", OUT_TRACT)
print(" - Output coverage:", OUT_COV)

cols = pd.read_csv(IN_A2, compression="gzip", nrows=0).columns.tolist()
col_set = set(cols)

if CT_COL not in col_set:
    raise KeyError(f"Required column not found in A2: {CT_COL}")

# Resolve state column (prefer alias; fallback to original)
if STATE_COL not in col_set:
    if "State" in col_set:
        STATE_COL = "State"
    else:
        STATE_COL = None

present_sum_vars = [c for c in SUM_VARS if c in col_set]
present_mean_vars = [c for c in MEAN_VARS if c in col_set]

print(f"[INFO] Sum vars present: {len(present_sum_vars)}")
print(f"[INFO] Mean vars present: {len(present_mean_vars)}")
print(f"[INFO] State column: {STATE_COL if STATE_COL else 'NOT AVAILABLE'}")


# -----------------------------
# 2) Streaming aggregation (dictionary-of-aggregates)
# -----------------------------
print("\n[STEP 1/3] Streaming aggregation to one row per ct_id...")

# Aggregator stores partial sums/counts per tract
agg_sum: Dict[str, Dict[str, float]] = {}
agg_cnt: Dict[str, Dict[str, int]] = {}  # for mean vars: counts
agg_state: Dict[str, str] = {}           # tract -> state (first non-null)

def get_or_init(d: Dict, key: str, init_factory):
    if key not in d:
        d[key] = init_factory()
    return d[key]

for chunk in tqdm(
    pd.read_csv(IN_A2, compression="gzip", chunksize=CHUNKSIZE, low_memory=False),
    desc="Aggregation pass (chunks)",
    unit="chunk"
):
    # Keep only necessary columns for aggregation to reduce overhead
    keep_cols = [CT_COL] + present_sum_vars + present_mean_vars
    if STATE_COL:
        keep_cols.append(STATE_COL)
    sub = chunk[keep_cols].copy()

    # Ensure ct_id is string for dict keys
    sub[CT_COL] = sub[CT_COL].astype(str)

    # Convert numeric vars safely
    for c in present_sum_vars + present_mean_vars:
        sub[c] = pd.to_numeric(sub[c], errors="coerce")

    # Group by ct_id inside chunk (fast)
    gb = sub.groupby(CT_COL, dropna=False)

    # Sum vars
    if present_sum_vars:
        sums = gb[present_sum_vars].sum(min_count=1)
    else:
        sums = None

    # Mean vars tracked via sum + count
    if present_mean_vars:
        mean_sums = gb[present_mean_vars].sum(min_count=1)
        mean_cnts = gb[present_mean_vars].count()
    else:
        mean_sums = mean_cnts = None

    # State (first non-null)
    if STATE_COL:
        st = gb[STATE_COL].agg(lambda x: x.dropna().iloc[0] if x.dropna().shape[0] else np.nan)

    # Merge into global dictionaries
    idx = sums.index if sums is not None else (mean_sums.index if mean_sums is not None else st.index)

    for ct in idx:
        # init containers
        sdict = get_or_init(agg_sum, ct, lambda: {})
        cdict = get_or_init(agg_cnt, ct, lambda: {})

        if sums is not None:
            row = sums.loc[ct]
            for c in present_sum_vars:
                v = row[c]
                if pd.notna(v):
                    sdict[c] = sdict.get(c, 0.0) + float(v)

        if mean_sums is not None:
            row_s = mean_sums.loc[ct]
            row_c = mean_cnts.loc[ct]
            for c in present_mean_vars:
                vs = row_s[c]
                vc = row_c[c]
                if pd.notna(vs) and vc > 0:
                    # store running sum and count
                    sdict[f"__sum_{c}"] = sdict.get(f"__sum_{c}", 0.0) + float(vs)
                    cdict[c] = cdict.get(c, 0) + int(vc)

        if STATE_COL:
            v = st.loc[ct]
            if ct not in agg_state and pd.notna(v):
                agg_state[ct] = str(v)

tqdm.write(f"[INFO] Tracts aggregated (unique ct_id): {len(agg_sum):,}")


# -----------------------------
# 3) Materialize aggregated dataframe + stability checks
# -----------------------------
print("\n[STEP 2/3] Materializing tract-level dataset + stability checks...")

rows = []
for ct, sdict in tqdm(agg_sum.items(), desc="Building output rows", unit="tract"):
    row = {CT_COL: ct}

    # Attach state if available
    if STATE_COL:
        row["state_uf"] = agg_state.get(ct, np.nan)

    # Add sums
    for c in present_sum_vars:
        row[c] = sdict.get(c, np.nan)

    # Compute means from running sums/counts
    for c in present_mean_vars:
        s = sdict.get(f"__sum_{c}", np.nan)
        n = agg_cnt.get(ct, {}).get(c, 0)
        row[c] = (s / n) if (pd.notna(s) and n > 0) else np.nan

    rows.append(row)

tract_df = pd.DataFrame(rows)

# Add stability checks (no deletion; add new columns)
def safe_cv(vals: pd.Series) -> float:
    x = pd.to_numeric(vals, errors="coerce")
    x = x.dropna()
    if len(x) < 2:
        return np.nan
    mu = x.mean()
    sd = x.std(ddof=1)
    return float(sd / mu) if (mu and mu > 0) else np.nan

# Unique stability
if all(c in tract_df.columns for c in WEEK_UNIQUE):
    tract_df["unique_week_mean"] = tract_df[WEEK_UNIQUE].mean(axis=1, skipna=True)
    tract_df["unique_week_cv"] = tract_df[WEEK_UNIQUE].apply(safe_cv, axis=1)

# Visits stability
if all(c in tract_df.columns for c in WEEK_VISITS):
    tract_df["visits_week_mean"] = tract_df[WEEK_VISITS].mean(axis=1, skipna=True)
    tract_df["visits_week_cv"] = tract_df[WEEK_VISITS].apply(safe_cv, axis=1)

# Sanity: if totals exist, compare against sum of weeks (add diagnostics)
if "unique" in tract_df.columns and all(c in tract_df.columns for c in WEEK_UNIQUE):
    tract_df["unique_weeks_sum"] = tract_df[WEEK_UNIQUE].sum(axis=1, skipna=True)
    tract_df["unique_total_minus_weeks"] = tract_df["unique"] - tract_df["unique_weeks_sum"]

if "visits" in tract_df.columns and all(c in tract_df.columns for c in WEEK_VISITS):
    tract_df["visits_weeks_sum"] = tract_df[WEEK_VISITS].sum(axis=1, skipna=True)
    tract_df["visits_total_minus_weeks"] = tract_df["visits"] - tract_df["visits_weeks_sum"]

print(f"[INFO] Tract-level rows: {len(tract_df):,}")
print(f"[INFO] Columns in tract dataset: {tract_df.shape[1]}")


# -----------------------------
# 4) Coverage by state (UF)
# -----------------------------
print("\n[STEP 3/3] Computing coverage by state...")

if "state_uf" in tract_df.columns:
    cov = tract_df.groupby("state_uf", dropna=False).agg(
        tracts=("ct_id", "nunique"),
        total_visits=("visits", "sum") if "visits" in tract_df.columns else ("ct_id", "size"),
        total_unique=("unique", "sum") if "unique" in tract_df.columns else ("ct_id", "size"),
    ).reset_index()

    # Sort for readability
    cov = cov.sort_values("tracts", ascending=False)
else:
    cov = pd.DataFrame({"state_uf": [], "tracts": [], "total_visits": [], "total_unique": []})

# -----------------------------
# 5) Save outputs
# -----------------------------
tract_df.to_csv(OUT_TRACT, index=False, compression="gzip")
cov.to_csv(OUT_COV, index=False, compression="gzip")

print("\n[DONE] Block A3 completed successfully.")
print(f" - mobility_by_tract_aug2024.csv.gz: {OUT_TRACT}")
print(f" - mobility_coverage_by_state.csv.gz: {OUT_COV}")
print(f" - Runtime: {round(time.time() - T0, 2)} seconds")

[INFO] Block A3 starting...
 - Input: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A2/mobility_aug2024_canonical.csv.gz
 - Output tract: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A3/mobility_by_tract_aug2024.csv.gz
 - Output coverage: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A3/mobility_coverage_by_state.csv.gz
[INFO] Sum vars present: 20
[INFO] Mean vars present: 1
[INFO] State column: NOT AVAILABLE

[STEP 1/3] Streaming aggregation to one row per ct_id...


Aggregation pass (chunks): 0chunk [00:00, ?chunk/s]

[INFO] Tracts aggregated (unique ct_id): 436,868

[STEP 2/3] Materializing tract-level dataset + stability checks...


Building output rows:   0%|          | 0/436868 [00:00<?, ?tract/s]

[INFO] Tract-level rows: 436,868
[INFO] Columns in tract dataset: 30

[STEP 3/3] Computing coverage by state...

[DONE] Block A3 completed successfully.
 - mobility_by_tract_aug2024.csv.gz: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A3/mobility_by_tract_aug2024.csv.gz
 - mobility_coverage_by_state.csv.gz: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A3/mobility_coverage_by_state.csv.gz
 - Runtime: 783.23 seconds


In [6]:
# ============================================================
# Pipeline A — Mobility Data
# Block A3.1 — Coverage by UF derived from IBGE tract code (ct_id)
# ============================================================
# Conversation: Portuguese | Code/comments: English
# NEVER delete original columns. Only add.
# Derive state_uf from IBGE census tract code prefix.
# Real-time progress bars and logs are MANDATORY.
# ============================================================

import os
import pandas as pd
from tqdm.auto import tqdm

IN_TRACT = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A3/mobility_by_tract_aug2024.csv.gz"
)

OUT_COV = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A3/mobility_coverage_by_state.csv.gz"
)

if not os.path.exists(IN_TRACT):
    raise FileNotFoundError(f"Tract-level file not found: {IN_TRACT}")

print("[INFO] Reading tract-level mobility file...")
df = pd.read_csv(IN_TRACT, compression="gzip")

if "ct_id" not in df.columns:
    raise KeyError("ct_id not found in tract-level dataset.")

# IBGE UF code mapping (2-digit)
UF_MAP = {
    "11": "RO", "12": "AC", "13": "AM", "14": "RR", "15": "PA", "16": "AP", "17": "TO",
    "21": "MA", "22": "PI", "23": "CE", "24": "RN", "25": "PB", "26": "PE", "27": "AL", "28": "SE", "29": "BA",
    "31": "MG", "32": "ES", "33": "RJ", "35": "SP",
    "41": "PR", "42": "SC", "43": "RS",
    "50": "MS", "51": "MT", "52": "GO", "53": "DF"
}

print("[INFO] Deriving state_uf from ct_id prefix...")

# Ensure ct_id is string, extract first 2 digits as UF code
df["ct_id"] = df["ct_id"].astype(str)
df["uf_code"] = df["ct_id"].str.slice(0, 2)
df["state_uf"] = df["uf_code"].map(UF_MAP)

# Coverage aggregation
agg_spec = {"tracts": ("ct_id", "nunique")}
if "visits" in df.columns:
    agg_spec["total_visits"] = ("visits", "sum")
if "unique" in df.columns:
    agg_spec["total_unique"] = ("unique", "sum")

cov = (
    df.groupby("state_uf", dropna=False)
      .agg(**agg_spec)
      .reset_index()
      .sort_values("tracts", ascending=False)
)

cov.to_csv(OUT_COV, index=False, compression="gzip")

print("[DONE] Coverage by state written successfully.")
print(" - Output:", OUT_COV)
print("\n[TOP 10] states by number of tracts:")
print(cov.head(10).to_string(index=False))

[INFO] Reading tract-level mobility file...
[INFO] Deriving state_uf from ct_id prefix...
[DONE] Coverage by state written successfully.
 - Output: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A3/mobility_coverage_by_state.csv.gz

[TOP 10] states by number of tracts:
state_uf  tracts  total_visits  total_unique
      SP  102062  1.159421e+11  3.107754e+10
      MG   49889  2.678671e+10  5.703628e+09
      RJ   41279  3.307024e+10  8.970878e+09
      BA   29569  2.382584e+10  5.108769e+09
      RS   24815  1.841142e+10  4.325322e+09
      PR   22875  1.861888e+10  4.388147e+09
      CE   19231  8.191152e+09  1.585258e+09
      PE   17242  2.705244e+10  6.168713e+09
      SC   15204  1.015107e+10  2.196125e+09
      PA   12868  9.880079e+09  1.811262e+09


In [7]:
# ============================================================
# Pipeline A — Mobility Data
# Block A4 — Mobility Infrastructure Index (MII)
# ============================================================
# Conversation: Portuguese | Code/comments: English
# NEVER delete original columns. Only add.
# Build MII at the census tract level using defensible components.
# Provide both PCA-based index and simple z-score composite as robustness.
# Real-time progress bars and logs are MANDATORY.
# ============================================================

import os
import json
import time
from datetime import datetime
from typing import List, Dict

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from sklearn.decomposition import PCA


# -----------------------------
# 0) Configuration
# -----------------------------
T0 = time.time()

IN_A3 = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A3/mobility_by_tract_aug2024.csv.gz"
)

OUT_DIR = os.path.expanduser(
    "~/Desktop/Output Pipeline A (Mobility)/A4"
)
os.makedirs(OUT_DIR, exist_ok=True)

OUT_WITH_MII = os.path.join(OUT_DIR, "mobility_by_tract_aug2024_with_mii.csv.gz")
OUT_QC = os.path.join(OUT_DIR, "mii_qc_report.json")

CT_COL = "ct_id"

# MII components:
# - intensity: visits, unique
# - absorption: dwell_time_mins, repeat_visitors
# - renewal: new_visitors
# - stability: negative CV (lower CV = more stable = higher infrastructure)
BASE_COMPONENTS = [
    "visits",
    "unique",
    "repeat_visitors",
    "new_visitors",
    "dwell_time_mins",
    "visits_week_cv",
    "unique_week_cv",
]

# Parameters
EPS = 1e-9


# -----------------------------
# 1) Load tract-level dataset
# -----------------------------
if not os.path.exists(IN_A3):
    raise FileNotFoundError(f"A3 input not found: {IN_A3}")

print("[INFO] Block A4 starting...")
print(" - Input :", IN_A3)
print(" - Output:", OUT_WITH_MII)
print(" - QC    :", OUT_QC)

df = pd.read_csv(IN_A3, compression="gzip")
print(f"[INFO] Loaded rows: {len(df):,} | cols: {df.shape[1]:,}")

if CT_COL not in df.columns:
    raise KeyError(f"Required column not found: {CT_COL}")

# Ensure ID is string (audit-safe)
df[CT_COL] = df[CT_COL].astype(str)


# -----------------------------
# 2) Prepare component matrix (paper-first, defensible)
# -----------------------------
print("\n[STEP 1/4] Preparing MII components...")

present = [c for c in BASE_COMPONENTS if c in df.columns]
missing = [c for c in BASE_COMPONENTS if c not in df.columns]

if missing:
    print("[WARN] Missing MII components (will be ignored):", missing)

if len(present) < 4:
    raise ValueError(
        f"Too few components present to build a defensible index. Present: {present}"
    )

# Convert components to numeric
for c in present:
    df[c] = pd.to_numeric(df[c], errors="coerce")

# Log1p transform for flow variables (intensity/renewal/persistence) to reduce skew
# Note: dwell time is already an average; keep in level but z-score it.
log1p_vars = [c for c in ["visits", "unique", "repeat_visitors", "new_visitors"] if c in present]
for c in tqdm(log1p_vars, desc="Log1p transforms", unit="var"):
    df[f"log1p_{c}_A4"] = np.log1p(df[c].where(df[c] >= 0, np.nan))
present_augmented = present.copy()
for c in log1p_vars:
    present_augmented.remove(c)
    present_augmented.append(f"log1p_{c}_A4")

# Stability: convert CV to "stability score" (higher = more stable)
# We use negative CV and then z-score it; this is monotonic and transparent.
stability_vars = []
for cv in ["visits_week_cv", "unique_week_cv"]:
    if cv in present_augmented:
        df[f"stability_{cv}_A4"] = -1.0 * df[cv]
        stability_vars.append(f"stability_{cv}_A4")
        present_augmented.remove(cv)
        present_augmented.append(f"stability_{cv}_A4")

print("[INFO] Final component set used for indexing:")
for c in present_augmented:
    print(" -", c)


# -----------------------------
# 3) Standardize (z-scores) and compute two indices
# -----------------------------
print("\n[STEP 2/4] Standardizing components (z-scores) ...")

Z_COLS = []
for c in tqdm(present_augmented, desc="Z-scoring", unit="var"):
    x = pd.to_numeric(df[c], errors="coerce")
    mu = x.mean(skipna=True)
    sd = x.std(skipna=True, ddof=1)
    zcol = f"z_{c}"
    if sd is None or np.isnan(sd) or sd < EPS:
        df[zcol] = np.nan
    else:
        df[zcol] = (x - mu) / sd
    Z_COLS.append(zcol)

# Matrix for PCA (rows with any NaNs are dropped ONLY for PCA fit; we still keep all rows in output)
Z = df[Z_COLS]
rows_before = len(df)
Z_complete = Z.dropna()
rows_used = len(Z_complete)

print(f"[INFO] PCA fit will use complete cases: {rows_used:,} / {rows_before:,}")

if rows_used < 10_000:
    print("[WARN] Low number of complete cases for PCA (unexpected). PCA may be unstable.")

# 3.1) Simple composite (mean of z-scores)
df["mii_zmean_A4"] = Z.mean(axis=1, skipna=False)  # strict: if any component missing -> NaN

# 3.2) PCA-based index (1st component on complete cases)
pca = PCA(n_components=1, random_state=42)
pca.fit(Z_complete.values)

# Scores for all rows (NaN where missing)
df["mii_pca1_A4"] = np.nan
df.loc[Z_complete.index, "mii_pca1_A4"] = pca.transform(Z_complete.values).reshape(-1)

# Align sign so that higher values correspond to "more mobility infrastructure"
# We enforce positive correlation with log1p_visits (if present).
sign_anchor = "z_log1p_visits_A4" if "z_log1p_visits_A4" in df.columns else None
if sign_anchor and df.loc[Z_complete.index, sign_anchor].notna().any():
    corr = np.corrcoef(
        df.loc[Z_complete.index, "mii_pca1_A4"].values,
        df.loc[Z_complete.index, sign_anchor].values
    )[0, 1]
    if np.isfinite(corr) and corr < 0:
        df["mii_pca1_A4"] *= -1.0

# Optional: standardized versions of indices for interpretability
for idx_col in ["mii_zmean_A4", "mii_pca1_A4"]:
    x = df[idx_col]
    mu = x.mean(skipna=True)
    sd = x.std(skipna=True, ddof=1)
    df[f"z_{idx_col}"] = (x - mu) / sd if (sd and np.isfinite(sd) and sd > EPS) else np.nan


# -----------------------------
# 4) QC report (audit-ready)
# -----------------------------
print("\n[STEP 3/4] Building QC report...")

qc: Dict = {
    "block": "A4",
    "pipeline": "A",
    "created_at": datetime.now().isoformat(),
    "input_file": os.path.basename(IN_A3),
    "rows_total": int(len(df)),
    "component_base_requested": BASE_COMPONENTS,
    "components_present": present,
    "components_used_final": present_augmented,
    "z_columns": Z_COLS,
    "pca_complete_cases_used": int(rows_used),
    "pca_explained_variance_ratio_pc1": float(pca.explained_variance_ratio_[0]),
    "pca_loadings_pc1": {
        Z_COLS[i]: float(pca.components_[0][i]) for i in range(len(Z_COLS))
    },
    "missingness": {
        "mii_zmean_A4_na": int(df["mii_zmean_A4"].isna().sum()),
        "mii_pca1_A4_na": int(df["mii_pca1_A4"].isna().sum()),
    },
    "runtime_seconds": None,
}

# Distribution sanity checks (no plots, just summary)
def summarize(series: pd.Series) -> Dict[str, float]:
    s = pd.to_numeric(series, errors="coerce").dropna()
    if s.empty:
        return {}
    return {
        "mean": float(s.mean()),
        "std": float(s.std(ddof=1)),
        "p01": float(s.quantile(0.01)),
        "p05": float(s.quantile(0.05)),
        "p50": float(s.quantile(0.50)),
        "p95": float(s.quantile(0.95)),
        "p99": float(s.quantile(0.99)),
        "min": float(s.min()),
        "max": float(s.max()),
        "n": int(s.shape[0]),
    }

qc["index_summaries"] = {
    "mii_zmean_A4": summarize(df["mii_zmean_A4"]),
    "mii_pca1_A4": summarize(df["mii_pca1_A4"]),
    "z_mii_zmean_A4": summarize(df["z_mii_zmean_A4"]),
    "z_mii_pca1_A4": summarize(df["z_mii_pca1_A4"]),
}

# Correlations between indices (complete overlap only)
overlap = df[["mii_zmean_A4", "mii_pca1_A4"]].dropna()
if len(overlap) > 0:
    qc["corr_mii_zmean_vs_pca1"] = float(overlap.corr().iloc[0, 1])
else:
    qc["corr_mii_zmean_vs_pca1"] = None


# -----------------------------
# 5) Write outputs
# -----------------------------
print("\n[STEP 4/4] Writing outputs...")

df.to_csv(OUT_WITH_MII, index=False, compression="gzip")

qc["runtime_seconds"] = round(time.time() - T0, 2)
with open(OUT_QC, "w", encoding="utf-8") as f:
    json.dump(qc, f, indent=2)

print("\n[DONE] Block A4 completed successfully.")
print(f" - Data with MII: {OUT_WITH_MII}")
print(f" - QC report   : {OUT_QC}")
print(f" - Runtime     : {qc['runtime_seconds']} seconds")
print(f"[INFO] PCA EVR (PC1): {qc['pca_explained_variance_ratio_pc1']:.4f}")
if qc.get("corr_mii_zmean_vs_pca1") is not None:
    print(f"[INFO] Corr(z-mean, PCA1): {qc['corr_mii_zmean_vs_pca1']:.4f}")

[INFO] Block A4 starting...
 - Input : /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A3/mobility_by_tract_aug2024.csv.gz
 - Output: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A4/mobility_by_tract_aug2024_with_mii.csv.gz
 - QC    : /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A4/mii_qc_report.json
[INFO] Loaded rows: 436,868 | cols: 30

[STEP 1/4] Preparing MII components...


Log1p transforms:   0%|          | 0/4 [00:00<?, ?var/s]

[INFO] Final component set used for indexing:
 - dwell_time_mins
 - log1p_visits_A4
 - log1p_unique_A4
 - log1p_repeat_visitors_A4
 - log1p_new_visitors_A4
 - stability_visits_week_cv_A4
 - stability_unique_week_cv_A4

[STEP 2/4] Standardizing components (z-scores) ...


Z-scoring:   0%|          | 0/7 [00:00<?, ?var/s]

[INFO] PCA fit will use complete cases: 426,204 / 436,868

[STEP 3/4] Building QC report...

[STEP 4/4] Writing outputs...

[DONE] Block A4 completed successfully.
 - Data with MII: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A4/mobility_by_tract_aug2024_with_mii.csv.gz
 - QC report   : /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A4/mii_qc_report.json
 - Runtime     : 32.46 seconds
[INFO] PCA EVR (PC1): 0.6165
[INFO] Corr(z-mean, PCA1): 0.9516


In [1]:
# ============================================================
# PIPELINE A — BLOCK A4b
# Mobility Infrastructure Index (MII) — Materialization
# ============================================================

import pandas as pd
import numpy as np
import json
import time

INPUT = (
    "/Users/rafaelalbuquerque/Desktop/"
    "Output Pipeline A (Mobility)/A4/"
    "mobility_by_tract_aug2024_with_mii.csv.gz"
)

OUTPUT = (
    "/Users/rafaelalbuquerque/Desktop/"
    "Output Pipeline A (Mobility)/A4/"
    "mobility_by_tract_aug2024_with_mii_FINAL.csv.gz"
)

QC_REPORT = (
    "/Users/rafaelalbuquerque/Desktop/"
    "Output Pipeline A (Mobility)/A4/"
    "mii_materialization_qc.json"
)

start = time.time()

print("[INFO] Block A4b starting — MII materialization")
print(f" - Input : {INPUT}")
print(f" - Output: {OUTPUT}")

df = pd.read_csv(INPUT)

# ------------------------------------------------------------
# COMPONENTS USED IN A4 (explicit & frozen)
# ------------------------------------------------------------

COMPONENTS = [
    "dwell_time_mins",
    "log1p_visits_A4",
    "log1p_unique_A4",
    "log1p_repeat_visitors_A4",
    "log1p_new_visitors_A4",
    "stability_visits_week_cv_A4",
    "stability_unique_week_cv_A4"
]

missing = [c for c in COMPONENTS if c not in df.columns]
if missing:
    raise ValueError(f"Missing MII components: {missing}")

# ------------------------------------------------------------
# STANDARDIZATION (Z-SCORES)
# ------------------------------------------------------------

z = df[COMPONENTS].apply(
    lambda x: (x - x.mean()) / x.std(ddof=0)
)

# ------------------------------------------------------------
# MII CONSTRUCTION (Z-MEAN)
# ------------------------------------------------------------

df["mii"] = z.mean(axis=1)

# ------------------------------------------------------------
# QC
# ------------------------------------------------------------

qc = {
    "n_rows": int(len(df)),
    "components": COMPONENTS,
    "method": "z-score mean (validated against PCA in Block A4)",
    "mii_mean": float(df["mii"].mean()),
    "mii_std": float(df["mii"].std()),
}

df.to_csv(OUTPUT, index=False, compression="gzip")

with open(QC_REPORT, "w") as f:
    json.dump(qc, f, indent=2)

elapsed = time.time() - start

print("[DONE] Block A4b completed successfully.")
print(f" - Output saved: {OUTPUT}")
print(f" - Runtime     : {elapsed:.2f}s")

[INFO] Block A4b starting — MII materialization
 - Input : /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A4/mobility_by_tract_aug2024_with_mii.csv.gz
 - Output: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A4/mobility_by_tract_aug2024_with_mii_FINAL.csv.gz
[DONE] Block A4b completed successfully.
 - Output saved: /Users/rafaelalbuquerque/Desktop/Output Pipeline A (Mobility)/A4/mobility_by_tract_aug2024_with_mii_FINAL.csv.gz
 - Runtime     : 34.14s
