In [33]:
import requests as req
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, array, array_contains
import xml.etree.ElementTree as ET
from pyspark.sql.functions import col
from pyspark.sql.functions import length
from pyspark.sql.functions import lit, concat, col
from pyspark.sql import Row
from pyspark.sql.functions import col, to_date
import pandas as pd
from pathlib import Path
import requests
import re

In [34]:
# url_file = "/content/drive/MyDrive/milliman/ccda_pre_signed_urls.csv"
# ccda_dir = "/content/drive/MyDrive/milliman/ccda_files"
# os.makedirs(ccda_dir, exist_ok=True)

# with open(url_file, 'r') as file:
#     # Skipping  the header in the file
#     next(file)
#     for line in file:
#         url = line.strip()
#         if url:
#             filename = url.split("/")[-1].split("?")[0]
#             output_path = os.path.join(ccda_dir, filename)
#             try:
#                 response = requests.get(url)
#                 response.raise_for_status()
#                 with open(output_path, 'wb') as f:
#                     f.write(response.content)
#             except requests.exceptions.HTTPError as e:
#                 print(f"Failed to download {filename}: {e}")


In [35]:
# List all unique CCDA element names across all files
ccda_dir = Path("/content/drive/MyDrive/milliman/ccda_files")
xml_paths = sorted(ccda_dir.glob("*.xml"))
ns_strip = lambda tag: tag.split("}", 1)[-1]  # drop namespace

all_tags = set()
for p in xml_paths:
    root = ET.parse(p).getroot()
    for elem in root.iter():
        all_tags.add(ns_strip(elem.tag))

# Show a sorted sample of tags
for t in sorted(all_tags):
    print(t)


ClinicalDocument
act
addr
administrationUnitCode
administrativeGenderCode
approachSiteCode
assignedAuthor
assignedAuthoringDevice
assignedCustodian
assignedEntity
assignedPerson
associatedEntity
associatedPerson
author
authorization
awarenessCode
birthTime
caption
city
code
col
colgroup
component
componentOf
confidentialityCode
consent
consumable
content
country
county
criterion
custodian
deceasedInd
deceasedTime
desc
dischargeDispositionCode
documentationOf
doseQuantity
effectiveTime
encompassingEncounter
encounter
encounterParticipant
entry
entryRelationship
ethnicGroupCode
family
footnote
functionCode
given
healthCareFacility
high
id
informant
informationRecipient
intendedRecipient
interpretationCode
item
languageCode
languageCommunication
legalAuthenticator
list
location
lotNumberText
low
manufacturedLabeledDrug
manufacturedMaterial
manufacturedProduct
manufacturerModelName
manufacturerOrganization
maritalStatusCode
methodCode
modeCode
name
observation
observationRange
organizer
or

In [36]:
# creating raw data frame from ccda files to discover what tags and attributes exist across all each ccda xml files

# 1) Locate all your CCDA XML files
ccda_dir = Path("/content/drive/MyDrive/milliman/ccda_files")
xml_paths = sorted(ccda_dir.glob("*.xml"))
if not xml_paths:
    raise FileNotFoundError(f"No XML files found in {ccda_dir}")

# 2) Helper to strip namespaces from tags
ns_strip = re.compile(r"\{.*\}")
def strip_ns(tag: str) -> str:
    return ns_strip.sub("", tag)

# 3) Build record list
records = []
for p in xml_paths:
    try:
        tree = ET.parse(p)
    except ET.ParseError:
        # skip malformed files
        continue
    root = tree.getroot()
    for elem in root.iter():
        tag = strip_ns(elem.tag)
        # capture text (if any)
        text = elem.text.strip() if elem.text and elem.text.strip() else ""
        # capture attributes (as a dict)
        attributes = elem.attrib or {}
        records.append({
            "file_name":    p.name,
            "tag":          tag,
            "text":         text,
            "attributes":   attributes
        })

# 4) Create  a DataFrame
df_ccda_raw = pd.DataFrame.from_records(records)

# 5) Inspect top rows
df_ccda_raw.head()

Unnamed: 0,file_name,tag,text,attributes
0,005ao54m-c566-7671-19c0-596o1os969x4_055d57c0a...,ClinicalDocument,,{}
1,005ao54m-c566-7671-19c0-596o1os969x4_055d57c0a...,realmCode,,{'code': 'US'}
2,005ao54m-c566-7671-19c0-596o1os969x4_055d57c0a...,typeId,,"{'root': '2.16.840.1.113883.1.3', 'extension':..."
3,005ao54m-c566-7671-19c0-596o1os969x4_055d57c0a...,templateId,,{'root': '2.16.840.1.113883.10.20.22.1.1'}
4,005ao54m-c566-7671-19c0-596o1os969x4_055d57c0a...,templateId,,"{'root': '2.16.840.1.113883.10.20.22.1.1', 'ex..."


