# Data Cleaning - DDI, Medications, and Demographics
This notebook cleans and preprocesses the raw data from v1_raw and writes cleaned versions to v2_clean.

**Input**:  
- `med-data/v1_raw/ddi/db_drug_interactions.parquet`  
- `med-data/v1_raw/medications/medications_combined.parquet`  
- `med-data/v1_raw/demographics/patient_demographics.parquet`

**Output**:  
- `med-data/v2_clean/ddi/db_drug_interactions_clean.parquet`  
- `med-data/v2_clean/medications/medications_clean.parquet`  
- `med-data/v2_clean/demographics/patient_demographics_clean.parquet`

In [None]:
# Import dependencies

import os
import sys
import logging
import time
import re
from datetime import datetime
import numpy as np
import pandas as pd
import s3fs
import pyarrow as pa
from importlib.metadata import version
from config import *

In [None]:
# Verify dependencies: report the installed versions of the core data libraries

def print_version():
    """Print one "<library>: <version>" line per core dependency."""
    libraries = (
        ("pandas", pd),
        ("numpy", np),
        ("s3fs", s3fs),
        ("pyarrow", pa),
    )
    for label, module in libraries:
        print(f"{label}:", module.__version__)

print_version()

In [None]:
# Set up logging. Any handlers already attached to the root logger (e.g. from a
# previous run of this cell) are detached first; otherwise basicConfig below
# would be a no-op because the root logger already has handlers.

root_logger = logging.getLogger()
for existing_handler in list(root_logger.handlers):
    root_logger.removeHandler(existing_handler)

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)

logging.info("Logging configured successfully")

In [None]:
# Load configuration: echo the endpoint and bucket locations this run will use
# (values are expected from the `config` module star-import).

for config_message in (
    f"MinIO endpoint: {MINIO_ENDPOINT}",
    f"Source: {DEST_BUCKET}/v1_raw/",
    f"Destination: {DEST_BUCKET}/v2_clean/",
):
    logging.info(config_message)

In [None]:
# Create S3FileSystem for MinIO. Credentials come from the MINIO_* settings
# (expected from `config`), never hardcoded in the notebook.

logging.info(f"Initializing S3FileSystem for MinIO at {MINIO_ENDPOINT}")
minio_endpoint_url = f"http://{MINIO_ENDPOINT}"
fs = s3fs.S3FileSystem(
    key=MINIO_ACCESS_KEY,
    secret=MINIO_SECRET_KEY,
    anon=False,
    client_kwargs={'endpoint_url': minio_endpoint_url},
)
logging.info("S3FileSystem created successfully")

---
## Part 1: Load Raw Data

In [None]:
# Load the DDI reference dataset from v1_raw and report how long the read took

ddi_uri = f"s3://{DEST_BUCKET}/{V1_RAW_DDI_PREFIX}db_drug_interactions.parquet"
logging.info(f"Reading DDI data: {ddi_uri}")

start_time = time.time()
df_ddi_raw = pd.read_parquet(ddi_uri, filesystem=fs)
elapsed = time.time() - start_time

ddi_raw_rows = len(df_ddi_raw)
logging.info(f"Loaded {ddi_raw_rows:,} DDI records in {elapsed:.2f}s")

print(f"\nDDI Raw Data Shape: {df_ddi_raw.shape}")
df_ddi_raw.head()

In [None]:
# Load medications dataset from v1_raw

meds_uri = f"s3://{DEST_BUCKET}/{V1_RAW_MEDICATIONS_PREFIX}medications_combined.parquet"
logging.info(f"Reading medications data: {meds_uri}")

start_time = time.time()
df_meds_raw = pd.read_parquet(meds_uri, filesystem=fs)
elapsed = time.time() - start_time

logging.info(f"Loaded {len(df_meds_raw):,} medication records in {elapsed:.2f}s")

print(f"\nMedications Raw Data Shape: {df_meds_raw.shape}")
# The demographics load follows in the same cell, so a bare `df_meds_raw.head()`
# would not be the cell's last expression and its output would be silently
# discarded. display() (provided by the IPython kernel) renders it explicitly.
display(df_meds_raw.head())

