### The code cell below (which runs the ingestion notebook from a personal workspace path) must be removed before staging

In [0]:
%run "/Workspace/Users/anirudhp@megnity.com/Healthcare/src/Ingest Source Data"

### General Insights into the Tables

In [0]:
# Print the inferred schema of each source table (defined by the %run ingestion notebook above)
for raw_table in (first_camp, second_camp, third_camp, patient_profiles, health_camp_details):
    raw_table.printSchema()


In [0]:
# Row count of every source table, printed with a human-readable label
for table_label, raw_table in [
    ("first camp", first_camp),
    ("second camp", second_camp),
    ("third camp", third_camp),
    ("patient profiles", patient_profiles),
    ("health camp", health_camp_details),
]:
    print(f"The number of rows in the {table_label} table:", raw_table.count())

### Steps for cleaning the data

1. Go through the data description on Kaggle
2. Define what a clean, usable, and accurate table should look like:
    1. Drop/Fill if there is no value (`.dropna()`)
    2. There should be no negatives (if applicable), only positives (if applicable)
    3. Drop duplicates (if applicable)
    4. Standardizing column names
    5. Trimming and cleaning string data
    6. Parsing dates and timestamps
    7. Casting columns to correct types
    8. Validating categorical values
3. Clean the table `first_camp`
4. Clean the table `second_camp`
5. Clean the table `third_camp`
6. Clean the table Health_Camp_Detail (`health_camp_details`)
7. Clean the table Patient_Profile (`patient_profiles`)

### Let's clean the table first camp

In [0]:
# Preview 25 rows of the raw first_camp table
display(first_camp.limit(num=25))

In [0]:
# A blanket first_camp.dropna() is deliberately not applied here. The null-count cell
# below shows that only the unused `_c4` column contains nulls, so that column is
# dropped instead of discarding whole rows.

In [0]:
# Count nulls per column: isNull() is cast to 0/1 and summed, giving one row of counts.
# NOTE: importing pyspark's `sum` shadows Python's builtin sum for the rest of the notebook.
# Later cells call this pyspark `sum`, so the import is kept as-is.
from pyspark.sql.functions import col, sum

missing_values = first_camp.select(*[
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in first_camp.columns
])
display(missing_values)

In [0]:
# `_c4` is the only column with nulls and is irrelevant to the analysis,
# so remove the column rather than dropping rows.
first_camp = first_camp.drop("_c4")

display(first_camp.limit(num=25))

In [0]:
# Per-column count of negative values (a true comparison casts to 1, false to 0)
negative_counts = first_camp.select(*[
    sum((col(column_name) < 0).cast("int")).alias(column_name)
    for column_name in first_camp.columns
])
display(negative_counts)

In [0]:
# Drop duplicates: keep a single row per (Patient_ID, Health_Camp_ID) pair.
# NOTE(review): when duplicates differ in other columns, which row survives is not specified.
before_count = first_camp.count()
first_camp = first_camp.dropDuplicates(subset=["Patient_ID", "Health_Camp_ID"])
after_count = first_camp.count()

rows_dropped = before_count - after_count
print("Rows dropped:", rows_dropped)

In [0]:
# Standardizing column names: rename everything to lower snake_case in one call
first_camp = first_camp.withColumnsRenamed({
    "Patient_ID": "patient_id",
    "Health_Camp_ID": "health_camp_id",
    "Donation": "donation",
    "Health_Score": "health_score",
})

In [0]:
# first_camp is fully cleaned; expose it under a stage-suffixed name
first_camp_cleaned = first_camp

display(first_camp_cleaned.limit(num=25))

### Let's clean the table second camp 

In [0]:
# Preview 25 rows of the raw second_camp table
display(second_camp.limit(num=25))

In [0]:
# 1. Drop/Fill if there is no value: remove every row with a null in any column
before_count = second_camp.count()
second_camp = second_camp.dropna(how="any")
after_count = second_camp.count()

print("Rows dropped:", before_count - after_count)

In [0]:
# 2. There should be no negatives (if applicable), only positives.
# Count values <= 0 per column; every count is expected to be 0.
non_positive_counts = second_camp.select(*[
    sum((col(column_name) <= 0).cast("int")).alias(column_name)
    for column_name in second_camp.columns
])
display(non_positive_counts)

In [0]:
# 3. Drop duplicates: keep a single row per (Patient_ID, Health_Camp_ID) pair.
# NOTE(review): when duplicates differ in other columns, which row survives is not specified.
before_count = second_camp.count()
second_camp = second_camp.dropDuplicates(subset=["Patient_ID", "Health_Camp_ID"])
after_count = second_camp.count()

