In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark=SparkSession.builder.appName('roche_1p1').getOrCreate()
spark

CMS Part-B Data
---

In [2]:
cms_b=spark.read.csv('Dataset/cms_b.csv',header=True,inferSchema=True)
cms_b_brand_names=spark.read.csv('Dataset/cms_b_unique_hcpcs.csv',header=True,inferSchema=True)
cms_b.show(10)

+----------+--------------------------+-----------------------+---------------+--------------------+-----------------+-------------------+--------------------+----------------+-----------------+-------------------------+-----------------------+-----------------+-----------------+----------------------+------------------+-----------------+-----------------------------+--------+--------------------+--------------+-------------+---------+---------+------------------+--------------+------------------+-----------------+------------------+
|Rndrng_NPI|Rndrng_Prvdr_Last_Org_Name|Rndrng_Prvdr_First_Name|Rndrng_Prvdr_MI|Rndrng_Prvdr_Crdntls|Rndrng_Prvdr_Gndr|Rndrng_Prvdr_Ent_Cd|    Rndrng_Prvdr_St1|Rndrng_Prvdr_St2|Rndrng_Prvdr_City|Rndrng_Prvdr_State_Abrvtn|Rndrng_Prvdr_State_FIPS|Rndrng_Prvdr_Zip5|Rndrng_Prvdr_RUCA|Rndrng_Prvdr_RUCA_Desc|Rndrng_Prvdr_Cntry|Rndrng_Prvdr_Type|Rndrng_Prvdr_Mdcr_Prtcptg_Ind|HCPCS_Cd|          HCPCS_Desc|HCPCS_Drug_Ind|Place_Of_Srvc|Tot_Benes|Tot_Srvcs|Tot_Bene_Da

In [3]:
print("Shape of DataFrame:", (cms_b.count(), len(cms_b.columns)))

Shape of DataFrame: (9886177, 29)


In [4]:
cms_b_brand_names.show()

+--------+--------------------+---------+
|HCPCS_Cd|          HCPCS_Desc|    BRAND|
+--------+--------------------+---------+
|   J3380|injection, vedoli...|  ENTYVIO|
|   Q5121|injection, inflix...|   AVSOLA|
|   J1745|injection, inflix...| REMICADE|
|   J1602|injection, golimu...|  SIMPONI|
|   J3357|ustekinumab, for ...|  STELARA|
|   J0717|injection, certol...|   CIMZIA|
|   Q5103|injection, inflix...|INFLECTRA|
|   Q5104|injection, inflix...|RENFLEXIS|
|   J3358|ustekinumab, for ...|  STELARA|
+--------+--------------------+---------+



Joining CMS-B dataset with CMS_B_Brand_name Dataset
---

In [5]:
partialcms_b=cms_b.join(cms_b_brand_names,'HCPCS_Cd','inner')
partialcms_b.show()

+--------+----------+--------------------------+-----------------------+---------------+--------------------+-----------------+-------------------+--------------------+--------------------+-----------------+-------------------------+-----------------------+-----------------+-----------------+----------------------+------------------+--------------------+-----------------------------+--------------------+--------------+-------------+---------+---------+------------------+--------------+------------------+-----------------+------------------+--------------------+---------+
|HCPCS_Cd|Rndrng_NPI|Rndrng_Prvdr_Last_Org_Name|Rndrng_Prvdr_First_Name|Rndrng_Prvdr_MI|Rndrng_Prvdr_Crdntls|Rndrng_Prvdr_Gndr|Rndrng_Prvdr_Ent_Cd|    Rndrng_Prvdr_St1|    Rndrng_Prvdr_St2|Rndrng_Prvdr_City|Rndrng_Prvdr_State_Abrvtn|Rndrng_Prvdr_State_FIPS|Rndrng_Prvdr_Zip5|Rndrng_Prvdr_RUCA|Rndrng_Prvdr_RUCA_Desc|Rndrng_Prvdr_Cntry|   Rndrng_Prvdr_Type|Rndrng_Prvdr_Mdcr_Prtcptg_Ind|          HCPCS_Desc|HCPCS_Drug_Ind|

In [6]:
print("Shape of Data:",(partialcms_b.count(),len(partialcms_b.columns)))

Shape of Data: (3598, 31)


Joining CMS-B Dataset with Speciality Mapping Data
---

In [7]:
speciality_filter=spark.read.csv('Dataset/speciality_mappings.csv',header=True,inferSchema=True)
fullcms_b=partialcms_b.join(speciality_filter,partialcms_b.Rndrng_NPI==speciality_filter.NPI,'inner')
fullcms_b.show()

+--------+----------+--------------------------+-----------------------+---------------+--------------------+-----------------+-------------------+--------------------+----------------+-----------------+-------------------------+-----------------------+-----------------+-----------------+----------------------+------------------+-----------------+-----------------------------+--------------------+--------------+-------------+---------+---------+------------------+--------------+------------------+-----------------+------------------+--------------------+--------+----------+----------------+
|HCPCS_Cd|Rndrng_NPI|Rndrng_Prvdr_Last_Org_Name|Rndrng_Prvdr_First_Name|Rndrng_Prvdr_MI|Rndrng_Prvdr_Crdntls|Rndrng_Prvdr_Gndr|Rndrng_Prvdr_Ent_Cd|    Rndrng_Prvdr_St1|Rndrng_Prvdr_St2|Rndrng_Prvdr_City|Rndrng_Prvdr_State_Abrvtn|Rndrng_Prvdr_State_FIPS|Rndrng_Prvdr_Zip5|Rndrng_Prvdr_RUCA|Rndrng_Prvdr_RUCA_Desc|Rndrng_Prvdr_Cntry|Rndrng_Prvdr_Type|Rndrng_Prvdr_Mdcr_Prtcptg_Ind|          HCPCS_Desc|HC

