# Install Spark
Installing the PySpark library, the Python API for Apache Spark, which enables big data processing.

In [None]:
!pip install pyspark



Importing the SparkSession class from the pyspark.sql module to create a SparkSession, which is the entry point to programming Spark with the Dataset and DataFrame API.

In [None]:
from pyspark.sql import SparkSession

Creating a Spark session
Initializes a SparkSession with configurations such as the app name and the master URL.
Here, the app is named "ML Fraud Pred" and runs locally using all available cores (`local[*]`). The underlying SparkContext is also kept in `sc`.

In [None]:
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("ML Fraud Pred")\
    .getOrCreate()
sc = spark.sparkContext


# Data Reading
Loads a CSV file into a Spark DataFrame, treating the first row as column headers and automatically inferring column data types.
df.printSchema() prints the schema of the DataFrame, showing the column names and data types.

In [None]:
df = spark.read.csv('/content/Fraud_Data1.csv', header = True, inferSchema = True)
df.printSchema()


root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



This command shows the first 5 rows of the DataFrame to get a quick overview of the data.

In [None]:
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

# Filtering the Data and Performing Undersampling
Creates two separate DataFrames by filtering the original DataFrame on the value of the 'isFraud' column.

Then counts the number of rows in each DataFrame, representing fraud and non-fraud instances.

Calculates the ratio of fraud to non-fraud instances, which serves as the sampling fraction for the majority (non-fraud) class.

Creates a new DataFrame from the non-fraud DataFrame by sampling a fraction of its instances without replacement, where the fraction is the previously calculated ratio. The resulting size is approximate, because each row is kept independently with that probability.

Combines the fraud DataFrame with the undersampled non-fraud DataFrame to create a more balanced dataset.

In [None]:
# Separate the two classes
fraud_df = df.filter(df.isFraud == 1)
non_fraud_df = df.filter(df.isFraud == 0)

# Count the instances
fraud_count = fraud_df.count()
non_fraud_count = non_fraud_df.count()

# Calculate the ratio to undersample the larger class
ratio = fraud_count / non_fraud_count

# Perform undersampling
undersampled_non_fraud_df = non_fraud_df.sample(False, ratio)

# Combine back the undersampled non-fraud data with the fraud data
balanced_df = fraud_df.union(undersampled_non_fraud_df)
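
The arithmetic behind the undersampling step can be sketched in plain Python. The class counts below are hypothetical, chosen only for illustration, and the loop is a simplified stand-in for Spark's row-wise sampling rather than a reproduction of it.

```python
import random

# Hypothetical class counts, for illustration only (not taken from the dataset).
fraud_count = 200
non_fraud_count = 10_000

# Same ratio as in the cell above: the fraction of non-fraud rows to keep.
ratio = fraud_count / non_fraud_count

# Sampling without replacement keeps each non-fraud row independently
# with probability `ratio`, so the sample size is only approximately fixed.
rng = random.Random(42)
kept = sum(1 for _ in range(non_fraud_count) if rng.random() < ratio)

# On average, the number of rows kept equals the number of fraud rows.
expected_kept = ratio * non_fraud_count

print(f"ratio = {ratio:.4f}")
print(f"expected non-fraud rows kept = {expected_kept:.0f}")
print(f"rows kept in this simulated draw = {kept}")
```

Because the kept count varies from draw to draw, the two classes in `balanced_df` end up close to, but usually not exactly, equal in size.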


Importing necessary classes for feature transformation and the pipeline

Preparing the data for machine learning
Indexes the categorical 'type' column with StringIndexer and one-hot encodes it with OneHotEncoder, then uses VectorAssembler to combine the numeric feature columns and the encoded 'type' vector into a single vector column.

Applying the transformation
Fits the pipeline on the balanced DataFrame, transforms it, and selects only the features vector and the target variable ('isFraud') for the machine learning algorithms.

Splitting the data into training and test sets
Randomly splits the data into training and test sets with a 70-30 split and a seed for reproducibility.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# `balanced_df` is the balanced DataFrame built in the previous cell

# First, index the string column to numeric indices
stringIndexer = StringIndexer(inputCol="type", outputCol="typeIndex")

# Then, one-hot encode these indices
encoder = OneHotEncoder(inputCols=["typeIndex"], outputCols=["typeVec"])

# Define other feature columns that don't need encoding
featureCols = ["step", "amount", "oldbalanceOrg", "newbalanceOrig"]

# Assemble all feature columns (numeric + one-hot encoded) into a single vector
assembler = VectorAssembler(inputCols=featureCols + ["typeVec"], outputCol="features")

# Define a pipeline that executes the steps in sequence
pipeline = Pipeline(stages=[stringIndexer, encoder, assembler])

# Transform the data
transformed_data = pipeline.fit(balanced_df).transform(balanced_df)

# Now, select only the features vector and the label for ML algorithms
data = transformed_data.select("features", "isFraud")

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)
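
To make the indexing and one-hot encoding steps more concrete, here is a minimal plain-Python sketch of the idea. The helper names and the toy rows are hypothetical, and the sketch assumes the default behaviour of the Spark stages (most frequent category gets index 0, last category dropped from the one-hot vector); it is not the Spark implementation.

```python
from collections import Counter


def fit_string_index(values):
    """Map each category to an index, most frequent first.

    Ties are broken alphabetically here; treat that as an assumption of the sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: idx for idx, category in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot encode an index, leaving the last category as the all-zeros vector."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec


# Hypothetical toy rows of (type, amount); the values are illustrative only.
rows = [("PAYMENT", 100.0), ("TRANSFER", 50.0), ("PAYMENT", 20.0), ("CASH_OUT", 75.0)]

mapping = fit_string_index(t for t, _ in rows)

# Numeric columns first, then the encoded type, mirroring the assembler's column order.
features = [[amount] + one_hot_drop_last(mapping[t], len(mapping)) for t, amount in rows]

for row, vec in zip(rows, features):
    print(row, "->", vec)
```

Dropping the last category keeps the encoded columns from being linearly dependent, which matters for linear models such as logistic regression.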


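Training a Logistic Regression model
Initializes a LogisticRegression estimator with 'isFraud' as the label column, 'features' as the features column, and a maximum of 10 iterations, then fits it on the training data.

In [None]: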
from pyspark.ml.classification import LogisticRegression

# Initialize the Logistic Regression model
lr = LogisticRegression(labelCol="isFraud", featuresCol="features", maxIter=10)

# Train the model
lrModel = lr.fit(train_data)
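
As a rough illustration of what a fitted logistic regression model does with a feature vector, the sketch below computes a fraud probability by hand. The weights, intercept, and feature values are hypothetical and are not the coefficients learned by `lrModel`.

```python
import math


def predict_probability(features, weights, intercept):
    """Apply the logistic (sigmoid) function to a linear combination of the features."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))


# Hypothetical coefficients and input, for illustration only.
weights = [0.5, -1.0]
intercept = 0.0
example = [3.0, 1.0]

p = predict_probability(example, weights, intercept)

# A probability above the 0.5 threshold is classified as fraud (label 1).
label = 1 if p > 0.5 else 0

print(f"probability = {p:.4f}, predicted label = {label}")
```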


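Evaluating the model
Generates predictions on the test set and evaluates them with BinaryClassificationEvaluator, whose default metric is the area under the ROC curve (AUC).

In [None]: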
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Predict on the test data
predictions = lrModel.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="isFraud")
auc = evaluator.evaluate(predictions)

print(f"Test AUC: {auc}")


Test AUC: 0.9591836734693877
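
One way to read the AUC value is as the probability that a randomly chosen fraud row receives a higher score than a randomly chosen non-fraud row. The plain-Python sketch below computes it that way on hypothetical labels and scores; it illustrates the metric and is not how Spark computes it internally.

```python
def roc_auc(labels, scores):
    """Area under the ROC curve via pairwise comparison of positive and negative scores."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # ties count as half
    return wins / (len(positives) * len(negatives))


# Hypothetical labels and model scores, for illustration only.
labels = [1, 1, 0, 1, 0]
scores = [0.9, 0.8, 0.7, 0.3, 0.2]

toy_auc = roc_auc(labels, scores)
print(f"Toy AUC: {toy_auc:.4f}")
```

An AUC of 1.0 means every fraud row is ranked above every non-fraud row, while 0.5 corresponds to random ranking.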
