In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Spark_Batch_Processing") \
    .getOrCreate()

In [2]:
spark

# 1. load data into an RDD and filter 

In [3]:
# Load the ADMISSIONS.csv file into an RDD
rdd = spark.sparkContext.textFile("hdfs://namenode:9000/data/ADMISSIONS.csv")

In [4]:
# Extract the header (first row)
header = rdd.first()

In [5]:
# Filter out the header and split rows into columns
rows_rdd = rdd.filter(lambda line: line != header).map(lambda line: line.split(","))

# Verify the structure of the RDD
print("First row of the RDD (as a list of columns):")
first_row = rows_rdd.first()
print(first_row)

First row of the RDD (as a list of columns):
['1', '10001', '20001', '2021-01-01 08:00:00', '2021-01-10 12:00:00', '', 'EMERGENCY', 'EMERGENCY ROOM', 'HOME', 'Medicare', 'ENGL', 'CATHOLIC', 'MARRIED', 'WHITE', '2021-01-01 07:30:00', '2021-01-01 08:30:00', 'PNEUMONIA', '0', '1']


In [6]:
# Print the index and value of each column
print("Column indices and values:")
for idx, value in enumerate(first_row):
    print(f"Column {idx}: {value}")

Column indices and values:
Column 0: 1
Column 1: 10001
Column 2: 20001
Column 3: 2021-01-01 08:00:00
Column 4: 2021-01-10 12:00:00
Column 5: 
Column 6: EMERGENCY
Column 7: EMERGENCY ROOM
Column 8: HOME
Column 9: Medicare
Column 10: ENGL
Column 11: CATHOLIC
Column 12: MARRIED
Column 13: WHITE
Column 14: 2021-01-01 07:30:00
Column 15: 2021-01-01 08:30:00
Column 16: PNEUMONIA
Column 17: 0
Column 18: 1


In [7]:
# Filter rows based on admission type (e.g., "EMERGENCY")
# Assuming ADMISSION_TYPE is at index 6
filtered_rdd = rows_rdd.filter(lambda row: row[6] =='EMERGENCY')

In [8]:
# Show the filtered RDD
print("Filtered rows:")
for row in filtered_rdd.take(5):
    print(row)

Filtered rows:
['1', '10001', '20001', '2021-01-01 08:00:00', '2021-01-10 12:00:00', '', 'EMERGENCY', 'EMERGENCY ROOM', 'HOME', 'Medicare', 'ENGL', 'CATHOLIC', 'MARRIED', 'WHITE', '2021-01-01 07:30:00', '2021-01-01 08:30:00', 'PNEUMONIA', '0', '1']


# 2. load data and filter using Spark Dataframes

In [9]:
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("hdfs://namenode:9000/data/ADMISSIONS.csv")

In [10]:
df.show(10)

+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+------------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|          DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION|INSURANCE|LANGUAGE|         RELIGION|MARITAL_STATUS|           ETHNICITY|          EDREGTIME|          EDOUTTIME|   DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+------------+--------------------+--------------------+
|     1|     10001|  20001|2021-01-01 08:00:00|2021-01-10 12:

In [11]:
df

DataFrame[ROW_ID: int, SUBJECT_ID: int, HADM_ID: int, ADMITTIME: timestamp, DISCHTIME: timestamp, DEATHTIME: timestamp, ADMISSION_TYPE: string, ADMISSION_LOCATION: string, DISCHARGE_LOCATION: string, INSURANCE: string, LANGUAGE: string, RELIGION: string, MARITAL_STATUS: string, ETHNICITY: string, EDREGTIME: timestamp, EDOUTTIME: timestamp, DIAGNOSIS: string, HOSPITAL_EXPIRE_FLAG: int, HAS_CHARTEVENTS_DATA: int]

In [12]:
df.select('admission_type').distinct().show()

+--------------+
|admission_type|
+--------------+
|      ELECTIVE|
|     EMERGENCY|
|        URGENT|
+--------------+



In [13]:
elective_patients = df.filter(df["admission_type"] == 'ELECTIVE')
elective_patients.show(10)

+------+----------+-------+-------------------+-------------------+---------+--------------+--------------------+--------------------+---------+--------+------------+--------------+---------------+-------------------+-------------------+------------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION|INSURANCE|LANGUAGE|    RELIGION|MARITAL_STATUS|      ETHNICITY|          EDREGTIME|          EDOUTTIME|   DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+---------+--------------+--------------------+--------------------+---------+--------+------------+--------------+---------------+-------------------+-------------------+------------+--------------------+--------------------+
|     3|     10003|  20003|2022-06-15 10:00:00|2022-06-20 09:00:00|     NULL|      ELECTIVE|TRANSFER FROM HOS...|REHAB/DI

