## Load CMS Medicare Part D data to Lakehouse

1. Download the CSV files to your local machine.
2. Upload the files to the Lakehouse `Files` folder using Azure Storage Explorer.
3. Make sure every path in `file_dict` matches an uploaded file name so that each file is picked up.

This Spark notebook reads each file, tags its rows with a `Year` column, and appends them into a single Delta (Parquet-backed) table called `cms_provider_drug_costs`.



In [10]:
# Start from a clean slate: remove any previous version of the combined table so the
# load cell below (which overwrites on its first file) rebuilds it from scratch.
spark.sql("DROP TABLE IF EXISTS cms_provider_drug_costs")

StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 12, Finished, Available)

DataFrame[]

In [11]:
# Source CSVs to load, keyed by the data year they contain (the year becomes the
# `Year` column). Paths point under the Lakehouse `Files/` area and must match the
# uploaded file names exactly; insertion order is the load order.
file_dict = dict([
    (2019, "Files/Medicare_Part_D_Prescribers_by_Provider_and_Drug_2019-Atorvastatin Calcium.csv"),
    (2020, "Files/Medicare_Part_D_Prescribers_by_Provider_and_Drug_2020-Levothyroxine Sodium.csv"),
    (2021, "Files/Medicare_Part_D_Prescribers_by_Provider_and_Drug_2021-Clopidogrel.csv"),
])

StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 13, Finished, Available)

In [12]:
from pyspark.sql.types import LongType, DecimalType
from pyspark.sql.functions import lit, col, concat, concat_ws, when

# Load every yearly CSV listed in `file_dict` into one Delta table.
# The first file overwrites the target; every later file is appended to it.

TARGET_TABLE_PATH = "Tables/cms_provider_drug_costs"

# Money / fill measures stored as fixed-point decimals.
# NOTE(review): decimal(10,2) tops out at 99,999,999.99. With ANSI mode off, Spark turns
# an overflowing cast into NULL instead of failing -- confirm the source values fit.
DECIMAL_COLUMNS = ["Tot_Drug_Cst", "Tot_30day_Fills", "GE65_Tot_30day_Fills", "GE65_Tot_Drug_Cst"]

# Count measures stored as 64-bit integers, so every year gets the same type
# regardless of what schema inference picked for that file.
LONG_COLUMNS = ["Tot_Clms", "Tot_Day_Suply", "Tot_Benes", "GE65_Tot_Clms", "GE65_Tot_Benes", "GE65_Tot_Day_Suply"]


def _join_non_null(first, second):
    """Return the column expression "first, second", skipping whichever part is NULL.

    Plain `concat` returns NULL as soon as any input is NULL, which blanked the whole
    derived value (e.g. a row with no first name -- presumably organisations -- lost its
    full name). The result is NULL only when both parts are NULL.
    """
    return when(first.isNotNull() | second.isNotNull(), concat_ws(", ", first, second))


for file_index, (year, path) in enumerate(file_dict.items()):
    print(f"Key: {year}, Value: {path}")

    # NOTE(review): the schema is inferred per file; if a column infers differently in a
    # later year, the Delta append will reject the mismatch.
    raw_df = spark.read.format("csv").option("header", "true").option("inferschema", "true").load(path)

    if file_index == 0:
        # First file replaces whatever is at the target; show its inferred schema once.
        mode = "overwrite"
        raw_df.printSchema()
    else:
        mode = "append"

    # Tag rows with their source year, then normalise datatypes (casts keep column
    # positions) and append the two derived display columns.
    year_df = raw_df.withColumn("Year", lit(year))
    for name in DECIMAL_COLUMNS:
        year_df = year_df.withColumn(name, col(name).cast(DecimalType(10, 2)))
    for name in LONG_COLUMNS:
        year_df = year_df.withColumn(name, col(name).cast(LongType()))
    year_df = year_df \
        .withColumn("Prscrbr_City_State", _join_non_null(col("Prscrbr_City"), col("Prscrbr_State_Abrvtn"))) \
        .withColumn("Prscrbr_Full_Name", _join_non_null(col("Prscrbr_Last_Org_Name"), col("Prscrbr_First_Name")))

    display(year_df)

    print(f'Writing {year} data to table - {year_df.count()} records')
    year_df.write.mode(mode).format('delta').save(TARGET_TABLE_PATH)


StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 14, Finished, Available)

