In [0]:
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,array,ArrayType,DateType,TimestampType, FloatType
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, to_date, col
import hashlib
import datetime
import urllib.request
import json
from datetime import timedelta, date

In [0]:
# Job parameters, read from Databricks notebook widgets.
STORAGE_ACCOUNT=dbutils.widgets.get("STORAGE_ACCOUNT")
# NOTE(review): consider reading the account key from a secret scope
# (dbutils.secrets.get) -- widget values may be visible in job/run parameters.
ADLS_KEY=dbutils.widgets.get("ADLS_KEY")
BRONZE_LAYER_NAMESPACE=dbutils.widgets.get("BRONZE_LAYER_NAMESPACE")
SILVER_LAYER_NAMESPACE=dbutils.widgets.get("SILVER_LAYER_NAMESPACE")
HUB_FOLDER=dbutils.widgets.get("HUB_FOLDER")
ADLS_FOLDER=dbutils.widgets.get("ADLS_FOLDER")  # last segment of the bronze path, e.g. "2023/06/06"
# Comma-separated table names; the loader cells split these on ",".
TABLE_LIST=dbutils.widgets.get("TABLE_LIST")          # e.g. "Customers,Payers"
TABLE_LIST_EDI=dbutils.widgets.get("TABLE_LIST_EDI")  # e.g. "In_837X222_Claims,In_837X222_ServiceLine,In_835X221_ServiceInfo,In_835X221_ClaimPayInfo"
# Sample values for interactive runs:
#STORAGE_ACCOUNT="835837bronze"
#BRONZE_LAYER_NAMESPACE="bronze"
#SILVER_LAYER_NAMESPACE="silver"
#HUB_FOLDER="835837"
#ADLS_FOLDER="2023/06/06"
# The storage account key is intentionally not recorded here: never commit
# credentials to the notebook, and rotate any key that was committed before.

# Load timestamp written to the `updated` column of every merged row,
# truncated to the minute.
UPDATED=datetime.datetime.today().replace(second=0, microsecond=0)

In [0]:
# Silver-layer table schemas as (column name, Spark type) tuples.
# The loader cells map the selected bronze CSV columns positionally onto the
# leading fields; trailing fields (e.g. `SK`, `DateOfService`) are derived
# during curation instead of being read from bronze.
#
# NOTE(review): the FloatType amount columns are single precision; DecimalType
# is usually preferred for monetary values. Changing any type here also
# requires migrating the existing silver Delta tables.

HUB_CUSTOMERS_SCHEMA = [
    ("Customer_ID", IntegerType()),
    ("FirstName",   StringType()),
    ("LastName",    StringType()),
    ("DOB",         DateType()),
    ("updated",     TimestampType()),
    ("SK",          StringType()),       # sha2 over FirstName, LastName, DOB (built by the hub loader)
]

HUB_PAYERS_SCHEMA = [
    ("Payer_ID",          IntegerType()),
    ("PayerType_ID",      IntegerType()),
    ("PayerName",         StringType()),
    # NOTE(review): non-numeric bronze values (the sample output shows
    # 'EPID00001') are cast to null by IntegerType; consider StringType.
    ("ElectronicPayerID", IntegerType()),
    ("updated",           TimestampType()),
]

# For the EDI tables the `*Str` / `*str` columns hold the raw yyyyMMdd text;
# the loaders parse them into the trailing DateType columns and drop them
# before merging, so the `*Str` columns in the Delta table stay null.
EDI_837CLAIMS_SCHEMA = [
    ("Claims_ID",         IntegerType()),
    ("Subscriber_ID",     IntegerType()),
    ("Dependent_ID",      IntegerType()),
    ("PatientControl_ID", IntegerType()),
    ("ClaimChargeAmt",    FloatType()),
    ("DateOfServiceStr",  StringType()),
    ("Diagnosis1",        StringType()),
    ("Diagnosis2",        StringType()),
    ("Diagnosis3",        StringType()),
    ("Diagnosis4",        StringType()),
    ("ProviderLastName",  StringType()),
    ("ProviderFirstName", StringType()),
    # NOTE(review): 10-digit values above 2147483647 do not fit IntegerType
    # (they cast to null, or fail under ANSI mode); consider LongType.
    ("ProviderNPI",       IntegerType()),
    ("updated",           TimestampType()),
    ("DateOfService",     DateType()),
]

EDI_837SERVICELINE_SCHEMA = [
    ("ServiceLine_ID",     IntegerType()),
    ("Claims_ID",          IntegerType()),
    ("LineItem_ID",        IntegerType()),
    ("CPTCode",            StringType()),
    ("CPTModifier",        StringType()),
    ("LineChargeAmt",      FloatType()),
    ("DateOfServiceStr",   StringType()),
    ("LineItemControl_ID", IntegerType()),
    ("updated",            TimestampType()),
    ("DateOfService",      DateType()),
]

