In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as func



In [2]:
sc = SparkContext("local", "HealthCareCapstone")
sqlContext = SQLContext(sc)

                       

In [3]:
# load the dataset
#  1. Part D drug 2016 dataset 
#     https://data.cms.gov/Medicare-Part-D/Medicare-Provider-Utilization-and-Payment-Data-201/yvpj-pmj2
#  2. CMS Payment Data Set  
#     https://www.cms.gov/OpenPayments/Explore-the-Data/Dataset-Downloads.html#
#  3. EXCLUEDED Datavset
#     https://oig.hhs.gov/exclusions/exclusions_list.asp 
#  4. Opioids drugs and opioids prescriber information

partD_drug_rawdata = sc.textFile("/Users/my_macbook/Projects/Project_Capstone/DataSet/PartD_Drug_16/PartD_Drug_16.txt")
#partD_pres_rawdata = sc.textFile("/Users/my_macbook/Projects/Project_Capstone/DataSet/PartD_Prescriber_16/PartD_Prescriber_16.txt")
payment_rawdata = sqlContext.read.csv("/Users/my_macbook/Projects/Project_Capstone/DataSet/2017Payment/OP_DTL_GNRL_PGYR2017_P06292018.csv", \
                                       header = True, inferSchema = True)
IELE_rawdata = sqlContext.read.csv("/Users/my_macbook/Projects/Project_Capstone/DataSet/2018UpdatedLEIE.csv", \
                                       header = True, inferSchema = True)
Opioids_drugs = sqlContext.read.csv("/Users/my_macbook/Projects/Project_Capstone/DataSet/opioids.csv", \
                                       header = True, inferSchema = True)
Opioids_Prescriber = sqlContext.read.csv("/Users/my_macbook/Projects/Project_Capstone/DataSet/opioids_prescriber_info.csv", \
                                       header = True, inferSchema = True)


In [4]:
# PAYMENT DATA PREPARATION
payment_features_df = payment_rawdata.select(["Physician_First_Name",\
                                             "Physician_Last_Name", \
                                             "Recipient_City", \
                                             "Recipient_State", \
                                             "Total_Amount_of_Payment_USDollars"])
#payment_features_df.show()



In [5]:
payment_features_df = payment_features_df.withColumn("Total_Amount_of_Payment_USDollars",payment_features_df["Total_Amount_of_Payment_USDollars"].cast(DoubleType()))

payment_features_gourpbyTotal_df = payment_features_df.groupBy("Physician_First_Name","Physician_Last_Name","Recipient_City","Recipient_State").sum("Total_Amount_of_Payment_USDollars")



In [6]:
payment_features_gourpbyTotal_df.show()

+--------------------+-------------------+--------------+---------------+--------------------------------------+
|Physician_First_Name|Physician_Last_Name|Recipient_City|Recipient_State|sum(Total_Amount_of_Payment_USDollars)|
+--------------------+-------------------+--------------+---------------+--------------------------------------+
|               SARAH|               AZAD| MOUNTAIN VIEW|             CA|                                293.92|
|               Laxmi|              Berwa|       Clinton|             MD|                                190.44|
|             Prabodh|              Mehta| Elizabethtown|             KY|                                132.92|
|             William|              Laird|        Dallas|             TX|                                 24.31|
|              Preeti|              Desai|        Conroe|             TX|                                  65.6|
|                ERIC|            SCHROER|      HILLIARD|             OH|                     95

In [6]:
payment_features_gourpbyTotal_df = payment_features_gourpbyTotal_df.withColumnRenamed("Physician_First_Name" , "first_name")
payment_features_gourpbyTotal_df = payment_features_gourpbyTotal_df.withColumnRenamed("Physician_Last_Name" , "last_name")
payment_features_gourpbyTotal_df = payment_features_gourpbyTotal_df.withColumnRenamed("Recipient_City" , "city")
payment_features_gourpbyTotal_df = payment_features_gourpbyTotal_df.withColumnRenamed("Recipient_State" , "state")
payment_features_gourpbyTotal_df = payment_features_gourpbyTotal_df.withColumnRenamed("sum(Total_Amount_of_Payment_USDollars)" , "Total_Amount_of_Payment")

