# Import Dependencies

In [4]:
!pip install matplotlib

Collecting matplotlib
  Downloading matplotlib-3.9.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (11 kB)
Collecting contourpy>=1.0.1 (from matplotlib)
  Downloading contourpy-1.2.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (5.8 kB)
Collecting cycler>=0.10 (from matplotlib)
  Downloading cycler-0.12.1-py3-none-any.whl.metadata (3.8 kB)
Collecting fonttools>=4.22.0 (from matplotlib)
  Downloading fonttools-4.52.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (161 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m161.7/161.7 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting kiwisolver>=1.3.1 (from matplotlib)
  Downloading kiwisolver-1.4.5-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (6.4 kB)
Collecting pillow>=8 (from matplotlib)
  Downloading pillow-10.3.0-cp311-cp311-manylinux_2_28_aarch64.whl.metadata (9.2 kB)
Collecting pyparsing>=2.3.

In [5]:
import pandas as pd
import json

In [6]:
import matplotlib.pyplot as plt

In [7]:
import numpy as np

In [8]:
# %load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [109]:
from pyspark.sql.functions import current_timestamp, count
from pyspark.sql.window import Window

In [9]:
# Reuse the active Spark session if there is one, otherwise create one with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/31 02:52:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Explicit schema for the flow-log files; every field is nullable.
# NOTE(review): with a user-supplied schema Spark maps CSV columns by position, so this
# list is presumably in the same order as the columns in data/vpclog/ — confirm.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("account-id", LongType()),
            ("action", StringType()),
            ("az-id", StringType()),
            ("dstaddr", StringType()),
            ("dstport", LongType()),
            ("srcaddr", StringType()),
            ("srcport", LongType()),
            ("start", LongType()),
            ("end", LongType()),
            ("log-status", StringType()),
            ("packets", LongType()),
            ("protocol", LongType()),
            ("region", StringType()),
            ("subnet-id", StringType()),
            ("tcp-flags", LongType()),
            ("traffic-path", StringType()),
            ("type", StringType()),
            ("vpc-id", StringType()),
        )
    ]
)



In [11]:
# Stream the space-separated flow-log files, one file per micro-batch, skipping
# each file's header line and applying `schema` (set once).
streaming = (
    spark.readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .option("header", True)
    .option("sep", " ")
    .csv("data/vpclog/")
)

In [12]:
from pyspark.sql.functions import from_unixtime, col, to_date, to_timestamp, date_format


In [13]:
# IP protocol number (decimal string key) -> protocol name, following the IANA
# "Assigned Internet Protocol Numbers" registry. Number 92 is MTP (Multicast
# Transport Protocol); it was previously mislabelled "MSP".
protocol_dict = {
    '1': 'ICMP',
    '2': 'IGMP',
    '3': 'GGP',
    '4': 'IP-in-IP',
    '6': 'TCP',
    '8': 'EGP',
    '9': 'IGP',
    '17': 'UDP',
    '41': 'IPv6',
    '47': 'GRE',
    '50': 'ESP',
    '51': 'AH',
    '58': 'ICMPv6',
    '88': 'EIGRP',
    '89': 'OSPF',
    '92': 'MTP',
    '103': 'PIM',
    '112': 'VRRP',
    '115': 'L2TP',
    '132': 'SCTP',
    '136': 'UDPLite'
}

In [20]:
from pyspark.sql.functions import when, col

def _protocol_name_expr(protocol_col, code_to_name, default="Unknown"):
    """Build a CASE WHEN column expression mapping protocol numbers to names.

    Args:
        protocol_col: Column holding the numeric protocol field.
        code_to_name: dict of protocol number (decimal-string key) -> name.
        default: label for values matching no key (null input included).

    Returns:
        A Column evaluating to the mapped name, or ``default``.
    """
    mapped = None
    for code, name in code_to_name.items():
        # Integer literal: `protocol` is LongType in `schema`, so this avoids
        # relying on an implicit string-to-number cast.
        condition = protocol_col == int(code)
        mapped = when(condition, name) if mapped is None else mapped.when(condition, name)
    return F.lit(default) if mapped is None else mapped.otherwise(default)


