# In [None]:
# Task 4: Individual Assessment - Comprehensive Analysis of the UNSW-NB15 Dataset

# Import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyhive import hive
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count

# Function to execute a Hive query and return the result as a Pandas DataFrame
def execute_hive_query(query, conn):
    """Run *query* through a cursor from *conn* and return the rows as a DataFrame.

    Args:
        query: Query text passed unchanged to ``cursor.execute``.
        conn: Connection object whose ``cursor()`` result can be used as a
            context manager and exposes ``execute``, ``fetchall`` and
            ``description``.

    Returns:
        A ``pandas.DataFrame`` holding every fetched row, with column names
        taken from the first element of each ``cursor.description`` entry.
    """
    with conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
        # Each description entry is a sequence whose first item is the column name.
        header = [column_meta[0] for column_meta in cur.description]
    frame = pd.DataFrame(rows, columns=header)
    return frame

# Open the Hive connection used by the Hive queries below
hive_conn = hive.Connection(username='hadoop', host='localhost', port=10000)

# Create (or reuse) the Spark session for the DataFrame-based queries
spark = (
    SparkSession.builder
    .appName("UNSW-NB15 Individual Assessment")
    .getOrCreate()
)

# Read the CSV into a Spark DataFrame: column names come from the header row
# and Spark infers the column types
data_path = '../data/UNSW-NB15.csv'  # Adjust the path if necessary
df_spark = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(data_path)
)

# Hive Query 1: mean of dbytes (destination bytes) per service type
query1 = """
SELECT service, AVG(dbytes) AS avg_dbytes
FROM unsw_nb15
GROUP BY service
"""
hive_result1 = execute_hive_query(query1, hive_conn)
print(hive_result1)

# Bar chart of the per-service averages; x tick labels are rotated so the
# service names do not overlap
plt.figure(figsize=(12, 8))
sns.barplot(data=hive_result1, x='service', y='avg_dbytes').set(
    title='Average Destination Bytes for Each Service Type',
    xlabel='Service Type',
    ylabel='Average Destination Bytes',
)
plt.xticks(rotation=90)
plt.show()

# Spark Query 1: the 10 destination IPs with the most rows (connections)
top_dst_ips = (
    df_spark.groupBy("dstip")
    .agg(count("*").alias("connections"))
    .orderBy(col("connections").desc())
    .limit(10)
    .toPandas()
)
print(top_dst_ips)

# Horizontal bar chart: connection count on x, destination IP on y
plt.figure(figsize=(12, 8))
sns.barplot(data=top_dst_ips, x='connections', y='dstip').set(
    title='Top 10 Destination IPs by Number of Connections',
    xlabel='Number of Connections',
    ylabel='Destination IP',
)
plt.show()

# Hive Query 2: row count for every (proto, label) combination
query2 = """
SELECT proto, label, COUNT(*) AS count
FROM unsw_nb15
GROUP BY proto, label
ORDER BY proto, label
"""
hive_result2 = execute_hive_query(query2, hive_conn)
print(hive_result2)

# Grouped bar chart: one group per protocol, one bar per label value
plt.figure(figsize=(14, 8))
sns.barplot(data=hive_result2, x='proto', y='count', hue='label').set(
    title='Distribution of Labels by Protocol Type',
    xlabel='Protocol Type',
    ylabel='Count',
)
plt.xticks(rotation=90)
plt.show()

# Spark Query 2: Average TTL (time-to-live) values for each label.
# The aggregated columns are `sttl` and `dttl`, which are TTL fields rather than
# packet sizes, so the comments and chart title below describe TTL.
# The variable names `avg_pkt_size` / `avg_pkt_size_melted` are kept unchanged
# in case later code refers to them.
avg_pkt_size = (
    df_spark.groupBy("label")
    .agg(avg("sttl").alias("avg_sttl"), avg("dttl").alias("avg_dttl"))
    .toPandas()
)
print(avg_pkt_size)

# Visualize the average TTL values for each label
plt.figure(figsize=(10, 6))
# Reshape wide -> long so both averages can be drawn side by side via `hue`
avg_pkt_size_melted = avg_pkt_size.melt(id_vars="label", var_name="type", value_name="average")
sns.barplot(x='label', y='average', hue='type', data=avg_pkt_size_melted)
plt.title('Average Source and Destination TTL for Each Label')
plt.xlabel('Label')
plt.ylabel('Average TTL')
plt.show()

# Close the Hive connection and stop the Spark session.
# try/finally ensures the Spark session is stopped even if closing the Hive
# connection raises.
try:
    hive_conn.close()
finally:
    spark.stop()
