In [0]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Forecasting the Future of Healthcare Costs").getOrCreate()

In [0]:
df=spark.read.csv("/FileStore/tables/synthetic_healthcare_costs_large.csv",inferSchema=True,header=True)

In [0]:
df.show(1)

+---------+---+------+-----------+------------------+-------------+--------------+-----------------+---------+---------------+-------------------+--------------+------------------+-----------+----------------+-------------+-----------------+----+-------+
|PatientID|Age|Gender|IncomeLevel|         Diagnosis|TreatmentType|NumberOfVisits|TreatmentDuration|TotalCost|OutOfPocketCost|HospitalizationDays|MedicationCost|DiagnosticTestCost|SurgeryCost|ChronicCondition|FamilyHistory|InsuranceCoverage|Year|Quarter|
+---------+---+------+-----------+------------------+-------------+--------------+-----------------+---------+---------------+-------------------+--------------+------------------+-----------+----------------+-------------+-----------------+----+-------+
|   P00001| 76|Female|        Low|Respiratory Issues|   Medication|             6|                3| 17263.84|        4003.07|                  2|       2251.03|            472.54|    7199.77|             Yes|          Yes|          Pa

In [0]:
1 # Analyze healthcare costs across demographics, treatments, and diagnoses
from pyspark.sql.functions import col, avg, sum, count
df.groupBy("Age").agg(avg("TotalCost").alias("AvgCost")).show()
df.groupBy("Gender").agg(avg("TotalCost").alias("AvgCost")).show()
df.groupBy("IncomeLevel").agg(avg("TotalCost").alias("AvgCost")).show()

+---+------------------+
|Age|           AvgCost|
+---+------------------+
| 31|14999.602296296298|
| 85|15242.566338028166|
| 65|14492.644895104902|
| 53|15421.605395683446|
| 78| 15289.47901408451|
| 34|14297.321241830068|
| 81|14865.634318181821|
| 28|      15076.790625|
| 76| 14832.88061224491|
| 27|14801.118895705533|
| 26|15150.798417266187|
| 44| 14005.59322147652|
| 22|15329.355359477126|
| 47|14989.994085365857|
| 52| 14651.38704918032|
| 86|14974.679275362316|
| 20|15387.520000000002|
| 40|14624.245447761194|
| 57|14735.550937500004|
| 54|14621.762166666671|
+---+------------------+
only showing top 20 rows

+------+------------------+
|Gender|           AvgCost|
+------+------------------+
|Female|14979.845273069628|
| Other|14730.580313252995|
|  Male|14910.906579275907|
+------+------------------+

+-----------+------------------+
|IncomeLevel|           AvgCost|
+-----------+------------------+
|       High|14844.726505964236|
|        Low|14967.357999999971|
|     Middle

In [0]:
# Analyze costs by treatments and diagnoses
df.groupBy("TreatmentType").agg(avg("TotalCost").alias("AvgCost")).show()
df.groupBy("Diagnosis").agg(avg("TotalCost").alias("AvgCost")).show()

+-------------+------------------+
|TreatmentType|           AvgCost|
+-------------+------------------+
|      Therapy| 15176.58406994424|
|   Medication|14916.026465789882|
|      Surgery|14762.236168316846|
+-------------+------------------+

+------------------+------------------+
|         Diagnosis|           AvgCost|
+------------------+------------------+
|          Diabetes|14865.651183282984|
|Respiratory Issues|15045.227030393618|
|      Hypertension|14930.694149195528|
|            Cancer|14781.239489124975|
|    Cardiac Issues|15057.791985851429|
+------------------+------------------+



In [0]:
#Build a predictive model for estimating healthcare costs based on patient profiles and medical history
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Prepare features and labels
feature_cols = ["Age", "NumberOfVisits", "TreatmentDuration", "HospitalizationDays", 
                "MedicationCost", "DiagnosticTestCost", "SurgeryCost"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)
data = df.select("features", col("TotalCost").alias("label"))

# Split into training and test datasets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Train a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 4866.718845729292


In [0]:
3   #  Identify high-cost drivers to optimize and manage healthcare expenses effectively
from pyspark.sql.functions import corr

# Compute correlations with TotalCost
for col_name in ["Age", "NumberOfVisits", "HospitalizationDays", "MedicationCost", "DiagnosticTestCost", "SurgeryCost"]:
    correlation = df.select(corr(col_name, "TotalCost").alias("correlation")).collect()[0][0]
    print(f"Correlation between {col_name} and TotalCost: {correlation}")

