## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

Directory Deletion and creation

In this section we will delete existing and then create new folder directories to be used in the code.

In [0]:
# Deleting directories for re-runs
dbutils.fs.rm('/FileStore/tables/', True)

Out[160]: True

In [0]:
# Creating directories for test and training dataset csv storage
dbutils.fs.mkdirs("/FileStore/tables/training/")
dbutils.fs.mkdirs("/FileStore/tables/test/")

Out[161]: True

# Spark File Stream

In this section we create a local spark session using importing necessary classes and kickstaring a local spark session. We then import the stream of CSV files from DBFS. Spark Streaming has three major components: input sources, processing engine, and sink(destination). Input sources generate data like Kafka, Flume, HDFS/ S3/ any file system, etc. Spark Streaming engine processes incoming data from various input sources. Sinks store processed data from Spark Streaming engines like HDFS/File System, relational databases, or NoSQL DB's.

In [0]:
# Import the necessary classes

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Column
from typing import List
from pyspark.sql.types import *

In [0]:
# Create a local SparkSession, the starting point of all functionalities related to Spark.
spark = SparkSession.builder.appName("StreamTest").getOrCreate()

In [0]:
#Converts CSV plain text RDD into SparkSQL DataFrame (former SchemaRDD) using PySpark. If columns not given, assumes first row is the header.     If separator not given, assumes comma separated

def csvToDataFrame(sqlCtx, rdd, columns=None, sep=",", parseDate=True):
    if py_version < 3:
        def toRow(line):
            return toRowSep(line.encode('utf-8'), sep)
    else:
        def toRow(line):
            return toRowSep(line, sep)

    rdd_array = rdd.map(toRow)
    rdd_sql = rdd_array

    if columns is None:
        columns = rdd_array.first()
        rdd_sql = rdd_array.zipWithIndex().filter(
            lambda r_i: r_i[1] > 0).keys()
    column_types = evaluateType(rdd_sql, parseDate)

    def toSqlRow(row):
        return toSqlRowWithType(row, column_types)

    schema = makeSchema(zip(columns, column_types))

    return sqlCtx.createDataFrame(rdd_sql.map(toSqlRow), schema=schema)

In [0]:
# Defining Schema structure
Schemadef = StructType([
                    StructField("pkSeqID", DoubleType(), True), 
                    StructField("proto", StringType(), True), 
                    StructField("saddr", StringType(), True),
                    StructField("sport", StringType(), True), 
                    StructField("daddr", StringType(), True), 
                    StructField("dport", StringType(), True), 
                    StructField("seq", DoubleType(), True), 
                    StructField("stddev", DoubleType(), True), 
                    StructField("N_IN_Conn_P_SrcIP", DoubleType(), True), 
                    StructField("min", DoubleType(), True), 
                    StructField("state_number", DoubleType(), True), 
                    StructField("mean", DoubleType(), True), 
                    StructField("N_IN_Conn_P_DstIP", DoubleType(), True), 
                    StructField("drate", DoubleType(), True), 
                    StructField("srate", DoubleType(), True),
					StructField("max", DoubleType(), True), 
                    StructField("attack", DoubleType(), True), 
                    StructField("category", StringType(), True), 
                    StructField("subcategory", StringType(), True)])
					


In [0]:
#Function to define schema structure
def makeSchema(columns):
    struct_field_map = {'string': StringType(),
                        'date': TimestampType(),
                        'double': DoubleType(),
                        'int': IntegerType(),
                        'none': NullType()}
    fields = [StructField(k, struct_field_map[v], True) for k, v in columns]

    return StructType(fields)

In [0]:
# Print schema definition in string format.
Schemadef.simpleString()

Out[79]: 'struct<pkSeqID:double,proto:string,saddr:string,sport:string,daddr:string,dport:string,seq:double,stddev:double,N_IN_Conn_P_SrcIP:double,min:double,state_number:double,mean:double,N_IN_Conn_P_DstIP:double,drate:double,srate:double,max:double,attack:double,category:string,subcategory:string>'

