In [None]:
!pip install pyspark

In [None]:
import numpy as np
import pandas as pd
from functools import reduce
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
# Local Spark session using every available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('TrafficAnalysisUsingPySpark')
    .getOrCreate()
)
print(spark)

# Data Loading

In [None]:
# Path to the dataset CSV. Looks like a Colab-mounted Google Drive path;
# adjust it when running elsewhere.
DATA = "/content/drive/MyDrive/BDA project/Dataset-Unicauca-Version2-87Atts.csv"

# Read with a header row and let Spark infer column types.
df = spark.read.csv(DATA, header=True, inferSchema=True)
df.printSchema()

# Preprocessing

In [None]:
current_columns = df.columns

# Normalise column names to UPPER_SNAKE_CASE.
# Trim surrounding whitespace FIRST: if spaces were turned into underscores
# before stripping, edge spaces would become stray leading/trailing
# underscores and strip() would have nothing left to remove.
# Dots are replaced because Spark reads a dot in a column reference as a
# struct-field accessor.
new_columns = [
    name.strip().replace(" ", "_").replace(".", "_").upper()
    for name in current_columns
]

# toDF renames all columns positionally in a single projection, instead of
# chaining one withColumnRenamed call per column.
final_df = df.toDF(*new_columns)
final_df.printSchema()


In [None]:
# Number of flow records (runs a full Spark job).
n_rows = final_df.count()
n_rows

In [None]:
# Normalised column names.
list(final_df.columns)

In [None]:
# Columns kept for the analysis: flow identifiers and endpoints, timing,
# packet/byte counters, rate features, plus LABEL and PROTOCOLNAME.
selected_columns = [
    'FLOW_ID',
    'SOURCE_IP',
    'SOURCE_PORT',
    'DESTINATION_IP',
    'DESTINATION_PORT',
    'PROTOCOL',
    'TIMESTAMP',
    'FLOW_DURATION',
    'TOTAL_FWD_PACKETS',
    'TOTAL_BACKWARD_PACKETS',
    'TOTAL_LENGTH_OF_FWD_PACKETS',
    'TOTAL_LENGTH_OF_BWD_PACKETS',
    'FLOW_BYTES_S',
    'FLOW_PACKETS_S',
    'AVERAGE_PACKET_SIZE',
    'LABEL',
    'PROTOCOLNAME',
]
sub_df = final_df.select(*selected_columns)
sub_df.show(10)


In [None]:
# Flow count per application protocol.
protocol_counts = sub_df.groupBy("PROTOCOLNAME").count()
protocol_counts.show()

In [None]:
# Summary statistics (count/mean/stddev/min/max) of flow duration.
sub_df.describe('FLOW_DURATION').show()

In [None]:
# Count missing values per column.
# By default Spark's CSV reader yields null for empty fields, so nulls are
# counted for every column. NaN only exists in floating-point columns, so
# isnan is added only for double/float columns.
float_types = {'double', 'float'}
missing_exprs = [
    count(when(col(c).isNull() | isnan(col(c)), c)).alias(c)
    if dtype in float_types
    else count(when(col(c).isNull(), c)).alias(c)
    for c, dtype in sub_df.dtypes
]
sub_df.select(missing_exprs).show()

In [None]:
# All distinct protocol names, collected to the driver.
sub_df.select('PROTOCOLNAME').dropDuplicates().collect()

In [None]:
# Social-media applications kept for the classification task.
socmed = ['TWITTER', 'INSTAGRAM', 'FACEBOOK']

records = sub_df.filter(col('PROTOCOLNAME').isin(socmed))


In [None]:
# Pull the filtered social-media flows onto the driver as a pandas frame
# for plotting and the scikit-learn models.
records_df = (
    records
    .toPandas()
)

In [None]:
# (rows, columns) of the social-media subset; the next cell previews rows,
# so the whole frame is not dumped here.
records_df.shape

In [None]:
# Preview the first rows of the social-media subset.
records_df.head(5)

In [None]:
# Class balance of the social-media subset. The figure is titled and labelled
# so it stands on its own; the trailing ';' suppresses the Axes repr.
ax = records_df["PROTOCOLNAME"].value_counts().plot.bar()
ax.set_title("Flows per social-media protocol")
ax.set_xlabel("PROTOCOLNAME")
ax.set_ylabel("Number of flows");

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Same class balance as above, drawn with seaborn on an explicit Axes.
fig, ax = plt.subplots(figsize=(10, 4))
sns.countplot(x='PROTOCOLNAME', data=records_df, ax=ax)
ax.set_title('Flows per social-media protocol')
ax.set_ylabel('Number of flows')
plt.show()