In [37]:
# exploring ccda xml files and Combine all CCDA XML files into one pandas DataFrame to explore source data
# 1. Point to your CCDA directory
ccda_dir = Path("/content/drive/MyDrive/milliman/ccda_files")
xml_paths = sorted(ccda_dir.glob("*.xml"))
if not xml_paths:
    raise FileNotFoundError(f"No XML files found in {ccda_dir}")

# 2) XML namespaces
ns = {
    "hl7": "urn:hl7-org:v3",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance"
}

# 3) Function to extract  fields, with None-safe name handling
def summarize_ccda(path: Path) -> dict:
    tree = ET.parse(path)
    root = tree.getroot()

    # patientRole
    pr = root.find(".//hl7:recordTarget/hl7:patientRole", ns)

    # ID
    pid_e = pr.find("hl7:id[@extension]", ns) if pr is not None else None
    patient_id = pid_e.attrib.get("extension", "") if pid_e is not None else ""

    # Name (given + family), filtering out any None
    patient_name = ""
    if pr is not None:
        name_e = pr.find("hl7:patient/hl7:name", ns)
        if name_e is not None:
            given_parts = [
                g.text.strip()
                for g in name_e.findall("hl7:given", ns)
                if g.text and isinstance(g.text, str)
            ]
            family = name_e.findtext("hl7:family", namespaces=ns) or ""
            # filter out any empty strings
            name_parts = [p for p in given_parts + [family] if p]
            patient_name = " ".join(name_parts)

    # Birth
    birth = ""
    if pr is not None:
        birth_e = pr.find("hl7:patient/hl7:birthTime", ns)
        birth = birth_e.attrib.get("value", "") if birth_e is not None else ""

    # Counts
    meds_count  = len(root.findall(".//hl7:substanceAdministration", ns))
    probs_count = len(root.findall(".//hl7:observation", ns))

    return {
        "file_name":    path.name,
        "patient_id":   patient_id,
        "patient_name": patient_name,
        "birth":        birth,
        "meds_count":   meds_count,
        "probs_count":  probs_count
    }

# 4) Aggregate all into a DataFrame
records = [summarize_ccda(p) for p in xml_paths]
df_ccda_summary = pd.DataFrame.from_records(records)

# 5) Inspect the first few rows
df_ccda_summary.head()

Unnamed: 0,file_name,patient_id,patient_name,birth,meds_count,probs_count
0,005ao54m-c566-7671-19c0-596o1os969x4_055d57c0a...,574626,Jessica Sanchez,19510730,47,47
1,005ao54m-c566-7671-19c0-596o1os969x4_11772d5b7...,1559429,Jessica Jennifer Sanchez,19510730,16,7
2,005ao54m-c566-7671-19c0-596o1os969x4_13a46b10c...,457978,Jessica Jennifer Sanchez,19510730,71,78
3,005ao54m-c566-7671-19c0-596o1os969x4_1557496d7...,1883,Jessica Sanchez,19510730,35,18
4,005ao54m-c566-7671-19c0-596o1os969x4_16e6aea35...,923,Jessica Sanchez,19510730,34,23


In [38]:
# examing HL7 codeSystem OIDs.
CODE_SYSTEM_NDC  = "2.16.840.1.113883.6.69"
CODE_SYSTEM_ICD10 = "2.16.840.1.113883.6.3"

# …later on…
for tr in code_node.findall("hl7:translation", ns):
    if tr.get("codeSystem") == CODE_SYSTEM_NDC:
        ndc = tr.get("code", "")
        break
# …
for tr in val_node.findall("hl7:translation", ns):
    if tr.get("codeSystem") == CODE_SYSTEM_ICD10:
        icd10 = tr.get("code", "")
        break

print(f"Extracted for member {member_id}: RXNorm={rxnorm}, NDC={ndc}")
print(f"Extracted for member {member_id}: SNOMED={snomed}, ICD10={icd10}, Name={problem_name}")

Extracted for member 5916394628: RXNorm=, NDC=
Extracted for member 5916394628: SNOMED=55561003, ICD10=, Name=Active


In [39]:
# Question2:parses CCDA XML documents to extract and normalize medication (RXNorm/NDC) and problem (SNOMED/ICD-10) records for downstream claims enrichment.
class CodeSystem(Enum):
    RXNORM = "2.16.840.1.113883.6.88"
    NDC    = "2.16.840.1.113883.6.69"
    ICD10  = "2.16.840.1.113883.6.3"


