In [1]:
import pyspark

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
import findspark

In [4]:
findspark.init('/home/vboxuser/spark-3.5.0-bin-hadoop3')

In [5]:
# Importing Session
from pyspark.sql import SparkSession

In [6]:
# Creating Session
spark_session = SparkSession.builder.appName('Fraud Detection').getOrCreate()

23/12/14 20:49:58 WARN Utils: Your hostname, Ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/12/14 20:49:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 20:49:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
PartDRawData ="/home/vboxuser/utkarsh datasets/PartD_Prescriber_PUF_NPI_DRUG_15/PartD_Prescriber_PUF_NPI_Drug_15.txt"
partD_pd = spark_session.read.csv(PartDRawData, header=True, inferSchema=True, sep="\t")

                                                                                

In [8]:
# Show the number of rows
num_rows = partD_pd.count()
print(f"Number of Rows: {num_rows}")

# Show the number of columns
num_columns = len(partD_pd.columns)
print(f"Number of Columns: {num_columns}")



Number of Rows: 24524894
Number of Columns: 21


                                                                                

In [9]:
# Select specific columns
partD_Drug_pd1 = partD_pd.select('npi', 'nppes_provider_city', 'nppes_provider_state',
                          'nppes_provider_last_org_name', 'nppes_provider_first_name',
                          'specialty_description', 'drug_name', 'generic_name',
                          'total_drug_cost', 'total_claim_count', 'total_day_supply')

In [10]:
partD_pd1 = partD_Drug_pd1.alias("partD_pd1")

In [11]:
from pyspark.sql.functions import col
partD_Drug_pd = partD_pd.select('npi', 'drug_name', 'total_drug_cost', 'total_claim_count', 'total_day_supply', 'specialty_description')

# Convert 'npi' column to object type
partD_Drug_pd = partD_Drug_pd.withColumn('npi', col('npi').cast('string'))


In [12]:
partD_Spec_pd1 = partD_pd.select(
    col('npi'),
    col('specialty_description')
)

In [13]:
# Get the number of rows
num_rows = partD_Spec_pd1.count()

# Get the number of columns
num_columns = len(partD_Spec_pd1.columns)

# Print the shape of the DataFrame
print("Shape of the DataFrame: ({}, {})".format(num_rows, num_columns))




Shape of the DataFrame: (24524894, 2)


                                                                                

In [14]:
partD_pd0 = partD_pd.select(
    col('npi'),
    col('nppes_provider_city'),
    col('nppes_provider_state'),
    col('nppes_provider_last_org_name'),
    col('nppes_provider_first_name'),
    col('specialty_description')
)

In [15]:
partD_catfpd = partD_pd0.drop_duplicates()

In [16]:
rename_dict = {'nppes_provider_first_name': 'first_name', 'nppes_provider_last_org_name': 'last_name',
               'nppes_provider_city': 'city', 'nppes_provider_state': 'state', 'specialty_description': 'Speciality'}

for old_name, new_name in rename_dict.items():
    partD_catfpd = partD_catfpd.withColumnRenamed(old_name, new_name)

# Show the first few rows of the new DataFrame
partD_catfpd.show(5, truncate=False)



+----------+-------------+-----+----------+----------+--------------------------------------------------------------+
|npi       |city         |state|last_name |first_name|Speciality                                                    |
+----------+-------------+-----+----------+----------+--------------------------------------------------------------+
|1003013384|LITTLE ROCK  |AR   |OSLEBER   |MICHAEL   |Dermatology                                                   |
|1003020538|BRIGHTON     |MA   |BROWN     |COLLEEN   |Physician Assistant                                           |
|1003040031|DALLAS       |TX   |GUTTIKONDA|SANDEEP   |Emergency Medicine                                            |
|1003063918|SAN FRANCISCO|CA   |AKLILU    |EPHRAIM   |Student in an Organized Health Care Education/Training Program|
|1003088550|MORGANTOWN   |WV   |SOFKA     |SARAH     |Internal Medicine                                             |
+----------+-------------+-----+----------+----------+--

                                                                                

