In [None]:
# NOTE(review): presumably tears down any leftover Livy session with this id so
# that the create cell that follows starts from a clean state — confirm against
# the magic's documentation.
%delete_livy_session --cluster bigdata-course-spark-cluster --id gr1391_Kainelainen_Teterin

In [None]:
# Requests a Livy session on the course cluster, configured through the two
# --conf flags with a single executor that uses one core. The session id given
# here is the same one named in the "#!spark --session" header of the later cells.
%create_livy_session \
--cluster bigdata-course-spark-cluster \
--id gr1391_Kainelainen_Teterin \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=1

In [None]:
#!spark --session gr1391_Kainelainen_Teterin
# Read the imbalanced DDoS CSV into `df`: the first row is used as the header
# and the column types are inferred rather than declared.
filePathUnbalanced = "/user/bigdata-course/DDoS Dataset/ddos_imbalanced/unbalaced_20_80_dataset.csv"
csvReadOptions = {"header": True, "inferSchema": True}
df = spark.read.csv(filePathUnbalanced, **csvReadOptions)

In [None]:
#!spark --session gr1391_Kainelainen_Teterin
from pyspark.sql.functions import desc, monotonically_increasing_id

# Build the working subset in one chain: cap the input at 2.25M rows, write a
# monotonically increasing id into "_c0", then keep the 2M rows whose id is
# largest.
dfSelected = (
    df.limit(2_250_000)
    .withColumn("_c0", monotonically_increasing_id())
    .orderBy(desc("_c0"))
    .limit(2_000_000)
)

In [None]:
#!spark --session gr1391_Kainelainen_Teterin
# Names of all columns whose dtype is "string", in schema order, together with
# the "indexed_<name>" name each one will receive after indexing.
stringCols = [colName for colName, colType in df.dtypes if colType == "string"]
outputStringCols = ['indexed_' + colName for colName in stringCols]

In [None]:
#!spark --session gr1391_Kainelainen_Teterin
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fit a single indexer over every string column of the working subset
# (configured with handleInvalid="skip").
indexerStage = StringIndexer(
    inputCols=stringCols,
    outputCols=outputStringCols,
    handleInvalid="skip",
)
labelIndexer = indexerStage.fit(dfSelected)

# Replace the original string columns by their "indexed_*" counterparts.
dfSelected = labelIndexer.transform(dfSelected).drop(*stringCols)

# Feature list = every remaining column except the row id, the label and the
# indexed flow id. list.remove raises ValueError if one of them is missing.
features = list(dfSelected.columns)
for excludedCol in ("_c0", "indexed_Label", "indexed_Flow ID"):
    features.remove(excludedCol)

In [None]:
#!spark --session gr1391_Kainelainen_Teterin
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Combine the selected feature columns into one vector column named
# "Features"; `va` is the transformed frame, not the assembler itself.
featureAssembler = VectorAssembler(inputCols=features, outputCol="Features", handleInvalid="skip")
va = featureAssembler.transform(dfSelected)

In [None]:
#!spark --session gr1391_Kainelainen_Teterin
import time

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# One fixed seed for both the train/test split and the boosted trees, so that
# re-running the cell reproduces the same model and the same metrics.
SEED = 42

start_time = time.time()

# 70/30 train/test split of the assembled frame.
train, test = va.randomSplit([0.7, 0.3], seed=SEED)
classifier = GBTClassifier(
    labelCol="indexed_Label",
    featuresCol="Features",
    maxIter=3,
    # NOTE(review): the row count is used as the bin limit, presumably as a safe
    # upper bound on the number of distinct values of the indexed columns —
    # confirm; it also triggers a full count job inside the timed region.
    maxBins=dfSelected.count(),
    seed=SEED,
)
model = classifier.fit(train)

# Cached because the predictions are evaluated several times (here and in the
# metrics cell); without it each evaluate() re-scores the whole test set.
prediction = model.transform(test).cache()
prediction.show(n=1, vertical=True)

evaluator = MulticlassClassificationEvaluator(labelCol="indexed_Label",
                                              predictionCol="prediction",
                                              metricName="f1")
f1_score = evaluator.evaluate(prediction)
# Same evaluator, metric overridden through the param map, so the test error
# is derived from real accuracy instead of from the F1 score.
accuracy = evaluator.evaluate(prediction, {evaluator.metricName: "accuracy"})
print(f"F1 = {f1_score}")
print(f"Accuracy = {accuracy}")
print(f"Test Error = {1.0 - accuracy}")

# Wall-clock time for split, training, scoring and the evaluations above.
print(f"Execution time: {time.time() - start_time}")

In [None]:
#!spark --session gr1391_Kainelainen_Teterin
# Additional quality metrics for the predictions produced by the training cell.
# Each entry pairs an evaluator metricName with the label printed next to it;
# the metrics are evaluated and printed in the order listed.
# NOTE(review): recall and true-positive rate are the same quantity by
# definition, so those two lines are expected to print the same number — confirm.
extraMetrics = [
    ("weightedPrecision", "Weighted Precision"),
    ("logLoss", "Log Loss"),
    ("weightedRecall", "Weighted Recall"),
    ("weightedTruePositiveRate", "Weighted True Positive Rate"),
    ("weightedFMeasure", "Weighted F-Measure"),
]

for metricName, metricTitle in extraMetrics:
    evaluator = MulticlassClassificationEvaluator(labelCol="indexed_Label",
                                                  predictionCol="prediction",
                                                  metricName=metricName)
    metricValue = evaluator.evaluate(prediction)
    print(f"{metricTitle}: {metricValue}")