EDI_835SERVICEINFO_SCHEMA = [
    ("ServiceInfo_ID",     IntegerType()),
    ("ClaimPay_ID",        IntegerType()),
    ("CPTCode",            StringType()),
    ("CPTModifier1",       StringType()),
    ("CPTModifier2",       StringType()),
    ("LineChargeAmt",      FloatType()),
    ("ProviderPayAmt",     FloatType()),
    ("DateOfServiceStr",   StringType()),
    ("LineItemControl_ID", IntegerType()),
    ("ServiceSupp_ID",     StringType()),
    ("ServiceSuppAmt",     FloatType()),
    ("updated",            TimestampType()),
    ("DateOfService",      DateType()),
]

EDI_835CLAIMINFO_SCHEMA = [
    ("ClaimPayInfo_ID",    IntegerType()),
    ("Detail_ID",          IntegerType()),
    ("PatientControl_ID",  IntegerType()),
    ("ClaimStatusCode",    IntegerType()),
    ("ClaimAmt",           FloatType()),
    ("ClaimPayAmt",        FloatType()),
    ("PatientRespAmt",     FloatType()),
    ("ClaimFilingInd",     StringType()),
    ("PayerControl_ID",    StringType()),
    ("PatientLastName",    StringType()),
    ("PatientFirstName",   StringType()),
    ("PatientIdent",       StringType()),
    # NOTE(review): same IntegerType overflow concern as in EDI_837CLAIMS_SCHEMA.
    ("ProviderNPI",        IntegerType()),
    ("COBName",            StringType()),
    ("COB_ID",             IntegerType()),
    ("ClaimRemarkCode",    StringType()),
    ("CoverageExpDatestr", StringType()),
    ("SubmissionDatestr",  StringType()),
    ("ClaimSuppAmt",       FloatType()),
    ("updated",            TimestampType()),
    ("CoverageExpDate",    DateType()),
    ("SubmissionDate",     DateType()),
]

In [0]:
def gen_blank_df(spark, schema_struct):
    """Build an empty DataFrame with a schema described by field tuples.

    Args:
        spark: the active SparkSession.
        schema_struct: iterable of tuples unpacked into ``StructField``,
            e.g. ``(name, data_type)``.

    Returns:
        A DataFrame with zero rows and the resulting ``StructType`` schema.
    """
    blank_schema = StructType([StructField(*spec) for spec in schema_struct])
    return spark.createDataFrame(spark.sparkContext.emptyRDD(), blank_schema)

def mask_value(column):
  """Return the hex SHA-256 digest of a string value.

  Used as the body of a Spark UDF (``mask_udf``), where SQL nulls arrive as
  ``None``. Those are passed through as ``None`` instead of raising
  AttributeError on ``.encode()``, which would fail the whole Spark task.

  Args:
    column: the string value to hash (UTF-8 encoded), or None.

  Returns:
    64-character lowercase hex digest, or None when ``column`` is None.
  """
  if column is None:
    return None
  return hashlib.sha256(column.encode()).hexdigest()

def add_date_column(df, new_column, drop_column, date_format='yyyyMMdd'):
    """Parse a string column into a DateType column and drop the string column.

    Args:
        df: input Spark DataFrame.
        new_column: name of the DateType column to add.
        drop_column: name of the string column to parse; it is removed from
            the result unless it has the same name as ``new_column``.
        date_format: Spark datetime pattern passed to ``to_date``. Defaults to
            ``'yyyyMMdd'``, the layout of the EDI ``*Str`` date columns.

    Returns:
        A new DataFrame with ``new_column`` added and ``drop_column`` removed.
        See the ``to_date`` documentation for how non-matching values are handled.
    """
    parsed = df.withColumn(new_column, to_date(col(drop_column), date_format))
    # If the parsed column reuses the source name, withColumn already replaced
    # it in place; dropping it now would discard the parsed result.
    if new_column == drop_column:
        return parsed
    return parsed.drop(drop_column)
    
def curate_name(name):
    """Normalize a name to lower case for the silver layer.

    Used as the body of a Spark UDF (``curate_name_udf``); SQL nulls arrive as
    ``None`` and are returned unchanged instead of raising AttributeError,
    which would fail the whole merge.

    Args:
        name: the name string, or None.

    Returns:
        ``name.lower()``, or None when ``name`` is None.
    """
    if name is None:
        return None
    return name.lower()

# Spark UDF wrappers around the Python helpers mask_value and curate_name;
# both produce StringType columns.
mask_udf = udf(f=mask_value, returnType=StringType())
curate_name_udf = udf(f=curate_name, returnType=StringType())

In [0]:
# Account key for the wasbs:// paths below; it does not vary per table, so set it once.
spark.conf.set("fs.azure.account.key."+STORAGE_ACCOUNT+".blob.core.windows.net", ADLS_KEY)

