In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Spark_Batch_Processing") \
    .getOrCreate()

In [3]:
spark

# 1. load data into an RDD and filter 

In [3]:
# Load the ADMISSIONS.csv file into an RDD
rdd = spark.sparkContext.textFile("hdfs://namenode:9000/mimic-iii/ADMISSIONS.csv")

In [15]:
# Extract the header (first row)
header = rdd.first()

In [16]:
# Filter out the header and split rows into columns
rows_rdd = rdd.filter(lambda line: line != header).map(lambda line: line.split(","))

# Verify the structure of the RDD
print("First row of the RDD (as a list of columns):")
first_row = rows_rdd.first()
print(first_row)

First row of the RDD (as a list of columns):
['21', '22', '165315', '2196-04-09 12:26:00', '2196-04-10 15:54:00', '', '"EMERGENCY"', '"EMERGENCY ROOM ADMIT"', '"DISC-TRAN CANCER/CHLDRN H"', '"Private"', '', '"UNOBTAINABLE"', '"MARRIED"', '"WHITE"', '2196-04-09 10:06:00', '2196-04-09 13:24:00', '"BENZODIAZEPINE OVERDOSE"', '0', '1']


In [17]:
# Print the index and value of each column
print("Column indices and values:")
for idx, value in enumerate(first_row):
    print(f"Column {idx}: {value}")

Column indices and values:
Column 0: 21
Column 1: 22
Column 2: 165315
Column 3: 2196-04-09 12:26:00
Column 4: 2196-04-10 15:54:00
Column 5: 
Column 6: "EMERGENCY"
Column 7: "EMERGENCY ROOM ADMIT"
Column 8: "DISC-TRAN CANCER/CHLDRN H"
Column 9: "Private"
Column 10: 
Column 11: "UNOBTAINABLE"
Column 12: "MARRIED"
Column 13: "WHITE"
Column 14: 2196-04-09 10:06:00
Column 15: 2196-04-09 13:24:00
Column 16: "BENZODIAZEPINE OVERDOSE"
Column 17: 0
Column 18: 1


In [20]:
# Filter rows based on admission type (e.g., "EMERGENCY")
# Assuming ADMISSION_TYPE is at index 6
filtered_rdd = rows_rdd.filter(lambda row: row[6] == '"EMERGENCY"')

In [21]:
# Show the filtered RDD
print("Filtered rows:")
for row in filtered_rdd.take(5):
    print(row)

