Basics:

CPU
Storage - the numbers A and B live in storage
The numbers are read from storage into registers in the CPU
The two values are then multiplied and the product is stored in a third register
The third register is written back to storage

If steps run sequentially:
step latency - time it takes to finish one step
total latency - time to finish all steps (the sum of the step latencies)
Main memory is fast
Spinning disk is slow

Most latency comes from memory access (reading and writing), not from the operations themselves

Sorting can improve memory locality
The reason is cost: memory close to the CPU is faster but more expensive, so there is little of it
The CPU checks whether a memory location is in the cache.
If it is there, that is a cache hit and the read is fast.
If it is not there, that is a cache miss and retrieval is slow.

To handle a cache miss:
(1) Free space in the cache
(2) After freeing, read the memory location and copy the whole block that contains it into the cache
Caches are effective if they have high hit rates.
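
The hit/miss handling above can be sketched as a tiny simulation. This is only an illustration: the eviction rule (least recently used), the cache size, and the block size are assumptions, since the notes do not say how space is freed.

```python
from collections import OrderedDict

def simulate_cache(addresses, num_blocks=4, block_size=8):
    """Return (hits, misses) for a small cache with LRU eviction (assumed policy)."""
    cache = OrderedDict()  # block id -> None, ordered from least to most recently used
    hits = misses = 0
    for addr in addresses:
        block = addr // block_size          # the block that contains this address
        if block in cache:
            hits += 1
            cache.move_to_end(block)        # mark as most recently used
        else:
            misses += 1
            if len(cache) >= num_blocks:
                cache.popitem(last=False)   # (1) free space: evict the least recently used block
            cache[block] = None             # (2) copy the whole block into the cache
    return hits, misses

sequential = list(range(64))                 # neighbouring addresses, one after another
strided = [(i * 8) % 64 for i in range(64)]  # jumps a full block on every access

print("sequential:", simulate_cache(sequential))
print("strided:   ", simulate_cache(strided))
```

With these settings the sequential trace misses only once per block, while the strided trace cycles through more blocks than the cache can hold and never hits.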

Unsorted word counts would entail temporal locality for common words, but
no spatial locality.

Caching reduces latency by bringing relevant data closer to the CPU.

Access locality is the ability of software to make good use of the cache

Temporal Locality - accessing the same element over and over again

Spatial Locality - accessing elements that sit close to each other in memory.
Example: ways to store n terms x that are to be squared.
Linked lists have poor locality whereas indexed arrays have
good locality.

Arrays store elements consecutively

Scanning a 2D array row by row is faster than column by column when rows are stored consecutively

The effect grows with the number of elements in the array
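
A rough way to see why scan order matters is to count how often a scan moves to a different memory block. The sketch below assumes a row-major layout and a hypothetical block size of 8 elements, and it models a cache that holds only one block at a time; real caches are larger, so this exaggerates the gap.

```python
def block_loads(order, n_cols, block_size=8):
    """Count accesses that land in a different block than the previous access."""
    loads = 0
    last_block = None
    for i, j in order:
        address = i * n_cols + j        # row-major layout: rows stored one after another
        block = address // block_size
        if block != last_block:
            loads += 1
            last_block = block
    return loads

n_rows, n_cols = 16, 16
row_by_row = [(i, j) for i in range(n_rows) for j in range(n_cols)]
col_by_col = [(i, j) for j in range(n_cols) for i in range(n_rows)]

row_loads = block_loads(row_by_row, n_cols)
col_loads = block_loads(col_by_col, n_cols)
print("row by row:", row_loads, "block loads")
print("col by col:", col_loads, "block loads")
```

In the row-by-row scan a new block is needed once every 8 elements; in the column-by-column scan every access lands in a different block.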


Memory hierarchy: small, fast storage closest to the CPU is at the top; larger, slower storage is further down.

Simplified: CPU (registers) -> L2 Cache -> Memory -> Disk

In more detail: CPU registers -> L1/L2/L3 cache -> RAM -> SSDs -> Magnetic Tapes

Data Centers are the physical aspect of "the cloud"


HDFS - each file is broken into fixed-size chunks, and each chunk is replicated on several servers.
* Low cost per byte of storage
* Locality - computation can run on a server that already holds the chunk
* Redundancy - can recover from server failures
* Simple abstraction - looks like a standard file system
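
The chunk-and-replicate idea can be sketched in a few lines. The round-robin placement, the server names, and the tiny chunk size are hypothetical choices made for illustration; they are not how HDFS actually picks replica locations.

```python
def split_into_chunks(data, chunk_size):
    """Break a byte string into fixed-size chunks (the last one may be shorter)."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def place_replicas(num_chunks, servers, replication=3):
    """Assign each chunk to `replication` distinct servers, round-robin (assumed policy).

    Requires replication <= len(servers) so the replicas land on distinct servers.
    """
    return {
        c: [servers[(c + r) % len(servers)] for r in range(replication)]
        for c in range(num_chunks)
    }

data = bytes(range(10))
chunks = split_into_chunks(data, chunk_size=4)
servers = ["s0", "s1", "s2", "s3"]            # hypothetical server names
placement = place_replicas(len(chunks), servers)

# Simulate one server failure and check that every chunk still has a live copy.
failed = "s2"
survivors = {c: [s for s in hosts if s != failed] for c, hosts in placement.items()}
print(placement)
print(survivors)
```

Because every chunk lives on more than one server, losing any single server leaves at least one readable copy of each chunk.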


In [None]:
sc

In [None]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

In [None]:
val spark = SparkSession
  .builder
  .appName("portdata-causality")
  .getOrCreate()

In [None]:
import spark.implicits._

In [None]:
// On Spark 3.x this key is deprecated in favor of "spark.sql.execution.arrow.pyspark.enabled".
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
spark.conf.set("spark.sql.retainGroupColumns","false")

In [None]:
/* 
ip_flags, tcp_flags_ack, ip_dsfield, tcp_seq, tcp_flags_fin, tcp_flags_urg,tcp_flags_push
*/

In [None]:
%%python
probeDf = spark.read.csv("newPortData.csv",header=True)

In [None]:
%%python
df = probeDf.toPandas()

In [None]:
%%python
df.head()

In [None]:
%%python
# "ip_flags", "tcp_flags_ack",\
#                 "ip_dsfield",\
#                 "tcp_seq", "tcp_flags_fin",\
#                 "tcp_flags_urg",\
#                 "tcp_flags_push",\
#                 "tcp_options_mss_val",\
#                 "ip_ttl",\
#                 "tcp_window_size",\
#                 "tcp_checksum",\
#                 "tcp_srcport",\
#                 "tcp_dstport",\
#                 "label"

# "frame_info_time" is highly correlated with other variables
# selected after feature importance with fake attributes
probeDf = probeDf.select(
    "frame_info_len", "tcp_ack",
    "tcp_seq",
    "ip_len", "tcp_flags",
    "tcp_options_mss_val",
    "ip_ttl",
    "tcp_window_size",
    "tcp_checksum",
    "tcp_srcport",
    "tcp_dstport",
    "label")


In [None]:
%%python
probeDf.printSchema()

In [None]:
%%python
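from pyspark.sql.functions import col

# The CSV was read without a schema, so every column is a string.
# Cast each feature column (all but the trailing "label") to a numeric type.
# Note: values that do not fit in an integer column (possible for raw tcp_seq
# or tcp_ack values) may become null under this cast; "long" would avoid that.
for col_ in probeDf.columns[:-1]:
    if col_ == "tcp_options_mss_val":
        probeDf = probeDf.withColumn(col_, col(col_).cast("float"))
    else:
        probeDf = probeDf.withColumn(col_, col(col_).cast("integer"))
probeDf.printSchema()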

In [None]:
%%python
from pyspark.ml.feature import StringIndexer
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(probeDf)

In [None]:
%%python
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["frame_info_len", "tcp_ack",
               "tcp_seq",
               "ip_len", "tcp_flags",
               "tcp_options_mss_val",
               "ip_ttl",
               "tcp_window_size",
               "tcp_checksum",
               "tcp_srcport",
               "tcp_dstport"],
    outputCol="features")

# "skip" drops any row that has a null or NaN in one of the input columns.
output = assembler.setHandleInvalid("skip").transform(probeDf)

In [None]:
%%python
probeDf = output.select("features", "label")
probeDf.show()

In [None]:
%%python
from pyspark.ml.feature import VectorIndexer
# Features with at most 3 distinct values are treated as categorical; the rest stay continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=3).fit(probeDf)

In [None]:
%%python
DF = probeDf.toPandas()

In [None]:
%%python
labelTypes = ['normal', 'nmap_null', 'nmap_connect', 'zmap', 'nmap_window', 'masscan', 'hping_syn', 'unicorn_null', 'unicorn_syn', 'nmap_xmas', 'nmap_syn', 'unicorn_conn', 'unicorn_xmas', 'nmap_ack', 'hping_fin', 'nmap_maimon', 'hping_null', 'hping_xmas', 'hping_ack', 'nmap_fin', 'unicorn_fxmas']
labelMapping = {lab:i for i,lab in enumerate(labelTypes)}

In [None]:
%%python
import pandas as pd
df = pd.read_csv("newPortData.csv")

# Row ids per label; "Unnamed: 0" is the unnamed first column of the CSV.
indices = dict()
for lab in labelTypes:
    indices[lab] = list(df[df['label'] == lab]["Unnamed: 0"])

In [None]:
%%python
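# doing data split
# Per-label 50/50 split: sample about half of each label for training and
# use the remaining rows of that label for testing.
# Note: subtract() is a set difference, so duplicate rows are dropped from the
# test side, and any test row identical to a sampled row is removed as well.

newTest = None
newTrain = None

for lab in labelTypes:
    allRows = probeDf.filter(probeDf["label"] == lab)
    trainRows = allRows.sample(False, 0.5, seed=101)
    testRows = allRows.subtract(trainRows)

    newTrain = trainRows if newTrain is None else newTrain.union(trainRows)
    newTest = testRows if newTest is None else newTest.union(testRows)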
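
The idea behind the per-label split can be shown outside Spark with plain Python. This is a sketch on made-up rows, not the notebook's data; unlike Spark's `sample`, which keeps each row with probability 0.5, it cuts each label group exactly in half.

```python
import random
from collections import defaultdict

def stratified_split(rows, label_of, train_fraction=0.5, seed=101):
    """Split rows into (train, test) so each label keeps the same train fraction."""
    by_label = defaultdict(list)
    for row in rows:
        by_label[label_of(row)].append(row)

    rng = random.Random(seed)
    train, test = [], []
    for group in by_label.values():
        rng.shuffle(group)                       # random order within one label
        cut = int(len(group) * train_fraction)
        train.extend(group[:cut])
        test.extend(group[cut:])
    return train, test

# Hypothetical rows: (label, row id)
rows = [("normal", i) for i in range(10)] + [("zmap", i) for i in range(4)]
train, test = stratified_split(rows, label_of=lambda r: r[0])
print(len(train), len(test))
```

Splitting within each label keeps rare scan types represented on both sides, which a single global random split does not guarantee.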



In [None]:
%%python
from pyspark.ml.classification import RandomForestClassifier
#(trainingData, testData) = probeDf.randomSplit([0.70, 0.30])

rfProbe = RandomForestClassifier(labelCol="indexedLabel", 
                                 featuresCol="indexedFeatures", 
                                 numTrees = 5,
                                 maxDepth = 3)


In [None]:
%%python
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rfProbe])

In [None]:
%%python
model = pipeline.fit(newTrain)

In [None]:
%%python
predictions = model.transform(newTest)

In [None]:
%%python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
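
For reference, the accuracy metric and the reported test error reduce to a simple count. The labels and predictions below are made up for illustration.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

labels = [0, 1, 2, 1, 0]        # hypothetical true classes
predictions = [0, 1, 1, 1, 0]   # hypothetical model output
acc = accuracy(labels, predictions)
print("Test Error = %g" % (1.0 - acc))
```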

In [None]:
%%python
rfModel = model.stages[2]
print(rfModel.featureImportances)

In [None]:
%%python
# looking to other models for classification

import pandas as pd
from keras.layers import LSTM, Dense
from keras.models import Sequential

lstmDf = pd.read_csv("fullProbeData.csv")



In [None]:
%%python
lstmDf = lstmDf.drop(columns=["Unnamed: 0"])

In [None]:
%%python
lstmFeaturesDf = lstmDf[["frame_info_len", "tcp_ack",\
                "tcp_seq","ip_len", "tcp_flags",\
                "tcp_options_mss_val","ip_ttl",\
                "tcp_window_size","tcp_checksum",\
                "tcp_srcport","tcp_dstport"]]
lstmX = lstmFeaturesDf.values
lstmY = lstmDf["label"].values

In [None]:
%%python
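from imblearn.over_sampling import SMOTE
# Note: oversampling before the train/test split means synthetic rows in the
# training set can be interpolated from rows that later land in the test set.
oversample = SMOTE()
lstmX, lstmY = oversample.fit_resample(lstmX, lstmY)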
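
Conceptually, SMOTE builds each synthetic minority sample by interpolating between an existing sample and one of its nearest minority-class neighbours. The sketch below shows only that interpolation step on made-up points; the neighbour search and the per-class bookkeeping that `imblearn` performs are left out.

```python
import random

def synthetic_point(x, neighbor, rng):
    """Pick a random point on the segment between x and neighbor."""
    u = rng.random()  # interpolation weight in [0, 1)
    return [xi + u * (ni - xi) for xi, ni in zip(x, neighbor)]

rng = random.Random(101)
x = [1.0, 2.0]          # hypothetical minority-class sample
neighbor = [3.0, 6.0]   # hypothetical nearest neighbour from the same class
new_point = synthetic_point(x, neighbor, rng)
print(new_point)
```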

In [None]:
%%python
from keras.utils import to_categorical
# Assumes the labels are integer codes starting at 1, so subtracting 1 gives 0-based classes.
encoded = to_categorical(lstmY-1)
print(encoded)
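
One-hot encoding turns each class index into a row with a single 1. A minimal pure-Python sketch, using made-up 1-based label codes to mirror the `- 1` shift above:

```python
def one_hot(class_indices, num_classes):
    """Return one row per label with 1.0 at the class index and 0.0 elsewhere."""
    rows = []
    for idx in class_indices:
        row = [0.0] * num_classes
        row[idx] = 1.0
        rows.append(row)
    return rows

demo_labels = [1, 3, 2]   # hypothetical 1-based label codes
demo_encoded = one_hot([y - 1 for y in demo_labels], num_classes=3)
print(demo_encoded)
```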

In [None]:
%%python
from sklearn.model_selection import train_test_split
lstmXTrain, lstmXTest, lstmYTrain, lstmYTest = \
    train_test_split(lstmX, encoded, test_size = 0.33, random_state=101)

In [None]:
%%python
lstmXTrain = lstmXTrain.reshape((lstmXTrain.shape[0], lstmXTrain.shape[1], 1))
lstmXTest = lstmXTest.reshape((lstmXTest.shape[0], lstmXTest.shape[1], 1))

In [None]:
%%python
from keras.layers import Dropout, BatchNormalization

def buildLSTMModel(input_shape):
    # Builds, compiles, and fits the classifier on the lstmXTrain/lstmYTrain arrays defined above.
    lstmProbeModel = Sequential()
    lstmProbeModel.add(LSTM(units=100,
                           input_shape=input_shape))
    lstmProbeModel.add(Dense(50, activation='relu'))
    lstmProbeModel.add(Dropout(0.1))
    lstmProbeModel.add(Dense(50, activation='tanh'))
    lstmProbeModel.add(BatchNormalization())
    # One softmax output per entry in labelTypes (21 classes).
    lstmProbeModel.add(Dense(21, activation='softmax'))
    lstmProbeModel.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    lstmProbeModel.summary()  # summary() prints the table itself and returns None
    lstmProbeModel.fit(lstmXTrain, lstmYTrain, validation_data=(lstmXTest, lstmYTest), epochs=5, batch_size=5000)
    return lstmProbeModel

lstmProbeModel = buildLSTMModel((lstmXTrain.shape[1], 1))

In [None]:
%%python
# Save the trained model in HDF5 format so it can be reloaded later.
lstmProbeModel.save("lstmProbeModel.h5")

test_loss, test_acc = lstmProbeModel.evaluate(lstmXTest, lstmYTest)

print("Test accuracy", test_acc)
print("Test loss", test_loss)

In [None]:
%%python
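# A new seed reshuffles the same rows, so part of the earlier test set ends up in
# this training set and part of the earlier training set ends up in this test set.
# Validation accuracy for the reloaded model will therefore be optimistic.
lstmXTrain2, lstmXTest2, lstmYTrain2, lstmYTest2 = \
    train_test_split(lstmX, encoded, test_size = 0.33, random_state=402)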

In [None]:
%%python
from tensorflow.keras.models import load_model
lstmProbeModel2 = load_model("./lstmProbeModel.h5")

In [None]:
%%python
lstmProbeModel2.fit(lstmXTrain2, lstmYTrain2, validation_data=(lstmXTest2, lstmYTest2), 
                    epochs=6, 
                    batch_size=1000)