# Hub table name -> (schema, positional bronze CSV columns, merge key column).
# The bronze columns are mapped in order onto the leading schema fields.
HUB_TABLES = {
  "Customers": (HUB_CUSTOMERS_SCHEMA, ["_c0", "_c1", "_c2", "_c9", "_c42"], "SK"),
  "Payers":    (HUB_PAYERS_SCHEMA,    ["_c0", "_c1", "_c2", "_c23", "_c32"], "Payer_ID"),
}

# Upsert each requested hub table from its bronze CSV into its silver Delta table.
# Errors are reported per table so the remaining tables still load.
for table in (name.strip() for name in TABLE_LIST.split(",")):
  if not table:
    continue
  try:
    if table not in HUB_TABLES:
      # Fail fast instead of proceeding with an unset or stale schema.
      raise ValueError("No schema configured for hub table " + repr(table))
    table_schema, bronze_columns, merge_key = HUB_TABLES[table]

    table_path = "wasbs://"+SILVER_LAYER_NAMESPACE+"@"+STORAGE_ACCOUNT+".blob.core.windows.net/"+HUB_FOLDER+"/"+table
    # The path contains literal backslashes before '[' and ']' (presumably so
    # the Hadoop path glob does not treat the brackets as a character class).
    bronze_table_path = ("wasbs://"+BRONZE_LAYER_NAMESPACE+"@"+STORAGE_ACCOUNT+".blob.core.windows.net/"
                         + HUB_FOLDER + r"/\[dbo\].\[" + table + r"\]/" + ADLS_FOLDER)
    schema_hub = StructType([StructField(*field) for field in table_schema])

    # First run: register an empty Delta table at the silver path. Checking
    # explicitly keeps unrelated forPath errors (e.g. transient storage
    # failures) from triggering DROP TABLE + recreate.
    if not DeltaTable.isDeltaTable(spark, table_path):
      spark.sql("DROP TABLE IF EXISTS "+ table)
      gen_blank_df(spark, table_schema).write.format("delta").option("path", table_path).saveAsTable(table)
    deltaTable = DeltaTable.forPath(spark, table_path)

    df_table_incremental = spark.read.csv(bronze_table_path).select(*bronze_columns)
    # zip() stops at the shorter list: schema fields past the bronze columns
    # (Customers.SK) are derived below rather than read from bronze.
    df_table_curated = df_table_incremental.select([
      col(source).cast(schema_hub[target].dataType).alias(target)
      for source, target in zip(df_table_incremental.columns, schema_hub.fieldNames())
    ])

    if table == "Customers":
      df_table_curated = (df_table_curated
        .withColumn('FirstName', curate_name_udf('FirstName'))
        .withColumn('LastName', curate_name_udf('LastName'))
        # NOTE(review): concat_ws('') uses no separator, so different name
        # splits can hash alike; and rows with identical name+DOB (Customer_ID
        # 47/48 in the sample output) share one SK, so a later MERGE can fail
        # with multiple source rows matching one target row. Deduplicate or
        # widen the key -- changing it requires re-keying existing rows.
        .withColumn('SK', sha2(concat_ws('', 'FirstName', 'LastName', 'DOB'), 256)))

    # Every row of this run carries the same load timestamp.
    df_table_curated = df_table_curated.withColumn('updated', f.lit(UPDATED))

    # Upsert on the merge key, writing every curated column on update and insert.
    source_alias = table + "_new"
    column_map = {column: source_alias + "." + column for column in df_table_curated.columns}
    (deltaTable.alias(table)
      .merge(df_table_curated.alias(source_alias),
             table + "." + merge_key + " = " + source_alias + "." + merge_key)
      .whenMatchedUpdate(set=column_map)
      .whenNotMatchedInsert(values=column_map)
      .execute())
  except Exception as e:
    print(table + ": " + str(e))

'Customers'wasbs://bronze@835837bronze.blob.core.windows.net/835837/\[dbo\].\[Customers\]/2023/06/06
wasbs://silver@835837bronze.blob.core.windows.net/835837/Customers


_c0,_c1,_c2,_c9,_c42
40,TEST,Flynn2,1993-05-02 00:00:00.0000000,2023-06-06 17:31:06.3930000
41,TESTI|NG,Testing,1993-01-01 16:03:29.9400000,2023-06-06 17:23:56.8000000
42,Melissa,Testing2,1993-01-01 00:00:00.0000000,2023-06-06 17:23:56.8000000
43,Melissa,Testing3,1993-01-01 00:00:00.0000000,2023-06-06 17:23:56.8000000
44,Andrew,Laurella,1956-04-04 00:00:00.0000000,2023-06-06 17:23:56.8000000
45,Andy,Laurella,1956-04-04 00:00:00.0000000,2023-06-06 17:23:56.8000000
46,Dillon,Brumet,2005-09-29 14:16:35.0000000,2023-06-06 17:23:56.8000000
47,Vicki,Windman,1958-09-15 14:37:15.0000000,2023-06-06 17:23:56.8000000
48,Vicki,Windman,1958-09-15 00:00:00.0000000,2023-06-06 17:23:56.8000000
49,Jacqueline,Thompson-Kenny,1972-03-15 19:30:00.0000000,2023-06-06 17:23:56.8000000