In [14]:
emergency_patients = df.filter(df["admission_type"] == 'EMERGENCY')
emergency_patients.show(10)

+------+----------+-------+-------------------+-------------------+---------+--------------+------------------+------------------+---------+--------+--------+--------------+---------+-------------------+-------------------+---------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|DEATHTIME|ADMISSION_TYPE|ADMISSION_LOCATION|DISCHARGE_LOCATION|INSURANCE|LANGUAGE|RELIGION|MARITAL_STATUS|ETHNICITY|          EDREGTIME|          EDOUTTIME|DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+---------+--------------+------------------+------------------+---------+--------+--------+--------------+---------+-------------------+-------------------+---------+--------------------+--------------------+
|     1|     10001|  20001|2021-01-01 08:00:00|2021-01-10 12:00:00|     NULL|     EMERGENCY|    EMERGENCY ROOM|              HOME| Medicare|    ENGL|CATHOLIC|       MARRIED

# 3. load into dataFrame and filter using spark SQL

In [15]:
# register df as temporary view to run SparkSQL commands
df.createOrReplaceTempView("admissions")

In [16]:
# For example, to filter patients with an admission type of "EMERGENCY"
emergency_patients = spark.sql("""
    SELECT *
    FROM admissions
    WHERE ADMISSION_TYPE = 'EMERGENCY'
""")
emergency_patients.show()

+------+----------+-------+-------------------+-------------------+---------+--------------+------------------+------------------+---------+--------+--------+--------------+---------+-------------------+-------------------+---------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|DEATHTIME|ADMISSION_TYPE|ADMISSION_LOCATION|DISCHARGE_LOCATION|INSURANCE|LANGUAGE|RELIGION|MARITAL_STATUS|ETHNICITY|          EDREGTIME|          EDOUTTIME|DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+---------+--------------+------------------+------------------+---------+--------+--------+--------------+---------+-------------------+-------------------+---------+--------------------+--------------------+
|     1|     10001|  20001|2021-01-01 08:00:00|2021-01-10 12:00:00|     NULL|     EMERGENCY|    EMERGENCY ROOM|              HOME| Medicare|    ENGL|CATHOLIC|       MARRIED

In [17]:
# save filtered data as csv file in HDFS
emergency_patients.write.format("csv") \
    .option("header", True) \
    .mode("overwrite") \
    .save("hdfs://namenode:9000/data/emergency_patients")

# Part 2: Aggregating data and calculating statistics

## 1. Basic Statistics

In [18]:
total_admissions = df.count()
print("Total Admissions:", total_admissions)

Total Admissions: 3


In [19]:
unique_patients = df.select("SUBJECT_ID").distinct().count()
print("Unique Patients:", unique_patients)

Unique Patients: 3


In [20]:
from pyspark.sql.functions import count
df.groupBy("ADMISSION_TYPE").agg(count("*").alias("ADMISSION_COUNT")).show()

+--------------+---------------+
|ADMISSION_TYPE|ADMISSION_COUNT|
+--------------+---------------+
|      ELECTIVE|              1|
|     EMERGENCY|              1|
|        URGENT|              1|
+--------------+---------------+



In [21]:
# 2. Length of stay

In [22]:
from pyspark.sql.functions import col, unix_timestamp, datediff

# Convert ADMITTIME and DISCHTIME to timestamps
df = df.withColumn("ADMITTIME", col("ADMITTIME").cast("timestamp")) \
       .withColumn("DISCHTIME", col("DISCHTIME").cast("timestamp"))

# Calculate length of stay in days
df = df.withColumn("LENGTH_OF_STAY", datediff(col("DISCHTIME"), col("ADMITTIME")))

# Show the DataFrame with the new LENGTH_OF_STAY column
df.select("SUBJECT_ID", "HADM_ID", "ADMITTIME", "DISCHTIME", "LENGTH_OF_STAY").show(5)

+----------+-------+-------------------+-------------------+--------------+
|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|LENGTH_OF_STAY|
+----------+-------+-------------------+-------------------+--------------+
|     10001|  20001|2021-01-01 08:00:00|2021-01-10 12:00:00|             9|
|     10002|  20002|2019-11-20 14:30:00|2019-11-25 10:00:00|             5|
|     10003|  20003|2022-06-15 10:00:00|2022-06-20 09:00:00|             5|
+----------+-------+-------------------+-------------------+--------------+



In [23]:
median_los = df.approxQuantile("LENGTH_OF_STAY", [0.5], 0.01)
print("Median Length of Stay (in days):", median_los[0])

Median Length of Stay (in days): 5.0


In [24]:
from pyspark.sql.functions import mean, min, max

