In [None]:
!pip install kafka-python
!pip install pyspark
!pip install confluent-kafka
!pip install streamlit
!pip install kafka

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/246.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m245.8/246.5 kB[0m [31m8.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building whe

In [32]:
# Wildcard imports stay first so the explicit imports below can never be shadowed by them.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from confluent_kafka import Consumer, KafkaError, Producer
import streamlit as st
import matplotlib.pyplot as plt

from datetime import datetime
import os
import random
import time




In [None]:

# Start SparkSession
spark = SparkSession.builder \
    .appName("Real-Time Network Traffic Analysis for Telecommunications") \
    .getOrCreate()

# Configure Kafka connection details.
# The broker address can be overridden through KAFKA_BOOTSTRAP_SERVERS. The
# default uses port 9092; the port previously written here (90922) is outside
# the valid TCP port range (1-65535), so no connection could be established.
bootstrap_servers = os.environ.get(
    'KAFKA_BOOTSTRAP_SERVERS',
    'pkc-6ojv2.us-west4.gcp.confluent.cloud:9092',
)

# Credentials must come from the environment, never from the notebook source.
# NOTE: the API key/secret that used to be hardcoded here has been exposed and
# should be rotated.
sasl_username = os.environ.get('KAFKA_SASL_USERNAME')
sasl_password = os.environ.get('KAFKA_SASL_PASSWORD')
if not sasl_username or not sasl_password:
    raise RuntimeError(
        "Set the KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD environment "
        "variables before running this notebook."
    )

kafka_topic = 'network-traffic'
processed_topic = 'processed-data'

# Define schema for the data
schema = StructType([
    StructField("source_ip", StringType(), True),
    StructField("destination_ip", StringType(), True),
    StructField("bytes_sent", IntegerType(), True),
    StructField("event_time", TimestampType(), True)
])

# Create Kafka consumer configuration
conf = {
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
    'group.id': 'network-traffic-group',
    'auto.offset.reset': 'earliest'
}

# The producer shares the connection/auth settings but not the consumer-only
# properties, which a producer instance ignores.
consumer_only_keys = ('group.id', 'auto.offset.reset')
producer_conf = {
    key: value for key, value in conf.items() if key not in consumer_only_keys
}

# Create Kafka consumer and producer
consumer = Consumer(conf)
producer = Producer(producer_conf)

# Subscribe to the Kafka topic
consumer.subscribe([kafka_topic])

# Read data from Kafka and perform real-time visualization
def read_from_kafka():
    """Consume ``kafka_topic`` in a loop and plot bytes sent per message in Streamlit.

    Polls the module-level ``consumer`` with a one-second timeout. Every
    well-formed message (UTF-8 text ``source_ip,destination_ip,bytes_sent``)
    is stamped with the local receive time, passed through a one-minute
    window aggregation in Spark, and added to a line chart of ``bytes_sent``
    against receive time. Malformed messages are reported and skipped.

    The loop runs until the consumer reports an error other than
    end-of-partition, at which point the error is printed and the function
    returns ``None``.
    """
    # Create a Streamlit app
    st.title("Real-Time Network Traffic Analysis for Telecommunications")

    # One figure is reused for every redraw and rendered into a single
    # placeholder, so the page shows one live chart instead of a new chart
    # per message.
    fig, ax = plt.subplots()
    chart_slot = st.empty()

    # Series plotted on the chart: bytes sent and the matching receive times
    processed_data = []
    event_times = []

    def process_message(message):
        """Parse one Kafka message, aggregate it, and redraw the chart."""
        if message is None:
            return
        error = message.error()
        if error:
            if error.code() != KafkaError._PARTITION_EOF:
                print(f"Error: {error}")
            return

        payload = message.value()
        try:
            value = payload.decode('utf-8')
            # Extra trailing fields are ignored; fewer than three raise ValueError.
            source_ip, destination_ip, raw_bytes = value.split(',')[:3]
            bytes_sent = int(raw_bytes)
        except (AttributeError, UnicodeDecodeError, ValueError) as exc:
            # AttributeError covers a message with no payload (value() is None).
            # A bad record must not stop the consumer loop.
            print(f"Skipping malformed message {payload!r}: {exc}")
            return

        # The message carries no timestamp field, so the receive time is used.
        event_time = datetime.now()

        row = (source_ip, destination_ip, bytes_sent, event_time)
        df = spark.createDataFrame([row], schema=schema)

        # Perform window-based aggregations
        aggregated_df = df \
            .groupBy(window("event_time", "1 minute"), "source_ip") \
            .agg(sum("bytes_sent").alias("total_bytes_sent")) \
            .orderBy(desc("total_bytes_sent"))

        # Convert the aggregated DataFrame to Pandas DataFrame for plotting
        # NOTE(review): pd_df is not used below and the frame only ever holds
        # the current row, so this runs a Spark job per message for no
        # visible effect - confirm intent before relying on it.
        pd_df = aggregated_df.toPandas()

        # Extend the plotted series and redraw the chart in place
        processed_data.append(bytes_sent)
        event_times.append(event_time)
        ax.clear()
        ax.plot(event_times, processed_data)
        ax.set_xlabel("Event Time")
        ax.set_ylabel("Information")
        chart_slot.pyplot(fig)

    # Continuously read messages from Kafka topic
    while True:
        message = consumer.poll(1.0)
        if message is None:
            continue
        error = message.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                continue
            print(f"Error: {error}")
            break

        process_message(message)


# Publish network traffic data to Kafka topic
def publish_to_kafka(source_ip, destination_ip, bytes_sent):
    """Send one traffic record to ``kafka_topic`` and wait for its delivery.

    The record is encoded as the UTF-8 text
    ``source_ip,destination_ip,bytes_sent``.

    Args:
        source_ip: Source address; formatted into the record as-is.
        destination_ip: Destination address; formatted into the record as-is.
        bytes_sent: Byte count; formatted into the record as-is.

    Returns:
        None. A failed delivery is reported on stdout.
    """
    def _report_delivery(err, msg):
        # Delivery failures are only reported through this callback; without
        # it they would be dropped silently.
        if err is not None:
            print(f"Error: delivery to {msg.topic()} failed: {err}")

    payload = f"{source_ip},{destination_ip},{bytes_sent}".encode('utf-8')
    producer.produce(kafka_topic, payload, on_delivery=_report_delivery)
    # flush() blocks until the queued message is delivered (or fails) and
    # serves the delivery callback.
    producer.flush()


# Entry point: start the Streamlit page and the Kafka consumer loop
# when this module is executed directly.
if __name__ == "__main__":
    read_from_kafka()
