
## THIS NOTEBOOK CONTAINS

**Core Transformations**  
Standardizes partner data into a unified schema, including column normalization, date parsing, and phone number formatting to ensure consistent downstream processing.

**Core Validations**  
Applies centralized data quality checks (a non-empty external identifier, a parseable date of birth, and a phone number with the required digit count) and routes invalid records to a rejects dataset instead of failing the pipeline.


In [0]:
# Unified output schema: every partner feed is projected onto exactly these
# columns, in this order.
STANDARD_COLS = [
    "external_id",
    "first_name",
    "last_name",
    "dob",
    "email",
    "phone",
    "partner_code",
]


def apply_column_mapping(df: DataFrame, mapping: Dict[str, str], partner_code: str) -> DataFrame:
    """Project a partner DataFrame onto the standard schema.

    Args:
        df: Partner DataFrame with partner-specific column names.
        mapping: Source column name -> target (standard) column name.
        partner_code: Literal written to the ``partner_code`` column of every
            row, replacing any value that column already held.

    Returns:
        A DataFrame containing exactly ``STANDARD_COLS``, in that order.
        Standard columns that are absent after renaming are added as
        null strings.
    """
    for src, tgt in mapping.items():
        # A mapped source that is absent is deliberately NOT materialised as
        # a null target here: doing so would overwrite a column that already
        # carries the target name with nulls. Missing standard columns are
        # backfilled below, and non-standard targets would be dropped by the
        # final select anyway.
        if src in df.columns:
            # NOTE(review): if `tgt` already exists under a different source
            # name, this presumably leaves two columns named `tgt` and the
            # final select would be ambiguous - confirm the desired precedence.
            df = df.withColumnRenamed(src, tgt)

    # Backfill whatever part of the standard schema is still missing.
    missing_cols = [name for name in STANDARD_COLS if name not in df.columns]
    for name in missing_cols:
        df = df.withColumn(name, F.lit(None).cast("string"))

    df = df.withColumn("partner_code", F.lit(partner_code))
    return df.select(*STANDARD_COLS)

def standardize_fields(df: DataFrame) -> DataFrame:
    """Normalize name/email casing and derive parsed-date and phone-digit columns.

    ``first_name`` and ``last_name`` are lower-cased then init-capped, and
    ``email`` is lower-cased, all in place. Two columns are appended:

    * ``dob_parsed`` - ``dob`` parsed as ``MM/dd/yyyy`` or, failing that,
      ``yyyy-MM-dd``.
    * ``phone_digits`` - ``phone`` with every non-digit character removed.

    The original ``dob`` and ``phone`` columns are left untouched.
    """
    for name_col in ("first_name", "last_name"):
        df = df.withColumn(name_col, F.initcap(F.lower(F.col(name_col))))
    df = df.withColumn("email", F.lower(F.col("email")))

    # Try each accepted date layout in order; coalesce keeps the first hit.
    dob_attempts = [
        F.expr("try_to_date(dob, 'MM/dd/yyyy')"),
        F.expr("try_to_date(dob, 'yyyy-MM-dd')"),
    ]
    df = df.withColumn("dob_parsed", F.coalesce(*dob_attempts))

    digits_only = F.regexp_replace("phone", r"[^0-9]", "")
    return df.withColumn("phone_digits", digits_only)

In [0]:
def split_clean_and_rejects(df: DataFrame, cfg: dict):
    """Split standardized rows into a clean set and a rejects set.

    Expects the columns ``external_id``, ``dob``, ``dob_parsed`` and
    ``phone_digits`` (plus the remaining output columns selected below) to be
    present on ``df``.

    Args:
        df: Standardized DataFrame.
        cfg: Configuration dict. ``cfg["validation"]["phone_digits_required"]``
            sets the exact number of digits a phone number must have; it
            defaults to 10 when the key or the whole ``validation`` section
            is absent.

    Returns:
        ``(clean, rejects)``. ``clean`` holds the rows that pass every check,
        with ``dob`` rendered as ``yyyy-MM-dd`` and ``phone`` formatted.
        ``rejects`` holds the failing rows with all input columns plus
        ``reject_reason`` (the first failing check, in the order
        external_id, dob, phone).
    """
    # A missing or None "validation" section falls back to the defaults
    # instead of raising KeyError.
    validation_cfg = cfg.get("validation") or {}
    required_digits = int(validation_cfg.get("phone_digits_required", 10))

    ext_ok = F.col("external_id").isNotNull() & (F.length(F.col("external_id")) > 0)
    # A null dob is allowed; a non-null dob must have parsed.
    dob_ok = F.col("dob").isNull() | F.col("dob_parsed").isNotNull()
    # A null phone is allowed; otherwise the digit count must match exactly.
    phone_ok = F.col("phone_digits").isNull() | (F.length(F.col("phone_digits")) == required_digits)

    # No otherwise(): rows passing every check get a null reject_reason.
    reject_reason = (
        F.when(~ext_ok, "missing_external_id")
         .when(~dob_ok, "invalid_dob")
         .when(~phone_ok, "invalid_phone")
    )

    df = df.withColumn("reject_reason", reject_reason)

    # The 3-3-4 grouping only fits 10-digit numbers. For any other configured
    # length, slicing at fixed offsets would drop or mangle digits, so the
    # validated digits are passed through unformatted instead.
    if required_digits == 10:
        formatted_digits = F.concat_ws(
            "-",
            F.substring("phone_digits", 1, 3),
            F.substring("phone_digits", 4, 3),
            F.substring("phone_digits", 7, 4),
        )
    else:
        formatted_digits = F.col("phone_digits")

    # Rows with a null phone_digits fall through to a null phone.
    phone_fmt = F.when(
        F.length("phone_digits") == required_digits,
        formatted_digits,
    )

    clean = (
        df.filter(F.col("reject_reason").isNull())
          .select(
              "external_id", "first_name", "last_name",
              F.date_format("dob_parsed", "yyyy-MM-dd").alias("dob"),
              "email", phone_fmt.alias("phone"), "partner_code"
          )
    )

    rejects = df.filter(F.col("reject_reason").isNotNull())
    return clean, rejects