In [1]:
from pyspark.ml.feature import VectorAssembler

In [2]:
inputFile = '/data/students/bigdata_internet/lab4/log_tcp_complete_classes.txt'

In [3]:
inputDf = spark.read.load(inputFile,sep = ' ',format = "csv",header =True , inferSchema =True)

                                                                                

In [4]:
#1.1 number of columns/features the file has.
columnsNumber = inputDf.columns
len(columnsNumber)

207

In [5]:
#1.2 number of tcp conection
rowsNumber = inputDf.count()
rowsNumber

24/01/17 11:24:05 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
                                                                                

100000

In [6]:
Allclass = inputDf.groupBy("class:207").count()

In [7]:
#2.0.1 number of classes in the file
Allclass.count()

                                                                                

10

In [8]:
#2.0.2 showing all the classes
Allclass.show()

                                                                                

+---------------+-----+
|      class:207|count|
+---------------+-----+
|   class:google|10000|
|   class:amazon|10000|
|class:instagram|10000|
| class:facebook|10000|
|  class:netflix|10000|
|     class:ebay|10000|
|  class:spotify|10000|
| class:linkedin|10000|
|  class:youtube|10000|
|     class:bing|10000|
+---------------+-----+



In [9]:
#2.0.3 it has 10 different classes in total,and each of them has 10000 connection per webservice presented in data frame

In [10]:
#I choese these columns as selected features for training
#c_bytes_uniq:7 (number of bytes sent in the payload from the client), 
#s_bytes_uniq:21 (number of bytes sent in the payload from the server)
#c_pkts_data:8 (number of segments with payload from client)
#s_pkts_data:22 (number of segments with payloadfrom server)

In [11]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [12]:
indexer = StringIndexer(inputCol="class:207", outputCol="ClassIndex", handleInvalid="keep")

In [13]:
indexerModel= indexer.fit(inputDf)

                                                                                

In [14]:
indexDF = indexerModel.transform(inputDf)

In [None]:
#2.1.1. Does it make sense to use the IP addresses + ports (`#31#c_ip:1`, `c_port:2`,`s_ip:15`, `s_port:16`) as features?
#no we can not use them, ports and ip addresses are numerical and categorial which is not meaningful because machine learning measures 
#the distances which in this case doesn't make sense to  use them.

In [None]:
#2.1.2. Would it be fair to use the Fully Qualified Domain Name (FQDN, `fqdn:127`, forinstance `www.google.com`) for the classification?

#using FQDNs as features for classification can be fair and effective, especially if they capture relevant information 
#about the classes of interest. we should ensure that we handle them appropriately, encoding, and the 
#dynamic nature of network traffic

In [16]:
featuresColumn = ['c_bytes_uniq:7', 's_bytes_uniq:21', 'c_pkts_data:8','s_pkts_data:22' ]

In [17]:
vectorAssembler = VectorAssembler(inputCols = featuresColumn, outputCol = 'features')
transformedDF = vectorAssembler.transform(indexDF)

In [18]:
#Read & Split data

In [19]:
trainValidation, test= transformedDF.randomSplit([0.75, 0.25])

In [20]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
#2.4.1 How much does it take to train the model (time in seconds), for the differentalgorithm and parameters?

In [21]:
import time

start_time = time.time()
dt = DecisionTreeClassifier(labelCol="ClassIndex", featuresCol='features',maxDepth = 10)

# Your training code
dtModel = dt.fit(trainValidation)
finalDF=dtModel.transform(trainValidation)

end_time = time.time()

training_time = end_time - start_time
print(f"Training time: {training_time} seconds")

                                                                                

Training time: 26.020915031433105 seconds


In [22]:
finalTestDF=dtModel.transform(test)

In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
myEvaluatorTree = MulticlassClassificationEvaluator(labelCol="ClassIndex",predictionCol="prediction",metricName='accuracy')
print("Accuracy on training Decision Tree is ", myEvaluatorTree.evaluate(finalDF))



Accuracy on training Decision Tree is  0.5572778525636753


                                                                                

In [24]:
print("Accuracy on test Decision Tree is ", myEvaluatorTree.evaluate(finalTestDF))



Accuracy on test Decision Tree is  0.5452324696648112


                                                                                

In [25]:
from pyspark.ml.classification import RandomForestClassifier

In [26]:
start_time2 = time.time()
rf = RandomForestClassifier(labelCol="ClassIndex",featuresCol="features",numTrees=20,maxDepth = 10)
rfModel=rf.fit(trainValidation)
finalDFForest=rfModel.transform(trainValidation)
end_time2 = time.time()

