<center>
<a href="http://www.insa-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo-insa.jpg" style="float:left; max-width: 120px; display: inline" alt="INSA"/></a> 
<a href="http://wikistat.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/wikistat.jpg" style="max-width: 250px; display: inline"  alt="Wikistat"/></a>
<a href="http://www.math.univ-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo_imt.jpg" style="float:right; max-width: 200px; display: inline" alt="IMT"/> </a>
</center>

# IA Framework.
## Lab 1  - Introduction to Pyspark.
#### Part 2 Basic Statistics and Logistic Regression with <a href="http://spark.apache.org/"><img src="http://spark.apache.org/images/spark-logo-trademark.png" style="max-width: 100px; display: inline" alt="Spark"/> </a> and  [MLlib](https://spark.apache.org/mllib/)

**Summary**: This notebook continues the introduction to [Spark](https://spark.apache.org/) through the [`PySpark`](http://spark.apache.org/docs/latest/api/python/) API. We will see how to sample an RDD, introduce the MLlib library, and compute descriptive statistics as well as basic univariate and multivariate statistics.

## Context and Dataset

This notebook continues to use the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) dataset.

In [None]:
sc

In [None]:
DATA_PATH="" 
data_file = DATA_PATH+"kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

## RDD's sampling
Two functions are available for sampling an RDD in Spark: `sample` and `takeSample`.

In [None]:
raw_data_sample = raw_data.takeSample(False, 4000, seed=1234)

In [None]:
raw_data_sample = raw_data.sample(False, 0.1, seed=1234)

**Question** What is the difference between these two functions? Are the samples identical?

Using `sample` makes it possible to quickly estimate a value on a subsample of a huge dataset.
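The idea of estimating a value on a subsample can be pictured without Spark. The sketch below is a plain-Python illustration, not Spark code: it assumes that sampling without replacement keeps each record independently with probability `fraction`, and it uses a synthetic list of records built for the example. The helper names `bernoulli_sample` and `normal_rate` are hypothetical.

```python
import random

def bernoulli_sample(records, fraction, seed):
    """Keep each record independently with probability `fraction`."""
    rng = random.Random(seed)
    return [r for r in records if rng.random() < fraction]

def normal_rate(records):
    """Share of records whose last field is the 'normal.' tag."""
    return sum(1 for r in records if r.endswith("normal.")) / float(len(records))

# Synthetic stand-in for the dataset: 20% 'normal.' lines, 80% 'smurf.' lines.
records = ["0,tcp,http,SF,normal."] * 2000 + ["0,icmp,ecr_i,SF,smurf."] * 8000

subsample = bernoulli_sample(records, fraction=0.1, seed=1234)
full_rate = normal_rate(records)
sample_rate = normal_rate(subsample)
print("full: {:.3f}, sample: {:.3f}, sample size: {}".format(
    full_rate, sample_rate, len(subsample)))
```

With this kind of sampling the size of the subsample is itself random (close to, but not exactly, 10% of the records), while the estimated rate stays close to the rate computed on all records.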

In the cells below, we estimate the rate of normal interactions on the sample and on the whole dataset.

In [None]:
import time
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_size = raw_data_sample.count()
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)
t0 = time.time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time.time() - t0
sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print("Normal interaction rate is {}".format(round(sample_normal_ratio,3)))
print("Running time: {} seconds".format(round(tt,3)))

Same computation without sampling.

In [None]:
raw_data_items = raw_data.map(lambda x: x.split(","))
total_size = raw_data.count()
normal_tags = raw_data_items.filter(lambda x: "normal." in x)
t0 = time.time()
normal_tags_count = normal_tags.count()
tt = time.time() - t0
normal_ratio = normal_tags_count / float(total_size)
# Report the ratio computed on the whole dataset (not the sample ratio).
print("Normal interaction rate is {}".format(round(normal_ratio,3)))
print("Running time: {} seconds".format(round(tt,3)))

Only the sampling phase is distributed / parallelized; this procedure takes more time on a cluster.

###  Data munging
As described in the Python [tutorial](https://github.com/wikistat/Intro-Python) dedicated to data munging with `pandas`, data preprocessing is an essential step before analysing and modelling the data: extraction, filtering, sampling, data completion, correction, anomaly detection, normalization, feature selection, matching, etc.

Most of these steps are unidimensional and can be easily distributed.
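A minimal sketch of why such steps distribute easily: a cleaning function that only looks at one record at a time can be applied to every record independently. The example below uses the built-in `map` on a small hypothetical list of lines; the same function could be passed to an RDD's `map`. The function name `clean_record` and the sample lines are assumptions made for illustration.

```python
def clean_record(line):
    """Split one CSV line, strip stray whitespace and convert the first field to float."""
    fields = [f.strip() for f in line.split(",")]
    fields[0] = float(fields[0])
    return fields

# Hypothetical raw lines, with irregular spacing to clean up.
lines = ["0,tcp,http,SF,181,normal.", " 2, udp ,domain_u,SF,105,normal."]

# Each record is processed on its own, so the work can be split across partitions.
cleaned = list(map(clean_record, lines))
print(cleaned[1][:2])
```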


### [MLlib](http://spark.apache.org/docs/latest/ml-guide.html)


*MLlib* is an RDD-based library.  
Another library, *SparkML*, is developed by Spark and is based on DataFrames (see part 3).

*SparkML* is expected to completely replace the *MLlib* library, but so far some functionalities (especially for basic statistics) are available only in MLlib.
The two libraries will coexist until Spark 3 is released.

The only up-to-date documentation is the official online [documentation](https://spark.apache.org/docs/latest/mllib-guide.html).




We first build an RDD that contains only numerical values.

In [None]:
import numpy as np
def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)
vector_data.take(2)

## Basic Statistics
### Summary statistics

The MLlib function [`colStats`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.Statistics.colStats) computes univariate statistics for each column of an `RDD[Vector]`. It returns a [`MultivariateStatisticalSummary`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.MultivariateStatisticalSummary) object.
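To make the content of such a summary concrete, here is a plain-Python sketch of column-wise statistics on a tiny hypothetical table. It is not the MLlib implementation; it assumes the variance is the sample variance (division by n - 1), and the dictionary keys simply mirror the method names used in the cell that follows.

```python
from statistics import mean, variance

def column_summary(rows):
    """Per-column statistics for a list of equal-length numeric rows."""
    columns = list(zip(*rows))  # transpose rows into columns
    return {
        "count": len(rows),
        "mean": [mean(c) for c in columns],
        "variance": [variance(c) for c in columns],  # sample variance (n - 1)
        "max": [max(c) for c in columns],
        "min": [min(c) for c in columns],
        "numNonzeros": [sum(1 for v in c if v != 0) for c in columns],
    }

# Hypothetical rows: (duration, src_bytes)
rows = [[0.0, 181.0], [2.0, 239.0], [4.0, 0.0]]
summary_sketch = column_summary(rows)
print(summary_sketch["mean"], summary_sketch["numNonzeros"])
```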

In [None]:
from pyspark.mllib.stat import Statistics
from math import sqrt 

# Compute column summary statistics.
summary = Statistics.colStats(vector_data)

print("Duration statistics")
print(" Mean: {}".format(round(summary.mean()[0],3)))
print(" St. deviation: {}".format(round(sqrt(summary.variance()[0]),3)))
print(" Max value: {}".format(round(summary.max()[0],3)))
print(" Min value: {}".format(round(summary.min()[0],3)))
print(" Total value count: {}".format(summary.count()))
print(" Number of non-zero values: {}".format(summary.numNonzeros()[0]))

### By label

In [None]:
def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return (line_split[41], np.array([float(x) for x in clean_line_split]))

def summary_by_label(raw_data, label):
    label_vector_data = raw_data.map(parse_interaction_with_key).filter(lambda x: x[0]==label)
    return Statistics.colStats(label_vector_data.values())


In [None]:
normal_sum = summary_by_label(raw_data, "normal.")

print("Duration Statistics for label: {}".format("normal"))
print(" Mean: {}".format(round(normal_sum.mean()[0],3)))
print(" St. deviation: {}".format(round(sqrt(normal_sum.variance()[0]),3)))
print(" Max value: {}".format(round(normal_sum.max()[0],3)))
print(" Min value: {}".format(round(normal_sum.min()[0],3)))
print(" Total value count: {}".format(normal_sum.count()))
print(" Number of non-zero values: {}".format(normal_sum.numNonzeros()[0]))

### For all labels

In [None]:
label_list = ["back.","buffer_overflow.","ftp_write.","guess_passwd.",
              "imap.","ipsweep.","land.","loadmodule.","multihop.",
              "neptune.","nmap.","normal.","perl.","phf.","pod.","portsweep.",
              "rootkit.","satan.","smurf.","spy.","teardrop.","warezclient.",
              "warezmaster."]

In [None]:
stats_by_label =[(label, summary_by_label(raw_data, label)) for label in label_list]

In [None]:
import pandas as pd
# Display results with `pandas`.
def get_variable_stats_df(stats_by_label, column_i):
    # One (label, [mean, std, min, max, count]) pair per label.
    column_stats_by_label = [
        (stat[0], np.array([float(stat[1].mean()[column_i]),
                            float(sqrt(stat[1].variance()[column_i])),
                            float(stat[1].min()[column_i]),
                            float(stat[1].max()[column_i]),
                            int(stat[1].count())]))
        for stat in stats_by_label
    ]
    # `DataFrame.from_items` was removed in pandas 1.0; `from_dict` builds the same table.
    return pd.DataFrame.from_dict(dict(column_stats_by_label), columns=["Mean", "Std Dev", "Min", "Max", "Count"], orient='index')

In [None]:
print("Duration statistics, by label")
get_variable_stats_df(stats_by_label,0)

In [None]:
print("src_bytes statistics, by label")
get_variable_stats_df(stats_by_label,1)

### Correlation
`corr` computes Pearson and Spearman (rank-based) correlations.
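The difference between the two coefficients can be sketched in plain Python: Spearman's correlation is Pearson's correlation applied to the ranks of the values. The functions below are an illustrative implementation written for this example (with average ranks for ties), not the MLlib code, and the data are hypothetical.

```python
from math import sqrt

def pearson(x, y):
    """Pearson linear correlation between two equal-length sequences."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sx = sqrt(sum((a - mx) ** 2 for a in x))
    sy = sqrt(sum((b - my) ** 2 for b in y))
    return cov / (sx * sy)

def ranks(values):
    """1-based ranks; tied values share the mean of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg = (i + j) / 2.0 + 1.0
        for k in range(i, j + 1):
            result[order[k]] = avg
        i = j + 1
    return result

def spearman(x, y):
    """Spearman correlation: Pearson correlation of the ranks."""
    return pearson(ranks(x), ranks(y))

xs = [1.0, 2.0, 3.0, 4.0, 5.0]
ys = [1.0, 8.0, 27.0, 64.0, 125.0]  # monotone but non-linear in xs
print(round(pearson(xs, ys), 3), round(spearman(xs, ys), 3))
```

On a monotone but non-linear relation such as this one, the rank-based coefficient reaches 1 while the linear coefficient stays below it.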

In [None]:
from pyspark.mllib.stat import Statistics 
correlation_matrix = Statistics.corr(vector_data, method="pearson")

In [None]:
import pandas as pd
pd.set_option('display.max_columns', 50)
col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

corr_df = pd.DataFrame(correlation_matrix, index=col_names, columns=col_names)
corr_df

Most correlated features

In [None]:
# Boolean mask: True where two distinct variables are strongly correlated
highly_correlated_df = (abs(corr_df) > .8) & (corr_df < 1.0)
# Extract the names of the variables involved
correlated_vars_index = (highly_correlated_df==True).any()
correlated_var_names = correlated_vars_index[correlated_vars_index==True].index
highly_correlated_df.loc[correlated_var_names,correlated_var_names]

## Logistic Regression

**Warning**: [J. A. Dianes](https://github.com/jadianes/spark-py-notebooks/blob/master/nb8-mllib-logit/nb8-mllib-logit.ipynb) uses the previous function to select features according to their correlation before using them in logistic regression.
This procedure is not recommended in machine learning. It is better to use a stepwise method (*forward, backward, both*) and select the variables according to the AIC or BIC criterion, or to use Lasso penalization. The latter is what is implemented in the MLlib library.

### Reading the data.

We will now use the complete [KDD cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) dataset. If you run into memory issues, restart the kernel and start from here. You can also sample the data to run the notebook on a smaller dataset.

train dataset

In [None]:
import urllib.request
ft = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", DATA_PATH+"data.gz")
train_data_file = DATA_PATH+"data.gz"
train_raw_data = sc.textFile(train_data_file)

print("Train data size is {}".format(train_raw_data.count()))

test dataset

In [None]:
ft = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", DATA_PATH+"corrected.gz")
test_data_file = DATA_PATH+"corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print("Test data size is {}".format(test_raw_data.count()))

### Labeled Point

In order to perform learning, you have to convert the data to the **Labeled Point** format.
This object takes the label and the features as input.
We define a function to convert a raw line to a `LabeledPoint` object.
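The field selection behind this conversion can be checked without Spark. The sketch below returns a plain `(label, features)` tuple as a stand-in for a `LabeledPoint`; the function name `to_label_and_features` and the 42-field line are hypothetical and built only for the example.

```python
def to_label_and_features(line):
    """Return (label, features): label is 0.0 for 'normal.' and 1.0 for any attack."""
    fields = line.split(",")
    # Field 0 is numeric, fields 1-3 are symbolic, fields 4-40 are numeric, field 41 is the tag.
    features = [float(v) for v in fields[0:1] + fields[4:41]]
    label = 0.0 if fields[41] == "normal." else 1.0
    return label, features

# Hypothetical 42-field record (not taken from the dataset).
fake_line = ",".join(["0", "tcp", "http", "SF"] + ["1"] * 37 + ["smurf."])
label, features = to_label_and_features(fake_line)
print(label, len(features))
```

Dropping the three symbolic fields and the tag leaves 38 numeric features per record.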

In [None]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    line_split = line.split(",")
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

In [None]:
# Build the training set from the raw training lines (used below as `training_data`).
training_data = train_raw_data.map(parse_interaction)

In [None]:
test_data = test_raw_data.map(parse_interaction)

###  Learning the model
Training the model is essentially the same as with scikit-learn.
MLlib offers two optimization [algorithms](https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression): *mini-batch gradient descent* and L-BFGS, the latter being expected to run faster.

As in *Scikit-learn*, *l2* (ridge) and *l1* (lasso) penalties are available.

See the full documentation [here](https://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html).
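As a rough illustration of what the two penalties add to the objective, here is a plain-Python sketch of the logistic loss of one example and of the *l1* and *l2* penalty terms. It is not the MLlib implementation: the 1/2 factor on the *l2* term is a convention assumed here, and the names `logistic_loss`, `penalty`, `reg_param` and `reg_type` are hypothetical. In PySpark, the corresponding settings appear to be the `regParam` and `regType` arguments of `train`; check the linked documentation for their exact meaning.

```python
from math import exp, log

def logistic_loss(weights, features, label):
    """Negative log-likelihood of one example with label in {0.0, 1.0}."""
    z = sum(w * x for w, x in zip(weights, features))
    p = 1.0 / (1.0 + exp(-z))
    return -(label * log(p) + (1.0 - label) * log(1.0 - p))

def penalty(weights, reg_param, reg_type):
    """Regularization term added to the loss."""
    if reg_type == "l1":   # lasso: sum of absolute values, pushes weights to exactly zero
        return reg_param * sum(abs(w) for w in weights)
    if reg_type == "l2":   # ridge: half the sum of squares, shrinks weights smoothly
        return reg_param * 0.5 * sum(w * w for w in weights)
    raise ValueError("reg_type must be 'l1' or 'l2'")

weights = [1.0, -2.0]
print(penalty(weights, 0.1, "l1"), penalty(weights, 0.1, "l2"))
print(logistic_loss([0.0, 0.0], [1.0, 1.0], 1.0))
```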

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
import time

# Time the training of the logistic regression model.
t0 = time.time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time.time() - t0
print("Training done in {} seconds".format(round(tt,3)))

### Error Estimation
The `map` function is used to predict the attack label for each entry.

In [None]:
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

We can then compute the error

In [None]:
import time

t0 = time.time()
# Share of test entries whose predicted label matches the true label.
test_accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
erreur=1-test_accuracy
tt = time.time() - t0
print("Computed in {} seconds. The error rate is {}".format(round(tt,3), round(erreur,4)))
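The error rate can also be broken down into false positives and false negatives. The sketch below does this in plain Python on a small hypothetical list of `(label, prediction)` pairs of the same shape as the entries of `labels_and_preds`; the function name `confusion_counts` is an assumption made for the example.

```python
def confusion_counts(pairs):
    """Count true/false positives and negatives; 1.0 means attack, 0.0 means normal."""
    counts = {"tp": 0, "tn": 0, "fp": 0, "fn": 0}
    for label, pred in pairs:
        if label == 1.0:
            counts["tp" if pred == 1.0 else "fn"] += 1
        else:
            counts["fp" if pred == 1.0 else "tn"] += 1
    return counts

# Hypothetical (label, prediction) pairs.
pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
counts = confusion_counts(pairs)
error_rate = (counts["fp"] + counts["fn"]) / float(len(pairs))
print(counts, round(error_rate, 4))
```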

**Exercises**

- Qualitative variables such as `protocol` or `service` have been removed for simplicity. Add them as indicator (*dummy*) variables and re-train the model.
- Try to [optimize](https://spark.apache.org/docs/latest/ml-tuning.html) the model with cross-validation by trying various parameters.
- Use other algorithms such as [RandomForest](https://spark.apache.org/docs/latest/mllib-ensembles.html#random-forests) on this problem.