In [0]:
# Validation-only cell: nothing here feeds the pipeline.
# Reading each raw table confirms it exists, and printing it without
# truncating cell values lets us eyeball the partner data before
# any transformation is applied.

for raw_table_name in (
    "workspace.default.bettercare_raw",
    "workspace.default.acme_raw",
):
    spark.table(raw_table_name).show(truncate=False)

+-------------+----------+---------+-------------+------------------+------------+
|subscriber_id|first_name|last_name|date_of_birth|email             |phone       |
+-------------+----------+---------+-------------+------------------+------------+
|BC-001       |Alice     |Johnson  |1965-08-10   |alice.j@test.com  |555-222-3333|
|BC-002       |Charlie   |Brown    |1972-03-25   |charlie.b@test.com|5554445555  |
+-------------+----------+---------+-------------+------------------+------------+

+-----------------------------------------------------------------+
|value                                                            |
+-----------------------------------------------------------------+
|MBI|FNAME|LNAME|DOB|EMAIL|PHONE                                  |
|1234567890A|John|Doe|03/15/1955|JOHN.DOE@EMAIL.COM|5551234567    |
|9876543210B|Jane|Smith|07/22/1948|jane.smith@email.com|5559876543|
||Bad|Row|03/15/1955|bad@email.com|5551234567                     |
|ABC123|||invaliddate|bad

In [0]:
# Partner configuration: the one place where partner-specific details live.
# The pipeline functions only read these entries, so onboarding a partner
# means adding an entry here rather than touching pipeline logic.
#
# Keys of each partner entry:
#   source_table          raw source table name
#   partner_code          constant partner identifier stamped on every row
#   delimiter             file delimiter used by this partner
#   raw_value_col         (optional) name of the single text column that holds
#                         whole delimited lines; when present the data is parsed
#   raw_columns_in_order  (optional) field names, in order, after splitting a line
#   column_mapping        partner column name -> standardized column name

PARTNER_CONFIG = dict(
    ACME=dict(
        source_table="workspace.default.acme_raw",
        partner_code="ACME",
        delimiter="|",
        raw_value_col="value",  # ACME data arrives as one text column called "value"
        raw_columns_in_order=["MBI", "FNAME", "LNAME", "DOB", "EMAIL", "PHONE"],
        column_mapping=dict(
            MBI="external_id",
            FNAME="first_name",
            LNAME="last_name",
            DOB="dob",
            EMAIL="email",
            PHONE="phone",
        ),
    ),
    BETTERCARE=dict(
        source_table="workspace.default.bettercare_raw",
        partner_code="BETTERCARE",
        delimiter=",",
        column_mapping=dict(
            subscriber_id="external_id",
            first_name="first_name",
            last_name="last_name",
            date_of_birth="dob",
            email="email",
            phone="phone",
        ),
    ),
)

# Final standardized column order shared by every partner's output
STANDARD_COLS = [
    "external_id",
    "first_name",
    "last_name",
    "dob",
    "email",
    "phone",
    "partner_code",
]

In [0]:
# Import Spark SQL functions used for transformations
from pyspark.sql import functions as F

In [0]:
# Date-of-birth normalization
# Accepts either MM/dd/yyyy or yyyy-MM-dd input
# The first format that parses successfully wins

def normalize_dob(colname):
    """Return a Column expression that parses `colname` as a date.

    Each accepted format is tried in order with `try_to_date`; `coalesce`
    keeps the first non-null result, so the expression is null when the
    value matches neither format.
    """
    accepted_formats = ("MM/dd/yyyy", "yyyy-MM-dd")
    attempts = [
        F.expr(f"try_to_date({colname}, '{fmt}')")
        for fmt in accepted_formats
    ]
    return F.coalesce(*attempts)

In [0]:
# Phone number normalization
# Strips every non-digit character
# Numbers with exactly 10 digits are rendered as XXX-XXX-XXXX

def normalize_phone(colname):
    """Return a Column expression with `colname` formatted as XXX-XXX-XXXX.

    The expression is null whenever the value does not contain exactly
    10 digits once all non-digit characters are removed.
    """
    only_digits = F.regexp_replace(F.col(colname), r"[^0-9]", "")

    # substring positions are 1-based: (start, length)
    first_three = F.substring(only_digits, 1, 3)
    middle_three = F.substring(only_digits, 4, 3)
    last_four = F.substring(only_digits, 7, 4)
    formatted = F.concat_ws("-", first_three, middle_three, last_four)

    has_ten_digits = F.length(only_digits) == 10
    return F.when(has_ten_digits, formatted).otherwise(F.lit(None))

