In [1]:
# Core
import os
from pathlib import Path

# Data
import pandas as pd
import numpy as np

# Display / debugging
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 200)

# Warnings
import warnings
warnings.filterwarnings("ignore")


In [2]:
# Project root
PROJECT_ROOT = Path("..")

DATA_DIR = PROJECT_ROOT / "data"
INTERMEDIATE_DIR = DATA_DIR / "intermediate"
MASTER_DIR = DATA_DIR / "master"

# Create master directory if not exists
MASTER_DIR.mkdir(parents=True, exist_ok=True)

print("Project root:", PROJECT_ROOT.resolve())
print("Intermediate data path:", INTERMEDIATE_DIR.resolve())
print("Master output path:", MASTER_DIR.resolve())


Project root: /home/saber/NEST
Intermediate data path: /home/saber/NEST/data/intermediate
Master output path: /home/saber/NEST/data/master


In [3]:
# Divide files into buckets for processing
FILES = {
    # Bucket A
    "cpid": INTERMEDIATE_DIR / "cpid_edc_metrics_agg.parquet",
    "edrr": INTERMEDIATE_DIR / "compiled_edrr_agg.parquet",

    # Bucket B
    "visit_projection": INTERMEDIATE_DIR / "visit_projection_tracker_agg.parquet",
    "missing_pages": INTERMEDIATE_DIR / "global_missing_pages_agg.parquet",
    "inactivated_forms": INTERMEDIATE_DIR / "inactivated_forms_loglines_agg.parquet",
    "missing_labs": INTERMEDIATE_DIR / "missing_lab_issues_agg.parquet",
    "sae_dashboard": INTERMEDIATE_DIR / "sae_dashboard_agg.parquet",

    # Bucket C
    "coding_meddra": INTERMEDIATE_DIR / "coding_reports_agg.parquet",
    "coding_whodrug": INTERMEDIATE_DIR / "coding_reports_whodra_agg.parquet",
}

In [4]:
def load_parquet(path: Path) -> pd.DataFrame:
    if not path.exists():
        raise FileNotFoundError(f"Missing file: {path}")
    df = pd.read_parquet(path)
    print(f"Loaded {path.name} | shape = {df.shape}")
    return df

In [5]:
for name, path in FILES.items():
    print(f"{name:20s} : {'OK' if path.exists() else 'MISSING'}")

cpid                 : OK
edrr                 : OK
visit_projection     : OK
missing_pages        : OK
inactivated_forms    : OK
missing_labs         : OK
sae_dashboard        : OK
coding_meddra        : OK
coding_whodrug       : OK


In [6]:
# Load Bucket A datasets (spine)
cpid_df = load_parquet(FILES["cpid"])
edrr_df = load_parquet(FILES["edrr"])

Loaded cpid_edc_metrics_agg.parquet | shape = (5461, 11)
Loaded compiled_edrr_agg.parquet | shape = (581, 3)


In [7]:
# Check required columns in CPID
REQUIRED_CPID_KEYS = ["study_id", "site_id", "subject_id"]

for col in REQUIRED_CPID_KEYS:
    assert col in cpid_df.columns, f"CPID missing required key column: {col}"

In [8]:
# No null identifiers
assert cpid_df[REQUIRED_CPID_KEYS].isnull().sum().sum() == 0, \
    "Null values found in CPID primary keys"

# Grain validation: one row per study-site-subject
dup_count = (
    cpid_df
    .duplicated(subset=REQUIRED_CPID_KEYS)
    .sum()
)

assert dup_count == 0, f"CPID violates grain: {dup_count} duplicate subject rows"

In [9]:
# Check required columns in EDRR
REQUIRED_EDRR_KEYS = ["study_id", "subject_id"]

for col in REQUIRED_EDRR_KEYS:
    assert col in edrr_df.columns, f"EDRR missing required key column: {col}"

# No null identifiers
assert edrr_df[REQUIRED_EDRR_KEYS].isnull().sum().sum() == 0, \
    "Null values found in EDRR primary keys"

# Grain validation: one row per study-subject
dup_count = (
    edrr_df
    .duplicated(subset=REQUIRED_EDRR_KEYS)
    .sum()
)

assert dup_count == 0, f"EDRR violates grain: {dup_count} duplicate subject rows"

In [10]:
master_df = (
    cpid_df[["study_id", "site_id", "subject_id"]]
    .copy()
)

