# 02 — Schema Validation & Missingness Profiling

## Objective
Convert the raw ClinicalTrials.gov NDJSON download into a **flat, analysis-ready table** containing only decision-relevant fields, then generate:
- shape + duplicate checks
- missingness profile
- quick distributions (phase, status, sponsor class)

## Outputs
- `data/interim/ctgov_flat.parquet` (full flattened dataset)
- `data/interim/schema_missingness_profile.csv` (column types + missingness + example)
- `data/interim/quick_counts_phase_status.csv` (for quick QA)

In [1]:
import glob
import json
from pathlib import Path
from typing import Any, Dict, List

import pandas as pd

## Locate newest raw download

This notebook automatically selects the most recent NDJSON file in `data/raw/`.

In [2]:
REPO_ROOT = Path("..").resolve()
DATA_RAW = REPO_ROOT / "data" / "raw"
DATA_INTERIM = REPO_ROOT / "data" / "interim"
DATA_INTERIM.mkdir(parents=True, exist_ok=True)

def newest_raw_ndjson() -> Path:
    files = sorted(glob.glob(str(DATA_RAW / "ctgov_studies_*.ndjson")))
    if not files:
        raise FileNotFoundError(f"No ctgov_studies_*.ndjson found in {DATA_RAW}")
    return Path(files[-1])

raw_path = newest_raw_ndjson()
raw_path

PosixPath('/Users/saturnine/Desktop/trialpulse/data/raw/ctgov_studies_20260210T040533Z_3c62edb50608b1de.ndjson')

## Helper functions

We use defensive accessors because API fields can be missing or structurally inconsistent across studies.

In [3]:
def safe_get(d: Dict, path: List[str], default=None):
    cur: Any = d
    for p in path:
        if not isinstance(cur, dict) or p not in cur:
            return default
        cur = cur[p]
    return cur

def to_list(x):
    if x is None:
        return []
    return x if isinstance(x, list) else [x]

def join_clean(items, sep="|"):
    out = []
    for x in to_list(items):
        if isinstance(x, str) and x.strip():
            out.append(x.strip())
    return sep.join(out)

def join_from_dict_list(items, key, sep="|"):
    out = []
    for it in to_list(items):
        if isinstance(it, dict):
            v = it.get(key)
            if isinstance(v, str) and v.strip():
                out.append(v.strip())
    return sep.join(out)

def first_date(module: Dict, field: str):
    """
    Expects field like startDateStruct / completionDateStruct.
    Returns module[field]["date"] if present, else None.
    """
    if not isinstance(module, dict):
        return None
    ds = module.get(field)
    if isinstance(ds, dict):
        return ds.get("date")
    return None

## Flatten one study into one row

Fields extracted are chosen for clinical operations analytics:
- identifiers + titles
- phase + status + why stopped
- start/primary completion/completion dates
- enrollment count/type
- sponsor name/class
- conditions + intervention names/types
- geographic coverage (countries/states)
- a few design signals when available (allocation/masking, etc.)

In [4]:
def flatten_study(study: Dict) -> Dict:
    ps = safe_get(study, ["protocolSection"], {}) or {}

    id_mod = ps.get("identificationModule", {}) or {}
    status_mod = ps.get("statusModule", {}) or {}
    sponsor_mod = ps.get("sponsorCollaboratorsModule", {}) or {}
    design_mod = ps.get("designModule", {}) or {}
    cond_mod = ps.get("conditionsModule", {}) or {}
    arms_mod = ps.get("armsInterventionsModule", {}) or {}
    loc_mod = ps.get("contactsLocationsModule", {}) or {}

    nct_id = id_mod.get("nctId")
    brief_title = id_mod.get("briefTitle")
    official_title = id_mod.get("officialTitle")

    overall_status = status_mod.get("overallStatus")
    why_stopped = status_mod.get("whyStopped")

    start_date = first_date(status_mod, "startDateStruct")
    primary_completion_date = first_date(status_mod, "primaryCompletionDateStruct")
    completion_date = first_date(status_mod, "completionDateStruct")

    enroll_info = status_mod.get("enrollmentInfo", {}) or {}
    enrollment_count = enroll_info.get("count")
    enrollment_type = enroll_info.get("type")  # ACTUAL / ANTICIPATED when present

    phases = design_mod.get("phases")
    phases_str = ",".join(phases) if isinstance(phases, list) else phases

    # Design signals (availability varies)
    study_type = design_mod.get("studyType") or ps.get("designModule", {}).get("studyType")
    allocation = design_mod.get("allocation")
    intervention_model = design_mod.get("interventionModel")
    masking = design_mod.get("masking")
    primary_purpose = design_mod.get("primaryPurpose")

    lead = sponsor_mod.get("leadSponsor", {}) or {}
    lead_sponsor_name = lead.get("name")
    lead_sponsor_class = lead.get("class")

    conditions = cond_mod.get("conditions", [])
    interventions = arms_mod.get("interventions", [])

    intervention_types = join_from_dict_list(interventions, "type")
    intervention_names = join_from_dict_list(interventions, "name")

    locations = loc_mod.get("locations", [])
    countries = []
    states = []
    for loc in to_list(locations):
        if not isinstance(loc, dict):
            continue
        c = loc.get("country")
        s = loc.get("state")
        if isinstance(c, str) and c.strip():
            countries.append(c.strip())
        if isinstance(s, str) and s.strip():
            states.append(s.strip())

    return {
        "nct_id": nct_id,
        "brief_title": brief_title,
        "official_title": official_title,

        "overall_status": overall_status,
        "why_stopped": why_stopped,

        "start_date": start_date,
        "primary_completion_date": primary_completion_date,
        "completion_date": completion_date,

        "enrollment_count": enrollment_count,
        "enrollment_type": enrollment_type,

        "phases_raw": phases_str,

        "lead_sponsor_name": lead_sponsor_name,
        "lead_sponsor_class": lead_sponsor_class,

        "conditions": join_clean(conditions),
        "intervention_types": intervention_types,
        "intervention_names": intervention_names,

        "countries": "|".join(sorted(set(countries))),
        "states": "|".join(sorted(set(states))),

        # design signals
        "study_type": study_type,
        "allocation": allocation,
        "intervention_model": intervention_model,
        "masking": masking,
        "primary_purpose": primary_purpose,
    }

