# 🏥 Healthcare Data Exploration with PySpark

This notebook demonstrates loading and exploring CMS synthetic patient data (OMOP format) from Google BigQuery using PySpark.

**Dataset:** CMS Synthetic Patient Data (OMOP format)  
**Source:** `bigquery-public-data.cms_synthetic_patient_data_omop`  
**Size:** 1M+ synthetic patients, ~10GB

---

## 1️⃣ Setup Spark Session

In [None]:
import sys
sys.path.append('..')  # Add the project root to the path so the `src` package is importable

from src.utils.spark_session import get_spark_session
from src.data_ingestion.bigquery_loader import BigQueryLoader

# Create Spark session
spark = get_spark_session(
    app_name="HealthcareDataExploration",
    local=True
)

print(f"✅ Spark {spark.version} initialized")
print(f"📊 Spark UI: {spark.sparkContext.uiWebUrl}")

## 2️⃣ Load Patient Data

In [None]:
# Initialize BigQuery loader
loader = BigQueryLoader(
    spark=spark,
    gcp_project="your-project-id"  # CHANGE THIS
)

# Load sample patients (limit for exploration)
patients_df = loader.load_patients(limit=10000)

print(f"\n📊 Loaded {patients_df.count():,} patients")
print(f"📋 Columns: {len(patients_df.columns)}")

In [None]:
# Display schema
patients_df.printSchema()

In [None]:
# Show sample records
patients_df.select(
    "person_id",
    "gender_concept_id",
    "year_of_birth",
    "race_concept_id",
    "ethnicity_concept_id",
    "age"
).show(10)
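
The `age` column selected above is not a field of the standard OMOP `person` table, so it is presumably derived by the loader from `year_of_birth`. The following is a minimal sketch of one way that could work, assuming age is simply a reference year minus the birth year; `approximate_age` and `reference_year` are hypothetical names, not part of `BigQueryLoader`.

```python
from datetime import date
from typing import Optional


def approximate_age(year_of_birth: int, reference_year: Optional[int] = None) -> int:
    """Approximate age in whole years from a birth year (hypothetical helper)."""
    # Assumption: default to the current calendar year when no reference is given.
    if reference_year is None:
        reference_year = date.today().year
    if year_of_birth > reference_year:
        raise ValueError("year_of_birth is after reference_year")
    return reference_year - year_of_birth


print(approximate_age(1950, reference_year=2010))  # 60
```

Because only the birth year is used, the result can be off by up to one year for any given patient, which is usually acceptable for exploratory statistics.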

## 3️⃣ Basic Statistics

In [None]:
from pyspark.sql import functions as F

# Age distribution
print("📊 Age Distribution:")
patients_df.select(
    F.min("age").alias("min_age"),
    F.max("age").alias("max_age"),
    F.avg("age").alias("avg_age"),
    F.stddev("age").alias("std_age")
).show()

# Age groups
print("\n📊 Age Groups:")
patients_df \
    .withColumn("age_group", 
        F.when(F.col("age") < 18, "0-17")
         .when(F.col("age") < 35, "18-34")
         .when(F.col("age") < 50, "35-49")
         .when(F.col("age") < 65, "50-64")
         .otherwise("65+")
    ) \
    .groupBy("age_group") \
    .count() \
    .orderBy("age_group") \
    .show()
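
To sanity-check the bucket boundaries, the `F.when` chain above can be mirrored in plain Python. This is only an illustrative sketch; `age_group` is a hypothetical helper. One difference is deliberate: in Spark, a null `age` makes every `when` condition evaluate to null, so the row falls through to `otherwise` and is labelled "65+", whereas the sketch returns "unknown" for missing ages.

```python
from typing import Optional


def age_group(age: Optional[int]) -> str:
    """Plain-Python mirror of the age bucketing (hypothetical helper)."""
    if age is None:
        # The Spark expression would put nulls in "65+"; flag them instead.
        return "unknown"
    if age < 18:
        return "0-17"
    if age < 35:
        return "18-34"
    if age < 50:
        return "35-49"
    if age < 65:
        return "50-64"
    return "65+"


for sample in (17, 18, 49, 64, 65, None):
    print(sample, age_group(sample))
```

The labels also happen to sort correctly as strings, which is why `orderBy("age_group")` lists the buckets in ascending age order.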

## 4️⃣ Load Hospital Encounters

In [None]:
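# Load encounters (hospital visits)
# NOTE: the CMS synthetic claims are understood to cover roughly 2008-2010;
# if this returns no rows, try an earlier start_date.
encounters_df = loader.load_encounters(
    limit=50000,
    start_date="2020-01-01"
)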

print(f"\n📊 Loaded {encounters_df.count():,} encounters")

In [None]:
# Show encounters
encounters_df.select(
    "visit_occurrence_id",
    "person_id",
    "visit_start_date",
    "visit_end_date",
    "length_of_stay_days"
).show(10)

In [None]:
# Length of stay statistics
print("📊 Length of Stay Statistics:")
encounters_df.select(
    F.min("length_of_stay_days").alias("min_los"),
    F.max("length_of_stay_days").alias("max_los"),
    F.avg("length_of_stay_days").alias("avg_los"),
    F.percentile_approx("length_of_stay_days", 0.5).alias("median_los")
).show()
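
`length_of_stay_days` is not a standard OMOP `visit_occurrence` column, so it is presumably computed by the loader from `visit_start_date` and `visit_end_date`. A minimal sketch of that calculation, assuming it is the plain difference in calendar days (the helper below is hypothetical):

```python
from datetime import date


def length_of_stay_days(visit_start: date, visit_end: date) -> int:
    """Days between visit start and end; same-day visits count as 0 (assumption)."""
    if visit_end < visit_start:
        raise ValueError("visit_end is before visit_start")
    return (visit_end - visit_start).days


print(length_of_stay_days(date(2009, 3, 1), date(2009, 3, 4)))  # 3
print(length_of_stay_days(date(2009, 3, 1), date(2009, 3, 1)))  # 0
```

In PySpark the same quantity can be expressed with `F.datediff("visit_end_date", "visit_start_date")`. Under this definition, outpatient and other same-day visits contribute zeros, which pulls the minimum and median length of stay down.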

## 5️⃣ Join Patients and Encounters

In [None]:
# Join patients with their encounters
patient_encounters = patients_df.join(
    encounters_df,
    on="person_id",
    how="inner"
)

print(f"📊 Joined DataFrame: {patient_encounters.count():,} rows")

# Show combined data
patient_encounters.select(
    "person_id",
    "age",
    "gender_concept_id",
    "visit_start_date",
    "length_of_stay_days"
).show(10)
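
The inner join produces one row per matching encounter, so the joined row count reflects encounters rather than patients, and anything without a match on either side is dropped. Since both inputs were loaded with independent limits, many rows may not match. The toy example below illustrates these semantics in plain Python; the data and the `inner_join` helper are made up for illustration.

```python
from collections import defaultdict

patients = [
    {"person_id": 1, "age": 70},
    {"person_id": 2, "age": 45},
    {"person_id": 3, "age": 30},  # no encounters: dropped by the inner join
]
encounters = [
    {"person_id": 1, "visit_occurrence_id": 10},
    {"person_id": 1, "visit_occurrence_id": 11},
    {"person_id": 2, "visit_occurrence_id": 12},
    {"person_id": 9, "visit_occurrence_id": 13},  # unknown patient: dropped
]


def inner_join(left, right, key):
    """Hash join: index the right side by key, then emit one row per match."""
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]


joined = inner_join(patients, encounters, "person_id")
print(len(joined))  # 3
```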

## 6️⃣ Aggregations by Patient

In [None]:
# Calculate per-patient statistics
patient_stats = patient_encounters.groupBy("person_id", "age") \
    .agg(
        F.count("visit_occurrence_id").alias("total_visits"),
        F.sum("length_of_stay_days").alias("total_los"),
        F.avg("length_of_stay_days").alias("avg_los"),
        F.min("visit_start_date").alias("first_visit"),
        F.max("visit_start_date").alias("last_visit")
    )

patient_stats.orderBy(F.desc("total_visits")).show(10)

print(f"\n📊 Patients with encounters: {patient_stats.count():,}")
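
For readers less familiar with `groupBy(...).agg(...)`, the per-patient statistics can be sketched in plain Python on a few made-up rows. `summarize_visits` is a hypothetical helper; like Spark's `sum` and `avg`, it skips missing lengths of stay rather than treating them as zero.

```python
from collections import defaultdict

rows = [
    {"person_id": 1, "visit_start_date": "2009-01-05", "length_of_stay_days": 2},
    {"person_id": 1, "visit_start_date": "2009-06-20", "length_of_stay_days": 4},
    {"person_id": 2, "visit_start_date": "2008-11-02", "length_of_stay_days": 0},
]


def summarize_visits(rows):
    """Group rows by person_id and compute the same aggregates as the cell above."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["person_id"]].append(row)

    stats = {}
    for person_id, visits in grouped.items():
        los = [v["length_of_stay_days"] for v in visits
               if v["length_of_stay_days"] is not None]
        # ISO-formatted date strings compare correctly as plain strings.
        dates = [v["visit_start_date"] for v in visits]
        stats[person_id] = {
            "total_visits": len(visits),
            "total_los": sum(los) if los else None,
            "avg_los": sum(los) / len(los) if los else None,
            "first_visit": min(dates),
            "last_visit": max(dates),
        }
    return stats


stats = summarize_visits(rows)
print(stats[1]["total_visits"], stats[1]["total_los"], stats[1]["avg_los"])  # 2 6 3.0
```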

## 7️⃣ High Utilizers Analysis

In [None]:
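# Find high utilizers (many hospital visits)
high_utilizers = patient_stats.filter(F.col("total_visits") >= 5)

# Count once each: every .count() call triggers a separate Spark job
high_count = high_utilizers.count()
total_count = patient_stats.count()
# Guard against division by zero when no patients have encounters
share = high_count / total_count * 100 if total_count else 0.0

print(f"🚨 High Utilizers (5+ visits): {high_count:,} patients")
print(f"   Percentage: {share:.1f}%")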

# Show top utilizers
high_utilizers.orderBy(F.desc("total_visits")).show(10)

## 8️⃣ Visualization (Convert to Pandas)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Convert small aggregated data to Pandas for plotting
visit_counts = patient_stats.groupBy("total_visits") \
    .count() \
    .orderBy("total_visits") \
    .limit(20) \
    .toPandas()

# Plot distribution
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.bar(visit_counts['total_visits'], visit_counts['count'])
plt.xlabel('Number of Visits')
plt.ylabel('Number of Patients')
plt.title('Distribution of Hospital Visits per Patient')
plt.grid(axis='y', alpha=0.3)

plt.subplot(1, 2, 2)
age_stats = patients_df.groupBy("age") \
    .count() \
    .orderBy("age") \
    .toPandas()
plt.plot(age_stats['age'], age_stats['count'])
plt.xlabel('Age')
plt.ylabel('Number of Patients')
plt.title('Patient Age Distribution')
plt.grid(alpha=0.3)

plt.tight_layout()
plt.show()

## 9️⃣ Save Processed Data

In [None]:
# Save to local parquet (for later use)
output_path = "../data/processed/patient_stats.parquet"

patient_stats.write \
    .mode("overwrite") \
    .parquet(output_path)

print(f"✅ Saved patient statistics to: {output_path}")

## 🎯 Summary

In this notebook, we:

1. ✅ Created a PySpark session with the BigQuery connector
2. ✅ Loaded synthetic patient data (up to 10,000 patients)
3. ✅ Loaded hospital encounters (up to 50,000 visits)
4. ✅ Performed basic exploratory analysis
5. ✅ Joined the datasets using the PySpark DataFrame API
6. ✅ Calculated per-patient aggregations
7. ✅ Identified high utilizers
8. ✅ Visualized results with Matplotlib
9. ✅ Saved processed data to Parquet

**Next Steps:**
- Explore conditions, procedures, medications
- Feature engineering for ML models
- Build readmission prediction model
- Deploy to GCP Dataproc cluster

---

In [None]:
# Clean up
spark.stop()
print("🛑 Spark session stopped")