In [0]:
from pyspark.sql import SparkSession
# NOTE(review): this star import also brings PySpark's sum/min/max/round/abs
# into scope, shadowing the Python builtins of the same name in every later
# cell; `import pyspark.sql.functions as F` would avoid that.
from pyspark.sql.functions import *

# Reuse the active Spark session if there is one, otherwise create it.
spark = (
    SparkSession.builder
    .appName('Medicare')
    .getOrCreate()
)
spark

In [0]:
# Load the US cities dimension table (parquet) and report its row count.
cities_path = '/FileStore/tables/us_cities_dimension__1_-1.parquet'
df1 = spark.read.parquet(cities_path)
df1.count()

Out[17]: 28338

In [0]:
# Keep only the city columns used later. The text columns are upper-cased so
# `city` lines up with the upper-case `presc_city` values it is joined on.
city_upper_cols = {'city', 'state_name', 'county_name'}
city_keep_cols = ['city', 'state_id', 'state_name', 'county_name',
                  'population', 'timezone', 'zips']

df_city = df1.select([
    upper(col(name)).alias(name) if name in city_upper_cols else col(name)
    for name in city_keep_cols
])

df_city.printSchema()

root
 |-- city: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- timezone: string (nullable = true)
 |-- zips: string (nullable = true)



In [0]:
# Load the Medicare prescriber CSV (first row is the header) and let Spark
# infer the column types from the data.
df2 = spark.read.csv(
    '/FileStore/tables/USA_Presc_Medicare_Data_12021.csv',
    header=True,
    inferSchema=True,
)
display(df2.limit(3))
df2.printSchema()

df2.count()

npi,nppes_provider_last_org_name,nppes_provider_first_name,nppes_provider_city,nppes_provider_state,specialty_description,description_flag,drug_name,generic_name,bene_count,total_claim_count,total_30_day_fill_count,total_day_supply,total_drug_cost,bene_count_ge65,bene_count_ge65_suppress_flag,total_claim_count_ge65,ge65_suppress_flag,total_30_day_fill_count_ge65,total_day_supply_ge65,total_drug_cost_ge65,years_of_exp
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,ATORVASTATIN CALCIUM,ATORVASTATIN CALCIUM,,13,15.0,450,139.32,,*,13.0,,15.0,450.0,139.32,= 45.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,CIPROFLOXACIN HCL,CIPROFLOXACIN HCL,,11,11.0,96,80.99,,*,,*,,,,= 43.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,DOXYCYCLINE HYCLATE,DOXYCYCLINE HYCLATE,20.0,20,20.0,199,586.12,,#,,#,,,,= 33.0


root
 |-- npi: integer (nullable = true)
 |-- nppes_provider_last_org_name: string (nullable = true)
 |-- nppes_provider_first_name: string (nullable = true)
 |-- nppes_provider_city: string (nullable = true)
 |-- nppes_provider_state: string (nullable = true)
 |-- specialty_description: string (nullable = true)
 |-- description_flag: string (nullable = true)
 |-- drug_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- bene_count: integer (nullable = true)
 |-- total_claim_count: integer (nullable = true)
 |-- total_30_day_fill_count: double (nullable = true)
 |-- total_day_supply: integer (nullable = true)
 |-- total_drug_cost: double (nullable = true)
 |-- bene_count_ge65: integer (nullable = true)
 |-- bene_count_ge65_suppress_flag: string (nullable = true)
 |-- total_claim_count_ge65: integer (nullable = true)
 |-- ge65_suppress_flag: string (nullable = true)
 |-- total_30_day_fill_count_ge65: double (nullable = true)
 |-- total_day_supply_ge65: integer

In [0]:
# Keep only the prescriber columns of interest, renaming the long nppes_* /
# source names to short presc_* names; other kept columns retain their name.
presc_renames = {
    'npi': 'presc_id',
    'nppes_provider_last_org_name': 'presc_lname',
    'nppes_provider_first_name': 'presc_fname',
    'nppes_provider_city': 'presc_city',
    'nppes_provider_state': 'presc_state',
    'specialty_description': 'presc_speclty',
    'total_claim_count': 'tx_cnt',
}
presc_cols = [
    'npi',
    'nppes_provider_last_org_name',
    'nppes_provider_first_name',
    'nppes_provider_city',
    'nppes_provider_state',
    'specialty_description',
    'total_claim_count',
    'drug_name',
    'total_drug_cost',
    'total_day_supply',
    'years_of_exp',
]
df_presc = df2.select([col(src).alias(presc_renames.get(src, src)) for src in presc_cols])
display(df_presc.limit(3))


