In [0]:
from pyspark.sql.functions import col, lower, initcap, regexp_replace, when, to_date, lit, expr


In [0]:
def load_config_from_json(config_path):
    """Load one partner's ingestion settings from a JSON config file.

    The file is read with Spark's multiline JSON reader and only the first
    record of the resulting DataFrame is used.

    Args:
        config_path: Path of the partner's JSON config file.

    Returns:
        A tuple ``(partner_code, delimiter, column_mapping, date_format)``.
        ``column_mapping`` is a plain ``dict`` built from the config's
        ``column_mapping`` struct.

    Raises:
        ValueError: If the config file yields no records, or if its
            ``column_mapping`` entry is null.
    """
    config_row = spark.read.option("multiline", "true").json(config_path).first()

    # first() returns None for an empty DataFrame; fail with a clear message
    # instead of "'NoneType' object is not subscriptable".
    if config_row is None:
        raise ValueError(f"Config file has no records: {config_path}")

    column_mapping = config_row["column_mapping"]
    if column_mapping is None:
        raise ValueError(f"Config file has no column_mapping: {config_path}")

    return (
        config_row["partner_code"],
        config_row["delimiter"],
        column_mapping.asDict(),  # Row struct -> plain dict
        config_row["date_format"],
    )


In [0]:
def standardize_columns(df, dob_format):
    """Normalize the standardized partner columns and add a validity flag.

    Transformations applied:
      * ``first_name`` / ``last_name``: ``initcap``.
      * ``email``: lower-cased.
      * ``dob``: parsed with ``try_to_date`` using ``dob_format``.
      * ``phone``: every non-digit character removed; a value of exactly
        10 digits is reformatted as ``NNN-NNN-NNNN``, any other value is
        left as the bare digits.
      * ``is_valid``: new column, true when ``external_id`` is not null.

    Args:
        df: DataFrame that already has the columns ``first_name``,
            ``last_name``, ``email``, ``dob``, ``phone`` and ``external_id``.
        dob_format: Datetime pattern string used to parse ``dob``.

    Returns:
        The transformed DataFrame.
    """
    # The pattern is spliced into a SQL string literal. Datetime patterns
    # quote literal text with single quotes (e.g. yyyy-MM-dd'T'HH:mm), so
    # backslashes and single quotes are escaped; otherwise the literal ends
    # early and the expression fails to parse.
    # NOTE(review): relies on the default backslash escaping of Spark SQL
    # string literals (spark.sql.parser.escapedStringLiterals=false).
    fmt_literal = dob_format.replace("\\", "\\\\").replace("'", "\\'")

    df = (
        df.withColumn("first_name", initcap(col("first_name")))
        .withColumn("last_name", initcap(col("last_name")))
        .withColumn("email", lower(col("email")))
        .withColumn("dob", expr(f"try_to_date(dob, '{fmt_literal}')"))
        # Reduce the phone to digits only before deciding how to format it.
        .withColumn("phone", regexp_replace(col("phone"), "[^0-9]", ""))
        .withColumn(
            "phone",
            when(
                col("phone").rlike(r"^\d{10}$"),
                regexp_replace(col("phone"), r"(\d{3})(\d{3})(\d{4})", r"$1-$2-$3"),
            ).otherwise(col("phone")),
        )
    )
    return df.withColumn("is_valid", col("external_id").isNotNull())

In [0]:
def load_and_process(data_path, config_path):
    """Read one partner's delimited file and return it in the standardized schema.

    Steps:
      1. Load the partner settings via ``load_config_from_json``.
      2. Read the file as CSV with a header row and the configured delimiter.
      3. Strip leading/trailing whitespace from every header name.
      4. Select the columns named in ``column_mapping`` and alias each one
         to its standardized name.
      5. Clean the values with ``standardize_columns``.
      6. Add a literal ``partner_code`` column.

    Args:
        data_path: Path of the partner's raw data file.
        config_path: Path of the partner's JSON config file.

    Returns:
        The standardized DataFrame with an added ``partner_code`` column.
    """
    partner_code, delimiter, column_mapping, dob_format = load_config_from_json(config_path)

    # Read the raw file
    raw_df = spark.read.option("header", True).option("delimiter", delimiter).csv(data_path)

    # Trim every header name in one toDF call instead of chaining one
    # withColumnRenamed per column, which adds a plan node per iteration.
    raw_df = raw_df.toDF(*[name.strip() for name in raw_df.columns])

    # Rename columns to the standardized schema (source name -> target name)
    df = raw_df.select([col(src).alias(dst) for src, dst in column_mapping.items()])

    # Clean and tag
    df = standardize_columns(df, dob_format)
    return df.withColumn("partner_code", lit(partner_code))


In [0]:
# ACME: raw data file and its ingestion config
acme_data_path, acme_config_path = (
    "/Volumes/data_catalog/source/assessment_volume/Healthcare_patners/acme.txt",
    "/Volumes/data_catalog/source/assessment_volume/Config/acme.json",
)

# BetterCare: raw data file and its ingestion config
bettercare_data_path, bettercare_config_path = (
    "/Volumes/data_catalog/source/assessment_volume/Healthcare_patners/bettercare.csv",
    "/Volumes/data_catalog/source/assessment_volume/Config/bettercare.json",
)


In [0]:
# Run each partner's file through the shared pipeline with its own config
acme_df = load_and_process(data_path=acme_data_path, config_path=acme_config_path)
bettercare_df = load_and_process(
    data_path=bettercare_data_path,
    config_path=bettercare_config_path,
)

# Stack both partners into one DataFrame, matching columns by name
final_df = acme_df.unionByName(bettercare_df)

# Keep only the rows with a non-null DOB and show them
merged_df = final_df.where(col("dob").isNotNull())
merged_df.display()


In [0]:
# Rows whose DOB is null, without the is_valid flag
invalid_dob_df = final_df.filter(col("dob").isNull())\
                            .drop("is_valid")

output_path = "/Volumes/data_catalog/source/assessment_volume/output/invalid_dobs"

if invalid_dob_df.limit(1).count() == 0:
    # Delete entire output folder if no invalid DOBs
    dbutils.fs.rm(output_path, True)
    print("No invalid DOBs found. Old data cleaned.")

else:

    invalid_dob_df.display()

    # Distinct partner codes that have at least one invalid DOB row
    partner_codes_df = invalid_dob_df.select("partner_code").distinct()

    # Use collect() on DataFrame (NOT RDD) — supported in serverless
    partner_codes = [row["partner_code"] for row in partner_codes_df.collect()]

    # Clear the whole output folder before writing. mode("overwrite") below
    # only replaces the folders of partners written in this run, so a partner
    # that had invalid DOBs in an earlier run but has none now would otherwise
    # keep its stale folder.
    dbutils.fs.rm(output_path, True)

    # Loop and write each partner's invalid DOB rows to its own subfolder
    for partner in partner_codes:
        partner_df = invalid_dob_df.filter(col("partner_code") == partner)
        # The partner is already encoded in the folder name
        partner_df = partner_df.drop("partner_code")
        partner_df.write.mode("overwrite").option("header", True).csv(f"{output_path}/{partner}")
