In [None]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.1'
spark_version = 'spark-3.5.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connected to r2u.stat.illinois                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
                                                                                                    Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
                                                                                                    Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
                                                                                                    Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [2 InRelease 47.5 kB/128 kB 37%] [3 InRelease 14.2 kB/129 kB 11%] [Waiting for hea

In [None]:
# Google Drive is mounted so the fraud CSV files under /content/drive can be read.
from google.colab import drive
# PySpark pieces used by the cleaning steps: column functions (aliased `f`)
# and the categorical encoders.
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import functions as f

drive.mount('/content/drive')

In [None]:
# Spark session for the notebook; getOrCreate() reuses an existing one if present.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("GoogleDriveCSV")
    .getOrCreate()
)

# fraudTest.csv: first input file, read with its header row and inferred column types.
file_path = '/content/drive/My Drive/Fraud_Detection/fraudTest.csv'
df = spark.read.options(header=True, inferSchema=True).csv(file_path)
df.show()

In [None]:
# Row count of the fraudTest.csv frame; count() is a Spark action, so this runs a job.
total_rows = df.count()
print(f"{total_rows}")

In [None]:
# fraudTrain.csv: second input file, read the same way so the two frames can be combined.
file_path2 = '/content/drive/My Drive/Fraud_Detection/fraudTrain.csv'
train_df = spark.read.options(header=True, inferSchema=True).csv(file_path2)
train_df.show()

In [None]:
# Row count of the fraudTrain.csv frame (another Spark action / job).
total_rows_2 = train_df.count()
print(f"{total_rows_2}")

In [None]:
# Stack the test and train files into one frame and report its row count.
# unionByName matches columns by name instead of by position, so a difference in
# column order between the two CSVs cannot silently shift values into wrong columns
# (it raises instead if the column names differ).
# NOTE(review): `_c0` looks like a per-file row index, so it is presumably not
# unique in the combined frame — verify before using it as an id.
union_df = df.unionByName(train_df)
print(union_df.count())

In [None]:
# Columns judged not important for the model; everything else is kept.
dropped_columns = [
    "trans_date_trans_time", "first", "last", "cc_num", "street",
    "city", "state", "zip", "job", "dob", "trans_num",
]
clean_df = union_df.drop(*dropped_columns)
clean_df

In [None]:
# Encode gender as an integer flag: "M" -> 1, any other value (including null) -> 0.
gender_df = clean_df.withColumn(
    "gender",
    f.when(f.col("gender") == "M", f.lit(1)).otherwise(f.lit(0)),
)
gender_df.show()

In [None]:
# Merchant name -> numeric index (StringIndexer) -> one-hot vector (OneHotEncoder).
indexer = StringIndexer(inputCol="merchant", outputCol="merchant_index")
merchant_indexer_model = indexer.fit(gender_df)
df_indexed = merchant_indexer_model.transform(gender_df)

encoder = OneHotEncoder(inputCols=["merchant_index"], outputCols=["merchant_OHE"])
merchant_encoder_model = encoder.fit(df_indexed)
df_encoded = merchant_encoder_model.transform(df_indexed)

# Showing the results
df_encoded.show(truncate=False)

In [None]:
# Same index -> one-hot pipeline as for merchant, applied to the category column.
indexer_2 = StringIndexer(inputCol="category", outputCol="category_index")
category_indexer_model = indexer_2.fit(df_encoded)
df_indexed_2 = category_indexer_model.transform(df_encoded)

encoder_2 = OneHotEncoder(inputCols=["category_index"], outputCols=["category_OHE"])
category_encoder_model = encoder_2.fit(df_indexed_2)
df_encoded_2 = category_encoder_model.transform(df_indexed_2)

# Showing the results
df_encoded_2.show(truncate=False)

In [None]:
# Final cleaning for the model: drop the raw string columns and the intermediate
# indices now that the one-hot vectors exist.
helper_columns = ["merchant", "category", "merchant_index", "category_index"]
cleaned_df = df_encoded_2.drop(*helper_columns)
cleaned_df.show()