presc_id,presc_lname,presc_fname,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,13,ATORVASTATIN CALCIUM,139.32,450,= 45.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,11,CIPROFLOXACIN HCL,80.99,96,= 43.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,20,DOXYCYCLINE HYCLATE,586.12,199,= 33.0


In [0]:
# Combine first + last name into a single 'name' column, tag every row with
# the country, then drop the now-redundant name parts.
# (This import also provides DoubleType, used when casting years_of_exp.)
from pyspark.sql.types import *

df_presc = (
    df_presc
    .withColumn('name', concat_ws(' ', 'presc_fname', 'presc_lname'))
    .withColumn('country_name', lit('USA'))
    .drop('presc_fname', 'presc_lname')
)

display(df_presc.limit(3))

presc_id,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp,name,country_name
2006000252,CUMBERLAND,MD,Internal Medicine,13,ATORVASTATIN CALCIUM,139.32,450,= 45.0,ARDALAN ENKESHAFI,USA
2006000252,CUMBERLAND,MD,Internal Medicine,11,CIPROFLOXACIN HCL,80.99,96,= 43.0,ARDALAN ENKESHAFI,USA
2006000252,CUMBERLAND,MD,Internal Medicine,20,DOXYCYCLINE HYCLATE,586.12,199,= 33.0,ARDALAN ENKESHAFI,USA


In [0]:

#  Handling missing values
def check_null(df):
    """Display a one-row DataFrame with the number of missing values per column.

    A value counts as missing when it is null; for float/double columns a NaN
    also counts as missing. `isnan` is applied only to float/double columns:
    on other types it depends on an implicit cast to double (strings, ints),
    which may raise under Spark ANSI mode for non-numeric strings, or fails to
    resolve for types with no implicit cast to double (e.g. dates, booleans).

    Args:
        df: Spark DataFrame to inspect.
    """
    missing_counts = []
    for name, dtype in df.dtypes:
        is_missing = col(name).isNull()
        if dtype in ('double', 'float'):
            is_missing = is_missing | isnan(col(name))
        # count() skips nulls, so only rows where `is_missing` holds are counted.
        missing_counts.append(count(when(is_missing, name)).alias(name))
    display(df.select(missing_counts))

check_null(df_presc)

presc_id,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp,name,country_name
22,1,1,1,3,15,1,1,1,0,0


In [0]:
# Drop rows where either key column (presc_id or drug_name) is null; nulls in
# other columns (e.g. tx_cnt) are handled separately further down.
df_presc = df_presc.na.drop(how='any', subset=['presc_id', 'drug_name'])
display(df_presc.limit(3))

presc_id,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp,name,country_name
2006000252,CUMBERLAND,MD,Internal Medicine,13,ATORVASTATIN CALCIUM,139.32,450,= 45.0,ARDALAN ENKESHAFI,USA
2006000252,CUMBERLAND,MD,Internal Medicine,11,CIPROFLOXACIN HCL,80.99,96,= 43.0,ARDALAN ENKESHAFI,USA
2006000252,CUMBERLAND,MD,Internal Medicine,20,DOXYCYCLINE HYCLATE,586.12,199,= 33.0,ARDALAN ENKESHAFI,USA


In [0]:
# Re-run the missing-value report after dropping rows without presc_id / drug_name.
check_null(df=df_presc)

presc_id,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp,name,country_name
0,0,0,0,2,0,0,0,0,0,0


In [0]:
# Now, convert the years_of_exp column from STRING (raw values look like
# "= 45.0") to DOUBLE: strip the leading "=" together with any surrounding
# whitespace (and any trailing whitespace), then cast. Values that are still
# not numeric become null on cast (or raise, if Spark ANSI mode is enabled).

df_presc = df_presc.withColumn(
    'years_of_exp',
    regexp_replace(col('years_of_exp'), r"^\s*=\s*|\s+$", "").cast(DoubleType()),
)

display(df_presc.limit(3))


presc_id,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp,name,country_name
2006000252,CUMBERLAND,MD,Internal Medicine,13,ATORVASTATIN CALCIUM,139.32,450,45.0,ARDALAN ENKESHAFI,USA
2006000252,CUMBERLAND,MD,Internal Medicine,11,CIPROFLOXACIN HCL,80.99,96,43.0,ARDALAN ENKESHAFI,USA
2006000252,CUMBERLAND,MD,Internal Medicine,20,DOXYCYCLINE HYCLATE,586.12,199,33.0,ARDALAN ENKESHAFI,USA