# Convert the epoch-second 'start'/'end' columns to timestamps and replace the
# protocol number with its name. Names come from `protocol_dict`, so the mapping
# is maintained in one place instead of a separate hand-written when() chain.
cleaned_streaming = (
    streaming
    .withColumn("start", from_unixtime("start").cast("timestamp"))
    .withColumn("end", from_unixtime("end").cast("timestamp"))
    .withColumn("protocol", _protocol_name_expr(col("protocol"), protocol_dict))
)

# Extract the calendar date and the time of day (HH:mm:ss) of both endpoints.
cleaned_streaming = (
    cleaned_streaming
    .withColumn("start_date", to_date("start"))
    .withColumn("end_date", to_date("end"))
    .withColumn("start_time", date_format("start", "HH:mm:ss"))
    .withColumn("end_time", date_format("end", "HH:mm:ss"))
)

In [21]:
from pyspark.sql.functions import col, when, split, substring

# Boolean column expressions used to classify each flow record.
accepted_idx = (col("action") == "ACCEPT")
rejected_idx = (col("action") == "REJECT")

# Octet arrays of each address. Not used by the masks below; retained in case
# later cells reference them.
dst_ip = split(col("dstaddr"), "\\.")
src_ip = split(col("srcaddr"), "\\.")

# An endpoint is "internal" when its address lies in 10.0.0.0/16 (presumably the
# VPC CIDR — confirm). The trailing dot makes this an exact second-octet match
# instead of a bare 4-character "10.0" prefix.
dst_idx = col("dstaddr").startswith("10.0.")
src_idx = col("srcaddr").startswith("10.0.")

# Direction is relative to the internal network:
in_idx = ~src_idx & dst_idx    # inbound: external source -> internal destination
out_idx = src_idx & ~dst_idx   # outbound: internal source -> external destination
local_idx = src_idx & dst_idx  # local: both endpoints internal



In [22]:
# Attach the classification masks as boolean columns so later aggregations can filter on them.
indexed_streaming = cleaned_streaming.select(
    "*",
    accepted_idx.alias("accepted_idx"),
    rejected_idx.alias("rejected_idx"),
    out_idx.alias("out_idx"),
    in_idx.alias("in_idx"),
    local_idx.alias("local_idx"),
)

In [79]:


# Per-direction / per-verdict flow counts. Each aggregate stamps rows with the
# processing time (current_timestamp) and groups on it under a 1-minute watermark;
# the watermark is what allows the append-mode sinks used later.
def _count_flows(flag_columns, group_columns):
    """Count rows of `indexed_streaming` whose boolean flags are all True.

    Args:
        flag_columns: names of boolean index columns that must all be True.
        group_columns: grouping keys in output-column order; "timestamp" refers
            to the processing-time column added here.

    Returns:
        A streaming DataFrame with ``group_columns`` followed by ``count``.
    """
    selected = indexed_streaming
    for flag in flag_columns:
        selected = selected.filter(selected[flag] == True)
    stamped = (
        selected
        .withColumn("timestamp", current_timestamp())
        .withWatermark("timestamp", "1 minute")
    )
    return stamped.groupBy(*group_columns).count()


# NOTE: local_traffic groups as (timestamp, start_time) while the others use
# (start_time, timestamp); kept as-is because it determines the CSV column order.
local_traffic = _count_flows(["local_idx"], ["timestamp", "start_time"])
accepted_in_traffic = _count_flows(["in_idx", "accepted_idx"], ["start_time", "timestamp"])
rejected_in_traffic = _count_flows(["in_idx", "rejected_idx"], ["start_time", "timestamp"])
accepted_out_traffic = _count_flows(["out_idx", "accepted_idx"], ["start_time", "timestamp"])
rejected_out_traffic = _count_flows(["out_idx", "rejected_idx"], ["start_time", "timestamp"])