class CCDAParser:
    # HL7 namespace
    ns = {"hl7": "urn:hl7-org:v3"}

    def __init__(self, ccda_dir: str):
        self.ccda_dir = ccda_dir

    def parse(self):
        # find all .xml files
        xml_paths = [os.path.join(self.ccda_dir, f)
                     for f in os.listdir(self.ccda_dir)
                     if f.lower().endswith(".xml")]
        if not xml_paths:
            raise FileNotFoundError(f"No XML files in {self.ccda_dir}")

        med_records = []
        prob_records = []

        for path in xml_paths:
            tree = ET.parse(path)
            root = tree.getroot()

            # Extract MemberID
            member_id = self._extract_member_id(root)
            filename = os.path.basename(path)

            # parse medications
            med_records.extend(
                self._parse_medications(root, member_id, filename)
            )

            # parse problems
            prob_records.extend(
                self._parse_problems(root, member_id, filename)
            )

        return med_records, prob_records

    def _extract_member_id(self, root) -> str:
        pr = root.find(".//hl7:recordTarget/hl7:patientRole", self.ns)
        if pr is not None:
            id_node = pr.find("hl7:id", self.ns)
            if id_node is not None:
                return id_node.get("extension", "")
        return ""

    def _parse_medications(self, root, member_id: str, filename: str) -> list:
        records = []
        for med in root.findall(".//hl7:substanceAdministration", self.ns):
            code_node = med.find(".//hl7:code", self.ns)
            rxnorm = code_node.get("code", "") if code_node is not None else ""
            med_name = code_node.get("displayName", "") if code_node is not None else ""

            # collect translations into a dict
            translations = {}
            if code_node is not None:
                for tr in code_node.findall("hl7:translation", self.ns):
                    translations[tr.get("codeSystem")] = tr.get("code")

            # lookup NDC
            ndc = translations.get(CodeSystem.NDC.value, "")
            if not ndc:
                prod = med.find(
                    ".//hl7:manufacturedProduct//hl7:manufacturedMaterial//hl7:code", self.ns
                )
                if prod is not None and prod.get("codeSystem") == CodeSystem.NDC.value:
                    ndc = prod.get("code", "")

            # effective dates
            low = med.find("hl7:effectiveTime/hl7:low", self.ns)
            start = low.get("value", "") if low is not None else ""
            high = med.find("hl7:effectiveTime/hl7:high", self.ns)
            end = high.get("value", "") if high is not None else ""

            # dose
            dose_node = med.find("hl7:doseQuantity", self.ns)
            dose = dose_node.get("value", "") if dose_node is not None else ""

            # record
            records.append({
                "file_name": filename,
                "MemberID": member_id,
                "RXNorm": rxnorm,
                "NDC": ndc,
                "MedicationName": med_name,
                "startDate": start,
                "endDate": end,
                "dose": dose
            })

            print(f"Member {member_id} – RXNorm: {rxnorm!r}, NDC: {ndc!r}")
        return records

    def _parse_problems(self, root, member_id: str, filename: str) -> list:
        records = []
        # locate 'Problems' section
        problem_section = None
        for sec in root.findall(".//hl7:section", self.ns):
            title = sec.findtext("hl7:title", default="", namespaces=self.ns)
            if title.strip().lower() == "problems":
                problem_section = sec
                break
        if problem_section is None:
            return records

        # build content ID -> text lookup
        lookup = {
            c.get("ID"): (c.text or "").strip()
            for c in problem_section.findall(".//hl7:content", self.ns)
            if c.get("ID")
        }

        for obs in problem_section.findall(
            ".//hl7:entryRelationship/hl7:observation", self.ns
        ):
            val_node = obs.find("hl7:value", self.ns)
            snomed = val_node.get("code", "") if val_node is not None else ""

            # collect translations
            translations = {}
            if val_node is not None:
                for tr in val_node.findall("hl7:translation", self.ns):
                    translations[tr.get("codeSystem")] = tr.get("code")
            icd10 = translations.get(CodeSystem.ICD10.value, "")

            # human-readable name
            problem_name = val_node.get("displayName", "") if val_node is not None else ""
            if not problem_name:
                ref = obs.find("hl7:text/hl7:reference", self.ns)
                if ref is not None:
                    rid = ref.get("value", "").lstrip("#")
                    problem_name = lookup.get(rid, "")
            if not problem_name:
                orig = obs.find(".//hl7:originalText/hl7:text", self.ns)
                if orig is not None and orig.text:
                    problem_name = orig.text.strip()

            # onset date
            onset_node = obs.find("hl7:effectiveTime/hl7:low", self.ns)
            onset = onset_node.get("value", "") if onset_node is not None else ""

            records.append({
                "file_name": filename,
                "MemberID": member_id,
                "ProblemCode": snomed,
                "SNOMED": snomed,
                "ICD10": icd10,
                "ProblemName": problem_name,
                "onsetDate": onset
            })

            print(f"Member {member_id} – SNOMED: {snomed!r}, ICD10: {icd10!r}, Name: {problem_name!r}")
        return records