# Machine Learning with scikit-learn

## Decision Tree Classifier (scikit-learn)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

In [None]:
# Encode the protocol names as integer class labels for scikit-learn.
# A single fit_transform both fits the encoder and encodes the column;
# fitting first and then calling fit_transform would fit it twice.
encoder = LabelEncoder()
records_df['PROTOCOLNAME'] = encoder.fit_transform(records_df['PROTOCOLNAME'])
records_df['PROTOCOLNAME']

In [None]:

# Columns excluded from the feature matrix: flow/endpoint identifiers,
# protocol number, timestamp, LABEL, and the target column itself.
excluded_columns = [
    'FLOW_ID',
    'SOURCE_IP',
    'SOURCE_PORT',
    'DESTINATION_IP',
    'DESTINATION_PORT',
    'PROTOCOL',
    'TIMESTAMP',
    'LABEL',
    'PROTOCOLNAME',
]
X = records_df.drop(columns=excluded_columns)
Y = records_df['PROTOCOLNAME']
X

In [None]:

# 80/20 hold-out split with a fixed seed.
X_train, X_test, Y_train, Y_test = train_test_split(
    X,
    Y,
    random_state=3,
    test_size=0.2,
)

In [None]:
# scikit-learn's tree randomly permutes features at each split, so a fixed
# random_state is needed for the fitted tree (and accuracy) to be reproducible.
model = DecisionTreeClassifier(random_state=3)
model.fit(X_train, Y_train)

In [None]:
# Decision-tree predictions on the hold-out set.
pred = model.predict(X=X_test)

In [None]:
# accuracy_score takes (y_true, y_pred). Accuracy itself is symmetric, but
# keeping the documented order avoids silent errors if the metric is swapped
# for an asymmetric one such as precision or recall.
accuracy_score(Y_test, pred)

## Random Forest Classifier (scikit-learn)

In [None]:
from sklearn.ensemble import RandomForestClassifier

# random_state fixes bootstrap sampling and per-split feature sampling so the
# forest (and its accuracy) is reproducible across runs.
model1 = RandomForestClassifier(n_estimators=10, criterion="entropy", random_state=3)
model1.fit(X_train, Y_train)

In [None]:
# Random-forest predictions on the hold-out set.
y_pred = model1.predict(X=X_test)

In [None]:
# Documented argument order is (y_true, y_pred).
accuracy_score(Y_test, y_pred)

In [None]:
from sklearn.naive_bayes import GaussianNB

# Gaussian Naive Bayes baseline on the same training split.
# fit() returns the estimator itself, so `classifier` is the fitted model.
classifier = GaussianNB().fit(X_train, Y_train)
classifier

In [None]:
# Predict with the Gaussian Naive Bayes model fitted above (not the random
# forest `model1`), so the next cell actually reports Naive Bayes accuracy.
y_pred1 = classifier.predict(X_test)

In [None]:
# Documented argument order is (y_true, y_pred).
accuracy_score(Y_test, y_pred1)

# Spark ML Pipelines


## Random Forest Classifier (Spark ML)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml.classification import RandomForestClassifier,NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [None]:
# Numeric flow statistics used as model features.
inputColumns = ['FLOW_DURATION', 'TOTAL_FWD_PACKETS', 'TOTAL_BACKWARD_PACKETS', 'TOTAL_LENGTH_OF_FWD_PACKETS', 'TOTAL_LENGTH_OF_BWD_PACKETS', 'FLOW_BYTES_S', 'FLOW_PACKETS_S', 'AVERAGE_PACKET_SIZE']
# Name of the assembled feature-vector column, kept clearly distinct from
# the PROTOCOLNAME label column.
outputColumn = "features"

# Index the string label into a temporary column, then replace the original
# so the numeric label is available as PROTOCOLNAME downstream.
indexer = StringIndexer(inputCol="PROTOCOLNAME", outputCol="PROTOCOLNAME_IDX")
in_df = indexer.fit(records).transform(records)
in_df = in_df.drop("PROTOCOLNAME").withColumnRenamed("PROTOCOLNAME_IDX", "PROTOCOLNAME")

