In [3]:
from pyspark.sql import SparkSession
import getpass
# Resolve the current OS user so the Hive warehouse path follows whoever runs
# the notebook instead of being tied to one hard-coded account.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled Spark session that runs on YARN.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')  # 0 = let Spark bind the UI to any free port
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [4]:
# Evaluate the session as the cell's last expression so the notebook renders it.
spark

In [5]:
# Load the hospital CSV: the first line supplies the column names and Spark
# infers each column's type from the data.
hospital_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/public/trendytech/datasets/hospital.csv")
)

In [6]:
# Peek at the first five rows of the raw data.
hospital_df.show(n=5)

+----------+--------------+--------------+-------------+---------+----------+
|patient_id|admission_date|discharge_date|    diagnosis|doctor_id|total_cost|
+----------+--------------+--------------+-------------+---------+----------+
|         1|    01-01-2022|    2022-01-10|    Pneumonia|      101|    5000.0|
|         2|    02-05-2022|    2022-02-09| Appendicitis|      102|    7000.0|
|         3|    03-12-2022|    2022-03-18|Fractured Arm|      103|    3500.0|
|         4|    04-02-2022|    2022-04-08| Heart Attack|      104|   15000.0|
|         5|    05-05-2022|    2022-05-07|    Influenza|      105|    2500.0|
+----------+--------------+--------------+-------------+---------+----------+
only showing top 5 rows



In [7]:
# Print the inferred schema; the output shows admission_date and discharge_date
# were both read as strings, not dates.
hospital_df.printSchema()

root
 |-- patient_id: integer (nullable = true)
 |-- admission_date: string (nullable = true)
 |-- discharge_date: string (nullable = true)
 |-- diagnosis: string (nullable = true)
 |-- doctor_id: integer (nullable = true)
 |-- total_cost: double (nullable = true)



### 1. Drop doctor_id column

In [8]:
# Build a new frame without doctor_id; hospital_df itself is left unchanged.
new_hosp_df1 = hospital_df.drop("doctor_id")

In [9]:
# Preview the result to confirm doctor_id is gone.
new_hosp_df1.show()

+----------+--------------+--------------+-------------+----------+
|patient_id|admission_date|discharge_date|    diagnosis|total_cost|
+----------+--------------+--------------+-------------+----------+
|         1|    01-01-2022|    2022-01-10|    Pneumonia|    5000.0|
|         2|    02-05-2022|    2022-02-09| Appendicitis|    7000.0|
|         3|    03-12-2022|    2022-03-18|Fractured Arm|    3500.0|
|         4|    04-02-2022|    2022-04-08| Heart Attack|   15000.0|
|         5|    05-05-2022|    2022-05-07|    Influenza|    2500.0|
|         6|    06-10-2022|    2022-06-15| Appendicitis|    8000.0|
|         7|    07-20-2022|    2022-07-25|    Pneumonia|    5500.0|
|         8|    08-25-2022|    2022-09-01| Heart Attack|   20000.0|
|         9|    09-15-2022|    2022-09-22|Fractured Leg|    6000.0|
|        10|    10-05-2022|    2022-10-10| Appendicitis|    7500.0|
|        11|    11-02-2022|    2022-11-05|    Influenza|    2800.0|
|        12|    12-10-2022|    2022-12-18|    Pn

### 2. Rename the total_cost column to hospital_bill

In [10]:
# Rename total_cost to hospital_bill; all other columns carry over as-is.
new_hosp_df2 = new_hosp_df1.withColumnRenamed(
    existing="total_cost",
    new="hospital_bill",
)

In [11]:
# Preview the result to confirm the column now appears as hospital_bill.
new_hosp_df2.show()

+----------+--------------+--------------+-------------+-------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|
+----------+--------------+--------------+-------------+-------------+
|         1|    01-01-2022|    2022-01-10|    Pneumonia|       5000.0|
|         2|    02-05-2022|    2022-02-09| Appendicitis|       7000.0|
|         3|    03-12-2022|    2022-03-18|Fractured Arm|       3500.0|
|         4|    04-02-2022|    2022-04-08| Heart Attack|      15000.0|
|         5|    05-05-2022|    2022-05-07|    Influenza|       2500.0|
|         6|    06-10-2022|    2022-06-15| Appendicitis|       8000.0|
|         7|    07-20-2022|    2022-07-25|    Pneumonia|       5500.0|
|         8|    08-25-2022|    2022-09-01| Heart Attack|      20000.0|
|         9|    09-15-2022|    2022-09-22|Fractured Leg|       6000.0|
|        10|    10-05-2022|    2022-10-10| Appendicitis|       7500.0|
|        11|    11-02-2022|    2022-11-05|    Influenza|       2800.0|
|     

### 3. Add new column "duration_of_stay"

In [12]:
from pyspark.sql.functions import to_date

In [13]:
# Parse admission_date (month-day-year text such as 07-20-2022) into a real
# date, replacing the string column of the same name.
new_hosp_df3 = new_hosp_df2.withColumn(
    "admission_date",
    to_date("admission_date", "MM-dd-yyyy"),
)