In [None]:
# Importing the correct packages for the learning process
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# **Model 1**


In [None]:
# Assemble the numeric columns and the two one-hot vectors into one `features` vector.
# NOTE(review): `gender` was encoded as 0/1 earlier but is not an input here — confirm intended.
assembler = VectorAssembler(inputCols=["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long", "merchant_OHE", "category_OHE"], outputCol="features")
df_assembled = assembler.transform(cleaned_df)

# Split the data into 80% training and 20% testing.
# Named train_split/test_split so this cell does not overwrite `train_df`, the raw
# fraudTrain.csv frame that the union cell reads (re-running that cell would break).
train_split, test_split = df_assembled.randomSplit([0.8, 0.2], seed=23)

# Random forest classifying the real fraud label.
rf = RandomForestClassifier(featuresCol="features", labelCol="is_fraud", numTrees=10)
rf_model = rf.fit(train_split)
predictions = rf_model.transform(test_split)

# Accuracy on the held-out rows.
evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")

# is_fraud is heavily imbalanced, so accuracy mostly reflects the non-fraud majority.
# Area under the precision-recall curve (from the forest's raw scores) shows how
# well fraud cases are actually ranked.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
pr_evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
area_under_pr = pr_evaluator.evaluate(predictions)
print(f"Model Area Under PR: {area_under_pr}")

In [None]:
# Row id, assembled features, true label and model-1 prediction for the test rows.
displayed_columns = ["_c0", "features", "is_fraud", "prediction"]
predictions.select(*displayed_columns).show(truncate=False)

In [None]:
# Confusion counts for model 1: one row per (prediction, is_fraud) combination.
valuecounts_data = predictions.groupBy(["prediction", "is_fraud"]).count()
valuecounts_data.show()


# **Model 2**

In [None]:
# Start the random-label experiments from a frame without the real is_fraud label
# (and without the raw/intermediate categorical columns).
columns_without_label = ["is_fraud", "merchant", "category", "merchant_index", "category_index"]
cleaned_df_test = df_encoded_2.drop(*columns_without_label)
cleaned_df_test.show()

In [None]:
# Model 2 label: a synthetic `fraud_test` column that is 1 for roughly 20% of rows
# and 0 for the other ~80% (rand() < 0.2 -> 1). It depends only on rand(), not on
# any feature, so this acts as a random-label baseline.
# The fixed seed keeps the labels the same across notebook runs, provided the
# input partitioning does not change.
from pyspark.sql.functions import rand, when
df_with_fraud_test = cleaned_df_test.withColumn(
    "fraud_test",
    when(rand(seed=2) < 0.2, 1).otherwise(0)
)

# Showing the results
df_with_fraud_test.show()

In [None]:
# Model 2: same feature set and forest settings as model 1, trained on the random
# `fraud_test` label instead of is_fraud.
assembler_2 = VectorAssembler(
    inputCols=["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long", "merchant_OHE", "category_OHE"],
    outputCol="features",
)
df_assembled_2 = assembler_2.transform(df_with_fraud_test)

# 80/20 train/test split, same seed as model 1.
train_df_2, test_df_2 = df_assembled_2.randomSplit([0.8, 0.2], seed=23)

rf_2 = RandomForestClassifier(
    featuresCol="features", labelCol="fraud_test", numTrees=10
)
rf_model_2 = rf_2.fit(train_df_2)
predictions_2 = rf_model_2.transform(test_df_2)

# Held-out accuracy against the random label.
evaluator_2 = MulticlassClassificationEvaluator(
    labelCol="fraud_test", predictionCol="prediction", metricName="accuracy"
)
accuracy_2 = evaluator_2.evaluate(predictions_2)
print(f"Model Accuracy: {accuracy_2}")