Correlation between Age and TotalCost: 0.006291451628312267
Correlation between NumberOfVisits and TotalCost: 0.00400489744185796
Correlation between HospitalizationDays and TotalCost: 0.015517819582616232
Correlation between MedicationCost and TotalCost: -0.0010727881555040405
Correlation between DiagnosticTestCost and TotalCost: 0.021992242433913088
Correlation between SurgeryCost and TotalCost: 0.004978128281359015


In [0]:
#4. Support decision-making with data-driven insights for cost management
# Find cost trends by year and quarter
df.groupBy("Year", "Quarter").agg(avg("TotalCost").alias("AvgCost")).orderBy("Year", "Quarter").show()

# Identify cost differences by insurance coverage
df.groupBy("InsuranceCoverage").agg(avg("TotalCost").alias("AvgCost")).show()


+----+-------+------------------+
|Year|Quarter|           AvgCost|
+----+-------+------------------+
|2020|     Q1|15167.513485804406|
|2020|     Q2|14653.014424040051|
|2020|     Q3|15260.646462585033|
|2020|     Q4|14553.310030769244|
|2021|     Q1|14835.016893819344|
|2021|     Q2|14955.484741784043|
|2021|     Q3|14956.208673300165|
|2021|     Q4|14767.507511811029|
|2022|     Q1|14974.168455414012|
|2022|     Q2|14643.093106312288|
|2022|     Q3|14541.001712962965|
|2022|     Q4| 15036.64642967542|
|2023|     Q1|15198.495357142858|
|2023|     Q2|15097.901555555556|
|2023|     Q3|15212.372683706057|
|2023|     Q4|15155.305224358965|
+----+-------+------------------+

+-----------------+-----------------+
|InsuranceCoverage|          AvgCost|
+-----------------+-----------------+
|             None|14845.49491159136|
|          Partial|15047.36129994937|
|             Full|14867.48369331743|
+-----------------+-----------------+



In [0]:
#5. Identify patterns to enable personalized, cost-effective care
from pyspark.ml.clustering import KMeans

# Apply K-Means clustering
kmeans = KMeans(featuresCol="features", k=3, seed=42)
model = kmeans.fit(data)

# Predict clusters
clusters = model.transform(data)
clusters.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1| 2771|
|         2| 2614|
|         0| 4615|
+----------+-----+



In [0]:
#6. Continuously monitor and forecast healthcare cost trends
# Monitor cost trends over time
df.groupBy("Year", "Quarter").agg(sum("TotalCost").alias("TotalCost")).orderBy("Year", "Quarter").show()

# Forecasting: Use a linear regression model on yearly/quarterly data
from pyspark.sql.functions import when

# Convert Quarter to numerical values
trends = trends.withColumn("QuarterNum", 
                           when(trends["Quarter"] == "Q1", 1)
                           .when(trends["Quarter"] == "Q2", 2)
                           .when(trends["Quarter"] == "Q3", 3)
                           .when(trends["Quarter"] == "Q4", 4))

# Define feature columns for the assembler
feature_cols = ["Year", "QuarterNum"]

# Create feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
trends = assembler.transform(trends)

# Select features and label for model training
trends_data = trends.select("features", col("TotalCost").alias("label"))

# Train a linear regression model
forecast_model = lr.fit(trends_data)

# Make predictions
predictions = forecast_model.transform(trends_data)
predictions.show()


+----+-------+-----------------+
|Year|Quarter|        TotalCost|
+----+-------+-----------------+
|2020|     Q1|9616203.549999993|
|2020|     Q2|8777155.639999991|
|2020|     Q3|       8973260.12|
|2020|     Q4|9459651.520000009|
|2021|     Q1|9360895.660000006|
|2021|     Q2|9556554.750000004|
|2021|     Q3|       9018593.83|
|2021|     Q4|9377367.270000003|
|2022|     Q1|       9403777.79|
|2022|     Q2|8815142.049999997|
|2022|     Q3|9422569.110000001|
|2022|     Q4|9728710.239999996|
|2023|     Q1|       9362273.14|
|2023|     Q2|       9511677.98|
|2023|     Q3|9522945.299999991|
|2023|     Q4|9456910.459999993|
+----+-------+-----------------+

+------------+-----------------+-----------------+
|    features|            label|       prediction|
+------------+-----------------+-----------------+
|[2023.0,2.0]|       9511677.98|9439016.544482052|
|[2023.0,3.0]|9522945.299999991|9466899.194232196|
|[2023.0,1.0]|       9362273.14|9411133.894731939|
|[2020.0,1.0]|9616203.549999993|9