Partner config (config-driven, required)

# Databricks / PySpark — Full end-to-end solution (Acme + BetterCare)
# - Reads raw files from S3 (your external location already allows spark.read from s3://)
# - Configuration-driven ingestion (delimiter + column mappings)
# - Standardizes to unified schema + required transformations
# - Writes Silver as Delta to S3
# - Publishes Gold unified Delta table
# - Writes Rejects (missing external_id) as Delta
#
# Matches assessment requirements.

In [0]:

from pyspark.sql import functions as F

# =========================
# 1) CONFIG (edit paths if needed)
# =========================
# Source header -> standard column name, one mapping per partner.
_ACME_COLUMN_MAPPING = {
    "MBI": "external_id",
    "FNAME": "first_name",
    "LNAME": "last_name",
    "DOB": "dob",
    "EMAIL": "email",
    "PHONE": "phone",
}

_BETTERCARE_COLUMN_MAPPING = {
    "subscriber_id": "external_id",
    "first_name": "first_name",
    "last_name": "last_name",
    "date_of_birth": "dob",
    "email": "email",
    "phone": "phone",
}

# Ingestion settings for each partner, keyed by a lowercase partner slug.
#   partner_code   - literal value written to the partner_code output column
#   path           - location of the raw delimited file
#   delimiter      - field separator handed to the CSV reader
#   dob_format     - datetime pattern used to parse the partner's date of birth
#   column_mapping - source header -> standard column name
PARTNER_CONFIGS = {
    "acme": {
        "partner_code": "ACME",
        "path": "s3://databricks-age-bold/partner/acme/acme.txt",
        "delimiter": "|",
        "dob_format": "MM/dd/yyyy",
        "column_mapping": _ACME_COLUMN_MAPPING,
    },
    "bettercare": {
        "partner_code": "BETTERCARE",
        "path": "s3://databricks-age-bold/partner/bettercare/bettercare.csv",
        "delimiter": ",",
        "dob_format": "yyyy-MM-dd",
        "column_mapping": _BETTERCARE_COLUMN_MAPPING,
    },
}


In [0]:
# Output locations. Silver and Rejects are Delta directories addressed by path;
# Gold is addressed by table name.
# Silver: standardized rows that carry an external_id.
SILVER_PATH = "s3://databricks-age-bold/silver/eligibility_delta"
# Rejects: standardized rows whose external_id is missing.
REJECTS_PATH = "s3://databricks-age-bold/rejects/eligibility_delta"
GOLD_TABLE = "eligibility_gold_unified"  # Delta table in metastore

STEP 2 — Generic transformation function (core logic)

In [0]:
# Columns of the unified schema, in output order. Kept as a list because it is
# passed straight to DataFrame.select().
STD_COLS = [
    "external_id",
    "first_name",
    "last_name",
    "dob",
    "email",
    "phone",
    "partner_code",
]


# =========================
# 2) Helpers
# =========================
def format_phone(col):
    """
    Render a phone column as XXX-XXX-XXXX.

    The value is cast to string and every non-digit character is removed.
    When exactly ten digits remain they are joined as 3-3-4 with dashes;
    any other value becomes NULL.
    """
    only_digits = F.regexp_replace(col.cast("string"), r"[^0-9]", "")

    # Spark's substring is 1-based: (start position, length).
    area_code = F.substring(only_digits, 1, 3)
    exchange = F.substring(only_digits, 4, 3)
    line_number = F.substring(only_digits, 7, 4)
    dashed = F.concat_ws("-", area_code, exchange, line_number)

    has_ten_digits = F.length(only_digits) == 10
    return F.when(has_ten_digits, dashed).otherwise(F.lit(None))


