INSTALL DEPENDENCIES

In [None]:
!pip install kafka-python pyspark




In [None]:
# Mount Google Drive so the CSVs under "My Drive" (fraudTrain.csv, fraudTest.csv) are readable.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


INITIALIZING SPARK SESSION

In [None]:
# SparkSession used by the cells below; getOrCreate() returns the already-running
# session when this cell is re-executed instead of starting a second one.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("RealTimeProcessing")
    .getOrCreate()
)


INDEXER INITIALIZATION FOR CATEGORICAL VARIABLES

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

# Only these categorical columns are model inputs (the training cell uses
# 'category-encoded' and 'gender-encoded'). Indexing every string column
# (transaction ids, names, timestamps, ...) is costly, and a StringIndexerModel with
# the default handleInvalid="error" raises on any value it did not see during fit --
# which would crash the streaming loop on every new transaction id.
MODEL_CATEGORICAL_COLUMNS = ['category', 'gender']

spark_df = spark.read.csv(path="/content/drive/My Drive/fraudTest.csv", header=True, inferSchema=True)

# String-typed columns of the CSV that the model actually consumes.
cat_var = [
    name
    for name, dtype in spark_df.dtypes
    if dtype.startswith('string') and name in MODEL_CATEGORICAL_COLUMNS
]

# Each indexer is fit here, so `pipeline` contains only already-fitted StringIndexerModels;
# later pipeline.fit(...) calls reuse these label->index mappings unchanged.
# NOTE(review): the indexers are fit on fraudTest.csv but used to encode fraudTrain.csv
# in the training cell -- confirm this is intended (fitting on the training data is usual).
indexers = [StringIndexer(inputCol=column, outputCol=column + "-encoded").fit(spark_df) for column in cat_var]
pipeline = Pipeline(stages=indexers)

MODEL PREPARATION (TRAINING AND TESTING)

In [None]:
# Model training: a random forest on city_pop plus the indexed category/gender columns,
# trained and evaluated on a class-balanced sample of fraudTrain.csv.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

file_path = '/content/drive/My Drive/fraudTrain.csv'
data = spark.read.csv(file_path, header=True, inferSchema=True)
relevant_columns = ['city_pop', 'category', 'gender', 'is_fraud']
data = data.select(relevant_columns)

# Ensure target labels are integers
data = data.withColumn("label", col("is_fraud").cast("int"))

# Encode categorical columns with the indexers fit in the indexer cell
data = pipeline.fit(data).transform(data)

# Assemble features into a single vector
feature_columns = ['city_pop', 'category-encoded', 'gender-encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Keep only features and label
data = data.select(col("features"), col("label"))

# Balance the dataset: downsample non-fraud rows to roughly the number of fraud rows
# (sample() draws each row independently, so the resulting size is approximate).
fraud_data = data.filter(data.label == 1)
non_fraud_data = data.filter(data.label == 0)
n_fraud = fraud_data.count()
n_non_fraud = non_fraud_data.count()
if n_fraud == 0 or n_non_fraud == 0:
    raise ValueError(
        f"Cannot balance training data: {n_fraud} fraud rows, {n_non_fraud} non-fraud rows"
    )
# sample() without replacement requires fraction <= 1; clamp in case fraud is the majority class.
downsample_fraction = min(1.0, n_fraud / n_non_fraud)
non_fraud_downsampled = non_fraud_data.sample(withReplacement=False, fraction=downsample_fraction, seed=42)
balanced_data = fraud_data.union(non_fraud_downsampled)

# Split the balanced data into training and testing sets.
# NOTE: the test split is balanced too, so the metrics below do not reflect performance
# at the real (much lower) fraud prevalence.
train_data, test_data = balanced_data.randomSplit([0.8, 0.2], seed=42)

# Train a simpler Random Forest model
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=30, maxDepth=4)
model = rf.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate accuracy and F1 score.
# metricName="f1" is the label-weighted F1 over both classes, not the F1 of the fraud class alone.
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

accuracy = accuracy_evaluator.evaluate(predictions)
f1_score = f1_evaluator.evaluate(predictions)

# Print results
print(f"Adjusted Random Forest Model Accuracy: {accuracy}")
print(f"Adjusted Random Forest F1-Score: {f1_score}")
predictions.select("label", "prediction", "probability").show(10)