master_df["master_subject_id"] = (
    master_df["study_id"].astype(str) + "|" +
    master_df["site_id"].astype(str) + "|" +
    master_df["subject_id"].astype(str)
)

In [11]:
# Uniqueness
assert master_df["master_subject_id"].is_unique, \
    "Master subject ID is not unique"

# Row count must exactly match CPID
assert len(master_df) == len(cpid_df), \
    "Master index row count mismatch with CPID"

print(f"Master subject index created: {len(master_df)} subjects")


Master subject index created: 5461 subjects


In [12]:
master_df = master_df.merge(
    edrr_df,
    on=["study_id", "subject_id"],
    how="left",
    validate="many_to_one"
)

In [13]:
# Ensure no row explosion
assert len(master_df) == len(cpid_df), \
    "Row count changed after joining EDRR (unexpected)"

# Phase 1 canonical EDRR column name
EDRR_RAW_COL = "total_open_issue_count_per_subject"

assert EDRR_RAW_COL in master_df.columns, \
    f"Expected EDRR column missing: {EDRR_RAW_COL}"

# Normalize into Phase 2 canonical metric
master_df["edrr_open_issues_count"] = (
    master_df[EDRR_RAW_COL]
    .fillna(0)
    .astype(int)
)

# Optional safety check
assert (master_df["edrr_open_issues_count"] >= 0).all(), \
    "Negative EDRR issue counts detected"


# Step 3 Loading aggregrate datasets

In [14]:
visit_df = load_parquet(FILES["visit_projection"])

Loaded visit_projection_tracker_agg.parquet | shape = (143, 5)


In [15]:
REQUIRED_VISIT_COLS = [
    "site_id",
    "subject_id",
    "num_days_outstanding"
]

for col in REQUIRED_VISIT_COLS:
    assert col in visit_df.columns, f"Visit Projection missing column: {col}"


In [16]:
# No null identifiers
assert visit_df[["site_id", "subject_id"]].isnull().sum().sum() == 0, \
    "Null site_id / subject_id in Visit Projection Tracker"

# days_outstanding must be numeric
assert pd.api.types.is_numeric_dtype(visit_df["num_days_outstanding"]), \
    "days_outstanding must be numeric"


In [17]:
visit_df = visit_df[visit_df["num_days_outstanding"] > 0].copy()

In [18]:
visit_agg = (
    visit_df
    .groupby(["site_id", "subject_id"], as_index=False)
    .agg(
        missing_visits_count=("num_days_outstanding", "count"),
        max_days_visit_overdue=("num_days_outstanding", "max")
    )
)


In [19]:
# Grain validation
dup_count = (
    visit_agg
    .duplicated(subset=["site_id", "subject_id"])
    .sum()
)

assert dup_count == 0, \
    f"Visit aggregation violated grain: {dup_count} duplicate rows"

# Sanity checks
assert (visit_agg["missing_visits_count"] > 0).all(), \
    "missing_visits_count must be positive"

assert (visit_agg["max_days_visit_overdue"] > 0).all(), \
    "max_days_visit_overdue must be positive"


In [20]:
master_df = master_df.merge(
    visit_agg,
    on=["site_id", "subject_id"],
    how="left",
    validate="many_to_one"
)


In [21]:
# Row count must not change
assert len(master_df) == len(cpid_df), \
    "Row count changed after Visit Projection join"

# Fill missing values (no overdue visits)
master_df["missing_visits_count"] = (
    master_df["missing_visits_count"]
    .fillna(0)
    .astype(int)
)

master_df["max_days_visit_overdue"] = (
    master_df["max_days_visit_overdue"]
    .fillna(0)
    .astype(int)
)

# Final sanity
assert (master_df["missing_visits_count"] >= 0).all()
assert (master_df["max_days_visit_overdue"] >= 0).all()


##### Loading Global Pages

In [22]:
missing_pages_df = load_parquet(FILES["missing_pages"])

Loaded global_missing_pages_agg.parquet | shape = (21, 5)


In [23]:
REQUIRED_MISSING_PAGE_COLS = [
    "study_id",
    "site_id",
    "subject_id",
    "num_of_days_missing"
]

for col in REQUIRED_MISSING_PAGE_COLS:
    assert col in missing_pages_df.columns, \
        f"Missing Pages dataset missing column: {col}"


In [24]:
# No null identifiers
assert missing_pages_df[["study_id", "site_id", "subject_id"]].isnull().sum().sum() == 0, \
    "Null identifiers found in Missing Pages dataset"

