In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
from pyspark import SparkFiles
url="https://raw.githubusercontent.com/IPGreene/FW-Neural-net/master/ASA_log.csv"
spark.sparkContext.addFile(url)
data = spark.read.csv(SparkFiles.get("ASA_log.csv"), header=True)


In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler, OneHotEncoderEstimator
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import rand
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

In [4]:
data.createOrReplaceTempView("firewall")
data = spark.sql('SELECT * FROM firewall')
data = data.withColumn('sourcePort', data['sourcePort'].cast(IntegerType()))
data = data.withColumn('destinationPort', data['destinationPort'].cast(IntegerType()))
data = data.withColumn('deviceId', data['deviceId'].cast(IntegerType()))
data = data.withColumn('event_category', data['event_category'].cast(IntegerType()))
data = data.withColumn('relevance', data['relevance'].cast(IntegerType()))
data = data.withColumn('credibility', data['credibility'].cast(IntegerType()))
data = data.withColumn('severity', data['severity'].cast(IntegerType()))
data = data.withColumn('magnitude', data['magnitude'].cast(IntegerType()))
#data = data.withColumn('eventCount', data['eventCount'].cast(IntegerType()))
data = data.drop('Event_DateTime', 'categoryDescription', 'eventDescription', 'eventCount')
data = data.fillna('Unknown')
data.show()

+----------+---------------+------------+-------------+--------+--------------+---------+-----------+--------+---------+
|sourcePort|destinationPort|protocolName|        IPgeo|deviceId|event_category|relevance|credibility|severity|magnitude|
+----------+---------------+------------+-------------+--------+--------------+---------+-----------+--------+---------+
|     52217|           2000|      tcp_ip|      Unknown|   31410|          5010|        8|         10|       9|        9|
|     51405|             80|      tcp_ip|      Unknown|   31410|          7024|        8|         10|       1|        6|
|     36002|            445|      tcp_ip|      Unknown|   31410|          7024|        8|         10|       1|        6|
|     35074|            445|      tcp_ip|      Unknown|   31410|          7024|        8|         10|       1|        6|
|     55631|            443|      tcp_ip|United States|   31410|          4002|       10|         10|       0|        6|
|     55991|            443|    

In [20]:
str_col = ['protocolName', 'event_category', 'IPgeo']
label = 'event_category'
stages = []
for c in str_col:
    indexer = StringIndexer(inputCol=c, outputCol=c+'_index')
    stages += [indexer]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(data)
transformed = model.transform(data)
transformed = transformed.drop('protocolName', 'IPgeo', 'event_category')
transformed.show()
df = transformed.toPandas()


+----------+---------------+--------+---------+-----------+--------+---------+------------------+--------------------+-----------+
|sourcePort|destinationPort|deviceId|relevance|credibility|severity|magnitude|protocolName_index|event_category_index|IPgeo_index|
+----------+---------------+--------+---------+-----------+--------+---------+------------------+--------------------+-----------+
|     52217|           2000|   31410|        8|         10|       9|        9|               0.0|                 4.0|        0.0|
|     51405|             80|   31410|        8|         10|       1|        6|               0.0|                 3.0|        0.0|
|     36002|            445|   31410|        8|         10|       1|        6|               0.0|                 3.0|        0.0|
|     35074|            445|   31410|        8|         10|       1|        6|               0.0|                 3.0|        0.0|
|     55631|            443|   31410|       10|         10|       0|        6|     

In [22]:
df_np = df.as_matrix()
#transformed.select('event_category').distinct().show()

  if __name__ == '__main__':


In [53]:
#transformed.printSchema()

root
 |-- sourcePort: integer (nullable = true)
 |-- destinationPort: integer (nullable = true)
 |-- deviceId: integer (nullable = true)
 |-- relevance: integer (nullable = true)
 |-- credibility: integer (nullable = true)
 |-- severity: integer (nullable = true)
 |-- magnitude: integer (nullable = true)
 |-- protocolName_index: double (nullable = false)
 |-- event_category_index: double (nullable = false)
 |-- IPgeo_index: double (nullable = false)
 |-- encoded: vector (nullable = true)



In [24]:
train, test = transformed.randomSplit([0.70, 0.30], seed=1234)
x_train = train.drop('event_category_index')
y_train = train.select('event_category_index')
x_test = test.drop('event_category_index')
y_test = test.select('event_category_index')
#x_train.limit(5).toPandas()
#y_train.limit(5).toPandas()
#x_test.limit(5).toPandas()
#y_test.limit(5).toPandas()

In [28]:
from keras.utils import to_categorical

In [27]:
x_train_pd = x_train.toPandas()
y_train_pd = y_train.toPandas()
x_test_pd = x_test.toPandas()
y_test_pd = y_test.toPandas()
y_train_np = y_train_pd.as_matrix()
y_test_np = y_test_pd.as_matrix()
x_train_np = x_train_pd.as_matrix()
x_test_np = x_test_pd.as_matrix()


[[0.]
 [0.]
 [0.]
 ...
 [0.]
 [0.]
 [0.]]




In [30]:
y_train_2 = to_categorical(y_train_np)
y_test_2 = to_categorical(y_test_np)
y_train_2[0:5]

array([[1., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [1., 0., 0., 0., 0.]], dtype=float32)

In [61]:
from keras.models import Sequential
from keras.layers import Dense
from keras.callbacks import EarlyStopping
#create model
model = Sequential()

#get number of columns in training data
n_cols = x_train_pd.shape[1]
print(n_cols)
#add model layers
model.add(Dense(250, activation='relu', input_shape=(n_cols,)))
model.add(Dense(250, activation='relu'))
model.add(Dense(250, activation='relu'))
model.add(Dense(5, activation='softmax'))
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
#set early stopping monitor so the model stops training when it won't improve anymore
early_stopping_monitor = EarlyStopping(patience=3)
model.summary()
#train model
model.fit(x_train_np, y_train_2, batch_size=64, validation_split=0.2, epochs=10)


9
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_91 (Dense)             (None, 250)               2500      
_________________________________________________________________
dense_92 (Dense)             (None, 250)               62750     
_________________________________________________________________
dense_93 (Dense)             (None, 250)               62750     
_________________________________________________________________
dense_94 (Dense)             (None, 5)                 1255      
Total params: 129,255
Trainable params: 129,255
Non-trainable params: 0
_________________________________________________________________
Train on 873 samples, validate on 219 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f9d32b08160>

In [62]:
test_y_predictions = model.predict(x_test_pd)
test_y_predictions[0:50]

array([[0.000000e+00, 5.903765e-35, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00,
        0.000000e+00],
       [0.000000e+00, 0.000000e+00, 1.00000