Adjusted Random Forest Model Accuracy: 0.6936936936936937
Adjusted Random Forest F1-Score: 0.6931366790953475
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    1|       1.0|[0.38004012540987...|
|    1|       1.0|[0.31261827774027...|
|    1|       0.0|[0.70273127621111...|
|    1|       1.0|[0.31261827774027...|
|    1|       0.0|[0.66936887746116...|
|    1|       1.0|[0.35547352666769...|
|    1|       1.0|[0.27133635900283...|
|    1|       1.0|[0.27133635900283...|
|    1|       1.0|[0.31261827774027...|
|    1|       1.0|[0.31261827774027...|
+-----+----------+--------------------+
only showing top 10 rows



REAL TIME FRAUD DETECTION SIMULATION AND VISUALIZATION OF RESULTS

In [100]:
import json
import os
import time

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from kafka import KafkaConsumer
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler

# Broker address; override with the KAFKA_BOOTSTRAP_SERVERS environment variable
# instead of editing a hard-coded host in the notebook.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS",
    "ec2-3-16-55-19.us-east-2.compute.amazonaws.com:9092",
)
DISPLAY_EVERY = 10  # redraw the charts once per this many messages

# Loop-invariant Spark objects, built once instead of once per message.
# `pipeline` holds only StringIndexerModels already fit in the indexer cell, so wrapping
# its stages in a PipelineModel gives the same encoder that pipeline.fit(df) returned.
# Built before the consumer opens so a missing `pipeline`/`model` fails without leaking it.
encoding_model = PipelineModel(stages=pipeline.getStages())
feature_columns = ['city_pop', 'category-encoded', 'gender-encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Kafka Consumer Configuration
consumer = KafkaConsumer(
    'fraud-detection',
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

# Initialize counters for fraud and non-fraud predictions
fraud_count = 0
non_fraud_count = 0
live_data = {"Time": [], "Fraud Count": [], "Non-Fraud Count": []}

# Define color mapping for Pie Chart
color_map = {
    "Fraudulent": "red",         # Red for Fraudulent transactions
    "Non-Fraudulent": "green"    # Green for Non-Fraudulent transactions
}

# Real-time Visualization Loop: score each Kafka message and tally the predictions.
# The loop waits for new messages until the cell is interrupted; the `finally`
# clause then closes the consumer's broker connection.
try:
    for i, message in enumerate(consumer):
        record = message.value
        df = spark.createDataFrame([record])

        # Encode category/gender, keep only the model inputs, then assemble the feature vector.
        encoded = encoding_model.transform(df).select(feature_columns)
        pred = model.transform(assembler.transform(encoded))

        # Extract prediction value (None if the transform produced no row)
        prediction_row = pred.select("prediction").head()
        value = prediction_row["prediction"] if prediction_row else None
        if value == 1.0:
            fraud_count += 1
        elif value == 0.0:
            non_fraud_count += 1
        else:
            print("Unknown prediction value:", value)

        # Update live data
        current_time = time.strftime("%H:%M:%S")
        live_data["Time"].append(current_time)
        live_data["Fraud Count"].append(fraud_count)
        live_data["Non-Fraud Count"].append(non_fraud_count)

        # Charts are only built when they are displayed (every DISPLAY_EVERY messages).
        if i % DISPLAY_EVERY == 0:
            # Pie Chart of the cumulative counts
            pie_data = pd.DataFrame({
                "Category": ["Fraudulent", "Non-Fraudulent"],
                "Count": [fraud_count, non_fraud_count]
            })
            pie_chart = px.pie(
                pie_data,
                names="Category",
                values="Count",
                title="Fraudulent vs Non-Fraudulent Transactions",
                color="Category",
                color_discrete_map=color_map
            )

            # Line Chart of the counts over time
            line_df = pd.DataFrame(live_data)
            line_chart = go.Figure()
            line_chart.add_trace(go.Scatter(
                x=line_df["Time"],
                y=line_df["Fraud Count"],
                mode="lines+markers",
                name="Fraudulent",
                line=dict(color='red')
            ))
            line_chart.add_trace(go.Scatter(
                x=line_df["Time"],
                y=line_df["Non-Fraud Count"],
                mode="lines+markers",
                name="Non-Fraudulent",
                line=dict(color='green')
            ))
            line_chart.update_layout(
                title="Live Trend of Fraudulent Transactions",
                xaxis_title="Time",
                yaxis_title="Transaction Count"
            )

            # NOTE(review): each .show() appends new figures to the cell output; consider
            # IPython.display.clear_output(wait=True) beforehand to redraw in place.
            print("Updated Visualizations:")
            pie_chart.show()
            line_chart.show()
finally:
    consumer.close()




Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


Updated Visualizations:


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [102]:
import json
import os
import time

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from kafka import KafkaConsumer
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Broker address; override with the KAFKA_BOOTSTRAP_SERVERS environment variable.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS",
    "ec2-3-16-55-19.us-east-2.compute.amazonaws.com:9092",
)
DISPLAY_EVERY = 10          # refresh the charts once per this many messages
POLL_DELAY_SECONDS = 0.1    # pause between messages; adjust as necessary

# Initialize Spark session (returns the already-running session if one exists)
spark = SparkSession.builder.appName("FraudDetectionDashboard").getOrCreate()

# Initialize Streamlit app
st.title("Live Fraud Detection Dashboard")

# Placeholders are overwritten in place on each refresh instead of appending new elements.
pie_placeholder = st.empty()
line_placeholder = st.empty()
status_placeholder = st.empty()

# Initialize counters for fraud and non-fraud predictions
fraud_count = 0
non_fraud_count = 0
live_data = {"Time": [], "Fraud Count": [], "Non-Fraud Count": []}

# Define color mapping for Pie Chart
color_map = {
    "Fraudulent": "red",         # Red for Fraudulent transactions
    "Non-Fraudulent": "green"    # Green for Non-Fraudulent transactions
}

# Assumes `pipeline` (already-fitted StringIndexerModels) and `model` (the trained random
# forest) are in scope, e.g. from the notebook cells above. For a standalone
# `streamlit run` script, load persisted models instead, for example:
# from pyspark.ml.classification import RandomForestClassificationModel
# pipelineModel = PipelineModel.load("/content/drive/My Drive/encoding_pipeline_model")
# model = RandomForestClassificationModel.load("/content/drive/My Drive/Random_forest_model_sdf")
# Built once (not per message), and before the consumer opens so a missing name
# fails without leaving a broker connection behind.
encoding_model = PipelineModel(stages=pipeline.getStages())
feature_columns = ['city_pop', 'category-encoded', 'gender-encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Kafka Consumer Configuration
consumer = KafkaConsumer(
    'fraud-detection',
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

# Per-message history: raw records, ground-truth labels, and predicted labels.
val = []
gt = []
lbl = []

# Real-time processing loop. It blocks Streamlit's script thread while waiting for
# messages; the `finally` clause closes the consumer when the loop is stopped.
st.info("Starting real-time data processing...")

try:
    for i, message in enumerate(consumer):
        record = message.value
        val.append(record)
        # Ground truth comes from the raw record: df['is_fraud'] would only be an
        # unevaluated Spark Column expression, not the label value.
        gt.append(record.get('is_fraud'))

        # Encode category/gender, keep only the model inputs, then assemble the feature vector.
        df = spark.createDataFrame([record])
        encoded = encoding_model.transform(df).select(feature_columns)
        pred = model.transform(assembler.transform(encoded))

        # Extract prediction value (None if the transform produced no row)
        prediction_row = pred.select("prediction").head()
        value = prediction_row["prediction"] if prediction_row else None
        # Keep the scalar prediction, not the Spark DataFrame that produced it.
        lbl.append(value)

        if value == 1.0:
            fraud_count += 1
        elif value == 0.0:
            non_fraud_count += 1
        else:
            print("Unknown prediction value:", value)

        # Update live data
        current_time = time.strftime("%H:%M:%S")
        live_data["Time"].append(current_time)
        live_data["Fraud Count"].append(fraud_count)
        live_data["Non-Fraud Count"].append(non_fraud_count)

        # Charts are only built when they are displayed (every DISPLAY_EVERY messages).
        if i % DISPLAY_EVERY == 0:
            # Pie Chart of the cumulative counts
            pie_data = pd.DataFrame({
                "Category": ["Fraudulent", "Non-Fraudulent"],
                "Count": [fraud_count, non_fraud_count]
            })
            pie_chart = px.pie(
                pie_data,
                names="Category",
                values="Count",
                title="Fraudulent vs Non-Fraudulent Transactions",
                color="Category",
                color_discrete_map=color_map
            )

            # Line Chart of the counts over time
            line_df = pd.DataFrame(live_data)
            line_chart = go.Figure()
            line_chart.add_trace(go.Scatter(
                x=line_df["Time"],
                y=line_df["Fraud Count"],
                mode="lines+markers",
                name="Fraudulent",
                line=dict(color='red')
            ))
            line_chart.add_trace(go.Scatter(
                x=line_df["Time"],
                y=line_df["Non-Fraud Count"],
                mode="lines+markers",
                name="Non-Fraudulent",
                line=dict(color='green')
            ))
            line_chart.update_layout(
                title="Live Trend of Fraudulent Transactions",
                xaxis_title="Time",
                yaxis_title="Transaction Count"
            )

            pie_placeholder.plotly_chart(pie_chart, use_container_width=True)
            line_placeholder.plotly_chart(line_chart, use_container_width=True)
            status_placeholder.write(f"Processed {i+1} messages.")

        # Slight delay to prevent overloading
        time.sleep(POLL_DELAY_SECONDS)
finally:
    consumer.close()




NoBrokersAvailable: NoBrokersAvailable