In [17]:
partD_pd1.columns

['npi',
 'nppes_provider_city',
 'nppes_provider_state',
 'nppes_provider_last_org_name',
 'nppes_provider_first_name',
 'specialty_description',
 'drug_name',
 'generic_name',
 'total_drug_cost',
 'total_claim_count',
 'total_day_supply']

In [18]:
from pyspark.sql.functions import expr
# Define group columns
group_cols = ['npi']


# Use expr to define aggregations
partD_pd2 = partD_pd1.groupBy(group_cols).agg(
    expr('SUM(CAST(total_drug_cost AS DOUBLE)) AS total_drug_cost_sum'),
    expr('AVG(CAST(total_drug_cost AS DOUBLE)) AS total_drug_cost_mean'),
    expr('MAX(CAST(total_drug_cost AS DOUBLE)) AS total_drug_cost_max'),
    expr('SUM(CAST(total_claim_count AS INT)) AS total_claim_count_sum'),
    expr('AVG(CAST(total_claim_count AS DOUBLE)) AS total_claim_count_mean'),
    expr('MAX(CAST(total_claim_count AS INT)) AS total_claim_count_max'),
    expr('SUM(CAST(total_day_supply AS INT)) AS total_day_supply_sum'),
    expr('AVG(CAST(total_day_supply AS DOUBLE)) AS total_day_supply_mean'),
    expr('MAX(CAST(total_day_supply AS INT)) AS total_day_supply_max')
)


In [19]:
partD_pd2.columns

['npi',
 'total_drug_cost_sum',
 'total_drug_cost_mean',
 'total_drug_cost_max',
 'total_claim_count_sum',
 'total_claim_count_mean',
 'total_claim_count_max',
 'total_day_supply_sum',
 'total_day_supply_mean',
 'total_day_supply_max']

In [20]:
# Get the number of rows
num_rows = partD_pd2.count()

# Get the number of columns
num_cols = len(partD_pd2.columns)

print("Number of rows:", num_rows)
print("Number of columns:", num_cols)



Number of rows: 866552
Number of columns: 10


                                                                                

In [21]:
partD_fpd = (
    partD_pd2
    .withColumnRenamed('sum_total_drug_cost', 'total_drug_cost_sum')
    .withColumnRenamed('mean_total_drug_cost', 'total_drug_cost_mean')
    .withColumnRenamed('max_total_drug_cost', 'total_drug_cost_max')
    .withColumnRenamed('sum_total_claim_count', 'total_claim_count_sum')
    .withColumnRenamed('mean_total_claim_count', 'total_claim_count_mean')
    .withColumnRenamed('max_total_claim_count', 'total_claim_count_max')
    .withColumnRenamed('sum_total_day_supply', 'total_day_supply_sum')
    .withColumnRenamed('mean_total_day_supply', 'total_day_supply_mean')
    .withColumnRenamed('max_total_day_supply', 'total_day_supply_max')
)

In [22]:
partD_fpd.columns

['npi',
 'total_drug_cost_sum',
 'total_drug_cost_mean',
 'total_drug_cost_max',
 'total_claim_count_sum',
 'total_claim_count_mean',
 'total_claim_count_max',
 'total_day_supply_sum',
 'total_day_supply_mean',
 'total_day_supply_max']

In [23]:
partD_fpd.columns

['npi',
 'total_drug_cost_sum',
 'total_drug_cost_mean',
 'total_drug_cost_max',
 'total_claim_count_sum',
 'total_claim_count_mean',
 'total_claim_count_max',
 'total_day_supply_sum',
 'total_day_supply_mean',
 'total_day_supply_max']

In [24]:
# Assuming partD_fpd is your PySpark DataFrame
row_count = partD_fpd.count()
print(f"Number of rows in partD_fpd: {row_count}")