# days_page_missing must be numeric
assert pd.api.types.is_numeric_dtype(missing_pages_df["num_of_days_missing"]), \
    "num_of_days_missing must be numeric"

In [25]:
missing_pages_df = missing_pages_df[
    missing_pages_df["num_of_days_missing"] > 0
].copy()

In [26]:
missing_pages_agg = (
    missing_pages_df
    .groupby(["study_id", "site_id", "subject_id"], as_index=False)
    .agg(
        missing_pages_count=("num_of_days_missing", "count"),
        max_days_page_missing=("num_of_days_missing", "max")
    )
)

In [27]:
# Grain validation
dup_count = (
    missing_pages_agg
    .duplicated(subset=["study_id", "site_id", "subject_id"])
    .sum()
)

assert dup_count == 0, \
    f"Missing Pages aggregation violated grain: {dup_count} duplicates"

# Sanity checks
assert (missing_pages_agg["missing_pages_count"] > 0).all()
assert (missing_pages_agg["max_days_page_missing"] > 0).all()

In [28]:
master_df = master_df.merge(
    missing_pages_agg,
    on=["study_id", "site_id", "subject_id"],
    how="left",
    validate="many_to_one"
)

In [29]:
# No row explosion
assert len(master_df) == len(cpid_df), \
    "Row count changed after Missing Pages join"

# Fill defaults (no missing pages)
master_df["missing_pages_count"] = (
    master_df["missing_pages_count"]
    .fillna(0)
    .astype(int)
)

master_df["max_days_page_missing"] = (
    master_df["max_days_page_missing"]
    .fillna(0)
    .astype(int)
)

# Final sanity
assert (master_df["missing_pages_count"] >= 0).all()
assert (master_df["max_days_page_missing"] >= 0).all()

In [30]:
# Missing Labs
labs_df = load_parquet(FILES["missing_labs"])

Loaded missing_lab_issues_agg.parquet | shape = (695, 8)


In [31]:
REQUIRED_LAB_COLS = [
    "study_id",
    "site_id",
    "subject_id",
    "num_lab_issues",
    "num_missing_lab_name",
    "num_missing_ranges_units"
]

for col in REQUIRED_LAB_COLS:
    assert col in labs_df.columns, \
        f"Missing Labs dataset missing column: {col}"

In [32]:
# No null identifiers
assert labs_df[["study_id", "site_id", "subject_id"]].isnull().sum().sum() == 0, \
    "Null identifiers in Missing Labs dataset"

# Numeric checks
for col in [
    "num_lab_issues",
    "num_missing_lab_name",
    "num_missing_ranges_units"
]:
    assert pd.api.types.is_numeric_dtype(labs_df[col]), \
        f"{col} must be numeric"

    assert (labs_df[col] >= 0).all(), \
        f"Negative values detected in {col}"


In [33]:
labs_agg = labs_df.copy()

labs_agg["missing_lab_issues_count"] = labs_agg["num_lab_issues"].astype(int)

labs_agg["lab_ranges_missing_flag"] = (
    labs_agg["num_missing_ranges_units"] > 0
)

In [34]:
dup_count = (
    labs_agg
    .duplicated(subset=["study_id", "site_id", "subject_id"])
    .sum()
)

assert dup_count == 0, \
    f"Missing Labs violates grain: {dup_count} duplicate rows"


In [35]:
labs_agg = labs_agg[
    [
        "study_id",
        "site_id",
        "subject_id",
        "missing_lab_issues_count",
        "lab_ranges_missing_flag",
    ]
]

In [36]:
master_df = master_df.merge(
    labs_agg,
    on=["study_id", "site_id", "subject_id"],
    how="left",
    validate="many_to_one"
)

In [37]:
# No row explosion
assert len(master_df) == len(cpid_df), \
    "Row count changed after Missing Labs join"

# Fill defaults (no lab issues)
master_df["missing_lab_issues_count"] = (
    master_df["missing_lab_issues_count"]
    .fillna(0)
    .astype(int)
)

master_df["lab_ranges_missing_flag"] = (
    master_df["lab_ranges_missing_flag"]
    .fillna(False)
    .astype(bool)
)

# Final sanity
assert (master_df["missing_lab_issues_count"] >= 0).all()

In [38]:
# SAE Dashboard
sae_df = load_parquet(FILES["sae_dashboard"])