training_time2 = end_time2 - start_time2
print(f"Training time: {training_time2} seconds")

[Stage 75:>                                                         (0 + 1) / 1]

Training time: 29.339547872543335 seconds


                                                                                

In [27]:
finalTestDFForest=rfModel.transform(test)

In [28]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
myEvaluatorTree = MulticlassClassificationEvaluator(labelCol="ClassIndex",predictionCol="prediction",metricName='accuracy')
print("Accuracy on training Random Forest is ", myEvaluatorTree.evaluate(finalDFForest))
print("Accuracy on test Random Forest is ", myEvaluatorTree.evaluate(finalTestDFForest))

                                                                                

Accuracy on training Random Forest is  0.6380866065121487




Accuracy on test Random Forest is  0.6268070962316287


                                                                                

In [29]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [30]:
outRDDForest=finalTestDFForest.select("prediction","ClassIndex").rdd.map(lambda x: (float(x[0]),float(x[1])))

In [31]:
metricsForest=MulticlassMetrics(outRDDForest)

In [32]:
labels = finalTestDF.select("ClassIndex").distinct().rdd.flatMap(lambda x: x).collect()
for label in sorted(labels):
    print(f"Class {label} - Precision: {metricsForest.precision(label)}, Recall: {metricsForest.recall(label)}, F1-Score: {metricsForest.fMeasure(label)}")



Class 0.0 - Precision: 0.6619354838709678, Recall: 0.6185691318327974, F1-Score: 0.6395179721587367
Class 1.0 - Precision: 0.789272030651341, Recall: 0.5156445556946183, F1-Score: 0.6237698713096139
Class 2.0 - Precision: 0.5784810126582278, Recall: 0.5477427087495006, F1-Score: 0.5626923866201519
Class 3.0 - Precision: 0.8220164609053497, Recall: 0.6420249096022499, F1-Score: 0.7209564628919468
Class 4.0 - Precision: 0.8089539007092199, Recall: 0.7291250499400719, F1-Score: 0.766967850388737
Class 5.0 - Precision: 0.6526717557251909, Recall: 0.7624645318200243, F1-Score: 0.7033090297251823
Class 6.0 - Precision: 0.5539380365808138, Recall: 0.5734157650695518, F1-Score: 0.5635086386937537
Class 7.0 - Precision: 0.4284876905041032, Recall: 0.8674841772151899, F1-Score: 0.5736332722992414
Class 8.0 - Precision: 0.6095238095238096, Recall: 0.38538739462063426, F1-Score: 0.47220855878012785
Class 9.0 - Precision: 0.6937056737588653, Recall: 0.621771950735002, F1-Score: 0.6557720511208884


                                                                                

In [33]:
#2.5.1 Comment your results: which classes are easier to classify? Which get confused themost?
#based on F1-score which is balance between percision and recall class 4 has the highest value in both  decision tree
#and random forest which means easier to predict they're easier
#class 8 has the least F1=score

In [None]:
#2.5.2 Which classifier performs better? Why do you think it is the case?
#the overall accuracy in decision tree is 55% and overall accuracy of random forest is 63% ,random forest performs better

In [34]:
outRDDTree=finalTestDF.select("prediction","ClassIndex").rdd.map(lambda x: (float(x[0]),float(x[1])))
metricsTree=MulticlassMetrics(outRDDTree)

In [35]:
labels = finalTestDF.select("ClassIndex").distinct().rdd.flatMap(lambda x: x).collect()
for label in sorted(labels):
    print(f"Class {label} - Precision: {metricsTree.precision(label)}, Recall: {metricsTree.recall(label)}, F1-Score: {metricsTree.fMeasure(label)}")



Class 0.0 - Precision: 0.48909860071591277, Recall: 0.6043425814234017, F1-Score: 0.5406474820143885
Class 1.0 - Precision: 0.73006993006993, Recall: 0.3996937212863706, F1-Score: 0.5165759524987631
Class 2.0 - Precision: 0.5596059113300492, Recall: 0.45043616177636797, F1-Score: 0.4991212653778558
Class 3.0 - Precision: 0.8428725701943844, Recall: 0.6083398285268901, F1-Score: 0.7066545948392937
Class 4.0 - Precision: 0.6887389287220582, Recall: 0.6659869494290375, F1-Score: 0.6771718847190547
Class 5.0 - Precision: 0.6893617021276596, Recall: 0.6623058053965658, F1-Score: 0.6755629691409508
Class 6.0 - Precision: 0.4521415270018622, Recall: 0.4718227749708511, F1-Score: 0.46177253708634464
Class 7.0 - Precision: 0.4313384113166485, Recall: 0.7871326449563145, F1-Score: 0.5572894699845353
Class 8.0 - Precision: 0.4924406047516199, Recall: 0.2695035460992908, F1-Score: 0.3483575248281131
Class 9.0 - Precision: 0.440793368857312, Recall: 0.6104961049610496, F1-Score: 0.5119477393845625


                                                                                