if __name__ == "__main__":
    # Start Spark
    spark = SparkSession.builder \
        .appName("CCDA_Claims_Enrichment") \
        .getOrCreate()

    # Directory containing CCDA XMLs
    ccda_dir = "/content/drive/MyDrive/milliman/ccda_files"

    parser = CCDAParser(ccda_dir)
    med_records, prob_records = parser.parse()

    # Build DataFrames
    df_meds = (
        spark.createDataFrame(med_records)
        if med_records else spark.createDataFrame([], schema=None)
    )
    df_probs = (
        spark.createDataFrame(prob_records)
        if prob_records else spark.createDataFrame([], schema=None)
    )

    # Show samples
    print("Medications sample:")
    df_meds.show(5, truncate=False)

    print("Problems sample with names and codes:")
    df_probs.select(
        "file_name", "MemberID", "ProblemCode", "SNOMED", "ICD10", "ProblemName", "onsetDate"
    ).show(5, truncate=False)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Member 4951694 – RXNorm: '308135', NDC: ''
Member 4951694 – RXNorm: '617310', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '905225', NDC: ''
Member 4951694 – RXNorm: '1232082', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '309114', NDC: ''
Member 4951694 – RXNorm: '', NDC: ''
Member 4951694 – RXNorm: '113', NDC: ' '
Member 4951694 – RXNorm: '115', NDC: ''
Member 31461 – RXNorm: '', NDC: ''
Member u643609417 – RXNorm: '', NDC: ''
Member u643609417 – RXNorm: '', NDC: ''
Member u643609417 – RXNorm: '76662-6', NDC: ''
Member u643609417 – RXNorm: '519', NDC: ''
Member u643609417 – RXNorm: '76662-6', NDC: ''
Member u643609417 – RXNorm: '6058', NDC: ''
Member u643609417 – RXNorm: '76662-6', NDC: ''

In [40]:
# # Combined CCDA parsing for medication and problems

# # Starting a Spark session
# spark = SparkSession.builder.appName("CCDA_Claims_Enrichment").getOrCreate()

# # Part 1: Parse CCDA files
# ccda_dir = "/content/drive/MyDrive/milliman/ccda_files"
# xml_paths = [os.path.join(ccda_dir, f)
#              for f in os.listdir(ccda_dir)
#              if f.lower().endswith(".xml")]
# if not xml_paths:
#     raise FileNotFoundError("No XML files in " + ccda_dir)

# ns = {"hl7": "urn:hl7-org:v3"}
# med_records = []
# prob_records = []

# for path in xml_paths:
#     tree = ET.parse(path)
#     root = tree.getroot()

#     # Extract MemberID
#     member_id = ""
#     pr = root.find(".//hl7:recordTarget/hl7:patientRole", ns)
#     if pr is not None:
#         id_node = pr.find("hl7:id", ns)
#         if id_node is not None:
#             member_id = id_node.get("extension", "")

#     # Medications domain
#     for med in root.findall(".//hl7:substanceAdministration", ns):
#         code_node = med.find(".//hl7:code", ns)
#         rxnorm = code_node.get("code", "") if code_node is not None else ""
#         med_name = code_node.get("displayName", "") if code_node is not None else ""
#         # NDC translation
#         ndc = ""
#         if code_node is not None:
#             for tr in code_node.findall("hl7:translation", ns):
#                 if tr.get("codeSystem") == "2.16.840.1.113883.6.69":
#                     ndc = tr.get("code", "")
#                     break
#         if not ndc:
#             prod = med.find(".//hl7:manufacturedProduct//hl7:manufacturedMaterial//hl7:code", ns)
#             if prod is not None and prod.get("codeSystem") == "2.16.840.1.113883.6.69":
#                 ndc = prod.get("code", "")
#         # Effective dates
#         start = ""
#         low_node = med.find("hl7:effectiveTime/hl7:low", ns)
#         if low_node is not None:
#             start = low_node.get("value", "")
#         end = ""
#         high_node = med.find("hl7:effectiveTime/hl7:high", ns)
#         if high_node is not None:
#             end = high_node.get("value", "")
#         # Dose quantity
#         dose = ""
#         dose_node = med.find("hl7:doseQuantity", ns)
#         if dose_node is not None:
#             dose = dose_node.get("value", "")
#         med_records.append({
#             "file_name": os.path.basename(path),
#             "MemberID": member_id,
#             "RXNorm": rxnorm,
#             "NDC": ndc,
#             "MedicationName": med_name,
#             "startDate": start,
#             "endDate": end,
#             "dose": dose
#         })
#         # inside your medications loop, after you set rxnorm and ndc:
#         print(f"Member {member_id} – RXNorm: {rxnorm!r},   NDC: {ndc!r}")


#     # Problems domain
#     problem_section = None
#     for sec in root.findall(".//hl7:section", ns):
#         title_text = sec.findtext("hl7:title", default="", namespaces=ns)
#         if title_text.strip().lower() == "problems":
#             problem_section = sec
#             break
#     if problem_section is None:
#         continue

#     # Build lookup of content ID to human name
#     lookup = {c.get("ID"): (c.text or "").strip()
#               for c in problem_section.findall(".//hl7:content", ns)
#               if c.get("ID")}