Customer_ID,DOB,updated,FirstName,LastName,SK
40,1993-05-02,2023-07-02T01:01:00.000+0000,test,flynn2,d84d138580ce876a55b95edf3e0096b0132ae2350b47a42d790212788beac296
41,1993-01-01,2023-07-02T01:01:00.000+0000,testi|ng,testing,413d2f19d566e4b44652bf4621fa68c034eaa249e925417d7db9a3021410df8a
42,1993-01-01,2023-07-02T01:01:00.000+0000,melissa,testing2,c2286ee6670412f86f46dede7ae7ed219d3ee2f2ce361d4425bcfe714c86ad76
43,1993-01-01,2023-07-02T01:01:00.000+0000,melissa,testing3,29296e147b5fe8d5ca8f201d51508b5044a784b24be5b2a8427d5a442556b1f4
44,1956-04-04,2023-07-02T01:01:00.000+0000,andrew,laurella,f96e62ab57cf6501411e33d20652a84286cd351f7bf0b4e4af724d0aafbe70fd
45,1956-04-04,2023-07-02T01:01:00.000+0000,andy,laurella,702b05b3435339aa060f916cb8d6f458382092768ececb2f3a80d6e71c02be3c
46,2005-09-29,2023-07-02T01:01:00.000+0000,dillon,brumet,a3cd52137cb8e43a21d6c88a4e82ed2005d8974deb5f39646979f1a171db166b
47,1958-09-15,2023-07-02T01:01:00.000+0000,vicki,windman,1edd5ad2e60bcf2161bf0bb43c0ac9f5c624b04fb3e0269504903a36cf151a96
48,1958-09-15,2023-07-02T01:01:00.000+0000,vicki,windman,1edd5ad2e60bcf2161bf0bb43c0ac9f5c624b04fb3e0269504903a36cf151a96
49,1972-03-15,2023-07-02T01:01:00.000+0000,jacqueline,thompson-kenny,0e465216f60b9baa33bdb6004ba9654b654d89fcaaccd1b55aba9384fd6ba1d0


'Payers'wasbs://bronze@835837bronze.blob.core.windows.net/835837/\[dbo\].\[Payers\]/2023/06/06
wasbs://silver@835837bronze.blob.core.windows.net/835837/Payers


_c0,_c1,_c2,_c23,_c32
1,6,Blue Cross Utopia Planetia,EPID00001,2023-06-06 18:03:10.1770000
2,6,New Insurance Co,,2023-06-06 18:04:25.9800000
3,6,New Insurance ONETWO,,2023-06-06 18:03:10.1770000
7,6,1st Medical Network,29076,2023-06-06 18:03:10.1770000
8,6,360 Alliance - Gilsbar,7205,2023-06-06 18:03:10.1770000
9,6,AARP Medicare Complete by SecureHorizons,87726,2023-06-06 18:03:10.1770000
10,6,AARP Medicare Supplement/Fixed Indemnity by UHC,36273,2023-06-06 18:03:10.1770000
11,6,AARP MedicareComplete though UnitedHealthcare,87726,2023-06-06 18:03:10.1770000
12,6,Absolute Total Care,68055,2023-06-06 18:03:10.1770000
13,6,Access Behavioral- Care,,2023-06-06 18:03:10.1770000


Payer_ID,PayerType_ID,PayerName,ElectronicPayerID,updated
1.0,6.0,Blue Cross Utopia Planetia,,2023-07-02T01:01:00.000+0000
2.0,6.0,New Insurance Co,,2023-07-02T01:01:00.000+0000
3.0,6.0,New Insurance ONETWO,,2023-07-02T01:01:00.000+0000
7.0,6.0,1st Medical Network,29076.0,2023-07-02T01:01:00.000+0000
8.0,6.0,360 Alliance - Gilsbar,7205.0,2023-07-02T01:01:00.000+0000
9.0,6.0,AARP Medicare Complete by SecureHorizons,87726.0,2023-07-02T01:01:00.000+0000
10.0,6.0,AARP Medicare Supplement/Fixed Indemnity by UHC,36273.0,2023-07-02T01:01:00.000+0000
11.0,6.0,AARP MedicareComplete though UnitedHealthcare,87726.0,2023-07-02T01:01:00.000+0000
12.0,6.0,Absolute Total Care,68055.0,2023-07-02T01:01:00.000+0000
13.0,6.0,Access Behavioral- Care,,2023-07-02T01:01:00.000+0000


In [0]:
# Account key for the wasbs:// paths below; it does not vary per table, so set it once.
spark.conf.set("fs.azure.account.key."+STORAGE_ACCOUNT+".blob.core.windows.net", ADLS_KEY)