In [None]:
# Row id, features, random label and model-2 prediction for the test rows.
displayed_columns_2 = ["_c0", "features", "fraud_test", "prediction"]
predictions_2.select(*displayed_columns_2).show(truncate=False)

In [None]:
# Confusion counts for model 2: one row per (prediction, fraud_test) combination.
valuecounts_data_2 = predictions_2.groupBy(["prediction", "fraud_test"]).count()
valuecounts_data_2.show()

# **Model 3**

In [None]:
# Model 3 label: synthetic `fraud_test` that is 1 for roughly 50% of rows and 0 for
# the rest (rand() < 0.5 -> 1), independent of the features.
# Seeded so the labels are reproducible across notebook runs (for unchanged partitioning).
from pyspark.sql.functions import rand, when
df_with_fraud_test_3 = cleaned_df_test.withColumn(
    "fraud_test",
    when(rand(seed=3) < 0.5, 1).otherwise(0)
)

# Showing the results
df_with_fraud_test_3.show()

In [None]:
# Model 3: same features and forest settings, trained on the 50/50 random label.
assembler_3 = VectorAssembler(
    inputCols=["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long", "merchant_OHE", "category_OHE"],
    outputCol="features",
)
df_assembled_3 = assembler_3.transform(df_with_fraud_test_3)

# 80/20 train/test split, same seed as models 1 and 2.
train_df_3, test_df_3 = df_assembled_3.randomSplit([0.8, 0.2], seed=23)

rf_3 = RandomForestClassifier(
    featuresCol="features", labelCol="fraud_test", numTrees=10
)
rf_model_3 = rf_3.fit(train_df_3)
predictions_3 = rf_model_3.transform(test_df_3)

# Held-out accuracy against the random label.
evaluator_3 = MulticlassClassificationEvaluator(
    labelCol="fraud_test", predictionCol="prediction", metricName="accuracy"
)
accuracy_3 = evaluator_3.evaluate(predictions_3)
print(f"Model Accuracy: {accuracy_3}")

In [None]:
# Row id, features, random label and model-3 prediction for the test rows.
displayed_columns_3 = ["_c0", "features", "fraud_test", "prediction"]
predictions_3.select(*displayed_columns_3).show(truncate=False)

In [None]:
# Confusion counts for model 3: one row per (prediction, fraud_test) combination.
valuecounts_data_3 = predictions_3.groupBy(["prediction", "fraud_test"]).count()
valuecounts_data_3.show()

# **Model 4**

In [None]:
# Model 4 label: a fresh synthetic 50/50 `fraud_test` column (rand() < 0.5 -> 1),
# drawn independently of model 3's labels via a different seed.
# Seeded so the labels are reproducible across notebook runs (for unchanged partitioning).
from pyspark.sql.functions import rand, when  # both helpers are used below
df_with_fraud_test_4 = cleaned_df_test.withColumn(
    "fraud_test",
    when(rand(seed=4) < 0.5, 1).otherwise(0)
)

# Showing the results
df_with_fraud_test_4.show()

In [None]:
# Model 4: same features and forest settings as model 3, but a different split seed.
assembler_4 = VectorAssembler(
    inputCols=["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long", "merchant_OHE", "category_OHE"],
    outputCol="features",
)
df_assembled_4 = assembler_4.transform(df_with_fraud_test_4)

# 80/20 train/test split with seed 32 instead of 23.
train_df_4, test_df_4 = df_assembled_4.randomSplit([0.8, 0.2], seed=32)

rf_4 = RandomForestClassifier(
    featuresCol="features", labelCol="fraud_test", numTrees=10
)
rf_model_4 = rf_4.fit(train_df_4)
predictions_4 = rf_model_4.transform(test_df_4)

# Held-out accuracy against the random label.
evaluator_4 = MulticlassClassificationEvaluator(
    labelCol="fraud_test", predictionCol="prediction", metricName="accuracy"
)
accuracy_4 = evaluator_4.evaluate(predictions_4)
print(f"Model Accuracy: {accuracy_4}")