#     # Extract each problem observation
#     for obs in problem_section.findall(".//hl7:entryRelationship/hl7:observation", ns):
#         val_node = obs.find("hl7:value", ns)
#         # SNOMED code
#         snomed = val_node.get("code", "") if val_node is not None else ""
#         # ICD-10 translation
#         icd10 = ""
#         if val_node is not None:
#             for tr in val_node.findall("hl7:translation", ns):
#                 if tr.get("codeSystem") == "2.16.840.1.113883.6.3":
#                     icd10 = tr.get("code", "")
#                     break
#         # Prepare problem name: displayName, table lookup, or originalText
#         problem_name = val_node.get("displayName", "") if val_node is not None else ""
#         if not problem_name:
#             ref = obs.find("hl7:text/hl7:reference", ns)
#             if ref is not None:
#                 rid = ref.get("value", "").lstrip("#")
#                 problem_name = lookup.get(rid, "")
#         if not problem_name:
#             orig = obs.find(".//hl7:originalText/hl7:text", ns)
#             if orig is not None and orig.text:
#                 problem_name = orig.text.strip()
#         # Onset date
#         onset = ""
#         onset_node = obs.find("hl7:effectiveTime/hl7:low", ns)
#         if onset_node is not None:
#             onset = onset_node.get("value", "")
#         # Append record including both ProblemCode (SNOMED) and human name
#         prob_records.append({
#             "file_name": os.path.basename(path),
#             "MemberID": member_id,
#             "ProblemCode": snomed,
#             "SNOMED": snomed,
#             "ICD10": icd10,
#             "ProblemName": problem_name,
#             "onsetDate": onset
#         })
#         # inside your problems loop, after you’ve computed snomed, icd10 and problem_name:
#         print(f"Member {member_id} – SNOMED: {snomed!r}, ICD10: {icd10!r},  Name: {problem_name!r}")

# # Create Spark DataFrames
# if med_records:
#     df_meds = spark.createDataFrame(med_records)
# else:
#     df_meds = spark.createDataFrame([], schema=None)
# if prob_records:
#     df_probs = spark.createDataFrame(prob_records)
# else:
#     df_probs = spark.createDataFrame([], schema=None)

# # Display samples for verification
# print("Medications sample:")
# df_meds.show(5, truncate=False)
# print("Problems sample with names and codes:")
# df_probs.select(
#     "file_name", "MemberID", "ProblemCode", "SNOMED", "ICD10", "ProblemName", "onsetDate"
# ).show(5, truncate=False)


In [41]:
# QUESTION 3: Load claims files
claims_df = spark.read.csv(
    "/content/drive/MyDrive/milliman/data_engineer_exam_claims_final.csv",
    header=True, inferSchema=True
)
rx_claims_df = spark.read.csv(
    "/content/drive/MyDrive/milliman/data_engineer_exam_rx_final.csv",
    header=True, inferSchema=True
)

# Cast join keys to string
# Medications: use NDC extracted from CCDA
df_meds = df_meds.withColumn("MemberID", col("MemberID").cast("string")) \
                 .withColumn("NDC", col("NDC").cast("string"))
# Rx claims also on NDC
rx_claims_df = rx_claims_df.withColumn("MemberID", col("MemberID").cast("string")) \
                             .withColumn("NDC", col("NDC").cast("string"))

# Problems: use ICD10 code extracted from CCDA
df_probs = df_probs.withColumn("MemberID", col("MemberID").cast("string")) \
                   .withColumn("ICD10", col("ICD10").cast("string"))
# Diagnosis claims use ICDD iag1 column
claims_df = claims_df.withColumn("MemberID", col("MemberID").cast("string")) \
                     .withColumn("ICDDiag1", col("ICDDiag1").cast("string"))

# Join Medications on NDC
merged_meds = df_meds.join(
    rx_claims_df,
    (df_meds.MemberID == rx_claims_df.MemberID) &
    (df_meds.NDC == rx_claims_df.NDC),
    how="left"
).drop(rx_claims_df.MemberID)

# Join Problems on ICD10 matching ICDD iag1
merged_probs = df_probs.join(
    claims_df,
    (df_probs.MemberID == claims_df.MemberID) &
    (df_probs.ICD10 == claims_df.ICDDiag1),
    how="left"
).drop(claims_df.MemberID) \
 .drop(claims_df.ICDDiag1)

# Inspect joined results
print("=== Medications + Rx Claims Sample ===")
merged_meds.show(5, truncate=False)

print("=== Problems + Dx Claims Sample ===")
merged_probs.show(5, truncate=False)

# Match-rate metrics
# total_meds = df_meds.count()
# matched_meds = merged_meds.filter(col("ClaimID").isNotNull()).count()
# print("Matched {} / {} medication rows.".format(matched_meds, total_meds))

# total_probs = df_probs.count()
# matched_probs = merged_probs.filter(col("ClaimID").isNotNull()).count()
# print("Matched {} / {} problem rows.".format(matched_probs, total_probs))


