In [1]:
# Install required packages 
sc.install_pypi_package("boto3")
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("seaborn")

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# --- start the Spark session ---
spark = SparkSession.builder.appName("MedicaidDrugAnalysis").getOrCreate()

# --- Load data from S3 ---
df = spark.read.csv("s3://finalprojectankita/Input/SDUD2023.csv", header=True, inferSchema=True)

# --- Clean column names and values ---
for col in df.columns:
    df = df.withColumnRenamed(col, col.strip())

df_clean = df.withColumn("Product Name", F.trim(F.regexp_replace("Product Name", r"[^a-zA-Z0-9\s]", "")))

for col in [
    "Units Reimbursed", "Number of Prescriptions",
    "Total Amount Reimbursed", "Medicaid Amount Reimbursed", "Non Medicaid Amount Reimbursed"
]:
    df_clean = df_clean.withColumn(col, F.col(col).cast("float"))

df_clean = df_clean.dropna(subset=["State", "Product Name", "Total Amount Reimbursed"])

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1749508402837_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Collecting numpy>=1.23
  Downloading numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.5 MB)
Collecting kiwisolver>=1.3.1
  Downloading kiwisolver-1.4.7-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.6 MB)
Collecting importlib-resources>=3.2.0
  Downloading importlib_resources-6.5.2-py3-none-any.whl (37 kB)
Collecting contourpy>=1.0.1
  Downloading contourpy-1.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (321 kB)
Collecting pillow>=8
  Downloading pillow-11.2.1-cp39-cp39-manylinux_2_28_x86_64.whl (4.6 MB)
Collecting fonttools>=4.22.0
  Downloading fonttools-4.58.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.7 MB)
Collecting cycler>=0.10
  Downloading cycler-0.12.1-py3-none-any.whl (8.3 kB)
Collecting zipp>=3.1.0
  Downloading zipp-3.23.0-py3-none-any.whl (10 kB)
Installing collected packages: zipp, numpy, pillow, kiwisolver, importlib-resources, fonttools, cycler, contourpy
Successfully installed contourpy-1.3.0 cycle

In [2]:

# --- List of Top 5 Drugs by State ---
window_spec = Window.partitionBy("State").orderBy(F.desc("Total Amount Reimbursed"))
df_ranked = df_clean.withColumn("rank", F.row_number().over(window_spec))
top5_df = df_ranked.filter(F.col("rank") <= 5).drop("rank")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# --- Regression Model ---
state_indexer = StringIndexer(inputCol="State", outputCol="StateIndex")
drug_indexer = StringIndexer(inputCol="Product Name", outputCol="DrugIndex")
assembler = VectorAssembler(
    inputCols=["StateIndex", "DrugIndex", "Units Reimbursed", "Number of Prescriptions"],
    outputCol="features"
)
lr = LinearRegression(featuresCol="features", labelCol="Total Amount Reimbursed")
pipeline = Pipeline(stages=[state_indexer, drug_indexer, assembler, lr])

train_data, test_data = top5_df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(
    labelCol="Total Amount Reimbursed", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE (Root Mean Squared Error): {rmse:.2f}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RMSE (Root Mean Squared Error): 45477607.72

In [4]:
# --------------------------------------
# VISUALIZATION SECTION
# --------------------------------------
import boto3
# Convert Spark to Pandas for plotting
pandas_df = df_clean.toPandas()

# Plot 1: Top 10 Drugs by Total Reimbursement
top10_drugs = pandas_df.groupby("Product Name")["Total Amount Reimbursed"].sum().nlargest(10)
plt.figure(figsize=(10, 6))
top10_drugs.sort_values().plot(kind="barh", color="skyblue")
plt.title("Top 10 Drugs by Total Reimbursement")
plt.xlabel("Total Amount Reimbursed")
plt.ylabel("Product Name")
plt.tight_layout()
plt.show()

# Save locally first
plt.savefig("/tmp/top10_drugs.png")

# Upload to S3
s3 = boto3.client("s3")
s3.upload_file("/tmp/top10_drugs.png", "finalprojectankita", "Output/top10_drugs.png")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Plot 2: Total Reimbursement by State
plt.figure(figsize=(12, 6))
state_totals = pandas_df.groupby("State")["Total Amount Reimbursed"].sum().sort_values(ascending=False)
state_totals.plot(kind="bar", color="salmon")
plt.title("Total Reimbursement per State")
plt.ylabel("Total Amount Reimbursed")
plt.xlabel("State")
plt.xticks(rotation=45)
plt.tight_layout()

# Save locally before showing
plt.savefig("/tmp/total_reimbursement_per_state.png")

# Upload to S3
s3 = boto3.client("s3")
s3.upload_file(
    "/tmp/total_reimbursement_per_state.png",
    "finalprojectankita",
    "Output/total_reimbursement_per_state.png"
)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:

# Plot 3: Heatmap of Correlation
plt.figure(figsize=(8, 6))
numeric_cols = pandas_df[[
    "Units Reimbursed", "Number of Prescriptions",
    "Total Amount Reimbursed", "Medicaid Amount Reimbursed", "Non Medicaid Amount Reimbursed"
]]
corr = numeric_cols.corr()
sns.heatmap(corr, annot=True, cmap="coolwarm")
plt.title("Correlation Heatmap of Reimbursement Metrics")
plt.tight_layout()

# Save locally before showing
plt.savefig("/tmp/correlation_heatmap_reimbursement_metrics.png")

# Upload to S3
s3 = boto3.client("s3")
s3.upload_file(
    "/tmp/correlation_heatmap_reimbursement_metrics.png",
    "finalprojectankita",
    "Output/correlation_heatmap_reimbursement_metrics.png"
)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Write the transformed top5 dataframe to S3 output folder in CSV
top5_df.select("State", "Product Name", "Total Amount Reimbursed") \
    .write.mode("overwrite").option("header", True) \
    .csv("s3://finalprojectankita/Output/top5_drugs_by_state/")

# Write predictions as CSV
predictions.select("State", "Product Name", "prediction", "Total Amount Reimbursed") \
    .write.mode("overwrite").option("header", True) \
    .csv("s3://finalprojectankita/Output/Data_output/")



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox()