In [2]:
import findspark
findspark.init()
findspark.find()

'C:\\spark'

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

In [4]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("Credit Card Fraud Detection") \
    .getOrCreate()


# Load the CSV file into a DataFrame
df = spark.read.csv("hdfs://localhost:9000/hadoop_input/Fraud.csv", header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df.show()


+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT| 11668.14|C2048537720|      41554.0|      29885.86|M123070170

In [5]:
# Print the schema of the DataFrame
df.printSchema()

# Show summary statistics
df.describe().show()

# Check for null values
# df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# data.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in data.columns)).show()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|   newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+-

In [6]:
from pyspark.sql.functions import col

# Example: Filter for fraudulent transactions
fraud_df = df.filter(col("isFraud") == 1)

# Show the filtered DataFrame
fraud_df.show()


+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|    amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|TRANSFER|     181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|     181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1|TRANSFER|    2806.0|C1420196421|       2806.0|           0.0| C972765878|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    2806.0|C2101527076|       2806.0|           0.0|C1007251739|       26202.0|           0.0|      1|             0|
|   1|TRANSFER|   20128.0| C137533655|      20128.0|           0.0|C1

In [7]:
# Sample 1,000 fraudulent cases from fraud_df
fraud_sample = fraud_df.sample(withReplacement=False, fraction=1000 / fraud_df.count())

# Filter non-fraud cases from df and sample 4,000 cases
non_fraud_df = df.filter(df["isFraud"] == 0)  # Assuming 'isFraud' is the column indicating fraud
non_fraud_sample = non_fraud_df.sample(withReplacement=False, fraction=4000 / non_fraud_df.count())

# Combine both samples
balanced_df = fraud_sample.union(non_fraud_sample)

# View the counts to confirm
print("Fraud cases:", fraud_sample.count())
print("Non-fraud cases:", non_fraud_sample.count())
print("Total cases in balanced dataset:", balanced_df.count())


Fraud cases: 1009
Non-fraud cases: 4001
Total cases in balanced dataset: 5010


In [8]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Prepare the data for ML
assembler = VectorAssembler(inputCols=["amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"], outputCol="features")
data = assembler.transform(balanced_df)

In [9]:
data.select("amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "features").show(truncate=False, n=100)  # Adjust 'n' based on the number of rows


+----------+-------------+--------------+--------------+--------------+---------------------------------------------------+
|amount    |oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|features                                           |
+----------+-------------+--------------+--------------+--------------+---------------------------------------------------+
|1277212.77|1277212.77   |0.0           |0.0           |2444985.19    |[1277212.77,1277212.77,0.0,0.0,2444985.19]         |
|35063.63  |35063.63     |0.0           |0.0           |0.0           |(5,[0,1],[35063.63,35063.63])                      |
|25071.46  |25071.46     |0.0           |0.0           |0.0           |(5,[0,1],[25071.46,25071.46])                      |
|1096187.24|1096187.24   |0.0           |0.0           |1096187.24    |[1096187.24,1096187.24,0.0,0.0,1096187.24]         |
|18627.02  |18627.02     |0.0           |147251.58     |165878.6      |[18627.02,18627.02,0.0,147251.58,165878.6]         |
|10539.3

In [10]:
# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [11]:
train_data.show(n=5)
train_data.printSchema()

+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------------+
|step|    type|    amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|            features|
+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------------+
|   1|CASH_OUT|1277212.77| C467632528|   1277212.77|           0.0| C716083600|           0.0|    2444985.19|      1|             0|[1277212.77,12772...|
|   1|TRANSFER|  25071.46| C669700766|     25071.46|           0.0|C1384210339|           0.0|           0.0|      1|             0|(5,[0,1],[25071.4...|
|   2|CASH_OUT|  18627.02| C175961135|     18627.02|           0.0|C1711105800|     147251.58|      165878.6|      1|             0|[18627.02,18627.0...|
|   2|CASH_OUT|1096187.24|  C77163673|   1096187.24|           0.0| C6443458

In [12]:
test_data.show(n=5)
test_data.printSchema()

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|            features|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------------+
|   1|TRANSFER| 35063.63|C1364127192|     35063.63|           0.0|C1136419747|           0.0|           0.0|      1|             0|(5,[0,1],[35063.6...|
|   4|CASH_OUT|169941.73|C2026325575|    169941.73|           0.0|C1394526584|           0.0|     169941.73|      1|             0|[169941.73,169941...|
|   5|CASH_OUT| 60726.57| C840095827|     60726.57|           0.0| C803116137| 1.301050278E7| 1.307122935E7|      1|             0|[60726.57,60726.5...|
|   9|CASH_OUT|244068.01| C507466068|    244068.01|           0.0| C706331499|    

In [13]:
# Create and train a Logistic Regression model
lr = LogisticRegression(labelCol="isFraud", featuresCol="features")
model = lr.fit(train_data)

In [14]:
predictions = model.transform(test_data)
predictions.select("isFraud", "prediction").show()


+-------+----------+
|isFraud|prediction|
+-------+----------+
|      1|       0.0|
|      1|       1.0|
|      1|       0.0|
|      1|       1.0|
|      1|       0.0|
|      1|       0.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      1|       0.0|
|      1|       1.0|
|      1|       1.0|
|      1|       0.0|
|      1|       1.0|
+-------+----------+
only showing top 20 rows



In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator


In [17]:
# Accuracy
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.4f}")

# Precision
precision_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print(f"Precision: {precision:.4f}")

# Recall
recall_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print(f"Recall: {recall:.4f}")

# F1 Score
f1_evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="f1")
f1_score = f1_evaluator.evaluate(predictions)
print(f"F1 Score: {f1_score:.4f}")


Accuracy: 0.9401
Precision: 0.9402
Recall: 0.9401
F1 Score: 0.9377


In [18]:
# Save the model to HDFS
model_path = "hdfs://localhost:9000/hadoop_output/fraud_detection_model"
model.write().overwrite().save(model_path)


In [19]:
# Save the train_data DataFrame to HDFS in Parquet format (recommended for Spark)
train_data_path = "hdfs://localhost:9000/hadoop_output/train_data"
train_data.write.mode("overwrite").parquet(train_data_path)

# Save the test_data DataFrame to HDFS in Parquet format
test_data_path = "hdfs://localhost:9000/hadoop_output/test_data"
test_data.write.mode("overwrite").parquet(test_data_path)