payment_features_gourpbyTotal_df.show()

+-----------+----------+--------------+-----+-----------------------+
| first_name| last_name|          city|state|Total_Amount_of_Payment|
+-----------+----------+--------------+-----+-----------------------+
|      SARAH|      AZAD| MOUNTAIN VIEW|   CA|                 293.92|
|      Laxmi|     Berwa|       Clinton|   MD|                 190.44|
|    Prabodh|     Mehta| Elizabethtown|   KY|                 132.92|
|    William|     Laird|        Dallas|   TX|                  24.31|
|     Preeti|     Desai|        Conroe|   TX|                   65.6|
|       ERIC|   SCHROER|      HILLIARD|   OH|      956.8499999999997|
|  ELIZABETH|     TANZI|     BALTIMORE|   MD|                 4155.8|
|CHRISTOPHER|   STEVENS|       SUWANEE|   GA|                  55.55|
|      AHMED|    YOUSRY|   JERSEY CITY|   NJ|     226.83999999999997|
|     CARRIE|FUNKHAUSER|MOUNT PLEASANT|   SC|                  15.43|
|       JOHN|   O NEILL|      BETHESDA|   MD|                 151.12|
|       MARY|   WITT

In [7]:
payment_features_gourpbyTotal_df = payment_features_gourpbyTotal_df.sort(payment_features_gourpbyTotal_df.Total_Amount_of_Payment.desc())


In [8]:
payment_features_gourpbyTotal_df.show()

+----------+----------+-------------+-----+-----------------------+
|first_name| last_name|         city|state|Total_Amount_of_Payment|
+----------+----------+-------------+-----+-----------------------+
|      null|      null|       DUARTE|   CA|         4.0078907511E8|
|      null|      null|       BOSTON|   MA|    5.974409932000001E7|
|      null|      null|        Phila|   PA|   3.3104393040000003E7|
|     JAMES|    GAMMIE|    Baltimore|   MD|           2.68435298E7|
|   STEPHEN|  BURKHART|  SAN ANTONIO|   TX|          2.296924526E7|
|   CHARLES|    GOODIS|  ALBUQUERQUE|   NM|   2.2880446670000006E7|
|      null|      null|  LOS ANGELES|   CA|          1.722602166E7|
|     KEVIN|     FOLEY|      Memphis|   TN|          1.703851275E7|
|      null|      null|   Hackensack|   NJ|          1.509109387E7|
|      null|      null|       Boston|   MA|   1.2392886729999997E7|
|      null|      null|  Los Angeles|   CA|          1.113067121E7|
|      IVAN|    OSORIO|  KANSAS CITY|   KS|     

In [9]:
# Part D drug data Preparation 

header = partD_drug_rawdata.first()
partD_drug_rawdata =partD_drug_rawdata.filter (lambda line: line != header)

temp_var = partD_drug_rawdata.map(lambda k: k.split("\t"))
partD_Drug_df = temp_var.toDF(header.split("\t"))
partD_Drug_df.show()