# Load demographics dataset from v1_raw
# NOTE(review): unlike DDI/medications this path is hardcoded rather than built
# from a config prefix constant; confirm it stays in sync with `config`.

demo_uri = f"s3://{DEST_BUCKET}/v1_raw/demographics/patient_demographics.parquet"
logging.info(f"Reading demographics data: {demo_uri}")

start_time = time.time()
df_demo_raw = pd.read_parquet(demo_uri, filesystem=fs)
elapsed = time.time() - start_time

demo_raw_rows = len(df_demo_raw)
logging.info(f"Loaded {demo_raw_rows:,} patient demographics records in {elapsed:.2f}s")

print(f"\nDemographics Raw Data Shape: {df_demo_raw.shape}")
df_demo_raw.head()

---
## Part 2: Clean DDI Reference Dataset

In [None]:
# Initial data quality assessment for DDI dataset: shape, nulls, duplicates, dtypes

banner = "=" * 80
print(banner)
print("DDI DATASET - INITIAL QUALITY ASSESSMENT")
print(banner)

print(f"\nShape: {df_ddi_raw.shape}")

print("\nMissing values:")
print(df_ddi_raw.isna().sum())

ddi_dup_rows = df_ddi_raw.duplicated().sum()
print(f"\nDuplicate rows: {ddi_dup_rows}")

print("\nData types:")
print(df_ddi_raw.dtypes)

print(banner)

In [None]:
# Clean DDI dataset

logging.info("Cleaning DDI dataset...")

# Text columns that must be present and non-blank for a usable interaction row.
DDI_TEXT_COLS = ['Drug 1', 'Drug 2', 'Interaction Description']

# Create copy for cleaning (df_ddi_raw is left untouched so this cell can be re-run)
df_ddi_clean = df_ddi_raw.copy()

initial_count = len(df_ddi_clean)
logging.info(f"Starting with {initial_count:,} records")

# 1. Strip whitespace from string columns. This runs BEFORE de-duplication so
#    rows that differ only by leading/trailing padding are recognised as duplicates.
for text_col in DDI_TEXT_COLS:
    df_ddi_clean[text_col] = df_ddi_clean[text_col].str.strip()
logging.info("Stripped whitespace from string columns")

# 2. Remove duplicates
before_dupes = len(df_ddi_clean)
df_ddi_clean = df_ddi_clean.drop_duplicates()
duplicates_removed = before_dupes - len(df_ddi_clean)
logging.info(f"Removed {duplicates_removed:,} duplicate records")

# 3. Remove records with missing values (in any column, not only DDI_TEXT_COLS)
before_nulls = len(df_ddi_clean)
df_ddi_clean = df_ddi_clean.dropna()
nulls_removed = before_nulls - len(df_ddi_clean)
logging.info(f"Removed {nulls_removed:,} records with missing values")

# 4. Remove rows where any required text column is blank after stripping.
#    .copy() yields an independent frame, so the column assignments in later
#    steps are not made on a slice (avoids SettingWithCopyWarning).
before_empty = len(df_ddi_clean)
non_blank = (df_ddi_clean[DDI_TEXT_COLS] != '').all(axis=1)
df_ddi_clean = df_ddi_clean[non_blank].copy()
empty_removed = before_empty - len(df_ddi_clean)
logging.info(f"Removed {empty_removed:,} records with empty strings")

# 5. Add normalized drug name columns (for matching)

# Salt / counter-ion suffixes removed during normalization. Each carries a
# leading space, so a name that IS only a salt word (e.g. "SODIUM") is kept.
SALT_SUFFIXES = (' HCL', ' HYDROCHLORIDE', ' SODIUM', ' POTASSIUM', ' CALCIUM',
                 ' SULFATE', ' TARTRATE', ' SUCCINATE', ' MALEATE', ' FUMARATE',
                 ' ACETATE', ' CITRATE', ' PHOSPHATE')