In [0]:
# Create Streaming DataFrame by reading data from directory.
# This will read maximum of 2 files per mini batch. However, it can read less than 2 
df = spark.readStream.format("csv").option("maxFilesPerTrigger",1).option("header", True).schema(Schemadef).csv("/FileStore/tables/training/")

df1 = df.groupBy("saddr").sum("pkSeqID")
#df.printSchema()
display(df1)


In [0]:
# To identify whether a DataFrame/Dataset has streaming data or not by using df.isStreaming.
print("Streaming DataFrame : " + str(df.isStreaming))

Streaming DataFrame : True


In [0]:
final_df = df.select("pkSeqID")

# Filesink only support Append mode. Supports these formats : csv, json, orc, parquet
final_df.writeStream.format("memory").queryName("memory_data").trigger(processingTime = "5 seconds").start()

In [0]:
%sql
select count(1) from memory_data

In [0]:
#query = df.writeStream.format("csv").option("path", "/FileStore/tables/training/ddddd").option("checkpointLocation", "/FileStore/tables/training/ddddd/check").start()

#query.awaitTermination()

#writeStream = inputDF.writeStream.format("csv").option("checkpointLocation", "/path/to/checkpoint/dir").option("path", "/FileStore/tables/training/").start()

In [0]:
MLdf = spark.read.format('csv').option('header',True).schema(Schemadef).load("/FileStore/tables/training/")

MLdf.display()

pkSeqID,proto,saddr,sport,daddr,dport,seq,stddev,N_IN_Conn_P_SrcIP,min,state_number,mean,N_IN_Conn_P_DstIP,drate,srate,max,attack,category,subcategory
2166384.0,tcp,192.168.100.150,6385,192.168.100.3,80,252991.0,2.022434,100.0,0.0,3.0,2.022434,100.0,0.0,0.14034,4.044867,1.0,DDoS,TCP
3125195.0,udp,192.168.100.150,61807,192.168.100.3,80,234417.0,1.8999,63.0,0.0,4.0,2.686863,100.0,0.0,0.494991,4.03235,1.0,DDoS,UDP
893473.0,udp,192.168.100.147,35043,192.168.100.7,80,14039.0,0.453591,100.0,1.368065,4.0,2.381885,100.0,0.0,0.404678,2.603799,1.0,DoS,UDP
2250590.0,tcp,192.168.100.150,27071,192.168.100.3,80,75049.0,0.037993,58.0,3.604164,3.0,3.642169,100.0,0.0,0.270854,3.680173,1.0,DDoS,TCP
100909.0,tcp,192.168.100.148,31313,192.168.100.6,80,99432.0,0.006877,100.0,0.0,3.0,0.003439,100.0,0.0,0.129206,0.017193,1.0,DoS,TCP
1605733.0,udp,192.168.100.149,7962,192.168.100.5,80,201991.0,0.546035,95.0,3.282345,4.0,3.793497,95.0,0.0,0.274883,4.627728,1.0,DoS,UDP
2105674.0,tcp,192.168.100.149,4982,192.168.100.3,80,192280.0,1.792529,100.0,0.157126,1.0,2.691005,100.0,0.239184,0.408508,4.023759,1.0,DDoS,TCP
3466482.0,udp,192.168.100.149,58927,192.168.100.3,80,51401.0,1.007607,100.0,1.966237,4.0,3.390672,100.0,0.0,0.4909,4.136768,1.0,DDoS,UDP
1087695.0,udp,192.168.100.149,42482,192.168.100.5,80,208261.0,1.775258,81.0,0.0,4.0,3.074651,81.0,0.0,0.23093,4.123226,1.0,DoS,UDP
2876551.0,udp,192.168.100.150,61538,192.168.100.3,80,247934.0,0.159768,100.0,4.03621,4.0,4.195985,100.0,0.0,0.48059,4.355759,1.0,DDoS,UDP


In [0]:
MLdfcopy = MLdf;

# Machine Learning Using CSV data

