In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Module-level working DataFrame shared through ``global binet`` by the
# functions below; the empty string is only a placeholder until a file is loaded.
binet = ""

def CategoryToDouble(col_stringToDouble):
    """Replace string columns of the global ``binet`` with indexed columns.

    For every name in ``col_stringToDouble`` a ``StringIndexer`` is fitted on
    the current ``binet``; its output is written to ``<name>_indexed`` and the
    original column is dropped.  The module-level ``binet`` is rebound after
    each column, so later columns are fitted on the already-transformed frame.

    Args:
        col_stringToDouble: iterable of column names to index.

    Returns:
        None.  The result is published only through the global ``binet``.
    """
    global binet
    for column_name in col_stringToDouble:
        indexed_name = column_name + "_indexed"
        string_indexer = StringIndexer(inputCol=column_name, outputCol=indexed_name)
        binet = string_indexer.fit(binet).transform(binet).drop(column_name)


def _load_flows(file_name):
    """Read ``/input/<file_name>`` as a headered CSV and derive the numeric columns.

    Only ``header=True`` is passed to the reader, so the ratio columns are
    computed from the raw columns before those are cast to numeric types.
    """
    flows = sqlContext.read.format('csv').load('/input/'+file_name, header = True)
    # Columns that are never used as model features.
    flows = flows.drop("StartTime", "SrcAddr", "Sport", "DstAddr", "sTos", "dTos", "State")
    flows = flows.withColumn('BytesPerSec', flows.TotBytes/flows.Dur).\
        withColumn("SrcByTotalBytes", flows.SrcBytes/flows.TotBytes).\
        withColumn("Dur", flows.Dur.cast('float')).\
        withColumn("Dport", flows.Dport.cast('int')).\
        withColumn("TotPkts", flows.TotPkts.cast('float')).\
        withColumn("SrcBytes", flows.SrcBytes.cast('float')).\
        drop("TotBytes")

    # Rows whose destination port did not survive the integer cast are discarded.
    flows = flows.dropna(subset="Dport")
    # Under Spark's default (non-ANSI) arithmetic a zero divisor yields null.
    # Both ratio columns are zero-filled because VectorAssembler rejects null
    # inputs by default.
    return flows.fillna(0, subset=["BytesPerSec", "SrcByTotalBytes"])


def _label_flows(flows):
    """Replace the textual ``Label`` with 1.0 (Normal), 0.0 (Botnet) or 2.0 (Background).

    Rows whose label matches none of the three patterns are dropped.
    """
    background = flows.where(flows.Label.like("%Background%")).withColumn("Label", lit(2.0))
    normal = flows.where(flows.Label.like("%Normal%")).withColumn("Label", lit(1.0))
    botnet = flows.where(flows.Label.like("%Botnet%")).withColumn("Label", lit(0.0))
    return normal.union(botnet).union(background)


def _format_share(count, total):
    """Return ``count`` as a percentage of ``total``; an empty total gives 0.00%."""
    if not total:
        return "0.00%"
    # float() keeps the ratio fractional under Python 2 integer division too.
    return "{0:.2f}%".format(float(count) / total * 100)


def _print_summary(title, normal, botnet):
    """Print the Normal / Botnet / Total counts under a ``title`` banner."""
    total = normal + botnet
    print("\t------- " + title + " -------")
    print("\tNormal -> " + str(normal) + " - " + _format_share(normal, total))
    print("\tBotnet -> " + str(botnet) + " - " + _format_share(botnet, total))
    print("\tTotal -> " + str(total))


def predict_file(file_name):
    """Train a random forest on one capture's labelled flows and classify its background flows.

    The file is loaded and cleaned, Normal and Botnet flows become the training
    set, and Background flows (with ``Label`` removed) are scored by the fitted
    model.  Class counts are printed for the training data and the predictions.

    The model is fitted on all labelled flows; no hold-out evaluation is run.
    NOTE: to estimate accuracy, fit on a ``randomSplit`` of the training frame
    and score the remainder with ``MulticlassClassificationEvaluator``.

    Args:
        file_name: name of the capture file, resolved under ``/input/``.

    Returns:
        None.  Results are printed, and the module-level ``binet`` is left bound
        to the cleaned, label-encoded and indexed DataFrame.

    Raises:
        ValueError: if the file has no Normal or Botnet flows to train on.
    """
    print(file_name)
    global binet
    binet = _label_flows(_load_flows(file_name))

    # CategoryToDouble reads and rebinds the module-level ``binet``.
    col_stringToDouble = ["Proto", "Dir"]
    CategoryToDouble(col_stringToDouble)

    binet_normal = binet.where(binet.Label == lit(1.0))
    binet_botnet = binet.where(binet.Label == lit(0.0))
    binet_train = binet_normal.union(binet_botnet)
    binet_test = binet.where(binet.Label == lit(2.0)).drop("Label")

    normal = binet_normal.count()
    botnet = binet_botnet.count()
    # The training set is the union of the two classes, so a third count job
    # is not needed to get the total.
    if normal + botnet == 0:
        raise ValueError("no Normal or Botnet flows in " + file_name + "; cannot train a model")
    _print_summary("Data", normal, botnet)

    # Every remaining column except the label is a feature.
    colnames = [name for name in binet_train.columns if name != "Label"]
    vecAssembler = VectorAssembler(inputCols=colnames, outputCol="features")
    binet_test = vecAssembler.transform(binet_test)
    binet_train = vecAssembler.transform(binet_train)

    rf = RandomForestClassifier(labelCol="Label", featuresCol="features", numTrees=100)
    model = Pipeline(stages=[rf]).fit(binet_train)

    # Background data: training labels are 1.0 (Normal) and 0.0 (Botnet), so
    # the sum of the predictions is the number of flows predicted Normal.
    # ``sum`` here is the Spark aggregate from the wildcard import, not the builtin.
    predictions = model.transform(binet_test)
    total = binet_test.count()
    predicted_sum = predictions.select(sum(predictions.prediction)).collect()[0][0]
    # The aggregate is None when there are no background rows at all.
    normal = int(predicted_sum) if predicted_sum is not None else 0
    botnet = total - normal
    _print_summary("Prediction", normal, botnet)

# Capture files to process, in this order; predict_file is run once per entry.
file_names = [
    "capture20110811.binetflow",
    "capture20110815-2.binetflow",
    "capture20110815.binetflow",
    "capture20110816-2.binetflow",
    "capture20110816.binetflow",
]
for file in file_names:
    predict_file(file)