Dropping Duplicated Columns
---

In [8]:
fullcms_b=fullcms_b.drop('NPI','Speciality')
fullcms_b.show()

+--------+----------+--------------------------+-----------------------+---------------+--------------------+-----------------+-------------------+--------------------+----------------+-----------------+-------------------------+-----------------------+-----------------+-----------------+----------------------+------------------+-----------------+-----------------------------+--------------------+--------------+-------------+---------+---------+------------------+--------------+------------------+-----------------+------------------+--------------------+--------+
|HCPCS_Cd|Rndrng_NPI|Rndrng_Prvdr_Last_Org_Name|Rndrng_Prvdr_First_Name|Rndrng_Prvdr_MI|Rndrng_Prvdr_Crdntls|Rndrng_Prvdr_Gndr|Rndrng_Prvdr_Ent_Cd|    Rndrng_Prvdr_St1|Rndrng_Prvdr_St2|Rndrng_Prvdr_City|Rndrng_Prvdr_State_Abrvtn|Rndrng_Prvdr_State_FIPS|Rndrng_Prvdr_Zip5|Rndrng_Prvdr_RUCA|Rndrng_Prvdr_RUCA_Desc|Rndrng_Prvdr_Cntry|Rndrng_Prvdr_Type|Rndrng_Prvdr_Mdcr_Prtcptg_Ind|          HCPCS_Desc|HCPCS_Drug_Ind|Place_Of_Srvc|T

Filtering based on Speciality (Gastro)
----

In [9]:
# fullcms_b=fullcms_b.filter(col('Rndrng_Prvdr_Type').rlike('Gastro'))
# fullcms_b.show()

In [10]:
print("Shape of Data:",(fullcms_b.count(),len(fullcms_b.columns)))

Shape of Data: (500, 31)


Unique Brand Names and HCPCS Code
---

In [11]:
fullcms_b.select(['HCPCS_Cd','BRAND']).distinct().show()

+--------+---------+
|HCPCS_Cd|    BRAND|
+--------+---------+
|   J1745| REMICADE|
|   J3380|  ENTYVIO|
|   J3357|  STELARA|
|   Q5104|RENFLEXIS|
|   Q5121|   AVSOLA|
|   Q5103|INFLECTRA|
+--------+---------+



Dropping Unwanted Columns
---

In [12]:
fullcms_b=fullcms_b.drop('HCPCS_Cd','Rndrng_Prvdr_Last_Org_Name','Rndrng_Prvdr_First_Name','Rndrng_Prvdr_MI', 'Rndrng_Prvdr_Crdntls','Rndrng_Prvdr_Gndr','Rndrng_Prvdr_Ent_Cd','Rndrng_Prvdr_St1','Rndrng_Prvdr_St2', 'Rndrng_Prvdr_City','Rndrng_Prvdr_State_Abrvtn','Rndrng_Prvdr_State_FIPS','Rndrng_Prvdr_Zip5','Rndrng_Prvdr_RUCA', 'Rndrng_Prvdr_RUCA_Desc','Rndrng_Prvdr_Cntry','Rndrng_Prvdr_Type','Rndrng_Prvdr_Mdcr_Prtcptg_Ind','HCPCS_Desc', 'HCPCS_Drug_Ind','Place_Of_Srvc','Tot_Bene_Day_Srvcs','Avg_Sbmtd_Chrg','Avg_Mdcr_Alowd_Amt','Avg_Mdcr_Pymt_Amt','Avg_Mdcr_Stdzd_Amt','HCPCS_Desc')
fullcms_b.show()

+----------+---------+---------+--------+
|Rndrng_NPI|Tot_Benes|Tot_Srvcs|   BRAND|
+----------+---------+---------+--------+
|1003078973|       13|   1490.0|REMICADE|
|1003078973|       12|  11101.0| ENTYVIO|
|1003255365|       12|   1210.0|REMICADE|
|1003806043|       11|   3900.0| ENTYVIO|
|1003848102|       14|   1510.0|REMICADE|
|1003848102|       17|   7800.0| ENTYVIO|
|1003961236|       20|   3840.0|REMICADE|
|1013022680|       19|   1230.0|REMICADE|
|1013147206|       16|  40500.0| ENTYVIO|
|1013194414|       31|   3000.0|REMICADE|
|1013194414|       31|  12000.0| ENTYVIO|
|1013980440|       13|   5100.0| ENTYVIO|
|1013990076|       15|   1110.0|REMICADE|
|1023055340|       14|   1180.0|REMICADE|
|1023060480|       12|   6000.0| ENTYVIO|
|1033372727|       11|   9600.0| ENTYVIO|
|1043264336|       23|   1980.0|REMICADE|
|1043264336|       14|   6300.0| ENTYVIO|
|1053403923|       14|   2023.5|REMICADE|
|1053571620|       16|   1160.0|REMICADE|
+----------+---------+---------+--

Renaming Columns
---

In [13]:
fullcms_b=fullcms_b.withColumnRenamed('BRAND','Drug_Name').withColumnRenamed('Tot_Srvcs','Tot_Clms').withColumnRenamed('Rndrng_NPI','NPI')
fullcms_b.show()

