# %% In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import streamlit as st
import matplotlib.pyplot as plt
from datetime import datetime
import os

# Configure Kafka connection details.
# Credentials are read from the environment instead of being committed to
# source. The API secret that used to be hard-coded here has been exposed in
# the file and should be rotated.
bootstrap_servers = os.environ.get(
    'KAFKA_BOOTSTRAP_SERVERS', 'rmp-8otv2.us-west8.gcp.confluent.cloud:9092'
)
sasl_username = os.environ.get('KAFKA_SASL_USERNAME', '')
sasl_password = os.environ.get('KAFKA_SASL_PASSWORD', '')
kafka_topic = 'network-traffic'
processed_topic = 'processed-data'

# Layout used to parse each Kafka message value as JSON; every field is nullable.
schema = (
    StructType()
    .add("source_ip", StringType(), True)
    .add("destination_ip", StringType(), True)
    .add("bytes_sent", IntegerType(), True)
    .add("event_time", TimestampType(), True)
)

# Start (or reuse) the SparkSession that drives the streaming pipeline.
# NOTE(review): the "kafka" source/sink presumably needs the spark-sql-kafka
# package on the classpath; confirm how this app is launched.
spark = (
    SparkSession.builder
    .appName("Real-Time Network Traffic Analysis")
    .getOrCreate()
)

# Streamlit page: a title, one figure/axes pair that is redrawn in place, and
# the two parallel series (values and timestamps) that get plotted.
st.title("Real-Time Network Traffic Analysis")
fig, ax = plt.subplots()
processed_data, event_times = [], []

@st.cache(allow_output_mutation=True)
def get_spark_aggregated_df():
    """Build the streaming aggregation of bytes sent per source IP.

    The query reads ``kafka_topic`` over SASL_SSL/PLAIN. It parses each message
    value as JSON using ``schema``, sums ``bytes_sent`` per ``source_ip``, and
    orders the result by that total in descending order. Spark allows sorting a
    streaming aggregation only in "complete" output mode, which is the mode
    ``get_streaming_query`` uses.

    Returns:
        A streaming DataFrame with columns ``source_ip`` and
        ``total_bytes_sent``.
    """
    # Spark's Kafka source takes SASL credentials as a JAAS config string.
    # ``kafka.sasl.username`` and ``kafka.sasl.password`` are not Kafka client
    # settings. The old code also passed undefined names (GKorir34,
    # yuteiy89_yu7) instead of the module-level credentials.
    jaas_config = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{sasl_username}" password="{sasl_password}";'
    )
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", jaas_config)
        # NOTE(review): the confluent consumer in the second cell uses the
        # same group id. Spark's docs warn that sharing a group id can make
        # readers interfere with each other; confirm this is intended.
        .option("kafka.group.id", "network-traffic-group")
        # Spark rejects ``kafka.auto.offset.reset``. The source option
        # ``startingOffsets`` is its replacement.
        .option("startingOffsets", "earliest")
        .option("subscribe", kafka_topic)
        .load()
    )

    # Process the data. ``sum`` here is pyspark.sql.functions.sum (it comes
    # from the star import), not the Python builtin.
    processed_df = (
        df.selectExpr("CAST(value AS STRING)")
        .select(from_json("value", schema).alias("data"))
        .select("data.*")
        .groupBy("source_ip")
        .agg(sum("bytes_sent").alias("total_bytes_sent"))
        .orderBy(desc("total_bytes_sent"))
    )

    return processed_df

@st.cache(allow_output_mutation=True)
def get_streaming_query():
    """Start the query that publishes per-source byte totals to Kafka.

    Each row of ``get_spark_aggregated_df()`` is written to ``processed_topic``.
    The key is ``source_ip`` and the value is a JSON object holding
    ``source_ip`` and ``total_bytes_sent``. The query runs in "complete" output
    mode and checkpoints to ``/tmp/checkpoint``.

    Returns:
        The started ``StreamingQuery`` (cached by ``st.cache``).
    """
    # The same JAAS-based SASL config as the source side. The old code passed
    # undefined names (GKorir34, yuteiy89_yu7) and Kafka-invalid options.
    jaas_config = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{sasl_username}" password="{sasl_password}";'
    )
    # The Kafka sink requires a string/binary ``value`` column; ``key`` is
    # optional. The aggregated frame only has source_ip/total_bytes_sent, so
    # without this projection ``start()`` fails.
    kafka_records = get_spark_aggregated_df().select(
        col("source_ip").cast("string").alias("key"),
        to_json(struct("source_ip", "total_bytes_sent")).alias("value"),
    )
    query = (
        kafka_records.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", jaas_config)
        .option("topic", processed_topic)
        .option("checkpointLocation", "/tmp/checkpoint")
        .outputMode("complete")
        .start()
    )

    return query

def process_message(message):
    """Parse one Kafka message and redraw the bytes-sent chart.

    The message value is decoded as UTF-8 and split on commas. The third field
    must be an integer byte count; the first two are presumably the source and
    destination IPs.
    NOTE(review): the Spark pipeline parses message values as JSON, not CSV.
    Confirm which format producers actually send.

    Args:
        message: A Kafka message exposing ``error()`` and ``value()`` (as
            returned by a consumer's ``poll()``), or ``None``.
    """
    # There is no ``nonlocal`` here. In a module-level function it is a
    # SyntaxError that stopped this whole cell from compiling. The lists are
    # only appended to, never rebound, so no declaration is needed.
    if message is None:
        return
    error = message.error()
    if error:
        # NOTE(review): KafkaError is not imported in this file; presumably it
        # is confluent_kafka.KafkaError and must be imported for this branch.
        if error.code() != KafkaError._PARTITION_EOF:
            st.error(f"Error: {error}")
        return

    raw_value = message.value()
    if raw_value is None:
        return  # empty (e.g. tombstone) message: nothing to plot
    try:
        bytes_sent = int(raw_value.decode('utf-8').split(',')[2])
    except (UnicodeDecodeError, IndexError, ValueError):
        # Skip the bad record instead of letting one message end the poll loop.
        st.warning(f"Skipping malformed message: {raw_value!r}")
        return

    processed_data.append(bytes_sent)
    event_times.append(datetime.now())
    ax.clear()
    ax.plot(event_times, processed_data)
    ax.set_xlabel("Event Time")
    ax.set_ylabel("Processed Data")
    st.pyplot(fig)