=== Medications + Rx Claims Sample ===
+------------------------------------------+----------+---+------+-----+--------------+----------------------------------------------------------------------------------------+--------------+--------------+-------+----------+--------+--------+----+-------------+--------------+----------+---------+--------------+-------------+------+-------+----+----+-----+-----------+----------+----------+-----+----------+-----------------+----+------+----+----------+-------+-------+-----------+------+------+----+-------------+----------------+---------------+------------+--------+-----------+-------+-----------+------------+--------+-------+-----------+-------------+--------+----------------+------------------------+-------------------+---------+----------+-------+--------+--------------+-------------+-------------+----------+------------+--------------+--------------+---------------+----------------+-------------+----------------+---------------------+----------

In [42]:
# populating mergedmeds ==

df_meds = df_meds.withColumnRenamed("NDC", "MedicationCode")

# now join MedicationCode ←→ NDC in the claims DF
merged_meds = df_meds.join(
    rx_claims_df,
    (df_meds.MemberID       == rx_claims_df.MemberID) &
    (df_meds.MedicationCode == rx_claims_df.NDC),
    how="left"
).drop(rx_claims_df.MemberID)

merged_meds.show(5, truncate=False)

+------------------------------------------+----------+--------------+------+-----+--------------+----------------------------------------------------------------------------------------+--------------+--------------+-------+----------+--------+--------+----+-------------+--------------+----------+---------+--------------+-------------+------+-------+----+----+-----+-----------+----------+----------+-----+----------+-----------------+----+------+----+----------+-------+-------+-----------+------+------+----+-------------+----------------+---------------+------------+--------+-----------+-------+-----------+------------+--------+-------+-----------+-------------+--------+----------------+------------------------+-------------------+---------+----------+-------+--------+--------------+-------------+-------------+----------+------------+--------------+--------------+---------------+----------------+-------------+----------------+---------------------+---------------------+----------------

In [43]:
### populating MergeProblems

# — make sure spark session is available —
spark = SparkSession.builder.getOrCreate()

# 1) Cast to string to avoid type mismatches
df_probs = df_probs \
    .withColumn("MemberID",    col("MemberID").cast("string")) \
    .withColumn("ProblemCode", col("ProblemCode").cast("string"))

claims_df = claims_df \
    .withColumn("MemberID", col("MemberID").cast("string"))

# 2) Build an array of all your ICD diagnosis columns
diag_cols = [f"ICDDiag{i}" for i in range(1, 31)]
claims_df = claims_df.withColumn(
    "diag_codes",
    array(*[col(c).cast("string") for c in diag_cols])
)

# 3) Join on both MemberID and ProblemCode ∈ diag_codes
merged_probs = df_probs.join(
    claims_df,
    (df_probs.MemberID == claims_df.MemberID) &
    (array_contains(claims_df.diag_codes, df_probs.ProblemCode)),
    how="left"
).drop(claims_df.MemberID, "diag_codes")

# 4) Inspect
print("=== Problems joined to Dx Claims (code‐level) ===")
merged_probs.show(5, truncate=False)

=== Problems joined to Dx Claims (code‐level) ===
+-----+--------+-----------+---------------------------------------------+---------+----------------------------------------------------------------------------------------+---------+--------------+-------+-------+----------+--------+------+---------+---------+--------+-------+-----+--------+---------+------+----+------------+---------+-------------+----------+--------+-----------+---------+------+-------+----+----+-----+-----------+----------+----------+----+-----+----------+---------------+---------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+-

In [44]:
# Step 4: Save to Parquet for Databricks Ingestion
# Save outputs
# Define output directory
output_dir = "/content/drive/MyDrive/milliman/processed_data"

# Save as Parquet (preferred for Databricks)
merged_meds.write.mode("overwrite").parquet(f"{output_dir}/merged_medications.parquet")
merged_probs.write.mode("overwrite").parquet(f"{output_dir}/merged_problems.parquet")

# Optionally save as CSV (less efficient but human-readable for demo purpose_
merged_meds.write.mode("overwrite").option("header", True).csv(f"{output_dir}/merged_medications_csv")
merged_probs.write.mode("overwrite").option("header", True).csv(f"{output_dir}/merged_problems_csv")

#view first rows of both df
merged_meds.show(5)
merged_probs.show(5)

+--------------------+----------+--------------+------+-----+--------------+--------------------+--------------+--------------+-------+----------+--------+--------+----+-------------+--------------+----------+---------+--------------+-------------+------+-------+----+----+-----+-----------+----------+----------+-----+----------+-----------------+----+------+----+----------+-------+-------+-----------+------+------+----+-------------+----------------+---------------+------------+--------+-----------+-------+-----------+------------+--------+-------+-----------+-------------+--------+----------------+------------------------+-------------------+---------+----------+-------+--------+--------------+-------------+-------------+----------+------------+--------------+--------------+---------------+----------------+-------------+----------------+---------------------+---------------------+---------------------------+----+----+
|      MedicationName|  MemberID|MedicationCode|RXNorm| dose|      