# NOTE: VectorAssembler's default handleInvalid="error" raises on null/NaN
# feature values; clean such rows first if the data contains any.
vector_assembler = VectorAssembler(inputCols=inputColumns, outputCol=outputColumn)
# Explicit seed so the forest's random sampling is reproducible across sessions.
rf_model = RandomForestClassifier(labelCol="PROTOCOLNAME", featuresCol=outputColumn, seed=2018)
stages = [vector_assembler, rf_model]
pipeline1 = Pipeline(stages=stages)
in_df = in_df.drop("LABEL")

In [None]:
# 70/30 train/test split with a fixed seed.
train, test = in_df.randomSplit(weights=[0.7, 0.3], seed=2018)
train

In [None]:
# Random Forest search space: 2 x 2 x 2 = 8 distinct combinations.
# Each list must hold distinct values. Repeating a value only multiplies the
# CrossValidator's fitting work without exploring anything new, and a single
# depth-1 tree is too degenerate to compare against anything.
paramGrid_rf = (
    ParamGridBuilder()
    .addGrid(rf_model.numTrees, [10, 20])
    .addGrid(rf_model.maxDepth, [5, 10])
    .addGrid(rf_model.featureSubsetStrategy, ['auto', 'log2'])
    .build()
)

In [None]:
# Candidates are ranked by accuracy on the indexed PROTOCOLNAME label.
cv_evaluator_rf = MulticlassClassificationEvaluator(
    labelCol='PROTOCOLNAME',
    predictionCol='prediction',
    metricName='accuracy',
)
crossval_rf = CrossValidator(
    estimator=pipeline1,
    estimatorParamMaps=paramGrid_rf,
    evaluator=cv_evaluator_rf,
    numFolds=2,
)

In [None]:
# Run the grid search. bestModel holds the pipeline model for the
# best-scoring parameter combination.
cvModel_rf = crossval_rf.fit(train)
best_model_rf = cvModel_rf.bestModel

In [None]:
# Untuned baseline: fit the Random Forest pipeline with its constructor parameters.
model = pipeline1.fit(dataset=train)