## Parse NDJSON → DataFrame

This will read the full file (~25k rows), flatten, and build a tabular dataset.

In [5]:
records = []
with raw_path.open("rb") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        study = json.loads(line)
        records.append(flatten_study(study))

df = pd.DataFrame(records)
df.shape

(25000, 23)

## Basic QA checks
- Duplicate NCT IDs
- Missingness overview
- Snapshot of key categorical distributions

In [6]:
dup_nct = df["nct_id"].duplicated().sum()
dup_nct

np.int64(0)

In [7]:
missing_pct = (df.isna().mean() * 100).sort_values(ascending=False).round(1)
missing_pct.head(15)

primary_purpose            100.0
masking                    100.0
enrollment_count           100.0
enrollment_type            100.0
intervention_model         100.0
allocation                 100.0
why_stopped                 87.8
primary_completion_date      5.7
completion_date              4.0
official_title               1.5
start_date                   1.0
intervention_names           0.0
study_type                   0.0
states                       0.0
countries                    0.0
dtype: float64

In [8]:
df["overall_status"].value_counts(dropna=False).head(15)

overall_status
COMPLETED                  13604
UNKNOWN                     3019
RECRUITING                  2576
TERMINATED                  2526
ACTIVE_NOT_RECRUITING       1246
NOT_YET_RECRUITING           948
WITHDRAWN                    901
ENROLLING_BY_INVITATION       93
SUSPENDED                     87
Name: count, dtype: int64

In [9]:
df["lead_sponsor_class"].value_counts(dropna=False).head(15)

lead_sponsor_class
OTHER        12792
INDUSTRY     10318
NIH            839
OTHER_GOV      447
NETWORK        410
FED            159
INDIV           25
UNKNOWN         10
Name: count, dtype: int64

In [10]:
df["phases_raw"].value_counts(dropna=False).head(15)

phases_raw
PHASE2           12346
PHASE3            8024
PHASE1,PHASE2     3219
PHASE2,PHASE3     1411
Name: count, dtype: int64

## Write interim artifacts

We persist:
- full flattened dataset (parquet)
- missingness profile table (csv)
- quick phase/status counts (csv)

In [11]:
out_parquet = DATA_INTERIM / "ctgov_flat.parquet"
df.to_parquet(out_parquet, index=False)

profile = pd.DataFrame({
    "column": df.columns,
    "dtype": [str(t) for t in df.dtypes],
    "missing_pct": (df.isna().mean() * 100).round(2).values,
    "example_non_null": [df[c].dropna().iloc[0] if df[c].notna().any() else None for c in df.columns],
})
out_profile = DATA_INTERIM / "schema_missingness_profile.csv"
profile.to_csv(out_profile, index=False)

quick = (
    df.assign(phases_raw=df["phases_raw"].fillna("MISSING"),
              overall_status=df["overall_status"].fillna("MISSING"))
      .groupby(["phases_raw", "overall_status"])
      .size()
      .reset_index(name="n")
      .sort_values("n", ascending=False)
)
out_quick = DATA_INTERIM / "quick_counts_phase_status.csv"
quick.to_csv(out_quick, index=False)

(out_parquet, out_profile, out_quick)

(PosixPath('/Users/saturnine/Desktop/trialpulse/data/interim/ctgov_flat.parquet'),
 PosixPath('/Users/saturnine/Desktop/trialpulse/data/interim/schema_missingness_profile.csv'),
 PosixPath('/Users/saturnine/Desktop/trialpulse/data/interim/quick_counts_phase_status.csv'))

## Expected outputs check

If these paths exist, Notebook 02 is complete.

In [12]:
out_parquet.exists(), out_profile.exists(), out_quick.exists()

(True, True, True)