def normalize_drug_name(drug_name, suffixes=SALT_SUFFIXES):
    """Normalize a drug name for cross-dataset matching.

    Upper-cases and trims the name, then repeatedly removes trailing salt
    suffixes, so stacked salts such as "DEXAMETHASONE SODIUM PHOSPHATE"
    reduce to the base name "DEXAMETHASONE".

    Args:
        drug_name: Raw drug name; non-string values are converted with str().
        suffixes: Trailing tokens to strip, each including its leading space.

    Returns:
        The normalized name, or None when drug_name is missing (None/NaN).
    """
    if pd.isna(drug_name):
        return None
    name = str(drug_name).upper().strip()
    # Keep stripping until no suffix matches; each pass shortens `name`,
    # so the loop always terminates.
    removed_suffix = True
    while removed_suffix:
        removed_suffix = False
        for suffix in suffixes:
            if name.endswith(suffix):
                name = name[:-len(suffix)].strip()
                removed_suffix = True
                break
    return name

for source_col, normalized_col in (('Drug 1', 'Drug1_Normalized'),
                                   ('Drug 2', 'Drug2_Normalized')):
    df_ddi_clean[normalized_col] = df_ddi_clean[source_col].apply(normalize_drug_name)
logging.info("Added normalized drug name columns")

# 6. Extract severity classification from interaction description

# Keyword tiers checked in order; the first tier with a matching substring wins.
_HIGH_SEVERITY_TERMS = ('contraindicated', 'avoid', 'serious', 'severe')
_MODERATE_SEVERITY_TERMS = ('caution', 'monitor', 'may increase', 'may decrease')

def classify_severity(description):
    """Map an interaction description to a coarse severity label.

    Matching is a case-insensitive substring search. High-tier keywords take
    precedence over moderate-tier ones.

    Returns:
        'Unknown' for a missing description, otherwise 'High', 'Moderate', or 'Low'.
    """
    if pd.isna(description):
        return 'Unknown'
    text = description.lower()
    for label, terms in (('High', _HIGH_SEVERITY_TERMS),
                         ('Moderate', _MODERATE_SEVERITY_TERMS)):
        if any(term in text for term in terms):
            return label
    return 'Low'

severity_labels = df_ddi_clean['Interaction Description'].apply(classify_severity)
df_ddi_clean['Severity'] = severity_labels
logging.info("Added severity classification column")

ddi_clean_rows = len(df_ddi_clean)
logging.info(f"DDI cleaning complete: {ddi_clean_rows:,} records")

print(f"\nCleaned DDI Data Shape: {df_ddi_clean.shape}")
df_ddi_clean.head()

In [None]:
# DDI cleaning summary
# Counts come from df_ddi_raw rather than the shared `initial_count` global:
# the medications and demographics cells overwrite that name, so re-running this
# cell after them would otherwise report the wrong dataset's numbers.

ddi_original = len(df_ddi_raw)
ddi_cleaned = len(df_ddi_clean)
ddi_removed = ddi_original - ddi_cleaned
# Guard against an empty raw file (avoids ZeroDivisionError).
ddi_removal_rate = (ddi_removed / ddi_original * 100) if ddi_original else 0.0

print("\n" + "="*80)
print("DDI DATASET - CLEANING SUMMARY")
print("="*80)

print(f"Original records:     {ddi_original:,}")
print(f"Cleaned records:      {ddi_cleaned:,}")
print(f"Records removed:      {ddi_removed:,}")
print(f"Removal rate:         {ddi_removal_rate:.2f}%")

print(f"\nSeverity distribution:")
print(df_ddi_clean['Severity'].value_counts())

print(f"\nNew columns added:")
print("  - Drug1_Normalized")
print("  - Drug2_Normalized")
print("  - Severity")

print("="*80)

---
## Part 3: Clean Medications Dataset

In [None]:
# Initial data quality assessment for medications dataset:
# shape, nulls, duplicates, dtypes, and per-source row counts

rule = "=" * 80
print(rule)
print("MEDICATIONS DATASET - INITIAL QUALITY ASSESSMENT")
print(rule)