In [0]:
# Impute missing 'tx_cnt' values with the column mean.
# tx_cnt is an integer column, and Spark casts a float fill value to the column
# type (truncating it), so the truncation is made explicit with int() here.

mean_tx = df_presc.select(mean(col('tx_cnt'))).collect()[0][0]
print(mean_tx)

if mean_tx is None:
    # mean() is null only when every tx_cnt value is null: nothing to impute from.
    print("tx_cnt has no non-null values; skipping mean imputation")
else:
    df_presc = df_presc.fillna(int(mean_tx), subset=['tx_cnt'])

51.57326467512733


In [0]:
check_null(df=df_presc)  # re-check missing values after imputing tx_cnt


presc_id,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp,name,country_name
0,0,0,0,0,0,0,0,0,0,0


In [0]:
#  Create a UDF (user-defined function) that counts the whitespace-separated
#  tokens in a string column (used below to count the ZIP codes per city).

from pyspark.sql.functions import *
from pyspark.sql.types import *

@udf(returnType=IntegerType())
def record_len(column):
    """Count the whitespace-separated tokens in one string value.

    Args:
        column: the value of a string column for one row, e.g. a `zips`
            entry such as "21502 21501 21503"; may be None because the
            source column is nullable.

    Returns:
        The number of tokens, 0 for an empty or blank string, or None for a
        null input (a null stays null instead of failing the Spark task with
        AttributeError).
    """
    if column is None:
        return None
    # str.split() with no separator ignores leading/trailing/repeated
    # whitespace, so "a  b" counts 2 and "" counts 0 (split(' ') gave 3 and 1).
    return len(column.split())

In [0]:
# Add the number of ZIP codes listed for each city, then preview it.
zip_count_col = record_len(col('zips'))
df_city_zip = df_city.withColumn('zip_count', zip_count_col)
display(df_city_zip.select('zip_count').limit(6))

zip_count
310
196
84
30
107
86


In [0]:
# Join the two datasets on (city, state). As an inner join, it keeps only
# prescriptions whose city/state pair is present in the cities table.
city_match = df_presc.presc_city == df_city_zip.city
state_match = df_presc.presc_state == df_city_zip.state_id
df_join = df_presc.join(df_city_zip, on=city_match & state_match, how='inner')
display(df_join.limit(6))

presc_id,presc_city,presc_state,presc_speclty,tx_cnt,drug_name,total_drug_cost,total_day_supply,years_of_exp,name,country_name,city,state_id,state_name,county_name,population,timezone,zips,zip_count
2006000252,CUMBERLAND,MD,Internal Medicine,13,ATORVASTATIN CALCIUM,139.32,450,45.0,ARDALAN ENKESHAFI,USA,CUMBERLAND,MD,MARYLAND,ALLEGANY,47980,America/New_York,21502 21501 21503 21504 21505,5
2006000252,CUMBERLAND,MD,Internal Medicine,11,CIPROFLOXACIN HCL,80.99,96,43.0,ARDALAN ENKESHAFI,USA,CUMBERLAND,MD,MARYLAND,ALLEGANY,47980,America/New_York,21502 21501 21503 21504 21505,5
2006000252,CUMBERLAND,MD,Internal Medicine,20,DOXYCYCLINE HYCLATE,586.12,199,33.0,ARDALAN ENKESHAFI,USA,CUMBERLAND,MD,MARYLAND,ALLEGANY,47980,America/New_York,21502 21501 21503 21504 21505,5
2006000252,CUMBERLAND,MD,Internal Medicine,17,ELIQUIS,6065.02,510,44.0,ARDALAN ENKESHAFI,USA,CUMBERLAND,MD,MARYLAND,ALLEGANY,47980,America/New_York,21502 21501 21503 21504 21505,5
2006000252,CUMBERLAND,MD,Internal Medicine,17,FUROSEMIDE,45.76,405,55.0,ARDALAN ENKESHAFI,USA,CUMBERLAND,MD,MARYLAND,ALLEGANY,47980,America/New_York,21502 21501 21503 21504 21505,5
2006000252,CUMBERLAND,MD,Internal Medicine,16,HYDRALAZINE HCL,169.48,420,36.0,ARDALAN ENKESHAFI,USA,CUMBERLAND,MD,MARYLAND,ALLEGANY,47980,America/New_York,21502 21501 21503 21504 21505,5