+----------+---------+--------+---------+
|       NPI|Tot_Benes|Tot_Clms|Drug_Name|
+----------+---------+--------+---------+
|1003078973|       13|  1490.0| REMICADE|
|1003078973|       12| 11101.0|  ENTYVIO|
|1003255365|       12|  1210.0| REMICADE|
|1003806043|       11|  3900.0|  ENTYVIO|
|1003848102|       14|  1510.0| REMICADE|
|1003848102|       17|  7800.0|  ENTYVIO|
|1003961236|       20|  3840.0| REMICADE|
|1013022680|       19|  1230.0| REMICADE|
|1013147206|       16| 40500.0|  ENTYVIO|
|1013194414|       31|  3000.0| REMICADE|
|1013194414|       31| 12000.0|  ENTYVIO|
|1013980440|       13|  5100.0|  ENTYVIO|
|1013990076|       15|  1110.0| REMICADE|
|1023055340|       14|  1180.0| REMICADE|
|1023060480|       12|  6000.0|  ENTYVIO|
|1033372727|       11|  9600.0|  ENTYVIO|
|1043264336|       23|  1980.0| REMICADE|
|1043264336|       14|  6300.0|  ENTYVIO|
|1053403923|       14|  2023.5| REMICADE|
|1053571620|       16|  1160.0| REMICADE|
+----------+---------+--------+---

Reordering the Columns
---

In [14]:
fullcms_b=fullcms_b.select(['NPI','Drug_Name','Tot_Benes','Tot_Clms'])
fullcms_b.show()

+----------+---------+---------+--------+
|       NPI|Drug_Name|Tot_Benes|Tot_Clms|
+----------+---------+---------+--------+
|1003078973| REMICADE|       13|  1490.0|
|1003078973|  ENTYVIO|       12| 11101.0|
|1003255365| REMICADE|       12|  1210.0|
|1003806043|  ENTYVIO|       11|  3900.0|
|1003848102| REMICADE|       14|  1510.0|
|1003848102|  ENTYVIO|       17|  7800.0|
|1003961236| REMICADE|       20|  3840.0|
|1013022680| REMICADE|       19|  1230.0|
|1013147206|  ENTYVIO|       16| 40500.0|
|1013194414| REMICADE|       31|  3000.0|
|1013194414|  ENTYVIO|       31| 12000.0|
|1013980440|  ENTYVIO|       13|  5100.0|
|1013990076| REMICADE|       15|  1110.0|
|1023055340| REMICADE|       14|  1180.0|
|1023060480|  ENTYVIO|       12|  6000.0|
|1033372727|  ENTYVIO|       11|  9600.0|
|1043264336| REMICADE|       23|  1980.0|
|1043264336|  ENTYVIO|       14|  6300.0|
|1053403923| REMICADE|       14|  2023.5|
|1053571620| REMICADE|       16|  1160.0|
+----------+---------+---------+--

In [15]:
print("Shape of Dataset :",(fullcms_b.count(),len(fullcms_b.columns)))

Shape of Dataset : (500, 4)


Converting Data_Type of 'Tot_Clms' to 'int'
--- 

In [16]:
fullcms_b = fullcms_b.withColumn("Tot_Clms",fullcms_b.Tot_Clms.cast("int"))
fullcms_b.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- Drug_Name: string (nullable = true)
 |-- Tot_Benes: integer (nullable = true)
 |-- Tot_Clms: integer (nullable = true)



Count of Null Values in each Column
--- 

In [17]:
fullcms_b.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in fullcms_b.columns]).show()

+---+---------+---------+--------+
|NPI|Drug_Name|Tot_Benes|Tot_Clms|
+---+---------+---------+--------+
|  0|        0|        0|       0|
+---+---------+---------+--------+



Distinct Rows in CMS-B
---

In [18]:
fullcms_b.select(['NPI']).distinct().count()

343

Exporting CMS-B Dataset
---

In [19]:
pcms_b=fullcms_b.toPandas()
pcms_b.to_csv('Output/Final_CMS_B.csv',index=False)

Groupby Check by Drug Name
---

In [20]:
group_cms_b=fullcms_b
group_cms_b=group_cms_b.groupby('Drug_Name').agg(sum("Tot_Benes").alias("Tot_Benes_Sum"),sum("Tot_Clms").alias("Tot_Clms_Sum"))
group_cms_b.show()

+---------+-------------+------------+
|Drug_Name|Tot_Benes_Sum|Tot_Clms_Sum|
+---------+-------------+------------+
| REMICADE|         4157|      420754|
|  ENTYVIO|         3986|     2261749|
|  STELARA|          126|       30060|
|RENFLEXIS|           28|        7462|
|   AVSOLA|           13|        5000|
|INFLECTRA|           13|        6590|
+---------+-------------+------------+



In [21]:
print("Shape of Dataset :",(group_cms_b.count(),len(group_cms_b.columns)))

Shape of Dataset : (6, 3)


CMS-D Dataset
---

In [22]:
cms_d=spark.read.csv('Dataset/cms_d.csv',header=True,inferSchema=True)
cms_d.show()

+-----------+---------------------+------------------+------------+--------------------+------------------+-----------------+----------------+--------------------+--------------------+--------+---------------+-------------+------------+---------+---------------+-------------+--------------------+-----------------+------------------+--------------------+--------------+
|Prscrbr_NPI|Prscrbr_Last_Org_Name|Prscrbr_First_Name|Prscrbr_City|Prscrbr_State_Abrvtn|Prscrbr_State_FIPS|     Prscrbr_Type|Prscrbr_Type_Src|           Brnd_Name|           Gnrc_Name|Tot_Clms|Tot_30day_Fills|Tot_Day_Suply|Tot_Drug_Cst|Tot_Benes|GE65_Sprsn_Flag|GE65_Tot_Clms|GE65_Tot_30day_Fills|GE65_Tot_Drug_Cst|GE65_Tot_Day_Suply|GE65_Bene_Sprsn_Flag|GE65_Tot_Benes|
+-----------+---------------------+------------------+------------+--------------------+------------------+-----------------+----------------+--------------------+--------------------+--------+---------------+-------------+------------+---------+------------