print(f"\nShape: {df_meds_raw.shape}")

print("\nMissing values:")
print(df_meds_raw.isna().sum())

meds_dup_rows = df_meds_raw.duplicated().sum()
print(f"\nDuplicate rows: {meds_dup_rows}")

print("\nData types:")
print(df_meds_raw.dtypes)

print("\nSource system distribution:")
print(df_meds_raw['SourceSystem'].value_counts())

print(rule)

In [None]:
# Clean medications dataset

logging.info("Cleaning medications dataset...")

# Create copy for cleaning (df_meds_raw is left untouched so this cell can be re-run)
df_meds_clean = df_meds_raw.copy()

initial_count = len(df_meds_clean)
logging.info(f"Starting with {initial_count:,} records")

# 1. Strip whitespace from the string columns that are present. This runs
#    BEFORE de-duplication so padding-only differences collapse into duplicates.
#    Missing values stay missing: astype(str) alone would turn NaN/None into the
#    literal text 'nan'/'None', which then slips past every later null check.
string_cols = ['DrugNameWithoutDose', 'DrugNameWithDose', 'SourceSystem', 'Status']
for col in string_cols:
    if col in df_meds_clean.columns:
        col_values = df_meds_clean[col]
        df_meds_clean[col] = col_values.where(
            col_values.isna(), col_values.astype(str).str.strip()
        )
logging.info("Stripped whitespace from string columns")

# 2. Remove exact duplicates
before_dupes = len(df_meds_clean)
df_meds_clean = df_meds_clean.drop_duplicates()
duplicates_removed = before_dupes - len(df_meds_clean)
logging.info(f"Removed {duplicates_removed:,} exact duplicate records")

# 3. Remove records with missing critical fields
before_critical = len(df_meds_clean)
df_meds_clean = df_meds_clean.dropna(subset=['PatientSID', 'DrugNameWithoutDose', 'MedicationDateTime'])
critical_removed = before_critical - len(df_meds_clean)
logging.info(f"Removed {critical_removed:,} records with missing critical fields")

# 4. Ensure proper data types (PatientSID has no nulls after step 3, so int64 is safe)
df_meds_clean['PatientSID'] = df_meds_clean['PatientSID'].astype('int64')
df_meds_clean['MedicationDateTime'] = pd.to_datetime(df_meds_clean['MedicationDateTime'])
df_meds_clean['StartDate'] = pd.to_datetime(df_meds_clean['StartDate'])
logging.info("Ensured proper data types")

# 5. Add normalized drug name column
def extract_base_drug_name(drug_name):
    """Extract a matchable base drug name from a medication name.

    Upper-cases and trims the name, drops any trailing dose information, then
    applies normalize_drug_name (salt-suffix removal).

    Returns:
        The normalized base name, or None when drug_name is missing (None/NaN).
    """
    if pd.isna(drug_name):
        return None
    name = str(drug_name).upper().strip()
    # Remove dose info: cut at the first whitespace-separated token that starts
    # with a digit ("METFORMIN 500MG" -> "METFORMIN"). Requiring the whitespace
    # keeps names that begin with or embed digits intact ("5-FLUOROURACIL",
    # "VITAMIN B12"); a bare r'\s*\d' split would empty or truncate them.
    name = re.split(r'\s+\d', name, maxsplit=1)[0].strip()
    # Apply normalization
    return normalize_drug_name(name)

df_meds_clean['DrugName_Normalized'] = df_meds_clean['DrugNameWithoutDose'].apply(extract_base_drug_name)
logging.info("Added normalized drug name column")

# 6. Add date components for analysis (appended in this column order)
med_datetime = df_meds_clean['MedicationDateTime'].dt
df_meds_clean = df_meds_clean.assign(
    MedicationYear=med_datetime.year,
    MedicationMonth=med_datetime.month,
    MedicationDayOfWeek=med_datetime.dayofweek,
    MedicationHour=med_datetime.hour,
)
logging.info("Added date component columns")

