In [1]:
!pip install pyspark



"local": This is the master URL. Setting it to "local" runs Spark on a single machine with one worker thread instead of connecting to a cluster, which suits development, testing, and small-scale data processing. The cell below does not call .master(...), so Spark falls back to whatever master the environment provides.

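The local master URL comes in a few common forms: "local" (one worker thread), "local[N]" (N worker threads), and "local[*]" (one thread per logical core). The sketch below is a hypothetical helper, not part of PySpark, that only illustrates how these three forms map to a thread count. It deliberately ignores other variants such as "local[N, F]".

```python
import os
import re


def local_worker_threads(master: str) -> int:
    """Hypothetical helper: worker threads requested by a local master URL."""
    if master == "local":
        return 1  # plain "local" means a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a supported local master URL: {master!r}")
    value = match.group(1)
    if value == "*":
        # "local[*]" asks for one thread per logical core on this machine
        return os.cpu_count() or 1
    return int(value)


for url in ("local", "local[4]", "local[*]"):
    print(url, "->", local_worker_threads(url))
```

To choose one of these explicitly, the builder chain would include a call such as .master("local[*]") before .getOrCreate().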
In [2]:
from pyspark.sql import SparkSession
# Reuse an existing session if one is running, otherwise create a new one.
spark = SparkSession.builder.appName("FraudDetection").getOrCreate()

Typical Workflow with SparkSession:

Creation: You create a SparkSession using SparkSession.builder specifying configurations and options.

Operations with DataFrames: You can perform various operations on DataFrames, such as reading/writing data, transforming data, and running SQL queries. (The typed Dataset API exists only in Scala and Java; PySpark works with DataFrames.)

Execution: SparkSession manages the execution of these operations, optimizing and distributing the computations across the cluster.

Resource Management: It manages resources, handles connectivity to the cluster, and orchestrates job execution on worker nodes.

Shutdown: Once the Spark application completes its tasks, you should stop the SparkSession using spark.stop() to release resources and shut down the Spark application properly.

In [None]:
# Imports used by the cells below
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col


# Load CSV file as a Spark DataFrame


In [3]:
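# header=True takes column names from the first row; inferSchema=True makes an extra pass to detect column types.
df = spark.read.csv("E:\\NCI\\Database & Analytics Programming\\Week 11\\creditcard.csv", header=True, inferSchema=True)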
df

DataFrame[Time: double, V1: double, V2: double, V3: double, V4: double, V5: double, V6: double, V7: double, V8: double, V9: double, V10: double, V11: double, V12: double, V13: double, V14: double, V15: double, V16: double, V17: double, V18: double, V19: double, V20: double, V21: double, V22: double, V23: double, V24: double, V25: double, V26: double, V27: double, V28: double, Amount: double, Class: int]

# Preview the data, then check for and handle missing values

In [4]:
df.show()

+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|                V3|                 V4|                 V5|                 V6|                  V7|                 V8|                V9|                V10|               V11|               V12|                V13|                V14|                V15|                V16|                 V17|                V18|                V19|                V20|                 V

# Count missing values per column (rows with missing values are dropped in the next cell for demonstration purposes)

In [5]:
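from pyspark.sql.functions import col, sum as spark_sum

columns = df.columns  # reused later to build the feature list

# isNull() yields a boolean per row; casting to int and summing gives the null count per column.
# (The previous df.collect() call pulled the whole dataset to the driver and was never used.)
null_counts = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in columns])
null_counts.show()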

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|Time| V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|Amount|Class|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|     0|    0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+



In [6]:
# The null counts above are all zero, so dropna() leaves the data unchanged here.
df_cleaned = df.dropna()
print("Original DataFrame:")
df.show()

print("\nDataFrame after dropping missing values:")
df_cleaned.show()

Original DataFrame:
+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|                V3|                 V4|                 V5|                 V6|                  V7|                 V8|                V9|                V10|               V11|               V12|                V13|                V14|                V15|                V16|                 V17|                V18|                V19|                V2

+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|                V3|                 V4|                 V5|                 V6|                  V7|                 V8|                V9|                V10|               V11|               V12|                V13|                V14|                V15|                V16|                 V17|                V18|                V19|                V20|                 V
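The null check and the dropna() call above can be pictured on a tiny in-memory example. This is a plain-Python sketch with made-up toy rows (not taken from creditcard.csv). It counts missing values per column, then keeps only rows with no missing value, which mirrors the default "any" behaviour of dropna().

```python
# Toy rows invented for illustration; None stands for a missing value.
rows = [
    {"Amount": 10.0, "Class": 0},
    {"Amount": None, "Class": 1},
    {"Amount": 5.0, "Class": None},
]
columns = ["Amount", "Class"]

# Per-column null count: 1 for each missing entry, summed over all rows.
null_counts = {c: sum(int(r[c] is None) for r in rows) for c in columns}

# Keep a row only if every column has a value (drop if any value is missing).
cleaned = [r for r in rows if all(r[c] is not None for c in columns)]

print(null_counts)
print(cleaned)
```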

# Split data into features and target variable

In [7]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler


# Assemble the feature columns into a single vector column

In [8]:
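# Every column except the label "Class" becomes a feature.
# The loop variable is named c so it does not shadow the imported col function.
feature_cols = [c for c in columns if c != "Class"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
assembled_df = assembler.transform(df_cleaned)
assembled_df.show()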


+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+--------------------+
|Time|                V1|                 V2|                V3|                 V4|                 V5|                 V6|                  V7|                 V8|                V9|                V10|               V11|               V12|                V13|                V14|                V15|                V16|                 V17|                V18|                V19|                V
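Conceptually, the assembler concatenates the selected columns of each row into one vector and leaves the label column alone. A minimal plain-Python sketch of that idea follows, using a shortened column list and made-up values (both are assumptions for illustration, not the real dataset).

```python
# Shortened, hypothetical column list; the real DataFrame has Time, V1..V28, Amount, Class.
columns = ["Time", "V1", "V2", "Amount", "Class"]
row = {"Time": 0.0, "V1": 1.0, "V2": 2.0, "Amount": 3.0, "Class": 0}  # toy values

# Same selection rule as the notebook: everything except the label.
feature_cols = [c for c in columns if c != "Class"]

# The "assembled" vector keeps the values in the order given by feature_cols.
features = [float(row[c]) for c in feature_cols]
label = row["Class"]

print(feature_cols, features, label)
```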

# Scale features using MinMaxScaler

In [15]:
from pyspark.ml.feature import MinMaxScaler

# Rescale every feature to the default [0, 1] range.
# Note: the scaler is fitted on the full dataset before the train/test split,
# so the test rows' min/max values influence the scaling.
min_max_scaler = MinMaxScaler(inputCol="Features", outputCol="minmax_scaled_features")
min_max_model = min_max_scaler.fit(assembled_df)
min_max_scaled_df = min_max_model.transform(assembled_df)
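For reference, min-max scaling maps each feature value x to (x - min) / (max - min), stretched to the target range. The sketch below applies that formula to one column of made-up numbers in plain Python. The function name and the toy values are assumptions for illustration. The constant-column branch follows the rule given in the Spark documentation for MinMaxScaler (the midpoint of the target range).

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Rescale a list of numbers to [new_min, new_max] (illustrative helper)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no spread; map it to the midpoint of the range.
        return [0.5 * (new_min + new_max)] * len(values)
    return [(v - lo) / (hi - lo) * (new_max - new_min) + new_min for v in values]


amounts = [0.0, 50.0, 100.0, 200.0]  # toy values, not from the dataset
print(min_max_scale(amounts))  # [0.0, 0.25, 0.5, 1.0]
```

Because the minimum and maximum are learned in fit(), fitting on the training split only and then transforming both splits keeps test-set statistics out of the model's inputs.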

# Split data into training and testing sets

In [16]:
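# Roughly 70% training / 30% testing. No seed is set, so the split (and the metrics below)
# will differ between runs; pass seed=<int> to randomSplit to make it repeatable.
train_data, test_data = min_max_scaled_df.randomSplit([0.7, 0.3])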
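A weighted random split of this kind assigns each row independently by a random draw, which is why the resulting sizes are only approximately 70/30 and why a seed matters for repeatability. The following plain-Python sketch imitates that idea. It is an illustration under that assumption, not Spark's implementation, and random_split is a hypothetical name.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by an independent uniform draw (illustrative)."""
    total = sum(weights)
    # Cumulative upper bounds of each split's share, e.g. [0.7, 1.0].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # same seed, same sequence of draws
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 700 and 300, but not exactly
```

In Spark, a fixed seed makes the split repeatable only as long as the input DataFrame's contents and partitioning stay the same between runs.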

# Initialize and train Random Forest classifier

In [17]:
# DecisionTreeClassifier is only needed for the commented-out alternative below.
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(featuresCol='minmax_scaled_features', labelCol='Class')
#rf1 = DecisionTreeClassifier(featuresCol='minmax_scaled_features', labelCol='Class')
# Training the model
model = rf.fit(train_data)
#model1 = rf1.fit(train_data)


# Predict on the test set

In [18]:
# Making predictions on test data
predictions = model.transform(test_data)
#predictions1 = model1.transform(test_data)

# Evaluate the model: accuracy, weighted precision, weighted recall, and F1 score

In [19]:
# Evaluation
# Note: the weighted metrics average per-class scores by class frequency, so with
# imbalanced classes (typical for fraud data) the majority class dominates them.
evaluator = MulticlassClassificationEvaluator(labelCol='Class', metricName='accuracy')
#evaluator1 = MulticlassClassificationEvaluator(labelCol='Class', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
#accuracy1 = evaluator1.evaluate(predictions1)

# Tuple of (weighted precision, weighted recall, weighted F1), indexed by the cells below
precision_recall_f1 = (
    MulticlassClassificationEvaluator(labelCol='Class', metricName='weightedPrecision').evaluate(predictions),
    MulticlassClassificationEvaluator(labelCol='Class', metricName='weightedRecall').evaluate(predictions),
    MulticlassClassificationEvaluator(labelCol='Class', metricName='f1').evaluate(predictions),
)

#precision_recall_f2 = (
#    MulticlassClassificationEvaluator(labelCol='Class', metricName='weightedPrecision').evaluate(predictions1),
#    MulticlassClassificationEvaluator(labelCol='Class', metricName='weightedRecall').evaluate(predictions1),
#    MulticlassClassificationEvaluator(labelCol='Class', metricName='f1').evaluate(predictions1),
#)

print(f"Accuracy: {accuracy}")
#print(f"Accuracy: {accuracy1}")

Accuracy: 0.9993684432125188


In [22]:
print(f"Precision: {precision_recall_f1[0]}")
#print(f"Precision: {precision_recall_f2[0]}")

Precision: 0.9993362155654525


In [23]:
print(f"Recall: {precision_recall_f1[1]}")
#print(f"Recall: {precision_recall_f2[1]}")

Recall: 0.9993684432125188


In [24]:
print(f"F1-score: {precision_recall_f1[2]}")
#print(f"F1-score: {precision_recall_f2[2]}")

F1-score: 0.999344737465534
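The recall printed above matches the accuracy to every digit (0.9993684432125188). That is expected: recall weighted by class frequency is algebraically the same quantity as accuracy, so it says little about how well the rare class is detected. The sketch below works this through in plain Python on a made-up confusion matrix. The counts are invented for illustration and are not results from this model.

```python
def summarize(tn, fp, fn, tp):
    """Metrics from a binary confusion matrix; assumes every denominator is non-zero."""
    total = tn + fp + fn + tp
    support = {0: tn + fp, 1: fn + tp}           # true rows per class
    precision = {0: tn / (tn + fn), 1: tp / (tp + fp)}
    recall = {0: tn / (tn + fp), 1: tp / (tp + fn)}
    weight = {label: n / total for label, n in support.items()}
    return {
        "accuracy": (tn + tp) / total,
        "weighted_precision": sum(weight[k] * precision[k] for k in (0, 1)),
        "weighted_recall": sum(weight[k] * recall[k] for k in (0, 1)),
        "fraud_precision": precision[1],
        "fraud_recall": recall[1],
    }


# Toy example: 990 legitimate rows, 10 fraud rows, and only 4 of the frauds caught.
metrics = summarize(tn=989, fp=1, fn=6, tp=4)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

In this toy case the weighted scores stay above 0.99 while recall on the fraud class is only 0.4. Recent PySpark releases expose per-label metrics through MulticlassClassificationEvaluator (for example metricName='recallByLabel' together with metricLabel=1.0); check the documentation for the installed version before relying on it.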


# Stop the Spark session

In [None]:
# Release resources once all work is finished.
spark.stop()