Number of rows in partD_fpd: 866552


                                                                                

In [25]:
partD_allpd = partD_fpd.join(partD_catfpd, on='npi', how='left')

In [26]:
partD_allpd.columns

['npi',
 'total_drug_cost_sum',
 'total_drug_cost_mean',
 'total_drug_cost_max',
 'total_claim_count_sum',
 'total_claim_count_mean',
 'total_claim_count_max',
 'total_day_supply_sum',
 'total_day_supply_mean',
 'total_day_supply_max',
 'city',
 'state',
 'last_name',
 'first_name',
 'Speciality']

In [27]:
PaymentRawData  = "/home/vboxuser/utkarsh datasets/PGYR15_P012023/OP_DTL_GNRL_PGYR2015_P01202023.csv"

In [28]:
payment_pd = spark_session.read.csv(PaymentRawData, header=True, inferSchema=True)

                                                                                

In [29]:
payment_fpd = payment_pd.select(
    col('Physician_First_Name'),
    col('Physician_Last_Name'),
    col('Recipient_City'),
    col('Recipient_State'),
    col('Total_Amount_of_Payment_USDollars')
)
payment_fpd.head()

Row(Physician_First_Name='DAVID', Physician_Last_Name='GORDLEY', Recipient_City='SLIPPERY ROCK', Recipient_State='PA', Total_Amount_of_Payment_USDollars='60.00')

In [30]:
from pyspark.sql.functions import sum
from pyspark.sql.types import FloatType 
payment_fpd1 = payment_pd.groupBy(
    'Physician_First_Name',
    'Physician_Last_Name',
    'Recipient_City',
    'Recipient_State'
).agg(
    sum('Total_Amount_of_Payment_USDollars').alias('Total_Amount_of_Payment_USDollars_sum')
)

# Cast the sum column to float
payment_fpd1 = payment_fpd1.withColumn(
    'Total_Amount_of_Payment_USDollars_sum',
    payment_fpd1['Total_Amount_of_Payment_USDollars_sum'].cast(FloatType())
)

In [31]:
from pyspark.sql import functions as F
payment_fpd1 = payment_fpd1.select(
    F.col('Physician_First_Name').alias('first_name'),
    F.col('Physician_Last_Name').alias('last_name'),
    F.col('Recipient_City').alias('city'),
    F.col('Recipient_State').alias('state'),
    F.col('Total_Amount_of_Payment_USDollars_sum').alias('Total_Payment_Sum')
)

In [32]:
payment_fpd2 = payment_fpd1.withColumn('index', F.monotonically_increasing_id())


In [33]:
payment_fpd2 = payment_fpd2.withColumnRenamed('Physician_First_Name', 'first_name') \
                           .withColumnRenamed('Physician_Last_Name', 'last_name') \
                           .withColumnRenamed('Recipient_City', 'city') \
                           .withColumnRenamed('Recipient_State', 'state') \
                           .withColumnRenamed('Total_Amount_of_Payment_USDollars_sum', 'Total_Payment_Sum')

In [34]:
# Sort the DataFrame by 'Total_Payment_Sum' column in descending order
payment_fpd2 = payment_fpd2.orderBy('Total_Payment_Sum', ascending=False)


In [35]:
payment_fpd2.head()

                                                                                

Row(first_name=None, last_name=None, city='DUARTE', state='CA', Total_Payment_Sum=306541824.0, index=60129572416)

In [36]:
# Iterate through columns and apply uppercase transformation to string columns
for col_name in payment_fpd2.columns:
    if payment_fpd2.schema[col_name].dataType == 'string':
        payment_fpd2 = payment_fpd2.withColumn(col_name, upper(col(col_name)))


In [37]:
partD_alias = partD_allpd.alias('partD')
payment_alias = payment_fpd2.alias('payment')