In [80]:
# Accepted flows counted per source address, one stream per direction.
# No top-10 limit is applied in this cell.
def _accepted_counts_by_source(direction_flag):
    """Count accepted flows per (srcaddr, processing-time) for one direction.

    Args:
        direction_flag: name of the boolean direction column ("in_idx" or "out_idx").

    Returns:
        A streaming DataFrame of srcaddr, timestamp, count.
    """
    return (
        indexed_streaming
        .filter(indexed_streaming[direction_flag] == True)
        .filter(indexed_streaming["accepted_idx"] == True)
        .withColumn("timestamp", current_timestamp())
        .withWatermark("timestamp", "1 minute")
        .groupBy("srcaddr", "timestamp")
        .count()
    )


top10_in_traffic = _accepted_counts_by_source("in_idx")
top10_out_traffic = _accepted_counts_by_source("out_idx")

In [81]:
# Flow counts per protocol name, keyed on processing time under a 1-minute watermark.
protocol_stream = (
    indexed_streaming
    .select("protocol")
    .withColumn("timestamp", current_timestamp())
    .withWatermark("timestamp", "1 minute")
    .groupBy("protocol", "timestamp")
    .count()
)

In [94]:
# Persist per-protocol counts as CSV part files, firing a micro-batch once a minute.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
protocol_query = (
    protocol_stream.writeStream
    .trigger(processingTime="1 minute")
    .start(
        path="streaming/protocol/data",
        format="csv",
        outputMode="append",
        queryName="protocol_counts",
        checkpointLocation="streaming/protocol/checkpoint/",
    )
)

# To block this cell until the query ends, uncomment:
# protocol_query.awaitTermination()



24/05/31 03:52:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [97]:
# Persist local (internal -> internal) traffic counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
local_query = local_traffic.writeStream.start(
    path="streaming/local/data",
    format="csv",
    outputMode="append",
    queryName="local_counts",
    checkpointLocation="streaming/local/checkpoint/",
)






24/05/31 03:53:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [96]:
# Alternative: block until the local-traffic query terminates instead of stopping it.
# local_query.awaitTermination()
# Stop the "local_counts" streaming query.
# NOTE(review): in a top-to-bottom run this executes right after the query is started,
# so few (if any) micro-batches may have completed.
local_query.stop()

In [93]:
# Stop the "protocol_counts" streaming query.
# NOTE(review): in a top-to-bottom run this executes shortly after the query is started,
# so few (if any) one-minute micro-batches may have completed.
protocol_query.stop()

In [98]:
# Persist accepted inbound traffic counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
accepted_in_query = accepted_in_traffic.writeStream.start(
    path="streaming/accepted_in/data",
    format="csv",
    outputMode="append",
    queryName="accepted_in_counts",
    checkpointLocation="streaming/accepted_in/checkpoint/",
)

# To block this cell until the query ends, uncomment:
# accepted_in_query.awaitTermination()



24/05/31 03:54:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [None]:
# Stop the "accepted_in_counts" streaming query.
accepted_in_query.stop()

In [99]:
# Persist rejected inbound traffic counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
rejected_in_query = rejected_in_traffic.writeStream.start(
    path="streaming/rejected_in/data",
    format="csv",
    outputMode="append",
    queryName="rejected_in_counts",
    checkpointLocation="streaming/rejected_in/checkpoint/",
)

# To block this cell until the query ends, uncomment:
# rejected_in_query.awaitTermination()



24/05/31 03:55:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [100]:
# Persist accepted outbound traffic counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
accepted_out_query = accepted_out_traffic.writeStream.start(
    path="streaming/accepted_out/data",
    format="csv",
    outputMode="append",
    queryName="accepted_out_counts",
    checkpointLocation="streaming/accepted_out/checkpoint/",
)