Dropping Unwanted Columns
---

In [23]:
cms_d=cms_d.drop('Prscrbr_Last_Org_Name', 'Prscrbr_First_Name', 'Prscrbr_City', 'Prscrbr_State_Abrvtn', 'Prscrbr_State_FIPS', 'Prscrbr_Type', 'Prscrbr_Type_Src','Tot_30day_Fills', 'Tot_Day_Suply', 'Tot_Drug_Cst', 'GE65_Sprsn_Flag', 'GE65_Tot_Clms', 'GE65_Tot_30day_Fills', 'GE65_Tot_Drug_Cst', 'GE65_Tot_Day_Suply', 'GE65_Bene_Sprsn_Flag', 'GE65_Tot_Benes')
cms_d.show()

+-----------+--------------------+--------------------+--------+---------+
|Prscrbr_NPI|           Brnd_Name|           Gnrc_Name|Tot_Clms|Tot_Benes|
+-----------+--------------------+--------------------+--------+---------+
| 1003000126|  Alendronate Sodium|  Alendronate Sodium|      11|     NULL|
| 1003000126| Amlodipine Besylate| Amlodipine Besylate|      64|       48|
| 1003000126|            Atenolol|            Atenolol|      12|     NULL|
| 1003000126|Atorvastatin Calcium|Atorvastatin Calcium|      46|       38|
| 1003000126|            Cefdinir|            Cefdinir|      11|       11|
| 1003000126|         Clopidogrel|Clopidogrel Bisul...|      14|     NULL|
| 1003000126|             Eliquis|            Apixaban|      15|     NULL|
| 1003000126|Escitalopram Oxalate|Escitalopram Oxalate|      30|       16|
| 1003000126|         Finasteride|         Finasteride|      19|       13|
| 1003000126|          Furosemide|          Furosemide|      20|       17|
| 1003000126|          Ga

In [24]:
print("Shape of Dataset :",(cms_d.count(),len(cms_d.columns)))

Shape of Dataset : (25231862, 5)


Converting values to Lowercase in Brnd_Name
---

In [25]:
cms_d=cms_d.withColumn('Brnd_Name',lower(cms_d['Brnd_Name']))
cms_d=cms_d.withColumn('Gnrc_Name',lower(cms_d['Gnrc_Name']))
cms_d.show()

+-----------+--------------------+--------------------+--------+---------+
|Prscrbr_NPI|           Brnd_Name|           Gnrc_Name|Tot_Clms|Tot_Benes|
+-----------+--------------------+--------------------+--------+---------+
| 1003000126|  alendronate sodium|  alendronate sodium|      11|     NULL|
| 1003000126| amlodipine besylate| amlodipine besylate|      64|       48|
| 1003000126|            atenolol|            atenolol|      12|     NULL|
| 1003000126|atorvastatin calcium|atorvastatin calcium|      46|       38|
| 1003000126|            cefdinir|            cefdinir|      11|       11|
| 1003000126|         clopidogrel|clopidogrel bisul...|      14|     NULL|
| 1003000126|             eliquis|            apixaban|      15|     NULL|
| 1003000126|escitalopram oxalate|escitalopram oxalate|      30|       16|
| 1003000126|         finasteride|         finasteride|      19|       13|
| 1003000126|          furosemide|          furosemide|      20|       17|
| 1003000126|          ga

Combining CMS-D Data with CMS-D Brand Data
---

In [26]:
cms_d_brand_names=spark.read.csv('Dataset/cms_d_gnrc_name.csv',header=True,inferSchema=True)
partialcms_d=cms_d.join(cms_d_brand_names,'Brnd_Name','inner')
partialcms_d.show()

+--------------+-----------+---------------+--------+---------+---------------+---------+
|     Brnd_Name|Prscrbr_NPI|      Gnrc_Name|Tot_Clms|Tot_Benes|      Gnrc_Name|    BRAND|
+--------------+-----------+---------------+--------+---------+---------------+---------+
|humira(cf) pen| 1003002312|     adalimumab|      41|     NULL|     adalimumab|   HUMIRA|
|      remicade| 1003002312|     infliximab|      12|     NULL|     infliximab| REMICADE|
|humira(cf) pen| 1003003153|     adalimumab|      20|     NULL|     adalimumab|   HUMIRA|
|       entyvio| 1003011891|    vedolizumab|      12|     NULL|    vedolizumab|  ENTYVIO|
|humira(cf) pen| 1003014762|     adalimumab|      13|     NULL|     adalimumab|   HUMIRA|
|    humira pen| 1003015173|     adalimumab|      32|     NULL|     adalimumab|   HUMIRA|
|humira(cf) pen| 1003015173|     adalimumab|      12|     NULL|     adalimumab|   HUMIRA|
|    humira(cf)| 1003040726|     adalimumab|      14|     NULL|     adalimumab|   HUMIRA|
|    humir

In [27]:
print("Shape of Dataset :",(partialcms_d.count(),len(partialcms_d.columns)))

Shape of Dataset : (22225, 7)


Joining CMS-D with Speciality Mappings Data
---

In [28]:
speciality_filter=spark.read.csv('Dataset/speciality_mappings.csv',header=True,inferSchema=True)
fullcms_d=partialcms_d.join(speciality_filter,partialcms_d.Prscrbr_NPI==speciality_filter.NPI,'inner')
fullcms_d.show()