In the below section, we create a machine learning model using PySpark ML library. We designed a Random Forest model to train the model using Traning dataset csv files.
The trained model is then implemented on the Test dataset.

In [0]:
# Importing several modules from the pyspark.ml package 
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [0]:
## To be deleted

# Create the logistic regression model
lr = LogisticRegression(maxIter=10, regParam= 0.01)

In [0]:
#categoricalColumns = [ "subcategory", "proto", "saddr", "sport", "daddr", "dport"]
categoricalColumns = [ "subcategory", "proto", "saddr", "sport", "daddr", "dport"]
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [0]:
cols = MLdf.columns

MLdf = MLdf.drop("pkSeqID","seq")

cols = MLdf.columns

print(cols)


['proto', 'saddr', 'sport', 'daddr', 'dport', 'stddev', 'N_IN_Conn_P_SrcIP', 'min', 'state_number', 'mean', 'N_IN_Conn_P_DstIP', 'drate', 'srate', 'max', 'attack', 'category', 'subcategory']


In [0]:
distinctval = MLdf.select("category").distinct() 

distinctval.show()

+--------------+
|      category|
+--------------+
|           DoS|
|Reconnaissance|
|          DDoS|
|        Normal|
+--------------+



In [0]:
MLdf.groupBy("category").count().show()

+--------------+-----+
|      category|count|
+--------------+-----+
|           DoS| 4509|
|Reconnaissance| 6453|
|          DDoS| 5246|
|        Normal|    3|
+--------------+-----+



In [0]:
label_stringIdx = StringIndexer(inputCol = 'category', outputCol = 'label',stringOrderType="frequencyDesc")
stages += [label_stringIdx]


In [0]:
#numericCols = ["pkSeqID", "seq", "stddev", "N_IN_Conn_P_SrcIP", "min","state_number", "mean", "N_IN_Conn_P_DstIP", "drate", "srate", "max", "attack"]
numericCols = ["stddev", "N_IN_Conn_P_SrcIP", "min","state_number", "mean", "N_IN_Conn_P_DstIP", "drate", "srate", "max", "attack"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "features").setHandleInvalid("skip")
stages += [assembler]


print(stages)

[StringIndexer_62ccf399df74, OneHotEncoder_a52542dee7a8, StringIndexer_2435d0d74508, OneHotEncoder_37b80042b81a, StringIndexer_20e37194f3bf, OneHotEncoder_b1d65950368c, StringIndexer_33cc89bb356c, OneHotEncoder_5493957e8b2e, StringIndexer_f403707c1e61, OneHotEncoder_83445de5aa3b, StringIndexer_1ff8098b16fe, OneHotEncoder_7bb740a06cab, StringIndexer_38c0441081bd, VectorAssembler_e849045c4d45]


In [0]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(MLdf)
MLdf1 = pipelineModel.transform(MLdf)
selectedcols = ["label", "features"] + cols
MLdf2 = MLdf1.select(selectedcols)

In [0]:
MLdf2.select("label","category").distinct().show()

+-----+--------------+
|label|      category|
+-----+--------------+
|  3.0|        Normal|
|  0.0|Reconnaissance|
|  1.0|          DDoS|
|  2.0|           DoS|
+-----+--------------+



In [0]:
MLdf2.show(10)