# EDI table name -> (schema, positional bronze CSV columns, merge key column,
#                    [(DateType column to add, yyyyMMdd string column to parse), ...]).
# The bronze columns are mapped in order onto the leading schema fields.
EDI_TABLES = {
  "In_837X222_Claims": (
    EDI_837CLAIMS_SCHEMA,
    ["_c0", "_c1", "_c2", "_c3", "_c4", "_c19", "_c92", "_c94", "_c96", "_c98", "_c152", "_c153", "_c156", "_c195"],
    "Claims_ID",
    [("DateOfService", "DateOfServiceStr")]),
  "In_837X222_ServiceLine": (
    EDI_837SERVICELINE_SCHEMA,
    ["_c0", "_c1", "_c2", "_c4", "_c5", "_c10", "_c49", "_c75", "_c190"],
    "ServiceLine_ID",
    [("DateOfService", "DateOfServiceStr")]),
  "In_835X221_ServiceInfo": (
    EDI_835SERVICEINFO_SCHEMA,
    ["_c0", "_c1", "_c3", "_c4", "_c5", "_c8", "_c9", "_c22", "_c53", "_c57", "_c58", "_c61"],
    "ServiceInfo_ID",
    [("DateOfService", "DateOfServiceStr")]),
  "In_835X221_ClaimPayInfo": (
    EDI_835CLAIMINFO_SCHEMA,
    ["_c0", "_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7", "_c8", "_c42", "_c43", "_c47", "_c67", "_c68", "_c70", "_c107", "_c120", "_c121", "_c130", "_c131"],
    "ClaimPayInfo_ID",
    [("CoverageExpDate", "CoverageExpDatestr"), ("SubmissionDate", "SubmissionDatestr")]),
}

# Upsert each requested EDI table from its bronze CSV into its silver Delta table.
# Errors are reported per table so the remaining tables still load.
for table in (name.strip() for name in TABLE_LIST_EDI.split(",")):
  if not table:
    continue
  try:
    if table not in EDI_TABLES:
      # Fail fast instead of proceeding with an unset or stale schema.
      raise ValueError("No schema configured for EDI table " + repr(table))
    table_schema, bronze_columns, merge_key, date_columns = EDI_TABLES[table]

    table_path = "wasbs://"+SILVER_LAYER_NAMESPACE+"@"+STORAGE_ACCOUNT+".blob.core.windows.net/"+HUB_FOLDER+"/"+table
    # The path contains literal backslashes before '[' and ']' (presumably so
    # the Hadoop path glob does not treat the brackets as a character class).
    bronze_table_path = ("wasbs://"+BRONZE_LAYER_NAMESPACE+"@"+STORAGE_ACCOUNT+".blob.core.windows.net/"
                         + HUB_FOLDER + r"/\[edi\].\[" + table + r"\]/" + ADLS_FOLDER)
    schema_hub = StructType([StructField(*field) for field in table_schema])

    # First run: register an empty Delta table at the silver path. Checking
    # explicitly keeps unrelated forPath errors (e.g. transient storage
    # failures) from triggering DROP TABLE + recreate.
    if not DeltaTable.isDeltaTable(spark, table_path):
      spark.sql("DROP TABLE IF EXISTS "+ table)
      gen_blank_df(spark, table_schema).write.format("delta").option("path", table_path).saveAsTable(table)
    deltaTable = DeltaTable.forPath(spark, table_path)

    df_table_incremental = spark.read.csv(bronze_table_path).select(*bronze_columns)
    # zip() stops at the shorter list: the trailing parsed DateType fields are
    # filled by add_date_column below rather than read from bronze.
    df_table_curated = df_table_incremental.select([
      col(source).cast(schema_hub[target].dataType).alias(target)
      for source, target in zip(df_table_incremental.columns, schema_hub.fieldNames())
    ])
    # Replace each raw yyyyMMdd string column with its parsed DateType column.
    for date_column, string_column in date_columns:
      df_table_curated = add_date_column(df_table_curated, date_column, string_column)
    # Every row of this run carries the same load timestamp.
    df_table_curated = df_table_curated.withColumn('updated', f.lit(UPDATED))

    # Upsert on the merge key, writing every curated column on update and insert
    # (the dropped raw date strings are therefore not written).
    source_alias = table + "_new"
    column_map = {column: source_alias + "." + column for column in df_table_curated.columns}
    (deltaTable.alias(table)
      .merge(df_table_curated.alias(source_alias),
             table + "." + merge_key + " = " + source_alias + "." + merge_key)
      .whenMatchedUpdate(set=column_map)
      .whenNotMatchedInsert(values=column_map)
      .execute())
  except Exception as e:
    print(table + ": " + str(e))

'In_837X222_Claims'wasbs://bronze@835837bronze.blob.core.windows.net/835837/\[edi\].\[In_837X222_Claims\]/2023/06/06
wasbs://silver@835837bronze.blob.core.windows.net/835837/In_837X222_Claims