min_los = df.select(min("LENGTH_OF_STAY")).collect()[0][0]
max_los = df.select(max("LENGTH_OF_STAY")).collect()[0][0]
mean_los = df.select(mean("LENGTH_OF_STAY")).collect()[0][0]

print("Minimum Length of Stay (in days):", min_los)
print("Maximum Length of Stay (in days):", max_los)
print("Mean Length of Stay (in days):", mean_los)

Minimum Length of Stay (in days): 5
Maximum Length of Stay (in days): 9
Mean Length of Stay (in days): 6.333333333333333


In [25]:
from pyspark.sql.functions import avg

df.groupBy("ADMISSION_TYPE").agg(
    avg("LENGTH_OF_STAY").alias("AVG_LOS"),
    min("LENGTH_OF_STAY").alias("MIN_LOS"),
    max("LENGTH_OF_STAY").alias("MAX_LOS")
).show()

+--------------+-------+-------+-------+
|ADMISSION_TYPE|AVG_LOS|MIN_LOS|MAX_LOS|
+--------------+-------+-------+-------+
|      ELECTIVE|    5.0|      5|      5|
|     EMERGENCY|    9.0|      9|      9|
|        URGENT|    5.0|      5|      5|
+--------------+-------+-------+-------+



In [None]:
# 3. Mortality Statistics

In [26]:
mortality_rate = df.filter(col("HOSPITAL_EXPIRE_FLAG") == 1).count() / total_admissions
print("In-Hospital Mortality Rate:", mortality_rate)

In-Hospital Mortality Rate: 0.3333333333333333


In [27]:
# 4. Admission Location and Discharge Location

In [28]:
from pyspark.sql.functions import desc

# Most Common Admission Locations
df.groupBy("ADMISSION_LOCATION").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+--------------------+-----+
|  ADMISSION_LOCATION|COUNT|
+--------------------+-----+
|      EMERGENCY ROOM|    1|
|       PHYS REFERRAL|    1|
|TRANSFER FROM HOS...|    1|
+--------------------+-----+



In [29]:
# 5. Insurance Statistics

In [30]:
# Distribution of Insurance Types:
df.groupBy("INSURANCE").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+---------+-----+
|INSURANCE|COUNT|
+---------+-----+
| Self Pay|    1|
|  Private|    1|
| Medicare|    1|
+---------+-----+



In [31]:
# Length of Stay by Insurance Type:
df.groupBy("INSURANCE").agg(
    avg("LENGTH_OF_STAY").alias("AVG_LOS"),
    min("LENGTH_OF_STAY").alias("MIN_LOS"),
    max("LENGTH_OF_STAY").alias("MAX_LOS")
).show()

+---------+-------+-------+-------+
|INSURANCE|AVG_LOS|MIN_LOS|MAX_LOS|
+---------+-------+-------+-------+
| Self Pay|    5.0|      5|      5|
|  Private|    5.0|      5|      5|
| Medicare|    9.0|      9|      9|
+---------+-------+-------+-------+



In [32]:
# 6. Ethnicity and Language Statistics
# Distribution of Ethnicities:
df.groupBy("ETHNICITY").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+--------------------+-----+
|           ETHNICITY|COUNT|
+--------------------+-----+
|               WHITE|    1|
|     HISPANIC/LATINO|    1|
|BLACK/AFRICAN AME...|    1|
+--------------------+-----+



In [33]:
# Most Common Languages:
df.groupBy("LANGUAGE").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show()

+--------+-----+
|LANGUAGE|COUNT|
+--------+-----+
|    ENGL|    3|
+--------+-----+



In [34]:
# 7. Diagnosis Statistics
# Most Common Diagnoses:
df.groupBy("DIAGNOSIS").agg(count("*").alias("COUNT")).orderBy(desc("COUNT")).show(truncate=False)

+------------+-----+
|DIAGNOSIS   |COUNT|
+------------+-----+
|APPENDICITIS|1    |
|PNEUMONIA   |1    |
|STROKE      |1    |
+------------+-----+



In [35]:
# Length of Stay by Diagnosis:
df.groupBy("DIAGNOSIS").agg(
    avg("LENGTH_OF_STAY").alias("AVG_LOS"),
    min("LENGTH_OF_STAY").alias("MIN_LOS"),
    max("LENGTH_OF_STAY").alias("MAX_LOS")
).orderBy(desc("AVG_LOS")).show(truncate=False)

+------------+-------+-------+-------+
|DIAGNOSIS   |AVG_LOS|MIN_LOS|MAX_LOS|
+------------+-------+-------+-------+
|PNEUMONIA   |9.0    |9      |9      |
|APPENDICITIS|5.0    |5      |5      |
|STROKE      |5.0    |5      |5      |
+------------+-------+-------+-------+