+-----+--------------------+-----+---------------+-----+-------------+-----+--------+-----------------+--------+------------+--------+-----------------+--------+--------+--------+------+--------+-----------+
|label|            features|proto|          saddr|sport|        daddr|dport|  stddev|N_IN_Conn_P_SrcIP|     min|state_number|    mean|N_IN_Conn_P_DstIP|   drate|   srate|     max|attack|category|subcategory|
+-----+--------------------+-----+---------------+-----+-------------+-----+--------+-----------------+--------+------------+--------+-----------------+--------+--------+--------+------+--------+-----------+
|  1.0|(12729,[2,5,10,96...|  tcp|192.168.100.150| 6385|192.168.100.3|   80|2.022434|            100.0|     0.0|         3.0|2.022434|            100.0|     0.0| 0.14034|4.044867|   1.0|    DDoS|        TCP|
|  1.0|(12729,[0,6,10,93...|  udp|192.168.100.150|61807|192.168.100.3|   80|  1.8999|             63.0|     0.0|         4.0|2.686863|            100.0|     0.0|0.49499

In [0]:
MLdf2.show()

+-----+--------------------+-----+---------------+-----+-------------+-----+--------+-----------------+--------+------------+--------+-----------------+--------+--------+--------+------+--------+-----------+
|label|            features|proto|          saddr|sport|        daddr|dport|  stddev|N_IN_Conn_P_SrcIP|     min|state_number|    mean|N_IN_Conn_P_DstIP|   drate|   srate|     max|attack|category|subcategory|
+-----+--------------------+-----+---------------+-----+-------------+-----+--------+-----------------+--------+------------+--------+-----------------+--------+--------+--------+------+--------+-----------+
|  1.0|(12729,[2,5,10,96...|  tcp|192.168.100.150| 6385|192.168.100.3|   80|2.022434|            100.0|     0.0|         3.0|2.022434|            100.0|     0.0| 0.14034|4.044867|   1.0|    DDoS|        TCP|
|  1.0|(12729,[0,6,10,93...|  udp|192.168.100.150|61807|192.168.100.3|   80|  1.8999|             63.0|     0.0|         4.0|2.686863|            100.0|     0.0|0.49499

In [0]:
train, test = MLdf2.randomSplit([0.7, 0.3])

In [0]:
raw_data = MLdf2.filter('subcategory != " "')
raw_data.count()
#label | features |  pkSeqID | proto| saddr | sport | daddr | dport | seq | stddev | N_IN_Conn_P_SrcIP | min | state_number |    mean|N_IN_Conn_P_DstIP|   drate|   srate|     max|attack|category|subcategory|

Out[177]: 16211

In [0]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- proto: string (nullable = true)
 |-- saddr: string (nullable = true)
 |-- sport: string (nullable = true)
 |-- daddr: string (nullable = true)
 |-- dport: string (nullable = true)
 |-- stddev: double (nullable = true)
 |-- N_IN_Conn_P_SrcIP: double (nullable = true)
 |-- min: double (nullable = true)
 |-- state_number: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- N_IN_Conn_P_DstIP: double (nullable = true)
 |-- drate: double (nullable = true)
 |-- srate: double (nullable = true)
 |-- max: double (nullable = true)
 |-- attack: double (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [0]:
test.groupBy("category").count().show()


train.groupBy("category").count().show()

+--------------+-----+
|      category|count|
+--------------+-----+
|           DoS| 1322|
|Reconnaissance| 1900|
|          DDoS| 1590|
+--------------+-----+

+--------------+-----+
|      category|count|
+--------------+-----+
|           DoS| 3187|
|Reconnaissance| 4553|
|          DDoS| 3656|
|        Normal|    3|
+--------------+-----+



In [0]:
selected = predictions.select("label", "prediction", "probability")
display(selected)

predictions.select("label", "prediction").distinct().show()


label,prediction,probability
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.5824165163815828, 0.2441934618219131, 0.1731059619737871, 2.8405982271703493E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.5824165163815828, 0.2441934618219131, 0.1731059619737871, 2.8405982271703493E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.5824165163815828, 0.2441934618219131, 0.1731059619737871, 2.8405982271703493E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.6342261396517398, 0.19562873325052813, 0.16987206474559668, 2.730623521352687E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.6342261396517398, 0.19562873325052813, 0.16987206474559668, 2.730623521352687E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.6415545195788553, 0.17385661201164926, 0.18429885115739414, 2.90017252101359E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.4679906387437058, 0.2939082796283626, 0.23793358352922392, 1.6749809870769404E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.6342261396517398, 0.19562873325052813, 0.16987206474559668, 2.730623521352687E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.6342261396517398, 0.19562873325052813, 0.16987206474559668, 2.730623521352687E-4))"
0.0,0.0,"Map(vectorType -> dense, length -> 4, values -> List(0.4679906387437058, 0.2939082796283626, 0.23793358352922392, 1.6749809870769404E-4))"


+-----+----------+
|label|prediction|
+-----+----------+
|  2.0|       0.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  2.0|       2.0|
|  2.0|       1.0|
|  0.0|       0.0|
|  0.0|       1.0|
+-----+----------+



In [0]:
predictions.groupby("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|   43|
|  1.0|       1.0| 1520|
|  1.0|       0.0|   70|
|  2.0|       2.0|  679|
|  2.0|       1.0|  600|
|  0.0|       0.0| 1894|
|  0.0|       1.0|    6|
+-----+----------+-----+



In [0]:
selected.show(3)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.58241651638158...|
|  0.0|       0.0|[0.58241651638158...|
|  0.0|       0.0|[0.58241651638158...|
+-----+----------+--------------------+
only showing top 3 rows



In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(predictions)

Out[181]: 0.839582796096545

In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(predictions.rdd.map(lambda row: (row.prediction, row.label)))

confusion_matrix = metrics.confusionMatrix().toArray()

print(confusion_matrix)




[[1894.    6.    0.]
 [  70. 1520.    0.]
 [  43.  600.  679.]]


In [0]:
dbutils.fs.mkdirs("/FileStore/tables/test/")

Out[182]: True

In [0]:
TestMLdf = spark.read.format('csv').option('header',True).schema(Schemadef).load("/FileStore/tables/test/")

TestMLdf.display()

pkSeqID,proto,saddr,sport,daddr,dport,seq,stddev,N_IN_Conn_P_SrcIP,min,state_number,mean,N_IN_Conn_P_DstIP,drate,srate,max,attack,category,subcategory


In [0]:
label_stringIdx = StringIndexer(inputCol = 'category', outputCol = 'label')
stages += [label_stringIdx]

In [0]:
pipelineModel = pipeline.fit(TestMLdf)
TestMLdf1 = pipelineModel.transform(TestMLdf)
TestMLdf2 = TestMLdf1.select(selectedcols)
#MLdf2.show(3)

print(TestMLdf2)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
[0;32m<command-1880090652635535>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mpipelineModel[0m [0;34m=[0m [0mpipeline[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mTestMLdf[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0mTestMLdf1[0m [0;34m=[0m [0mpipelineModel[0m[0;34m.[0m[0mtransform[0m[0;34m([0m[0mTestMLdf[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m [0mTestMLdf2[0m [0;34m=[0m [0mTestMLdf1[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0mselectedcols[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0;31m#MLdf2.show(3)[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0;34m[0m[0m

[0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py[0m in [0;36mpatched_method[0;34m(self, *args, **kw

In [0]:
from pyspark.ml.classification import RandomForestClassifier

predictionsTest = rfModel.transform(TestMLdf2)
predictionsTest.printSchema()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-2450576483249133>[0m in [0;36m<cell line: 3>[0;34m()[0m
[1;32m      1[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0mml[0m[0;34m.[0m[0mclassification[0m [0;32mimport[0m [0mRandomForestClassifier[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;34m[0m[0m
[0;32m----> 3[0;31m [0mpredictionsTest[0m [0;34m=[0m [0mrfModel[0m[0;34m.[0m[0mtransform[0m[0;34m([0m[0mTestMLdf2[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      4[0m [0mpredictionsTest[0m[0;34m.[0m[0mprintSchema[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mNameError[0m: name 'TestMLdf2' is not defined

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluatorTest = MulticlassClassificationEvaluator()
evaluatorTest.evaluate(predictionsTest)


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-2450576483249136>[0m in [0;36m<cell line: 5>[0;34m()[0m
[1;32m      3[0m [0;34m[0m[0m
[1;32m      4[0m [0mevaluatorTest[0m [0;34m=[0m [0mMulticlassClassificationEvaluator[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 5[0;31m [0mevaluatorTest[0m[0;34m.[0m[0mevaluate[0m[0;34m([0m[0mpredictionsTest[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mNameError[0m: name 'predictionsTest' is not defined