In [4]:
import pandas as pd
import numpy as np

import itertools
import multiprocessing as mp

import findspark
import pyspark

from pyspark.sql import SparkSession
from pyspark import SparkContext

import matplotlib.pyplot as plt
from matplotlib import style
style.use("ggplot")

import scipy.cluster.hierarchy as sch
from sklearn.cluster import KMeans

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.types import StructType, StructField, NumericType
from pyspark.sql.functions import col #To drop stuff

# Start (or reuse) a local Spark session and load the packet capture CSV.
# NOTE(review): the input path is an absolute, machine-specific Windows path;
# consider moving it to a configurable location.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "15g")
    .appName('BOUN_TCP_Anon')
    .getOrCreate()
)

# Reader options: comma-separated, first row is the header, double-quote
# quoting, and column types inferred from the data.
csv_read_options = dict(sep=",", header=True, quote='"', inferSchema=True)
spark_df = spark.read.csv(
    path=r'C:\Users\Cayo\Downloads\s5117135_Project_Code\s5117135_Project_Code\BOUN_TCP_Format.csv',
    **csv_read_options,
)

RuntimeError: Java gateway process exited before sending its port number

In [5]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col


def _index_column(frame, input_col, output_col):
    """Fit a StringIndexer on ``input_col`` and add the result as ``output_col``.

    Args:
        frame: Spark DataFrame that contains ``input_col``.
        input_col: Name of the column to index.
        output_col: Name of the numeric index column to append.

    Returns:
        A ``(model, transformed_frame)`` pair: the fitted indexer model and
        ``frame`` with ``output_col`` appended.
    """
    model = StringIndexer(inputCol=input_col, outputCol=output_col).fit(frame)
    return model, model.transform(frame)


# Give every distinct destination IP, source IP and TTL value a numeric index
# in a new column. Each stage builds on the output of the previous one, so the
# final frame carries all three index columns.
indexer_model, indexed_data_dst = _index_column(spark_df, "Dst_IP", "Dst_IP_index")
indexer_model_src, indexed_data_src = _index_column(indexed_data_dst, "Src_ip", "Src_IP_index")
indexer_model_ttl, indexed_data = _index_column(indexed_data_src, "TTL", "TTL_index")

# Author's observation: the victim IP 10.50.199.86 receives index 0.
# The filtered preview below shows its rows so this can be checked.
indexed_data.filter(col('Dst_ip').isin(['10.50.199.86'])).show(5)
indexed_data.show(5)

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

In [None]:
# Drop the columns that are not needed for the remaining steps.
# Author's note on columns that are NOT dropped: 'ACK', 'TTL_index', 'Dst_Port'
index_col_drop = [
    'Time',
    'Frame_No',
    'Src_Port',
    'Src_IP_index',
    'Pro',
    'RST',
    'Src_ip',
    'Dst_IP',
    'TTL',
]
indexed_data = indexed_data.drop(*index_col_drop)

# Preview the remaining rows and confirm the resulting schema.
indexed_data.show(5)
indexed_data.printSchema()

In [None]:
# Create features from data:
# combine the selected numeric columns into a single vector column.
from pyspark.ml.feature import VectorAssembler

# The columns to vectorize into the 'features' column.
features_columns = ('SYN','Frame_lng', 'Dst_IP_index', 'TTL_index', 'ACK', 'Dst_Port')

# inputCols is passed as a list, which is the documented parameter type.
assemble = VectorAssembler(inputCols=list(features_columns), outputCol='features')

# DataFrame with the additional vectorized 'features' column.
assembled_data = assemble.transform(indexed_data)
assembled_data.show(5)
print(assemble)

In [None]:
# Scale the assembled feature vectors.
# NOTE(review): StandardScaler is used with its default settings here;
# confirm whether mean-centering is also wanted (see the withMean parameter).
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler # min=0, max=1, (Parameters for scaling)

scaler = StandardScaler().setInputCol('features').setOutputCol('standardized')

# Fit the scaler on the assembled data, then append the 'standardized' column.
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)

scaled_data.show(5)
scaled_data.printSchema()



In [None]:
#THIS PUTS STANDARDIZED DATA BACK INTO THE COLUMNS FROM ARRAY
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array
from pyspark.mllib.linalg import Vectors as OldVectors
from pyspark.sql.functions import explode, explode_outer
from pyspark.sql.functions import lit

# Select the standardized column and convert it from an ML vector into a
# plain array column so it can later be handed to pandas / scikit-learn.
spkScaled = scaled_data.select(vector_to_array('standardized').alias('standardized'))

# Only preview a few rows; the previous unconditional collect() pulled the
# whole dataset into driver memory and then discarded the result.
spkScaled.show(5)

In [None]:
# 2D object (one list entry per row) to be read by PCA.
# toPandas() brings the whole standardized column to the driver.
stand_list = list(spkScaled.select('standardized').toPandas()['standardized'])

# Preview only the first rows instead of echoing the entire dataset.
stand_list[:5]

In [None]:
# Compress the data into principal components.
# No component limit is set at this stage so that the explained variance of
# every component can be inspected before choosing how many to keep.

from sklearn.decomposition import PCA
pca = PCA(n_components=None)
pca.fit(stand_list)

In [None]:
# Show the explained-variance ratio of each component of the fitted PCA.
pca.explained_variance_ratio_

In [None]:
# Plot the cumulative explained variance to choose the number of PCA components.