Filtered rows:
['21', '22', '165315', '2196-04-09 12:26:00', '2196-04-10 15:54:00', '', '"EMERGENCY"', '"EMERGENCY ROOM ADMIT"', '"DISC-TRAN CANCER/CHLDRN H"', '"Private"', '', '"UNOBTAINABLE"', '"MARRIED"', '"WHITE"', '2196-04-09 10:06:00', '2196-04-09 13:24:00', '"BENZODIAZEPINE OVERDOSE"', '0', '1']
['23', '23', '124321', '2157-10-18 19:34:00', '2157-10-25 14:00:00', '', '"EMERGENCY"', '"TRANSFER FROM HOSP/EXTRAM"', '"HOME HEALTH CARE"', '"Medicare"', '"ENGL"', '"CATHOLIC"', '"MARRIED"', '"WHITE"', '', '', '"BRAIN MASS"', '0', '1']
['24', '24', '161859', '2139-06-06 16:14:00', '2139-06-09 12:48:00', '', '"EMERGENCY"', '"TRANSFER FROM HOSP/EXTRAM"', '"HOME"', '"Private"', '', '"PROTESTANT QUAKER"', '"SINGLE"', '"WHITE"', '', '', '"INTERIOR MYOCARDIAL INFARCTION"', '0', '1']
['25', '25', '129635', '2160-11-02 02:06:00', '2160-11-05 14:55:00', '', '"EMERGENCY"', '"EMERGENCY ROOM ADMIT"', '"HOME"', '"Private"', '', '"UNOBTAINABLE"', '"MARRIED"', '"WHITE"', '2160-11-02 01:01:00', '2160-1

# 2. load data and filter using Spark Dataframes

In [4]:
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("hdfs://namenode:9000/mimic-iii/ADMISSIONS.csv")

In [5]:
df.show(10)

+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|          DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION|INSURANCE|LANGUAGE|         RELIGION|MARITAL_STATUS|           ETHNICITY|          EDREGTIME|          EDOUTTIME|           DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|    21|        22| 165315|2196-04-09

In [5]:
df

DataFrame[ROW_ID: int, SUBJECT_ID: int, HADM_ID: int, ADMITTIME: timestamp, DISCHTIME: timestamp, DEATHTIME: timestamp, ADMISSION_TYPE: string, ADMISSION_LOCATION: string, DISCHARGE_LOCATION: string, INSURANCE: string, LANGUAGE: string, RELIGION: string, MARITAL_STATUS: string, ETHNICITY: string, EDREGTIME: timestamp, EDOUTTIME: timestamp, DIAGNOSIS: string, HOSPITAL_EXPIRE_FLAG: int, HAS_CHARTEVENTS_DATA: int]

In [6]:
df.select('admission_type').distinct().show()

+--------------+
|admission_type|
+--------------+
|       NEWBORN|
|      ELECTIVE|
|     EMERGENCY|
|        URGENT|
+--------------+



In [7]:
elective_patients = df.filter(df["admission_type"] == 'ELECTIVE')
elective_patients.show(10)

+------+----------+-------+-------------------+-------------------+---------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+---------+---------+---------+--------------------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION|INSURANCE|LANGUAGE|         RELIGION|MARITAL_STATUS|ETHNICITY|EDREGTIME|EDOUTTIME|           DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+---------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+---------+---------+---------+--------------------+--------------------+--------------------+
|    22|        23| 152223|2153-09-03 07:15:00|2153-09-08 19:10:00|     NULL|      ELECTIVE|PHYS REFERRAL/NOR...|    HOME HEALTH CARE| Medicare|    NULL|       

In [8]:
emergency_patients = df.filter(df["admission_type"] == 'EMERGENCY')
emergency_patients.show(10)

+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|          DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION|INSURANCE|LANGUAGE|         RELIGION|MARITAL_STATUS|           ETHNICITY|          EDREGTIME|          EDOUTTIME|           DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|    21|        22| 165315|2196-04-09

# 3. load into dataFrame and filter using spark SQL

In [13]:
# register df as temporary view to run SparkSQL commands
df.createOrReplaceTempView("admissions")

In [15]:
# For example, to filter patients with an admission type of "EMERGENCY"
emergency_patients = spark.sql("""
    SELECT *
    FROM admissions
    WHERE ADMISSION_TYPE = 'EMERGENCY'
""")
emergency_patients.show()

+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|          DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION|INSURANCE|LANGUAGE|         RELIGION|MARITAL_STATUS|           ETHNICITY|          EDREGTIME|          EDOUTTIME|           DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|length_of_stay_days|
+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+-------------------

In [16]:
# save filtered data as csv file in HDFS
emergency_patients.write.format("csv") \
    .option("header", True) \
    .mode("overwrite") \
    .save("hdfs://namenode:9000/mimic-iii/emergency_patients")

# Part 2: Aggregating data and calculating statistics

## 1. Basic Statistics

In [6]:
total_admissions = df.count()
print("Total Admissions:", total_admissions)

Total Admissions: 58976


In [7]:
unique_patients = df.select("SUBJECT_ID").distinct().count()
print("Unique Patients:", unique_patients)

Unique Patients: 46520


In [8]:
from pyspark.sql.functions import count
df.groupBy("ADMISSION_TYPE").agg(count("*").alias("ADMISSION_COUNT")).show()

+--------------+---------------+
|ADMISSION_TYPE|ADMISSION_COUNT|
+--------------+---------------+
|       NEWBORN|           7863|
|      ELECTIVE|           7706|
|     EMERGENCY|          42071|
|        URGENT|           1336|
+--------------+---------------+



In [None]:
# 2. Length of stay

In [9]:
from pyspark.sql.functions import col, unix_timestamp, datediff

# Convert ADMITTIME and DISCHTIME to timestamps
df = df.withColumn("ADMITTIME", col("ADMITTIME").cast("timestamp")) \
       .withColumn("DISCHTIME", col("DISCHTIME").cast("timestamp"))

# Calculate length of stay in days
df = df.withColumn("LENGTH_OF_STAY", datediff(col("DISCHTIME"), col("ADMITTIME")))

# Show the DataFrame with the new LENGTH_OF_STAY column
df.select("SUBJECT_ID", "HADM_ID", "ADMITTIME", "DISCHTIME", "LENGTH_OF_STAY").show(5)

+----------+-------+-------------------+-------------------+--------------+
|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|LENGTH_OF_STAY|
+----------+-------+-------------------+-------------------+--------------+
|        22| 165315|2196-04-09 12:26:00|2196-04-10 15:54:00|             1|
|        23| 152223|2153-09-03 07:15:00|2153-09-08 19:10:00|             5|
|        23| 124321|2157-10-18 19:34:00|2157-10-25 14:00:00|             7|
|        24| 161859|2139-06-06 16:14:00|2139-06-09 12:48:00|             3|
|        25| 129635|2160-11-02 02:06:00|2160-11-05 14:55:00|             3|
+----------+-------+-------------------+-------------------+--------------+
only showing top 5 rows



In [10]:
median_los = df.approxQuantile("LENGTH_OF_STAY", [0.5], 0.01)
print("Median Length of Stay (in days):", median_los[0])

Median Length of Stay (in days): 6.0


In [12]:
from pyspark.sql.functions import mean, min, max

min_los = df.select(min("LENGTH_OF_STAY")).collect()[0][0]
max_los = df.select(max("LENGTH_OF_STAY")).collect()[0][0]
mean_los = df.select(mean("LENGTH_OF_STAY")).collect()[0][0]

print("Minimum Length of Stay (in days):", min_los)
print("Maximum Length of Stay (in days):", max_los)
print("Mean Length of Stay (in days):", mean_los)

Minimum Length of Stay (in days): 0
Maximum Length of Stay (in days): 295
Mean Length of Stay (in days): 10.090765735214324


In [13]:
from pyspark.sql.functions import avg

df.groupBy("ADMISSION_TYPE").agg(
    avg("LENGTH_OF_STAY").alias("AVG_LOS"),
    min("LENGTH_OF_STAY").alias("MIN_LOS"),
    max("LENGTH_OF_STAY").alias("MAX_LOS")
).show()

+--------------+------------------+-------+-------+
|ADMISSION_TYPE|           AVG_LOS|MIN_LOS|MAX_LOS|
+--------------+------------------+-------+-------+
|       NEWBORN|11.461783034465217|      0|    172|
|      ELECTIVE|   8.6549441993252|      0|    295|
|     EMERGENCY|10.026241353901737|      0|    207|
|        URGENT|12.335329341317365|      0|    174|
+--------------+------------------+-------+-------+



In [14]:
# 3. Mortality Statistics

In [17]:
mortality_rate = df.filter(col("HOSPITAL_EXPIRE_FLAG") == 1).count() / total_admissions
print("In-Hospital Mortality Rate:", mortality_rate)

In-Hospital Mortality Rate: 0.09926071622354857


In [21]:
# 4. Admission Location and Discharge Location

In [22]:
from pyspark.sql.functions import desc

# Most Common Admission Locations
df.groupBy("ADMISSION_LOCATION").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+--------------------+-----+
|  ADMISSION_LOCATION|COUNT|
+--------------------+-----+
|EMERGENCY ROOM ADMIT|22754|
|PHYS REFERRAL/NOR...|15079|
|CLINIC REFERRAL/P...|12032|
|TRANSFER FROM HOS...| 8456|
|TRANSFER FROM SKI...|  273|
|** INFO NOT AVAIL...|  204|
|   HMO REFERRAL/SICK|  102|
|TRANSFER FROM OTH...|   71|
|TRSF WITHIN THIS ...|    5|
+--------------------+-----+



In [23]:
# 5. Insurance Statistics

In [25]:
# Distribution of Insurance Types:
df.groupBy("INSURANCE").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+----------+-----+
| INSURANCE|COUNT|
+----------+-----+
|  Medicare|28215|
|   Private|22582|
|  Medicaid| 5785|
|Government| 1783|
|  Self Pay|  611|
+----------+-----+



In [26]:
# Length of Stay by Insurance Type:
df.groupBy("INSURANCE").agg(
    avg("LENGTH_OF_STAY").alias("AVG_LOS"),
    min("LENGTH_OF_STAY").alias("MIN_LOS"),
    max("LENGTH_OF_STAY").alias("MAX_LOS")
).show()

+----------+------------------+-------+-------+
| INSURANCE|           AVG_LOS|MIN_LOS|MAX_LOS|
+----------+------------------+-------+-------+
|Government| 9.729108244531687|      0|    133|
|  Self Pay| 6.337152209492635|      0|     85|
|   Private|10.138074572668497|      0|    207|
|  Medicaid|11.436646499567848|      0|    202|
|  Medicare| 9.881091617933723|      0|    295|
+----------+------------------+-------+-------+



In [27]:
# 6. Ethnicity and Language Statistics
# Distribution of Ethnicities:
df.groupBy("ETHNICITY").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+--------------------+-----+
|           ETHNICITY|COUNT|
+--------------------+-----+
|               WHITE|40996|
|BLACK/AFRICAN AME...| 5440|
|UNKNOWN/NOT SPECI...| 4523|
|  HISPANIC OR LATINO| 1696|
|               OTHER| 1512|
|               ASIAN| 1509|
|    UNABLE TO OBTAIN|  814|
|PATIENT DECLINED ...|  559|
|     ASIAN - CHINESE|  277|
|HISPANIC/LATINO -...|  232|
|  BLACK/CAPE VERDEAN|  200|
|     WHITE - RUSSIAN|  164|
|MULTI RACE ETHNICITY|  130|
|       BLACK/HAITIAN|  101|
|ASIAN - ASIAN INDIAN|   85|
|WHITE - OTHER EUR...|   81|
|HISPANIC/LATINO -...|   78|
|          PORTUGUESE|   61|
|   WHITE - BRAZILIAN|   59|
|  ASIAN - VIETNAMESE|   53|
+--------------------+-----+
only showing top 20 rows



In [28]:
# Most Common Languages:
df.groupBy("LANGUAGE").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+--------+-----+
|LANGUAGE|COUNT|
+--------+-----+
|    ENGL|29086|
|    NULL|25332|
|    SPAN| 1083|
|    RUSS|  790|
|    PTUN|  628|
|    CANT|  413|
|    PORT|  342|
|    CAPE|  256|
|    MAND|  156|
|    HAIT|  150|
|    ITAL|  124|
|    VIET|   94|
|    GREE|   75|
|    ARAB|   47|
|    PERS|   44|
|    CAMB|   37|
|    POLI|   34|
|    AMER|   31|
|    HIND|   24|
|    KORE|   23|
+--------+-----+
only showing top 20 rows



In [29]:
# 7. Diagnosis Statistics
# Most Common Diagnoses:
df.groupBy("DIAGNOSIS").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show(truncate=False)

+---------------------------------------------------------+-----+
|DIAGNOSIS                                                |COUNT|
+---------------------------------------------------------+-----+
|NEWBORN                                                  |7823 |
|PNEUMONIA                                                |1566 |
|SEPSIS                                                   |1184 |
|CONGESTIVE HEART FAILURE                                 |928  |
|CORONARY ARTERY DISEASE                                  |840  |
|CHEST PAIN                                               |778  |
|INTRACRANIAL HEMORRHAGE                                  |713  |
|ALTERED MENTAL STATUS                                    |712  |
|GASTROINTESTINAL BLEED                                   |686  |
|CORONARY ARTERY DISEASE\CORONARY ARTERY BYPASS GRAFT /SDA|583  |
|UPPER GI BLEED                                           |580  |
|ABDOMINAL PAIN                                           |554  |
|FEVER    

In [30]:
# Length of Stay by Diagnosis:
df.groupBy("DIAGNOSIS").agg(
    avg("LENGTH_OF_STAY").alias("AVG_LOS"),
    min("LENGTH_OF_STAY").alias("MIN_LOS"),
    max("LENGTH_OF_STAY").alias("MAX_LOS")
).orderBy(desc("AVG_LOS")).show(truncate=False)

+------------------------------------------------------------------+-------+-------+-------+
|DIAGNOSIS                                                         |AVG_LOS|MIN_LOS|MAX_LOS|
+------------------------------------------------------------------+-------+-------+-------+
|CROHN'S DISEASE;ABDOMINAL FISTULA                                 |295.0  |295    |295    |
|ASPIRATION; FAILURE TO THRIVE                                     |192.0  |192    |192    |
|CHRONIC LYMPHOCYTIC LEUKEMIA\BONE MARROW TRANSPLANT               |169.0  |169    |169    |
|L THIGH FLUID COLLECTION                                          |167.0  |167    |167    |
|AMC;FEVER                                                         |165.0  |165    |165    |
|S/P LIVER TX-NAUSEA-VOMITING                                      |138.0  |138    |138    |
|APLASTIC ANEMIA;PANCYTOPENIA                                      |138.0  |138    |138    |
|PULMONARY EMBOLISM;SUBDURAL HEMATOMA                              |13