# Continuously read messages from Kafka topic
def read_from_kafka(poll_interval=1.0):
    """Run the Spark Kafka pipeline and chart its progress until it stops.

    ``get_spark_aggregated_df()`` returns a Spark streaming DataFrame, not a
    Kafka consumer, and has no ``poll()`` method. The previous loop therefore
    raised AttributeError on its first iteration.

    This version starts the query from ``get_streaming_query()``, or reuses it
    through ``st.cache``. Every ``poll_interval`` seconds it plots the input-row
    count of each newly completed micro-batch.

    Args:
        poll_interval: Seconds to wait for the query to terminate between
            chart refreshes.

    Raises:
        StreamingQueryException: Re-raised by ``awaitTermination`` if the query
            fails. The module-level handler reports it through ``st.error``.
    """
    streaming_query = get_streaming_query()
    # Overwrite a single placeholder instead of appending a new chart to the
    # page on every refresh.
    chart = st.empty()
    last_batch_id = None

    # awaitTermination(timeout) returns True once the query has stopped and
    # False if it is still running after `timeout` seconds.
    while not streaming_query.awaitTermination(poll_interval):
        progress = streaming_query.lastProgress
        if not progress or progress.get("batchId") == last_batch_id:
            continue  # no new micro-batch since the last refresh
        last_batch_id = progress.get("batchId")

        event_times.append(datetime.now())
        processed_data.append(progress.get("numInputRows", 0))
        ax.clear()
        ax.plot(event_times, processed_data)
        ax.set_xlabel("Event Time")
        ax.set_ylabel("Processed Data")
        chart.pyplot(fig)

# Run the pipeline, report any failure in the UI, and always release Spark.
try:
    read_from_kafka()
except Exception as e:
    st.error(f"An error occurred: {str(e)}")
finally:
    # ``streaming_query`` only exists inside read_from_kafka(). Referencing it
    # here raised NameError, which masked the real error and skipped
    # spark.stop(). Stop every streaming query active on the session instead.
    # NOTE(review): get_streaming_query() is wrapped in st.cache, so after
    # this cleanup a Streamlit rerun may get back a stopped cached query.
    for active_query in spark.streams.active:
        active_query.stop()
    spark.stop()


# %% In [None]:
# Create Kafka consumer configuration.
# NOTE(review): ``Consumer`` (and ``KafkaError``, used in process_message) are
# never imported in this file. Presumably
# ``from confluent_kafka import Consumer, KafkaError`` is intended; add it
# alongside the other imports.
conf = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    # These were GKorir34 / yuteiy89_yu7: undefined names that raised
    # NameError. Use the module-level credentials from the first cell.
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': 'network-traffic-group',
    'auto.offset.reset': 'earliest',
}

# Create the Kafka consumer and subscribe it to the raw traffic topic.
consumer = Consumer(conf)
consumer.subscribe([kafka_topic])

def process_message(message):
    """Parse one Kafka message and redraw the bytes-sent chart.

    The message value is decoded as UTF-8 and split on commas. The third field
    must be an integer byte count; the first two are presumably the source and
    destination IPs.
    NOTE(review): the Spark pipeline parses message values as JSON, not CSV.
    Confirm which format producers actually send.

    Args:
        message: A Kafka message exposing ``error()`` and ``value()`` (as
            returned by ``consumer.poll()``), or ``None``.
    """
    # There is no ``nonlocal`` here. In a module-level function it is a
    # SyntaxError that stopped this whole cell from compiling. The lists are
    # only appended to, never rebound, so no declaration is needed.
    if message is None:
        return
    error = message.error()
    if error:
        # NOTE(review): KafkaError is not imported in this file; presumably it
        # is confluent_kafka.KafkaError and must be imported for this branch.
        if error.code() != KafkaError._PARTITION_EOF:
            st.error(f"Error: {error}")
        return

    raw_value = message.value()
    if raw_value is None:
        return  # empty (e.g. tombstone) message: nothing to plot
    try:
        bytes_sent = int(raw_value.decode('utf-8').split(',')[2])
    except (UnicodeDecodeError, IndexError, ValueError):
        # Skip the bad record instead of letting one message end the poll loop.
        st.warning(f"Skipping malformed message: {raw_value!r}")
        return

    processed_data.append(bytes_sent)
    event_times.append(datetime.now())
    ax.clear()
    ax.plot(event_times, processed_data)
    ax.set_xlabel("Event Time")
    ax.set_ylabel("Processed Data")
    st.pyplot(fig)

def read_from_kafka():
    """Poll the module-level ``consumer`` forever, one message per call.

    Each poll waits up to one second. Timed-out polls (``None``) are skipped,
    and every message received is handed to ``process_message``.
    """
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        process_message(msg)

# Consume until interrupted, show any failure in the UI, then always close
# the Kafka consumer and stop the Spark session.
try:
    read_from_kafka()
except Exception as err:
    st.error(f"An error occurred: {err!s}")
finally:
    consumer.close()
    spark.stop()