_c0,_c1,_c2,_c3,_c4,_c19,_c92,_c94,_c96,_c98,_c152,_c153,_c156,_c195
1,1,1,22963,28.0,20221219,M9903,M9904,M545,M9902,Evenson,Mitchell,1992295638,2023-05-31 17:26:22.5500000
2,2,2,22964,36.35,20220701,M9904,M9901,M9902,M9903,Mochinski,Zachary,1144765991,2023-05-31 17:26:23.8500000
3,3,0,22965,51.97,20220801,M546,R293,M2560,M9902,Evenson,Mitchell,1992295638,2023-05-31 17:26:24.9600000
4,4,0,22966,25.0,20221208,G44209,M9901,M2560,R293,Evenson,Mitchell,1992295638,2023-05-31 17:26:26.3730000
5,5,3,22967,28.0,20221219,M9903,M9904,M545,M9902,Evenson,Mitchell,1992295638,2023-05-31 17:26:33.6030000
6,6,4,22968,36.35,20220701,M9904,M9901,M9902,M9903,Mochinski,Zachary,1144765991,2023-05-31 17:26:34.8330000
7,7,0,22969,51.97,20220801,M546,R293,M2560,M9902,Evenson,Mitchell,1992295638,2023-05-31 17:26:35.9630000
8,8,0,22970,25.0,20221208,G44209,M9901,M2560,R293,Evenson,Mitchell,1992295638,2023-05-31 17:26:37.8270000
9,9,5,22971,28.0,20221219,M9903,M9904,M545,M9902,Evenson,Mitchell,1992295638,2023-05-31 17:26:44.1600000
10,10,6,22972,36.35,20220701,M9904,M9901,M9902,M9903,Mochinski,Zachary,1144765991,2023-05-31 17:26:45.9770000


Claims_ID,Subscriber_ID,Dependent_ID,PatientControl_ID,ClaimChargeAmt,Diagnosis1,Diagnosis2,Diagnosis3,Diagnosis4,ProviderLastName,ProviderFirstName,ProviderNPI,updated,DateOfService
1,1,1,22963,28.0,M9903,M9904,M545,M9902,Evenson,Mitchell,1992295638,2023-07-02T01:01:00.000+0000,2022-12-19
2,2,2,22964,36.35,M9904,M9901,M9902,M9903,Mochinski,Zachary,1144765991,2023-07-02T01:01:00.000+0000,2022-07-01
3,3,0,22965,51.97,M546,R293,M2560,M9902,Evenson,Mitchell,1992295638,2023-07-02T01:01:00.000+0000,2022-08-01
4,4,0,22966,25.0,G44209,M9901,M2560,R293,Evenson,Mitchell,1992295638,2023-07-02T01:01:00.000+0000,2022-12-08
5,5,3,22967,28.0,M9903,M9904,M545,M9902,Evenson,Mitchell,1992295638,2023-07-02T01:01:00.000+0000,2022-12-19
6,6,4,22968,36.35,M9904,M9901,M9902,M9903,Mochinski,Zachary,1144765991,2023-07-02T01:01:00.000+0000,2022-07-01
7,7,0,22969,51.97,M546,R293,M2560,M9902,Evenson,Mitchell,1992295638,2023-07-02T01:01:00.000+0000,2022-08-01
8,8,0,22970,25.0,G44209,M9901,M2560,R293,Evenson,Mitchell,1992295638,2023-07-02T01:01:00.000+0000,2022-12-08
9,9,5,22971,28.0,M9903,M9904,M545,M9902,Evenson,Mitchell,1992295638,2023-07-02T01:01:00.000+0000,2022-12-19
10,10,6,22972,36.35,M9904,M9901,M9902,M9903,Mochinski,Zachary,1144765991,2023-07-02T01:01:00.000+0000,2022-07-01


'In_837X222_ServiceLine'wasbs://bronze@835837bronze.blob.core.windows.net/835837/\[edi\].\[In_837X222_ServiceLine\]/2023/06/06
wasbs://silver@835837bronze.blob.core.windows.net/835837/In_837X222_ServiceLine


_c0,_c1,_c2,_c4,_c5,_c10,_c49,_c75,_c190
1,1,1,98941,AT,28.0,20221219,47657,2023-05-31 17:26:22.8230000
2,2,1,98941,AT,36.35,20221223,47658,2023-05-31 17:26:24.1370000
3,3,1,98941,,32.39,20221223,47659,2023-05-31 17:26:25.2230000
4,3,2,98943,,19.58,20221223,47660,2023-05-31 17:26:25.4670000
5,4,1,98940,AT,25.0,20221223,47661,2023-05-31 17:26:26.6670000
6,5,1,98941,AT,28.0,20221219,47662,2023-05-31 17:26:33.8900000
7,6,1,98941,AT,36.35,20221223,47663,2023-05-31 17:26:35.1430000
8,7,1,98941,,32.39,20221223,47664,2023-05-31 17:26:36.2030000
9,7,2,98943,,19.58,20221223,47665,2023-05-31 17:26:36.4530000
10,8,1,98940,AT,25.0,20221223,47666,2023-05-31 17:26:38.1230000