# 7. Sort chronologically within each patient
df_meds_clean = df_meds_clean.sort_values(['PatientSID', 'MedicationDateTime'])
logging.info("Sorted by patient and datetime")

# 8. Replace the now-shuffled index with a fresh 0..n-1 range
df_meds_clean = df_meds_clean.reset_index(drop=True)

meds_clean_rows = len(df_meds_clean)
logging.info(f"Medications cleaning complete: {meds_clean_rows:,} records")

print(f"\nCleaned Medications Data Shape: {df_meds_clean.shape}")
df_meds_clean.head()

In [None]:
# Medications cleaning summary
# Counts come from df_meds_raw rather than the shared `initial_count` global:
# the DDI and demographics cells also assign that name, so out-of-order
# re-runs would otherwise report another dataset's numbers.

meds_original = len(df_meds_raw)
meds_cleaned = len(df_meds_clean)
meds_removed = meds_original - meds_cleaned
# Guard against an empty raw file (avoids ZeroDivisionError).
meds_removal_rate = (meds_removed / meds_original * 100) if meds_original else 0.0

print("\n" + "="*80)
print("MEDICATIONS DATASET - CLEANING SUMMARY")
print("="*80)

print(f"Original records:     {meds_original:,}")
print(f"Cleaned records:      {meds_cleaned:,}")
print(f"Records removed:      {meds_removed:,}")
print(f"Removal rate:         {meds_removal_rate:.2f}%")

print(f"\nUnique patients:      {df_meds_clean['PatientSID'].nunique()}")
print(f"Unique medications:   {df_meds_clean['DrugName_Normalized'].nunique()}")

print(f"\nSource system distribution:")
print(df_meds_clean['SourceSystem'].value_counts())

print(f"\nDate range:")
print(f"  Earliest: {df_meds_clean['MedicationDateTime'].min()}")
print(f"  Latest:   {df_meds_clean['MedicationDateTime'].max()}")

print(f"\nNew columns added:")
print("  - DrugName_Normalized")
print("  - MedicationYear")
print("  - MedicationMonth")
print("  - MedicationDayOfWeek")
print("  - MedicationHour")

print("="*80)

---
## Part 3.5: Clean Demographics Dataset

In [None]:
# Initial data quality assessment for demographics dataset:
# shape, nulls, duplicates, dtypes, gender counts, and age statistics

rule = "=" * 80
print(rule)
print("DEMOGRAPHICS DATASET - INITIAL QUALITY ASSESSMENT")
print(rule)

print(f"\nShape: {df_demo_raw.shape}")

print("\nMissing values:")
print(df_demo_raw.isna().sum())

demo_dup_rows = df_demo_raw.duplicated().sum()
print(f"\nDuplicate rows: {demo_dup_rows}")

print("\nData types:")
print(df_demo_raw.dtypes)

print("\nGender distribution:")
print(df_demo_raw['Gender'].value_counts())

print("\nAge statistics:")
print(df_demo_raw['Age'].describe())

print(rule)

In [None]:
# Clean demographics dataset

logging.info("Cleaning demographics dataset...")

# Create copy for cleaning (df_demo_raw is left untouched so this cell can be re-run)
df_demo_clean = df_demo_raw.copy()

initial_count = len(df_demo_clean)
logging.info(f"Starting with {initial_count:,} records")

# 1. Remove records with missing critical fields. This runs BEFORE
#    de-duplication: otherwise a patient whose first row is incomplete would
#    keep that row, lose it here, and end up with no record at all.
before_critical = len(df_demo_clean)
df_demo_clean = df_demo_clean.dropna(subset=['PatientSID', 'DateOfBirth', 'Gender'])
critical_removed = before_critical - len(df_demo_clean)
logging.info(f"Removed {critical_removed:,} records with missing critical fields")

# 2. Ensure proper data types (PatientSID has no nulls after step 1, so int64 is safe)
df_demo_clean['PatientSID'] = df_demo_clean['PatientSID'].astype('int64')
df_demo_clean['DateOfBirth'] = pd.to_datetime(df_demo_clean['DateOfBirth'])
logging.info("Ensured proper data types")