In [None]:
# Row id, features, random label and model-4 prediction for the test rows.
displayed_columns_4 = ["_c0", "features", "fraud_test", "prediction"]
predictions_4.select(*displayed_columns_4).show(truncate=False)

In [None]:
# Confusion counts for model 4: one row per (prediction, fraud_test) combination.
valuecounts_data_4 = predictions_4.groupBy(["prediction", "fraud_test"]).count()
valuecounts_data_4.show()

In [None]:
# importing more pyspark packages
from pyspark.ml.linalg import Vector
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Input columns of the model-4 VectorAssembler, in assembly order.
feature_names = ["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long", "merchant_OHE", "category_OHE"]
feature_importances = rf_model_4.featureImportances
importances = feature_importances.toArray()

# featureImportances has one entry per slot of the assembled `features` vector, not
# one per input column: merchant_OHE and category_OHE each expand into many slots.
# Zipping the nine names with the first nine entries mislabels slots 7 and 8 (the
# first two merchant one-hot slots) and ignores every later slot, which is why those
# nine values summed to well under 1. Instead, measure each input's width from one
# assembled row and sum the importances over that input's slots.
sample_row = df_assembled_4.select(*feature_names).first()
if sample_row is None:
    raise ValueError("df_assembled_4 is empty; cannot determine feature widths.")
slot_widths = [len(value) if isinstance(value, Vector) else 1 for value in sample_row]
if sum(slot_widths) != len(importances):
    raise ValueError(
        f"Feature slots ({sum(slot_widths)}) do not match the importance vector size ({len(importances)})."
    )

data = []
slot_start = 0
for feature, width in zip(feature_names, slot_widths):
    data.append((feature, float(importances[slot_start:slot_start + width].sum())))
    slot_start += width

# converting into the correct language for pyspark to understand
schema = StructType([
    StructField("feature", StringType(), True),
    StructField("importance", DoubleType(), True)
])
importances_df = spark.createDataFrame(data, schema)

# Showing the results
importances_df.show()

In [None]:
# Collect the per-feature importances from `importances_df` (built in the previous
# cell) rather than pasting numbers from an earlier run's output, which goes stale
# as soon as the model is retrained.
importance_by_feature = {row["feature"]: row["importance"] for row in importances_df.collect()}
importance_values = [importance_by_feature[name] for name in feature_names]

# Totaling the sum of importances
total_importance = sum(importance_values)
if total_importance == 0:
    raise ValueError("All feature importances are zero; cannot normalize them.")

# Normalizing the importance values (divide each by the total)
normalized_importances = [importance / total_importance for importance in importance_values]

# Show normalized importances
print(f"Normalized Importances: {normalized_importances}")


In [None]:
# Sanity check: the normalized importances should add up to (approximately) 1.
sum(value for value in normalized_importances)


In [None]:
# Build a Spark table of the normalized feature importances.
# The existing `spark` session is reused. Calling SparkSession.builder...getOrCreate()
# again would only return that same session, and master/appName are not applied to
# an already-running session, so re-initializing it here had no effect.

# Original feature names (same order as the VectorAssembler inputCols)
feature_names = ["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long", "merchant_OHE", "category_OHE"]

# Use the normalized values computed in the normalization cell instead of
# re-typing rounded copies of them by hand.
if len(normalized_importances) != len(feature_names):
    raise ValueError("Expected exactly one normalized importance per feature.")
normalized_data = [(feature, float(importance)) for feature, importance in zip(feature_names, normalized_importances)]

# Defining the schema for the DataFrame
schema = StructType([
    StructField("feature", StringType(), True),
    StructField("normalized_importance", DoubleType(), True)
])

# Creating the PySpark DataFrame from the combined data
normalized_df = spark.createDataFrame(normalized_data, schema)