ServiceLine_ID,Claims_ID,LineItem_ID,CPTCode,CPTModifier,LineChargeAmt,LineItemControl_ID,updated,DateOfService
1,1,1,98941,AT,28.0,47657,2023-07-02T01:01:00.000+0000,2022-12-19
2,2,1,98941,AT,36.35,47658,2023-07-02T01:01:00.000+0000,2022-12-23
3,3,1,98941,,32.39,47659,2023-07-02T01:01:00.000+0000,2022-12-23
4,3,2,98943,,19.58,47660,2023-07-02T01:01:00.000+0000,2022-12-23
5,4,1,98940,AT,25.0,47661,2023-07-02T01:01:00.000+0000,2022-12-23
6,5,1,98941,AT,28.0,47662,2023-07-02T01:01:00.000+0000,2022-12-19
7,6,1,98941,AT,36.35,47663,2023-07-02T01:01:00.000+0000,2022-12-23
8,7,1,98941,,32.39,47664,2023-07-02T01:01:00.000+0000,2022-12-23
9,7,2,98943,,19.58,47665,2023-07-02T01:01:00.000+0000,2022-12-23
10,8,1,98940,AT,25.0,47666,2023-07-02T01:01:00.000+0000,2022-12-23


'In_835X221_ServiceInfo'wasbs://bronze@835837bronze.blob.core.windows.net/835837/\[edi\].\[In_835X221_ServiceInfo\]/2023/06/06
wasbs://silver@835837bronze.blob.core.windows.net/835837/In_835X221_ServiceInfo


_c0,_c1,_c3,_c4,_c5,_c8,_c9,_c22,_c53,_c57,_c58,_c61
1,1,97150,GP,GY,35,12.01,20230313,108278,B6,15.01,2023-05-31 17:25:44.5370000
2,1,97012,,,15,4.8,20230313,108279,B6,6.0,2023-05-31 17:25:44.8830000
3,1,98940,,,55,2.98,20230313,108280,B6,23.73,2023-05-31 17:25:45.2130000
4,1,97150,GP,GY,35,12.01,20230314,108281,B6,15.01,2023-05-31 17:25:45.5230000
5,1,97012,,,15,4.8,20230314,108282,B6,6.0,2023-05-31 17:25:45.8430000
6,1,98940,,,55,2.98,20230314,108283,B6,23.73,2023-05-31 17:25:46.1630000
7,1,97150,GP,GY,35,12.01,20230316,108284,B6,15.01,2023-05-31 17:25:46.5600000
8,1,97012,,,15,4.8,20230316,108285,B6,6.0,2023-05-31 17:25:46.8870000
9,1,98940,,,55,2.98,20230316,108286,B6,23.73,2023-05-31 17:25:47.1970000
10,1,97150,GP,GY,35,12.01,20230320,108287,B6,15.01,2023-05-31 17:25:47.6170000


ServiceInfo_ID,ClaimPay_ID,CPTCode,CPTModifier1,CPTModifier2,LineChargeAmt,ProviderPayAmt,LineItemControl_ID,ServiceSupp_ID,ServiceSuppAmt,updated,DateOfService
1,1,97150,GP,GY,35.0,12.01,108278,B6,15.01,2023-07-02T01:01:00.000+0000,2023-03-13
2,1,97012,,,15.0,4.8,108279,B6,6.0,2023-07-02T01:01:00.000+0000,2023-03-13
3,1,98940,,,55.0,2.98,108280,B6,23.73,2023-07-02T01:01:00.000+0000,2023-03-13
4,1,97150,GP,GY,35.0,12.01,108281,B6,15.01,2023-07-02T01:01:00.000+0000,2023-03-14
5,1,97012,,,15.0,4.8,108282,B6,6.0,2023-07-02T01:01:00.000+0000,2023-03-14
6,1,98940,,,55.0,2.98,108283,B6,23.73,2023-07-02T01:01:00.000+0000,2023-03-14
7,1,97150,GP,GY,35.0,12.01,108284,B6,15.01,2023-07-02T01:01:00.000+0000,2023-03-16
8,1,97012,,,15.0,4.8,108285,B6,6.0,2023-07-02T01:01:00.000+0000,2023-03-16
9,1,98940,,,55.0,2.98,108286,B6,23.73,2023-07-02T01:01:00.000+0000,2023-03-16
10,1,97150,GP,GY,35.0,12.01,108287,B6,15.01,2023-07-02T01:01:00.000+0000,2023-03-20