# 3. Remove duplicates (should be one record per patient; the first row is kept)
before_dupes = len(df_demo_clean)
df_demo_clean = df_demo_clean.drop_duplicates(subset=['PatientSID'])
duplicates_removed = before_dupes - len(df_demo_clean)
logging.info(f"Removed {duplicates_removed:,} duplicate patient records")

# 4. Standardize Gender values to M / F / U (unrecognised values map to 'U')
df_demo_clean['Gender'] = df_demo_clean['Gender'].str.strip().str.upper()
gender_mapping = {
    'M': 'M', 'MALE': 'M',
    'F': 'F', 'FEMALE': 'F',
    'U': 'U', 'UNKNOWN': 'U'
}
df_demo_clean['Gender'] = df_demo_clean['Gender'].map(gender_mapping).fillna('U')
logging.info("Standardized gender values")

# 5. Validate and clean Age: recalculate from DateOfBirth for consistency.
#    Floor (not truncate toward zero): a DateOfBirth up to a year in the future
#    must give a negative age so the range check below rejects it, whereas
#    astype(int) alone would turn e.g. -0.4 into 0.
today = pd.Timestamp.now()
age_in_days = (today - df_demo_clean['DateOfBirth']).dt.days
df_demo_clean['Age'] = np.floor(age_in_days / 365.25).astype(int)

# Remove unrealistic ages
unrealistic_ages = (df_demo_clean['Age'] < 0) | (df_demo_clean['Age'] > 120)
n_unrealistic = int(unrealistic_ages.sum())
if n_unrealistic > 0:
    logging.warning(f"Found {n_unrealistic} records with unrealistic ages")
    df_demo_clean = df_demo_clean[~unrealistic_ages].copy()
    logging.info(f"Removed {n_unrealistic} records with unrealistic ages")

logging.info("Age validated and recalculated")

# 6. Add age groups for analysis. Bins are left-closed ([0, 18), [18, 40), ...)
#    so each boundary age lands in the group its label names (18 -> '18-39',
#    65 -> '65-79', consistent with IsElderly below) and age 0 is included.
#    The last edge is 121 because ages up to 120 inclusive pass the check above.
df_demo_clean['AgeGroup'] = pd.cut(
    df_demo_clean['Age'],
    bins=[0, 18, 40, 65, 80, 121],
    labels=['<18', '18-39', '40-64', '65-79', '80+'],
    right=False,
)
logging.info("Added age group categories")

# 7. Add is_elderly flag (clinical cutoff: 65+)
df_demo_clean['IsElderly'] = (df_demo_clean['Age'] >= 65).astype(int)
logging.info("Added elderly flag")

# 8. Sort by PatientSID
df_demo_clean = df_demo_clean.sort_values('PatientSID').reset_index(drop=True)

logging.info(f"Demographics cleaning complete: {len(df_demo_clean):,} records")

print(f"\nCleaned Demographics Data Shape: {df_demo_clean.shape}")
df_demo_clean.head()

In [None]:
# Demographics cleaning summary
# Counts come from df_demo_raw rather than the shared `initial_count` global,
# which other cleaning cells also assign, so out-of-order re-runs stay correct.

demo_original = len(df_demo_raw)
demo_cleaned = len(df_demo_clean)
demo_removed = demo_original - demo_cleaned
# Guard against an empty raw file (avoids ZeroDivisionError).
demo_removal_rate = (demo_removed / demo_original * 100) if demo_original else 0.0

print("\n" + "="*80)
print("DEMOGRAPHICS DATASET - CLEANING SUMMARY")
print("="*80)

print(f"Original records:     {demo_original:,}")
print(f"Cleaned records:      {demo_cleaned:,}")
print(f"Records removed:      {demo_removed:,}")
print(f"Removal rate:         {demo_removal_rate:.2f}%")

print(f"\nGender distribution:")
print(df_demo_clean['Gender'].value_counts())

