In [0]:
ls

In [0]:
# List the contents of each partner's input directory
# (paths are relative to the current working directory).
!ls Data/acme
!ls Data/bettercare



In [0]:
pip install openpyxl

#### This script builds a unified healthcare eligibility dataset by ingesting and standardizing data from multiple partner sources with different file formats.

In [0]:
# =========================
# Healthcare Eligibility Pipeline (FINAL)
# =========================

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, initcap, lower,
    regexp_replace, concat_ws,
    lit, coalesce, try_to_date
)

import pandas as pd

# Obtain the Spark session used by every cell below: getOrCreate() returns the
# already-active session if there is one, otherwise it builds a new session
# with the application name "EligibilityPipeline".
spark = SparkSession.builder.appName("EligibilityPipeline").getOrCreate()

# -------------------------
# Partner Configuration
# -------------------------

# Root directory that holds one sub-folder per partner. Defined once so the
# pipeline can be pointed at another workspace/user by editing a single line
# instead of every partner entry.
DATA_DIR = "/Workspace/Users/vijaylanka18us@gmail.com/Data Engineer Assessment/Data"

# One entry per partner feed. Keys read by the pipeline:
#   file_path      - location of the partner's eligibility file
#   file_type      - "csv" (any delimited text) or "excel"
#   delimiter      - field separator; only meaningful for "csv" (optional)
#   partner_code   - constant written to the `partner_code` output column
#   column_mapping - {partner column name: standard column name}
PARTNER_CONFIG = {
    "acme": {
        "file_path": f"{DATA_DIR}/acme/acme.txt",
        "file_type": "csv",
        "delimiter": "|",
        "partner_code": "ACME",
        "column_mapping": {
            "MBI": "external_id",
            "FNAME": "first_name",
            "LNAME": "last_name",
            "DOB": "dob",
            "EMAIL": "email",
            "PHONE": "phone",
        },
    },
    "bettercare": {
        "file_path": f"{DATA_DIR}/bettercare/bettercare.xlsx",
        "file_type": "excel",
        "partner_code": "BETTERCARE",
        "column_mapping": {
            "subscriber_id": "external_id",
            "first_name": "first_name",
            "last_name": "last_name",
            "date_of_birth": "dob",
            "email": "email",
            "phone": "phone",
        },
    },
}

#### This section safely reads input files while handling format-specific quirks and avoiding Arrow conversion issues commonly seen with mixed or messy data.

In [0]:

# -------------------------
# Safe Reader (Arrow-safe)
# -------------------------
def read_partner_file(file_path, file_type, delimiter=None):
    """Load one partner file into a Spark DataFrame of string columns.

    The file is read with pandas, every value is converted to text, and
    missing cells are turned into ``None`` so Spark receives uniformly typed
    (string-or-null) columns.

    Args:
        file_path: Location of the partner file.
        file_type: ``"csv"`` for delimited text or ``"excel"`` for a workbook.
        delimiter: Field separator for ``"csv"`` files; ``None`` keeps the
            pandas default (comma). Ignored for ``"excel"``.

    Returns:
        A Spark DataFrame built from the pandas frame via the module-level
        ``spark`` session.

    Raises:
        ValueError: If ``file_type`` is neither ``"csv"`` nor ``"excel"``.
    """
    if file_type == "csv":
        # dtype=str keeps every field exactly as written. Without it pandas
        # infers numbers, which drops leading zeros from identifiers and turns
        # a digit column containing blanks into floats ("5551234567.0").
        pdf = pd.read_csv(file_path, delimiter=delimiter, dtype=str)
    elif file_type == "excel":
        # NOTE(review): Excel cells keep their native types here so that date
        # cells still stringify as ISO dates; numeric ID columns that contain
        # blanks may therefore stringify with a trailing ".0" - confirm
        # against real partner workbooks.
        pdf = pd.read_excel(file_path)
    else:
        raise ValueError("Unsupported file type")

    # Record which cells are missing BEFORE stringifying. Matching the text
    # "nan" afterwards misses NaT/None (which would leak through as the
    # strings "NaT"/"None") and depends on version-specific behaviour of
    # DataFrame.replace(..., None).
    missing = pdf.isna()

    # Critical: normalize types for Arrow - object columns holding only str or None.
    pdf = pdf.astype(str).astype(object).where(~missing, None)

    return spark.createDataFrame(pdf)


