# Spark Group Assignment
### Challenge 1 | Network Intrusion Detection

Group O-2-8

### INDEX
1. **Spark Setup and Data Loading**  
    1.1 Loading Train Data  
    1.2 Loading Test Data  
2. **Data Inspection**  
    2.1 Exploring numerical variables  
    2.2 Exploring categorical variables  
3. **Data Preprocessing**  
    3.1 Categorical variable transformations  
    3.2 Creating ML Pipeline  
    3.3 Splitting dataset into train and test  
4. **Model - Logistic Regression**  
    4.1 Model  
    4.2 Predicting on test data  
    4.3 Evaluation  
    4.4 Cross validation  
5. **Predicting df_test**  
    5.1 Prediction  
    5.2 Evaluation

## 1. Spark Setup and Data Loading

In [1]:
import os
print(os.environ['SPARK_HOME'])
dataset_path="/home/ubuntu/challenge_1/"

/usr/local/software/spark


In [2]:
import pandas as pd

In [3]:
#import findspark
#findspark.init()
import pyspark

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Dataset") \
    .getOrCreate()

In [5]:
spark.version

'2.2.0'

### 1.1 Data Loading

Inspection shows that the data does not have a header row. Therefore, we use a simple for loop to assign the correct names to the columns. We name the last column "connection"; it holds the type of connection, i.e. normal traffic or one of the network intrusion attacks. The connection types fall into the following categories:

* DOS: denial-of-service, e.g. syn flood;
* R2L: unauthorized access from a remote machine, e.g. password guessing;
* U2R: unauthorized access to local superuser (root) privileges, e.g. various buffer overflow attacks;
* probing: surveillance and other probing, e.g. port scanning;
* normal: no attack was identified.

#### Train

In [6]:
df = spark.read \
    .option("inferSchema", "true") \
    .csv("file://"+dataset_path+"full.data")

In [7]:
features=["duration", "protocol_type", "service", "flag", "src_bytes","dst_bytes", \
          "land","wrong_fragment","urgent","hot","num_failed_logins","logged_in", \
          "num_compromised","root_shell","su_attempted","num_root","num_file_creations", \
          "num_shells","num_access_files","num_outbound_cmds","is_host_login","is_guest_login", \
          "count","srv_count","serror_rate","srv_serror_rate","rerror_rate","srv_rerror_rate",\
          "same_srv_rate","diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count", \
          "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate", \
          "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate","dst_host_rerror_rate",\
          "dst_host_srv_rerror_rate"]

target=["connection"]

fieldnames=features+target

rawnames=df.schema.names

# Helper: rename the auto-generated columns (_c0, _c1, ...) one by one
def updateColNames(df,oldnames,newnames):
    for i in range(len(newnames)):
        df=df.withColumnRenamed(oldnames[i], newnames[i])
    return df

df=updateColNames(df,rawnames,fieldnames)

# df.printSchema()

#### Creating new attack variable 'label'

Within the scope of this assignment, there is no need to classify attacks into the correct group (e.g. probing or DOS). We simply have to identify whether or not an attack is taking place. Thus, we create a new binary column 'label':

* Assign the value '0' for no attack (=normal)
* Assign the value '1' for attack

In [8]:
# Adding a Boolean column for attack (=1) or normal (=0)
from pyspark.sql.functions import when

df = df.withColumn('label', when(df["connection"] == 'normal.', 0).otherwise(1))

df.groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|    1|3925650|
|    0| 972781|
+-----+-------+



### 1.2 Loading Test Data

We have to repeat the same process for the test data:

* Assign column names
* Create new column 'label'

In [9]:
df_test = spark.read \
    .option("inferSchema", "true") \
    .csv("file://"+dataset_path+"corrected")

In [10]:
# The test file has the same 42 columns as the training file, so we reuse
# fieldnames and the updateColNames() helper defined for the training data
rawnames_test=df_test.schema.names

df_test=updateColNames(df_test,rawnames_test,fieldnames)

# df_test.printSchema()

In [11]:
# Adding a Boolean column for attack (=1) or normal (=0)
from pyspark.sql.functions import when

df_test = df_test.withColumn('label', when(df_test["connection"] == 'normal.', 0).otherwise(1))

df_test.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|250436|
|    0| 60593|
+-----+------+
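The counts above show that attacks make up the large majority of both sets. As a quick plain-Python check on those printed numbers (not part of the Spark workflow), the sketch below computes the accuracy of a trivial classifier that always predicts "attack". Any useful model has to beat this baseline, which is also one reason a threshold-independent score such as the area under the ROC curve is more informative here than raw accuracy.

```python
# Label counts copied from the two groupBy('label').count() outputs above
counts = {
    "train": {1: 3925650, 0: 972781},
    "test": {1: 250436, 0: 60593},
}

def attack_share(c):
    """Fraction of records labelled as attack (label == 1)."""
    return c[1] / (c[0] + c[1])

for name, c in counts.items():
    total = c[0] + c[1]
    # Always predicting "attack" is correct exactly attack_share(c) of the time
    print(f"{name}: {total} records, always-attack accuracy = {attack_share(c):.4f}")
```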



## 2. Data Inspection


* How many records do we have?
* What is the schema of our data?
* Is it numerical or categorical?
* Visualize your data

In [12]:
# Print the number of records in the data frame
print('Nb. of records Train : %d' % df.count())
print('Nb. of records Test : %d' % df_test.count())

Nb. of records Train : 4898431
Nb. of records Test : 311029


In [None]:
df.select('duration','src_bytes','dst_bytes','wrong_fragment','num_failed_logins').describe().show()

### 2.1 Exploring numerical variables

In total, there are 38 numerical variables in our dataset (the 41 features minus the 3 categorical ones):

* 32 continuous
* 6 boolean

#### Correlation between label and numeric columns

We use the correlation function from Lab 5 to compute the (Pearson) correlation between every numerical variable and the target variable 'label'. The eight most strongly correlated features are listed below:

* 0.76 |	**count**
* 0.65 |	**dst_host_count**
* 0.57 |	**srv_count**
* 0.48 |	**dst_host_same_src_port_rate**
* 0.23 |	**dst_host_srv_serror_rate**
* 0.23 |	**serror_rate**
* 0.22 |	**srv_serror_rate**
* 0.22 |	**dst_host_serror_rate**


Those will be the only numerical features we use for further analysis.

In [None]:
# drop the categorical columns (and the string target 'connection'), keeping only numeric ones
cor_data = df.drop('connection','flag', 'protocol_type', 'service')

In [None]:
def computeCorrelation(df, targetColumnName):
    # Pearson correlation of every column with the target column
    # (DataFrame.stat.corr currently supports only the Pearson method)
    return [df.stat.corr(col, targetColumnName) for col in df.columns]

In [None]:
lst = computeCorrelation(cor_data, 'label')

In [None]:
import pandas as pd
cor_data = pd.DataFrame({"correlation" : lst, "features" : cor_data.schema.names})

In [None]:
cor_data.sort_values('correlation', ascending=False)
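df.stat.corr computes the Pearson correlation coefficient; with a 0/1 target this is the point-biserial correlation, and its sign matters. Sorting by the signed value ranks strongly negative correlations last, even though they can be just as informative, and 'label' itself appears in the list with r = 1. The plain-Python sketch below (toy data, hypothetical column names) computes Pearson's r and ranks features by |r| instead. With the real pandas frame, the same ranking could be obtained with something like cor_data.reindex(cor_data['correlation'].abs().sort_values(ascending=False).index).

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    if sx == 0 or sy == 0:
        return float("nan")  # undefined for a constant column
    return cov / (sx * sy)

def rank_by_abs_correlation(columns, label):
    """Return (name, r) pairs sorted by |r|, strongest first; NaNs are dropped."""
    scores = [(name, pearson(values, label)) for name, values in columns.items()]
    scores = [(name, r) for name, r in scores if not math.isnan(r)]
    return sorted(scores, key=lambda item: abs(item[1]), reverse=True)

# Hypothetical toy columns: "pos" rises with the label, "neg" falls with it
label = [0, 0, 0, 1, 1, 1]
columns = {
    "pos": [1, 2, 1, 5, 6, 5],
    "neg": [9, 8, 9, 1, 0, 1],
    "noise": [3, 1, 2, 2, 3, 1],
    "constant": [0, 0, 0, 0, 0, 0],
}
for name, r in rank_by_abs_correlation(columns, label):
    print(f"{name:8s} {r:+.3f}")
```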

#### Explore data with groupBy() operations
We use groupBy() and agg() to compare means between attack and normal connections, which yields a few insights:

* Duration: the mean duration of normal connections is longer
* Dst_bytes: the mean number of data bytes from destination to source is 6x greater
* Hot: the mean number of 'hot' indicators is 15x smaller for attacks

In [None]:
# Some stats on numerical features
df.groupBy('label').agg({'duration': 'mean'}).orderBy("avg(duration)", ascending = False).show(30)

In [None]:
df.groupBy('label').agg({'dst_bytes': 'mean'}).orderBy("avg(dst_bytes)", ascending = False).show(30)

In [None]:
# Some stats on numerical features
df.groupBy('label').agg({'hot': 'mean'}).orderBy("avg(hot)", ascending = False).show(30)

### 2.2 Exploring categorical variables

Again, we use groupBy() commands to explore the categorical variables in terms of their number of distinct categories and the count() per category:

* protocol_type (3 distinct types)
* service       (70 distinct types)
* flag          (11 distinct types)
* connection    (21 distinct types)

In [None]:
# How many distinct protocol types we have
df.groupby('protocol_type').count().show()

In [None]:
# How many distinct services we have
df.groupby('service').count().show()

In [None]:
# How many distinct flags we have
df.groupby('flag').count().show()

In [None]:
df.groupby('connection').count()\
    .orderBy('count', ascending =False)\
    .show(100)

## 3. Data Preprocessing

The data inspection shows that our dataset contains three categorical variables:

* protocol_type
* service
* flag

We are going to use StringIndexer, OneHotEncoder, VectorAssembler and a Pipeline to perform the feature transformations.

* **StringIndexer**: converts a single column to an index column (similar to a factor column in R)
* **OneHotEncoder**: One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.
* **VectorAssembler**: A transformer that combines a given list of columns into a single vector column.
* **Pipelines**: Facilitates the creation, tuning, and inspection of practical ML workflows. A Spark Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. 
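To make the first two stages concrete, here is a plain-Python sketch of what they compute; it is an illustration, not Spark's implementation. It assumes Spark's defaults: StringIndexer gives index 0 to the most frequent label, and OneHotEncoder uses dropLast=True, so a column with k categories becomes a vector of length k - 1 in which the last category is all zeros. Breaking frequency ties alphabetically is an assumption of this sketch, and Spark stores the encoded vectors as sparse vectors rather than dense lists.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct label to an index, most frequent label first (index 0).
    Ties are broken alphabetically (an assumption of this sketch)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Binary vector with at most one 1; with drop_last the last index is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1 if i == index else 0 for i in range(size)]

# Toy protocol_type column
protocols = ["tcp", "udp", "icmp", "icmp", "tcp", "icmp"]
mapping = fit_string_indexer(protocols)
print(mapping)
for p in ["icmp", "tcp", "udp"]:
    print(p, mapping[p], one_hot(mapping[p], len(mapping)))
```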

### 3.1 Categorical variable transformations

Unfortunately, the important variable 'service' has a different number of categories in the train and test sets. If we fitted the encoders on each set separately, the one-hot vectors (and therefore the feature vectors) would have different lengths, so the dot product between the model weights and the test features would fail. We could simply exclude the service variable, but that would likely result in a less accurate model. Therefore, we have decided to combine both sets, perform all transformations, and split them again at a later stage.
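An alternative that keeps the two sets apart is to fit the indexers on the training data only and reuse that fitted mapping for the test data, so that both produce vectors of the same length. In Spark this would correspond to pipeline.fit(df) followed by .transform(df_test), with handleInvalid='skip' (already set on service_indexer) dropping test rows whose service never occurs in training. The plain-Python sketch below illustrates the idea with hypothetical service values; it is not what the notebook does.

```python
from collections import Counter

def fit_indexer(values):
    """Most frequent label gets index 0 (ties broken alphabetically here)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def encode(values, mapping):
    """One-hot encode with dropLast semantics; unseen labels are skipped,
    mimicking handleInvalid='skip'."""
    size = len(mapping) - 1
    rows = []
    for v in values:
        if v not in mapping:
            continue  # row dropped, like handleInvalid='skip'
        rows.append([1 if i == mapping[v] else 0 for i in range(size)])
    return rows

train_services = ["http", "smtp", "http", "ftp", "private"]
test_services = ["http", "new_service", "smtp"]  # hypothetical unseen value

mapping = fit_indexer(train_services)  # fitted on the training data only
train_vecs = encode(train_services, mapping)
test_vecs = encode(test_services, mapping)
print(len(train_vecs[0]), len(test_vecs[0]), len(test_vecs))
```

Because the mapping is fitted once, every encoded vector has the same length, and only the test row with the unseen service is lost.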

In [13]:
df_mixed = df.union(df_test)

In [14]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [15]:
# protocol_type COLUMN
# Create a StringIndexer
protocol_type_indexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_index")

# Create a OneHotEncoder
protocol_type_encoder = OneHotEncoder(inputCol="protocol_type_index", outputCol="protocol_type_fact")

In [16]:
# service COLUMN
service_indexer = StringIndexer(inputCol="service", outputCol="service_index", handleInvalid='skip')
service_encoder = OneHotEncoder(inputCol="service_index", outputCol="service_fact")

In [17]:
# flag COLUMN
flag_indexer = StringIndexer(inputCol="flag", outputCol="flag_index")
flag_encoder = OneHotEncoder(inputCol="flag_index", outputCol="flag_fact")

#### VectorAssembler 

The assembled output will include both the selected numeric columns and the one-hot encoded binary vector columns of our dataset.

We are not going to use all of the numeric features from the dataset, only the eight features most correlated with 'label', as identified while inspecting the data (Section 2.1).

In [18]:
# Select features
num_cols = ["count","dst_host_count","srv_count","dst_host_same_src_port_rate","dst_host_srv_serror_rate","serror_rate", "srv_serror_rate","dst_host_serror_rate"]
fact_cols = ["protocol_type_fact", "flag_fact","service_fact"]
assembler_input = num_cols + fact_cols

In [19]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols= assembler_input,
                                outputCol="features")
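VectorAssembler concatenates its input columns, in the order given, into one vector. As a worked example, assuming the category counts from Section 2.2 (3 protocol types, 11 flags, 70 services) and OneHotEncoder's default dropLast=True, the feature vector length can be computed as below. The service count in the combined train and test data may be higher, since the test set contains different service categories, so treat the result as an estimate.

```python
# Assumed category counts (from Section 2.2); dropLast=True removes one slot each
n_numeric = 8
categories = {"protocol_type": 3, "flag": 11, "service": 70}
fact_sizes = {name: k - 1 for name, k in categories.items()}
feature_length = n_numeric + sum(fact_sizes.values())
print(fact_sizes, feature_length)

def assemble(*parts):
    """Concatenate scalars and lists into one flat feature vector, in order."""
    vec = []
    for part in parts:
        vec.extend(part if isinstance(part, list) else [part])
    return vec

# Hypothetical row: two numeric values followed by a 2-slot one-hot vector
print(assemble(5, 0.25, [0, 1]))
```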

### 3.2 Creating an ML Pipeline

MLlib uses Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. 

In [20]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
pipeline = Pipeline(stages=[protocol_type_indexer, protocol_type_encoder, \
                                flag_indexer, flag_encoder,\
                                service_indexer, service_encoder, \
                                vec_assembler])

In [21]:
transformer = pipeline.fit(df_mixed)
transformed_df_mixed = transformer.transform(df_mixed)

# Focus on the relevant columns and define dataset
selection = ["label", "features"]
dataset_mixed = transformed_df_mixed.select(selection)

### 3.3 Splitting dataset into train and test

In [22]:
from pyspark.sql.functions import monotonically_increasing_id
# This will return a new DF with all the columns + id
transformed_df_mixed = transformed_df_mixed.withColumn("id", monotonically_increasing_id())

In [23]:
transformed_df_mixed.createOrReplaceTempView("Mixed")

In [24]:
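# ORDER BY id makes the LIMIT deterministic: ids increase from the training rows to the test rows
train_new = spark.sql("select * from Mixed order by id Limit 4898431")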
#train_new.limit(10).toPandas()

In [25]:
test_new = spark.sql("select * from Mixed order by id desc Limit 311029")
#test_new.limit(10).toPandas()
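Why this split works: according to the PySpark documentation, monotonically_increasing_id() puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. IDs therefore increase across the partitions of the union (training partitions first, then test partitions) but are not consecutive, and a LIMIT without an ORDER BY gives no guarantee about which rows are returned, which is why pairing LIMIT with ORDER BY id is the safer choice. The plain-Python sketch below mimics that ID layout for hypothetical partition sizes.

```python
def monotonic_ids(partition_sizes):
    """Mimic the documented ID layout: (partition index << 33) + row number."""
    ids = []
    for p, size in enumerate(partition_sizes):
        ids.extend((p << 33) + r for r in range(size))
    return ids

# Hypothetical layout: 2 training partitions followed by 1 test partition
train_parts, test_parts = [3, 2], [2]
ids = monotonic_ids(train_parts + test_parts)
n_train, n_test = sum(train_parts), sum(test_parts)

ordered = sorted(ids)
train_ids = ordered[:n_train]        # like ORDER BY id LIMIT n_train
test_ids = ordered[::-1][:n_test]    # like ORDER BY id DESC LIMIT n_test
print(ids)
```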

In [26]:
(train_data, test_data) = train_new.randomSplit([0.7, 0.3], seed = 123)
print('Training records : %d' % train_data.count())
print('Test records : %d ' % test_data.count())
train_data.cache()

Training records : 3426985
Test records : 1471446 


DataFrame[duration: int, protocol_type: string, service: string, flag: string, src_bytes: int, dst_bytes: int, land: int, wrong_fragment: int, urgent: int, hot: int, num_failed_logins: int, logged_in: int, num_compromised: int, root_shell: int, su_attempted: int, num_root: int, num_file_creations: int, num_shells: int, num_access_files: int, num_outbound_cmds: int, is_host_login: int, is_guest_login: int, count: int, srv_count: int, serror_rate: double, srv_serror_rate: double, rerror_rate: double, srv_rerror_rate: double, same_srv_rate: double, diff_srv_rate: double, srv_diff_host_rate: double, dst_host_count: int, dst_host_srv_count: int, dst_host_same_srv_rate: double, dst_host_diff_srv_rate: double, dst_host_same_src_port_rate: double, dst_host_srv_diff_host_rate: double, dst_host_serror_rate: double, dst_host_srv_serror_rate: double, dst_host_rerror_rate: double, dst_host_srv_rerror_rate: double, connection: string, label: int, protocol_type_index: double, protocol_type_fact: vect

## 4. Logistic Regression Model

### 4.1 Model

In [27]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
model = lr.fit(train_data)
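For a binary LogisticRegression model, Spark computes a margin m = w·x + b for each row; the rawPrediction column holds [-m, m], the probability column holds [1 - σ(m), σ(m)] with σ the sigmoid, and prediction is 1 when σ(m) exceeds the default threshold of 0.5. The plain-Python sketch below reproduces those three columns for one row of a model with hypothetical weights.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def predict_row(weights, intercept, features, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one row."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    p1 = sigmoid(margin)
    raw = [-margin, margin]
    prob = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw, prob, prediction

# Hypothetical model with two features
weights, intercept = [0.8, -1.5], -0.2
print(predict_row(weights, intercept, [3.0, 0.5]))
```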

### 4.2 Predicting on test data

In [28]:
# Make predictions on test data using the transform() method. The features column was set when creating lr.
predictions = model.transform(test_data)
predictions

DataFrame[duration: int, protocol_type: string, service: string, flag: string, src_bytes: int, dst_bytes: int, land: int, wrong_fragment: int, urgent: int, hot: int, num_failed_logins: int, logged_in: int, num_compromised: int, root_shell: int, su_attempted: int, num_root: int, num_file_creations: int, num_shells: int, num_access_files: int, num_outbound_cmds: int, is_host_login: int, is_guest_login: int, count: int, srv_count: int, serror_rate: double, srv_serror_rate: double, rerror_rate: double, srv_rerror_rate: double, same_srv_rate: double, diff_srv_rate: double, srv_diff_host_rate: double, dst_host_count: int, dst_host_srv_count: int, dst_host_same_srv_rate: double, dst_host_diff_srv_rate: double, dst_host_same_src_port_rate: double, dst_host_srv_diff_host_rate: double, dst_host_serror_rate: double, dst_host_srv_serror_rate: double, dst_host_rerror_rate: double, dst_host_srv_rerror_rate: double, connection: string, label: int, protocol_type_index: double, protocol_type_fact: vect

### 4.3 Evaluation

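Binary classifiers separate the elements of a given dataset into one of two possible groups (here: attack or no attack). BinaryClassificationEvaluator uses areaUnderROC as its default metric, so the score below is the area under the ROC curve (1.0 = perfect separation, 0.5 = random guessing), not an accuracy.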

In [29]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
score = evaluator.evaluate(predictions)
print('Score is : %03f' % score )

Score is : 0.999537
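The areaUnderROC metric (the default of BinaryClassificationEvaluator) can be read as the probability that a randomly chosen attack receives a higher score than a randomly chosen normal connection, with ties counting one half. The plain-Python sketch below uses that pairwise definition on hypothetical scores; it is quadratic in the number of rows, so it only suits tiny toy inputs, and Spark computes the curve differently at scale.

```python
def auc(scores, labels):
    """Area under the ROC curve via the pairwise (Mann-Whitney) definition."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            wins += 1.0 if p > n else 0.5 if p == n else 0.0
    return wins / (len(pos) * len(neg))

# Hypothetical scores (e.g. the margin in rawPrediction[1]) and true labels
scores = [2.1, 0.3, -1.0, 1.5, -0.2, 0.3]
labels = [1, 1, 0, 1, 0, 0]
print(auc(scores, labels))
```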


### 4.4 Cross Validation

In [30]:
print(lr.explainParam("regParam"))

regParam: regularization parameter (>= 0). (default: 0.0)


In [31]:
print(lr.explainParam("elasticNetParam"))

elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)


#### Parameters Grid for Cross Validation
We will fit a model for each combination of parameter values in the grid and evaluate each one. The grid contains:
 * 3 values for the regularization parameter (regParam)
 * 3 values for the maximum number of iterations (maxIter)
 * 3 values for the ElasticNet mixing parameter (elasticNetParam)

Thus, the grid will have 3 x 3 x 3 = 27 parameter settings to choose from. 
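ParamGridBuilder builds the Cartesian product of the listed values. The sketch below reproduces that count with itertools.product, using the grid values from the cell in this section, and also counts the model fits that 3-fold cross-validation implies; the extra fit is the final refit of the best combination on the full training set.

```python
from itertools import product

grid = {
    "regParam": [1, 5, 10],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
}
# One dict per parameter combination, like the list returned by ParamGridBuilder.build()
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 3
fits = len(param_maps) * num_folds + 1  # +1: best combination refit on all training data
print(len(param_maps), fits)
```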


**Regularization Parameter:**

* Intuitively, it is a penalty against complexity.
* A bigger regParam penalizes "large" weight coefficients more strongly.
* This keeps the model from picking up "noise", i.e. "deducing a pattern where there is none".
* Its goal is to avoid OVERFITTING.
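For reference, Spark's documentation writes the elastic-net regularized logistic regression objective as below, with λ = regParam, α = elasticNetParam and the 0/1 label mapped to ỹ ∈ {-1, +1}; α = 0 gives a pure L2 penalty and α = 1 a pure L1 penalty, matching the explainParam output above, and the intercept b is not penalized. Because λ multiplies the whole penalty term, the grid values 1, 5 and 10 weight it heavily, and with α > 0 a large λ can drive many coefficients exactly to zero. Spark also standardizes features by default (standardization=True) before applying the penalty.

$$
\min_{\mathbf{w},\,b}\;\frac{1}{n}\sum_{i=1}^{n}\log\!\left(1+e^{-\tilde{y}_i\,(\mathbf{w}^\top\mathbf{x}_i+b)}\right)\;+\;\lambda\left(\alpha\,\lVert\mathbf{w}\rVert_1+\frac{1-\alpha}{2}\,\lVert\mathbf{w}\rVert_2^2\right),\qquad \tilde{y}_i=2\,\text{label}_i-1
$$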

In [34]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [1, 5, 10])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

#### Create 3-fold Cross Validation
* numFolds determines the number of (training, validation) dataset pairs used in the cross-validation.
* For each parameter combination, the Estimator is fitted on the 3 different (training, validation) pairs, and the cross-validation computes the average of the 3 resulting evaluation metrics.
* The combination with the best average metric is then refit on the whole training set; cvModel uses this best model.
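The procedure in these bullets can be sketched generically in plain Python. This is an illustration of k-fold cross-validation, not Spark's implementation: Spark's CrossValidator assigns rows to folds by random sampling, so its folds are only approximately equal in size. The majority-class "model" and accuracy metric are hypothetical stand-ins for LogisticRegression and the AUC evaluator.

```python
import random
from statistics import mean

def k_fold_indices(n, k, seed=123):
    """Shuffle the row indices and deal them into k roughly equal, disjoint folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]

def cross_validate(rows, labels, k, fit, evaluate, seed=123):
    """Fit on k-1 folds, score on the held-out fold, and average the k scores."""
    folds = k_fold_indices(len(rows), k, seed)
    scores = []
    for i, val_idx in enumerate(folds):
        train_idx = [j for f, fold in enumerate(folds) if f != i for j in fold]
        model = fit([rows[j] for j in train_idx], [labels[j] for j in train_idx])
        scores.append(evaluate(model, [rows[j] for j in val_idx], [labels[j] for j in val_idx]))
    return mean(scores)

# Hypothetical stand-ins: a majority-class "model" scored by accuracy
def fit_majority(xs, ys):
    return 1 if 2 * sum(ys) >= len(ys) else 0

def accuracy(model, xs, ys):
    return sum(1 for y in ys if y == model) / len(ys)

rows = list(range(12))
labels = [1] * 9 + [0] * 3
print(cross_validate(rows, labels, 3, fit_majority, accuracy))
```

Running this once per parameter combination and keeping the best average is exactly what the 27-combination grid above asks CrossValidator to do.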

In [35]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

# Run cross validations
cvModel = cv.fit(train_data)
# This may take some time: 27 parameter combinations x 3 folds = 81 fits, plus a final refit of the best combination

In [36]:
# Use the held-out test_data so we can measure the AUC of our model on data it has not seen
predictions = cvModel.transform(test_data)
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
best_score=evaluator.evaluate(predictions)
# print('Best model score : %03f' % best_score)

## 5. Predicting test data

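We now apply the cross-validated model to test_new, i.e. the rows of the original test file, which went through the same pipeline transformations as the training data: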

### 5.1 Predicting on test_new using the cross-validated model

In [37]:
# Apply the model to test_new, which it has never seen, so we can measure its AUC on new data
predictions_test = cvModel.transform(test_new)        # either model or cvModel can be used here
# cvModel uses the best model found from the Cross Validation

### 5.2 Model evaluation

In [38]:
# Evaluate best model
best_score_test = evaluator.evaluate(predictions_test)
print('Best model score : %03f' % best_score_test)

Best model score : 0.975441


Score on test_new without 3-fold cross-validation (using model instead of cvModel), which is slightly higher than the cross-validated score:

Best model score : 0.980596

In [None]:
# spark.stop()