# Derive the x-axis from the fitted model rather than assuming 6 components,
# so the plot still works if the feature set (or the fitted PCA) changes.
cumulative_variance = pca.explained_variance_ratio_.cumsum()
component_numbers = range(1, len(cumulative_variance) + 1)

plt.figure(figsize = (9,10))
plt.plot(component_numbers, cumulative_variance, marker = 'o', linestyle = '--')
plt.title('Explained Variance by Components')
plt.xlabel('Number of Components')
plt.ylabel('Cumulative Explained Variance')
plt.show()

In [None]:
# Keep 5 principal components; the following cells build PCA_1..PCA_5 from them.
# NOTE(review): the original note said 4 components were chosen because they
# exceed 80% cumulative variance on the graph, but the code uses 5 — confirm
# the intended value against the plot.
N_PCA_COMPONENTS = 5
pca = PCA(n_components = N_PCA_COMPONENTS)

In [None]:
# Fit the reduced PCA on the standardized list, then project the same data
# onto the retained components. The projection is computed once; the earlier
# extra transform() call discarded its result.
pca.fit(stand_list)
scores_pca = pca.transform(stand_list)
scores_pca #PCA components (one row per sample)

In [None]:

# Re-import scikit-learn's KMeans: the earlier pyspark.ml.clustering import
# bound the same name.
from sklearn.cluster import KMeans

# Within-cluster sum of squares (inertia) for k = 1..20, for the elbow method.
wcss = []
for n_clusters in range(1, 21):
    # Fixed random_state gives a reproducible result.
    kmeans_pca = KMeans(n_clusters=n_clusters, init='k-means++', random_state=42)
    wcss.append(kmeans_pca.fit(scores_pca).inertia_)

print(wcss)

In [None]:
# Author's notes:
#   - The elbow method identified 6 clusters as the optimal number.
#   - Silhouette score said 4 = Silhouette Score: 0.8828039578166229
# The same random state as in the elbow search is used.
kmeans_settings = {'n_clusters': 6, 'init': 'k-means++', 'random_state': 42}
kmeans_pca = KMeans(**kmeans_settings)

# Fit the PCA scores to the K-means model.
kmeans_pca.fit(scores_pca)

In [None]:
# Put every PCA component and the K-means labels into lists, ready to be
# added to a DataFrame. Column slicing replaces the six hand-written
# element-by-element loops.
col1, col2, col3, col4, col5 = (list(scores_pca[:, k]) for k in range(5))
col6 = list(kmeans_pca.labels_)

# PCA columns and K-means labels keyed by their future DataFrame column names.
pca_dict = {
    'PCA_1': col1,
    'PCA_2': col2,
    'PCA_3': col3,
    'PCA_4': col4,
    'PCA_5': col5,
    'label': col6
}

# Preview the first rows only; echoing the raw dict would print every sample.
pd.DataFrame(pca_dict).head()

In [None]:
# Input capture and labelled output locations.
# NOTE(review): absolute, machine-specific paths; consider making them configurable.
SOURCE_CSV = r'C:\Users\Cayo\Downloads\s5117135_Project_Code\s5117135_Project_Code\BOUN_TCP_Format.csv'
LABELLED_CSV = r'C:\Users\Cayo\Downloads\s5117135_Project_Code\s5117135_Project_Code\BOUN_TCP_LABELS.csv'

# Labelled PCA columns in a pandas DataFrame.
pca_dict_df = pd.DataFrame(pca_dict)

# Read the original data again so the labels can be attached by merging.
panda_df = pd.read_csv(SOURCE_CSV)

# The join below aligns rows purely by position. A length mismatch would
# silently leave rows unlabelled or drop labels, so fail loudly instead.
if len(panda_df) != len(pca_dict_df):
    raise ValueError(
        "Row count mismatch between source data (%d) and PCA/label rows (%d); "
        "labels cannot be aligned by position." % (len(panda_df), len(pca_dict_df))
    )

# Merge: append the PCA columns and the cluster label to each original row.
joined_panda = panda_df.join(pca_dict_df)

# Write the merged data out.
joined_panda.to_csv(LABELLED_CSV, index=False)

# Read the merged CSV back into Spark.
Labelled_data = spark.read.csv(
    path=LABELLED_CSV,
    sep=",",
    header=True,
    quote='"',
    inferSchema=True,
)

In [None]:
# Count the data points that fall into each cluster label.
cluster_sizes = Labelled_data.groupBy('label').count()
cluster_sizes.show()

In [None]:
# Spot checks used during testing: preview rows matching a given column value.
victim_preview = Labelled_data.filter(col('Dst_ip').isin(['10.50.199.86']))
victim_preview.show(3)

cluster_zero_preview = Labelled_data.filter(col('label').isin(['0']))
cluster_zero_preview.show(3)

In [None]:
# Count the packets destined for the victim IP in each cluster.
IP_filter = Labelled_data.filter(col('Dst_ip').isin(['10.50.199.86']))

# One aggregation instead of six separate filter+count jobs.
# assumes 'label' was inferred as a numeric column — TODO confirm
victim_counts_by_label = {
    row['label']: row['count']
    for row in IP_filter.groupBy('label').count().collect()
}

# Cluster labels 0..5 (the K-means model above is built with 6 clusters);
# clusters with no victim packets are reported as 0.
IP_Filter_Array = [victim_counts_by_label.get(cluster_id, 0) for cluster_id in range(6)]

# Keep the per-cluster names available for any later cells that use them.
IP_filter0, IP_filter1, IP_filter2, IP_filter3, IP_filter4, IP_filter5 = IP_Filter_Array
IP_Filter_Array