In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=1027db70700186c111fd869b073340d1e3973db2c85ff6be1669b98ced538c0a
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import pyspark
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count, col

In [3]:
from pyspark.sql import SparkSession

# Start (or attach to) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("KaggleSpark")
    .getOrCreate()
)

# Keep Spark's console logging quiet: only ERROR-level messages are shown.
spark.sparkContext.setLogLevel("ERROR")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/23 12:43:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# The `spark` session created earlier in this notebook is reused here. Calling
# SparkSession.builder...getOrCreate() again with a different appName would
# return that same running session instead of starting a new one, so a second
# builder call only obscures which session is in use.

# Location of the PaySim transaction log inside the Kaggle input mount.
PAYSIM_CSV = "/kaggle/input/paysim1/PS_20174392719_1491204439457_log.csv"

# header=True takes column names from the first row; inferSchema=True lets
# Spark derive column types from the data (at the cost of an extra pass).
df = spark.read.csv(PAYSIM_CSV, header=True, inferSchema=True)

                                                                                

In [5]:
# List the column names of the loaded DataFrame.
print(df.columns)

['step', 'type', 'amount', 'nameOrig', 'oldbalanceOrg', 'newbalanceOrig', 'nameDest', 'oldbalanceDest', 'newbalanceDest', 'isFraud', 'isFlaggedFraud']


In [6]:
# Preview the first five rows of the loaded DataFrame.
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

# Data Preprocessing

In [7]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

# Columns fed to the model, in the order they appear in the feature vector.
feature_columns = [
    "step",
    "typeIndex",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
]

# Stage definitions: index the categorical 'type' column, then pack the
# selected numeric columns into a single "features" vector column.
indexer = StringIndexer(inputCol="type", outputCol="typeIndex")
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Drop the account-name columns, which this analysis treats as irrelevant.
without_names = df.drop("nameOrig", "nameDest")

# Fit the indexer on the data and add the numeric typeIndex column.
with_type_index = indexer.fit(without_names).transform(without_names)

# Append the assembled feature vector.
df_preprocessed = assembler.transform(with_type_index)

# Display the model inputs next to the label (default: first 20 rows).
df_preprocessed.select("features", "isFraud").show()

                                                                                

+--------------------+-------+
|            features|isFraud|
+--------------------+-------+
|[1.0,1.0,9839.64,...|      0|
|[1.0,1.0,1864.28,...|      0|
|[1.0,3.0,181.0,18...|      1|
|[1.0,0.0,181.0,18...|      1|
|[1.0,1.0,11668.14...|      0|
|[1.0,1.0,7817.71,...|      0|
|[1.0,1.0,7107.77,...|      0|
|[1.0,1.0,7861.64,...|      0|
|[1.0,1.0,4024.36,...|      0|
|[1.0,4.0,5337.77,...|      0|
|[1.0,4.0,9644.94,...|      0|
|[1.0,1.0,3099.97,...|      0|
|[1.0,1.0,2560.74,...|      0|
|[1.0,1.0,11633.76...|      0|
|[1.0,1.0,4098.78,...|      0|
|[1.0,0.0,229133.9...|      0|
|[1.0,1.0,1563.82,...|      0|
|[1.0,1.0,1157.86,...|      0|
|[1.0,1.0,671.64,1...|      0|
|[1.0,3.0,215310.3...|      0|
+--------------------+-------+
only showing top 20 rows



In [8]:
# Randomly partition the preprocessed rows 80% / 20% into train and test,
# passing a fixed seed to randomSplit.
split_weights = [0.8, 0.2]
train_df, test_df = df_preprocessed.randomSplit(split_weights, seed=42)

# Report how many rows ended up in each partition.
train_rows = train_df.count()
print(f"Training Set Count: {train_rows}")
test_rows = test_df.count()
print(f"Test Set Count: {test_rows}")

                                                                                

Training Set Count: 5089858




Test Set Count: 1272762


                                                                                

In [9]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Gradient-boosted tree classifier that reads the assembled "features" vector
# and learns the "isFraud" label; no other constructor arguments are set.
gbt = GBTClassifier(labelCol="isFraud", featuresCol="features")

# Fit on the training split, then score the held-out test split.
gbt_model = gbt.fit(train_df)
predictions = gbt_model.transform(test_df)

# Peek at a handful of scored rows: inputs, true label, predicted label.
preview_columns = ["features", "isFraud", "prediction"]
predictions.select(*preview_columns).show(5)

[Stage 217:>                                                        (0 + 1) / 1]

+--------------------+-------+----------+
|            features|isFraud|prediction|
+--------------------+-------+----------+
|[1.0,2.0,783.31,8...|      0|       0.0|
|[1.0,2.0,1271.77,...|      0|       0.0|
|[1.0,2.0,2643.45,...|      0|       0.0|
|[1.0,2.0,6284.18,...|      0|       0.0|
|[1.0,2.0,8679.13,...|      0|       0.0|
+--------------------+-------+----------+
only showing top 5 rows



                                                                                

In [10]:
# Binary-classification evaluator scoring `predictions` against the isFraud
# label; its primary metric is the area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol="isFraud", metricName="areaUnderROC")

# Area under the ROC curve on the test split. This is a ranking metric, not
# classification accuracy, so it is stored under a name that says so.
auc_roc = evaluator.evaluate(predictions)
print(f"Gradient Boosted Test AUC: {auc_roc}")

# Also report area under the precision-recall curve by overriding metricName
# for this single call. NOTE(review): fraud labels are presumably heavily
# imbalanced here, in which case PR AUC is the more telling number - confirm
# against the label distribution.
auc_pr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print(f"Gradient Boosted Test PR AUC: {auc_pr}")

# The value was originally stored as `accuracy`; the alias is kept only so any
# later cell that still reads that name keeps working.
accuracy = auc_roc

                                                                                

Gradient Boosted Test AUC: 0.9953657075236532