print(f"\nAge group distribution:")
print(df_demo_clean['AgeGroup'].value_counts().sort_index())

print(f"\nElderly patients (65+): {df_demo_clean['IsElderly'].sum()} ({df_demo_clean['IsElderly'].mean()*100:.1f}%)")

print(f"\nAge statistics:")
print(df_demo_clean['Age'].describe())

print(f"\nNew columns added:")
print("  - AgeGroup")
print("  - IsElderly")

print("="*80)

---
## Part 4: Write Cleaned Data to v2_clean

In [None]:
# Write cleaned DDI dataset to v2_clean as snappy-compressed parquet (index dropped)

ddi_clean_filename = "db_drug_interactions_clean.parquet"
ddi_clean_uri = f"s3://{DEST_BUCKET}/{V2_CLEAN_DDI_PREFIX}{ddi_clean_filename}"
logging.info(f"Writing cleaned DDI data: {ddi_clean_uri}")

ddi_write_options = {
    'engine': 'pyarrow',
    'filesystem': fs,
    'compression': 'snappy',
    'index': False,
}

start_time = time.time()
df_ddi_clean.to_parquet(ddi_clean_uri, **ddi_write_options)
elapsed = time.time() - start_time
logging.info(f"Successfully wrote {len(df_ddi_clean):,} records in {elapsed:.2f}s")

print(f"✓ DDI clean data written to: {ddi_clean_uri}")

In [None]:
# Write cleaned medications dataset to v2_clean as snappy-compressed parquet (index dropped)

meds_clean_filename = "medications_clean.parquet"
meds_clean_uri = f"s3://{DEST_BUCKET}/{V2_CLEAN_MEDICATIONS_PREFIX}{meds_clean_filename}"
logging.info(f"Writing cleaned medications data: {meds_clean_uri}")

meds_write_options = {
    'engine': 'pyarrow',
    'filesystem': fs,
    'compression': 'snappy',
    'index': False,
}

start_time = time.time()
df_meds_clean.to_parquet(meds_clean_uri, **meds_write_options)
elapsed = time.time() - start_time
logging.info(f"Successfully wrote {len(df_meds_clean):,} records in {elapsed:.2f}s")

print(f"✓ Medications clean data written to: {meds_clean_uri}")

In [None]:
# Write cleaned demographics dataset to v2_clean as snappy-compressed parquet (index dropped)
# NOTE(review): the output prefix is hardcoded here rather than taken from a
# config constant like the DDI/medications writers; confirm it matches `config`.

demo_clean_filename = "patient_demographics_clean.parquet"
demo_clean_uri = f"s3://{DEST_BUCKET}/v2_clean/demographics/{demo_clean_filename}"
logging.info(f"Writing cleaned demographics data: {demo_clean_uri}")

demo_write_options = {
    'engine': 'pyarrow',
    'filesystem': fs,
    'compression': 'snappy',
    'index': False,
}

start_time = time.time()
df_demo_clean.to_parquet(demo_clean_uri, **demo_write_options)
elapsed = time.time() - start_time
logging.info(f"Successfully wrote {len(df_demo_clean):,} records in {elapsed:.2f}s")

print(f"✓ Demographics clean data written to: {demo_clean_uri}")

---
## Part 5: Verification

In [None]:
# Verify DDI clean data by reading back the written parquet file

logging.info("Verifying DDI clean data...")

start_time = time.time()
df_ddi_verify = pd.read_parquet(ddi_clean_uri, filesystem=fs)
elapsed = time.time() - start_time

assert len(df_ddi_verify) == len(df_ddi_clean), "Row count mismatch!"
assert len(df_ddi_verify.columns) == len(df_ddi_clean.columns), "Column count mismatch!"
# A count-only check would pass with renamed or reordered columns; compare names too.
assert list(df_ddi_verify.columns) == list(df_ddi_clean.columns), "Column name/order mismatch!"

logging.info(f"✓ DDI verification successful: {len(df_ddi_verify):,} rows in {elapsed:.2f}s")