# Show the resulting DataFrame
normalized_df.show(truncate=False)

In [None]:
# importing for visuals
import pandas as pd
import matplotlib.pyplot as plt

# One row per feature, so collecting the table into pandas for plotting is cheap.
pandas_df = normalized_df.toPandas()

# Bar chart of normalized importance per input feature.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(pandas_df['feature'], pandas_df['normalized_importance'], color='skyblue')
ax.set_xlabel('Feature')
ax.set_ylabel('Normalized Importance')
ax.set_title('Normalized Feature Importance')
plt.setp(ax.get_xticklabels(), rotation=45, ha='right')  # keep long names readable
fig.tight_layout()  # avoid clipping the rotated labels
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Model 1 (label `is_fraud`): how many test rows received prediction 0.0, split by
# their actual label.
fraud_test_values = [0, 1]  # actual is_fraud labels (0 = not fraud, 1 = fraud)
prediction_value = 0.0       # only looking at rows predicted as 0.0

# Read the counts from model 1's confusion table instead of hard-coding numbers
# copied from an earlier run. Missing combinations count as 0.
counts_by_label = {
    row["is_fraud"]: row["count"]
    for row in valuecounts_data.filter(f.col("prediction") == prediction_value).collect()
}
counts = np.array([counts_by_label.get(label, 0) for label in fraud_test_values])  # [not fraud, fraud]

fig, ax = plt.subplots(figsize=(8, 6))

# Bar width for each category and bar positions on the x-axis
bar_width = 0.3
index = np.arange(len(fraud_test_values))

# One (non-stacked) bar per actual label
ax.bar(index, counts, bar_width, label=f"Prediction: {prediction_value}", color='skyblue')

# Adding labels and title
ax.set_xlabel('Actual is_fraud', fontsize=12)
ax.set_ylabel('Count', fontsize=12)
ax.set_title(f'Rows with Prediction = {prediction_value} by Actual is_fraud', fontsize=16)

# Set x-axis labels
ax.set_xticks(index)
ax.set_xticklabels(['Not Fraud', 'Fraud'])

# Adding the legend
ax.legend()

# Displaying the plot
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Stacked confusion counts for model 4 (random 50/50 `fraud_test` label).
# NOTE(review): the previously hard-coded counts appear to come from model 4 — confirm.
fraud_test_values = [0, 1]      # actual label: one bar per value on the x-axis
prediction_values = [0.0, 1.0]  # predicted label: one stacked segment per value

# Read the counts from the model-4 confusion table. The old hard-coded matrix had
# comments and legend labels that disagreed on which axis was the prediction.
count_lookup = {
    (row["fraud_test"], row["prediction"]): row["count"]
    for row in valuecounts_data_4.collect()
}
# counts[p][a] = number of rows predicted prediction_values[p] whose actual label is fraud_test_values[a]
counts = np.array([
    [count_lookup.get((actual, predicted), 0) for actual in fraud_test_values]
    for predicted in prediction_values
])

fig, ax = plt.subplots(figsize=(8, 6))

# Bar width for each category and bar positions on the x-axis
bar_width = 0.3
index = np.arange(len(fraud_test_values))

# Bottom segment: predicted not fraud; top segment: predicted fraud
ax.bar(index, counts[0], bar_width, label="Predicted: not fraud (0)", color='lightcoral')
ax.bar(index, counts[1], bar_width, bottom=counts[0], label="Predicted: fraud (1)", color='skyblue')

# Adding labels and title
ax.set_xlabel('Actual Fraud Test', fontsize=12)
ax.set_ylabel('Count', fontsize=12)
ax.set_title('Stacked Bar Plot: Prediction vs Fraud Test', fontsize=16)

# Setting x-axis labels
ax.set_xticks(index)
ax.set_xticklabels(['Not Fraud', 'Fraud'])

# Adding the legend
ax.legend()

# Displaying the plot
plt.show()