# To block this cell until the query ends, uncomment:
# accepted_out_query.awaitTermination()



24/05/31 03:56:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [101]:
# Persist rejected outbound traffic counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
rejected_out_query = rejected_out_traffic.writeStream.start(
    path="streaming/rejected_out/data",
    format="csv",
    outputMode="append",
    queryName="rejected_out_counts",
    checkpointLocation="streaming/rejected_out/checkpoint/",
)

# To block this cell until the query ends, uncomment:
# rejected_out_query.awaitTermination()



24/05/31 03:57:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [None]:
# Stop the "rejected_in_counts" streaming query.
rejected_in_query.stop()

In [None]:
# Stop the "rejected_out_counts" streaming query.
rejected_out_query.stop()

In [102]:
# Persist accepted inbound per-source counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
top10_in_query = top10_in_traffic.writeStream.start(
    path="streaming/top10_in/data",
    format="csv",
    outputMode="append",
    queryName="top10_in_counts",
    checkpointLocation="streaming/top10_in/checkpoint/",
)

# To block this cell until the query ends, uncomment:
# top10_in_query.awaitTermination()



24/05/31 03:58:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [None]:
# Stop the "top10_in_counts" streaming query.
top10_in_query.stop()

In [103]:
# Persist accepted outbound per-source counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
top10_out_query = top10_out_traffic.writeStream.start(
    path="streaming/top10_out/data",
    format="csv",
    outputMode="append",
    queryName="top10_out_counts",
    checkpointLocation="streaming/top10_out/checkpoint/",
)

# To block this cell until the query ends, uncomment:
# top10_out_query.awaitTermination()



24/05/31 03:58:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [None]:
# Stop the "top10_out_counts" streaming query.
top10_out_query.stop()

In [111]:
# Label every flow record by direction ("inbound" / "outbound" / "local", else
# "unknown") and count records per label within each processing-time group.
# Only the label and its count are kept in the output rows.
packet_type_count = (
    indexed_streaming
    .withColumn(
        "type_of_packet",
        when(col("in_idx"), "inbound")
        .when(col("out_idx"), "outbound")
        .when(col("local_idx"), "local")
        .otherwise("unknown"),
    )
    .withColumn("timestamp", current_timestamp())
    .withWatermark("timestamp", "1 minute")
    .groupBy("type_of_packet", "timestamp")
    .count()
    .select("type_of_packet", "count")
)

In [112]:
# Persist per-direction packet-type counts as CSV part files in append mode.
# For interactive debugging, a console sink with outputMode("update") can be used instead.
packet_type_query = packet_type_count.writeStream.start(
    path="streaming/packet_type/data",
    format="csv",
    outputMode="append",
    queryName="packet_type_counts",
    checkpointLocation="streaming/packet_type/checkpoint/",
)

# To block this cell until the query ends, uncomment:
# packet_type_query.awaitTermination()



24/05/31 04:04:58 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [None]:
# Stop the "packet_type_counts" streaming query.
packet_type_query.stop()

In [142]:
# IP protocol number (decimal string key) -> protocol name, per the IANA
# "Assigned Internet Protocol Numbers" registry (92 is MTP, not "MSP").
# NOTE(review): this re-declares a table already defined earlier in the notebook;
# keep the two in sync or drop one.
protocol_dict = {
    '1': 'ICMP',
    '2': 'IGMP',
    '3': 'GGP',
    '4': 'IP-in-IP',
    '6': 'TCP',
    '8': 'EGP',
    '9': 'IGP',
    '17': 'UDP',
    '41': 'IPv6',
    '47': 'GRE',
    '50': 'ESP',
    '51': 'AH',
    '58': 'ICMPv6',
    '88': 'EIGRP',
    '89': 'OSPF',
    '92': 'MTP',
    '103': 'PIM',
    '112': 'VRRP',
    '115': 'L2TP',
    '132': 'SCTP',
    '136': 'UDPLite'
}