#### 1. Read usa_presc_medicare dataframe

In [0]:
# Load the raw USA Medicare prescriber CSV. The first line supplies the column
# names and Spark infers each column's type from the data.
usa_presc_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/synapselearningadls/raw/pharmacy_usa/USA_Presc_Medicare_Data_12021.csv')
)

# Preview the first five rows.
display(usa_presc_df.limit(5))

npi,nppes_provider_last_org_name,nppes_provider_first_name,nppes_provider_city,nppes_provider_state,specialty_description,description_flag,drug_name,generic_name,bene_count,total_claim_count,total_30_day_fill_count,total_day_supply,total_drug_cost,bene_count_ge65,bene_count_ge65_suppress_flag,total_claim_count_ge65,ge65_suppress_flag,total_30_day_fill_count_ge65,total_day_supply_ge65,total_drug_cost_ge65,years_of_exp
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,ATORVASTATIN CALCIUM,ATORVASTATIN CALCIUM,,13,15.0,450,139.32,,*,13.0,,15.0,450.0,139.32,= 45.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,CIPROFLOXACIN HCL,CIPROFLOXACIN HCL,,11,11.0,96,80.99,,*,,*,,,,= 43.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,DOXYCYCLINE HYCLATE,DOXYCYCLINE HYCLATE,20.0,20,20.0,199,586.12,,#,,#,,,,= 33.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,ELIQUIS,APIXABAN,,17,17.0,510,6065.02,,*,17.0,,17.0,510.0,6065.02,= 44.0
2006000252,ENKESHAFI,ARDALAN,CUMBERLAND,MD,Internal Medicine,S,FUROSEMIDE,FUROSEMIDE,12.0,17,17.0,405,45.76,,#,,#,,,,= 55.0


#### 2. Select the required columns and clean them

In [0]:
from pyspark.sql.functions import col, lit, regexp_replace, cast, concat, coalesce

from pyspark.sql.functions import trim

# Build the curated prescriber frame: keep only the needed columns under
# shorter names, tag every row with the country, and clean the name and
# years-of-experience fields.
usa_presc_trans_df = (
    usa_presc_df
    .withColumn('country', lit('USA'))
    .select(
        col('npi').alias('presc_id'),
        # concat returns null if any argument is null, so each name part is
        # coalesced to '' first. The joined value is then trimmed so that a
        # missing first or last name does not leave a stray leading/trailing
        # space (e.g. ' MINUTECLINIC ...' for organisation rows).
        trim(
            concat(
                coalesce(col('nppes_provider_first_name'), lit('')),
                lit(' '),
                coalesce(col('nppes_provider_last_org_name'), lit('')),
            )
        ).alias('presc_name'),
        col('nppes_provider_city').alias('presc_city'),
        col('nppes_provider_state').alias('presc_state'),
        'country',
        col('specialty_description').alias('presc_spclt'),
        'drug_name',
        'total_claim_count',
        'total_day_supply',
        'total_drug_cost',
        # Source values look like '= 45.0': strip the '=' and cast to int.
        regexp_replace(col('years_of_exp'), '=', '').cast('int').alias('years_of_exp'),
    )
)
                                    
# The name columns are wrapped in coalesce because they contain null values. concat returns null if any of its arguments is null, so coalesce first replaces each null with the given string ('').

##### The rows below have null name fields, which is why `coalesce` is used in the command above

In [0]:
# Show the source rows whose first-name field is null.
display(usa_presc_df.where(col('nppes_provider_first_name').isNull()))

npi,nppes_provider_last_org_name,nppes_provider_first_name,nppes_provider_city,nppes_provider_state,specialty_description,description_flag,drug_name,generic_name,bene_count,total_claim_count,total_30_day_fill_count,total_day_supply,total_drug_cost,bene_count_ge65,bene_count_ge65_suppress_flag,total_claim_count_ge65,ge65_suppress_flag,total_30_day_fill_count_ge65,total_day_supply_ge65,total_drug_cost_ge65,years_of_exp
-1682890840,MINUTECLINIC DIAGNOSTIC OF TEXAS LLC,,AUSTIN,TX,Nurse Practitioner,S,ADACEL TDAP,"DIPH,PERTUSS(ACELL),TET VAC/PF",11.0,11.0,11.0,330.0,724.0,,*,,*,,,,= 31.0
2076780856,MINUTECLINIC DIAGNOSTIC OF FLORIDA LLC,,ORLANDO,FL,Nurse Practitioner,S,ADACEL TDAP,"DIPH,PERTUSS(ACELL),TET VAC/PF",69.0,69.0,69.0,2031.0,4470.14,,#,,#,,,,= 55.0
-17537445,,,,,,,,,,,,,,,,,,,,,