Loaded sae_dashboard_agg.parquet | shape = (5543, 10)


In [39]:
REQUIRED_SAE_COLS = [
    "study_id",
    "site_id",
    "subject_id",
    "num_open_sae",
    "num_review_pending",
    "num_action_pending",
    "num_case_open",
]

for col in REQUIRED_SAE_COLS:
    assert col in sae_df.columns, \
        f"SAE dataset missing required column: {col}"


In [40]:
# No null identifiers
assert sae_df[["study_id", "site_id", "subject_id"]].isnull().sum().sum() == 0, \
    "Null identifiers in SAE dataset"

# Numeric safety checks
for col in [
    "num_open_sae",
    "num_review_pending",
    "num_action_pending",
    "num_case_open",
]:
    assert pd.api.types.is_numeric_dtype(sae_df[col]), \
        f"{col} must be numeric"

    assert (sae_df[col] >= 0).all(), \
        f"Negative values detected in {col}"

In [41]:
sae_agg = sae_df.copy()

# Canonical count
sae_agg["open_sae_count"] = sae_agg["num_open_sae"].astype(int)

# Pending reviews / actions are treated as blocking flags
sae_agg["pending_sae_dm_review_flag"] = (
    (sae_agg["num_review_pending"] > 0) |
    (sae_agg["num_action_pending"] > 0)
)

# Case still open implies safety team not fully resolved
sae_agg["pending_sae_safety_review_flag"] = (
    sae_agg["num_case_open"] > 0
)

In [42]:
dup_count = (
    sae_agg
    .duplicated(subset=["study_id", "site_id", "subject_id"])
    .sum()
)

assert dup_count == 0, \
    f"SAE aggregation violates grain: {dup_count} duplicate rows"

In [43]:
sae_agg = sae_agg[
    [
        "study_id",
        "site_id",
        "subject_id",
        "open_sae_count",
        "pending_sae_dm_review_flag",
        "pending_sae_safety_review_flag",
    ]
]

In [44]:
master_df = master_df.merge(
    sae_agg,
    on=["study_id", "site_id", "subject_id"],
    how="left",
    validate="many_to_one"
)

In [45]:
# No row explosion
assert len(master_df) == len(cpid_df), \
    "Row count changed after SAE join"

# Defaults for subjects with no SAE records
master_df["open_sae_count"] = (
    master_df["open_sae_count"]
    .fillna(0)
    .astype(int)
)

master_df["pending_sae_dm_review_flag"] = (
    master_df["pending_sae_dm_review_flag"]
    .fillna(False)
    .astype(bool)
)

master_df["pending_sae_safety_review_flag"] = (
    master_df["pending_sae_safety_review_flag"]
    .fillna(False)
    .astype(bool)
)

# Final sanity
assert (master_df["open_sae_count"] >= 0).all()

In [46]:
# Inactivated Forms
inact_df = load_parquet(FILES["inactivated_forms"])

Loaded inactivated_forms_loglines_agg.parquet | shape = (24358, 4)


In [47]:
REQUIRED_INACT_COLS = [
    "study_id",
    "subject_id",
    "num_inactivated_records",
]

for col in REQUIRED_INACT_COLS:
    assert col in inact_df.columns, \
        f"Inactivated Forms dataset missing column: {col}"


In [48]:
# No null identifiers
assert inact_df[["study_id", "subject_id"]].isnull().sum().sum() == 0, \
    "Null identifiers in Inactivated Forms dataset"

# Numeric checks
assert pd.api.types.is_numeric_dtype(inact_df["num_inactivated_records"]), \
    "num_inactivated_records must be numeric"

assert (inact_df["num_inactivated_records"] >= 0).all(), \
    "Negative values detected in num_inactivated_records"


In [49]:
inact_agg = inact_df.copy()

inact_agg["inactivated_forms_count"] = (
    inact_agg["num_inactivated_records"]
    .astype(int)
)

inact_agg["inactivated_records_flag"] = (
    inact_agg["inactivated_forms_count"] > 0
)

In [50]:
dup_count = (
    inact_agg
    .duplicated(subset=["study_id", "subject_id"])
    .sum()
)

assert dup_count == 0, \
    f"Inactivated Forms violates grain: {dup_count} duplicate rows"

In [51]:
inact_agg = inact_agg[
    [
        "study_id",
        "subject_id",
        "inactivated_forms_count",
        "inactivated_records_flag",
    ]
]

