In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
import os
# Report the JAVA_HOME environment variable, or a placeholder message when it is unset.
java_home = os.environ.get('JAVA_HOME', 'JAVA_HOME not set')
print(java_home)

JAVA_HOME not set


In [2]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Appointments Cleaning")
    .getOrCreate()
)

In [3]:
# Load the appointments CSV: the first row is treated as the header and
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(r'/bin/aptDataset.csv')
)

In [4]:
# Snapshot the frame's size and schema before any cleaning step runs;
# old_row_count is later written into the QA report as the "before" figure.
old_row_count = df.count()
# Reuse the count computed above: count() is a Spark action, so calling it a
# second time would trigger another full pass over the source data.
old_df_shape = (old_row_count, len(df.columns))
old_df_datatypes = df.dtypes
print(old_row_count)
print(old_df_shape)
print(old_df_datatypes)

110527
(110527, 7)
[('AppointmentID', 'int'), ('PatientId', 'bigint'), ('ScheduledDay', 'timestamp'), ('AppointmentDay', 'timestamp'), ('Neighbourhood', 'string'), ('No-show', 'string'), ('Age', 'int')]


In [5]:
# Discard rows containing a null in any column (default "any" behaviour).
df = df.dropna()

In [6]:
# Remove rows that are exact duplicates across all columns.
df = df.drop_duplicates()

In [7]:
# Derive calendar fields from the appointment timestamp: a timestamp copy,
# the year, the month number, and the day of week rendered with pattern "E".
appointment_ts = F.to_timestamp("AppointmentDay")
df = (
    df.withColumn("AppointmentDate", appointment_ts)
    .withColumn("Year", F.year("AppointmentDate"))
    .withColumn("Month", F.month("AppointmentDate"))
    .withColumn("DayOfWeek", F.date_format("AppointmentDate", "E"))
)

In [8]:
# Collect every string-typed column from the current schema ...
string_cols = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, StringType)
]
# ... and overwrite each one with its trimmed, lower-cased value.
for col in string_cols:
    normalized = F.lower(F.trim(F.col(col)))
    df = df.withColumn(col, normalized)

In [9]:
# Bucket Age into three labels: under 18 -> "Child", 18 up to (not including)
# 60 -> "Adult", and everything that matches neither branch -> "Senior".
age = F.col("Age")
age_group = (
    F.when(age < 18, "Child")
    .when((age >= 18) & (age < 60), "Adult")
    .otherwise("Senior")
)
df = df.withColumn("AgeGrouped", age_group)

In [10]:
def add_late_scheduling_flag(df, scheduled_col="ScheduledDay", appointment_col="AppointmentDay", flag_col="LateScheduling"):
    """Return ``df`` with an integer flag marking rows scheduled after the appointment.

    The flag is 1 when the timestamp parsed from ``scheduled_col`` is strictly
    later than the timestamp parsed from ``appointment_col``, and 0 otherwise
    (a null on either side makes the comparison null, which falls through to
    the ``otherwise`` branch and yields 0).

    Args:
        df: Spark DataFrame containing both source columns.
        scheduled_col: Name of the column holding when the booking was made.
        appointment_col: Name of the column holding when the appointment occurs.
        flag_col: Name of the output column to add (or overwrite).

    Returns:
        A new DataFrame with ``flag_col`` added; all other columns are left
        untouched.
    """
    # Build the comparison from column expressions instead of materialising
    # temporary "ScheduledDate"/"AppointmentDate" columns and dropping them
    # afterwards: doing so would overwrite and then delete any same-named
    # column the caller already has on the frame.
    scheduled_ts = F.to_timestamp(F.col(scheduled_col))
    appointment_ts = F.to_timestamp(F.col(appointment_col))

    # NOTE(review): this compares full timestamps. If appointment values carry
    # no time-of-day, same-day bookings would be flagged as late — confirm
    # whether a date-only comparison is what is wanted.
    is_late = F.when(scheduled_ts > appointment_ts, F.lit(1)).otherwise(F.lit(0))
    return df.withColumn(flag_col, is_late)

In [11]:
# Persist the cleaned frame as parquet, split into Year/Month partition
# directories; any existing output at this path is replaced.
output_path = "aptDatasetPqt"
(
    df.write
    .partitionBy("Year", "Month")
    .mode("overwrite")
    .parquet(output_path)
)

In [12]:
## Null tally per column: when() yields a non-null value only for null cells,
## so count() over it gives the number of nulls in that column.
null_exprs = [
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df.columns
]
null_counts = df.select(*null_exprs).toPandas()

## No-show counts broken down by age bucket
agegroup_dist = (
    df.groupBy("AgeGrouped", "No-show")
    .count()
    .toPandas()
)

## Row counts before vs. after cleaning
row_count_after = df.count()
qa_report = dict(
    RowCountBefore=old_row_count,
    RowCountAfter=row_count_after,
)
qa_df = pd.DataFrame([qa_report])

## Write the three QA artefacts to CSV (no index column)
qa_df.to_csv("QA_Report.csv", index=False)
null_counts.to_csv("QA_NullCounts.csv", index=False)
agegroup_dist.to_csv("QA_NoShowDistribution.csv", index=False)

In [13]:
# Also write the cleaned frame as a single unpartitioned parquet dataset,
# replacing any previous output at this path.
(
    df.write
    .mode("overwrite")
    .parquet("aptPrqtDataset")
)