Key: 2019, Value: Files/Medicare_Part_D_Prescribers_by_Provider_and_Drug_2019-Atorvastatin Calcium.csv
root
 |-- Prscrbr_NPI: integer (nullable = true)
 |-- Prscrbr_Last_Org_Name: string (nullable = true)
 |-- Prscrbr_First_Name: string (nullable = true)
 |-- Prscrbr_City: string (nullable = true)
 |-- Prscrbr_State_Abrvtn: string (nullable = true)
 |-- Prscrbr_State_FIPS: string (nullable = true)
 |-- Prscrbr_Type: string (nullable = true)
 |-- Prscrbr_Type_Src: string (nullable = true)
 |-- Brnd_Name: string (nullable = true)
 |-- Gnrc_Name: string (nullable = true)
 |-- Tot_Clms: integer (nullable = true)
 |-- Tot_30day_Fills: double (nullable = true)
 |-- Tot_Day_Suply: integer (nullable = true)
 |-- Tot_Drug_Cst: double (nullable = true)
 |-- Tot_Benes: integer (nullable = true)
 |-- GE65_Sprsn_Flag: string (nullable = true)
 |-- GE65_Tot_Clms: integer (nullable = true)
 |-- GE65_Tot_30day_Fills: double (nullable = true)
 |-- GE65_Tot_Drug_Cst: double (nullable = true)
 |-- GE65_T

SynapseWidget(Synapse.DataFrame, eb22b4dd-49f1-417e-9287-fce0149db232)

Writing 2019 data to table - 324089 records
Key: 2020, Value: Files/Medicare_Part_D_Prescribers_by_Provider_and_Drug_2020-Levothyroxine Sodium.csv


SynapseWidget(Synapse.DataFrame, 51d42def-718f-4f10-bd95-9aab3dc11021)

Writing 2020 data to table - 400337 records
Key: 2021, Value: Files/Medicare_Part_D_Prescribers_by_Provider_and_Drug_2021-Clopidogrel.csv


SynapseWidget(Synapse.DataFrame, da2dd6ba-4923-4825-b984-7ba4993763fe)

Writing 2021 data to table - 199214 records


In [13]:
# Read the combined table back by name (not by path) and preview it; later cells
# reuse `df` for the row count and schema checks.
df = spark.table("cms_provider_drug_costs")
display(df)

StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3806d3dc-a26a-4bc6-910b-47e215a72362)

In [14]:
# Total rows across all loaded years; expected to equal the sum of the per-year
# counts printed by the load cell.
print(df.count())

StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 16, Finished, Available)

923640


In [15]:
# Confirm the persisted column types (the long / decimal(10,2) casts applied during the load).
df.printSchema()

StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 17, Finished, Available)

root
 |-- Prscrbr_NPI: integer (nullable = true)
 |-- Prscrbr_Last_Org_Name: string (nullable = true)
 |-- Prscrbr_First_Name: string (nullable = true)
 |-- Prscrbr_City: string (nullable = true)
 |-- Prscrbr_State_Abrvtn: string (nullable = true)
 |-- Prscrbr_State_FIPS: string (nullable = true)
 |-- Prscrbr_Type: string (nullable = true)
 |-- Prscrbr_Type_Src: string (nullable = true)
 |-- Brnd_Name: string (nullable = true)
 |-- Gnrc_Name: string (nullable = true)
 |-- Tot_Clms: long (nullable = true)
 |-- Tot_30day_Fills: decimal(10,2) (nullable = true)
 |-- Tot_Day_Suply: long (nullable = true)
 |-- Tot_Drug_Cst: decimal(10,2) (nullable = true)
 |-- Tot_Benes: long (nullable = true)
 |-- GE65_Sprsn_Flag: string (nullable = true)
 |-- GE65_Tot_Clms: long (nullable = true)
 |-- GE65_Tot_30day_Fills: decimal(10,2) (nullable = true)
 |-- GE65_Tot_Drug_Cst: decimal(10,2) (nullable = true)
 |-- GE65_Tot_Day_Suply: long (nullable = true)
 |-- GE65_Bene_Sprsn_Flag: string (nullable = true

In [16]:
%%sql

-- Grand total of Tot_Day_Suply across every loaded year.
select sum(Tot_Day_Suply) from cms_provider_drug_costs

StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 18, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

In [17]:
%%sql

-- Show table-level Delta metadata for the combined table (a single row).
DESCRIBE DETAIL cms_provider_drug_costs

StatementMeta(, 660acfce-f548-4cf1-a814-e7d55bb642fd, 19, Finished, Available)

<Spark SQL result set with 1 rows and 14 fields>