<a href="https://colab.research.google.com/github/NigelWilliamUOP/vibe-coding/blob/main/Passport_Bro_data_preprocessing_01_ingest_validate_QA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 01 — Data ingest & QA (r/thepassportbros)
This notebook ingests the CSV export, performs reproducible QA checks, and writes cleaned Parquet + a QA report.

**Inputs**: `passport bro.csv` (upload or mount)

**Outputs** (default):
- `artefacts/raw.parquet`
- `artefacts/qa_report.json`

No manual qualitative steps: all labelling/QA is programmatic.


In [1]:
# --- Install deps (Colab-safe) ---
# Pandas is usually preinstalled; we install/upgrade only what's needed for Parquet + schema validation.
!pip -q install -U pyarrow pandera

import sys, platform, json, re, math, hashlib
from pathlib import Path
import pandas as pd
import numpy as np

import pyarrow  # noqa: F401
import pandera as pa
from pandera import Column, Check, DataFrameSchema

print("Python:", sys.version.split()[0])
print("Platform:", platform.platform())
print("pandas:", pd.__version__)
print("pyarrow:", __import__("pyarrow").__version__)
print("pandera:", pa.__version__)


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.7/47.7 MB[0m [31m17.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m295.0/295.0 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hPython: 3.12.12
Platform: Linux-6.6.105+-x86_64-with-glibc2.35
pandas: 2.2.2
pyarrow: 22.0.0
pandera: 0.28.1


## 1) Set paths and load data
If you’re using Colab, either upload the CSV or mount Google Drive.
This notebook will try common locations automatically.

In [3]:
# --- Paths / config ---
# Adjust as needed. In Colab, your working directory is typically /content.
DEFAULT_CANDIDATES = [
    "/content/passport bro.csv",
    "/content/passport_bro.csv",
    "/content/data/passport bro.csv",
    "/content/data/passport_bro.csv",
]

ARTEFACT_DIR = Path("/content/artefacts")
ARTEFACT_DIR.mkdir(parents=True, exist_ok=True)

def find_input_csv(candidates=DEFAULT_CANDIDATES) -> Path:
    for p in candidates:
        if Path(p).exists():
            return Path(p)
    # Fallback: search a bit
    hits = list(Path("/content").rglob("passport*bro*.csv"))
    if hits:
        return hits[0]
    raise FileNotFoundError(
        "Could not find the CSV. Upload it (Files pane) or set INPUT_CSV_PATH manually."
    )

INPUT_CSV_PATH = find_input_csv()
INPUT_CSV_PATH


PosixPath('/content/passport bro.csv')

In [4]:
# --- Optional: upload from local machine (run this if needed) ---
# from google.colab import files
# uploaded = files.upload()
# INPUT_CSV_PATH = Path(next(iter(uploaded.keys())))
# INPUT_CSV_PATH


In [5]:
# --- Helper: compute SHA-256 for provenance ---
def sha256_file(path: Path, chunk_size: int = 1_048_576) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()

input_sha256 = sha256_file(INPUT_CSV_PATH)
input_sha256


'6f3183d17bfc7fde6c6927afab62cfafedc80863660ffca663c8d052f4c5375c'

In [6]:
# --- Load CSV ---
# Use low_memory=False to reduce mixed-type inference surprises; parse dates explicitly afterward.
df = pd.read_csv(INPUT_CSV_PATH, low_memory=False)

print("Rows:", len(df))
print("Cols:", df.shape[1])
df.head(3)


Rows: 76800
Cols: 26


Unnamed: 0,id,date,author,title,text,comment_on,type,score,upvote_ratio,url,...,Insult,Profanity,Threat,textblob_polarity,vader_sentiment_compound,RuSentiment_positive,RuSentiment_neutral,RuSentiment_negative,language,cluster_label
0,1gt7gx8,11-17-2024 06:24:46,,Dating in the West in 2024,,,Submission,6171,0.96,https://i.redd.it/8vwl3xqxne1e1.jpeg,...,0.000196,0.000343,7e-05,0.0,0.0,,,,en,1: Outliers
1,1i5zk4y,01-20-2025 20:04:50,IamDreamzzz,men with an asian wife seeing a latina up close,,,Submission,4324,0.93,https://i.redd.it/ph5kyu3lg7ee1.jpeg,...,0.001662,0.005809,0.001816,0.0,0.0,,,,en,1: Outliers
2,1ktcez8,05-23-2025 06:07:41,VdelaM,Interesting thing to think about,,,Submission,3646,0.92,https://i.redd.it/e8sfwemc3h2f1.jpeg,...,0.000284,0.000355,5.6e-05,0.5,0.4,,,,en,1: Outliers


## 2) Normalise types and derive standard fields

In [7]:
# --- Column sanity (expected columns) ---
expected_cols = [
    "id","comment_on","type","title","author","text","score","url","selftext",
    "subreddit","date","upvote_ratio","language","cluster_label","cluster_prob",
    "Toxicity","Severe Toxicity","Obscene","Threat","Insult","Identity Attack",
    "vader_sentiment_compound","textblob_polarity","num_comments"
]
missing = [c for c in expected_cols if c not in df.columns]
extra = [c for c in df.columns if c not in expected_cols]

print("Missing expected:", missing)
print("Extra cols:", extra[:20], "..." if len(extra) > 20 else "")


Missing expected: ['selftext', 'subreddit', 'cluster_prob', 'Obscene', 'num_comments']
Extra cols: ['permalink', 'user_flair', 'submission_flair', 'Profanity', 'RuSentiment_positive', 'RuSentiment_neutral', 'RuSentiment_negative'] 


In [8]:
# --- Normalise core identifier columns as strings ---
for c in ["id","comment_on","author","type","subreddit","language","cluster_label","url"]:
    if c in df.columns:
        df[c] = df[c].astype("string")

# --- Parse date (data appears as 'MM-DD-YYYY HH:MM:SS'; fall back to infer) ---
if "date" in df.columns:
    dt = pd.to_datetime(df["date"], format="%m-%d-%Y %H:%M:%S", errors="coerce")
    # fallback for any non-matching rows
    if dt.isna().any():
        dt2 = pd.to_datetime(df.loc[dt.isna(), "date"], errors="coerce")
        dt.loc[dt.isna()] = dt2
    df["date_dt"] = dt
else:
    df["date_dt"] = pd.NaT

# --- Numeric columns ---
num_float_cols = [
    "upvote_ratio","cluster_prob",
    "Toxicity","Severe Toxicity","Obscene","Threat","Insult","Identity Attack",
    "vader_sentiment_compound","textblob_polarity"
]
for c in num_float_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

num_int_cols = ["score","num_comments"]
for c in num_int_cols:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce").astype("Int64")

# --- Derivations used throughout the pipeline ---
df["title"] = df.get("title", pd.Series(dtype="string")).fillna("").astype("string")
df["text"] = df.get("text", pd.Series(dtype="string")).fillna("").astype("string")
df["selftext"] = df.get("selftext", pd.Series(dtype="string")).fillna("").astype("string")

df["text_all"] = (df["title"].fillna("") + "\n" + df["text"].fillna("")).astype("string")
df["text_len"] = df["text_all"].str.len().astype("Int64")

df["month"] = df["date_dt"].dt.to_period("M").astype("string")
df["week"]  = df["date_dt"].dt.to_period("W").astype("string")
df["is_submission"] = (df["type"].str.lower() == "submission").fillna(False)
df["is_comment"] = (df["type"].str.lower() == "comment").fillna(False)
df["is_reply"] = (df["type"].str.lower() == "reply").fillna(False)

# --- Stable author hash for non-identifying outputs ---
HASH_SALT = "passportbros_v1"
def hash_author(a):
    if pd.isna(a) or a is None:
        return pd.NA
    return hashlib.sha256((HASH_SALT + str(a)).encode("utf-8")).hexdigest()

df["author_hash"] = df["author"].apply(hash_author).astype("string")

df[["id","comment_on","type","author","author_hash","date_dt","month","text_len"]].head(3)


Unnamed: 0,id,comment_on,type,author,author_hash,date_dt,month,text_len
0,1gt7gx8,,Submission,,,2024-11-17 06:24:46,2024-11,27
1,1i5zk4y,,Submission,IamDreamzzz,c8febdf20a50dfcfe675d1542f69dbac5cd523e4d32848...,2025-01-20 20:04:50,2025-01,48
2,1ktcez8,,Submission,VdelaM,260ea31d86958a47e0a69608522741ee3e681937de9e52...,2025-05-23 06:07:41,2025-05,33


## 3) QA checks
The intent is to catch errors early and emit a machine-readable report (`qa_report.json`).

In [9]:
# --- QA helpers ---
def pct(n, d):
    return float(n) / float(d) if d else float("nan")

def range_violations(series, lo=0.0, hi=1.0):
    s = series.dropna()
    if s.empty:
        return {"count": 0, "examples": []}
    bad = s[(s < lo) | (s > hi)]
    return {"count": int(bad.shape[0]), "examples": bad.head(5).tolist()}

qa = {
    "input_csv": str(INPUT_CSV_PATH),
    "input_sha256": input_sha256,
    "generated_at_utc": pd.Timestamp.utcnow().isoformat(),
    "rows": int(len(df)),
    "cols": int(df.shape[1]),
    "columns": list(df.columns),
    "missing_expected_columns": missing,
    "extra_columns": extra,
    "warnings": [],
    "checks": {}
}

# IDs
qa["checks"]["id_unique"] = {
    "unique_ids": int(df["id"].nunique(dropna=True)),
    "duplicate_id_rows": int(df["id"].duplicated().sum())
}
if qa["checks"]["id_unique"]["duplicate_id_rows"] > 0:
    qa["warnings"].append("Duplicate ids detected: check upstream extraction.")

# Parent / orphan rate (excluding submissions where parent is expected to be NA)
if "comment_on" in df.columns:
    parent_known = df["comment_on"].notna()
    parent_exists = df.loc[parent_known, "comment_on"].isin(df["id"])
    orphan_count = int((~parent_exists).sum())
    qa["checks"]["parent_resolution"] = {
        "parent_known_rows": int(parent_known.sum()),
        "orphan_rows": orphan_count,
        "orphan_rate": pct(orphan_count, int(parent_known.sum()))
    }
    if orphan_count > 0:
        qa["warnings"].append("Some comment_on values do not resolve to an id (orphans).")

# Date parsing
bad_dates = int(df["date_dt"].isna().sum())
qa["checks"]["date_parse"] = {
    "parsed_ok": int(len(df) - bad_dates),
    "parsed_fail": bad_dates,
    "fail_rate": pct(bad_dates, len(df))
}
if bad_dates > 0:
    qa["warnings"].append("Some dates failed to parse; downstream time slicing may exclude these rows.")

# Type distribution
qa["checks"]["type_distribution"] = df["type"].value_counts(dropna=False).head(20).to_dict()

# Language share
if "language" in df.columns:
    lang_counts = df["language"].value_counts(dropna=False).head(20)
    qa["checks"]["language_distribution_top20"] = lang_counts.to_dict()
    en_share = pct(int((df["language"] == "en").sum()), len(df))
    qa["checks"]["english_share"] = en_share

# Missingness
for c in ["author","text","title","score","upvote_ratio","cluster_label"]:
    if c in df.columns:
        qa["checks"][f"missing_{c}"] = {
            "missing": int(df[c].isna().sum()),
            "missing_rate": pct(int(df[c].isna().sum()), len(df))
        }

# Range checks for probabilities/scores
prob_cols = ["upvote_ratio","cluster_prob","Toxicity","Severe Toxicity","Obscene","Threat","Insult","Identity Attack"]
for c in prob_cols:
    if c in df.columns:
        qa["checks"][f"range_{c}"] = range_violations(df[c], 0.0, 1.0)

# Sentiment ranges are model-dependent; record observed min/max
if "vader_sentiment_compound" in df.columns:
    s = df["vader_sentiment_compound"].dropna()
    qa["checks"]["vader_range"] = {
        "min": float(s.min()) if not s.empty else None,
        "max": float(s.max()) if not s.empty else None
    }
if "textblob_polarity" in df.columns:
    s = df["textblob_polarity"].dropna()
    qa["checks"]["textblob_range"] = {
        "min": float(s.min()) if not s.empty else None,
        "max": float(s.max()) if not s.empty else None
    }

# Quick schema validation (pandera): validate only the most critical constraints
schema = DataFrameSchema({
    "id": Column(str, nullable=False),
    "comment_on": Column(object, nullable=True),  # parent can be null for submissions
    "type": Column(str, nullable=True),
    "date_dt": Column(pa.DateTime, nullable=True),
    "upvote_ratio": Column(float, Check.in_range(0.0, 1.0, inclusive=True), nullable=True),
    "text_len": Column(object, nullable=True),
})

try:
    _ = schema.validate(df, lazy=True)
    qa["checks"]["pandera_schema"] = {"status": "pass"}
except Exception as e:
    qa["checks"]["pandera_schema"] = {"status": "fail", "error": str(e)[:2000]}
    qa["warnings"].append("Pandera schema validation failed (see qa_report.json for details).")

qa


top-level pandera module will be **removed in a future version of pandera**.
If you're using pandera to validate pandas objects, we highly recommend updating
your import:

```
# old import
import pandera as pa

# new import
import pandera.pandas as pa
```

If you're using pandera to validate objects from other compatible libraries
like pyspark or polars, see the supported libraries section of the documentation
for more information on how to import pandera:

https://pandera.readthedocs.io/en/stable/supported_libraries.html


```
```



{'input_csv': '/content/passport bro.csv',
 'input_sha256': '6f3183d17bfc7fde6c6927afab62cfafedc80863660ffca663c8d052f4c5375c',
 'generated_at_utc': '2026-01-15T10:04:07.946071+00:00',
 'rows': 76800,
 'cols': 36,
 'columns': ['id',
  'date',
  'author',
  'title',
  'text',
  'comment_on',
  'type',
  'score',
  'upvote_ratio',
  'url',
  'permalink',
  'user_flair',
  'submission_flair',
  'Toxicity',
  'Severe Toxicity',
  'Identity Attack',
  'Insult',
  'Profanity',
  'Threat',
  'textblob_polarity',
  'vader_sentiment_compound',
  'RuSentiment_positive',
  'RuSentiment_neutral',
  'RuSentiment_negative',
  'language',
  'cluster_label',
  'date_dt',
  'selftext',
  'text_all',
  'text_len',
  'month',
  'week',
  'is_submission',
  'is_comment',
  'is_reply',
  'author_hash'],
 'missing_expected_columns': ['selftext',
  'subreddit',
  'cluster_prob',
  'Obscene',
  'num_comments'],
 'extra_columns': ['permalink',
  'user_flair',
  'submission_flair',
  'Profanity',
  'RuSentiment

In [10]:
# Display a compact QA summary
summary = {
    "rows": qa["rows"],
    "duplicate_id_rows": qa["checks"]["id_unique"]["duplicate_id_rows"],
    "orphan_rate": qa["checks"].get("parent_resolution", {}).get("orphan_rate", None),
    "date_fail_rate": qa["checks"]["date_parse"]["fail_rate"],
    "english_share": qa["checks"].get("english_share", None),
    "warnings": qa["warnings"][:10],
}
pd.Series(summary)


Unnamed: 0,0
rows,76800
duplicate_id_rows,0
orphan_rate,0.0
date_fail_rate,0.0
english_share,0.88638
warnings,[Pandera schema validation failed (see qa_repo...


## 4) Write artefacts (Parquet + QA JSON)

In [11]:
# --- Write outputs ---
RAW_PARQUET_PATH = ARTEFACT_DIR / "raw.parquet"
QA_JSON_PATH = ARTEFACT_DIR / "qa_report.json"

# Parquet is faster and smaller than CSV for repeated reads.
df.to_parquet(RAW_PARQUET_PATH, engine="pyarrow", compression="snappy", index=False)

with QA_JSON_PATH.open("w", encoding="utf-8") as f:
    json.dump(qa, f, indent=2)

print("Wrote:", RAW_PARQUET_PATH)
print("Wrote:", QA_JSON_PATH)
print("Artefact dir contents:")
for p in sorted(ARTEFACT_DIR.glob("*")):
    print("-", p.name, f"({p.stat().st_size/1e6:.2f} MB)")


Wrote: /content/artefacts/raw.parquet
Wrote: /content/artefacts/qa_report.json
Artefact dir contents:
- qa_report.json (0.01 MB)
- raw.parquet (29.19 MB)


In [12]:
# --- Optional: download artefacts from Colab ---
# from google.colab import files
# files.download(str(RAW_PARQUET_PATH))
# files.download(str(QA_JSON_PATH))


## 5) Next notebook
Proceed to `02_threads_structure.ipynb` to build thread roots, depths, and structural features from `raw.parquet`.