In [0]:
# Collect Client Name and Client Id

dbutils.widgets.removeAll()

dbutils.widgets.text("client_name", "", "CDAP Client Name")
# dbutils.widgets.text("clientId", "", "Carehub Client ID")
# dbutils.widgets.text("sponsor", "", "Plan Sponsor")
# dbutils.widgets.text("clientCode", "", "Client Code")

client_name = dbutils.widgets.get("client_name")
# clientId = dbutils.widgets.get("clientId")
# sponsor = dbutils.widgets.get("sponsor")
# ClientCode = dbutils.widgets.get("clientCode")

domain = "PlanSponsorClaims"
nRows = 20

In [0]:
import os
import re
import pandas as pd
import numpy as np
from datetime import datetime
from pyspark.sql.functions import col, date_format, to_date
from databricks.sdk.runtime import dbutils
import glob
import random
from faker import Faker

current_dir = "/Workspace/Users/kayley.lutzer@edhc.com/Bug Fixes/data-platform/data"

In [0]:
sql_server_name = dbutils.secrets.get(scope="ETL-KeyVault", key="sql-etl-cdap-server")
sql_server_database = dbutils.secrets.get(scope="ETL-KeyVault", key="sql-etl-cdap-db")
sql_server_user_name = dbutils.secrets.get(scope="ETL-KeyVault", key="sql-etl-cdap-user")
sql_server_password = dbutils.secrets.get(scope="ETL-KeyVault", key="sql-etl-cdap-pwd")

jdbc_url = (f"jdbc:sqlserver://{sql_server_name}.database.windows.net:1433;"
                f"database={sql_server_database};"
                f"user={sql_server_user_name}@{sql_server_name};"
                f"password={sql_server_password};"
                f"encrypt=true;trustServerCertificate=false;"
                f"hostNameInCertificate=*.database.windows.net;loginTimeout=30;")


In [0]:
query = f"""
                        select 
                        ic.IngestionConfigId,d.DomainId,d.DomainName as domain,c.DAPClientName as client 
                        ,c.ClientKey,ps.PlanSponsorName,pp.PreprocessorName 
                        ,ipt.GroupName,ipt.SubGroupName 
                        ,ipt.ParamName,ip.ParamValue
                        from CDAP.IngestionConfig ic 
                        join CDAP.DomainClient dc on (ic.DomainClientId = dc.DomainClientId and dc.isactive=1) 
                        join CDAP.ClientSponsor cs on (ic.ClientSponsorId = cs.ClientSponsorId and cs.isactive=1) 
                        join CDAP.DomainSponsor ds on (ic.DomainSponsorId = ds.DomainSponsorId and ds.isactive=1) 
                        join CDAP.Preprocessor pp on (ic.PreprocessorId = pp.PreprocessorId and pp.isactive=1) 
                        join CDAP.Domain d on (d.DomainId = dc.DomainId and dc.isactive=1 and d.isactive=1) 
                        join CDAP.Client c on (c.ClientKey = dc.ClientKey and dc.isactive=1 and c.isactive=1) 
                        join CDAP.PlanSponsor ps on (ps.PlanSponsorId = cs.PlanSponsorId and cs.isactive=1 and ps.isactive=1) 
                        join CDAP.IngestionConfigParameter icp on (icp.IngestionConfigId = ic.IngestionConfigId and icp.IsActive=1) 
                        join CDAP.IngestionParameter ip on (icp.ParameterId = ip.ParameterId and ip.IsActive=1) 
                        join CDAP.IngestionParameterType ipt on (ip.ParameterTypeId = ipt.ParameterTypeId and ipt.IsActive=1) 
                        where ic.isactive=1 
                        and d.DomainName = (case when '{domain}'='None' then d.DomainName else '{domain}' end)
                        and c.DAPClientName = (case when '{client_name}'='None' then c.DAPClientName else '{client_name}' end)
                        """

result_df = spark.read.format("jdbc") \
                .option("url", jdbc_url) \
                .option("query", query) \
                .option("user", sql_server_user_name) \
                .option("password", sql_server_password) \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .load()