In [52]:
master_df = master_df.merge(
    inact_agg,
    on=["study_id", "subject_id"],
    how="left",
    validate="many_to_one"
)

In [53]:
# No row explosion
assert len(master_df) == len(cpid_df), \
    "Row count changed after Inactivated Forms join"

# Defaults (no inactivated records)
master_df["inactivated_forms_count"] = (
    master_df["inactivated_forms_count"]
    .fillna(0)
    .astype(int)
)

master_df["inactivated_records_flag"] = (
    master_df["inactivated_records_flag"]
    .fillna(False)
    .astype(bool)
)

# Final sanity
assert (master_df["inactivated_forms_count"] >= 0).all()

##### Save Checkpoint

In [54]:
INTEGRATED_MASTER_PATH = MASTER_DIR / "master_subject_integrated_phase2.parquet"

master_df.to_parquet(INTEGRATED_MASTER_PATH, index=False)

print(f"Saved integrated master (pre-DQI): {INTEGRATED_MASTER_PATH}")

Saved integrated master (pre-DQI): ../data/master/master_subject_integrated_phase2.parquet


In [55]:
BUCKET_C_METADATA = [
    {
        "dataset": "GlobalCodingReport_MedDRA",
        "canonical_keys": ["study_id", "subject_id"],
        "contributes_to": "CPID_EDC_Metrics",
        "master_fields": [
            "coded_terms_count",
            "uncoded_terms_count",
            "coding_required_flag",
        ],
        "usage": "Drill-down, audit, AI explanation",
        "join_policy": "DO_NOT_JOIN",
    },
    {
        "dataset": "GlobalCodingReport_WHODRA",
        "canonical_keys": ["study_id", "subject_id"],
        "contributes_to": "CPID_EDC_Metrics",
        "master_fields": [
            "coded_terms_count",
            "uncoded_terms_count",
            "coding_required_flag",
        ],
        "usage": "Drill-down, audit, AI explanation",
        "join_policy": "DO_NOT_JOIN",
    },
]

bucket_c_meta_df = pd.DataFrame(BUCKET_C_METADATA)
bucket_c_meta_df


Unnamed: 0,dataset,canonical_keys,contributes_to,master_fields,usage,join_policy
0,GlobalCodingReport_MedDRA,"[study_id, subject_id]",CPID_EDC_Metrics,"[coded_terms_count, uncoded_terms_count, codin...","Drill-down, audit, AI explanation",DO_NOT_JOIN
1,GlobalCodingReport_WHODRA,"[study_id, subject_id]",CPID_EDC_Metrics,"[coded_terms_count, uncoded_terms_count, codin...","Drill-down, audit, AI explanation",DO_NOT_JOIN


In [56]:
bucket_c_meta_df.to_parquet(
    MASTER_DIR / "bucket_c_metadata.parquet",
    index=False
)


In [57]:
master_df.columns

Index(['study_id', 'site_id', 'subject_id', 'master_subject_id', 'total_open_issue_count_per_subject', 'edrr_open_issues_count', 'missing_visits_count', 'max_days_visit_overdue',
       'missing_pages_count', 'max_days_page_missing', 'missing_lab_issues_count', 'lab_ranges_missing_flag', 'open_sae_count', 'pending_sae_dm_review_flag', 'pending_sae_safety_review_flag',
       'inactivated_forms_count', 'inactivated_records_flag'],
      dtype='object')

##### Metrics

In [58]:
REQUIRED_COMPLETENESS_COLS = [
    "missing_visits_count",
    "missing_pages_count",
    "missing_lab_issues_count",
]

for col in REQUIRED_COMPLETENESS_COLS:
    assert col in master_df.columns, \
        f"Missing required column for completeness: {col}"


In [59]:
# Compute visit completeness flag
master_df["visit_completeness"] = np.where(
    master_df["missing_visits_count"] == 0,
    1.0,
    0.0
)
assert master_df["visit_completeness"].isin([0.0, 1.0]).all()

In [60]:
# Page completeness flag
master_df["page_completeness"] = np.where(
    master_df["missing_pages_count"] == 0,
    1.0,
    0.0
)

assert master_df["page_completeness"].isin([0.0, 1.0]).all()


In [61]:
# Lab completeness flag
master_df["lab_completeness"] = np.where(
    master_df["missing_lab_issues_count"] == 0,
    1.0,
    0.0
)
assert master_df["lab_completeness"].isin([0.0, 1.0]).all()