In [None]:
# Wrote partitioned Parquet files for Databricks Auto Loader assuming that output files are stored on s3 bucket
"I wrote partitioned Parquet files by year/month/day so Databricks Auto Loader can pick them up automatically."
def write_partitioned_output(df, output_path):
    from pyspark.sql.functions import year, month, dayofmonth

    (df.write
       .mode("overwrite")
       .partitionBy(year(col("startDate")).alias("year"),
                    month(col("startDate")).alias("month"),
                    dayofmonth(col("startDate")).alias("day"))
       .parquet(output_path))

output_path = "s3://<bucket>/ccda_output/"
write_partitioned_output(df_meds, output_path)

In [45]:
#### optional Validation####3

# Validationcriteria: Filter and inspect medication codes based on length
# Filter valid medication codes (length > 2)
# Filter valid medication codes (length > 2)
valid_meds = merged_meds.filter(length("MedicationCode") > 2)

# Show valid medication codes
print("Valid Medication Codes (length > 2):")
valid_meds.show(10, truncate=False)

# Filter invalid medication codes (length <= 2)
invalid_meds = merged_meds.filter(length("MedicationCode") <= 2)

# Show invalid medication codes
print("Invalid Medication Codes (length <= 2):")
invalid_meds.show(10, truncate=False)

Valid Medication Codes (length > 2):
+------------------------------------------------------------------------------------------------------------+--------+--------------+-------+----+-------+----------------------------------------------------------------------------------------+---------+--------------+-------+----------+--------+--------+----+-------------+--------------+----------+---------+--------------+-------------+------+-------+----+----+-----+-----------+----------+----------+-----+----------+-----------------+----+------+----+----------+-------+-------+-----------+------+------+----+-------------+----------------+---------------+------------+--------+-----------+-------+-----------+------------+--------+-------+-----------+-------------+--------+----------------+------------------------+-------------------+---------+----------+-------+--------+--------------+-------------+-------------+----------+------------+--------------+--------------+---------------+----------------+--

In [46]:
# FHIR Mapping########
### MedicationStatement Mapping
meds_fhir = valid_meds \
    .withColumnRenamed("MemberID", "patient_id") \
    .withColumnRenamed("MedicationCode", "medication_code") \
    .withColumnRenamed("MedicationName", "medication_display") \
    .withColumn("subject_reference", concat(lit("Patient/"), col("patient_id"))) \
    .withColumn("resource_type", lit("MedicationStatement")) \
    .select("resource_type", "subject_reference", "medication_code", "medication_display")

# Condition Mapping
probs_fhir = merged_probs \
    .withColumnRenamed("MemberID", "patient_id") \
    .withColumnRenamed("ProblemCode", "condition_code") \
    .withColumnRenamed("ProblemName", "condition_display") \
    .withColumn("subject_reference", concat(lit("Patient/"), col("patient_id"))) \
    .withColumn("resource_type", lit("Condition")) \
    .select("resource_type", "subject_reference", "condition_code", "condition_display")

# Write to disk
meds_fhir.write.mode("overwrite").parquet("/content/drive/MyDrive/milliman/output/fhir_medicationstatement")
probs_fhir.write.mode("overwrite").parquet("/content/drive/MyDrive/milliman//output/fhir_condition")

# Show first 5 rows of medication data
print("First 5 rows of medication statements:")
meds_fhir.show(5, truncate=False)

# Show first 5 rows of condition data
print("First 5 rows of conditions:")
probs_fhir.show(5, truncate=False)

First 5 rows of medication statements:
+-------------------+-----------------+---------------+------------------------------------------------------------------------------------------------------------+
|resource_type      |subject_reference|medication_code|medication_display                                                                                          |
+-------------------+-----------------+---------------+------------------------------------------------------------------------------------------------------------+
|MedicationStatement|Patient/819950   |68180072103    |amlodipine 10 MG Oral Tablet                                                                                |
|MedicationStatement|Patient/819950   |23155000301    |hydralazine hydrochloride 50 MG Oral Tablet                                                                 |
|MedicationStatement|Patient/819950   |47781030301    |nitrofurantoin, macrocrystals 25 MG / nitrofurantoin, monohydrate 75 MG Oral Caps

In [47]:
# Unit Tests for Clinical Data Pipeline (Step 8: Optional)
overview_path = "/content/drive/MyDrive/milliman/data_overview.csv"

overview_df = spark.read.csv(overview_path, header=True, inferSchema=True)


# Raw Claims Data Quality Tests

# Checks that all ClaimID values in the claims file are non-null
def test_claimid_not_null(spark, df):
    print("First 5 rows of claims data:")
    df.show(5, truncate=False)

    null_count = df.filter(col("ClaimID").isNull()).count()
    assert null_count == 0, f"Found {null_count} null ClaimID(s)"