+--------------+-----------+---------------+--------+---------+---------------+---------+----------+----------------+
|     Brnd_Name|Prscrbr_NPI|      Gnrc_Name|Tot_Clms|Tot_Benes|      Gnrc_Name|    BRAND|       NPI|      Speciality|
+--------------+-----------+---------------+--------+---------+---------------+---------+----------+----------------+
|       entyvio| 1003011891|    vedolizumab|      12|     NULL|    vedolizumab|  ENTYVIO|1003011891|Gastroenterology|
|    humira pen| 1003042672|     adalimumab|      11|     NULL|     adalimumab|   HUMIRA|1003042672|Gastroenterology|
|humira(cf) pen| 1003043647|     adalimumab|      13|     NULL|     adalimumab|   HUMIRA|1003043647|Gastroenterology|
|humira(cf) pen| 1003045634|     adalimumab|      13|     NULL|     adalimumab|   HUMIRA|1003045634|Gastroenterology|
|     inflectra| 1003045634|infliximab-dyyb|      11|     NULL|infliximab-dyyb|INFLECTRA|1003045634|Gastroenterology|
|       stelara| 1003045634|    ustekinumab|      96|   

Dropping Unwanted Columns
---

In [29]:
fullcms_d=fullcms_d.drop('Brnd_Name','Gnrc_Name','NPI','Speciality')
fullcms_d.show()

+-----------+--------+---------+---------+
|Prscrbr_NPI|Tot_Clms|Tot_Benes|    BRAND|
+-----------+--------+---------+---------+
| 1003011891|      12|     NULL|  ENTYVIO|
| 1003042672|      11|     NULL|   HUMIRA|
| 1003043647|      13|     NULL|   HUMIRA|
| 1003045634|      13|     NULL|   HUMIRA|
| 1003045634|      11|     NULL|INFLECTRA|
| 1003045634|      96|       15|  STELARA|
| 1003069733|      13|     NULL|   HUMIRA|
| 1003078429|      14|     NULL|   HUMIRA|
| 1003078429|      13|     NULL|   HUMIRA|
| 1003078429|      13|     NULL|  STELARA|
| 1003078973|      46|     NULL|   HUMIRA|
| 1003090085|      13|     NULL|   HUMIRA|
| 1003102989|      16|     NULL|   HUMIRA|
| 1003102989|      22|     NULL|  STELARA|
| 1003172180|      24|     NULL|  STELARA|
| 1003196353|      11|     NULL|   HUMIRA|
| 1003196353|      26|     NULL|   HUMIRA|
| 1003196353|      11|     NULL| REMICADE|
| 1003255365|      13|     NULL|   HUMIRA|
| 1003255365|      18|     NULL|  STELARA|
+----------

Reordering the Columns
---

In [30]:
fullcms_d=fullcms_d.select('Prscrbr_NPI','BRAND','Tot_Benes','Tot_Clms')
fullcms_d.show()


+-----------+---------+---------+--------+
|Prscrbr_NPI|    BRAND|Tot_Benes|Tot_Clms|
+-----------+---------+---------+--------+
| 1003011891|  ENTYVIO|     NULL|      12|
| 1003042672|   HUMIRA|     NULL|      11|
| 1003043647|   HUMIRA|     NULL|      13|
| 1003045634|   HUMIRA|     NULL|      13|
| 1003045634|INFLECTRA|     NULL|      11|
| 1003045634|  STELARA|       15|      96|
| 1003069733|   HUMIRA|     NULL|      13|
| 1003078429|   HUMIRA|     NULL|      14|
| 1003078429|   HUMIRA|     NULL|      13|
| 1003078429|  STELARA|     NULL|      13|
| 1003078973|   HUMIRA|     NULL|      46|
| 1003090085|   HUMIRA|     NULL|      13|
| 1003102989|   HUMIRA|     NULL|      16|
| 1003102989|  STELARA|     NULL|      22|
| 1003172180|  STELARA|     NULL|      24|
| 1003196353|   HUMIRA|     NULL|      11|
| 1003196353|   HUMIRA|     NULL|      26|
| 1003196353| REMICADE|     NULL|      11|
| 1003255365|   HUMIRA|     NULL|      13|
| 1003255365|  STELARA|     NULL|      18|
+----------

Renaming Column Names
---

In [31]:
fullcms_d=fullcms_d.withColumnRenamed('Prscrbr_NPI','NPI').withColumnRenamed('BRAND','Drug_Name')
fullcms_d.show()

+----------+---------+---------+--------+
|       NPI|Drug_Name|Tot_Benes|Tot_Clms|
+----------+---------+---------+--------+
|1003011891|  ENTYVIO|     NULL|      12|
|1003042672|   HUMIRA|     NULL|      11|
|1003043647|   HUMIRA|     NULL|      13|
|1003045634|   HUMIRA|     NULL|      13|
|1003045634|INFLECTRA|     NULL|      11|
|1003045634|  STELARA|       15|      96|
|1003069733|   HUMIRA|     NULL|      13|
|1003078429|   HUMIRA|     NULL|      14|
|1003078429|   HUMIRA|     NULL|      13|
|1003078429|  STELARA|     NULL|      13|
|1003078973|   HUMIRA|     NULL|      46|
|1003090085|   HUMIRA|     NULL|      13|
|1003102989|   HUMIRA|     NULL|      16|
|1003102989|  STELARA|     NULL|      22|
|1003172180|  STELARA|     NULL|      24|
|1003196353|   HUMIRA|     NULL|      11|
|1003196353|   HUMIRA|     NULL|      26|
|1003196353| REMICADE|     NULL|      11|
|1003255365|   HUMIRA|     NULL|      13|
|1003255365|  STELARA|     NULL|      18|
+----------+---------+---------+--