#### Applies consistent data cleaning and normalization logic so all partner records conform to a single, well-defined eligibility schema.

In [0]:
# -------------------------
# Standard Transform
# -------------------------
def transform_to_standard(df, config):
    """Map one partner's raw columns onto the shared eligibility schema.

    Args:
        df: Spark DataFrame holding the partner's raw columns.
        config: Partner entry providing ``column_mapping``
            ({source column: standard column}) and ``partner_code``.

    Returns:
        A DataFrame with the columns ``external_id``, ``first_name``,
        ``last_name``, ``dob``, ``email``, ``phone`` and ``partner_code``,
        restricted to rows whose ``external_id`` is not null.
    """
    # Rename partner-specific headers to the standard column names.
    for src, tgt in config["column_mapping"].items():
        df = df.withColumnRenamed(src, tgt)

    # Keep digits only; a NULL phone stays NULL through regexp_replace.
    phone_digits = regexp_replace(col("phone"), "[^0-9]", "")

    # Format as NNN-NNN-NNNN only when at least ten digits are present.
    # The previous substr + concat_ws approach turned NULL into "" (concat_ws
    # skips NULLs), an empty value into "--", and short numbers into
    # fragments such as "555-123-4". Values that do not match are left as the
    # bare digit string.
    # An optional leading "1" is treated as the US country code and dropped;
    # digits beyond the tenth are discarded, as before.
    # NOTE(review): assumes North American numbers - confirm with partners.
    phone_formatted = regexp_replace(
        phone_digits,
        "^1?([0-9]{3})([0-9]{3})([0-9]{4})[0-9]*$",
        "$1-$2-$3",
    )

    df = (
        df
        .withColumn("first_name", initcap(col("first_name")))
        .withColumn("last_name", initcap(col("last_name")))
        # Lower-case the address and drop a single trailing comma.
        .withColumn("email", regexp_replace(lower(col("email")), ",$", ""))
        # Try each known date layout; values matching neither become NULL.
        .withColumn(
            "dob",
            coalesce(
                try_to_date(col("dob"), "MM/dd/yyyy"),
                try_to_date(col("dob"), "yyyy-MM-dd")
            )
        )
        .withColumn("phone", phone_formatted)
        .withColumn("partner_code", lit(config["partner_code"]))
    )

    return (
        df
        .select(
            "external_id",
            "first_name",
            "last_name",
            "dob",
            "email",
            "phone",
            "partner_code"
        )
        # A record without an identifier is dropped.
        .filter(col("external_id").isNotNull())
    )

#### Orchestrates the end-to-end flow by iterating over the configured partners, reading each partner's file, applying the standard transformation, and unioning the results into a single DataFrame.

In [0]:

# -------------------------
# Pipeline Runner
# -------------------------

def run_pipeline():
    """Read, standardize and stack every partner listed in ``PARTNER_CONFIG``.

    For each partner the file is loaded with ``read_partner_file`` and
    reshaped with ``transform_to_standard``; progress is printed per partner.

    Returns:
        The standardized per-partner frames combined with ``unionByName``,
        in ``PARTNER_CONFIG`` iteration order.

    Raises:
        IndexError: If ``PARTNER_CONFIG`` has no entries.
    """
    standardized_frames = []

    for partner_name in PARTNER_CONFIG:
        partner_cfg = PARTNER_CONFIG[partner_name]
        print(f"Processing partner: {partner_name}")
        print(f"Reading file: {partner_cfg['file_path']}")

        partner_raw = read_partner_file(
            file_path=partner_cfg["file_path"],
            file_type=partner_cfg["file_type"],
            delimiter=partner_cfg.get("delimiter"),
        )
        standardized_frames.append(transform_to_standard(partner_raw, partner_cfg))

    # Fold the remaining frames onto the first one, matching columns by name.
    combined = standardized_frames[0]
    for extra in standardized_frames[1:]:
        combined = combined.unionByName(extra)
    return combined

#### Executes the full pipeline and displays the unified eligibility dataset (rows without an `external_id` are dropped) together with its schema, ready for downstream use.

In [0]:

# -------------------------
# Execute
# -------------------------

# Build the unified dataset across all configured partners.
final_df = run_pipeline()
# Print rows without truncating long values, then the resulting schema.
# NOTE(review): the output includes member names, emails and phone numbers -
# clear it before sharing or committing the notebook.
final_df.show(truncate=False)
final_df.printSchema()