In [1]:
import pandas as pd
import numpy as np
import scipy
import os 

import matplotlib.pyplot as plt


from sklearn.naive_bayes import GaussianNB 
from sklearn.linear_model import LogisticRegression 
from sklearn import ensemble 
from sklearn.ensemble import RandomForestClassifier 
from sklearn.metrics import brier_score_loss, precision_score, recall_score,f1_score, roc_auc_score, accuracy_score 
from sklearn.metrics import confusion_matrix, roc_curve

from sklearn.preprocessing import StandardScaler 
from sklearn.feature_extraction import DictVectorizer
from sklearn.cluster import KMeans

import random

from scipy.stats import ttest_ind

In [2]:
part_D_prescriber_dir = os.path.join(".", "datasets", "Medicare_Part_D_Prescribers_by_Provider_2021.csv")

**Loading the part D dataset about precription drug provider from the CSV file.**

In [551]:
partD_pd = pd.read_csv(part_D_prescriber_dir)

In [552]:
partD_pd

Unnamed: 0,PRSCRBR_NPI,Prscrbr_Last_Org_Name,Prscrbr_First_Name,Prscrbr_MI,Prscrbr_Crdntls,Prscrbr_Gndr,Prscrbr_Ent_Cd,Prscrbr_St1,Prscrbr_St2,Prscrbr_City,...,Bene_Male_Cnt,Bene_Race_Wht_Cnt,Bene_Race_Black_Cnt,Bene_Race_Api_Cnt,Bene_Race_Hspnc_Cnt,Bene_Race_Natind_Cnt,Bene_Race_Othr_Cnt,Bene_Dual_Cnt,Bene_Ndual_Cnt,Bene_Avg_Risk_Scre
0,1003000126,Enkeshafi,Ardalan,,M.D.,M,I,6410 Rockledge Dr Ste 304,,Bethesda,...,153.0,231.0,32.0,47.0,20.0,0.0,28.0,157.0,201.0,1.462630
1,1003000142,Khalil,Rashid,,M.D.,M,I,4126 N Holland Sylvania Rd,Suite 220,Toledo,...,105.0,144.0,143.0,,,0.0,,166.0,133.0,1.691054
2,1003000167,Escobar,Julio,E,DDS,M,I,5 Pine Cone Rd,,Dayton,...,14.0,32.0,0.0,0.0,,0.0,,0.0,35.0,0.727543
3,1003000175,Reyes-Vasquez,Belinda,,D.D.S.,F,I,322 N Azusa Ave Ste 202,,La Puente,...,,,0.0,,,0.0,0.0,,,0.946591
4,1003000423,Velotta,Jennifer,A,M.D.,F,I,11100 Euclid Ave,,Cleveland,...,,57.0,,0.0,,0.0,,14.0,52.0,0.801164
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1287449,1992999650,Yong,Wayne,B,DMD,M,I,28 Atlantic Avenue,121,Boston,...,14.0,,,22.0,0.0,0.0,,,,1.136733
1287450,1992999692,Haas,Meghan,R,R.PH.,F,I,900 Wooster Rd N,,Barberton,...,,13.0,0.0,0.0,0.0,0.0,0.0,0.0,13.0,0.664538
1287451,1992999817,Takenishi,Greg,S,M.D.,M,I,4601 Dale Rd,,Modesto,...,,,,,,0.0,0.0,,,0.964235
1287452,1992999825,Deschenes,Geoffrey,R,M.D.,M,I,1100 9th Ave,Ms:m4-Pfs,Seattle,...,53.0,81.0,,,,0.0,,12.0,90.0,1.125913


In [553]:
partD_pd.shape

(1287454, 85)

In [554]:
partD_pd.columns