# Join using aliases and specify the join conditions
pay_partD_fpd = partD_alias.join(
    payment_alias,
    (col('partD.last_name') == col('payment.last_name')) &
    (col('partD.first_name') == col('payment.first_name')) &
    (col('partD.city') == col('payment.city')) &
    (col('partD.state') == col('payment.state')),
    'left'
)

In [99]:
IELErawdata = "/home/vboxuser/utkarsh datasets/UPDATED.csv"

In [100]:
IELE_pd = spark_session.read.csv(IELErawdata, header=True, inferSchema=True)

In [101]:
npifraud_pd0 = IELE_pd.select("NPI", "EXCLTYPE")

In [102]:
# Filter rows where 'NPI' is not equal to 0
npifraud_pd1 = npifraud_pd0.filter(col("NPI") != 0)

In [103]:
npi_fraud_pd = npifraud_pd1.withColumnRenamed("NPI", "npi").withColumnRenamed("EXCLTYPE", "is_fraud")


In [105]:
from pyspark.sql.functions import lit
npi_fraud_pd = npi_fraud_pd.withColumn('is_fraud', lit(1))



In [107]:
from pyspark.sql.functions import col, regexp_replace
npi_fraud_pd.withColumn('NPI', regexp_replace(col('NPI'), '[^0-9]', ''))

# Convert to LongType
npi_fraud_pd.withColumn('NPI', col('NPI').cast('long'))


DataFrame[NPI: bigint, is_fraud: int]

In [109]:
Features_pd1 = pay_partD_fpd.join(npi_fraud_pd, on='npi', how='left')

In [111]:
Features_pd1.where(Features_pd1.is_fraud.isNotNull()).count()

                                                                                

1573

In [125]:
final_pd = Features_pd1.na.fill(value=0,subset=['is_fraud'])

In [126]:
final_pd.head()

                                                                                

Row(npi=1003043209, total_drug_cost_sum=47219.94000000001, total_drug_cost_mean=1311.6650000000002, total_drug_cost_max=11136.75, total_claim_count_sum=1209, total_claim_count_mean=33.583333333333336, total_claim_count_max=96, total_day_supply_sum=43661, total_day_supply_mean=1212.8055555555557, total_day_supply_max=3998, city='LYNN', state='MA', last_name='AFFEL', first_name='MARJORIE', Speciality='Family Practice', first_name=None, last_name=None, city=None, state=None, Total_Payment_Sum=None, index=None, is_fraud=0)

In [127]:
final_pd.filter(col('is_fraud') == 1).count()

                                                                                

1573

In [128]:
feature_columns = ['total_drug_cost_sum', 'total_drug_cost_mean', 'total_drug_cost_max', 'total_claim_count_sum', 'total_claim_count_mean', 'total_claim_count_max', 
 'total_day_supply_sum', 'total_day_supply_mean', 'total_day_supply_max']

In [129]:
# Assuming 'label' is your target column
label_column = 'is_fraud'

In [130]:
# Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [131]:
final_pd1 = final_pd.select('total_drug_cost_sum', 'total_drug_cost_mean', 'total_drug_cost_max', 'total_claim_count_sum', 'total_claim_count_mean', 'total_claim_count_max', 
 'total_day_supply_sum', 'total_day_supply_mean', 'total_day_supply_max', 'is_fraud')

In [132]:
assembled_data = assembler.transform(final_pd1)

In [133]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load data
training = assembled_data.select("is_fraud", "features")

# Create a GBT (Gradient-Boosted Trees) classifier
gbt = GBTClassifier(labelCol="is_fraud", featuresCol="features", maxIter=10)

In [134]:
training.head()

                                                                                

Row(is_fraud=0, features=DenseVector([47219.94, 1311.665, 11136.75, 1209.0, 33.5833, 96.0, 43661.0, 1212.8056, 3998.0]))

In [135]:
# Fit the model
gbt_model = gbt.fit(training)

                                                                                

In [136]:
# Make predictions
predictions = gbt_model.transform(training)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

23/12/14 22:36:19 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS

Accuracy: 0.9981871222209657


                                                                                