##### After using `coalesce`

In [0]:
# List any rows of the transformed frame whose presc_name is null.
display(usa_presc_trans_df.where(col('presc_name').isNull()))

presc_id,presc_name,presc_city,presc_state,country,presc_spclt,drug_name,total_claim_count,total_day_supply,total_drug_cost,years_of_exp


#### Check null values in all columns

In [0]:
from pyspark.sql.functions import count, when, isnan

def check_null(df):
    """Display, for each column of *df*, the number of rows that are null or NaN.

    The columns are taken from *df* itself, so the helper works for any
    DataFrame passed in rather than only for one particular global frame.

    Args:
        df: The Spark DataFrame to inspect.

    Note:
        ``isnan`` is applied to every column regardless of its type; this
        presumably relies on Spark implicitly casting non-numeric columns --
        confirm before using it on frames with date/timestamp columns.
    """
    missing_counts = [
        # `when` yields a non-null value only for null/NaN rows, and `count`
        # ignores nulls, so this counts exactly the missing entries.
        count(when(isnan(c) | col(c).isNull(), c)).alias(c)
        for c in df.columns
    ]
    display(df.select(missing_counts))


check_null(usa_presc_trans_df)

# `isnan` flags NaN (not-a-number) values, which `isNull` alone does not catch.

presc_id,presc_name,presc_city,presc_state,country,presc_spclt,drug_name,total_claim_count,total_day_supply,total_drug_cost,years_of_exp
22,0,1,1,0,1,15,3,1,1,1


#### 3. Drop null values in respective columns

In [0]:
from pyspark.sql.functions import mean, cast

# Discard every row that lacks a prescriber id or a drug name.
usa_presc_trans_df = usa_presc_trans_df.dropna(subset=['presc_id', 'drug_name'])

# Replace missing claim counts with the average of the remaining rows.
mean_value = usa_presc_trans_df.agg(mean('total_claim_count')).first()[0]
usa_presc_trans_df = usa_presc_trans_df.fillna(mean_value, subset=['total_claim_count'])

##### Again checking null values

In [0]:
# Re-run the null/NaN count on the frame after the drop/fill step.
check_null(usa_presc_trans_df)

presc_id,presc_name,presc_city,presc_state,country,presc_spclt,drug_name,total_claim_count,total_day_supply,total_drug_cost,years_of_exp
0,0,0,0,0,0,0,0,0,0,0


In [0]:
# Preview the first five rows of the cleaned frame.
display(usa_presc_trans_df.limit(5))

presc_id,presc_name,presc_city,presc_state,country,presc_spclt,drug_name,total_claim_count,total_day_supply,total_drug_cost,years_of_exp
2006000252,ARDALAN ENKESHAFI,CUMBERLAND,MD,USA,Internal Medicine,ATORVASTATIN CALCIUM,13,450,139.32,45
2006000252,ARDALAN ENKESHAFI,CUMBERLAND,MD,USA,Internal Medicine,CIPROFLOXACIN HCL,11,96,80.99,43
2006000252,ARDALAN ENKESHAFI,CUMBERLAND,MD,USA,Internal Medicine,DOXYCYCLINE HYCLATE,20,199,586.12,33
2006000252,ARDALAN ENKESHAFI,CUMBERLAND,MD,USA,Internal Medicine,ELIQUIS,17,510,6065.02,44
2006000252,ARDALAN ENKESHAFI,CUMBERLAND,MD,USA,Internal Medicine,FUROSEMIDE,17,405,45.76,55


#### 4. Write processed data to processed container

In [0]:
# Persist the cleaned frame in Delta format, overwriting any existing output
# at the target path.
(
    usa_presc_trans_df.write
    .format('delta')
    .mode('overwrite')
    .save('/mnt/synapselearningadls/processed/pharmacy_usa/usa_presc')
)