rows_dropped = before_count - after_count
print("Rows dropped:", rows_dropped)

In [0]:
# 4. Standardizing column names to lower snake_case in one call
second_camp = second_camp.withColumnsRenamed({
    "Patient_ID": "patient_id",
    "Health_Camp_ID": "health_camp_id",
    "Health Score": "health_score",  # this source column name contains a space
})

display(second_camp.limit(num=25))

In [0]:
# second_camp is fully cleaned; expose it under a stage-suffixed name
second_camp_cleaned = second_camp

display(second_camp_cleaned.limit(num=25))

### Let's clean the table third camp 

In [0]:
# Preview 25 rows of the raw third_camp table
display(third_camp.limit(num=25))

In [0]:
# 1. Drop/Fill if there is no value: remove every row with a null in any column
before_count = third_camp.count()
third_camp = third_camp.dropna(how="any")
after_count = third_camp.count()

print("Rows dropped:", before_count - after_count)

In [0]:
# 2. There should be no negatives (if applicable); positives and zeroes are both valid.
# Negative values violate the rule, so they are counted on their own. Zero counts are
# shown separately for inspection only (see the Number_of_stall_visited check below)
# and are not violations.
negative_counts = third_camp.select(*[
    sum((col(column_name) < 0).cast("int")).alias(column_name)
    for column_name in third_camp.columns
])
zero_counts = third_camp.select(*[
    sum((col(column_name) == 0).cast("int")).alias(column_name)
    for column_name in third_camp.columns
])

display(negative_counts)
display(zero_counts)

In [0]:
# Inspect rows whose Number_of_stall_visited is zero or negative before deciding whether to drop them
invalid_stalls_visited = third_camp.where(col("Number_of_stall_visited") <= 0)
display(invalid_stalls_visited)

# Conclusion after inspection: these rows are kept, none are dropped

In [0]:
# 3. Drop duplicates: keep a single row per (Patient_ID, Health_Camp_ID) pair.
# NOTE(review): when duplicates differ in other columns, which row survives is not specified.
before_count = third_camp.count()
third_camp = third_camp.dropDuplicates(subset=["Patient_ID", "Health_Camp_ID"])
after_count = third_camp.count()

rows_dropped = before_count - after_count
print("Rows dropped:", rows_dropped)

In [0]:
# 4. Standardizing column names to lower snake_case in one call
third_camp = third_camp.withColumnsRenamed({
    "Patient_ID": "patient_id",
    "Health_Camp_ID": "health_camp_id",
    "Number_of_stall_visited": "number_of_stall_visited",
    "Last_Stall_Visited_Number": "last_stall_visited_number",
})

display(third_camp.limit(num=25))

In [0]:
# third_camp is fully cleaned; expose it under a stage-suffixed name
third_camp_cleaned = third_camp

display(third_camp_cleaned.limit(num=25))

### Let's clean the table patient profile

In [0]:
# Preview 25 rows of the raw patient_profiles table

display(patient_profiles.limit(num=25))

In [0]:
# 1. Drop/Fill if there is no value: drop a row only when its Patient_ID is missing.
# Nulls in the descriptive columns (follower count, income, education score, age,
# first interaction, city type, employer category) are tolerated and left in place.
before_count = patient_profiles.count()
patient_profiles = patient_profiles.na.drop(subset=["Patient_ID"])
after_count = patient_profiles.count()

print("Rows dropped:", before_count - after_count)

In [0]:
# 7. Casting columns to correct types
# Income and Age -> Integer, Education_Score -> Double.
# try_cast already returns null for null inputs and for values that cannot be parsed,
# so no separate isNull() guard is needed. Unparseable values are silently turned into nulls.
# NOTE: Column.try_cast requires PySpark 4.0+ (or a Databricks Runtime that provides it).
from pyspark.sql.functions import when, lit
from pyspark.sql.types import DoubleType, IntegerType

cols_to_cast = {
    "Income": IntegerType(),
    "Education_Score": DoubleType(),
    "Age": IntegerType()
}

for column_name, target_type in cols_to_cast.items():
    patient_profiles = patient_profiles.withColumn(column_name, col(column_name).try_cast(target_type))

display(patient_profiles.limit(25))
patient_profiles.printSchema()

In [0]:
# 6. Parsing dates and timestamps
# Parse the First_Interaction text (pattern dd-MMM-yy) into a date column, replacing it in place.
from pyspark.sql.functions import to_date