print("\nDDI Clean Data (first 5 rows):")
df_ddi_verify.head()

In [None]:
# Verify medications clean data by reading back the written parquet file

logging.info("Verifying medications clean data...")

start_time = time.time()
df_meds_verify = pd.read_parquet(meds_clean_uri, filesystem=fs)
elapsed = time.time() - start_time

assert len(df_meds_verify) == len(df_meds_clean), "Row count mismatch!"
assert len(df_meds_verify.columns) == len(df_meds_clean.columns), "Column count mismatch!"
# A count-only check would pass with renamed or reordered columns; compare names too.
assert list(df_meds_verify.columns) == list(df_meds_clean.columns), "Column name/order mismatch!"

logging.info(f"✓ Medications verification successful: {len(df_meds_verify):,} rows in {elapsed:.2f}s")

print("\nMedications Clean Data (first 5 rows):")
df_meds_verify.head()

In [None]:
# Verify demographics clean data by reading back the written parquet file

logging.info("Verifying demographics clean data...")

start_time = time.time()
df_demo_verify = pd.read_parquet(demo_clean_uri, filesystem=fs)
elapsed = time.time() - start_time

assert len(df_demo_verify) == len(df_demo_clean), "Row count mismatch!"
assert len(df_demo_verify.columns) == len(df_demo_clean.columns), "Column count mismatch!"
# A count-only check would pass with renamed or reordered columns; compare names too.
assert list(df_demo_verify.columns) == list(df_demo_clean.columns), "Column name/order mismatch!"

logging.info(f"✓ Demographics verification successful: {len(df_demo_verify):,} rows in {elapsed:.2f}s")

print("\nDemographics Clean Data (first 5 rows):")
df_demo_verify.head()

---
## Part 6: Final Summary

In [None]:
# Final cleaning summary: one report section per dataset (input/output URIs,
# record and column counts), then the operations applied and the next step.

divider = "=" * 80

print("\n" + divider)
print("DATA CLEANING SUMMARY")
print(divider)

summary_sections = [
    (
        "DDI REFERENCE DATASET",
        f"s3://{DEST_BUCKET}/{V1_RAW_DDI_PREFIX}db_drug_interactions.parquet",
        f"s3://{DEST_BUCKET}/{V2_CLEAN_DDI_PREFIX}{ddi_clean_filename}",
        df_ddi_raw,
        df_ddi_clean,
    ),
    (
        "MEDICATIONS DATASET",
        f"s3://{DEST_BUCKET}/{V1_RAW_MEDICATIONS_PREFIX}medications_combined.parquet",
        f"s3://{DEST_BUCKET}/{V2_CLEAN_MEDICATIONS_PREFIX}{meds_clean_filename}",
        df_meds_raw,
        df_meds_clean,
    ),
    (
        "DEMOGRAPHICS DATASET",
        f"s3://{DEST_BUCKET}/v1_raw/demographics/patient_demographics.parquet",
        f"s3://{DEST_BUCKET}/v2_clean/demographics/{demo_clean_filename}",
        df_demo_raw,
        df_demo_clean,
    ),
]

for title, input_uri, output_uri, raw_frame, clean_frame in summary_sections:
    print(f"\n{title}:")
    print(f"  Input:  {input_uri}")
    print(f"  Output: {output_uri}")
    print(f"  Records: {len(raw_frame):,} → {len(clean_frame):,}")
    print(f"  Columns: {len(raw_frame.columns)} → {len(clean_frame.columns)}")
    print("  Status: ✓ Complete")

print("\nCLEANING OPERATIONS APPLIED:")
for operation in (
    "Removed duplicate records",
    "Removed records with missing critical values",
    "Stripped whitespace from text fields",
    "Ensured proper data types",
    "Added normalized drug names for matching",
    "Added derived columns (severity, date components, age groups)",
    "Sorted and indexed data",
):
    print(f"  ✓ {operation}")

print("\nNEXT STEPS:")
print("  → Run 04_features.ipynb (demographics will be joined into patient features)")

print(divider)