In [0]:
# Converts raw partner data that arrives as a single text column
# into a structured DataFrame with one column per field.
#
# Why this is needed:
# Some partners (like ACME) send their data as pipe-delimited text files.
# When uploaded into Databricks, each row appears as one long string
# instead of separate columns.
#
# Design principle:
# The logic is fully configuration-driven. The delimiter and column order
# come from the config object, so no partner-specific logic is hardcoded
# and a new delimited-text partner requires only a config update.

def parse_delimited_value_column(df, cfg):
    """Split a single-text-column DataFrame into one column per field.

    Steps:
      1. Read and trim the raw text column named by ``cfg["raw_value_col"]``.
      2. Drop null/empty lines and any line equal to the header line
         (the configured column names joined by the delimiter).
      3. Split each remaining line on ``cfg["delimiter"]``.
      4. Name the pieces after ``cfg["raw_columns_in_order"]``.

    Args:
        df: DataFrame containing the raw text column.
        cfg: partner configuration with the keys ``raw_value_col``,
            ``delimiter`` and ``raw_columns_in_order``.

    Returns:
        DataFrame with exactly the columns in ``raw_columns_in_order``;
        field values are the raw split pieces (not trimmed individually).

    Raises:
        ValueError: if the configured delimiter is empty.
    """
    import re  # local import: only needed to escape the delimiter

    value_col = cfg["raw_value_col"]
    delim = cfg["delimiter"]
    expected_cols = cfg["raw_columns_in_order"]

    if not delim:
        raise ValueError("cfg['delimiter'] must be a non-empty string")

    # Header as it appears in the file, e.g. "MBI|FNAME|LNAME|..."
    header_line = delim.join(expected_cols)

    cleaned = (
        df
        .withColumn("line", F.trim(F.col(value_col)))
        .filter(F.col("line").isNotNull() & (F.col("line") != ""))
        .filter(F.col("line") != header_line)
    )

    # F.split takes a regular expression. Escape the whole delimiter so it is
    # matched literally: a single leading backslash would only escape the
    # first character ("||" -> "\||" matches the empty string) and would turn
    # a letter delimiter into a character class ("d" -> "\d").
    parts = F.split(F.col("line"), re.escape(delim))

    # NOTE(review): lines with fewer fields than expected index past the end
    # of `parts`; confirm this yields nulls (not an error) under the
    # cluster's ANSI mode setting.
    parsed = cleaned.select(
        *[parts[i].alias(name) for i, name in enumerate(expected_cols)]
    )

    return parsed


In [0]:
# Generic ingestion step shared by every partner
# No partner names appear here
# All behaviour is driven by the configuration entry

def ingest_and_map(cfg):
    """Load one partner's raw table and rename its columns to the standard names.

    Args:
        cfg: a partner configuration entry.

    Returns:
        DataFrame holding the mapped columns plus a constant ``partner_code``.
    """
    raw = spark.table(cfg["source_table"])

    # A configured raw_value_col means the data is one delimited text column
    # that has to be split into fields before it can be mapped.
    needs_parsing = "raw_value_col" in cfg
    structured = parse_delimited_value_column(raw, cfg) if needs_parsing else raw

    # Partner column name -> standardized column name
    renamed = structured.select(
        *(
            F.col(partner_name).alias(standard_name)
            for partner_name, standard_name in cfg["column_mapping"].items()
        )
    )

    # Stamp every row with the configured partner identifier
    return renamed.withColumn("partner_code", F.lit(cfg["partner_code"]))


In [0]:
# Applies every field-level transformation
# and guarantees the same output schema for all partners

def standardize(df):
    """Clean each field and return the columns in ``STANDARD_COLS`` order.

    Expects the standardized column names produced by ``ingest_and_map``.
    ``dob`` comes back as a ``yyyy-MM-dd`` string (null when unparseable) and
    ``phone`` as XXX-XXX-XXXX (null when it does not have 10 digits).
    """
    # Applied strictly in this order: "dob" is derived from the helper
    # column "dob_date", which must therefore be created first.
    ordered_steps = [
        ("external_id", F.trim(F.col("external_id"))),
        ("first_name", F.initcap(F.col("first_name"))),
        ("last_name", F.initcap(F.col("last_name"))),
        ("email", F.lower(F.col("email"))),
        ("dob_date", normalize_dob("dob")),
        ("dob", F.date_format(F.col("dob_date"), "yyyy-MM-dd")),
        ("phone", normalize_phone("phone")),
    ]

    result = df
    for target_col, expression in ordered_steps:
        result = result.withColumn(target_col, expression)

    # Remove the helper column, then enforce the final column order
    return result.drop("dob_date").select(*STANDARD_COLS)


In [0]:
# End-to-end run of the pipeline
# Every partner configuration goes through the same
# ingestion and standardization steps

dfs = [standardize(ingest_and_map(cfg)) for cfg in PARTNER_CONFIG.values()]