'In_835X221_ClaimPayInfo'wasbs://bronze@835837bronze.blob.core.windows.net/835837/\[edi\].\[In_835X221_ClaimPayInfo\]/2023/06/06
wasbs://silver@835837bronze.blob.core.windows.net/835837/In_835X221_ClaimPayInfo


_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c42,_c43,_c47,_c67,_c68,_c70,_c107,_c120,_c121,_c130,_c131
1,1,14682,1,525,98.95,124.75,12,231506105100,FULLER FOUNTAS,TAMRA,MRJ822024858,1811913213,,,,,20230324,,2023-05-31 17:25:44.2570000
2,1,14666,1,115,27.43,26.86,12,231507119900,FULLER FOUNTAS,TAMRA,MRJ822024858,1811913213,,,,,20230324,,2023-05-31 17:25:49.7930000
3,2,14670,1,285,0.0,270.0,13,DU97103457 0514457073,MARTINEZ,TONY,,1316131873,,,,,20230324,,2023-05-31 17:25:54.4230000
4,2,14692,1,230,0.0,200.0,13,DU97104840 0514369035,MARTINEZ,TONY,,1316131873,,,MA15,,20230324,,2023-05-31 17:25:56.1670000
5,2,14692,1,115,0.0,100.0,13,DU97104840 0514369037,MARTINEZ,TONY,,1316131873,,,MA15,,20230324,,2023-05-31 17:25:57.9330000
6,3,14663,19,65,0.0,,HM,33-9282483-03-045,VALDOVINOS,ALEJANDRO,928248303,1811913213,,,,,20230324,,2023-05-31 17:25:59.6900000
7,3,14674,19,390,0.0,,HM,33-9282483-03-044,VALDOVINOS,ALEJANDRO,928248303,1811913213,,,,,20230324,,2023-05-31 17:26:00.4000000
8,4,14701,1,350,0.0,75.0,13,DU97104841 0272044358,VIERA,RICH,,1811913213,,,MA15,,20230324,,2023-05-31 17:26:05.8200000
9,4,14701,1,165,0.0,70.0,13,DU97104841 0272044364,VIERA,RICH,,1811913213,,,MA15,,20230324,,2023-05-31 17:26:08.3700000
10,5,14700,1,285,7.26,148.4,13,ECAC41GP20000,RAMOS,KRISEL,W273136404,1811913213,,,,,20230324,,2023-05-31 17:26:13.2330000


ClaimPayInfo_ID,Detail_ID,PatientControl_ID,ClaimStatusCode,ClaimAmt,ClaimPayAmt,PatientRespAmt,ClaimFilingInd,PayerControl_ID,PatientLastName,PatientFirstName,PatientIdent,ProviderNPI,COBName,COB_ID,ClaimRemarkCode,ClaimSuppAmt,updated,CoverageExpDate,SubmissionDate
1,1,14682,1,525.0,98.95,124.75,12,231506105100,FULLER FOUNTAS,TAMRA,MRJ822024858,1811913213,,,,,2023-07-02T01:01:00.000+0000,,2023-03-24
2,1,14666,1,115.0,27.43,26.86,12,231507119900,FULLER FOUNTAS,TAMRA,MRJ822024858,1811913213,,,,,2023-07-02T01:01:00.000+0000,,2023-03-24
3,2,14670,1,285.0,0.0,270.0,13,DU97103457 0514457073,MARTINEZ,TONY,,1316131873,,,,,2023-07-02T01:01:00.000+0000,,2023-03-24
4,2,14692,1,230.0,0.0,200.0,13,DU97104840 0514369035,MARTINEZ,TONY,,1316131873,,,MA15,,2023-07-02T01:01:00.000+0000,,2023-03-24
5,2,14692,1,115.0,0.0,100.0,13,DU97104840 0514369037,MARTINEZ,TONY,,1316131873,,,MA15,,2023-07-02T01:01:00.000+0000,,2023-03-24
6,3,14663,19,65.0,0.0,,HM,33-9282483-03-045,VALDOVINOS,ALEJANDRO,928248303,1811913213,,,,,2023-07-02T01:01:00.000+0000,,2023-03-24
7,3,14674,19,390.0,0.0,,HM,33-9282483-03-044,VALDOVINOS,ALEJANDRO,928248303,1811913213,,,,,2023-07-02T01:01:00.000+0000,,2023-03-24
8,4,14701,1,350.0,0.0,75.0,13,DU97104841 0272044358,VIERA,RICH,,1811913213,,,MA15,,2023-07-02T01:01:00.000+0000,,2023-03-24
9,4,14701,1,165.0,0.0,70.0,13,DU97104841 0272044364,VIERA,RICH,,1811913213,,,MA15,,2023-07-02T01:01:00.000+0000,,2023-03-24
10,5,14700,1,285.0,7.26,148.4,13,ECAC41GP20000,RAMOS,KRISEL,W273136404,1811913213,,,,,2023-07-02T01:01:00.000+0000,,2023-03-24