In [32]:
print("Shape of Dataset :",(fullcms_d.count(),len(fullcms_d.columns)))

Shape of Dataset : (4482, 4)


Filter Checks
---

Distinct NPI and Drug_Name rows


In [33]:
fullcms_d.select(['NPI','Drug_Name']).distinct().count()

4077

Distinct Drug_Names

In [34]:
fullcms_d.select('Drug_Name').distinct().show()

+---------+
|Drug_Name|
+---------+
|INFLECTRA|
|   CIMZIA|
|  SIMPONI|
| REMICADE|
|   HUMIRA|
|  STELARA|
|  ENTYVIO|
|RENFLEXIS|
+---------+



Count of Null Values in Each Column
---

In [35]:
fullcms_d.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in fullcms_d.columns]).show()

+---+---------+---------+--------+
|NPI|Drug_Name|Tot_Benes|Tot_Clms|
+---+---------+---------+--------+
|  0|        0|     4410|       0|
+---+---------+---------+--------+



Filling Null Values in 'Total_Benes' with 0
--- 

In [36]:
fullcms_d=fullcms_d.fillna(value=0,subset='Tot_Benes')
fullcms_d.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in fullcms_d.columns]).show()

+---+---------+---------+--------+
|NPI|Drug_Name|Tot_Benes|Tot_Clms|
+---+---------+---------+--------+
|  0|        0|        0|       0|
+---+---------+---------+--------+



In [37]:
fullcms_d.show()

+----------+---------+---------+--------+
|       NPI|Drug_Name|Tot_Benes|Tot_Clms|
+----------+---------+---------+--------+
|1003011891|  ENTYVIO|        0|      12|
|1003042672|   HUMIRA|        0|      11|
|1003043647|   HUMIRA|        0|      13|
|1003045634|   HUMIRA|        0|      13|
|1003045634|INFLECTRA|        0|      11|
|1003045634|  STELARA|       15|      96|
|1003069733|   HUMIRA|        0|      13|
|1003078429|   HUMIRA|        0|      14|
|1003078429|   HUMIRA|        0|      13|
|1003078429|  STELARA|        0|      13|
|1003078973|   HUMIRA|        0|      46|
|1003090085|   HUMIRA|        0|      13|
|1003102989|   HUMIRA|        0|      16|
|1003102989|  STELARA|        0|      22|
|1003172180|  STELARA|        0|      24|
|1003196353|   HUMIRA|        0|      11|
|1003196353|   HUMIRA|        0|      26|
|1003196353| REMICADE|        0|      11|
|1003255365|   HUMIRA|        0|      13|
|1003255365|  STELARA|        0|      18|
+----------+---------+---------+--

Aggregating NPI and Drug_Name
---

In [38]:
fullcms_d = fullcms_d.groupBy("NPI","Drug_Name").agg(sum("Tot_Benes").alias("Tot_Benes"),sum("Tot_Clms").alias("Tot_Clms"))
fullcms_d.show()

+----------+---------+---------+--------+
|       NPI|Drug_Name|Tot_Benes|Tot_Clms|
+----------+---------+---------+--------+
|1003822875|   HUMIRA|        0|      25|
|1003861816|   CIMZIA|        0|      13|
|1013088046|   HUMIRA|        0|      12|
|1023264900|   HUMIRA|        0|      74|
|1033206941|   CIMZIA|        0|      12|
|1023003563|   HUMIRA|        0|      14|
|1023081486|   HUMIRA|        0|      12|
|1023441375|   HUMIRA|        0|      13|
|1023467198|   HUMIRA|        0|      22|
|1003011891|  ENTYVIO|        0|      12|
|1013981737|   HUMIRA|        0|      82|
|1033147848|  STELARA|        0|      15|
|1023032414|   HUMIRA|        0|      14|
|1003078429|  STELARA|        0|      13|
|1003869215|  STELARA|        0|      16|
|1013908920|   HUMIRA|        0|      14|
|1013945930|   HUMIRA|        0|      27|
|1013118231|   HUMIRA|        0|      13|
|1003898602|   HUMIRA|        0|      17|
|1013147206|   HUMIRA|        0|      17|
+----------+---------+---------+--

In [39]:
print("Shape of Dataset :",(fullcms_d.count(),len(fullcms_d.columns)))

Shape of Dataset : (4077, 4)


Distinct NPI Count
---

In [40]:
fullcms_d.select('NPI').distinct().count()

3334

Count of Null Values in each Column
---

In [41]:
fullcms_d.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in fullcms_d.columns]).show()

+---+---------+---------+--------+
|NPI|Drug_Name|Tot_Benes|Tot_Clms|
+---+---------+---------+--------+
|  0|        0|        0|       0|
+---+---------+---------+--------+



Exporting CMS-D Data
---

In [42]:
pcms_d=fullcms_d.toPandas()
pcms_d.to_csv('Output/Final_CMS_D.csv',index=False)

Groupby Check by Drug Name
---

In [43]:
group_cms_d=fullcms_d
group_cms_d=group_cms_d.groupby('Drug_Name').agg(sum("Tot_Benes").alias("Tot_Benes_Sum"),sum("Tot_Clms").alias("Tot_Clms_Sum"))
group_cms_d.show()

