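## Fraud detection project using PySpark and Streamlit

This notebook trains a logistic-regression fraud classifier on `creditcard.csv` with PySpark and evaluates it by AUC on a held-out 20% split. It then saves the model and serves it through a Streamlit app, which is exposed publicly with localtunnel.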

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
# Start (or reuse) the local Spark session shared by every cell below.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")  # run locally, using all available cores
    .appName("Streaming FraudDetection Data")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

spark

In [3]:
# Let the file-based streaming source infer its schema from the files.
# Without an inferSchema option, every column is read as a string (see the
# printed schema below).
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Streaming view over the CSV files in the input directory, one file per trigger.
# Bound to its own name: the batch `df` created later must never be confused
# with this streaming DataFrame, which cannot be used with batch actions such as
# .show() or model fitting.
# NOTE(review): cleanSource="archive" moves processed files into
# sourceArchiveDir once a streaming query actually runs. Starting one would move
# creditcard.csv out of /content/ and break the batch read; confirm before use.
streaming_df = (
    spark
    .readStream
    .option("header", "true")
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "fraud_detect_archive_dir")
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .load("/content/")
)

# Inspect the inferred schema (streaming DataFrames do not support .show()).
streaming_df.printSchema()

root
 |-- Time: string (nullable = true)
 |-- V1: string (nullable = true)
 |-- V2: string (nullable = true)
 |-- V3: string (nullable = true)
 |-- V4: string (nullable = true)
 |-- V5: string (nullable = true)
 |-- V6: string (nullable = true)
 |-- V7: string (nullable = true)
 |-- V8: string (nullable = true)
 |-- V9: string (nullable = true)
 |-- V10: string (nullable = true)
 |-- V11: string (nullable = true)
 |-- V12: string (nullable = true)
 |-- V13: string (nullable = true)
 |-- V14: string (nullable = true)
 |-- V15: string (nullable = true)
 |-- V16: string (nullable = true)
 |-- V17: string (nullable = true)
 |-- V18: string (nullable = true)
 |-- V19: string (nullable = true)
 |-- V20: string (nullable = true)
 |-- V21: string (nullable = true)
 |-- V22: string (nullable = true)
 |-- V23: string (nullable = true)
 |-- V24: string (nullable = true)
 |-- V25: string (nullable = true)
 |-- V26: string (nullable = true)
 |-- V27: string (nullable = true)
 |-- V28: string (nulla

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Explicit schema for the dataset:
# - "Time", "V1".."V28" and "Amount" are nullable doubles.
# - The "Class" label is a nullable integer.
# The V-columns are generated rather than listed by hand; the resulting field
# order is Time, V1..V28, Amount, Class.
double_columns = ["Time"] + [f"V{i}" for i in range(1, 29)] + ["Amount"]
schema = StructType(
    [StructField(name, DoubleType(), True) for name in double_columns]
    + [StructField("Class", IntegerType(), True)]
)


In [5]:
# Batch-read the full dataset using the explicit schema, so the numeric columns
# arrive typed (double / integer) instead of as strings.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("/content/creditcard.csv")
)

# Confirm the column types.
df.printSchema()

root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nulla

In [6]:
# Data-quality check: the number of null values in each column.
# Casting the isNull() flag to int and summing it yields a per-column null count.
# .show() forces the Spark job; a bare select() is lazy and would never run.
null_counts = df.select(
    [F.sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
)
null_counts.show(vertical=True)

# Model inputs: every column except the label ("Class") and "Time".
# "Class" is already IntegerType through the explicit schema, so no cast is needed.
features = [c for c in df.columns if c not in ("Class", "Time")]

In [7]:
# Pack the feature columns into the single vector column that Spark ML expects.
# VectorAssembler's default handleInvalid="error" raises on null/NaN inputs.
# Dropping nulls after assembly, on the vector column, is therefore too late.
# Incomplete rows are removed here instead: before assembly, and before the
# train/test split, so both splits are clean.
df_clean = df.na.drop(subset=features + ["Class"])
assembler = VectorAssembler(inputCols=features, outputCol="features")
df_vectorized = assembler.transform(df_clean).select("features", "Class")

In [40]:
# Sanity check: the assembled frame should hold only the "features" vector and the "Class" label.
df_vectorized.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Class: integer (nullable = true)



In [None]:
# Optional feature scaling (currently disabled; nothing downstream uses df_scaled).
# NOTE(review): Spark's LogisticRegression standardizes features internally by
# default (standardization=True), which may be why this step was left off. Confirm
# before re-enabling. If enabled, the model must use featuresCol="scaledFeatures".
# scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# scaler_model = scaler.fit(df_vectorized)
# df_scaled = scaler_model.transform(df_vectorized).select("scaledFeatures", "Class")

In [8]:
# Hold out 20% of the rows for evaluation; the fixed seed keeps the split repeatable.
train_data, test_data = df_vectorized.randomSplit(weights=[0.8, 0.2], seed=42)

In [42]:
# Confirm the training split keeps the same two-column ("features", "Class") schema.
train_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Class: integer (nullable = true)



In [9]:
# Drop incomplete rows from BOTH splits, so training and evaluation see the same
# kind of data.
# Filling with 0 is deliberately avoided: na.fill(0) would also fill a missing
# integer "Class" label with 0, silently marking unlabeled rows as class 0
# (legitimate).
train_data = train_data.na.drop()
test_data = test_data.na.drop()

In [10]:
# Logistic regression on the assembled "features" vector, predicting "Class".
lr = LogisticRegression(featuresCol="features", labelCol="Class")

# Incomplete rows were already dropped from train_data in the previous cell,
# so the split is fitted as-is rather than cleaned a second time.
lr_model = lr.fit(train_data)

In [11]:
# Score the held-out split with the fitted model.
predictions = lr_model.transform(test_data)

# Report the evaluator's default metric (area under the ROC curve).
# It is computed from the model's "rawPrediction" scores against the "Class" label.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="Class",
)
auc = evaluator.evaluate(predictions)
print("AUC: {}".format(auc))

AUC: 0.9639094999994379


In [12]:
# Persist the fitted model so the Streamlit app can load it.
# write().overwrite() keeps this cell re-runnable; a plain save() raises when
# the target path already exists.
MODEL_PATH = "/content/saved_model"
lr_model.write().overwrite().save(MODEL_PATH)

In [13]:
!pip install streamlit

Collecting streamlit
  Downloading streamlit-1.41.1-py2.py3-none-any.whl.metadata (8.5 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.41.1-py2.py3-none-any.whl (9.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.1/9.1 MB[0m [31m55.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m85.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl (79 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.1/79.1 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[

In [None]:
# Install the localtunnel CLI, used through npx in the last cell to expose the Streamlit port.
# NOTE(review): version is unpinned; consider pinning it for reproducibility.
!npm install localtunnel

[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K
added 22 packages in 3s
[1G[0K⠹[1G[0K
[1G[0K⠹[1G[0K3 packages are looking for funding
[1G[0K⠹[1G[0K  run `npm fund` for details
[1G[0K⠹[1G[0K

In [53]:
%%writefile app.py

import streamlit as st
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import pandas as pd

# Initialize Spark session and load model
spark = SparkSession.builder.appName("FraudDetectionApp").getOrCreate()
lr_model = LogisticRegressionModel.load("/content/saved_model")

# Streamlit UI
st.title("Fraud Detection System")
st.write("Enter transaction details below:")

# Input features
feature_columns = [
    'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10',
    'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19',
    'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28',
    'Amount'
]

# Collect user input for all features
input_data = {}
for feature in feature_columns:
    input_data[feature] = st.number_input(f"Enter {feature}", value=0.0)

# Convert input to a Pandas DataFrame
df = pd.DataFrame([input_data])

# Predict button
if st.button("Predict"):
    # Convert Pandas DataFrame to Spark DataFrame
    spark_df = spark.createDataFrame(df)

    # Vectorize features
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    df_vectorized = assembler.transform(spark_df).select("features", "Class")

    # Perform prediction
    predictions = lr_model.transform(df_vectorized)

    # Extract prediction result
    prediction = predictions.select("prediction").collect()[0][0]

    # Display result
    st.write("Fraudulent Transaction" if prediction == 1 else "Legitimate Transaction")

Overwriting app.py


In [None]:
# Fetch the localtunnel password for this machine.
# NOTE(review): presumably this is the machine's public IP; confirm on the loca.lt landing page.
!curl https://loca.lt/mytunnelpassword

In [None]:
# Print this machine's public IPv4 address: -q silences progress, -O - writes the response to stdout.
# NOTE(review): likely an alternative way to get the tunnel password — TODO confirm.
!wget -q -O - ipv4.icanhazip.com

In [None]:
!streamlit run app.py & npx localtunnel --port 8501