<a href="https://colab.research.google.com/github/Ramme121/project/blob/main/FraudAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType


In [4]:
spark = SparkSession.builder.appName("FraudDetection").getOrCreate()


In [10]:
import pandas as pd




In [11]:
# Download the Excel data from GitHub URL
excel_file_url = "https://github.com/Ramme121/project/raw/main/FraudData.xlsx"
df_pandas = pd.read_excel(excel_file_url)

# Convert the Pandas DataFrame to a PySpark DataFrame
df = spark.createDataFrame(df_pandas)


In [13]:
# Data Preprocessing
# For demonstration purposes, let's fill missing values in "TotalCredit" with the mean value,
# and convert the "AnnualIncome" column to IntegerType.

# Filling missing values in the "TotalCredit" column with the mean value
mean_total_credit = df.select("TotalCredit").agg({"TotalCredit": "mean"}).collect()[0][0]
df = df.na.fill(mean_total_credit, subset=["TotalCredit"])

# Converting the "AnnualIncome" column to IntegerType
df = df.withColumn("AnnualIncome", df["AnnualIncome"].cast(IntegerType()))

# Feature Selection (Assuming you want to use only selected columns for the model)
selected_columns = ["TotalCredit", "MonthlyPayments", "CreditScore", "CustomerAge", "Gender", "OwnsCar", "OwnsProperty", "NumberOfChildren", "Fraud"]
df = df.select(*selected_columns)

# Check DataFrame Schema
df.printSchema()



root
 |-- TotalCredit: double (nullable = false)
 |-- MonthlyPayments: double (nullable = true)
 |-- CreditScore: long (nullable = true)
 |-- CustomerAge: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- OwnsCar: long (nullable = true)
 |-- OwnsProperty: long (nullable = true)
 |-- NumberOfChildren: long (nullable = true)
 |-- Fraud: long (nullable = true)



In [14]:
# you already have the DataFrame 'df' loaded and preprocessed
from pyspark.sql.types import IntegerType

# Converting 'CreditScore' and 'CustomerAge' columns to IntegerType
df = df.withColumn("CreditScore", df["CreditScore"].cast(IntegerType()))
df = df.withColumn("CustomerAge", df["CustomerAge"].cast(IntegerType()))

# Verify the updated DataFrame Schema
df.printSchema()