# Convert the DataFrame to a list of dictionaries
result = result_df.collect()
config_data = {"domains":[]}
for row in result:
    row_dict = row.asDict()
    if row_dict.get("domain") not in config_data["domains"]:
        config_data["domains"].append(row_dict.get("domain"))
    if row_dict.get("GroupName") not in config_data.keys():
        config_data[row_dict.get("GroupName")] = {}
    if row_dict.get("domain") not in config_data[row_dict.get("GroupName")].keys():
        config_data[row_dict.get("GroupName")][row_dict.get("domain")] = {}
    if row_dict.get("client") not in config_data[row_dict.get("GroupName")][row_dict.get("domain")].keys():
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")] = {}
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")]["ingestion_config_id"] = row_dict.get("IngestionConfigId")
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")]["domain_id"] = row_dict.get("DomainId")
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")]["domain"] = row_dict.get("domain")
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")]["client_id"] = row_dict.get("ClientKey")
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")]["client"] = row_dict.get("client")
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")]["plan_sponsor_name"] = row_dict.get("PlanSponsorName")
    if row_dict.get("SubGroupName") is not None and len(row_dict.get("SubGroupName")) > 0:
        if row_dict.get("SubGroupName") not in config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")].keys():
            config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")][row_dict.get("SubGroupName")] = {}
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")][row_dict.get("SubGroupName")][row_dict.get("ParamName")] = row_dict.get("ParamValue")
    else:
        config_data[row_dict.get("GroupName")][row_dict.get("domain")][row_dict.get("client")][row_dict.get("ParamName")] = row_dict.get("ParamValue")

if config_data['domains'] == []:
    dbutils.notebook.exit("client doesn't exist")