In [62]:
master_df["completeness_score"] = (
    0.4 * master_df["visit_completeness"] +
    0.3 * master_df["page_completeness"] +
    0.3 * master_df["lab_completeness"]
)

assert master_df["completeness_score"].between(0, 1).all()


In [63]:
master_df[
    [
        "missing_visits_count",
        "missing_pages_count",
        "missing_lab_issues_count",
        "visit_completeness",
        "page_completeness",
        "lab_completeness",
        "completeness_score",
    ]
].head()


Unnamed: 0,missing_visits_count,missing_pages_count,missing_lab_issues_count,visit_completeness,page_completeness,lab_completeness,completeness_score
0,0,0,0,1.0,1.0,1.0,1.0
1,0,0,0,1.0,1.0,1.0,1.0
2,0,0,0,1.0,1.0,1.0,1.0
3,0,0,0,1.0,1.0,1.0,1.0
4,0,0,0,1.0,1.0,1.0,1.0


##### Timeliness Metrics

In [64]:
REQUIRED_TIMELINESS_COLS = [
    "max_days_visit_overdue",
    "max_days_page_missing",
]

for col in REQUIRED_TIMELINESS_COLS:
    assert col in master_df.columns, \
        f"Missing required column for timeliness: {col}"

In [65]:
def visit_timeliness_score(days):
    if days <= 0:
        return 1.0
    elif days <= 7:
        return 0.8
    elif days <= 14:
        return 0.6
    elif days <= 30:
        return 0.4
    else:
        return 0.2

master_df["visit_timeliness"] = master_df["max_days_visit_overdue"].apply(
    visit_timeliness_score
)

assert master_df["visit_timeliness"].between(0, 1).all()

In [66]:
def page_timeliness_score(days):
    if days <= 0:
        return 1.0
    elif days <= 14:
        return 0.8
    elif days <= 30:
        return 0.6
    elif days <= 60:
        return 0.4
    else:
        return 0.2

master_df["page_timeliness"] = master_df["max_days_page_missing"].apply(
    page_timeliness_score
)

assert master_df["page_timeliness"].between(0, 1).all()

In [67]:
master_df["timeliness_score"] = (
    0.6 * master_df["visit_timeliness"] +
    0.4 * master_df["page_timeliness"]
)

assert master_df["timeliness_score"].between(0, 1).all()

In [68]:
# Sanity check: display timeliness metrics
master_df[
    [
        "max_days_visit_overdue",
        "visit_timeliness",
        "max_days_page_missing",
        "page_timeliness",
        "timeliness_score",
    ]
].head()

Unnamed: 0,max_days_visit_overdue,visit_timeliness,max_days_page_missing,page_timeliness,timeliness_score
0,0,1.0,0,1.0,1.0
1,0,1.0,0,1.0,1.0
2,0,1.0,0,1.0,1.0
3,0,1.0,0,1.0,1.0
4,0,1.0,0,1.0,1.0


##### Conformity and Consistency

In [69]:
REQUIRED_CONFORMITY_COLS = [
    "lab_ranges_missing_flag",
    "inactivated_records_flag",
    "open_sae_count",
]

for col in REQUIRED_CONFORMITY_COLS:
    assert col in master_df.columns, \
        f"Missing required column for conformity/consistency: {col}"

In [70]:
master_df["conformity_score"] = np.where(
    (master_df["lab_ranges_missing_flag"] == False) &
    (master_df["open_sae_count"] == 0) &
    (master_df["inactivated_records_flag"] == False),
    1.0,
    0.0
)

assert master_df["conformity_score"].isin([0.0, 1.0]).all()

In [71]:
# Consistency Flag
master_df["consistency_flag"] = np.where(
    (
        ((master_df["missing_visits_count"] == 0) & (master_df["max_days_visit_overdue"] > 0)) |
        ((master_df["missing_pages_count"] == 0) & (master_df["max_days_page_missing"] > 0))
    ),
    0,
    1
)

assert master_df["consistency_flag"].isin([0, 1]).all()

In [72]:
master_df["consistency_flag"].value_counts()

consistency_flag
1    5461
Name: count, dtype: int64

### DQI

In [73]:
REQUIRED_DQI_COLS = [
    "completeness_score",
    "timeliness_score",
    "conformity_score",
    "consistency_flag",
    "open_sae_count",
    "pending_sae_dm_review_flag",
    "pending_sae_safety_review_flag",
    "edrr_open_issues_count",
]

