In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that backs every cell in this notebook
spark = (
    SparkSession.builder
    .appName("Fraud Detection")
    .getOrCreate()
)

# Read the transactions CSV: the first row holds the column names and
# Spark infers each column's type from the data
df = spark.read.csv(
    path='Synthetic_Financial_datasets_log.csv',
    header=True,
    inferSchema=True,
)

# Print the inferred column names and types
df.printSchema()


root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [2]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Index the categorical column 'type' into a numeric 'type_index'
indexer = StringIndexer(inputCol='type', outputCol='type_index')

# Select features for the model (you can adjust this based on your needs)
feature_cols = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest', 'type_index']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Create the preprocessing pipeline: index 'type', then assemble the feature vector
pipeline = Pipeline(stages=[indexer, assembler])

# Keep the fitted pipeline instead of discarding it: it holds the learned
# 'type' -> index mapping, which must be reused unchanged on any new data.
# Re-fitting on other data would produce a different mapping.
pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)

# Show transformed data
df_transformed.select('features', 'isFraud').show()


+--------------------+-------+
|            features|isFraud|
+--------------------+-------+
|[1.0,9839.64,1701...|      0|
|[1.0,1864.28,2124...|      0|
|[1.0,181.0,181.0,...|      1|
|[1.0,181.0,181.0,...|      1|
|[1.0,11668.14,415...|      0|
|[1.0,7817.71,5386...|      0|
|[1.0,7107.77,1831...|      0|
|[1.0,7861.64,1760...|      0|
|[1.0,4024.36,2671...|      0|
|[1.0,5337.77,4172...|      0|
|[1.0,9644.94,4465...|      0|
|[1.0,3099.97,2077...|      0|
|[1.0,2560.74,5070...|      0|
|[1.0,11633.76,101...|      0|
|[1.0,4098.78,5032...|      0|
|[1.0,229133.94,15...|      0|
|[1.0,1563.82,450....|      0|
|[1.0,1157.86,2115...|      0|
|[1.0,671.64,15123...|      0|
|[1.0,215310.3,705...|      0|
+--------------------+-------+
only showing top 20 rows



In [3]:
# Split the transformed rows into train/test with weights 0.8 / 0.2; the fixed seed keeps the split repeatable
train_data, test_data = df_transformed.randomSplit(weights=[0.8, 0.2], seed=42)


In [4]:
from pyspark.sql import functions as F

# Balance the classes using ONLY the training split. Building the balanced
# set from df_transformed (the full dataset) would put the rows held out in
# test_data into the training set and inflate the evaluation.
fraud_data = train_data.filter(col("isFraud") == 1)
non_fraud_data = train_data.filter(col("isFraud") == 0)

# Class counts within the training split
count_1 = fraud_data.count()
count_0 = non_fraud_data.count()

# Undersample the majority class (isFraud == 0) down to roughly the size of
# the minority class. Sampling without replacement needs a fraction in
# [0, 1], so clamp it and guard against an empty majority class.
sampling_fraction = min(1.0, count_1 / count_0) if count_0 > 0 else 0.0
non_fraud_data_undersampled = non_fraud_data.sample(False, sampling_fraction, seed=42)

# Backward-compatible alias: the original (misleading) name is kept in case
# other cells still refer to it.
non_fraud_data_oversampled = non_fraud_data_undersampled

# Combine the two datasets
balanced_data = fraud_data.union(non_fraud_data_undersampled)


In [5]:
from pyspark.ml.classification import RandomForestClassifier

# Fix the seed so the forest is the same on every run of the notebook
rf = RandomForestClassifier(labelCol='isFraud', featuresCol='features', numTrees=100, seed=42)
model = rf.fit(balanced_data)

# Save the model. overwrite() lets this cell be re-run: a plain save()
# raises if the target path already exists from a previous run.
model.write().overwrite().save("/data/fraud_detection_model_V2")


In [6]:
# Score the held-out rows and preview a few predictions
predictions = model.transform(test_data)
predictions.select('features', 'isFraud', 'prediction').show()

# Evaluate the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='isFraud', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Accuracy alone hides how the fraud class is handled: test_data keeps the
# original class mix, so the score is dominated by the isFraud == 0 rows.
# Show the actual-vs-predicted counts so missed frauds and false alarms are visible.
predictions.groupBy('isFraud', 'prediction').count().orderBy('isFraud', 'prediction').show()

# Area under the precision-recall curve, computed from the model's rawPrediction column
pr_evaluator = BinaryClassificationEvaluator(labelCol='isFraud', rawPredictionCol='rawPrediction', metricName='areaUnderPR')
area_under_pr = pr_evaluator.evaluate(predictions)
print(f"Area under PR curve: {area_under_pr}")


+--------------------+-------+----------+
|            features|isFraud|prediction|
+--------------------+-------+----------+
|[1.0,783.31,81503...|      0|       0.0|
|[1.0,1271.77,6973...|      0|       0.0|
|[1.0,2643.45,6434...|      0|       0.0|
|[1.0,6284.18,7858...|      0|       0.0|
|[1.0,8679.13,7087...|      0|       0.0|
|[1.0,9577.45,5198...|      0|       0.0|
|[1.0,12336.48,731...|      0|       0.0|
|[1.0,16236.25,826...|      0|       0.0|
|[1.0,19872.97,356...|      0|       0.0|
|[1.0,20490.81,596...|      0|       0.0|
|[1.0,21255.8,3113...|      0|       0.0|
|[1.0,21898.97,722...|      0|       0.0|
|[1.0,22765.47,969...|      0|       0.0|
|[1.0,24936.34,482...|      0|       0.0|
|[1.0,27948.65,118...|      0|       0.0|
|[1.0,31646.31,584...|      0|       0.0|
|[1.0,34918.59,433...|      0|       0.0|
|[1.0,44655.46,445...|      0|       0.0|
|[1.0,49031.62,454...|      0|       0.0|
|[1.0,51284.87,413...|      0|       0.0|
+--------------------+-------+----

In [7]:
def predict_fraud(input_data):
    """Predict the fraud label for a single transaction.

    Relies on the notebook globals ``spark``, ``pipeline``, ``df`` and ``model``.

    Args:
        input_data: Sequence of seven values in the order
            (step, type, amount, oldbalanceOrg, newbalanceOrig,
            oldbalanceDest, newbalanceDest).

    Returns:
        The value of the model's ``prediction`` column for the input row.

    Raises:
        ValueError: If the ``type`` value does not match (case-insensitively)
            any transaction type seen when the pipeline was fitted on ``df``.
    """
    columns = ['step', 'type', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest']

    # Fit the preprocessing pipeline on the full dataset once and cache it on
    # the function. Re-fitting on the single input row (as before) made the
    # StringIndexer learn a one-label mapping, so every 'type' was encoded as
    # index 0 no matter how it was encoded during training.
    fitted_pipeline = getattr(predict_fraud, "_fitted_pipeline", None)
    if fitted_pipeline is None:
        fitted_pipeline = pipeline.fit(df)
        predict_fraud._fitted_pipeline = fitted_pipeline

    # Resolve the input 'type' to the exact label learned by the indexer
    # (first pipeline stage), ignoring case, and fail clearly if it is unknown.
    known_labels = fitted_pipeline.stages[0].labels
    labels_by_lower = {label.lower(): label for label in known_labels}
    row = list(input_data)
    type_key = str(row[1]).lower()
    if type_key not in labels_by_lower:
        raise ValueError(
            f"Unknown transaction type {row[1]!r}; expected one of {sorted(known_labels)}"
        )
    row[1] = labels_by_lower[type_key]

    # Create a one-row DataFrame from the input
    input_df = spark.createDataFrame([row], schema=columns)

    # Encode the row with the same mapping used for the training data
    input_transformed = fitted_pipeline.transform(input_df)

    # Make predictions
    prediction = model.transform(input_transformed)

    return prediction.select('prediction').collect()[0][0]

# Example usage: one hand-written transaction, values in the order predict_fraud expects
user_input = [1, 'Debit', 5000, 9000, 4000, 0, 5000]  # Example input
result = predict_fraud(user_input)

# Turn the numeric prediction into a readable label
if result == 1:
    verdict = "Fraud"
else:
    verdict = "Not Fraud"
print("Prediction:", verdict)


Prediction: Not Fraud