root
 |-- TotalCredit: double (nullable = false)
 |-- MonthlyPayments: double (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- CustomerAge: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- OwnsCar: long (nullable = true)
 |-- OwnsProperty: long (nullable = true)
 |-- NumberOfChildren: long (nullable = true)
 |-- Fraud: long (nullable = true)



In [15]:
#you already have the DataFrame 'df' loaded and preprocessed
from pyspark.sql.types import IntegerType

# Converting 'OwnsCar', 'OwnsProperty', and 'NumberOfChildren' columns to IntegerType
df = df.withColumn("OwnsCar", df["OwnsCar"].cast(IntegerType()))
df = df.withColumn("OwnsProperty", df["OwnsProperty"].cast(IntegerType()))
df = df.withColumn("NumberOfChildren", df["NumberOfChildren"].cast(IntegerType()))

# Verify the updated DataFrame Schema
df.printSchema()


root
 |-- TotalCredit: double (nullable = false)
 |-- MonthlyPayments: double (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- CustomerAge: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- OwnsCar: integer (nullable = true)
 |-- OwnsProperty: integer (nullable = true)
 |-- NumberOfChildren: integer (nullable = true)
 |-- Fraud: long (nullable = true)



In [16]:
# Feature Selection (Assuming you want to use only selected columns for the model)
selected_columns = ["TotalCredit", "MonthlyPayments", "CreditScore", "CustomerAge", "Gender", "OwnsCar", "OwnsProperty", "NumberOfChildren", "Fraud"]
df = df.select(*selected_columns)

# Check DataFrame Schema
df.printSchema()


root
 |-- TotalCredit: double (nullable = false)
 |-- MonthlyPayments: double (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- CustomerAge: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- OwnsCar: integer (nullable = true)
 |-- OwnsProperty: integer (nullable = true)
 |-- NumberOfChildren: integer (nullable = true)
 |-- Fraud: long (nullable = true)



In [17]:
!pip install pyspark
!pip install findspark


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [18]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("FraudDetection").getOrCreate()


In [26]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
import pandas as pd

# Create a SparkSession
spark = SparkSession.builder.appName("FraudDetection").getOrCreate()

# Replace "GIT_REPOSITORY_URL" with the actual URL of your Git repository
excel_file_url = "https://github.com/Ramme121/project/raw/main/FraudData.xlsx"

# Load the Excel file into a pandas DataFrame
data_pd = pd.read_excel(excel_file_url)

# Convert the pandas DataFrame to a Spark DataFrame
data = spark.createDataFrame(data_pd)

# Select relevant columns for the model
selected_cols = ["TotalCredit", "MonthlyPayments", "CreditScore", "CustomerAge", "Gender", "OwnsCar", "OwnsProperty", "NumberOfChildren", "Fraud"]
data = data.select(selected_cols)

# Drop rows with any missing values
data = data.na.drop()

# VectorAssembler to combine features into a single vector column
assembler = VectorAssembler(inputCols=["TotalCredit", "MonthlyPayments", "CreditScore", "CustomerAge", "OwnsCar", "OwnsProperty", "NumberOfChildren"], outputCol="features")

# Logistic Regression model
lr = LogisticRegression(labelCol="Fraud", featuresCol="features")

# Pipeline for assembling features and training the model
pipeline = Pipeline(stages=[assembler, lr])

# Split data into training and test sets
training_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Train the model using the training data
model = pipeline.fit(training_data)

# Make predictions on the test set
predictions = model.transform(test_data)

# Display the predictions
predictions.select("Fraud", "prediction").show()


+-----+----------+
|Fraud|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [31]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer to convert the "Gender" column into numerical indices
indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")

# Create a OneHotEncoder to convert the "GenderIndex" column into a binary vector representation
encoder = OneHotEncoder(inputCols=["GenderIndex"], outputCols=["GenderVec"])

# Update the VectorAssembler to include the "GenderVec" column in the inputCols
vector_assembler = VectorAssembler(inputCols=["TotalCredit", "MonthlyPayments", "CreditScore", "CustomerAge", "GenderVec", "OwnsCar", "OwnsProperty", "NumberOfChildren"], outputCol="features")

# Create a pipeline with the updated stages and Random Forest classifier
pipeline_rf = Pipeline(stages=[indexer, encoder, vector_assembler, rf])

# Fit the pipeline on the training data
model_rf = pipeline_rf.fit(training_data)

# Make predictions on the test set
predictions_rf = model_rf.transform(test_data)

# Show the predicted and actual fraud labels
predictions_rf.select("Fraud", "prediction").show()


+-----+----------+
|Fraud|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create a BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="Fraud")

# Calculate the AUC-ROC
auc_roc = evaluator.evaluate(predictions_rf, {evaluator.metricName: "areaUnderROC"})
print(f"AUC-ROC: {auc_roc}")

# Calculate the AUC-PR
auc_pr = evaluator.evaluate(predictions_rf, {evaluator.metricName: "areaUnderPR"})
print(f"AUC-PR: {auc_pr}")


AUC-ROC: 0.9209713295914127
AUC-PR: 0.45256803562764064


In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a multiclass classification evaluator
evaluator_multiclass = MulticlassClassificationEvaluator(labelCol="Fraud", predictionCol="prediction", metricName="accuracy")

# Calculate accuracy
accuracy = evaluator_multiclass.evaluate(predictions)

print(f"Accuracy: {accuracy}")


Accuracy: 0.9154747487971722


In [1]:
# Install required libraries
!pip install pyspark
!pip install findspark

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, sum, avg, max, min
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pandas as pd

# Initialize SparkSession
spark = SparkSession.builder.appName("FraudAnalysis").getOrCreate()


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=48c188ba33e26268516f444d80da408f80aaa17d2cf76ee074299fa806383ad3
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Download the Excel data from GitHub URL
excel_file_url = "https://github.com/Ramme121/project/raw/main/FraudData.xlsx"
df_pandas = pd.read_excel(excel_file_url)

# Convert the Pandas DataFrame to a PySpark DataFrame
df = spark.createDataFrame(df_pandas)

# Data Preprocessing
# Assuming you have already done the data preprocessing, let's proceed with the analysis


In [3]:
# Calculate Average LatePaymentAmount For Fraud and Non-Frauds Using SQL
df.createOrReplaceTempView("FraudData")
late_payment_fraud_analysis = spark.sql(
    "SELECT Fraud, AVG(LatePaymentAmount) AS AverageLatePaymentAmount FROM FraudData GROUP BY Fraud"
)

# Show the results for Late Payment Fraud Analysis
late_payment_fraud_analysis.show()


+-----+------------------------+
|Fraud|AverageLatePaymentAmount|
+-----+------------------------+
|    0|       40983.30796360626|
|    1|       59970.60809667674|
+-----+------------------------+



In [4]:
# Prepare the data for classification
assembler = VectorAssembler(inputCols=["CustomerAge", "LatePaymentAmount"], outputCol="features")

# Create the Logistic Regression classifier
lr = LogisticRegression(labelCol="Fraud", featuresCol="features", maxIter=10)

# Create a Pipeline to chain the VectorAssembler and LogisticRegression
pipeline = Pipeline(stages=[assembler, lr])

# Split the data into training and testing sets (80% for training, 20% for testing)
(training_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# Train the model using the Pipeline
model = pipeline.fit(training_data)

# Make predictions on the test set
predictions = model.transform(test_data)

# Evaluate the model using BinaryClassificationEvaluator for both AUC and accuracy
evaluator = BinaryClassificationEvaluator(labelCol="Fraud", rawPredictionCol="rawPrediction")
au_roc = evaluator.setMetricName("areaUnderROC").evaluate(predictions)
au_pr = evaluator.setMetricName("areaUnderPR").evaluate(predictions)

# Create a DataFrame to store the evaluation results
evaluation_result = spark.createDataFrame([(au_roc, "Area Under ROC"), (au_pr, "Area Under PR")], ["Value", "Metric"])

# Show the evaluation results
evaluation_result.show()


+-------------------+--------------+
|              Value|        Metric|
+-------------------+--------------+
|  0.846113825639713|Area Under ROC|
|0.28770872586642376| Area Under PR|
+-------------------+--------------+



In [5]:
risk_level_analysis = df.withColumn("RiskLevel", when(col("RiskLevel").isNull(), "No Risk").otherwise(col("RiskLevel"))) \
    .groupBy("RiskLevel") \
    .agg(
        count("LoanID").alias("TotalLoans"),
        sum("Fraud").alias("TotalFrauds"),
        avg("AnnualIncome").alias("AvgIncome"),
        avg("CreditScore").alias("AvgCreditScore"),
        max("CustomerAge").alias("MaxCustomerAge"),
        min("CustomerAge").alias("MinCustomerAge")
    )

# Show the results for Risk Level Analysis
risk_level_analysis.show()


+---------+----------+-----------+------------------+------------------+--------------+--------------+
|RiskLevel|TotalLoans|TotalFrauds|         AvgIncome|    AvgCreditScore|MaxCustomerAge|MinCustomerAge|
+---------+----------+-----------+------------------+------------------+--------------+--------------+
|     None|    105786|          0| 169088.9987063978|399.64179570075436|            75|            30|
|  LowRisk|     78841|       8242|169608.79358316105|372.61752134041933|            75|            25|
| HighRisk|     43601|       8301| 167419.7960801358| 352.2618517923901|            75|            25|
|     Risk|     79283|       8282|168361.07009068778| 373.3037725615832|            75|            25|
+---------+----------+-----------+------------------+------------------+--------------+--------------+



In [6]:
# Stop the SparkSession
spark.stop()