# Checks that all MemberID values in a given DataFrame are non-null
def test_memberid_not_null(spark, df, name="claims"):
    print(f"First 5 rows of {name} data:")
    df.show(5, truncate=False)

    null_count = df.filter(col("MemberID").isNull()).count()
    assert null_count == 0, f"Found {null_count} null MemberID(s) in {name} data"


# Validates that all NDC codes in Rx claims follow a numeric pattern (at least 5 digits)
def test_ndc_format(rx_df):
    print("First 5 rows of Rx data:")
    rx_df.show(5, truncate=False)

    bad_ndc = rx_df.filter(~col("NDC").rlike("^[0-9]{5,}$"))
    count = bad_ndc.count()
    assert count == 0, f"{count} Rx claims have invalid NDC format"


# Ensures the overview file has the expected metadata columns
def test_overview_file_loaded(df):
    print("First 5 rows of overview file:")
    df.show(5, truncate=False)

    required_columns = ["File_Name", "Type", "Patient_Identifier_Information"]
    for col_name in required_columns:
        assert col_name in df.columns, f"Missing column: {col_name}"


# Referential Integrity Test

# Ensures all files listed in the overview file are present in the actual inputs
def test_referential_integrity_overview(overview_df, actual_files):
    # Filter out rows with invalid or descriptive text instead of actual file names
    overview_files = (
        overview_df
        .select("File_Name")
        .dropna()
        .rdd
        .map(lambda row: row.File_Name.strip())
        .filter(lambda name: name.endswith(".csv") or name.endswith(".xml"))
        .collect()
    )

    missing = [f for f in overview_files if f not in actual_files]

    print("Overview files listed:", overview_files)
    print("Actual input files:", actual_files)

    assert len(missing) == 0, f"Files listed in overview but not found: {missing}"



# Date Field Logic Test

# Validates that FromDate is not after ToDate in the claims file
def test_date_order(claims_df):
    print("Validating FromDate < ToDate in claims data...")

    df = claims_df \
        .withColumn("FromDateParsed", to_date(col("FromDate"), "M/d/yyyy")) \
        .withColumn("ToDateParsed", to_date(col("ToDate"), "M/d/yyyy"))

    bad_dates = df.filter(col("FromDateParsed") > col("ToDateParsed"))
    bad_dates.show(5, truncate=False)

    count = bad_dates.count()
    assert count == 0, f"Found {count} records where FromDate is after ToDate"


# Run All Tests

print("Running extended tests on raw input files...\n")

# Run each test with comments printed inline
test_claimid_not_null(spark, claims_df)
test_memberid_not_null(spark, claims_df, name="claims")
test_memberid_not_null(spark, rx_claims_df, name="rx_claims")
test_ndc_format(rx_claims_df)
test_overview_file_loaded(overview_df)

# Provide actual filenames used in your notebook for referential check
actual_file_names = [
    "ccda_pre_signed_urls.csv",
    "data_engineer_exam_claims_final.csv",
    "data_engineer_exam_rx_final.csv"
]
test_referential_integrity_overview(overview_df, actual_file_names)

# Validate dates
test_date_order(claims_df)

print("\nAll extended data quality tests passed.")


Running extended tests on raw input files...

First 5 rows of claims data:
+--------------+---------------+-------+----------+------------------------------------+---------+---------+---------+---------+----------+-------+-----+--------+---------+------+---+------------+---------+-------------+-------------+--------+-----------+---------+------+-------+------+---+-----+-----------+----------+----------+----+-----+----------+---------------+---------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----

In [None]:
# To set up CI/CD & Testing via github actions

# .github/workflows/ci.yml
name: CI
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest
      - name: Run tests
        run: pytest --maxfail=1 --disable-warnings -q

# .github/workflows/ci.yml
name: CI
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest
      - name: Run tests
        run: pytest --maxfail=1 --disable-warnings -q

In [None]:
## Step 7 – Orchestration (Assuming we are using Airflow DAG  )

# airflow_dags/ccda_pipeline.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    'ccda_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    t1 = BashOperator(
        task_id='download_ccda',
        bash_command='python scripts/download_ccda.py --output_dir /data/ccda'
    )

    t2 = SparkSubmitOperator(
        task_id='parse_medications',
        application='notebooks/2_parse_medications.py'
    )

    t3 = SparkSubmitOperator(
        task_id='parse_problems',
        application='notebooks/3_parse_problems.py'
    )

    t4 = SparkSubmitOperator(
        task_id='transform_and_join',
        application='scripts/transform_join.py'
    )

    t5 = BashOperator(
        task_id='upload_to_s3',
        bash_command='bash scripts/upload_to_s3.sh /data/output s3://<your-bucket>/ccda_output'
    )

    t1 >> [t2, t3] >> t4 >> t5