Install & Start Spark

In [1]:
!pip install pyspark




Imports & Spark Session

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lower, initcap, to_date, date_format,
    regexp_replace, lit, length, when
)
import logging

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("EligibilityPipeline-Colab")
    .getOrCreate()
)

spark


The logging set up

In [3]:
logger = logging.getLogger("eligibility_pipeline")
logger.setLevel(logging.INFO)

if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(message)s"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

logger.info("Logging initialized")


2026-01-14 18:02:46,041 - INFO - Logging initialized
INFO:eligibility_pipeline:Logging initialized


Creating Sample Input Files based on the given info

In [4]:
acme_data = """MBI|FNAME|LNAME|DOB|EMAIL|PHONE
1234567890A|John|Doe|03/15/1955|JOHN.DOE@EMAIL.COM|5551234567
"""

bettercare_data = """subscriber_id,first_name,last_name,date_of_birth,email,phone
BC-001,Alice,Johnson,1965-08-10,alice.j@test.com,555-222-3333
"""

nova_data = """member_id~fname~lname~birthdate~email_address~phone_number
N-777~michael~scott~1964/03/15~MICHAEL@DUNDER.COM~(555)8889999
"""

with open("/content/acme.txt", "w") as f:
    f.write(acme_data)

with open("/content/bettercare.csv", "w") as f:
    f.write(bettercare_data)

with open("/content/novahealth.txt", "w") as f:
    f.write(nova_data)


Partner Configuration (3 PARTNERS) as required

In [5]:
PARTNER_CONFIG = {
    "acme_health": {
        "partner_code": "ACME",
        "path": "/content/acme.txt",
        "delimiter": "|",
        "column_mapping": {
            "MBI": "external_id",
            "FNAME": "first_name",
            "LNAME": "last_name",
            "DOB": "dob",
            "EMAIL": "email",
            "PHONE": "phone"
        },
        "date_format": "MM/dd/yyyy"
    },
    "better_care": {
        "partner_code": "BETTER",
        "path": "/content/bettercare.csv",
        "delimiter": ",",
        "column_mapping": {
            "subscriber_id": "external_id",
            "first_name": "first_name",
            "last_name": "last_name",
            "date_of_birth": "dob",
            "email": "email",
            "phone": "phone"
        },
        "date_format": "yyyy-MM-dd"
    },
    "nova_health": {
        "partner_code": "NOVA",
        "path": "/content/novahealth.txt",
        "delimiter": "~",
        "column_mapping": {
            "member_id": "external_id",
            "fname": "first_name",
            "lname": "last_name",
            "birthdate": "dob",
            "email_address": "email",
            "phone_number": "phone"
        },
        "date_format": "yyyy/MM/dd"
    }
}


Checking the data quality

In [6]:
def data_quality_checks(df):
    checks = {
        "missing_external_id": df.filter(col("external_id").isNull()).count(),
        "invalid_dob": df.filter(col("dob").isNull()).count(),
        "invalid_phone": df.filter(col("phone").isNull()).count()
    }

    for k, v in checks.items():
        logger.info(f"DQ_CHECK | {k} = {v}")

    return checks


the transformation logic

In [7]:
def transform_dataframe(df, config):
    df = (
        df
        .filter(col("external_id").isNotNull())
        .withColumn("first_name", initcap(col("first_name")))
        .withColumn("last_name", initcap(col("last_name")))
        .withColumn("email", lower(col("email")))
        .withColumn(
            "dob",
            date_format(
                to_date(col("dob"), config["date_format"]),
                "yyyy-MM-dd"
            )
        )
        .withColumn(
            "phone_digits",
            regexp_replace(col("phone"), "[^0-9]", "")
        )
        .withColumn(
            "phone",
            when(
                length(col("phone_digits")) == 10,
                regexp_replace(
                    col("phone_digits"),
                    r"(\d{3})(\d{3})(\d{4})",
                    "$1-$2-$3"
                )
            )
        )
        .withColumn("partner_code", lit(config["partner_code"]))
        .drop("phone_digits")
    )

    return df.select(
        "external_id", "first_name", "last_name",
        "dob", "email", "phone", "partner_code"
    )


the ingestion and the pipeline

In [8]:
def ingest_partner(config):
    df = (
        spark.read
        .option("header", True)
        .option("delimiter", config["delimiter"])
        .csv(config["path"])
    )

    for src, tgt in config["column_mapping"].items():
        df = df.withColumnRenamed(src, tgt)

    df = transform_dataframe(df, config)
    data_quality_checks(df)

    logger.info(f"METRIC | {config['partner_code']} | rows={df.count()}")
    return df


def run_pipeline():
    dfs = [ingest_partner(cfg) for cfg in PARTNER_CONFIG.values()]
    final_df = dfs[0]
    for df in dfs[1:]:
        final_df = final_df.unionByName(df)
    return final_df


pipeline execution

In [9]:
final_df = run_pipeline()
final_df.show(truncate=False)


2026-01-14 18:16:10,060 - INFO - DQ_CHECK | missing_external_id = 0
INFO:eligibility_pipeline:DQ_CHECK | missing_external_id = 0
2026-01-14 18:16:10,070 - INFO - DQ_CHECK | invalid_dob = 0
INFO:eligibility_pipeline:DQ_CHECK | invalid_dob = 0
2026-01-14 18:16:10,077 - INFO - DQ_CHECK | invalid_phone = 0
INFO:eligibility_pipeline:DQ_CHECK | invalid_phone = 0
2026-01-14 18:16:10,445 - INFO - METRIC | ACME | rows=1
INFO:eligibility_pipeline:METRIC | ACME | rows=1
2026-01-14 18:16:11,946 - INFO - DQ_CHECK | missing_external_id = 0
INFO:eligibility_pipeline:DQ_CHECK | missing_external_id = 0
2026-01-14 18:16:11,950 - INFO - DQ_CHECK | invalid_dob = 0
INFO:eligibility_pipeline:DQ_CHECK | invalid_dob = 0
2026-01-14 18:16:11,953 - INFO - DQ_CHECK | invalid_phone = 0
INFO:eligibility_pipeline:DQ_CHECK | invalid_phone = 0
2026-01-14 18:16:12,241 - INFO - METRIC | BETTER | rows=1
INFO:eligibility_pipeline:METRIC | BETTER | rows=1
2026-01-14 18:16:13,605 - INFO - DQ_CHECK | missing_external_id = 0


+-----------+----------+---------+----------+------------------+------------+------------+
|external_id|first_name|last_name|dob       |email             |phone       |partner_code|
+-----------+----------+---------+----------+------------------+------------+------------+
|1234567890A|John      |Doe      |1955-03-15|john.doe@email.com|555-123-4567|ACME        |
|BC-001     |Alice     |Johnson  |1965-08-10|alice.j@test.com  |555-222-3333|BETTER      |
|N-777      |Michael   |Scott    |1964-03-15|michael@dunder.com|555-888-9999|NOVA        |
+-----------+----------+---------+----------+------------------+------------+------------+



Checking Unit Tests (Py-Test)

In [10]:
def test_name_formatting():
    row = final_df.filter(col("external_id") == "N-777").collect()[0]
    assert row.first_name == "Michael"
    assert row.last_name == "Scott"

def test_email_lowercase():
    row = final_df.filter(col("external_id") == "N-777").collect()[0]
    assert row.email == "michael@dunder.com"

def test_phone_format():
    row = final_df.filter(col("external_id") == "N-777").collect()[0]
    assert row.phone == "555-888-9999"

test_name_formatting()
test_email_lowercase()
test_phone_format()

print("✅ All unit tests passed")


✅ All unit tests passed