# Stack the per-partner results into one unified dataset
final_df = dfs[0]
for partner_df in dfs[1:]:
    final_df = final_df.unionByName(partner_df, allowMissingColumns=True)

# Show the unified data
display(final_df)


external_id,first_name,last_name,dob,email,phone,partner_code
1234567890A,John,Doe,1955-03-15,john.doe@email.com,555-123-4567,ACME
9876543210B,Jane,Smith,1948-07-22,jane.smith@email.com,555-987-6543,ACME
,Bad,Row,1955-03-15,bad@email.com,555-123-4567,ACME
ABC123,,,,bademail,,ACME
BC-001,Alice,Johnson,1965-08-10,alice.j@test.com,555-222-3333,BETTERCARE
BC-002,Charlie,Brown,1972-03-25,charlie.b@test.com,555-444-5555,BETTERCARE


In [0]:
# Validation of the unified dataset
# Rows are split into clean records and invalid records
# Each invalid record carries a comma-separated list of the rules it failed
# Clean records are the ones written to the final output table

ext_id = F.col("external_id")
dob_trimmed = F.trim(F.col("dob"))
phone_trimmed = F.trim(F.col("phone"))

# Mandatory identifier: must not be null or blank
missing_id = ext_id.isNull() | (ext_id == "")

# Date of birth is null or blank (a value that failed to parse is null here)
invalid_dob = F.col("dob").isNull() | (dob_trimmed == "")

# Phone is null or blank after formatting
invalid_phone = F.col("phone").isNull() | (phone_trimmed == "")

# (failed condition, reason label); order fixes the order of the labels
validation_rules = [
    (missing_id, "missing_external_id"),
    (invalid_dob, "invalid_dob"),
    (invalid_phone, "invalid_phone"),
]

# A rule that passes yields null, which concat_ws leaves out, so a fully
# valid row ends up with an empty error_reason.
reason_flags = [F.when(failed, F.lit(label)) for failed, label in validation_rules]
with_errors = final_df.withColumn("error_reason", F.concat_ws(",", *reason_flags))

error_df = with_errors.filter(F.col("error_reason") != "")

valid_df = with_errors.filter(F.col("error_reason") == "").drop("error_reason")

for label, frame in (("valid rows", valid_df), ("error rows", error_df)):
    print(label, frame.count())

display(error_df)


valid rows 4
error rows 2


external_id,first_name,last_name,dob,email,phone,partner_code,error_reason
,Bad,Row,1955-03-15,bad@email.com,555-123-4567,ACME,missing_external_id
ABC123,,,,bademail,,ACME,"invalid_dob,invalid_phone"


In [0]:
# Persist both result sets, replacing any previous contents:
#   clean unified dataset -> unified_members
#   invalid records kept for investigation -> unified_members_errors
for frame, target_table in (
    (valid_df, "workspace.default.unified_members"),
    (error_df, "workspace.default.unified_members_errors"),
):
    frame.write.mode("overwrite").saveAsTable(target_table)

In [0]:
# Read the saved clean table back and print it (cell values untruncated)
# as a final visual confirmation that it exists and looks right
unified_members_saved = spark.table("workspace.default.unified_members")
unified_members_saved.show(truncate=False)

+-----------+----------+---------+----------+--------------------+------------+------------+
|external_id|first_name|last_name|dob       |email               |phone       |partner_code|
+-----------+----------+---------+----------+--------------------+------------+------------+
|BC-001     |Alice     |Johnson  |1965-08-10|alice.j@test.com    |555-222-3333|BETTERCARE  |
|BC-002     |Charlie   |Brown    |1972-03-25|charlie.b@test.com  |555-444-5555|BETTERCARE  |
|1234567890A|John      |Doe      |1955-03-15|john.doe@email.com  |555-123-4567|ACME        |
|9876543210B|Jane      |Smith    |1948-07-22|jane.smith@email.com|555-987-6543|ACME        |
+-----------+----------+---------+----------+--------------------+------------+------------+



In [0]:
# Read the saved error table back and print it (cell values untruncated)
# as a final visual confirmation that it exists and looks right
unified_members_errors_saved = spark.table("workspace.default.unified_members_errors")
unified_members_errors_saved.show(truncate=False)

+-----------+----------+---------+----------+-------------+------------+------------+-------------------------+
|external_id|first_name|last_name|dob       |email        |phone       |partner_code|error_reason             |
+-----------+----------+---------+----------+-------------+------------+------------+-------------------------+
|           |Bad       |Row      |1955-03-15|bad@email.com|555-123-4567|ACME        |missing_external_id      |
|ABC123     |          |         |NULL      |bademail     |NULL        |ACME        |invalid_dob,invalid_phone|
+-----------+----------+---------+----------+-------------+------------+------------+-------------------------+

