### Setting up the configuration  

In [1]:
%%configure -f 
{"executorMemory":"150G"}

### Setting up the Spark read configuration (`spark.sql.files.maxPartitionBytes`)

In [2]:
%%configure -f
{
  "executorMemory": "150G",
  "conf": {
    "spark.sql.files.maxPartitionBytes": "1073741824"
  }
}

### Here we are starting the spark application

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType,BooleanType,DateType,LongType,DoubleType, StringType
from pyspark.sql.functions import row_number,lit,col,when
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import time
# Build the session handle used by every cell below. getOrCreate() hands back
# the already-running session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Excellus-UHC-1")
    .getOrCreate()
)


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1683567174230_0005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### The function below flattens the nested (struct/array) schema of the raw data into plain columns

In [4]:
def _complex_fields(df):
    """Return ``{column name: data type}`` for every array or struct column of ``df``."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (T.ArrayType, T.StructType))
    }


def flatten(df):
    """Flatten every struct and array column of ``df`` into plain columns.

    Struct columns are replaced by one column per sub-field, named
    ``<struct>_<field>``. Array columns are exploded with ``explode_outer``,
    so each element becomes its own row (rows whose array is null or empty are
    kept with a null value). Exploding several arrays multiplies the row count.
    The process repeats until no struct or array column is left.

    Finally, the name of the FIRST complex column of the input (plus ``_``) is
    removed from every resulting column name. Which prefix is stripped therefore
    depends on the input schema's column order.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame with an arbitrarily nested schema.

    Returns
    -------
    pyspark.sql.DataFrame
        Frame containing only non-struct, non-array columns. A frame that is
        already flat is returned unchanged.
    """
    complex_fields = _complex_fields(df)

    # Nothing to flatten: the original code failed here with IndexError because
    # there is no "first complex column" to derive the prefix from.
    if not complex_fields:
        return df

    qualify = next(iter(complex_fields)) + "_"

    while complex_fields:
        # Always work on the first remaining complex column, in schema order.
        col_name = next(iter(complex_fields))
        col_type = complex_fields[col_name]

        if isinstance(col_type, T.StructType):
            # Promote each sub-field to a top-level column, then drop the struct.
            expanded = [
                F.col(col_name + '.' + sub_field.name).alias(col_name + '_' + sub_field.name)
                for sub_field in col_type
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(col_type, T.ArrayType):
            # One row per element; the column keeps its name but now holds the element.
            df = df.withColumn(col_name, F.explode_outer(col_name))

        # The step above may have exposed new nested columns, so rescan the schema.
        complex_fields = _complex_fields(df)

    for df_col_name in df.columns:
        # Skip names that do not contain the prefix: renaming a column to itself
        # changes nothing but still adds a projection to the query plan.
        if qualify in df_col_name:
            df = df.withColumnRenamed(df_col_name, df_col_name.replace(qualify, ""))
    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Below we provide the S3 URI where the "uhc_reporting_details.csv" file is saved, and we read that CSV file

In [5]:
# S3 location of the reporting-details CSV; its first line is a header row.
path = "s3://zigna-nsa-data-staging/reporting-details/Q2-2023/may-wellmark-uhc/may_wellmark_uhc_reporting_details.csv"
uhc_files = spark.read.option("header", True).csv(path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Here we are checking the schema(structure) of the imported csv file and this file will have 5 columns
<br>
0.Index
<br>
1.file_name
<br>
2.reporting_entity_name
<br>
3.reporting_entity_type
<br>
4.last_updated_on
<br>
5.version

In [6]:
uhc_files.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- file_name: string (nullable = true)
 |-- reporting_entity_name: string (nullable = true)
 |-- reporting_entity_type: string (nullable = true)
 |-- last_updated_on: string (nullable = true)
 |-- version: string (nullable = true)

### Here we are collecting and assigning the 5 variables data separately

In [7]:
# Fetch the five metadata columns with ONE collect() instead of five.
# Each collect() is a separate Spark job, and the cell that zips these lists
# assumes position i of every list describes the same CSV row. Collecting whole
# rows once guarantees that alignment and reads the file a single time.
_reporting_rows = uhc_files.select(
    'file_name',
    'reporting_entity_name',
    'reporting_entity_type',
    'last_updated_on',
    'version',
).collect()

# The original names are kept for the cells below. They all refer to the same
# list of Row objects, and each Row can be indexed by any of the five column
# names (e.g. row['file_name'], row['version']).
files = ren = ret = lun = ver = _reporting_rows

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Here we are creating 5 lists and filling them with the data of the 5 variables above, so that each file's details can be looked up by index

In [8]:
# Pair up the collected rows position by position, then pull the single value
# out of each Row so every list holds plain strings addressable by row index.
_aligned_rows = list(zip(files, ren, ret, lun, ver))
file_names = [rows[0]['file_name'] for rows in _aligned_rows]
rep_name = [rows[1]['reporting_entity_name'] for rows in _aligned_rows]
rep_type = [rows[2]['reporting_entity_type'] for rows in _aligned_rows]
last_up_on = [rows[3]['last_updated_on'] for rows in _aligned_rows]
vers = [rows[4]['version'] for rows in _aligned_rows]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
len(file_names)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

37

### Here we are just checking that we are getting the proper output as we required or not

In [9]:
# Position (0-based) in file_names of the file to process in the cells below.
idx = 12
# Display the selected file name as a sanity check.
file_names[idx]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'2023-05-01_Surest_Third-Party-Administrator_DOCTORS-ON-DEMAND_UNITEDHEALTHCARE-CHOICE-PLUS_-DD_UCEQ_in-network-rates'

### Here we have to change the number to the index number of the file which we want to process now

In [153]:
# Pick out the details of the file selected by idx and echo them for review.
ff, rn, rt, luo, versi = (
    file_names[idx],
    rep_name[idx],
    rep_type[idx],
    last_up_on[idx],
    vers[idx],
)
print(rn, rt, luo, versi, sep='\n')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

United HealthCare Services, Inc.
Third-Party Administrator
2023-05-01
1.0.0

### Here we build 'in_net', the S3 path of the in_network raw JSON file, from the file name 'ff', and derive the shortened 'processed_fname'

In [154]:
# Full S3 path of the raw in-network JSON for the selected file name.
in_net = "s3://zigna-nsa-data-staging/raw-json-files/Q2-2023/may-wellmark-uhc/" + ff + ".json"
# Shortened name: the file name without the hard-coded '2023-05-01_' date text
# and without the '_in-network-rates' text (str.replace removes them wherever they occur).
processed_fname = ff.replace('2023-05-01_','').replace('_in-network-rates','')
print(f'Link: {in_net} \n\nFile name: {ff} \n\n Processed file name : {processed_fname}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Link: s3://zigna-nsa-data-staging/raw-json-files/Q2-2023/may-wellmark-uhc/2023-05-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_Navigate-EPO_636_in-network-rates.json 

File name: 2023-05-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_Navigate-EPO_636_in-network-rates 

 Processed file name : United-HealthCare-Services--Inc-_Third-Party-Administrator_Navigate-EPO_636

### Below we are creating the provider_references raw data path 'prov_ref' from 'in_net', by swapping the 'raw-json-files' folder for 'provider-references' and replacing the '.json' extension with a trailing '/'

In [155]:
# Folder holding the provider-reference files for this in-network file: same
# path as in_net, but under 'provider-references' and ending in '<file name>/'
# instead of '<file name>.json'.
prov_ref = in_net.replace('raw-json-files','provider-references').replace('.json','') + '/'
prov_ref

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

's3://zigna-nsa-data-staging/provider-references/Q2-2023/may-wellmark-uhc/2023-05-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_Navigate-EPO_636_in-network-rates/'

# in-network level

### Here we are reading the in_network raw data JSON file

In [156]:
# fname = '2023-04-01_UMR--Inc-_Third-Party-Administrator_ALLIANCE-WITH-NEHA-H-W-3-TIER_UHC-OPTIONS-TRAVEL-WITH-MULTIPLAN-BENCHMARK_WOBC_58TQ_in-network-rates'
# in_net = 's3://zigna-nsa-data-staging/test/test-uhc/uhc-test9/'+fname+'.json'
# prov_ref = 's3://zigna-nsa-data-staging/test/test-uhc/provider-references/'+fname+'/'
# Read the raw in-network JSON; the schema is inferred from the data.
# allowNumericLeadingZero lets the parser accept numbers written with leading zeros.
df1 = spark.read.options(allowNumericLeadingZero = True).json(in_net)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [157]:
df1.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _corrupt_record: string (nullable = true)
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- billing_code_type_version: string (nullable = true)
 |-- covered_services: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negotiated_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- negotiated_prices: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- additional_information: string (nullable = true)
 |    |    |    |    |-- billing_class: string (nullable = true)
 |    |    |    |    |-- billing_code_modifier: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- expiration_date: string (nullable = true)
 |    |    |    |    |-- negotiated_rate: double (nullable = true)


In [158]:
df2 = flatten(df1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [159]:
df2.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _corrupt_record: string (nullable = true)
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- billing_code_type_version: string (nullable = true)
 |-- covered_services: string (nullable = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negotiation_arrangement: string (nullable = true)
 |-- negotiated_rates_provider_references: long (nullable = true)
 |-- negotiated_rates_negotiated_prices_additional_information: string (nullable = true)
 |-- negotiated_rates_negotiated_prices_billing_class: string (nullable = true)
 |-- negotiated_rates_negotiated_prices_billing_code_modifier: string (nullable = true)
 |-- negotiated_rates_negotiated_prices_expiration_date: string (nullable = true)
 |-- negotiated_rates_negotiated_prices_negotiated_rate: double (nullable = true)
 |-- negotiated_rates_negotiated_prices_negotiated_type: string (nullable = true)
 |-- negotiated_rates_negotiated_prices_service

In [160]:
# df2[['covered_services']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [161]:
df2 = df2.drop("covered_services")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [162]:
# NOTE(review): this replaces the last_updated_on value read from the CSV with a
# hard-coded MM-DD-YYYY string - confirm it is correct for the file being processed.
luo = '05-01-2023'

# Stamp every row with the reporting-entity details of the selected file.
df2 = (
    df2
    .withColumn('reporting_entity_name', lit(rn))
    .withColumn('reporting_entity_type', lit(rt))
    .withColumn('last_updated_on', lit(luo))
    .withColumn('version', lit(versi))
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [163]:
df2 = df2.drop('_corrupt_record')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [164]:
# ff = in_net.replace('s3://zigna-nsa-data-staging/test/test-uhc/uhc-test7/',"").replace("_in-network-rates.json","")
# processed_fname =in_net.replace('s3://zigna-nsa-data-staging/test/test-uhc/uhc-test7/2023-04-01_',"").replace("_in-network-rates.json","")

print(ff,"\n",processed_fname)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2023-05-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_Navigate-EPO_636_in-network-rates 
 United-HealthCare-Services--Inc-_Third-Party-Administrator_Navigate-EPO_636

In [165]:
# After flatten() the raw service-code column is named either
# 'negotiated_rates_negotiated_prices_service_code' or 'negotiated_prices_service_code',
# depending on the input schema (the batch loop later in this notebook and the rename
# cell below handle both). Use whichever one is present so this cell works for every file.
if 'negotiated_rates_negotiated_prices_service_code' in df2.columns:
    _raw_service_code = df2['negotiated_rates_negotiated_prices_service_code']
else:
    _raw_service_code = df2['negotiated_prices_service_code']

# Map the raw code to a coarse category; anything unmatched
# (including a missing code) becomes "All Other".
df2 = df2.withColumn(
    "service_code",
    when(_raw_service_code == "11", "Office")
    .when(_raw_service_code == "20", "Urgent Care")
    .when(_raw_service_code.isin(['21', '22', '23']), "IP/OP/ED")
    .otherwise("All Other"),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [166]:
# Flattened column names appear either with or without the 'negotiated_rates_'
# prefix, so both spellings are mapped to the same final name. Renaming a
# column that does not exist is a no-op, so only the spelling present applies.
_final_names = [
    ('negotiated_prices_expiration_date', 'expiration_date'),
    ('negotiated_prices_negotiated_rate', 'negotiated_rate'),
    ('negotiated_prices_negotiated_type', 'negotiated_type'),
    ('negotiated_prices_additional_information', 'additional_information'),
    ('negotiated_prices_billing_class', 'billing_class'),
    ('negotiated_prices_billing_code_modifier', 'billing_code_modifier'),
    ('negotiated_rates_negotiated_prices_expiration_date', 'expiration_date'),
    ('negotiated_rates_provider_references', 'provider_references'),
    ('negotiated_rates_negotiated_prices_negotiated_rate', 'negotiated_rate'),
    ('negotiated_rates_negotiated_prices_negotiated_type', 'negotiated_type'),
    ('negotiated_rates_negotiated_prices_additional_information', 'additional_information'),
    ('negotiated_rates_negotiated_prices_billing_class', 'billing_class'),
    ('negotiated_rates_negotiated_prices_billing_code_modifier', 'billing_code_modifier'),
]
for _old_name, _new_name in _final_names:
    df2 = df2.withColumnRenamed(_old_name, _new_name)

# The raw service code has already been turned into 'service_code'; drop both spellings.
df2 = df2.drop(
    'negotiated_prices_service_code',
    'negotiated_rates_negotiated_prices_service_code',
)

# Columns required by the output layout but filled with empty strings here.
_empty_columns = (
    'covered_billing_code',
    'covered_billing_code_type',
    'covered_billing_code_type_version',
    'covered_description',
    'bundled_billing_code',
    'bundled_billing_code_type',
    'bundled_billing_code_type_version',
    'bundled_description',
)
for _empty_name in _empty_columns:
    df2 = df2.withColumn(_empty_name, lit(''))

df2 = (
    df2
    .withColumn('file_name', lit(ff))
    .withColumn('location', lit(''))
    .withColumn('processed_file_name', lit(processed_fname))
)

# Make sure the rate is a double regardless of the type inferred from the JSON.
df2 = df2.withColumn("negotiated_rate", df2["negotiated_rate"].cast(DoubleType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [167]:
df3 =  df2.dropna(subset=['negotiated_rate'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [168]:
df3 = df3.filter(df3.negotiated_rate > 0.0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [169]:
# Put the columns into the fixed output order (29 columns).
df3_reordered = df3.select(
    # file / reporting-entity details
    "file_name",
    "processed_file_name",
    "reporting_entity_name",
    "reporting_entity_type",
    "last_updated_on",
    "version",
    # billing code details
    "billing_code",
    "billing_code_type",
    "billing_code_type_version",
    "name",
    "description",
    "billing_class",
    "billing_code_modifier",
    # negotiated rate details
    "negotiation_arrangement",
    "negotiated_type",
    "negotiated_rate",
    "service_code",
    "provider_references",
    "location",
    "expiration_date",
    "additional_information",
    # covered / bundled placeholders
    "covered_billing_code",
    "covered_billing_code_type",
    "covered_billing_code_type_version",
    "covered_description",
    "bundled_billing_code",
    "bundled_billing_code_type",
    "bundled_billing_code_type_version",
    "bundled_description",
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [170]:
df3_reordered.printSchema()
len(df3_reordered.columns)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- file_name: string (nullable = false)
 |-- processed_file_name: string (nullable = false)
 |-- reporting_entity_name: string (nullable = false)
 |-- reporting_entity_type: string (nullable = false)
 |-- last_updated_on: string (nullable = false)
 |-- version: string (nullable = false)
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- billing_code_type_version: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- billing_class: string (nullable = true)
 |-- billing_code_modifier: string (nullable = true)
 |-- negotiation_arrangement: string (nullable = true)
 |-- negotiated_type: string (nullable = true)
 |-- negotiated_rate: double (nullable = true)
 |-- service_code: string (nullable = false)
 |-- provider_references: long (nullable = true)
 |-- location: string (nullable = false)
 |-- expiration_date: string (nullable = true)
 |-- additional_information: string (nullable =

In [171]:
# df3_reordered.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [172]:
# df3_reordered_deduped.distinct().count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [173]:
# df3_reordered[['service_code']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [174]:
df3_reordered[['billing_code_type']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+
|billing_code_type|
+-----------------+
|           MS-DRG|
|               RC|
|              CPT|
|              ICD|
|            HCPCS|
|         CSTM-ALL|
+-----------------+

In [175]:
df3_reordered.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+---------------------+---------------------+---------------+-------+------------+-----------------+-------------------------+--------------------+--------------------+-------------+---------------------+-----------------------+---------------+---------------+------------+-------------------+--------+---------------+----------------------+--------------------+-------------------------+---------------------------------+-------------------+--------------------+-------------------------+---------------------------------+-------------------+
|           file_name| processed_file_name|reporting_entity_name|reporting_entity_type|last_updated_on|version|billing_code|billing_code_type|billing_code_type_version|                name|         description|billing_class|billing_code_modifier|negotiation_arrangement|negotiated_type|negotiated_rate|service_code|provider_references|location|expiration_date|additional_information|covered_billing_code|covered_bi

In [176]:
df3_reordered_deduped = df3_reordered.distinct()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [177]:
# Expected output layout of the in-network table: column name -> Spark type.
_in_network_expected_types = {
    "file_name": StringType,
    "processed_file_name": StringType,
    "reporting_entity_name": StringType,
    "reporting_entity_type": StringType,
    "last_updated_on": StringType,
    "version": StringType,
    "billing_code": StringType,
    "billing_code_type": StringType,
    "billing_code_type_version": StringType,
    "name": StringType,
    "description": StringType,
    "billing_class": StringType,
    "billing_code_modifier": StringType,
    "negotiation_arrangement": StringType,
    "negotiated_type": StringType,
    "negotiated_rate": DoubleType,
    "service_code": StringType,
    "provider_references": LongType,
    "location": StringType,
    "expiration_date": StringType,
    "additional_information": StringType,
    "covered_billing_code": StringType,
    "covered_billing_code_type": StringType,
    "covered_billing_code_type_version": StringType,
    "covered_description": StringType,
    "bundled_billing_code": StringType,
    "bundled_billing_code_type": StringType,
    "bundled_billing_code_type_version": StringType,
    "bundled_description": StringType,
}

# Compare type objects rather than str(dataType): the text form of a Spark type
# differs between PySpark versions ('StringType' vs 'StringType()'), which would
# make a string comparison fail for every column.
_in_network_actual_types = {
    field.name: field.dataType for field in df3_reordered_deduped.schema.fields
}
_in_network_problems = [
    f"{column}: expected {expected.__name__}, found {_in_network_actual_types.get(column)}"
    for column, expected in _in_network_expected_types.items()
    if not isinstance(_in_network_actual_types.get(column), expected)
]
if len(df3_reordered_deduped.columns) != len(_in_network_expected_types):
    _in_network_problems.append(
        f"expected {len(_in_network_expected_types)} columns, "
        f"found {len(df3_reordered_deduped.columns)}"
    )

if not _in_network_problems:
    # mode("append") adds to whatever is already under this path, so running
    # this cell twice for the same file writes the rows twice.
    df3_reordered_deduped.write.mode("append").parquet(f"s3://zigna-nsa-payer-data-parquet-staging/Q2-2023/wellmark/may-wellmark-uhc/may-wellmark-uhc-in-network/{ff}/")
else:
    # Nothing is written; list every mismatch so the cause is visible.
    print('error')
    for _problem in _in_network_problems:
        print(_problem)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### We have to check the sample data, to make sure the data looks fine to process further
<br>
If you find anything wrong in the data please raise a flag

In [174]:
# df3_reordered_deduped.show(n=1000)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Need to cross check the distinct values in 'billing_code_type' columns

In [175]:
# df3_reordered_deduped[['billing_code_type']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Need to cross check the distinct values in 'negotiation_arrangement' and 'negotiated_type' columns

In [176]:
# df3_reordered_deduped[['negotiation_arrangement']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [177]:
# df3_reordered_deduped[['negotiated_type']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Need to cross check the distinct values in 'billing_class' columns

In [178]:
# df3_reordered_deduped[['billing_class']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [179]:
# bc = df3_reordered_deduped.filter(df3_reordered_deduped.billing_code_type == 'APR-DRG')
# bc[['billing_code_type']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [180]:
# bc = df3_reordered_deduped.filter(df3_reordered_deduped.billing_code_type == 'MS-DRG')
# bc[['billing_code']].distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Provider references level


### Here we are reading the provider_references raw data file

# idxx

In [203]:
idx = 36
file_names[idx]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'2023-05-01_Surest_Third-Party-Administrator_UHSS---TERMED-GROUP_UNITEDHEALTHCARE-CHOICE-PLUS_-K_UBNT_in-network-rates'

In [204]:
# Only the file name is refreshed for the new idx here; rn, rt, luo and versi
# are not reassigned in this cell and keep whatever values they had before.
ff = file_names[idx]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [205]:
# Rebuild the raw in-network JSON path and the shortened file name for the
# file selected in this section (same construction as in the in-network section).
in_net = "s3://zigna-nsa-data-staging/raw-json-files/Q2-2023/may-wellmark-uhc/" + ff + ".json"
# str.replace removes the hard-coded '2023-05-01_' and '_in-network-rates' text wherever it occurs.
processed_fname = ff.replace('2023-05-01_','').replace('_in-network-rates','')
print(f'Link: {in_net} \n\nFile name: {ff} \n\n Processed file name : {processed_fname}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Link: s3://zigna-nsa-data-staging/raw-json-files/Q2-2023/may-wellmark-uhc/2023-05-01_Surest_Third-Party-Administrator_UHSS---TERMED-GROUP_UNITEDHEALTHCARE-CHOICE-PLUS_-K_UBNT_in-network-rates.json 

File name: 2023-05-01_Surest_Third-Party-Administrator_UHSS---TERMED-GROUP_UNITEDHEALTHCARE-CHOICE-PLUS_-K_UBNT_in-network-rates 

 Processed file name : Surest_Third-Party-Administrator_UHSS---TERMED-GROUP_UNITEDHEALTHCARE-CHOICE-PLUS_-K_UBNT

### Below we are creating the provider_references raw data path 'prov_ref' from 'in_net', by swapping the 'raw-json-files' folder for 'provider-references' and replacing the '.json' extension with a trailing '/'

In [206]:
prov_ref = in_net.replace('raw-json-files','provider-references').replace('.json','') + '/'
prov_ref

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

's3://zigna-nsa-data-staging/provider-references/Q2-2023/may-wellmark-uhc/2023-05-01_Surest_Third-Party-Administrator_UHSS---TERMED-GROUP_UNITEDHEALTHCARE-CHOICE-PLUS_-K_UBNT_in-network-rates/'

In [207]:
df4 = spark.read.options(allowNumericLeadingZero = True).json(prov_ref)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [208]:
df4.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- provider_group_id: long (nullable = true)
 |-- provider_groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- npi: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- tin: struct (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)

In [209]:
df5 = flatten(df4)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [210]:
# Store npi as text and tag every row with the file it came from.
df5 = (
    df5
    .withColumn("npi", df5["npi"].cast(StringType()))
    .withColumn('file_name', lit(ff))
    .withColumn('processed_file_name', lit(processed_fname))
)
print(ff, '\n', processed_fname)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2023-05-01_Surest_Third-Party-Administrator_UHSS---TERMED-GROUP_UNITEDHEALTHCARE-CHOICE-PLUS_-K_UBNT_in-network-rates 
 Surest_Third-Party-Administrator_UHSS---TERMED-GROUP_UNITEDHEALTHCARE-CHOICE-PLUS_-K_UBNT

In [211]:
df5.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- provider_group_id: long (nullable = true)
 |-- npi: string (nullable = true)
 |-- tin_type: string (nullable = true)
 |-- tin_value: string (nullable = true)
 |-- file_name: string (nullable = false)
 |-- processed_file_name: string (nullable = false)

In [212]:
# Remove exact duplicate rows before writing.
df6 = df5.distinct()
# count() is an action: it runs the whole read/flatten/distinct pipeline to get the row total.
df6.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2813207

In [213]:
# pgi = df6[['provider_group_id']].distinct().collect()
# upgi = []
# for i in pgi:
#     upgi.append(i['provider_group_id'])
# inpgi = df3_reordered[['provider_references']].distinct().collect()
# inupgi = []
# for i in inpgi:
#     inupgi.append(i['provider_references'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [201]:
# print(len(upgi),len(inupgi),len(set(upgi).intersection(set(inupgi))))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [214]:
# Expected output layout of the provider-references table: column name -> Spark type.
_prov_ref_expected_types = {
    "provider_group_id": LongType,
    "npi": StringType,
    "tin_type": StringType,
    "tin_value": StringType,
    "file_name": StringType,
    "processed_file_name": StringType,
}

# Compare type objects rather than str(dataType): the text form of a Spark type
# differs between PySpark versions ('LongType' vs 'LongType()'), which would make
# a string comparison fail for every column.
_prov_ref_actual_types = {field.name: field.dataType for field in df6.schema.fields}
_prov_ref_problems = [
    f"{column}: expected {expected.__name__}, found {_prov_ref_actual_types.get(column)}"
    for column, expected in _prov_ref_expected_types.items()
    if not isinstance(_prov_ref_actual_types.get(column), expected)
]
if len(df6.columns) != len(_prov_ref_expected_types):
    _prov_ref_problems.append(
        f"expected {len(_prov_ref_expected_types)} columns, found {len(df6.columns)}"
    )

if not _prov_ref_problems:
    # mode("append") adds to whatever is already under this path, so running
    # this cell twice for the same file writes the rows twice.
    df6.write.mode("append").parquet(f"s3://zigna-nsa-payer-data-parquet-staging/Q2-2023/wellmark/may-wellmark-uhc/may-wellmark-uhc-provider-references/{ff}/")
else:
    # Nothing is written; list every mismatch so the cause is visible.
    print('error')
    for _problem in _prov_ref_problems:
        print(_problem)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### checking for size of the dataframe

In [495]:

# df6.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…





## Before running the code below, please make sure the paths are correct

### Here we are generating parquet files

In [10]:
import time

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Collects the idx of every file whose output fails the schema check in the processing loop.
error = []

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# --- Loop-invariant configuration -------------------------------------------------------

# S3 locations of the raw in-network JSON input and of the parquet output.
RAW_JSON_PREFIX = "s3://zigna-nsa-data-staging/raw-json-files/Q2-2023/may-wellmark-uhc/"
IN_NETWORK_OUTPUT_PREFIX = "s3://zigna-nsa-payer-data-parquet-staging/Q2-2023/wellmark/may-wellmark-uhc/may-wellmark-uhc-in-network/"

# Final column order of the in-network output (29 columns). This single list drives both
# the select() and the schema validation, so the two cannot drift apart.
IN_NETWORK_COLUMNS = [
    "file_name", "processed_file_name", "reporting_entity_name", "reporting_entity_type",
    "last_updated_on", "version", "billing_code", "billing_code_type",
    "billing_code_type_version", "name", "description", "billing_class",
    "billing_code_modifier", "negotiation_arrangement", "negotiated_type",
    "negotiated_rate", "service_code", "provider_references", "location",
    "expiration_date", "additional_information", "covered_billing_code",
    "covered_billing_code_type", "covered_billing_code_type_version",
    "covered_description", "bundled_billing_code", "bundled_billing_code_type",
    "bundled_billing_code_type_version", "bundled_description",
]

# Expected Spark SQL type of every output column, as reported by DataFrame.dtypes.
# Everything is a string except the rate (double) and the provider reference id (bigint).
IN_NETWORK_DTYPES = {column_name: "string" for column_name in IN_NETWORK_COLUMNS}
IN_NETWORK_DTYPES["negotiated_rate"] = "double"
IN_NETWORK_DTYPES["provider_references"] = "bigint"

# The flattened negotiated-price fields show up under one of these two prefixes;
# both are stripped so the output always uses the bare field name.
PRICE_PREFIXES = ["negotiated_prices_", "negotiated_rates_negotiated_prices_"]
PRICE_FIELDS = [
    "expiration_date", "negotiated_rate", "negotiated_type",
    "additional_information", "billing_class", "billing_code_modifier",
]
# Possible names of the raw service-code column, in lookup order.
SERVICE_CODE_SOURCES = [
    "negotiated_rates_negotiated_prices_service_code",
    "negotiated_prices_service_code",
]

# --- Per-file processing ----------------------------------------------------------------
# For each selected input file: read the raw JSON, flatten it, normalise column names,
# attach file-level metadata, keep rows with a positive negotiated rate, deduplicate,
# validate the schema and append the result to the parquet output. The idx of every file
# that cannot be written is appended to `error`.
for idx in [32,33,34,35,36]: #
    ff = file_names[idx]
    rn = rep_name[idx]
    rt = rep_type[idx]
    versi = vers[idx]
    # NOTE(review): last_updated_on is hard-coded. The previous version also read
    # last_up_on[idx] but overwrote it before use, so the hard-coded value was always written.
    luo = '05-01-2023'
    in_net = RAW_JSON_PREFIX + ff + ".json"
    processed_fname = ff.replace('2023-05-01_','').replace('_in-network-rates','')
    # Not used inside this loop; kept because notebook cells share globals.
    # TODO confirm whether any other cell reads prov_ref.
    prov_ref = in_net.replace('raw-json-files','provider-references').replace('.json','') + '/'

    df1 = spark.read.options(allowNumericLeadingZero = True).json(in_net)
    # drop() ignores names that are not present, so both drops are safe for every file.
    df2 = flatten(df1).drop("covered_services", "_corrupt_record")

    # Locate the raw service-code column under whichever prefix flatten() produced.
    if SERVICE_CODE_SOURCES[0] in df2.columns:
        service_code_source = SERVICE_CODE_SOURCES[0]
    elif SERVICE_CODE_SOURCES[1] in df2.columns:
        service_code_source = SERVICE_CODE_SOURCES[1]
    else:
        # Previously this aborted the whole loop with an AttributeError; record the file
        # as failed and move on to the next one instead.
        print('error', ff, 'no service code column found')
        error.append(idx)
        continue

    # Bucket the raw service codes into the reporting categories.
    raw_service_code = df2[service_code_source]
    df2 = df2.withColumn("service_code", when(raw_service_code == "11","Office")
                                         .when(raw_service_code == "20","Urgent Care")
                                         .when(raw_service_code.isin(['21','22','23']),"IP/OP/ED")
                                         .otherwise("All Other"))
    df2 = df2.drop(*SERVICE_CODE_SOURCES)

    # Strip the flatten() prefixes; withColumnRenamed is a no-op when the source is absent.
    for prefix in PRICE_PREFIXES:
        for field in PRICE_FIELDS:
            df2 = df2.withColumnRenamed(prefix + field, field)
    df2 = df2.withColumnRenamed('negotiated_rates_provider_references','provider_references')

    # File-level metadata plus the placeholder columns that are always written as ''.
    constant_columns = {
        'reporting_entity_name': rn,
        'reporting_entity_type': rt,
        'last_updated_on': luo,
        'version': versi,
        'file_name': ff,
        'processed_file_name': processed_fname,
        'location': '',
        'covered_billing_code': '',
        'covered_billing_code_type': '',
        'covered_billing_code_type_version': '',
        'covered_description': '',
        'bundled_billing_code': '',
        'bundled_billing_code_type': '',
        'bundled_billing_code_type_version': '',
        'bundled_description': '',
    }
    for column_name, value in constant_columns.items():
        df2 = df2.withColumn(column_name, lit(value))

    df2 = df2.withColumn("negotiated_rate",df2.negotiated_rate.cast(DoubleType()))

    # Keep only rows with a usable, strictly positive rate.
    df3 = df2.dropna(subset=['negotiated_rate'])
    df3 = df3.filter(df3.negotiated_rate > 0.0)
    df3_reordered = df3.select(*IN_NETWORK_COLUMNS)
    df3_reordered_deduped = df3_reordered.distinct()

    # Validate through DataFrame.dtypes (simpleString names) rather than str(dataType):
    # the DataType repr changed from 'LongType' to 'LongType()' in PySpark 3.3, which
    # would make a string comparison reject every file.
    actual_dtypes = dict(df3_reordered_deduped.dtypes)
    if actual_dtypes == IN_NETWORK_DTYPES and len(df3_reordered_deduped.columns) == len(IN_NETWORK_COLUMNS):
        df3_reordered_deduped.write.mode("append").parquet(f"{IN_NETWORK_OUTPUT_PREFIX}{ff}/")
    else:
        # Report only the columns whose type differs from the expectation.
        mismatches = {
            column_name: actual_dtypes.get(column_name)
            for column_name in IN_NETWORK_COLUMNS
            if actual_dtypes.get(column_name) != IN_NETWORK_DTYPES[column_name]
        }
        print('error', ff, mismatches)
        error.append(idx)
    # Pause between files. NOTE(review): reason for the 5 s delay is not documented — confirm.
    time.sleep(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Indices of files that failed schema validation in the loop above (empty means all were written).
error

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[]

In [16]:
# Ad-hoc lookup of the input file name stored at index 9 of file_names.
file_names[9]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'2023-05-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_GIL-75_EE_in-network-rates'

In [13]:
# Preview the first rows of the last processed in-network frame with all of its columns.
# Indexing with an empty list (`[[]]`) expands to select() with no columns and would
# display an empty table, so the frame is shown directly.
df3_reordered_deduped.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+---------------------+---------------------+---------------+-------+------------+-----------------+-------------------------+--------------------+--------------------+-------------+---------------------+-----------------------+---------------+---------------+------------+-------------------+--------+---------------+----------------------+--------------------+-------------------------+---------------------------------+-------------------+--------------------+-------------------------+---------------------------------+-------------------+
|           file_name| processed_file_name|reporting_entity_name|reporting_entity_type|last_updated_on|version|billing_code|billing_code_type|billing_code_type_version|                name|         description|billing_class|billing_code_modifier|negotiation_arrangement|negotiated_type|negotiated_rate|service_code|provider_references|location|expiration_date|additional_information|covered_billing_code|covered_bi