Index(['PRSCRBR_NPI', 'Prscrbr_Last_Org_Name', 'Prscrbr_First_Name',
       'Prscrbr_MI', 'Prscrbr_Crdntls', 'Prscrbr_Gndr', 'Prscrbr_Ent_Cd',
       'Prscrbr_St1', 'Prscrbr_St2', 'Prscrbr_City', 'Prscrbr_State_Abrvtn',
       'Prscrbr_State_FIPS', 'Prscrbr_zip5', 'Prscrbr_RUCA',
       'Prscrbr_RUCA_Desc', 'Prscrbr_Cntry', 'Prscrbr_Type',
       'Prscrbr_Type_src', 'Tot_Clms', 'Tot_30day_Fills', 'Tot_Drug_Cst',
       'Tot_Day_Suply', 'Tot_Benes', 'GE65_Sprsn_Flag', 'GE65_Tot_Clms',
       'GE65_Tot_30day_Fills', 'GE65_Tot_Drug_Cst', 'GE65_Tot_Day_Suply',
       'GE65_Bene_Sprsn_Flag', 'GE65_Tot_Benes', 'Brnd_Sprsn_Flag',
       'Brnd_Tot_Clms', 'Brnd_Tot_Drug_Cst', 'Gnrc_Sprsn_Flag',
       'Gnrc_Tot_Clms', 'Gnrc_Tot_Drug_Cst', 'Othr_Sprsn_Flag',
       'Othr_Tot_Clms', 'Othr_Tot_Drug_Cst', 'MAPD_Sprsn_Flag',
       'MAPD_Tot_Clms', 'MAPD_Tot_Drug_Cst', 'PDP_Sprsn_Flag', 'PDP_Tot_Clms',
       'PDP_Tot_Drug_Cst', 'LIS_Sprsn_Flag', 'LIS_Tot_Clms', 'LIS_Drug_Cst',
       'NonLIS_Sprsn_

**Take all the columns that woud be used in Machine Learning problem.**

In [555]:
part_D_full_data = partD_pd.loc[:,['PRSCRBR_NPI',
                                               'Prscrbr_Type',
                                               'Tot_Drug_Cst',\
                                               'Tot_Clms',\
                                               'Tot_Day_Suply']]
part_D_full_data = part_D_full_data.rename(columns = {"PRSCRBR_NPI": "npi", 
                                            'Prscrbr_Type': "specialty_description",
                                            "Tot_Drug_Cst": "total_drug_cost",
                                            'Tot_Clms': "total_claim",
                                            "Tot_Day_Suply": "total_day_supply"})
part_D_full_data.head(5)

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply
0,1003000126,Internal Medicine,81553.44,1123,74080
1,1003000142,Anesthesiology,37841.04,1493,43944
2,1003000167,Dentist,221.66,47,455
3,1003000175,Dentist,125.82,20,161
4,1003000423,Obstetrics & Gynecology,20757.65,206,10231


**The next dataset: Data about the total payments of physicians in each NPIs in the United States**

In [556]:
PHYSICIAN_PAYMENTS_DIR = os.path.join(".", "datasets", "PGYR21_P063023", "OP_DTL_GNRL_PGYR2021_P06302023.csv")
# Number of rows to retrieve randomly
num_rows_to_read = 1000000  # Adjust this number as needed
physician_payment_data = pd.read_csv(PHYSICIAN_PAYMENTS_DIR, 
                                     nrows = num_rows_to_read)
#Read the first rows of the dataframe
physician_payment_data.head(5)

  physician_payment_data = pd.read_csv(PHYSICIAN_PAYMENTS_DIR,


Unnamed: 0,Change_Type,Covered_Recipient_Type,Teaching_Hospital_CCN,Teaching_Hospital_ID,Teaching_Hospital_Name,Covered_Recipient_Profile_ID,Covered_Recipient_NPI,Covered_Recipient_First_Name,Covered_Recipient_Middle_Name,Covered_Recipient_Last_Name,...,Associated_Drug_or_Biological_NDC_4,Associated_Device_or_Medical_Supply_PDI_4,Covered_or_Noncovered_Indicator_5,Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_5,Product_Category_or_Therapeutic_Area_5,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_5,Associated_Drug_or_Biological_NDC_5,Associated_Device_or_Medical_Supply_PDI_5,Program_Year,Payment_Publication_Date
0,UNCHANGED,Covered Recipient Physician,,,,92058.0,1043218000.0,Ahad,,Mahootchi,...,,,,,,,,,2021,06/30/2023
1,UNCHANGED,Covered Recipient Physician,,,,1231338.0,1487818000.0,Arsham,,Sheybani,...,,,,,,,,,2021,06/30/2023
2,UNCHANGED,Covered Recipient Physician,,,,992939.0,1104822000.0,Donato,J,Borrillo,...,,,,,,,,,2021,06/30/2023
3,UNCHANGED,Covered Recipient Physician,,,,774776.0,1457355000.0,Earl,,Craven,...,,,,,,,,,2021,06/30/2023
4,UNCHANGED,Covered Recipient Physician,,,,106575.0,1346417000.0,Michael,J,Bauer,...,,,,,,,,,2021,06/30/2023


**Taking only relevant columns here: "npi" of the physician and the total payment for them.**

In [557]:
physician_data = physician_payment_data.loc[:, ["Covered_Recipient_NPI", 'Total_Amount_of_Payment_USDollars']]
physician_data = physician_data.rename(columns = {"Covered_Recipient_NPI": "npi", 
                                                  "Total_Amount_of_Payment_USDollars": "total_payment"})
physician_data.head(5)

Unnamed: 0,npi,total_payment
0,1043218000.0,2500.0
1,1487818000.0,3000.0
2,1104822000.0,8580.0
3,1457355000.0,500.0
4,1346417000.0,37000.0


In [558]:
physician_data = physician_data.dropna(subset = ["npi"])

**Groupby to calculate the aggregate payment sum for each individual NPI**

In [559]:
physician_data["npi"] = physician_data["npi"].astype(int)
total_payment_sum = physician_data.groupby(["npi"])["total_payment"].sum()
physician_data["total_payment_sum"] = physician_data["npi"].map(total_payment_sum.to_dict())
physician_data = physician_data.loc[:, ["npi", "total_payment_sum"]]
physician_data = physician_data.drop_duplicates()
physician_data

Unnamed: 0,npi,total_payment_sum
0,1043218118,2983.89
1,1487817995,36790.20
2,1104822170,8580.00
3,1457354821,500.00
4,1346417086,38925.00
...,...,...
999987,1639141732,22.34
999989,1689052748,11.36
999991,1841230646,26.51
999995,1255339909,12.71


**Merging the data in two dataset together, according to the NPI column.**

In [560]:
full_dataset = pd.merge(part_D_full_data, physician_data, on = "npi", how = "left")
full_dataset

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply,total_payment_sum
0,1003000126,Internal Medicine,81553.44,1123,74080,
1,1003000142,Anesthesiology,37841.04,1493,43944,
2,1003000167,Dentist,221.66,47,455,
3,1003000175,Dentist,125.82,20,161,
4,1003000423,Obstetrics & Gynecology,20757.65,206,10231,35.88
...,...,...,...,...,...,...
1287449,1992999650,Dentist,498.08,74,1173,
1287450,1992999692,Pharmacist,2167.29,15,282,
1287451,1992999817,Orthopaedic Surgery,248.81,25,219,
1287452,1992999825,Otolaryngology,7712.09,206,4636,


**Next dataset: The list of excluded NPI in the United States, as of 2023. That is the list of NPIs that have been
caught with medical frauds and labeled as such by the United States medical authority.**

In [561]:
EXCLUSION_CSV_DIR = os.path.join(".", "datasets", "LEIE.csv")
exclusion_data = pd.read_csv(EXCLUSION_CSV_DIR)
#Read the first rows of the dataframe
exclusion_data.head(5)

  exclusion_data = pd.read_csv(EXCLUSION_CSV_DIR)


Unnamed: 0,LASTNAME,FIRSTNAME,MIDNAME,BUSNAME,GENERAL,SPECIALTY,UPIN,NPI,DOB,ADDRESS,CITY,STATE,ZIP,EXCLTYPE,EXCLDATE,REINDATE,WAIVERDATE,WVRSTATE
0,,,,"#1 MARKETING SERVICE, INC",OTHER BUSINESS,SOBER HOME,,0,,239 BRIGHTON BEACH AVENUE,BROOKLYN,NY,11235,1128a1,20200319,0,0,
1,,,,"1 BEST CARE, INC",OTHER BUSINESS,HOME HEALTH AGENCY,,0,,"2161 UNIVERSITY AVENUE W, STE",SAINT PAUL,MN,55114,1128b5,20230518,0,0,
2,,,,101 FIRST CARE PHARMACY INC,OTHER BUSINESS,PHARMACY,,1972902351,,"C/O 609 W 191ST STREET, APT D",NEW YORK,NY,10040,1128b8,20220320,0,0,
3,,,,14 LAWRENCE AVE PHARMACY,PHARMACY,,,0,,14 LAWRENCE AVENUE,SMITHTOWN,NY,11787,1128a1,19880830,0,0,
4,,,,143 MEDICAL EQUIPMENT CO,DME COMPANY,DME - OXYGEN,,0,,701 NW 36 AVENUE,MIAMI,FL,33125,1128b7,19970620,0,0,


**Retriving all the fraudulent NPIs, label them as 1 in the target feature "is_fraud".**

In [562]:
#Get all the rows whose "NPI" column is not equal to 0, that would be the list of excluded NPIs.
npi_fraud_data = exclusion_data.loc[:, ["NPI", "EXCLTYPE"]].query('NPI !=0')
#Rename the column
npi_fraud_data = npi_fraud_data.rename(columns = {"NPI": "npi", "EXCLTYPE": "is_fraud"})
#Set the "is_fraud" column to 1.
npi_fraud_data['is_fraud'] = 1
#View the first rows of the resulting data
npi_fraud_data.head(5)

Unnamed: 0,npi,is_fraud
2,1972902351,1
6,1922348218,1
26,1942476080,1
30,1275600959,1
33,1891731758,1


In [563]:
len(npi_fraud_data)

7060

**Continue merging**

In [564]:
full_dataset = pd.merge(full_dataset, npi_fraud_data, on = "npi", how = "left")
full_dataset

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply,total_payment_sum,is_fraud
0,1003000126,Internal Medicine,81553.44,1123,74080,,
1,1003000142,Anesthesiology,37841.04,1493,43944,,
2,1003000167,Dentist,221.66,47,455,,
3,1003000175,Dentist,125.82,20,161,,
4,1003000423,Obstetrics & Gynecology,20757.65,206,10231,35.88,
...,...,...,...,...,...,...,...
1287450,1992999650,Dentist,498.08,74,1173,,
1287451,1992999692,Pharmacist,2167.29,15,282,,
1287452,1992999817,Orthopaedic Surgery,248.81,25,219,,
1287453,1992999825,Otolaryngology,7712.09,206,4636,,


**Filling NaN values with 0, in here the rows whose column "is_fraud" is NaN would be filled with the value 0.**

In [565]:
full_dataset.fillna({"is_fraud": 0}, inplace = True)

In [566]:
full_dataset.head()

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply,total_payment_sum,is_fraud
0,1003000126,Internal Medicine,81553.44,1123,74080,,0.0
1,1003000142,Anesthesiology,37841.04,1493,43944,,0.0
2,1003000167,Dentist,221.66,47,455,,0.0
3,1003000175,Dentist,125.82,20,161,,0.0
4,1003000423,Obstetrics & Gynecology,20757.65,206,10231,35.88,0.0


**The column "total_payment_sum" NaN values will be filled by the median value of column**

In [567]:
full_dataset['total_payment_sum'] = full_dataset['total_payment_sum'].fillna(
    full_dataset["total_payment_sum"].median())

In [568]:
full_dataset.head()

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply,total_payment_sum,is_fraud
0,1003000126,Internal Medicine,81553.44,1123,74080,55.57,0.0
1,1003000142,Anesthesiology,37841.04,1493,43944,55.57,0.0
2,1003000167,Dentist,221.66,47,455,55.57,0.0
3,1003000175,Dentist,125.82,20,161,55.57,0.0
4,1003000423,Obstetrics & Gynecology,20757.65,206,10231,35.88,0.0


**Only 17 observations in the whole dataset are positive(there are fraud). The rest were negative(no fraud).**

In [570]:
full_dataset[full_dataset['is_fraud'] == 1].count()

npi                      304
specialty_description    304
total_drug_cost          304
total_claim              304
total_day_supply         304
total_payment_sum        304
is_fraud                 304
dtype: int64

In [571]:
partD_pd.head(5)

Unnamed: 0,PRSCRBR_NPI,Prscrbr_Last_Org_Name,Prscrbr_First_Name,Prscrbr_MI,Prscrbr_Crdntls,Prscrbr_Gndr,Prscrbr_Ent_Cd,Prscrbr_St1,Prscrbr_St2,Prscrbr_City,...,Bene_Male_Cnt,Bene_Race_Wht_Cnt,Bene_Race_Black_Cnt,Bene_Race_Api_Cnt,Bene_Race_Hspnc_Cnt,Bene_Race_Natind_Cnt,Bene_Race_Othr_Cnt,Bene_Dual_Cnt,Bene_Ndual_Cnt,Bene_Avg_Risk_Scre
0,1003000126,Enkeshafi,Ardalan,,M.D.,M,I,6410 Rockledge Dr Ste 304,,Bethesda,...,153.0,231.0,32.0,47.0,20.0,0.0,28.0,157.0,201.0,1.46263
1,1003000142,Khalil,Rashid,,M.D.,M,I,4126 N Holland Sylvania Rd,Suite 220,Toledo,...,105.0,144.0,143.0,,,0.0,,166.0,133.0,1.691054
2,1003000167,Escobar,Julio,E,DDS,M,I,5 Pine Cone Rd,,Dayton,...,14.0,32.0,0.0,0.0,,0.0,,0.0,35.0,0.727543
3,1003000175,Reyes-Vasquez,Belinda,,D.D.S.,F,I,322 N Azusa Ave Ste 202,,La Puente,...,,,0.0,,,0.0,0.0,,,0.946591
4,1003000423,Velotta,Jennifer,A,M.D.,F,I,11100 Euclid Ave,,Cleveland,...,,57.0,,0.0,,0.0,,14.0,52.0,0.801164


In [572]:
#All the numerical features from PartDPrescriberDataset that will be taken as feeatures used for Machine Learning
partD_pd_cat_features = ["Prscrbr_Crdntls", "Prscrbr_Gndr", "Prscrbr_State_Abrvtn", "Prscrbr_RUCA_Desc"]
#All the categorical features from PartDPrescriberDataset that will be taken as feeatures used for Machine Learning
partD_pd_num_features = ["GE65_Tot_Clms", "GE65_Tot_Drug_Cst", "GE65_Tot_Day_Suply",
                        "Brnd_Tot_Clms", "Brnd_Tot_Drug_Cst", "Gnrc_Tot_Clms", "Gnrc_Tot_Drug_Cst", 
                        "MAPD_Tot_Clms", "MAPD_Tot_Drug_Cst", "PDP_Tot_Clms", "PDP_Tot_Drug_Cst",
                        "LIS_Tot_Clms", "LIS_Drug_Cst", "NonLIS_Tot_Clms", "NonLIS_Drug_Cst", 
                        "Opioid_Tot_Clms", "Opioid_Tot_Drug_Cst", "Opioid_Prscrbr_Rate", 
                        "Antbtc_Tot_Clms", "Antbtc_Tot_Drug_Cst"]

partD_pd_all_features = partD_pd_cat_features + partD_pd_num_features

#The subset of the original PartDPrescriberDataset that will be used for Machine Learning.
partD_data = partD_pd.loc[:, ["PRSCRBR_NPI"] + partD_pd_all_features]

In [573]:
full_dataset = pd.merge(full_dataset, partD_data, left_on = ["npi"], right_on = ["PRSCRBR_NPI"], 
                       how = "left")

In [574]:
len(full_dataset)

1287455

**Now we have the full dataset used for training**

In [575]:
full_dataset.drop(labels = ["PRSCRBR_NPI"], axis = 1, inplace = True)
full_dataset.head(5)

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply,total_payment_sum,is_fraud,Prscrbr_Crdntls,Prscrbr_Gndr,Prscrbr_State_Abrvtn,...,PDP_Tot_Drug_Cst,LIS_Tot_Clms,LIS_Drug_Cst,NonLIS_Tot_Clms,NonLIS_Drug_Cst,Opioid_Tot_Clms,Opioid_Tot_Drug_Cst,Opioid_Prscrbr_Rate,Antbtc_Tot_Clms,Antbtc_Tot_Drug_Cst
0,1003000126,Internal Medicine,81553.44,1123,74080,55.57,0.0,M.D.,M,MD,...,66129.25,650.0,54225.12,473.0,27328.32,17.0,100.47,1.513802,59.0,729.44
1,1003000142,Anesthesiology,37841.04,1493,43944,55.57,0.0,M.D.,M,OH,...,16002.79,997.0,30945.54,496.0,6895.5,658.0,16694.14,44.072338,,
2,1003000167,Dentist,221.66,47,455,55.57,0.0,DDS,M,NV,...,136.33,,,,,,,,35.0,116.76
3,1003000175,Dentist,125.82,20,161,55.57,0.0,D.D.S.,F,CA,...,,,,,,0.0,0.0,0.0,15.0,93.09
4,1003000423,Obstetrics & Gynecology,20757.65,206,10231,35.88,0.0,M.D.,F,OH,...,14326.9,74.0,9103.03,132.0,11654.62,0.0,0.0,0.0,,


**Scaling numerical features to smaller number**

In [576]:
#all categorical features in the full dataset
all_cat_features = ["specialty_description"] + partD_pd_cat_features
#all numerical features in the full dataset
all_num_features = ["total_drug_cost", "total_claim", "total_day_supply", "total_payment_sum"] + partD_pd_num_features
#scaling of all numerical features
for feature in all_num_features:
    
    full_dataset[feature] = full_dataset[feature].map(lambda x: np.log10(x + 1.0))

full_dataset.head(5)

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply,total_payment_sum,is_fraud,Prscrbr_Crdntls,Prscrbr_Gndr,Prscrbr_State_Abrvtn,...,PDP_Tot_Drug_Cst,LIS_Tot_Clms,LIS_Drug_Cst,NonLIS_Tot_Clms,NonLIS_Drug_Cst,Opioid_Tot_Clms,Opioid_Tot_Drug_Cst,Opioid_Prscrbr_Rate,Antbtc_Tot_Clms,Antbtc_Tot_Drug_Cst
0,1003000126,Internal Medicine,4.911448,3.050766,4.869707,1.752586,0.0,M.D.,M,MD,...,4.8204,2.813581,4.734209,2.675778,4.436629,1.255273,2.006338,0.400331,1.778151,2.863585
1,1003000142,Anesthesiology,4.577975,3.174351,4.642909,1.752586,0.0,M.D.,M,OH,...,4.204223,2.999131,4.490612,2.696356,3.838629,2.818885,4.22259,1.65391,,
2,1003000167,Dentist,2.347642,1.681241,2.658965,1.752586,0.0,DDS,M,NV,...,2.137765,,,,,,,,1.556303,2.070998
3,1003000175,Dentist,2.103188,1.322219,2.209515,1.752586,0.0,D.D.S.,F,CA,...,,,,,,0.0,0.0,0.0,1.20412,1.973543
4,1003000423,Obstetrics & Gynecology,4.317199,2.31597,4.009961,1.566791,0.0,M.D.,F,OH,...,4.156183,1.875061,3.959234,2.123852,4.066535,0.0,0.0,0.0,,


In [578]:
for feature in all_cat_features:
    print(len(X_train[feature].unique()))

120
913
2
60
16


In [579]:
#Split the full dataset into features and targets.
X_full_dataset = full_dataset.drop(["is_fraud"], axis = 1)
Y_full_dataset = full_dataset["is_fraud"].astype(int).copy()

Y_full_dataset

0          0
1          0
2          0
3          0
4          0
          ..
1287450    0
1287451    0
1287452    0
1287453    0
1287454    0
Name: is_fraud, Length: 1287455, dtype: int64

In [580]:
Y_full_dataset.unique()

array([0, 1])

In [581]:
X_full_dataset.head(3)

Unnamed: 0,npi,specialty_description,total_drug_cost,total_claim,total_day_supply,total_payment_sum,Prscrbr_Crdntls,Prscrbr_Gndr,Prscrbr_State_Abrvtn,Prscrbr_RUCA_Desc,...,PDP_Tot_Drug_Cst,LIS_Tot_Clms,LIS_Drug_Cst,NonLIS_Tot_Clms,NonLIS_Drug_Cst,Opioid_Tot_Clms,Opioid_Tot_Drug_Cst,Opioid_Prscrbr_Rate,Antbtc_Tot_Clms,Antbtc_Tot_Drug_Cst
0,1003000126,Internal Medicine,4.911448,3.050766,4.869707,1.752586,M.D.,M,MD,Metropolitan area core: primary flow within an...,...,4.8204,2.813581,4.734209,2.675778,4.436629,1.255273,2.006338,0.400331,1.778151,2.863585
1,1003000142,Anesthesiology,4.577975,3.174351,4.642909,1.752586,M.D.,M,OH,Metropolitan area core: primary flow within an...,...,4.204223,2.999131,4.490612,2.696356,3.838629,2.818885,4.22259,1.65391,,
2,1003000167,Dentist,2.347642,1.681241,2.658965,1.752586,DDS,M,NV,Metropolitan area high commuting: primary flow...,...,2.137765,,,,,,,,1.556303,2.070998


**There are more than 80000 instances in the full dataset, while there are only 17 positive instances in the whole dataset. So the dataset is grossly imbalanced, so here I use RandomUnderSampler in the library imblearn to choose randomly 5000 negative samples from more than 80000 negative samples for the full dataset along with all 17 positive instances.**

In [584]:
from imblearn.under_sampling import RandomUnderSampler
# Specify the fixed number of negative instances you want to keep
num_negative_samples = 30400
# Create a RandomUnderSampler
rus = RandomUnderSampler(sampling_strategy = {0: num_negative_samples, 1: 304}, random_state = 42)

# Fit and transform the training data using the RandomUnderSampler
X_resampled_full_dataset, y_resampled_full_dataset = rus.fit_resample(X_full_dataset, Y_full_dataset)

In [585]:
len(X_resampled_full_dataset)

30704

**Divide the dataset into train set and test set. I will divide the data in a stratified way, to ensure that the positive/negative ratio in train set and test set is identical.**

In [586]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X_resampled_full_dataset, 
                y_resampled_full_dataset, test_size = 0.25, random_state=42, stratify = y_resampled_full_dataset)

In [587]:
y_train[y_train == 1].sum(), y_test[y_test == 1].sum()

(228, 76)

# Preprocessing data in Scikit-Learn Pipeline

**Importing all relevant classes in sklearn.**

In [588]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder

In [589]:
from sklearn.compose import make_column_selector

**Creatng pipeline for numerical features and categorical features of the dataset.**

In [590]:
num_pipeline = Pipeline([("imputer", SimpleImputer(strategy = "constant", fill_value = 0)),
                        ("std_scaler", StandardScaler())])
cat_pipeline = Pipeline([("one_hot_encoder",
                OneHotEncoder(handle_unknown = "infrequent_if_exist", min_frequency = 10, max_categories = 10))])

**The ColumnTransformer which apply numerical pipeline to numerical columns and categorical pipeline to categorical columns, and then concatenating the results of those 2 transformations along the feature axis.**

In [591]:
col_transform = ColumnTransformer(transformers = [
    ("num_pipeline", num_pipeline, all_num_features),
    ("cat_pipeline", cat_pipeline, all_cat_features)], remainder = "drop", sparse_threshold = 0.0)

In [592]:
full_pipeline = Pipeline([("col_transform", col_transform)])
X_train_prepared = full_pipeline.fit_transform(X_train)

In [593]:
X_train_prepared.shape

(23028, 66)

# Training and testing with Kafka

**Making KafkaProducer to send training instances to Kafka Topic.**

In [594]:
from kafka import KafkaProducer
import json
import time
import random

# Kafka broker address, in this case will be locathost:9092
kafka_broker = 'localhost:9092'
# Kafka topic to publish data, in this case will be "training"
kafka_topic = 'training'

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers = [kafka_broker], 
                         value_serializer = lambda v: json.dumps(v).encode('utf-8'), api_version = (0, 10, 1))


**For each iteration in the loop, send one individual sample to the topic named "training"**

In [595]:
#Sending training instances to the newly created Kafka topic
for i in range(len(X_train_prepared)):
    
    # Combine random data with actual Iris dataset features
    instance = {
        'features': X_train_prepared[i].tolist(),
        'label': int(y_train.iloc[i])
    }

    # Publish data to Kafka topic
    producer.send(kafka_topic, value = instance)
    # Simulate a delay between data points

# Close the Kafka producer
producer.close()

**Importing Machine Learning training algorithms**

In [671]:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.ensemble import GradientBoostingClassifier

**Creating MachineLearning class**

In [672]:
log_reg = LogisticRegression(random_state = 42, warm_start = True, class_weight = {0: 1, 1: 200})
extra_tree = ExtraTreesClassifier(n_estimators = 1, random_state = 42, warm_start = True, 
                                       class_weight = {0: 1, 1: 200})
grad_boost = GradientBoostingClassifier(n_estimators = 1, random_state = 42, warm_start = True)

classifiers = [log_reg, extra_tree, grad_boost]

**Initiating KafkaConsumer**

In [673]:
from kafka import KafkaConsumer
# Create a Kafka consumer
consumer = KafkaConsumer(kafka_topic, bootstrap_servers = [kafka_broker], 
                value_deserializer = lambda x: json.loads(x.decode('utf-8')), api_version = (0,10)
                         , auto_offset_reset = "earliest", consumer_timeout_ms = 60000)

In [674]:
# Initialize lists to store data
features = []
labels = []

# Consume and process data
for message in consumer:
    data_point = message.value
    
    # Collect features and labels
    features.append(data_point['features'])
    labels.append(data_point['label'])
    
    # Train the model with accumulated data
    if len(features) >= 32:  # Train the model every 32 data points
        
        if 1 not in labels:
            features.append([0 for i in range(len(data_point["features"]))])
            labels.append(1)
        
        #for each classifier call fit() method
        for classifier in classifiers:
            if classifier.__class__.__name__ in ["ExtraTreesClassifier", "GradientBoostingClassifier"]:
                classifier.n_estimators += 1
            classifier.fit(np.array(features), np.array(labels))
        
        # Reset data lists
        features = []
        labels = []

# Close the Kafka consumer
consumer.close()

In [675]:
grad_boost.n_estimators

720

**Testing**

In [676]:
X_test_prepared = full_pipeline.transform(X_test)

In [677]:
extra_tree.predict_proba(X_test_prepared)[:, 1].max()

0.2763888888888889

In [684]:
recall_score(y_test, log_reg.predict(X_test_prepared))

0.05263157894736842

In [679]:
roc_auc_score(y_test, log_reg.predict(X_test_prepared))

0.5092763157894736

In [None]:
# Kafka broker address, in this case will be locathost:9092
kafka_broker = 'localhost:9092'
# Kafka topic to publish data, in this case will be "training"
kafka_topic = 'testing'

# Create a Kafka producer
testing_producer = KafkaProducer(bootstrap_servers = [kafka_broker], 
                         value_serializer = lambda v: json.dumps(v).encode('utf-8'), api_version = (0, 10, 1))


In [None]:
#Sending training instances to the newly created Kafka topic
for i in range(len(X_train_prepared)):
    
    # Combine random data with actual Iris dataset features
    instance = {
        'features': X_train_prepared[i].tolist(),
        'label': int(y_train.iloc[i])
    }

    # Publish data to Kafka topic
    testing_producer.send(kafka_topic, value = instance)
    time.sleep(1)  # Simulate a delay between data points

# Close the Kafka producer
testing_producer.close()

In [None]:
# Create a Kafka consumer
testing_consumer = KafkaConsumer(kafka_topic, bootstrap_servers = [kafka_broker], 
                         value_deserializer = lambda x: json.loads(x.decode('utf-8')))

In [298]:
from sklearn.metrics import roc_auc_score
from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score

In [None]:
# Initialize lists to store data
features = []
labels = []

# Consume and process data
for message in consumer:
    data_point = message.value
    
    # Collect features and labels
    features.append(data_point['features'])
    labels.append(data_point['label'])

# Close the Kafka consumer
consumer.close()

In [None]:
for classifier in classifiers:
    
    print(classifier.__class__.__name__)
    prediction = classifier.predict(np.array(features))
    print("The precision score on the test set is:", precision_score(np.array(labels), prediction))
    print("The recall score on the test set is:", recall_score(np.array(labels), prediction))
    print("The f1 score on the test set is:", f1_score(np.array(labels), prediction))
    print("The roc auc score on the test set is:", roc_auc_score(np.array(labels), prediction))