In [0]:
client_carehub_name = config_data['domain_configurations'][domain][client_name].get('client_name')
clientId = spark.sql(f"select ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = '{client_carehub_name}'").collect()[0].ClientId
print(clientId)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
File [0;32m<command-5403996148952337>, line 2[0m
[1;32m      1[0m client_carehub_name [38;5;241m=[39m config_data[[38;5;124m'[39m[38;5;124mdomain_configurations[39m[38;5;124m'[39m][domain][client_name][38;5;241m.[39mget([38;5;124m'[39m[38;5;124mclient_name[39m[38;5;124m'[39m)
[0;32m----> 2[0m clientId [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mselect ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mclient_carehub_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m"[39m)[38;5;241m.[39mcollect()[[38;5;241m0[39m][38;5;241m.[39mClientId
[1;32m      3[0m [38;5;28mprint[39m(clientId)

[0;31mIndexError[0m: list index out of range

In [0]:
output_format = config_data['reading_configurations'][domain][client_name].get('format')
sep = config_data['reading_configurations'][domain][client_name]['read_kwargs'].get('sep')
date_formats = config_data['mapping_configurations'][domain][client_name].get('dateformat').split(',')
file_name_date_format = config_data['domain_configurations'][domain][client_name].get('file_name_date_format')
file_name_date_regex_pattern = config_data['domain_configurations'][domain][client_name].get('file_name_date_regex_pattern')
include_header = config_data['reading_configurations'][domain][client_name]['read_kwargs'].get('header')
if include_header == 'Y':
    include_header = True
elif include_header == 'N':
    include_header = False

[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
File [0;32m<command-5403996148952337>, line 2[0m
[1;32m      1[0m client_carehub_name [38;5;241m=[39m config_data[[38;5;124m'[39m[38;5;124mdomain_configurations[39m[38;5;124m'[39m][domain][client_name][38;5;241m.[39mget([38;5;124m'[39m[38;5;124mclient_name[39m[38;5;124m'[39m)
[0;32m----> 2[0m clientId [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mselect ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mclient_carehub_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m"[39m)[38;5;241m.[39mcollect()[[38;5;241m0[39m][38;5;241m.[39mClientId
[1;32m      3[0m [38;5;28mprint[39m(clientId)

[0;31mIndexError[0m: list index out of range

In [0]:
# Read LAYOUT and collect headers 

query = f"""
                select TOP 1000 cd.columnname as source_column, tc.columnorder,
                (case when cd.datatype='string' then 'varchar' else cd.datatype end) as data_type, 
                (case when tc.isnullable=1 then 0 else 1 end) as required,
                tc.StartPosition as start_position,
                tc.EndPosition as end_position
                from 
                cdap.ingestionconfig ic
                join cdap.DomainClient dc on (ic.DomainClientId = dc.DomainClientId)
                join cdap.DomainSponsor ds on (ic.DomainSponsorId = ds.DomainSponsorId)
                join cdap.ClientSponsor cs on (ic.ClientSponsorId = cs.ClientSponsorId)
                join cdap.Domain d on (d.domainid = ds.DomainId)
                join cdap.Client c on (c.clientkey = dc.ClientKey)
                --
                join cdap.columnmapping cm on (cm.ingestionconfigid = ic.ingestionconfigid)
                join cdap.tablecolumn tc on (tc.tablecolumnid = cm.sourcetablecolumnid)
                join cdap.columndetail cd on (tc.columndetailid = cd.columndetailid)
                join cdap.zonetable zt on (zt.tableid = tc.tableid)
                join cdap.ProcessingZone z on (z.zoneid = zt.zoneid)
                -- where z.name = 'prep' and ic.isactive=1 
                where z.name = 'raw' and ic.isactive=1 
                and d.DomainName = '{domain}'
                and c.DAPClientName = '{client_name}'
                order by tc.columnorder
            """
# Execute the query using sql_server_operation
#source_layout = self.spark.sql(query)
layout = spark.read.format("jdbc") \
                .option("url", jdbc_url) \
                .option("query", query) \
                .option("user", sql_server_user_name) \
                .option("password", sql_server_password) \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .load()

# Convert the DataFrame to a list of dictionaries
layout_df = [row.asDict() for row in layout.collect()]
layout_has_underscore = layout.filter(layout['source_column'].contains('_')).count() > 0

[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
File [0;32m<command-5403996148952337>, line 2[0m
[1;32m      1[0m client_carehub_name [38;5;241m=[39m config_data[[38;5;124m'[39m[38;5;124mdomain_configurations[39m[38;5;124m'[39m][domain][client_name][38;5;241m.[39mget([38;5;124m'[39m[38;5;124mclient_name[39m[38;5;124m'[39m)
[0;32m----> 2[0m clientId [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mselect ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mclient_carehub_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m"[39m)[38;5;241m.[39mcollect()[[38;5;241m0[39m][38;5;241m.[39mClientId
[1;32m      3[0m [38;5;28mprint[39m(clientId)

[0;31mIndexError[0m: list index out of range

In [0]:
display(layout_df)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
File [0;32m<command-5403996148952337>, line 2[0m
[1;32m      1[0m client_carehub_name [38;5;241m=[39m config_data[[38;5;124m'[39m[38;5;124mdomain_configurations[39m[38;5;124m'[39m][domain][client_name][38;5;241m.[39mget([38;5;124m'[39m[38;5;124mclient_name[39m[38;5;124m'[39m)
[0;32m----> 2[0m clientId [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mselect ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mclient_carehub_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m"[39m)[38;5;241m.[39mcollect()[[38;5;241m0[39m][38;5;241m.[39mClientId
[1;32m      3[0m [38;5;28mprint[39m(clientId)

[0;31mIndexError[0m: list index out of range

In [0]:
query = f"""
                    select fcd.columnname as source_column_name, tcd.datatype
                    , cm.svmrule as value_mapping, tcd.columnname as target_column_name
                    from 
                    cdap.columnmapping cm
                    join cdap.tablecolumn ftc on (cm.sourcetablecolumnid = ftc.tablecolumnid and ftc.IsActive=1)
                    join cdap.tablecolumn ttc on (cm.targettablecolumnid = ttc.tablecolumnid and ttc.IsActive=1)
                    join cdap.columndetail fcd on (ftc.columndetailid = fcd.columndetailid and fcd.IsActive=1)
                    join cdap.columndetail tcd on (ttc.columndetailid = tcd.columndetailid and tcd.IsActive=1)
                    join cdap.zonetable fzt on (ftc.tableid = fzt.tableid)
                    join cdap.zonetable tzt on (ttc.tableid = tzt.tableid)
                    join cdap.processingzone fz on (fzt.zoneid = fz.zoneid)
                    join cdap.processingzone tz on (tzt.zoneid = tz.zoneid)
                    --
                    join cdap.ingestionconfig ic on (ic.IngestionConfigId = cm.IngestionConfigId)
                    join cdap.DomainClient dc on (ic.DomainClientId = dc.DomainClientId)
                    join cdap.DomainSponsor ds on (ic.DomainSponsorId = ds.DomainSponsorId)
                    join cdap.ClientSponsor cs on (ic.ClientSponsorId = cs.ClientSponsorId)
                    join cdap.Domain d on (d.domainid = ds.DomainId)
                    join cdap.Client c on (c.clientkey = dc.ClientKey)
                    where 
                    fz.name='prep' and tz.name='structured'
                    and d.DomainName = '{domain}'
                    and c.DAPClientName = '{client_name}'
            """

mapping = spark.read.format("jdbc") \
                .option("url", jdbc_url) \
                .option("query", query) \
                .option("user", sql_server_user_name) \
                .option("password", sql_server_password) \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .load()

mapping_df = mapping.collect()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
File [0;32m<command-5403996148952337>, line 2[0m
[1;32m      1[0m client_carehub_name [38;5;241m=[39m config_data[[38;5;124m'[39m[38;5;124mdomain_configurations[39m[38;5;124m'[39m][domain][client_name][38;5;241m.[39mget([38;5;124m'[39m[38;5;124mclient_name[39m[38;5;124m'[39m)
[0;32m----> 2[0m clientId [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mselect ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mclient_carehub_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m"[39m)[38;5;241m.[39mcollect()[[38;5;241m0[39m][38;5;241m.[39mClientId
[1;32m      3[0m [38;5;28mprint[39m(clientId)

[0;31mIndexError[0m: list index out of range

In [0]:
PYSPARK_TO_PANDAS_DATE_FORMAT = {
    "yyyyMMdd": "%Y%m%d",
    "yyyy-MM-dd": "%Y-%m-%d",
    "yyyy/MM/dd": "%Y/%m/%d",
    "MMddyyyy": "%m%d%Y",
    "MM/dd/yyyy": "%m/%d/%Y",
    "MMddyy": "%m%d%y",
    "MM/dd/yy": "%m/%d/%y",
    "dd-MM-yyyy": "%d-%m-%Y",
    "dd/MM/yyyy": "%d/%m/%Y",
    "ddMMyyyy": "%d%m%Y",
    "ddMMyy": "%d%m%y",
    "dd/MM/yy": "%d/%m/%y",
    "yyyyMM": "%Y%m",
    "MM-yyyy": "%m-%Y",
    "MM/yyyy": "%m/%Y",
    "ddMMMyyyy": "%d%b%Y",
    "MMM dd yyyy": "%b %d %Y",
    "MMddyyyy": "%m%d%Y",
    "dd MMM yyyy": "%d %b %Y",
    "MMM dd yyyy": "%b %d %Y",
    "MMMM dd yyyy": "%B %d %Y",
    "MM/dd/yyyy hh:mm:ss a": "%m/%d/%Y %I:%M:%S %p"
}

missing_formats = [fmt for fmt in date_formats if fmt not in PYSPARK_TO_PANDAS_DATE_FORMAT]
if missing_formats:
    dbutils.notebook.exit(f"ERROR: Missing date format(s) in mapping: {missing_formats}")

pandas_date_formats = [PYSPARK_TO_PANDAS_DATE_FORMAT.get(fmt, None) for fmt in date_formats]

[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
File [0;32m<command-5403996148952337>, line 2[0m
[1;32m      1[0m client_carehub_name [38;5;241m=[39m config_data[[38;5;124m'[39m[38;5;124mdomain_configurations[39m[38;5;124m'[39m][domain][client_name][38;5;241m.[39mget([38;5;124m'[39m[38;5;124mclient_name[39m[38;5;124m'[39m)
[0;32m----> 2[0m clientId [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mselect ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mclient_carehub_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m"[39m)[38;5;241m.[39mcollect()[[38;5;241m0[39m][38;5;241m.[39mClientId
[1;32m      3[0m [38;5;28mprint[39m(clientId)

[0;31mIndexError[0m: list index out of range

In [0]:
date_columns = [
    row['source_column']
    for row in layout
        .filter(layout['data_type'] == 'date')
        .select('source_column')
        .collect()
]

col_dateformats_dict = {}
for column in date_columns:
    if column == "file_date":
        continue
    dbutils.widgets.text(column, f"{date_formats[0]}", f"{column} date format")
    col_dateformats_dict[column] = dbutils.widgets.get(column)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIndexError[0m                                Traceback (most recent call last)
File [0;32m<command-5403996148952337>, line 2[0m
[1;32m      1[0m client_carehub_name [38;5;241m=[39m config_data[[38;5;124m'[39m[38;5;124mdomain_configurations[39m[38;5;124m'[39m][domain][client_name][38;5;241m.[39mget([38;5;124m'[39m[38;5;124mclient_name[39m[38;5;124m'[39m)
[0;32m----> 2[0m clientId [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mselect ClientId from etl_dev.dev_structured.lntrninternal_carehub_client_unified where Name = [39m[38;5;124m'[39m[38;5;132;01m{[39;00mclient_carehub_name[38;5;132;01m}[39;00m[38;5;124m'[39m[38;5;124m"[39m)[38;5;241m.[39mcollect()[[38;5;241m0[39m][38;5;241m.[39mClientId
[1;32m      3[0m [38;5;28mprint[39m(clientId)

[0;31mIndexError[0m: list index out of range

In [0]:
# STOP HERE TO CHANGE DATE FORMATS - RUN ALL BELOW
for column in date_columns:
    col_dateformats_dict[column] = dbutils.widgets.get(column)

In [0]:
# Generate basic sample data

fake = Faker()

# Default faker providers by type
DEFAULT_TYPE_PROVIDERS = {
    "varchar": lambda: fake.bothify(text="??###"),
    "date": lambda: fake.date(pattern=random.choice(pandas_date_formats)),
    # Add other types if needed
}

# Optional: Per-column overrides for realism
# COLUMN_OVERRIDES = {
#     "subscriber_ssn": lambda: fake.ssn(),
#     "members_sex": lambda: random.choice(["M", "F"]),
#     "member_first_name": lambda: fake.first_name(),
#     "member_last_name": lambda: fake.last_name(),
#     "claim_no": lambda: fake.uuid4(),
# }

def get_provider(column, dtype):
    # Use column override if present, else fallback to default type provider
    return DEFAULT_TYPE_PROVIDERS.get(dtype, lambda: None)

def generate_row(schema_rows):
    row = {}
    for _, row_meta in schema_rows.toPandas().iterrows():
        col = row_meta["source_column"]
        dtype = row_meta["data_type"]
        provider = get_provider(col, dtype)
        if dtype == "date":
            if col in col_dateformats_dict:
                fmt = PYSPARK_TO_PANDAS_DATE_FORMAT.get(col_dateformats_dict[col])
            else:
                fmt = random.choice(pandas_date_formats)
            row[col] = fake.date(pattern=fmt)
        else:
            row[col] = provider()
    return row

def generate_fake_data(schema_df, n_rows=nRows):
    return pd.DataFrame([generate_row(schema_df) for _ in range(n_rows)])

df_pandas = generate_fake_data(layout, nRows)
# df = spark.createDataFrame(df_pandas)

In [0]:
query = f"select FirstName,LastName,DOB,Gender from etl_dev.dev_structured.lntrninternal_carehub_member_unified where ClientId = '{clientId}' LIMIT {nRows}"
df_carehub = spark.sql(query)
# Convert birth date column to specified format
source_dob_col = (
    mapping
    .filter(mapping['target_column_name'] == 'patient_dob')
    .select('source_column_name')
    .first()['source_column_name']
)

if layout_has_underscore is False:
    source_dob_col = source_dob_col.replace("_", " ")

dob_col_format = dbutils.widgets.get(f"{source_dob_col}")

# Convert Carehub DOB back to source format
df_carehub = df_carehub.withColumn(
    "DOB",
    date_format(
        to_date("DOB", 'yyyy-MM-dd'),
        dob_col_format
    )
)

df_carehub = df_carehub.toPandas()

In [0]:
CAREHUB_MAPPING = {
    'patient_first_name': 'FirstName', 
    'patient_last_name': 'LastName', 
    'patient_sex': 'Gender', 
    'patient_dob': 'DOB', 
}

In [0]:
demo_match_cols = [
    (row['source_column_name'], row['target_column_name'])
    for row in mapping
        .filter((mapping['target_column_name'] == 'patient_dob') | (mapping['target_column_name'] == 'patient_sex') | (mapping['target_column_name'] == 'patient_first_name') | (mapping['target_column_name'] == 'patient_last_name'))
        .select('source_column_name','target_column_name').collect()
]

In [0]:
for target, source in demo_match_cols:
    if layout_has_underscore is False:
        target = target.replace("_"," ")
    df_pandas[target] = df_carehub[CAREHUB_MAPPING.get(source)]

In [0]:
reverse_mapping_dict = {}
for row in mapping.collect():
    col_name = row['source_column_name']
    value_mapping_text = row['value_mapping']
    if value_mapping_text != "":
        mapping_dict = {}
        for line in value_mapping_text.split("\n"):
            line = line.strip()
            if not line or "," not in line:
                continue
            src, repl = line.split(",")
            if src != "source_value":
                mapping_dict[repl] = src  # reversed: repl -> src
            reverse_mapping_dict[col_name] = mapping_dict

In [0]:
reverse_mapping_dict = {}
for row in mapping.collect():
    col_name = row['source_column_name']
    value_mapping_text = row['value_mapping']
    if value_mapping_text != "":
        mapping_dict = {}
        for line in value_mapping_text.split("\n"):
            line = line.replace('\r', '')  # Remove carriage return
            if ',' in line:
                src, repl = line.split(",", 1)
                if src != "source_value":
                    mapping_dict[repl] = src  # reversed: repl -> src
                    reverse_mapping_dict[col_name] = mapping_dict

In [0]:
spark_df_1 = spark.createDataFrame(df_pandas)

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

for col_name, map_dict in reverse_mapping_dict.items():
    # Create UDF for reverse mapping
    def reverse_map(val,mapping=map_dict):
        return mapping.get(val, val)
    reverse_udf = udf(reverse_map, StringType())
    if layout_has_underscore is False:
        col_name = col_name.replace("_"," ")
    spark_df_1 = spark_df_1.withColumn(col_name, reverse_udf(col(col_name)))

In [0]:
import uuid
guid = str(uuid.uuid4())[:5]  # generates a 5-digit GUUID

formatted_file_date = PYSPARK_TO_PANDAS_DATE_FORMAT.get(file_name_date_format)
date_str = datetime.now().strftime(formatted_file_date)
file_name = file_name_date_regex_pattern.replace("(\d+)", date_str).replace("$", "")

output_dir = f"abfss://data@edhcdevdl.dfs.core.windows.net/test_data/AUTOGEN{guid}__{file_name}.csv"
final_file = f"abfss://data@edhcdevdl.dfs.core.windows.net/test_data/AUTOGEN{guid}_{file_name}.csv"

if output_format.lower() in ["csv", "txt"]:
    spark_df_1.coalesce(1).write.format(output_format) \
        .option("header", f"{include_header}") \
        .option("sep", sep) \
        .save(output_dir)

    files = dbutils.fs.ls(output_dir)
    part_file_path = [f.path for f in files if f.name.startswith("part-")][0]

    dbutils.fs.cp(part_file_path, final_file)
    dbutils.fs.rm(part_file_path)
    unwanted_patterns = ["_SUCCESS", "_committed", "_started"]

    for f in files:
        if any(pattern in f.name for pattern in unwanted_patterns):
            dbutils.fs.rm(f.path)
    dbutils.fs.rm(output_dir)
else:
    print(f"{output_format} is an unsupported file type for the test data generator.")