+---------+-------------+------------+
|Drug_Name|Tot_Benes_Sum|Tot_Clms_Sum|
+---------+-------------+------------+
|INFLECTRA|            0|          48|
|   CIMZIA|            0|        2808|
|  SIMPONI|            0|        1005|
| REMICADE|            0|        1035|
|   HUMIRA|          121|       59023|
|  STELARA|         1015|       21512|
|RENFLEXIS|            0|          13|
|  ENTYVIO|            0|        2189|
+---------+-------------+------------+



In [44]:
print("Shape of Dataset :",(group_cms_d.count(),len(group_cms_d.columns)))

Shape of Dataset : (8, 3)


Combining Grouped_CMS_B Data with Grouped_CMS_D Data
---

In [45]:
merged_grouped_cms=group_cms_b.union(group_cms_d)
merged_grouped_cms.show()

+---------+-------------+------------+
|Drug_Name|Tot_Benes_Sum|Tot_Clms_Sum|
+---------+-------------+------------+
| REMICADE|         4157|      420754|
|  ENTYVIO|         3986|     2261749|
|  STELARA|          126|       30060|
|RENFLEXIS|           28|        7462|
|   AVSOLA|           13|        5000|
|INFLECTRA|           13|        6590|
|INFLECTRA|            0|          48|
|   CIMZIA|            0|        2808|
|  SIMPONI|            0|        1005|
| REMICADE|            0|        1035|
|   HUMIRA|          121|       59023|
|  STELARA|         1015|       21512|
|RENFLEXIS|            0|          13|
|  ENTYVIO|            0|        2189|
+---------+-------------+------------+



In [46]:
print("Shape of Dataset :",(merged_grouped_cms.count(),len(merged_grouped_cms.columns)))

Shape of Dataset : (14, 3)


Aggregated Final Grouped Data (Data Validation)
---

In [47]:
merged_grouped_cms=merged_grouped_cms.groupby('Drug_Name').agg(sum("Tot_Benes_Sum").alias("Tot_Benes_Sum"),sum("Tot_Clms_Sum").alias("Tot_Clms_Sum"))
merged_grouped_cms.show()

+---------+-------------+------------+
|Drug_Name|Tot_Benes_Sum|Tot_Clms_Sum|
+---------+-------------+------------+
|INFLECTRA|           13|        6638|
| REMICADE|         4157|      421789|
|   AVSOLA|           13|        5000|
|  STELARA|         1141|       51572|
|RENFLEXIS|           28|        7475|
|  ENTYVIO|         3986|     2263938|
|   CIMZIA|            0|        2808|
|  SIMPONI|            0|        1005|
|   HUMIRA|          121|       59023|
+---------+-------------+------------+



In [48]:
print("Shape of Dataset :",(merged_grouped_cms.count(),len(merged_grouped_cms.columns)))

Shape of Dataset : (9, 3)


In [49]:
pmg=merged_grouped_cms.toPandas()
pmg.to_csv('output/raw_grouped_result.csv',index=False)

Appending CMS-B and CMS-D Data
---

In [50]:
mergedcms=fullcms_b.union(fullcms_d)
mergedcms.show()

+----------+---------+---------+--------+
|       NPI|Drug_Name|Tot_Benes|Tot_Clms|
+----------+---------+---------+--------+
|1003078973| REMICADE|       13|    1490|
|1003078973|  ENTYVIO|       12|   11101|
|1003255365| REMICADE|       12|    1210|
|1003806043|  ENTYVIO|       11|    3900|
|1003848102| REMICADE|       14|    1510|
|1003848102|  ENTYVIO|       17|    7800|
|1003961236| REMICADE|       20|    3840|
|1013022680| REMICADE|       19|    1230|
|1013147206|  ENTYVIO|       16|   40500|
|1013194414| REMICADE|       31|    3000|
|1013194414|  ENTYVIO|       31|   12000|
|1013980440|  ENTYVIO|       13|    5100|
|1013990076| REMICADE|       15|    1110|
|1023055340| REMICADE|       14|    1180|
|1023060480|  ENTYVIO|       12|    6000|
|1033372727|  ENTYVIO|       11|    9600|
|1043264336| REMICADE|       23|    1980|
|1043264336|  ENTYVIO|       14|    6300|
|1053403923| REMICADE|       14|    2023|
|1053571620| REMICADE|       16|    1160|
+----------+---------+---------+--

In [51]:
print("Shape of Dataset :",(mergedcms.count(),len(mergedcms.columns)))

Shape of Dataset : (4577, 4)


Final Grouping based on Drug_Names (Data Validation)
---

In [52]:
finalgroupcms=mergedcms.groupby('Drug_Name').agg(sum("Tot_Benes").alias("Tot_Benes_Sum"),sum("Tot_Clms").alias("Tot_Clms_Sum"))
finalgroupcms.show()

+---------+-------------+------------+
|Drug_Name|Tot_Benes_Sum|Tot_Clms_Sum|
+---------+-------------+------------+
| REMICADE|         4157|      421789|
|  ENTYVIO|         3986|     2263938|
|  STELARA|         1141|       51572|
|RENFLEXIS|           28|        7475|
|   AVSOLA|           13|        5000|
|INFLECTRA|           13|        6638|
|   CIMZIA|            0|        2808|
|  SIMPONI|            0|        1005|
|   HUMIRA|          121|       59023|
+---------+-------------+------------+



In [53]:
# pmg=finalgroupcms.toPandas()
# pmg.to_csv('output/grouped_result_after_appending.csv',index=False)

Distinct Drug Names
---

In [54]:
mergedcms.select('Drug_Name').distinct().show()

+---------+
|Drug_Name|
+---------+
| REMICADE|
|  ENTYVIO|
|  STELARA|
|RENFLEXIS|
|   AVSOLA|
|INFLECTRA|
|   CIMZIA|
|  SIMPONI|
|   HUMIRA|
+---------+



