In [1]:
import os 


In [2]:
%cd ..


/home/jovyan


In [3]:
os.listdir()
os.listdir("data")


['diabetic_data.csv', 'IDs_mapping.csv']

In [4]:
!pip install pyspark
from pyspark.sql import SparkSession 

spark = SparkSession.builder \
    .appName("Hospital analysis")  \
    .getOrCreate()

spark





In [44]:
df= spark.read.csv("data/diabetic_data.csv", header= True, inferSchema= True )
spark.read.csv("data/diabetic_data.csv").count()
df.printSchema()

root
 |-- encounter_id: integer (nullable = true)
 |-- patient_nbr: integer (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- admission_type_id: integer (nullable = true)
 |-- discharge_disposition_id: integer (nullable = true)
 |-- admission_source_id: integer (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- payer_code: string (nullable = true)
 |-- medical_specialty: string (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: integer (nullable = true)
 |-- number_emergency: integer (nullable = true)
 |-- number_inpatient: integer (nullable = true)
 |-- diag_1: string (nullable = true)
 |-- diag_2: string (nullable = true)
 |-- diag_3: string (nullable = true)
 |-- number_diagnoses: integer (nullable = true)
 |-

In [45]:
spark.read.csv("data/diabetic_data.csv").count()

101767

In [6]:
from pyspark.sql.functions import col, when, regexp_extract
df_clean= df.replace("?", None)
df_clean = df_clean.withColumn( 
    "age_num", regexp_extract("age", r"\[(\d+)", 1).cast("int"))
df_clean = df_clean.withColumn ( 
    "diag_1_cat", regexp_extract("diag_1", r"^(\d{3})",1))

In [7]:
from pyspark.sql.functions import when, col
df_clean = df_clean.withColumn( 
    "readmitted_flag", when(col("readmitted") =="<30",1).otherwise(0))
#Create readmitted_flag,addss a new column 1 if re-admittted within 30 days ("<30")

In [8]:
from pyspark.sql.functions import when, col
medication_cols = [
    "metformin", "repaglinide", "nateglinide", "chlorpropamide",
    "glimepiride", "acetohexamide", "glipizide", "glyburide",
    "tolbutamide", "pioglitazone", "rosiglitazone", "acarbose",
    "miglitol", "troglitazone", "tolazamide", "insulin",
    "glyburide-metformin", "glipizide-metformin", "glimepiride-pioglitazone",
    "metformin-rosiglitazone", "metformin-pioglitazone"
]

for old_col in medication_cols:
    if "-" in old_col:
        new_col = old_col.replace("-", "_")
        df_clean = df_clean.withColumnRenamed(old_col, new_col)

# Update the med column list to match renamed columns
medication_cols = [col.replace("-", "_") for col in medication_cols]

for med in medication_cols:
    df_clean = df_clean.withColumn( 
        f"{med}_effect", 
        when(col(med) == "No", 0)
        .when(col(med) == "Steady", 1)
        .when(col(med) == "Down", -1)
        .when(col(med) == "Up", 2)
        .otherwise(None)
    )



df_clean.select("insulin", "insulin_effect", "metformin", "metformin_effect").show(5)


+-------+--------------+---------+----------------+
|insulin|insulin_effect|metformin|metformin_effect|
+-------+--------------+---------+----------------+
|     No|             0|       No|               0|
|     Up|             2|       No|               0|
|     No|             0|       No|               0|
|     Up|             2|       No|               0|
| Steady|             1|       No|               0|
+-------+--------------+---------+----------------+
only showing top 5 rows



In [9]:
df_clean.createOrReplaceTempView("patients")

df_intensity = spark.sql("""
SELECT
    *,
    (
      COALESCE(metformin_effect, 0) +
      COALESCE(repaglinide_effect, 0) +
      COALESCE(nateglinide_effect, 0) +
      COALESCE(chlorpropamide_effect, 0) +
      COALESCE(glimepiride_effect, 0) +
      COALESCE(acetohexamide_effect, 0) +
      COALESCE(glipizide_effect, 0) +
      COALESCE(glyburide_effect, 0) +
      COALESCE(tolbutamide_effect, 0) +
      COALESCE(pioglitazone_effect, 0) +
      COALESCE(rosiglitazone_effect, 0) +
      COALESCE(acarbose_effect, 0) +
      COALESCE(miglitol_effect, 0) +
      COALESCE(troglitazone_effect, 0) +
      COALESCE(tolazamide_effect, 0) +
      COALESCE(insulin_effect, 0) +
      COALESCE(glyburide_metformin_effect, 0) +
      COALESCE(glipizide_metformin_effect, 0) +
      COALESCE(glimepiride_pioglitazone_effect, 0) +
      COALESCE(metformin_rosiglitazone_effect, 0) +
      COALESCE(metformin_pioglitazone_effect, 0)
    ) AS medication_intensity_score
FROM patients
""")



### insight 1: Readmission Rate by Age Group
Which age groups have the highest risk of being readmitted within 30 days?



In [21]:
from pyspark.sql.functions import avg, round 

df_clean.groupBy("race") \
   .agg(round(avg("readmitted_flag") * 100, 2).alias("readmission_pct")) \
   .orderBy("readmission_pct", ascending=False) \
    .show()



+---------------+---------------+
|           race|readmission_pct|
+---------------+---------------+
|      Caucasian|          11.29|
|AfricanAmerican|          11.22|
|       Hispanic|          10.41|
|          Asian|          10.14|
|          Other|           9.63|
|           NULL|           8.27|
+---------------+---------------+



In [22]:
df_clean.groupBy("race").count().orderBy("count", ascending=False).show()

+---------------+-----+
|           race|count|
+---------------+-----+
|      Caucasian|76099|
|AfricanAmerican|19210|
|           NULL| 2273|
|       Hispanic| 2037|
|          Other| 1506|
|          Asian|  641|
+---------------+-----+



In [23]:
df_clean.createOrReplaceTempView("patients")
df_summary = spark.sql("""
SELECT
    race,
    COUNT(*) AS race_count,
    ROUND(AVG(readmitted_flag) * 100, 2) AS pct_readmitted
FROM patients
GROUP BY race
ORDER BY pct_readmitted DESC
""")

df_summary.show()
df_summary.createOrReplaceTempView("readmit_by_race_summary")


+---------------+----------+--------------+
|           race|race_count|pct_readmitted|
+---------------+----------+--------------+
|      Caucasian|     76099|         11.29|
|AfricanAmerican|     19210|         11.22|
|       Hispanic|      2037|         10.41|
|          Asian|       641|         10.14|
|          Other|      1506|          9.63|
|           NULL|      2273|          8.27|
+---------------+----------+--------------+



## Insight #3: Readmission by Primary Diagnosis


In [13]:
df_clean.select("diag_1", "diag_1_cat").distinct().show(10)


+------+----------+
|diag_1|diag_1_cat|
+------+----------+
|   441|       441|
|   447|       447|
|   821|       821|
|   397|       397|
|   600|       600|
|   217|       217|
|   791|       791|
|   337|       337|
|   846|       846|
|   686|       686|
+------+----------+
only showing top 10 rows



In [14]:
from pyspark.sql.functions import avg, round, count

diag_summary = df_clean.groupBy("diag_1_cat") \
    .agg(
        round(avg("readmitted_flag") * 100, 2).alias("readmission_pct"),
        count("*").alias("diagnosis_count")
    ) \
    .orderBy("readmission_pct", ascending=False)


In [15]:
diag_summary.orderBy("readmission_pct", ascending=False).show(10)


+----------+---------------+---------------+
|diag_1_cat|readmission_pct|diagnosis_count|
+----------+---------------+---------------+
|       904|          100.0|              2|
|       543|          100.0|              1|
|       299|          100.0|              1|
|       906|          100.0|              1|
|       974|          100.0|              1|
|       347|          100.0|              1|
|       271|          100.0|              3|
|       391|          100.0|              1|
|       731|           75.0|              4|
|       506|          66.67|              3|
+----------+---------------+---------------+
only showing top 10 rows



In [16]:
diag_summary.orderBy("diagnosis_count", ascending=False).show(10)


+----------+---------------+---------------+
|diag_1_cat|readmission_pct|diagnosis_count|
+----------+---------------+---------------+
|       250|          12.98|           8757|
|       428|          14.11|           6862|
|       414|           9.04|           6581|
|          |          13.26|           4291|
|       786|           7.25|           4016|
|       410|          10.32|           3614|
|       486|           8.95|           3508|
|       427|           9.11|           2766|
|       491|          12.62|           2275|
|       715|           10.0|           2151|
+----------+---------------+---------------+
only showing top 10 rows



In [24]:
diagnosis_map = {
    "250": "Diabetes",
    "401": "Hypertension",
    "414": "Heart Disease",
    "428": "Heart Failure",
    "486": "Pneumonia",
    "496": "COPD",
    "518": "Respiratory Failure",
    "276": "Fluid/Electrolyte Disorder",
    "427": "Cardiac Dysrhythmia",
    "038": "Septicemia",
    "285": "Anemia",
    "V45": "Post-Surgery/Implants",
    "782": "Symptoms, Not Elsewhere Classified"
}



from pyspark.sql.functions import when, col, lit 

def add_diag_labels(df, diagnosis_map):
    """
    Adds a new column 'diagnosis_label' to a DataFrame
    using a mapping from diag_1_cat to readable labels.

    Parameters:
        df (DataFrame): Input Spark DataFrame with 'diag_1_cat' column
        diagnosis_map (dict): Dict of {'ICD-9 code': 'label'}

    Returns:
        DataFrame with a new 'diagnosis_label' column
    """

    # Initialize the new column to null
    df_labeled = df.withColumn("diagnosis_label", lit(None))

    # Apply mapping row by row
    for code, label in diagnosis_map.items():
        df_labeled = df_labeled.withColumn(
            "diagnosis_label",
            when(col("diag_1_cat") == code, label).otherwise(col("diagnosis_label"))
        )

    return df_labeled


In [78]:
new_labels = {
    "358": "Myasthenia Gravis / Neuromuscular Disorders",
    "967": "Poisoning by Sedatives & Hypnotics",
    "966": "Poisoning by Psychotropic Agents",
    "263": "Malnutrition",
    "457": "Lymphedema / Other Circulatory Disorders",
    "204": "Lymphoid Leukemia",
    "281": "Vitamin B12 Deficiency Anemia",
    "646": "Complications of Pregnancy",
    "338": "Pain Disorders Related to Psychological Factors",
    "350": "Disorders of Trigeminal Nerve",
    "152": "Malignant Neoplasm of Small Intestine",
    "200": "Lymphoma (Hodgkin/Non-Hodgkin)",
    "567": "Peritonitis / Intra-abdominal Infection",
    "340": "Multiple Sclerosis",
    "586": "Renal Failure, Unspecified",
    "935": "Foreign Body in Respiratory Tract",
    "853": "Hemorrhage Following Injury",
    "298": "Other Nonorganic Psychoses",
    "443": "Peripheral Vascular Disease",
    "475": "Chronic Tonsillitis and Adenoiditis",
    "199": "Malignant Neoplasm, Unspecified",
    "297": "Delusional Disorders",
    "239": "Uncertain Behavior Neoplasms",
    "514": "Pulmonary Congestion and Hypostasis",
    "593": "Other Disorders of Kidney and Ureter",
    "282": "Hereditary Hemolytic Anemia",
    "572": "Liver Abscess / Chronic Liver Disease",
    "756": "Congenital Anomalies of Musculoskeletal System",
    "464": "Acute Laryngitis / Upper Respiratory Infection",
    "438": "Late Effects of Cerebrovascular Disease",
    "202": "Other Lymphomas",
    "112": "Candidiasis (Yeast Infection)",
    "790": "Abnormal Blood Chemistry",
    "150": "Malignant Neoplasm of Esophagus",
    "787": "Symptoms Involving Digestive System",
    "242": "Thyrotoxicosis (Hyperthyroidism)",
    "293": "Transient Mental Disorders",
    "718": "Internal Derangement of Knee",
    "972": "Poisoning by Agents Affecting Blood",
    "962": "Poisoning by Hormones and Synthetic Substitutes",
    "447": "Other Disorders of Arteries and Veins",
    "512": "Other Pneumothorax / Lung Conditions",
    "420": "Acute Pericarditis",
    "922": "Contusion with Intact Skin Surface",
    "707": "Chronic Ulcer of Skin",
    "310": "Mild Mental Disorders",
    "171": "Malignant Neoplasm of Connective Tissue",
    "494": "Bronchiectasis",
    "255": "Disorders of Adrenal Glands",
    "312": "Disturbance of Conduct (Adolescent Behavior)",
    "396": "Diseases of Mitral and Aortic Valves",
    "368": "Visual Disturbances",
    "284": "Aplastic Anemia",
    "294": "Dementia (Other than Alzheimer's)",
    "287": "Purpura and Other Hemorrhagic Conditions",
    "571": "Chronic Liver Disease and Cirrhosis",
    "434": "Occlusion of Cerebral Arteries",
    "537": "Disorders of Stomach and Duodenum",
    "429": "Ill-defined Heart Disease",
    "860": "Traumatic Pneumothorax",
    "808": "Pelvic Fracture",
    "444": "Arterial Embolism and Thrombosis",
    "440": "Atherosclerosis",
    "820": "Fracture of Neck of Femur",
    "478": "Other Diseases of Upper Respiratory Tract",
    "576": "Disorders of Biliary Tract",
    "403": "Hypertensive Kidney Disease",
    "788": "Urinary Symptoms",
    "423": "Other Pericardial Diseases",
    "288": "Diseases of White Blood Cells",
    "225": "Benign Neoplasm of Brain",
    "511": "Pleural Effusion",
    "714": "Rheumatoid Arthritis",
    "644": "Premature Labor",
    "965": "Poisoning by Analgesics and Antipyretics",
    "507": "Pneumonitis Due to Solids and Liquids",
    "781": "Symptoms Involving Nervous and Musculoskeletal Systems",
    "421": "Acute and Subacute Endocarditis",
    "348": "Other Conditions of Brain",
    "203": "Multiple Myeloma",
    "594": "Calculus of Lower Urinary Tract",
    "736": "Other Disorders of Bone and Cartilage",
    "197": "Secondary Malignant Neoplasm of Respiratory Organs",
    "292": "Drug-induced Mental Disorders",
    "536": "Disorders of Function of Stomach",
    "824": "Fracture of Ankle",
    "969": "Poisoning by Psychotropic Agents",
    "711": "Arthropathy with Infection",
    "198": "Secondary Malignant Neoplasm of Other Sites",
    "852": "Subarachnoid, Subdural, and Extradural Hemorrhage",
    "309": "Adjustment Reaction",
    "577": "Pancreatic Disorders",
    "996": "Complications of Surgical and Medical Care",
    "251": "Disorders of Pancreatic Internal Secretion",
    "585": "Chronic Kidney Disease",
    "584": "Acute Kidney Failure",
    "398": "Rheumatic Heart Disease",
    "608": "Disorders of Male Genital Organs",
    "291": "Alcohol-induced Mental Disorders",
    "157": "Malignant Neoplasm of Pancreas",
    "730": "Osteomyelitis",
    "473": "Chronic Sinusitis",
    "183": "Malignant Neoplasm of Ovary",
    "332": "Parkinson's Disease",
    "482": "Pneumonia Due to Bacteria",
    "491": "Chronic Bronchitis",
    "404": "Hypertensive Heart and Kidney Disease",
    "558": "Noninfectious Gastroenteritis",
    "311": "Depressive Disorder, Not Elsewhere Classified",
    "307": "Special Symptoms or Syndromes",
    "710": "Diffuse Connective Tissue Disease",
    "196": "Secondary Malignant Neoplasm of Lymph Nodes"
}

diagnosis_map.update(new_labels)


In [79]:




diag_labeled = add_diag_labels(diag_summary, diagnosis_map)
diag_labeled.select("diag_1_cat", "diagnosis_label", "readmission_pct").show()

+----------+---------------+---------------+
|diag_1_cat|diagnosis_label|readmission_pct|
+----------+---------------+---------------+
|       904|           NULL|          100.0|
|       906|           NULL|          100.0|
|       974|           NULL|          100.0|
|       347|           NULL|          100.0|
|       299|           NULL|          100.0|
|       543|           NULL|          100.0|
|       271|           NULL|          100.0|
|       391|           NULL|          100.0|
|       731|           NULL|           75.0|
|       506|           NULL|          66.67|
|       356|           NULL|           60.0|
|       643|           NULL|           60.0|
|       352|           NULL|           50.0|
|       973|           NULL|           50.0|
|       146|           NULL|           50.0|
|       580|           NULL|           50.0|
|       279|           NULL|           50.0|
|       526|           NULL|          42.86|
|       508|           NULL|           40.0|
|       20

In [80]:
diag_labeled.filter(col("diagnosis_label").isNull()) \
    .groupBy("diag_1_cat") \
    .agg(count("*").alias("count")) \
    .orderBy("count", ascending=False) \
    .show(20)



+----------+-----+
|diag_1_cat|count|
+----------+-----+
|       296|    1|
|       451|    1|
|       800|    1|
|       591|    1|
|       574|    1|
|       581|    1|
|       205|    1|
|       747|    1|
|       334|    1|
|       462|    1|
|       647|    1|
|       442|    1|
|       383|    1|
|       851|    1|
|       155|    1|
|       483|    1|
|       154|    1|
|       727|    1|
|       987|    1|
|       625|    1|
+----------+-----+
only showing top 20 rows



In [82]:
diag_filtered = diag_labeled.filter(col("diagnosis_count") >= 10)


In [83]:
#view TOp Diagnoses by readmission % 
#anwers common diagnoses have the highest30 day readmission rate?

diag_filtered.orderBy("readmission_pct", ascending= False).show(50,truncate =False) 



+----------+---------------+---------------+-----------------------------------------------+
|diag_1_cat|readmission_pct|diagnosis_count|diagnosis_label                                |
+----------+---------------+---------------+-----------------------------------------------+
|358       |35.71          |14             |Myasthenia Gravis / Neuromuscular Disorders    |
|967       |35.29          |17             |Poisoning by Sedatives & Hypnotics             |
|966       |33.33          |12             |Poisoning by Psychotropic Agents               |
|263       |33.33          |12             |Malnutrition                                   |
|457       |33.33          |12             |Lymphedema / Other Circulatory Disorders       |
|204       |30.43          |23             |Lymphoid Leukemia                              |
|281       |30.0           |20             |Vitamin B12 Deficiency Anemia                  |
|646       |30.0           |10             |Complications of Pregnancy

In [36]:
diag_filtered.filter( 
    (col("diagnosis_label").isNull()) & (col("readmission_pct") >= 10)
).select("diag_1_cat","diagnosis_count","readmission_pct")  \
 .toPandas().to_csv("/home/jovyan/work/to_map_later.csv", index=False)

In [37]:
%ls /home/jovyan/work/

to_map_later.csv


In [39]:
import pandas as pd
pd.read_csv("/home/jovyan/work/to_map_later.csv").count()


diag_1_cat         173
diagnosis_count    175
readmission_pct    175
dtype: int64

In [67]:
pd.set_option('display.max_rows', 172)

In [68]:
import pandas as pd
pd.read_csv("/home/jovyan/work/to_map_later.csv").head(172)

Unnamed: 0,diag_1_cat,diagnosis_count,readmission_pct
0,358.0,14,35.71
1,967.0,17,35.29
2,966.0,12,33.33
3,263.0,12,33.33
4,457.0,12,33.33
5,204.0,23,30.43
6,281.0,20,30.0
7,646.0,10,30.0
8,338.0,20,30.0
9,350.0,11,27.27


In [85]:
#view top diagnoses by Volume
#“Which conditions make up the largest share of admissions?
diag_filtered.orderBy("diagnosis_count",ascending=False).show(10, truncate=False)



+----------+---------------+---------------+-------------------+
|diag_1_cat|readmission_pct|diagnosis_count|diagnosis_label    |
+----------+---------------+---------------+-------------------+
|250       |12.98          |8757           |Diabetes           |
|428       |14.11          |6862           |Heart Failure      |
|414       |9.04           |6581           |Heart Disease      |
|          |13.26          |4291           |NULL               |
|786       |7.25           |4016           |NULL               |
|410       |10.32          |3614           |NULL               |
|486       |8.95           |3508           |Pneumonia          |
|427       |9.11           |2766           |Cardiac Dysrhythmia|
|491       |12.62          |2275           |Chronic Bronchitis |
|715       |10.0           |2151           |NULL               |
+----------+---------------+---------------+-------------------+
only showing top 10 rows



In [86]:
diag_filtered.createOrReplaceTempView("readmit_by_diag_filtered")


In [92]:
diag_filtered.write.mode("overwrite").parquet("output/readmission_by_diag.parquet")

### Time in Hospital vs. Readmission


In [97]:
from pyspark.sql.functions import avg, round 
length_summary = df_clean.groupBy("time_in_hospital") \
   .agg(
       round(avg("readmitted_flag")*100,2).alias("readmission_pct"), 

        count("*").alias("patient_count")
    ) \
    .orderBy("time_in_hospital")




In [98]:
length_summary.show()

+----------------+---------------+-------------+
|time_in_hospital|readmission_pct|patient_count|
+----------------+---------------+-------------+
|               1|           8.18|        14208|
|               2|           9.94|        17224|
|               3|          10.67|        17756|
|               4|          11.81|        13924|
|               5|          12.03|         9966|
|               6|          12.59|         7539|
|               7|          12.83|         5859|
|               8|          14.23|         4391|
|               9|          13.72|         3002|
|              10|          14.35|         2342|
|              11|          10.51|         1855|
|              12|          13.33|         1448|
|              13|          12.31|         1210|
|              14|          12.96|         1042|
+----------------+---------------+-------------+



In [99]:
length_summary.createOrReplaceTempView("readmission_by_length")


In [100]:
length_summary.write.mode("overwrite").parquet("output/readmission_by_length.parquet")