In [None]:
# Baseline Random Forest predictions on the held-out split.
y_pred = model.transform(dataset=test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report

# Accuracy of the untuned Random Forest pipeline on the held-out split.
evaluator = MulticlassClassificationEvaluator(labelCol="PROTOCOLNAME", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(y_pred)

# Bring only label/prediction to the driver for sklearn's per-class report.
# Named rf_preds_pdf rather than `pd`, which would shadow the pandas module.
rf_preds_pdf = y_pred.select("PROTOCOLNAME", "prediction").toPandas()
true_labels = rf_preds_pdf["PROTOCOLNAME"].tolist()
predicted_labels = rf_preds_pdf["prediction"].tolist()
report = classification_report(true_labels, predicted_labels)
print(report)

In [None]:
# Held-out accuracy of the cross-validated Random Forest, reusing the
# accuracy evaluator defined in the previous cell.
predictions_rf = best_model_rf.transform(dataset=test)
accuracy_rf = evaluator.evaluate(dataset=predictions_rf)
print(f"Test Accuracy for Random Forest: {accuracy_rf:.2f}")

## Decision Tree Classifier (Spark ML)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [None]:
# Numeric flow statistics used as model features.
inputColumns = ['FLOW_DURATION', 'TOTAL_FWD_PACKETS', 'TOTAL_BACKWARD_PACKETS', 'TOTAL_LENGTH_OF_FWD_PACKETS', 'TOTAL_LENGTH_OF_BWD_PACKETS', 'FLOW_BYTES_S', 'FLOW_PACKETS_S', 'AVERAGE_PACKET_SIZE']
# Name of the assembled feature-vector column, kept clearly distinct from
# the PROTOCOLNAME label column.
outputColumn = "features"

# Index the string label into a temporary column, then replace the original
# so the numeric label is available as PROTOCOLNAME downstream.
indexer = StringIndexer(inputCol="PROTOCOLNAME", outputCol="PROTOCOLNAME_IDX")
in_df = indexer.fit(records).transform(records)
in_df = in_df.drop("PROTOCOLNAME").withColumnRenamed("PROTOCOLNAME_IDX", "PROTOCOLNAME")

# NOTE: VectorAssembler's default handleInvalid="error" raises on null/NaN
# feature values; clean such rows first if the data contains any.
vector_assembler = VectorAssembler(inputCols=inputColumns, outputCol=outputColumn)
# Explicit seed so any randomness in tree training is reproducible across sessions.
dt_model = DecisionTreeClassifier(labelCol="PROTOCOLNAME", featuresCol=outputColumn, seed=2018)
stages = [vector_assembler, dt_model]
pipeline2 = Pipeline(stages=stages)
in_df = in_df.drop("LABEL")

In [None]:
# 70/30 train/test split with a fixed seed.
train1, test1 = in_df.randomSplit(weights=[0.7, 0.3], seed=2018)
train1

In [None]:
# Decision Tree search space: 3 depths x 3 leaf sizes x 2 impurity measures.
# addGrid registers on the builder in place, so the calls can be separate.
dt_grid_builder = ParamGridBuilder()
dt_grid_builder.addGrid(dt_model.maxDepth, [3, 5, 7])
dt_grid_builder.addGrid(dt_model.minInstancesPerNode, [1, 3, 5])
dt_grid_builder.addGrid(dt_model.impurity, ['gini', 'entropy'])
paramGrid_dt = dt_grid_builder.build()

In [None]:
# Candidates are ranked by accuracy on the indexed PROTOCOLNAME label.
cv_evaluator_dt = MulticlassClassificationEvaluator(
    labelCol='PROTOCOLNAME',
    predictionCol='prediction',
    metricName='accuracy',
)
crossval_dt = CrossValidator(
    estimator=pipeline2,
    estimatorParamMaps=paramGrid_dt,
    evaluator=cv_evaluator_dt,
    numFolds=5,
)

In [None]:
# Run the grid search. bestModel holds the pipeline model for the
# best-scoring parameter combination.
cvModel_dt = crossval_dt.fit(dataset=train1)
best_model_dt = cvModel_dt.bestModel

In [None]:
# Untuned baseline Decision Tree: fit on train1, predict on test1.
model2 = pipeline2.fit(dataset=train1)
y_pred = model2.transform(dataset=test1)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report

# Accuracy of the untuned Decision Tree pipeline on the held-out split.
evaluator = MulticlassClassificationEvaluator(labelCol="PROTOCOLNAME", predictionCol="prediction", metricName="accuracy")
accuracy2 = evaluator.evaluate(y_pred)

# Bring only label/prediction to the driver for sklearn's per-class report.
# Named dt_preds_pdf rather than `pd`, which would shadow the pandas module.
dt_preds_pdf = y_pred.select("PROTOCOLNAME", "prediction").toPandas()
true_labels = dt_preds_pdf["PROTOCOLNAME"].tolist()
predicted_labels = dt_preds_pdf["prediction"].tolist()
report = classification_report(true_labels, predicted_labels)
print(report)

In [None]:
# Held-out accuracy of the cross-validated Decision Tree, reusing the
# accuracy evaluator defined in the previous cell.
predictions_dt = best_model_dt.transform(dataset=test1)
accuracy_dt = evaluator.evaluate(dataset=predictions_dt)
print(f"Test Accuracy for Decision Tree: {accuracy_dt:.2f}")

## Naive Bayes (Spark ML)

In [None]:
# Numeric flow statistics used as model features.
inputColumns = ['FLOW_DURATION', 'TOTAL_FWD_PACKETS', 'TOTAL_BACKWARD_PACKETS', 'TOTAL_LENGTH_OF_FWD_PACKETS', 'TOTAL_LENGTH_OF_BWD_PACKETS', 'FLOW_BYTES_S', 'FLOW_PACKETS_S', 'AVERAGE_PACKET_SIZE']
# Name of the assembled feature-vector column; NaiveBayes reads this column.
outputColumn = "features"

# Index the string label into a temporary column, then replace the original
# so the numeric label is available as PROTOCOLNAME downstream.
indexer = StringIndexer(inputCol="PROTOCOLNAME", outputCol="PROTOCOLNAME_IDX")
in_df = indexer.fit(records).transform(records)
in_df = in_df.drop("PROTOCOLNAME").withColumnRenamed("PROTOCOLNAME_IDX", "PROTOCOLNAME")

vector_assembler = VectorAssembler(inputCols=inputColumns, outputCol=outputColumn)
# Features/label must name columns that actually exist in in_df ('target'
# does not). NOTE(review): the default multinomial model rejects negative
# feature values; confirm the flow features are non-negative, or use
# modelType="gaussian".
nb_model = NaiveBayes(featuresCol=outputColumn, labelCol="PROTOCOLNAME")
# The pipeline must end in the Naive Bayes estimator, not the decision tree.
stages = [vector_assembler, nb_model]
pipeline3 = Pipeline(stages=stages)
in_df = in_df.drop("LABEL")

In [None]:
# 70/30 train/test split with a fixed seed.
train2, test2 = in_df.randomSplit(weights=[0.7, 0.3], seed=2018)
train2

In [None]:
# Naive Bayes search space: additive smoothing off vs. Laplace smoothing.
nb_grid_builder = ParamGridBuilder()
nb_grid_builder.addGrid(nb_model.smoothing, [0.0, 1.0])
paramGrid_nb = nb_grid_builder.build()

In [None]:
# Rank candidates by accuracy on the indexed PROTOCOLNAME label. LABEL was
# dropped from in_df, so an evaluator on 'label' has no column to read.
crossval_nb = CrossValidator(estimator=pipeline3, estimatorParamMaps=paramGrid_nb,
                              evaluator=MulticlassClassificationEvaluator(
                                  labelCol='PROTOCOLNAME', predictionCol='prediction', metricName='accuracy'),
                              numFolds=5)

In [None]:
# Tune the Naive Bayes pipeline on its own training split (train2), not the
# Decision Tree section's train1.
cvModel_nb = crossval_nb.fit(train2)
best_model_nb = cvModel_nb.bestModel

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report

# Fit the untuned Naive Bayes pipeline and score it on its own held-out split.
# (Reusing y_pred here would silently report the Decision Tree's predictions.)
model3 = pipeline3.fit(train2)
y_pred3 = model3.transform(test2)

evaluator = MulticlassClassificationEvaluator(labelCol="PROTOCOLNAME", predictionCol="prediction", metricName="accuracy")
accuracy3 = evaluator.evaluate(y_pred3)

# Bring only label/prediction to the driver for sklearn's per-class report.
# Named nb_preds_pdf rather than `pd`, which would shadow the pandas module.
nb_preds_pdf = y_pred3.select("PROTOCOLNAME", "prediction").toPandas()
true_labels = nb_preds_pdf["PROTOCOLNAME"].tolist()
predicted_labels = nb_preds_pdf["prediction"].tolist()
report = classification_report(true_labels, predicted_labels)
print(report)

In [None]:
# Held-out accuracy of the cross-validated Naive Bayes on the Naive Bayes
# section's own test split.
predictions_nb = best_model_nb.transform(test2)
accuracy_nb = evaluator.evaluate(predictions_nb)
print(f"Test Accuracy for Naive Bayes: {accuracy_nb:.2f}")

In [None]:
import matplotlib.pyplot as plt

# Set 1: untuned pipelines. `accuracy` was computed from the Random Forest
# pipeline and `accuracy2` from the Decision Tree pipeline, so the values are
# ordered to match the labels.
model_names_1 = ['Decision Tree', 'Random Forest']
accuracies_1 = [accuracy2, accuracy]
# Set 2: best models selected by CrossValidator.
model_names_2 = ['Decision Tree', 'Random Forest']
accuracies_2 = [accuracy_dt, accuracy_rf]

fig, axes = plt.subplots(1, 2, figsize=(15, 6))
panels = [
    (axes[0], model_names_1, accuracies_1, 'Model Accuracy Comparison (Set 1)'),
    (axes[1], model_names_2, accuracies_2, 'Model Accuracy Comparison (Set 2)'),
]
for ax, names, accs, title in panels:
    # Same model -> same colour in both panels.
    ax.bar(names, accs, color=['blue', 'green'])
    ax.set_xlabel('Models')
    ax.set_ylabel('Accuracy')
    ax.set_title(title)
    ax.set_ylim(0, 1)
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Untuned pipelines: `accuracy2` is the Decision Tree pipeline and `accuracy`
# is the Random Forest pipeline; the order must match model_names.
model_names = ['Decision Tree', 'Random Forest']
accuracies = [accuracy2, accuracy]
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(model_names, accuracies, color=['blue', 'green'])
ax.set_xlabel('Models')
ax.set_ylabel('Accuracy')
ax.set_title('Model Accuracy Comparison')
ax.set_ylim(0, 1)
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Cross-validated best models: bar heights are their held-out test accuracies.
model_names = ['Decision Tree', 'Random Forest']
accuracies = [accuracy_dt, accuracy_rf]
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(model_names, accuracies, color=['blue', 'green', 'red', 'purple'])
ax.set(xlabel='Models', ylabel='Accuracy', title='Model Accuracy Comparison', ylim=(0, 1))
plt.show()