Distinct Drug_Name Count
---

In [55]:
mergedcms.select('Drug_Name').distinct().count()

9

Aggregating based on NPI and Drug_Name 
---

In [56]:
mergedcms = mergedcms.groupBy("NPI","Drug_Name").agg(sum("Tot_Benes").alias("Tot_Benes"),sum("Tot_Clms").alias("Tot_Clms"))

In [57]:
print("Shape of Dataset :",(mergedcms.count(),len(mergedcms.columns)))

Shape of Dataset : (4568, 4)


Count of Distinct Rows
---

In [58]:
mergedcms.select(['NPI','Drug_Name']).distinct().count()

4568

In [59]:
final_mergedcms = mergedcms.withColumn("Tot_Benes",mergedcms.Tot_Benes.cast("int"))
final_mergedcms = final_mergedcms.withColumn("Tot_Clms",final_mergedcms.Tot_Clms.cast("int"))

final_mergedcms=final_mergedcms.groupBy("NPI").pivot("Drug_Name").sum('Tot_Benes','Tot_Clms')
final_mergedcms.show(truncate=False)

+----------+---------------------+--------------------+---------------------+--------------------+----------------------+---------------------+---------------------+--------------------+------------------------+-----------------------+-----------------------+----------------------+------------------------+-----------------------+----------------------+---------------------+----------------------+---------------------+
|NPI       |AVSOLA_sum(Tot_Benes)|AVSOLA_sum(Tot_Clms)|CIMZIA_sum(Tot_Benes)|CIMZIA_sum(Tot_Clms)|ENTYVIO_sum(Tot_Benes)|ENTYVIO_sum(Tot_Clms)|HUMIRA_sum(Tot_Benes)|HUMIRA_sum(Tot_Clms)|INFLECTRA_sum(Tot_Benes)|INFLECTRA_sum(Tot_Clms)|REMICADE_sum(Tot_Benes)|REMICADE_sum(Tot_Clms)|RENFLEXIS_sum(Tot_Benes)|RENFLEXIS_sum(Tot_Clms)|SIMPONI_sum(Tot_Benes)|SIMPONI_sum(Tot_Clms)|STELARA_sum(Tot_Benes)|STELARA_sum(Tot_Clms)|
+----------+---------------------+--------------------+---------------------+--------------------+----------------------+---------------------+-------------

In [60]:
print("Shape of Dataset :",(final_mergedcms.count(),len(final_mergedcms.columns)))

Shape of Dataset : (3553, 19)


Count of Distinct NPI's
---

In [61]:
final_mergedcms.select('NPI').distinct().count()

3553

Filling the Null values with 0
---

In [62]:
final_mergedcms=final_mergedcms.fillna(value=0)
final_mergedcms.show()

+----------+---------------------+--------------------+---------------------+--------------------+----------------------+---------------------+---------------------+--------------------+------------------------+-----------------------+-----------------------+----------------------+------------------------+-----------------------+----------------------+---------------------+----------------------+---------------------+
|       NPI|AVSOLA_sum(Tot_Benes)|AVSOLA_sum(Tot_Clms)|CIMZIA_sum(Tot_Benes)|CIMZIA_sum(Tot_Clms)|ENTYVIO_sum(Tot_Benes)|ENTYVIO_sum(Tot_Clms)|HUMIRA_sum(Tot_Benes)|HUMIRA_sum(Tot_Clms)|INFLECTRA_sum(Tot_Benes)|INFLECTRA_sum(Tot_Clms)|REMICADE_sum(Tot_Benes)|REMICADE_sum(Tot_Clms)|RENFLEXIS_sum(Tot_Benes)|RENFLEXIS_sum(Tot_Clms)|SIMPONI_sum(Tot_Benes)|SIMPONI_sum(Tot_Clms)|STELARA_sum(Tot_Benes)|STELARA_sum(Tot_Clms)|
+----------+---------------------+--------------------+---------------------+--------------------+----------------------+---------------------+-------------

Exporting Data
---

In [63]:
pcms=final_mergedcms.toPandas()
pcms.to_csv('output/Merged_CMS_B&D.csv',index=False)

In [43]:
final_mergedcms.filter(col('NPI').rlike('1013194414')).show()

+----------+---------------------+--------------------+---------------------+--------------------+----------------------+---------------------+---------------------+--------------------+------------------------+-----------------------+-----------------------+----------------------+------------------------+-----------------------+----------------------+---------------------+----------------------+---------------------+----------------------+---------------------+----------------------+---------------------+
|       NPI|AVSOLA_sum(Tot_Benes)|AVSOLA_sum(Tot_Clms)|CIMZIA_sum(Tot_Benes)|CIMZIA_sum(Tot_Clms)|ENTYVIO_sum(Tot_Benes)|ENTYVIO_sum(Tot_Clms)|HUMIRA_sum(Tot_Benes)|HUMIRA_sum(Tot_Clms)|INFLECTRA_sum(Tot_Benes)|INFLECTRA_sum(Tot_Clms)|REMICADE_sum(Tot_Benes)|REMICADE_sum(Tot_Clms)|RENFLEXIS_sum(Tot_Benes)|RENFLEXIS_sum(Tot_Clms)|SIMPONI_sum(Tot_Benes)|SIMPONI_sum(Tot_Clms)|SKYRIZI_sum(Tot_Benes)|SKYRIZI_sum(Tot_Clms)|STELARA_sum(Tot_Benes)|STELARA_sum(Tot_Clms)|ZEPOSIA_sum(Tot_Benes)