def process_partner(cfg: dict):
    """
    Read one partner's raw file and standardize it to the unified schema.

    Args:
        cfg: One partner entry from PARTNER_CONFIGS. Keys read here are
            "path", "delimiter", "column_mapping" (source header -> standard
            column name), "dob_format" and "partner_code".

    Returns:
        (good_rows_df, bad_rows_df): two DataFrames with exactly the STD_COLS
        columns. good holds rows whose external_id is non-blank after
        trimming; bad holds every other row.

    Raises:
        ValueError: if a source column named in cfg["column_mapping"] is not
            present in the file header.
    """
    mapping = cfg["column_mapping"]

    # Read raw
    df = (
        spark.read
             .option("header", True)
             .option("sep", cfg["delimiter"])
             .option("mode", "PERMISSIVE")
             .csv(cfg["path"])
    )

    # Fail fast with an actionable message. Silently skipping the rename would
    # only surface later as an unresolved-column error on the select below.
    missing = [src for src in mapping if src not in df.columns]
    if missing:
        raise ValueError(
            f"{cfg['path']}: header is missing mapped column(s) {missing}; "
            f"found {df.columns}"
        )

    # Rename -> standard schema using config mapping
    for src, tgt in mapping.items():
        df = df.withColumnRenamed(src, tgt)

    # Select only mapped standard columns (ignore any extras)
    df = df.select(list(mapping.values()))

    # Transformations required by assessment
    df = (
        df
        # Trim so a space-only identifier counts as missing and padded
        # identifiers are stored in a consistent form.
        .withColumn("external_id", F.trim(F.col("external_id")))
        .withColumn("first_name", F.initcap(F.col("first_name")))
        .withColumn("last_name", F.initcap(F.col("last_name")))
        .withColumn("email", F.lower(F.col("email")))
        # Parse with the partner's pattern, then re-render as a yyyy-MM-dd string.
        .withColumn("dob", F.date_format(F.to_date(F.col("dob").cast("string"), cfg["dob_format"]), "yyyy-MM-dd"))
        .withColumn("phone", format_phone(F.col("phone")))
        .withColumn("partner_code", F.lit(cfg["partner_code"]))
    ).select(STD_COLS)

    # Bonus validation: external_id must be present.
    # For a NULL id the predicate evaluates to false (false AND NULL), so its
    # negation is true and every row lands in exactly one of the two outputs.
    has_external_id = F.col("external_id").isNotNull() & (F.length(F.col("external_id")) > 0)
    good = df.filter(has_external_id)
    bad = df.filter(~has_external_id)

    return good, bad

STEP 3 — Run Silver ingestion (standardized layer)

In [0]:

# =========================
# 3) Run pipeline (Acme + BetterCare) -> Silver + Rejects
# =========================
# Standardized "good" frames, one per partner, unioned into Silver below.
silver_dfs = []
total_good = 0
total_bad = 0

for partner_key, cfg in PARTNER_CONFIGS.items():
    print(f"Processing partner: {partner_key} ({cfg['partner_code']}) from {cfg['path']}")

    good_df, bad_df = process_partner(cfg)

    # Keep in memory list for union into unified dataset
    silver_dfs.append(good_df)

    # Write rejects (if any). A single count serves as both the emptiness
    # check and the reported number, so the input is scanned once here rather
    # than once for limit(1).count() and again for count().
    # NOTE(review): rejects are appended while Silver is overwritten, so
    # re-running this cell adds the same reject rows again; confirm intended.
    bad_count = bad_df.count()
    if bad_count > 0:
        total_bad += bad_count
        bad_df.write.format("delta").mode("append").save(REJECTS_PATH)
        print(f"  - Wrote {bad_count} reject rows to {REJECTS_PATH}")

    good_count = good_df.count()
    total_good += good_count
    print(f"  - Good rows: {good_count}")

# Unified silver dataset (single output)
if not silver_dfs:
    raise ValueError("No partner dataframes produced. Check input paths and configs.")

# unionByName matches columns by name rather than by position.
silver_df = silver_dfs[0]
for partner_df in silver_dfs[1:]:
    silver_df = silver_df.unionByName(partner_df)

# Write standardized Silver (overwrite for the take-home; production could append/partition)
silver_df.write.format("delta").mode("overwrite").save(SILVER_PATH)

print(f"\n‚úÖ Silver written to: {SILVER_PATH}")
print(f"‚úÖ Total good rows: {total_good}")
print(f"‚úÖ Total bad rows:  {total_bad}")

# Read back from storage so the preview shows what was actually persisted.
display(spark.read.format("delta").load(SILVER_PATH))

In [0]:
# =========================
# 4) Publish Gold unified table (consumption)
# =========================
# Make sure the Gold table exists with the unified schema before the first
# publish. All columns are strings, including dob.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {GOLD_TABLE} (
  external_id STRING,
  first_name  STRING,
  last_name   STRING,
  dob         STRING,
  email       STRING,
  phone       STRING,
  partner_code STRING
)
USING DELTA
""")

# Replace the Gold contents with the current Silver snapshot. The format is
# set explicitly so the table stays Delta, matching the DDL above and the
# Silver/Rejects writes, instead of relying on the session's default source.
(
    spark.read.format("delta").load(SILVER_PATH)
         .select(STD_COLS)
         .write.format("delta")
         .mode("overwrite")
         .saveAsTable(GOLD_TABLE)
)

print(f"\nüèÅ Gold table published: {GOLD_TABLE}")
# Sanity checks: row count per partner, then the full table in a stable order.
spark.sql(f"SELECT partner_code, COUNT(*) AS cnt FROM {GOLD_TABLE} GROUP BY partner_code").show()
spark.sql(f"SELECT * FROM {GOLD_TABLE} ORDER BY partner_code, external_id").show(truncate=False)