for col in REQUIRED_DQI_COLS:
    assert col in master_df.columns, \
        f"Missing required column for DQI computation: {col}"

In [74]:
master_df["base_dqi"] = (
    0.35 * master_df["completeness_score"] +
    0.25 * master_df["timeliness_score"] +
    0.25 * master_df["conformity_score"] +
    0.15 * master_df["consistency_flag"]
)

assert master_df["base_dqi"].between(0, 1).all()

In [75]:
master_df["subject_data_quality_score"] = master_df["base_dqi"]

# Safety override: open SAE
master_df.loc[
    master_df["open_sae_count"] > 0,
    "subject_data_quality_score"
] = master_df.loc[
    master_df["open_sae_count"] > 0,
    "subject_data_quality_score"
].clip(upper=0.30)

# Pending SAE reviews
master_df.loc[
    (master_df["pending_sae_dm_review_flag"]) |
    (master_df["pending_sae_safety_review_flag"]),
    "subject_data_quality_score"
] = master_df.loc[
    (master_df["pending_sae_dm_review_flag"]) |
    (master_df["pending_sae_safety_review_flag"]),
    "subject_data_quality_score"
].clip(upper=0.40)

# EDRR open issues
master_df.loc[
    master_df["edrr_open_issues_count"] > 0,
    "subject_data_quality_score"
] = master_df.loc[
    master_df["edrr_open_issues_count"] > 0,
    "subject_data_quality_score"
].clip(upper=0.50)

assert master_df["subject_data_quality_score"].between(0, 1).all(), \
    "Final DQI out of bounds"

In [87]:
master_df[REQUIRED_DQI_COLS].head()

Unnamed: 0,completeness_score,timeliness_score,conformity_score,consistency_flag,open_sae_count,pending_sae_dm_review_flag,pending_sae_safety_review_flag,edrr_open_issues_count
0,1.0,1.0,1.0,1,0,False,False,0
1,1.0,1.0,1.0,1,0,False,False,0
2,1.0,1.0,0.0,1,13,True,True,2
3,1.0,1.0,0.0,1,39,True,True,3
4,1.0,1.0,1.0,1,0,False,False,0


In [76]:
master_df["is_clean_subject_flag"] = np.where(
    (master_df["missing_visits_count"] == 0) &
    (master_df["missing_pages_count"] == 0) &
    (master_df["missing_lab_issues_count"] == 0) &
    (master_df["open_sae_count"] == 0) &
    (~master_df["pending_sae_dm_review_flag"]) &
    (~master_df["pending_sae_safety_review_flag"]) &
    (master_df["edrr_open_issues_count"] == 0) &
    (master_df["conformity_score"] == 1.0) &
    (master_df["consistency_flag"] == 1),
    True,
    False
)

assert master_df["is_clean_subject_flag"].dtype == bool

In [77]:
master_df["is_clean_subject_flag"].value_counts()

is_clean_subject_flag
True     4797
False     664
Name: count, dtype: int64

In [78]:
master_df["subject_data_quality_score"].describe()

count    5461.000000
mean        0.943754
std         0.165818
min         0.300000
25%         1.000000
50%         1.000000
75%         1.000000
max         1.000000
Name: subject_data_quality_score, dtype: float64

In [79]:
# Save master dataset
OUTPUT_PATH = MASTER_DIR / "clinical_data_master_subject_index.parquet"
master_df.to_parquet(OUTPUT_PATH)

print(f"Saved final master subject index with DQI: {OUTPUT_PATH}")

Saved final master subject index with DQI: ../data/master/clinical_data_master_subject_index.parquet


In [80]:
master_df.head

<bound method NDFrame.head of      study_id  site_id  subject_id           master_subject_id  total_open_issue_count_per_subject  edrr_open_issues_count  missing_visits_count  max_days_visit_overdue  missing_pages_count  \
0     Study 1   Site 1   Subject 1    Study 1|Site 1|Subject 1                                 NaN                       0                     0                       0                    0   
1     Study 1  Site 10  Subject 17  Study 1|Site 10|Subject 17                                 NaN                       0                     0                       0                    0   
2     Study 1  Site 10  Subject 18  Study 1|Site 10|Subject 18                                 2.0                       2                     0                       0                    0   
3     Study 1  Site 10  Subject 19  Study 1|Site 10|Subject 19                                 3.0                       3                     0                       0                    0   
4    