partD_Drug_df = partD_Drug_df.withColumn("bene_count",partD_Drug_df["bene_count"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_claim_count",partD_Drug_df["total_claim_count"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_30_day_fill_count",partD_Drug_df["total_30_day_fill_count"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_day_supply",partD_Drug_df["total_day_supply"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_drug_cost",partD_Drug_df["total_drug_cost"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("bene_count_ge65",partD_Drug_df["bene_count_ge65"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_claim_count_ge65",partD_Drug_df["total_claim_count_ge65"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_30_day_fill_count_ge65",partD_Drug_df["total_30_day_fill_count_ge65"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_day_supply_ge65",partD_Drug_df["total_day_supply_ge65"].cast(DoubleType()))
partD_Drug_df = partD_Drug_df.withColumn("total_drug_cost_ge65",partD_Drug_df["total_drug_cost_ge65"].cast(DoubleType()))








+----------+----------------------------+-------------------------+-------------------+--------------------+---------------------+----------------+--------------------+--------------------+----------+-----------------+-----------------------+----------------+---------------+---------------+-----------------------------+----------------------+------------------+----------------------------+---------------------+--------------------+
|       npi|nppes_provider_last_org_name|nppes_provider_first_name|nppes_provider_city|nppes_provider_state|specialty_description|description_flag|           drug_name|        generic_name|bene_count|total_claim_count|total_30_day_fill_count|total_day_supply|total_drug_cost|bene_count_ge65|bene_count_ge65_suppress_flag|total_claim_count_ge65|ge65_suppress_flag|total_30_day_fill_count_ge65|total_day_supply_ge65|total_drug_cost_ge65|
+----------+----------------------------+-------------------------+-------------------+--------------------+--------------------

In [10]:
partD_Drug_df.head()

Row(npi='1003000126', nppes_provider_last_org_name='ENKESHAFI', nppes_provider_first_name='ARDALAN', nppes_provider_city='CUMBERLAND', nppes_provider_state='MD', specialty_description='Internal Medicine', description_flag='S', drug_name='ATORVASTATIN CALCIUM', generic_name='ATORVASTATIN CALCIUM', bene_count=None, total_claim_count=13.0, total_30_day_fill_count=15.0, total_day_supply=450.0, total_drug_cost=139.32, bene_count_ge65=None, bene_count_ge65_suppress_flag='*', total_claim_count_ge65=13.0, ge65_suppress_flag='', total_30_day_fill_count_ge65=15.0, total_day_supply_ge65=450.0, total_drug_cost_ge65=139.32)

In [11]:
partD_Drug_df.printSchema()

root
 |-- npi: string (nullable = true)
 |-- nppes_provider_last_org_name: string (nullable = true)
 |-- nppes_provider_first_name: string (nullable = true)
 |-- nppes_provider_city: string (nullable = true)
 |-- nppes_provider_state: string (nullable = true)
 |-- specialty_description: string (nullable = true)
 |-- description_flag: string (nullable = true)
 |-- drug_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- bene_count: double (nullable = true)
 |-- total_claim_count: double (nullable = true)
 |-- total_30_day_fill_count: double (nullable = true)
 |-- total_day_supply: double (nullable = true)
 |-- total_drug_cost: double (nullable = true)
 |-- bene_count_ge65: double (nullable = true)
 |-- bene_count_ge65_suppress_flag: string (nullable = true)
 |-- total_claim_count_ge65: double (nullable = true)
 |-- ge65_suppress_flag: string (nullable = true)
 |-- total_30_day_fill_count_ge65: double (nullable = true)
 |-- total_day_supply_ge65: double (nulla

In [11]:
partD_Drug_features_df = partD_Drug_df.select(["npi",\
                                               "nppes_provider_city",\
                                               "nppes_provider_state", \
                                               "nppes_provider_last_org_name", \
                                               "nppes_provider_first_name", \
                                               "specialty_description",\
                                               "drug_name", \
                                               "generic_name",\
                                               "total_drug_cost",\
                                               "total_claim_count",\
                                               "total_day_supply"])

partD_Drug_featuresGroupbyNPI_df = partD_Drug_features_df.groupBy("npi",\
                                                                 "nppes_provider_city",\
                                                                 "nppes_provider_state", \
                                                                 "nppes_provider_last_org_name", \
                                                                 "nppes_provider_first_name", \
                                                                 "specialty_description",\
                                                                 "drug_name", \
                                                                 "generic_name")\
                                                                         .agg(func.sum("total_drug_cost"),\
                                                                              func.sum("total_claim_count"),\
                                                                              func.sum("total_day_supply"), \
                                                                              func.variance("total_drug_cost"),\
                                                                              func.variance("total_claim_count"),\
                                                                              func.variance("total_day_supply"),\
                                                                              func.max("total_drug_cost"),\
                                                                              func.max("total_claim_count"),\
                                                                              func.max("total_day_supply"),\
                                                                             )


partD_Drug_featuresGroupbyNPI_df = partD_Drug_featuresGroupbyNPI_df.withColumnRenamed("nppes_provider_first_name" , "first_name")
partD_Drug_featuresGroupbyNPI_df = partD_Drug_featuresGroupbyNPI_df.withColumnRenamed("nppes_provider_last_org_name" , "last_name")
partD_Drug_featuresGroupbyNPI_df = partD_Drug_featuresGroupbyNPI_df.withColumnRenamed("nppes_provider_city" , "city")
partD_Drug_featuresGroupbyNPI_df = partD_Drug_featuresGroupbyNPI_df.withColumnRenamed("nppes_provider_state" , "state")

In [13]:
partD_Drug_featuresGroupbyNPI_df.show()

+----------+---------------+-----+---------------+----------+---------------------+--------------------+--------------------+--------------------+----------------------+---------------------+-------------------------+---------------------------+--------------------------+--------------------+----------------------+---------------------+
|       npi|           city|state|      last_name|first_name|specialty_description|           drug_name|        generic_name|sum(total_drug_cost)|sum(total_claim_count)|sum(total_day_supply)|var_samp(total_drug_cost)|var_samp(total_claim_count)|var_samp(total_day_supply)|max(total_drug_cost)|max(total_claim_count)|max(total_day_supply)|
+----------+---------------+-----+---------------+----------+---------------------+--------------------+--------------------+--------------------+----------------------+---------------------+-------------------------+---------------------------+--------------------------+--------------------+----------------------+------

In [None]:
# EXCLUDED Data Preparation

In [13]:
IELE_rawdata.show()

+----+----+---+------------------------+------------------+---------------+----+----------+----+--------------------+---------+---+-----+------+--------+---+---+----+
| _c0| _c1|   |14 LAWRENCE AVE PHARMACY|          PHARMACY|            _c5| _c6|        07| _c8|  14 LAWRENCE AVENUE|SMITHTOWN| NY|11787|1128a1|19880830|015|016|_c17|
+----+----+---+------------------------+------------------+---------------+----+----------+----+--------------------+---------+---+-----+------+--------+---+---+----+
|null|null|   |    143 MEDICAL EQUIP...|       DME COMPANY|   DME - OXYGEN|null|         0|null|    701 NW 36 AVENUE|    MIAMI| FL|33125|1128b7|19970620|  0|  0|null|
|null|null|   |    184TH STREET PHAR...|    OTHER BUSINESS|       PHARMACY|null|1922348218|null|       69 E 184TH ST|    BRONX| NY|10468|1128a1|20180419|  0|  0|null|
|null|null|   |    1951 FLATBUSH AVE...|          PHARMACY|           null|null|         0|null|   1951 FLATBUSH AVE| BROOKLYN| NY|11234|1128b5|20090319|  0|  0|null

In [14]:

npi_fraud_df = IELE_rawdata.select (["npi", "EXCLTYPE"])

In [15]:
npi_fraud_df = npi_fraud_df.filter(npi_fraud_df.npi != 0)

In [16]:
npi_fraud_df.printSchema

<bound method DataFrame.printSchema of DataFrame[npi: int, EXCLTYPE: string]>

In [17]:
npi_fraud_df = npi_fraud_df.withColumn("is_fraud", npi_fraud_df["npi"]/npi_fraud_df["npi"] )

In [18]:
npi_fraud_df = npi_fraud_df.withColumn("is_fraud",npi_fraud_df["is_fraud"].cast(IntegerType()))

TypeError: 'Column' object is not callable

In [None]:
# Opioid drugs

In [19]:
Opioids_Prescriber.printSchema()

root
 |-- npi: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Credentials: string (nullable = true)
 |-- Specialty: string (nullable = true)
 |-- ABILIFY: integer (nullable = true)
 |-- ACETAMINOPHEN.CODEINE: integer (nullable = true)
 |-- ACYCLOVIR: integer (nullable = true)
 |-- ADVAIR.DISKUS: integer (nullable = true)
 |-- AGGRENOX: integer (nullable = true)
 |-- ALENDRONATE.SODIUM: integer (nullable = true)
 |-- ALLOPURINOL: integer (nullable = true)
 |-- ALPRAZOLAM: integer (nullable = true)
 |-- AMIODARONE.HCL: integer (nullable = true)
 |-- AMITRIPTYLINE.HCL: integer (nullable = true)
 |-- AMLODIPINE.BESYLATE: integer (nullable = true)
 |-- AMLODIPINE.BESYLATE.BENAZEPRIL: integer (nullable = true)
 |-- AMOXICILLIN: integer (nullable = true)
 |-- AMOX.TR.POTASSIUM.CLAVULANATE: integer (nullable = true)
 |-- AMPHETAMINE.SALT.COMBO: integer (nullable = true)
 |-- ATENOLOL: integer (nullable = true)
 |-- ATORVASTATIN.CALCIUM

In [20]:
Opioids_Prescriber_npi_df = Opioids_Prescriber.select (["npi","Specialty","is_Opioid_Prescriber"])

In [21]:
Opioids_Prescriber_npi_df.show()

+----------+-------------------+--------------------+
|       npi|          Specialty|is_Opioid_Prescriber|
+----------+-------------------+--------------------+
|1710982582|            Dentist|                   1|
|1245278100|    General Surgery|                   1|
|1427182161|   General Practice|                   0|
|1669567541|  Internal Medicine|                   1|
|1679650949|Hematology/Oncology|                   1|
|1548580897|    General Surgery|                   1|
|1437192002|    Family Practice|                   1|
|1407113988| Nurse Practitioner|                   0|
|1023260569|          Optometry|                   0|
|1821106832|  Internal Medicine|                   1|
|1609931914|   General Practice|                   1|
|1659334472|    General Surgery|                   1|
|1144205303|    Family Practice|                   1|
|1548275050|         Cardiology|                   0|
|1952598419|Hematology/Oncology|                   1|
|1780661793|    General Surg

In [28]:
df =  npi_fraud_df.join(Opioids_Prescriber_npi_df, (Opioids_Prescriber_npi_df.npi == npi_fraud_df.npi))

In [29]:
df.printSchema()

root
 |-- npi: integer (nullable = true)
 |-- EXCLTYPE: string (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- npi: integer (nullable = true)
 |-- Specialty: string (nullable = true)
 |-- is_Opioid_Prescriber: integer (nullable = true)



In [30]:

sqlContext.registerDataFrameAsTable(payment_features_gourpbyTotal_df,"payment_table")
payment_df = sqlContext.sql("Select * from payment_table")


In [31]:
sqlContext.registerDataFrameAsTable(partD_Drug_featuresGroupbyNPI_df,"partD_table")
partD_df = sqlContext.sql("Select * from partD_table")

In [32]:
sqlContext.registerDataFrameAsTable(npi_fraud_df,"npi_fraud_table")
npifraud_df = sqlContext.sql("Select * from npi_fraud_table")

In [33]:
sqlContext.registerDataFrameAsTable(Opioids_Prescriber_npi_df,"npi_opioids_table")
npiopioids_df = sqlContext.sql("Select * from npi_opioids_table")

In [None]:
features_df = sqlContext.sql ("select npi ")


In [None]:
SELECT npi
FROM (
    SELECT distinct npi , "first_name" , "last_name", "city" ,"state"
    FROM payment_table
    ) npis 
    
LEFT JOIN  payments pay
ON npis."NPPES_PROVIDER_FIRST_NAME" = pay."Physician_First_Name"
AND npis."NPPES_PROVIDER_LAST_ORG_NAME" = pay."Physician_Last_Name"
AND npis."NPPES_PROVIDER_CITY" = pay."Recipient_City"
AND npis."NPPES_PROVIDER_STATE" = pay."Recipient_State"

GROUP BY npi
;