patient_profiles = patient_profiles.withColumn(
    "First_Interaction",
    to_date("First_Interaction", "dd-MMM-yy"),
)

display(patient_profiles.limit(num=25))
patient_profiles.printSchema()

In [0]:
# 2. Every Patient_ID should be strictly positive; show how many are not
non_positive_count = patient_profiles.where(col("Patient_ID") <= 0).count()
display(spark.createDataFrame([(non_positive_count,)], schema=["Non_Positive_Patient_ID_Count"]))

In [0]:
# The remaining numeric columns (up to Age) should be >= 0: count negative values per column.
# NOTE: despite its name, non_positive_counts holds counts of strictly negative values (< 0).
columns_to_check = ["Online_Follower", "LinkedIn_Shared", "Twitter_Shared", "Facebook_Shared", "Income", "Education_Score", "Age"]
non_positive_counts = patient_profiles.select(*[
    sum((col(column_name) < 0).cast("int")).alias(column_name)
    for column_name in columns_to_check
])
display(non_positive_counts)

In [0]:
# 3. Drop duplicates: keep a single profile row per Patient_ID.
# NOTE(review): when duplicates differ in other columns, which row survives is not specified.
before_count = patient_profiles.count()
patient_profiles = patient_profiles.dropDuplicates(subset=["Patient_ID"])
after_count = patient_profiles.count()

rows_dropped = before_count - after_count
print("Rows dropped:", rows_dropped)

In [0]:
# 4. Standardizing column names to lower snake_case in one call
patient_profiles = patient_profiles.withColumnsRenamed({
    "Patient_ID": "patient_id",
    "Online_Follower": "online_follower",
    "LinkedIn_Shared": "linkedin_shared",
    "Twitter_Shared": "twitter_shared",
    "Facebook_Shared": "facebook_shared",
    "Income": "income",
    "Education_Score": "education_score",
    "Age": "age",
    "First_Interaction": "first_interaction",
    "City_Type": "city_type",
    "Employer_Category": "employer_category",
})

In [0]:
# patient_profiles is fully cleaned; expose it under a stage-suffixed name

patient_profiles_cleaned = patient_profiles
display(patient_profiles_cleaned.limit(num=25))

### Let's clean the table health camp details

In [0]:
# Preview 25 rows of the raw health_camp_details table

display(health_camp_details.limit(num=25))

In [0]:
# 1. Drop/Fill if there is no value: drop a row only when its Health_Camp_ID is missing
before_count = health_camp_details.count()
health_camp_details = health_camp_details.na.drop(subset=["Health_Camp_ID"])
after_count = health_camp_details.count()

print("Rows dropped:", before_count - after_count)

In [0]:
# 2. There should be only positives in the column Health_Camp_ID:
# show how many rows have a Health_Camp_ID <= 0, labelled after the column actually checked.
non_positive_count = health_camp_details.filter(col("Health_Camp_ID") <= 0).count()
display(spark.createDataFrame([(non_positive_count,)], ["Non_Positive_Health_Camp_ID_Count"]))

In [0]:
# 3. Drop duplicates: keep a single row per Health_Camp_ID.
# NOTE(review): when duplicates differ in other columns, which row survives is not specified.
before_count = health_camp_details.count()
health_camp_details = health_camp_details.dropDuplicates(subset=["Health_Camp_ID"])
after_count = health_camp_details.count()

rows_dropped = before_count - after_count
print("Rows dropped:", rows_dropped)

In [0]:
# 4. Standardizing column names to lower snake_case in one call
health_camp_details = health_camp_details.withColumnsRenamed({
    "Health_Camp_ID": "health_camp_id",
    "Camp_Start_Date": "camp_start_date",
    "Camp_End_Date": "camp_end_date",
    "Category1": "category1",
    "Category2": "category2",
    "Category3": "category3",
})

In [0]:
# 6. Parsing dates and timestamps
# Replace camp_start_date and camp_end_date (pattern dd-MMM-yy) with parsed date columns.
health_camp_details = health_camp_details.withColumns({
    "camp_start_date": to_date(col("camp_start_date"), "dd-MMM-yy"),
    "camp_end_date": to_date(col("camp_end_date"), "dd-MMM-yy"),
})


In [0]:
# health_camp_details is fully cleaned; expose it under a stage-suffixed name

health_camp_details_cleaned = health_camp_details
display(health_camp_details_cleaned.limit(num=25))