In [29]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HealthcareETL").getOrCreate()

file_path = "healthcare_dataset.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [30]:
df.show()

+-------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|               Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|          Doctor|            Hospital|Insurance Provider|    Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|
+-------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|      Bobby JacksOn| 30|  Male|        B-|           Cancer|       2024-01-31|   Matthew Smith|     Sons and Miller|        Blue Cross|18856.281305978155|        328|        Urgent|    2024-02-02|Paracetamol|      Normal|
|       LesLie TErRy| 62|  Male|        A+|          Obesity|       2019-08-20| Samantha Davies|            

In [31]:
total_columns = len(df.columns)
print(f"Total number of columns: {total_columns}")

total_rows = df.count()
print(f"Total number of rows: {total_rows}")

duplicate_count = total_rows - df.dropDuplicates().count()
print(f"Total number of duplicate rows: {duplicate_count}")

Total number of columns: 15
Total number of rows: 55500
Total number of duplicate rows: 534


In [None]:
from pyspark.sql.functions import col, trim, initcap, lower, to_date, when, lit

# Normalize Name
cleaned_df = df.withColumn("Name", initcap(col("Name")))

# Standardize Gender
cleaned_df = cleaned_df.withColumn("Gender", lower(trim(col("Gender"))))

# Convert Dates
cleaned_df = cleaned_df.withColumn("Date of Admission", to_date(col("Date of Admission"), "yyyy-MM-dd"))
cleaned_df = cleaned_df.withColumn("Discharge Date", to_date(col("Discharge Date"), "yyyy-MM-dd"))

# Fill Missing Values
from pyspark.sql.functions import mean

# Fill missing Billing Amount with the median
median_value = cleaned_df.approxQuantile("Billing Amount", [0.5], 0.0)[0]
cleaned_df = cleaned_df.fillna({"Billing Amount": median_value})

# Fill missing categorical values
cleaned_df = cleaned_df.fillna({
    "Medical Condition": "Unknown",
    "Doctor": "Unknown",
    "Hospital": "Unknown",
    "Insurance Provider": "Unknown",
    "Admission Type": "Unknown",
    "Medication": "Unknown",
    "Test Results": "Unknown"
})

In [33]:
# Remove invalid ages
cleaned_df = cleaned_df.filter((col("Age") > 0) & (col("Age") <= 120))

# Remove records with missing Room Number
cleaned_df = cleaned_df.na.drop(subset=["Room Number"])

# Aggregate Billing Amount by Hospital
from pyspark.sql.functions import sum

billing_summary = cleaned_df.groupBy("Hospital").agg(sum("Billing Amount").alias("Total Billing"))
billing_summary.show(truncate=False)

+-------------------------+------------------+
|Hospital                 |Total Billing     |
+-------------------------+------------------+
|Ramirez-Robinson         |96739.00128614204 |
|Foster Lamb, Graham and  |10659.608812944593|
|LLC Massey               |71446.9940966842  |
|Coleman-Aguilar          |34961.90671326528 |
|Group Stein              |25094.95572621151 |
|Smith PLC                |1029424.449116314 |
|Alvarado-Martin          |1077.1696809399066|
|Hall Group               |247201.21558231176|
|and Mayo Chen, Murray    |49400.214028011644|
|Lopez-Wilson             |56481.55593873338 |
|Harris-Farrell           |32166.967383439547|
|Dawson-Williams          |9616.626025733382 |
|Holmes Reed and Johnson, |9492.225487246009 |
|and Lee Rodriguez Morris,|18047.5510046168  |
|Watkins, and Young Perry |41838.22082511615 |
|Nunez-Hamilton           |40446.09173896364 |
|Freeman-Hunter           |29414.68600622586 |
|and Aguilar Sons         |92686.82176385348 |
|Reeves-Edwar

In [34]:
from pyspark.sql.functions import count, col

# Find and show duplicate rows
duplicates_df = df.groupBy(df.columns).agg(count("*").alias("count")).filter(col("count") > 1).drop("count")
duplicates_df.show(truncate=False)

+--------------------+---+------+----------+-----------------+-----------------+---------------------+------------------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|Name                |Age|Gender|Blood Type|Medical Condition|Date of Admission|Doctor               |Hospital                      |Insurance Provider|Billing Amount    |Room Number|Admission Type|Discharge Date|Medication |Test Results|
+--------------------+---+------+----------+-----------------+-----------------+---------------------+------------------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|maTthEw SEllERS     |71 |Female|O-        |Cancer           |2021-02-06       |Erin Page            |Brown-Cortez                  |UnitedHealthcare  |47566.507842139574|400        |Emergency     |2021-02-10    |Ibuprofen  |Abnormal    |
|samUel brYaNt       |53 |Male  |O+        |

In [35]:

cleaned_df = cleaned_df.dropDuplicates()

cleaned_df.show(truncate=False)

+------------------+---+------+----------+-----------------+-----------------+-------------------+------------------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|Name              |Age|Gender|Blood Type|Medical Condition|Date of Admission|Doctor             |Hospital                      |Insurance Provider|Billing Amount    |Room Number|Admission Type|Discharge Date|Medication |Test Results|
+------------------+---+------+----------+-----------------+-----------------+-------------------+------------------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|James Ross        |83 |female|A+        |Diabetes         |2024-01-13       |Michael Baker      |Cox-Hester                    |Blue Cross        |10352.20848674086 |394        |Urgent        |2024-01-22    |Aspirin    |Abnormal    |
|Andrea Allen      |47 |female|AB+       |Obesity          |

In [36]:

total_columns = len(cleaned_df.columns)
print(f"Total number of columns: {total_columns}")

total_rows = cleaned_df.count()
print(f"Total number of rows: {total_rows}")

duplicate_count = total_rows - df.dropDuplicates().count()
print(f"Total number of duplicate rows: {duplicate_count}")

Total number of columns: 15
Total number of rows: 54966
Total number of duplicate rows: 0


In [39]:
output_path = "cleaned_data.csv"
cleaned_df.coalesce(1).write.mode("overwrite").csv(output_path, header=True)

Py4JJavaError: An error occurred while calling o435.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:860)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