In [14]:
# Preview the result to confirm admission_date now prints in yyyy-MM-dd form.
new_hosp_df3.show()

+----------+--------------+--------------+-------------+-------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|
+----------+--------------+--------------+-------------+-------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|
|         6|    2022-06-10|    2022-06-15| Appendicitis|       8000.0|
|         7|    2022-07-20|    2022-07-25|    Pneumonia|       5500.0|
|         8|    2022-08-25|    2022-09-01| Heart Attack|      20000.0|
|         9|    2022-09-15|    2022-09-22|Fractured Leg|       6000.0|
|        10|    2022-10-05|    2022-10-10| Appendicitis|       7500.0|
|        11|    2022-11-02|    2022-11-05|    Influenza|       2800.0|
|     

In [15]:
from pyspark.sql.functions import expr

In [16]:
# Keep every existing column and append duration_of_stay as the gap between
# discharge and admission.
# NOTE(review): discharge_date is still the string column from the CSV at this
# point (only admission_date was converted), so the subtraction relies on Spark
# casting it implicitly — confirm this is intended.
new_hosp_df4 = new_hosp_df3.selectExpr(
    "*",
    "discharge_date - admission_date as duration_of_stay",
)

In [17]:
# Preview the result; duration_of_stay is displayed as a day interval (e.g. "9 days").
new_hosp_df4.show()

+----------+--------------+--------------+-------------+-------------+----------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|duration_of_stay|
+----------+--------------+--------------+-------------+-------------+----------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|          9 days|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|          4 days|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|          6 days|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|          6 days|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|          2 days|
|         6|    2022-06-10|    2022-06-15| Appendicitis|       8000.0|          5 days|
|         7|    2022-07-20|    2022-07-25|    Pneumonia|       5500.0|          5 days|
|         8|    2022-08-25|    2022-09-01| Heart Attack|      20000.0|          7 days|
|         9|    2022-09-15|    2

### 4. Add a new column 'adjusted_total_cost' that raises hospital_bill by 50% for Heart Attack and by 20% for Appendicitis (all other diagnoses keep the original bill)

In [21]:
# Surcharge rules: Heart Attack bills go up 50%, Appendicitis bills go up 20%,
# and every other diagnosis keeps its original hospital_bill.
new_hosp_df5 = new_hosp_df4.withColumn(
    "adjusted_total_cost",
    expr(
        "CASE WHEN diagnosis LIKE '%Heart Attack%' THEN hospital_bill * 1.5 "
        "WHEN diagnosis LIKE '%Appendicitis%' THEN hospital_bill * 1.2 "
        "ELSE hospital_bill END"
    ),
)

In [22]:
# Preview the result to check the surcharge (e.g. Appendicitis 7000.0 -> 8400.0).
new_hosp_df5.show()

+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
|patient_id|admission_date|discharge_date|    diagnosis|hospital_bill|duration_of_stay|adjusted_total_cost|
+----------+--------------+--------------+-------------+-------------+----------------+-------------------+
|         1|    2022-01-01|    2022-01-10|    Pneumonia|       5000.0|          9 days|             5000.0|
|         2|    2022-02-05|    2022-02-09| Appendicitis|       7000.0|          4 days|             8400.0|
|         3|    2022-03-12|    2022-03-18|Fractured Arm|       3500.0|          6 days|             3500.0|
|         4|    2022-04-02|    2022-04-08| Heart Attack|      15000.0|          6 days|            22500.0|
|         5|    2022-05-05|    2022-05-07|    Influenza|       2500.0|          2 days|             2500.0|
|         6|    2022-06-10|    2022-06-15| Appendicitis|       8000.0|          5 days|             9600.0|
|         7|    2022-07-20| 

## Just the columns needed to compare hospital_bill with adjusted_total_cost

In [23]:
# Narrow the frame to the identifying columns plus the original and adjusted
# bill so the two amounts can be compared side by side.
new_hosp_df6 = new_hosp_df5.select(
    "patient_id",
    "diagnosis",
    "hospital_bill",
    "adjusted_total_cost",
)

In [24]:
# Display the trimmed-down comparison of original vs. adjusted bills.
new_hosp_df6.show()

+----------+-------------+-------------+-------------------+
|patient_id|    diagnosis|hospital_bill|adjusted_total_cost|
+----------+-------------+-------------+-------------------+
|         1|    Pneumonia|       5000.0|             5000.0|
|         2| Appendicitis|       7000.0|             8400.0|
|         3|Fractured Arm|       3500.0|             3500.0|
|         4| Heart Attack|      15000.0|            22500.0|
|         5|    Influenza|       2500.0|             2500.0|
|         6| Appendicitis|       8000.0|             9600.0|
|         7|    Pneumonia|       5500.0|             5500.0|
|         8| Heart Attack|      20000.0|            30000.0|
|         9|Fractured Leg|       6000.0|             6000.0|
|        10| Appendicitis|       7500.0|             9000.0|
|        11|    Influenza|       2800.0|             2800.0|
|        12|    Pneumonia|       6000.0|             6000.0|
|        13| Heart Attack|      18000.0|            27000.0|
|        14| Appendiciti