In [36]:
from pyspark.ml.tuning import ParamGridBuilder

In [37]:
from pyspark.ml.tuning import CrossValidator

In [38]:
import numpy

In [39]:
paramGrid_dt = ParamGridBuilder().addGrid(dt.maxDepth, [10, 15, 20]).addGrid(dt.impurity, ["Gini", "Entropy"]).build()
myEvaluator_dt =MulticlassClassificationEvaluator(labelCol="ClassIndex",predictionCol="prediction",metricName="accuracy")
cvDecisionTree=CrossValidator(estimator=dt,evaluator=myEvaluator_dt,estimatorParamMaps =paramGrid_dt, numFolds=3)
DecisionTreeModel = cvDecisionTree.fit(transformedDF)
DecisionTreeFinalDf = DecisionTreeModel.transform(transformedDF)

                                                                                

In [40]:
DecisionTreeModel.getEstimatorParamMaps()[numpy.argmax(DecisionTreeModel.avgMetrics)]

{Param(parent='DecisionTreeClassifier_a581bfed18fb', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 20,
 Param(parent='DecisionTreeClassifier_a581bfed18fb', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'Gini'}

In [41]:
DtNew = DecisionTreeClassifier(labelCol="ClassIndex", featuresCol="features",maxDepth = 20, impurity = "Gini")
ModelDtNew = DtNew.fit(trainValidation)
FinaldfDtNew = ModelDtNew.transform(trainValidation)
DtNewTest = ModelDtNew.transform(test)

                                                                                

In [42]:
myEvaluatorTreNew = MulticlassClassificationEvaluator(labelCol="ClassIndex",predictionCol="prediction",metricName='accuracy')
print("Accuracy on new training Decision Tree is ", myEvaluatorTree.evaluate(FinaldfDtNew))
print("Accuracy on new test Decision Tree is ", myEvaluatorTree.evaluate(DtNewTest))

                                                                                

Accuracy on new training Decision Tree is  0.7911441378942313




Accuracy on new test Decision Tree is  0.6913033061811212


                                                                                

In [44]:
paramGridRf = ParamGridBuilder().addGrid(rf.maxDepth, [20,25]).addGrid(rf.numTrees, [20,25]).build()
myEvaluatorRf = MulticlassClassificationEvaluator(labelCol="ClassIndex",predictionCol="prediction", metricName="accuracy")
cvForest=CrossValidator(estimator=rf,evaluator=myEvaluatorRf,estimatorParamMaps =paramGridRf, numFolds=3)
cvRfModel=cvForest.fit(transformedDF)
finalDFRf=cvRfModel.transform(transformedDF)

                                                                                

In [45]:
cvRfModel.getEstimatorParamMaps()[numpy.argmax(cvRfModel.avgMetrics)]

{Param(parent='RandomForestClassifier_d0daa5384d57', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 20,
 Param(parent='RandomForestClassifier_d0daa5384d57', name='numTrees', doc='Number of trees to train (>= 1).'): 25}

In [46]:
RfNew = RandomForestClassifier(labelCol="ClassIndex",featuresCol="features",numTrees=25,maxDepth = 20)
ModelRfNew = RfNew.fit(trainValidation)
FinaldfRfNew = ModelRfNew.transform(trainValidation)
RfNewTest = ModelRfNew.transform(test)


                                                                                

In [47]:
myEvaluatorForestNew = MulticlassClassificationEvaluator(labelCol="ClassIndex",predictionCol="prediction",metricName='accuracy')
print("Accuracy on new training Random Forest is ", myEvaluatorTree.evaluate(FinaldfRfNew))
print("Accuracy on new test Random Forest is ", myEvaluatorTree.evaluate(RfNewTest))

                                                                                

Accuracy on new training Random Forest is  0.7669559669852072




Accuracy on new test Random Forest is  0.764185614474375


                                                                                

In [None]:
#2.6.1 Report the accuracy results for all the parameters you tried. What can youconclude?
#Accuracy on training Decision Tree first was  0.55 then reached to 0.79 and for the test it increased from
#0.54 to 0.76
#Accuracy on training Random Forest first was  0.63 and changed to 0.69 which isn't a significant change.and
#for the test it was same as before crossvalidation (0.76)