In [1]:
import os
import sys
import time
import pandas as pd

# Make the Spark workers and the driver use the same interpreter as this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession

# Stop any Spark session/context left over from a previous cell or run.
# getOrCreate() returns an already-running session unchanged, so without this
# the .master(...) setting below would not be applied. The original code
# called `sc.stop()` inside a bare `except:`, which silently did nothing
# whenever `sc` was not defined.
for _stale_name in ("spark", "sc"):
    _stale = globals().get(_stale_name)
    if _stale is not None:
        _stale.stop()

# Local Spark session with 2 worker threads.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Abdenour")
    .getOrCreate()
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Load the CSV with a header row and let Spark infer the column types.
# NOTE(review): the other cells of this notebook load 'kddcup99.csv'; confirm
# that using a different file here is intentional, otherwise the timings are
# not comparable across cells.
data = (
    spark.read
    .format('csv')
    .option("header", "true")
    .option("inferSchema", "true")
    .load('kddcup15mo.csv')
)

# Rename the raw target column so that "label" can hold its indexed version.
df = data.withColumnRenamed("label", "lab")

# Columns fed to the model as-is (order defines the feature-vector layout).
numeric_cols = [
    "duration", "src_bytes", "dst_bytes", "land",
    "wrong_fragment", "urgent", "hot", "num_failed_logins",
    "logged_in", "lnum_compromised", "lroot_shell", "lsu_attempted",
    "lnum_root", "lnum_file_creations", "lnum_shells", "lnum_access_files",
    "lnum_outbound_cmds", "is_host_login", "is_guest_login",
    "count", "srv_count", "serror_rate", "srv_serror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
    "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
    "dst_host_srv_serror_rate", "dst_host_rerror_rate",
    "dst_host_srv_rerror_rate",
]
categorical_cols = ["protocol_type", "service", "flag", "lab"]

# One StringIndexer per string column; each writes a numeric index column.
label_indexer = StringIndexer(inputCol="lab", outputCol="label")
protocol_type_indexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_indexed")
service_indexer = StringIndexer(inputCol="service", outputCol="service_indexed")
flag_indexer = StringIndexer(inputCol="flag", outputCol="flag_indexed")

# Gather the indexed categorical columns and the numeric columns into "features".
assembleur = VectorAssembler(
    inputCols=["protocol_type_indexed", "service_indexed", "flag_indexed"] + numeric_cols,
    outputCol="features",
)

# Pipeline definition: the four indexers followed by the assembler.
unPipeline = Pipeline(
    stages=[label_indexer, protocol_type_indexer, service_indexer, flag_indexer, assembleur]
)

# fit() collects the possible index values of each indexed column from the DataFrame.
fitPipeline = unPipeline.fit(df)

# transform() applies the fitted stages and returns a new DataFrame with the added columns.
dfPreparee = fitPipeline.transform(df)
# perf_counter is imported by name so that the `time` module itself is not
# shadowed (the original `from time import time` rebound `time` to a function).
from time import perf_counter

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Fixed seed so that re-running this cell yields the same train/test split
# and the same cross-validation folds.
SEED = 42

# Model to train: a decision tree using entropy as impurity measure.
classeur = DecisionTreeClassifier(impurity="entropy")\
            .setLabelCol("label")\
            .setFeaturesCol("features")

# Split the prepared data: 70% training set, 30% test set.
(train, test) = dfPreparee.randomSplit([0.7, 0.3], seed=SEED)

# Hyper-parameter grid: 4 * 2 * 3 = 24 combinations.
params = ParamGridBuilder()\
    .addGrid(classeur.maxDepth, [5, 10, 15, 20])\
    .addGrid(classeur.maxBins, [100, 200])\
    .addGrid(classeur.minInstancesPerNode, [100, 250, 500])\
    .build()

# Models are compared on accuracy between "prediction" and "label".
evaluateur = MulticlassClassificationEvaluator()\
    .setMetricName("accuracy")\
    .setPredictionCol("prediction")\
    .setLabelCol("label")

# 10-fold cross-validation over the grid above.
crossVal = CrossValidator(
    estimator=classeur,
    estimatorParamMaps=params,
    evaluator=evaluateur,
    numFolds=10,
    seed=SEED,
)

# Run the cross-validation to obtain the best model, timing the fit only.
t0 = perf_counter()
cvModel = crossVal.fit(train)
tt = perf_counter() - t0
print ("Classifier trained in {} seconds".format(round(tt,3)))

# Measure the performance of the best model on the held-out test set.
predictions = cvModel.transform(test)
accuracy = evaluateur.evaluate(predictions)
print("Test Accuracy = %g " % (accuracy))


Classifier trained in 262.947 seconds
Test Accuracy = 0.99436 


In [2]:
import os
import sys
import time
import pandas as pd

# Make the Spark workers and the driver use the same interpreter as this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession

# Stop any Spark session/context left over from a previous cell or run.
# getOrCreate() returns an already-running session unchanged, so without this
# the .master(...) setting below would not be applied. The original code
# called `sc.stop()` inside a bare `except:`, which silently did nothing
# whenever `sc` was not defined.
for _stale_name in ("spark", "sc"):
    _stale = globals().get(_stale_name)
    if _stale is not None:
        _stale.stop()

# Local Spark session with 4 worker threads.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("Abdenour")
    .getOrCreate()
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Load the CSV with a header row and let Spark infer the column types.
data = (
    spark.read
    .format('csv')
    .option("header", "true")
    .option("inferSchema", "true")
    .load('kddcup99.csv')
)

# Rename the raw target column so that "label" can hold its indexed version.
df = data.withColumnRenamed("label", "lab")

# Columns fed to the model as-is (order defines the feature-vector layout).
numeric_cols = [
    "duration", "src_bytes", "dst_bytes", "land",
    "wrong_fragment", "urgent", "hot", "num_failed_logins",
    "logged_in", "lnum_compromised", "lroot_shell", "lsu_attempted",
    "lnum_root", "lnum_file_creations", "lnum_shells", "lnum_access_files",
    "lnum_outbound_cmds", "is_host_login", "is_guest_login",
    "count", "srv_count", "serror_rate", "srv_serror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
    "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
    "dst_host_srv_serror_rate", "dst_host_rerror_rate",
    "dst_host_srv_rerror_rate",
]
categorical_cols = ["protocol_type", "service", "flag", "lab"]

# One StringIndexer per string column; each writes a numeric index column.
label_indexer = StringIndexer(inputCol="lab", outputCol="label")
protocol_type_indexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_indexed")
service_indexer = StringIndexer(inputCol="service", outputCol="service_indexed")
flag_indexer = StringIndexer(inputCol="flag", outputCol="flag_indexed")

# Gather the indexed categorical columns and the numeric columns into "features".
assembleur = VectorAssembler(
    inputCols=["protocol_type_indexed", "service_indexed", "flag_indexed"] + numeric_cols,
    outputCol="features",
)

# Pipeline definition: the four indexers followed by the assembler.
unPipeline = Pipeline(
    stages=[label_indexer, protocol_type_indexer, service_indexer, flag_indexer, assembleur]
)

# fit() collects the possible index values of each indexed column from the DataFrame.
fitPipeline = unPipeline.fit(df)

# transform() applies the fitted stages and returns a new DataFrame with the added columns.
dfPreparee = fitPipeline.transform(df)
# perf_counter is imported by name so that the `time` module itself is not
# shadowed (the original `from time import time` rebound `time` to a function).
from time import perf_counter

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Fixed seed so that re-running this cell yields the same train/test split
# and the same cross-validation folds.
SEED = 42

# Model to train: a decision tree using entropy as impurity measure.
classeur = DecisionTreeClassifier(impurity="entropy")\
            .setLabelCol("label")\
            .setFeaturesCol("features")

# Split the prepared data: 70% training set, 30% test set.
(train, test) = dfPreparee.randomSplit([0.7, 0.3], seed=SEED)

# Hyper-parameter grid: 4 * 2 * 3 = 24 combinations.
params = ParamGridBuilder()\
    .addGrid(classeur.maxDepth, [5, 10, 15, 20])\
    .addGrid(classeur.maxBins, [100, 200])\
    .addGrid(classeur.minInstancesPerNode, [100, 250, 500])\
    .build()

# Models are compared on accuracy between "prediction" and "label".
evaluateur = MulticlassClassificationEvaluator()\
    .setMetricName("accuracy")\
    .setPredictionCol("prediction")\
    .setLabelCol("label")

# 10-fold cross-validation over the grid above.
crossVal = CrossValidator(
    estimator=classeur,
    estimatorParamMaps=params,
    evaluator=evaluateur,
    numFolds=10,
    seed=SEED,
)

# Run the cross-validation to obtain the best model, timing the fit only.
t0 = perf_counter()
cvModel = crossVal.fit(train)
tt = perf_counter() - t0
print ("Classifier trained in {} seconds".format(round(tt,3)))

# Measure the performance of the best model on the held-out test set.
predictions = cvModel.transform(test)
accuracy = evaluateur.evaluate(predictions)
print("Test Accuracy = %g " % (accuracy))


Classifier trained in 597.366 seconds
Test Accuracy = 0.997887 


In [3]:
import os
import sys
import time
import pandas as pd

# Make the Spark workers and the driver use the same interpreter as this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession

# Stop any Spark session/context left over from a previous cell or run.
# getOrCreate() returns an already-running session unchanged, so without this
# the .master(...) setting below would not be applied. The original code
# called `sc.stop()` inside a bare `except:`, which silently did nothing
# whenever `sc` was not defined.
for _stale_name in ("spark", "sc"):
    _stale = globals().get(_stale_name)
    if _stale is not None:
        _stale.stop()

# Local Spark session with 6 worker threads.
spark = (
    SparkSession.builder
    .master("local[6]")
    .appName("Abdenour")
    .getOrCreate()
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Load the CSV with a header row and let Spark infer the column types.
data = (
    spark.read
    .format('csv')
    .option("header", "true")
    .option("inferSchema", "true")
    .load('kddcup99.csv')
)

# Rename the raw target column so that "label" can hold its indexed version.
df = data.withColumnRenamed("label", "lab")

# Columns fed to the model as-is (order defines the feature-vector layout).
numeric_cols = [
    "duration", "src_bytes", "dst_bytes", "land",
    "wrong_fragment", "urgent", "hot", "num_failed_logins",
    "logged_in", "lnum_compromised", "lroot_shell", "lsu_attempted",
    "lnum_root", "lnum_file_creations", "lnum_shells", "lnum_access_files",
    "lnum_outbound_cmds", "is_host_login", "is_guest_login",
    "count", "srv_count", "serror_rate", "srv_serror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
    "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
    "dst_host_srv_serror_rate", "dst_host_rerror_rate",
    "dst_host_srv_rerror_rate",
]
categorical_cols = ["protocol_type", "service", "flag", "lab"]

# One StringIndexer per string column; each writes a numeric index column.
label_indexer = StringIndexer(inputCol="lab", outputCol="label")
protocol_type_indexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_indexed")
service_indexer = StringIndexer(inputCol="service", outputCol="service_indexed")
flag_indexer = StringIndexer(inputCol="flag", outputCol="flag_indexed")

# Gather the indexed categorical columns and the numeric columns into "features".
assembleur = VectorAssembler(
    inputCols=["protocol_type_indexed", "service_indexed", "flag_indexed"] + numeric_cols,
    outputCol="features",
)

# Pipeline definition: the four indexers followed by the assembler.
unPipeline = Pipeline(
    stages=[label_indexer, protocol_type_indexer, service_indexer, flag_indexer, assembleur]
)

# fit() collects the possible index values of each indexed column from the DataFrame.
fitPipeline = unPipeline.fit(df)

# transform() applies the fitted stages and returns a new DataFrame with the added columns.
dfPreparee = fitPipeline.transform(df)
# perf_counter is imported by name so that the `time` module itself is not
# shadowed (the original `from time import time` rebound `time` to a function).
from time import perf_counter

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Fixed seed so that re-running this cell yields the same train/test split
# and the same cross-validation folds.
SEED = 42

# Model to train: a decision tree using entropy as impurity measure.
classeur = DecisionTreeClassifier(impurity="entropy")\
            .setLabelCol("label")\
            .setFeaturesCol("features")

# Split the prepared data: 70% training set, 30% test set.
(train, test) = dfPreparee.randomSplit([0.7, 0.3], seed=SEED)

# Hyper-parameter grid: 4 * 2 * 3 = 24 combinations.
params = ParamGridBuilder()\
    .addGrid(classeur.maxDepth, [5, 10, 15, 20])\
    .addGrid(classeur.maxBins, [100, 200])\
    .addGrid(classeur.minInstancesPerNode, [100, 250, 500])\
    .build()

# Models are compared on accuracy between "prediction" and "label".
evaluateur = MulticlassClassificationEvaluator()\
    .setMetricName("accuracy")\
    .setPredictionCol("prediction")\
    .setLabelCol("label")

# 10-fold cross-validation over the grid above.
crossVal = CrossValidator(
    estimator=classeur,
    estimatorParamMaps=params,
    evaluator=evaluateur,
    numFolds=10,
    seed=SEED,
)

# Run the cross-validation to obtain the best model, timing the fit only.
t0 = perf_counter()
cvModel = crossVal.fit(train)
tt = perf_counter() - t0
print ("Classifier trained in {} seconds".format(round(tt,3)))

# Measure the performance of the best model on the held-out test set.
predictions = cvModel.transform(test)
accuracy = evaluateur.evaluate(predictions)
print("Test Accuracy = %g " % (accuracy))


Classifier trained in 600.182 seconds
Test Accuracy = 0.997414 


In [None]:
import os
import sys
import time
import pandas as pd

# Make the Spark workers and the driver use the same interpreter as this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession

# Stop any Spark session/context left over from a previous cell or run.
# getOrCreate() returns an already-running session unchanged, so without this
# the .master(...) setting below would not be applied. The original code
# called `sc.stop()` inside a bare `except:`, which silently did nothing
# whenever `sc` was not defined.
for _stale_name in ("spark", "sc"):
    _stale = globals().get(_stale_name)
    if _stale is not None:
        _stale.stop()

# Local Spark session with 8 worker threads.
spark = (
    SparkSession.builder
    .master("local[8]")
    .appName("Abdenour")
    .getOrCreate()
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Load the CSV with a header row and let Spark infer the column types.
data = (
    spark.read
    .format('csv')
    .option("header", "true")
    .option("inferSchema", "true")
    .load('kddcup99.csv')
)

# Rename the raw target column so that "label" can hold its indexed version.
df = data.withColumnRenamed("label", "lab")

# Columns fed to the model as-is (order defines the feature-vector layout).
numeric_cols = [
    "duration", "src_bytes", "dst_bytes", "land",
    "wrong_fragment", "urgent", "hot", "num_failed_logins",
    "logged_in", "lnum_compromised", "lroot_shell", "lsu_attempted",
    "lnum_root", "lnum_file_creations", "lnum_shells", "lnum_access_files",
    "lnum_outbound_cmds", "is_host_login", "is_guest_login",
    "count", "srv_count", "serror_rate", "srv_serror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
    "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
    "dst_host_srv_serror_rate", "dst_host_rerror_rate",
    "dst_host_srv_rerror_rate",
]
categorical_cols = ["protocol_type", "service", "flag", "lab"]

# One StringIndexer per string column; each writes a numeric index column.
label_indexer = StringIndexer(inputCol="lab", outputCol="label")
protocol_type_indexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_indexed")
service_indexer = StringIndexer(inputCol="service", outputCol="service_indexed")
flag_indexer = StringIndexer(inputCol="flag", outputCol="flag_indexed")

# Gather the indexed categorical columns and the numeric columns into "features".
assembleur = VectorAssembler(
    inputCols=["protocol_type_indexed", "service_indexed", "flag_indexed"] + numeric_cols,
    outputCol="features",
)

# Pipeline definition: the four indexers followed by the assembler.
unPipeline = Pipeline(
    stages=[label_indexer, protocol_type_indexer, service_indexer, flag_indexer, assembleur]
)

# fit() collects the possible index values of each indexed column from the DataFrame.
fitPipeline = unPipeline.fit(df)

# transform() applies the fitted stages and returns a new DataFrame with the added columns.
dfPreparee = fitPipeline.transform(df)
# perf_counter is imported by name so that the `time` module itself is not
# shadowed (the original `from time import time` rebound `time` to a function).
from time import perf_counter

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Fixed seed so that re-running this cell yields the same train/test split
# and the same cross-validation folds.
SEED = 42

# Model to train: a decision tree using entropy as impurity measure.
classeur = DecisionTreeClassifier(impurity="entropy")\
            .setLabelCol("label")\
            .setFeaturesCol("features")

# Split the prepared data: 70% training set, 30% test set.
(train, test) = dfPreparee.randomSplit([0.7, 0.3], seed=SEED)

# Hyper-parameter grid: 4 * 2 * 3 = 24 combinations.
params = ParamGridBuilder()\
    .addGrid(classeur.maxDepth, [5, 10, 15, 20])\
    .addGrid(classeur.maxBins, [100, 200])\
    .addGrid(classeur.minInstancesPerNode, [100, 250, 500])\
    .build()

# Models are compared on accuracy between "prediction" and "label".
evaluateur = MulticlassClassificationEvaluator()\
    .setMetricName("accuracy")\
    .setPredictionCol("prediction")\
    .setLabelCol("label")

# 10-fold cross-validation over the grid above.
crossVal = CrossValidator(
    estimator=classeur,
    estimatorParamMaps=params,
    evaluator=evaluateur,
    numFolds=10,
    seed=SEED,
)

# Run the cross-validation to obtain the best model, timing the fit only.
t0 = perf_counter()
cvModel = crossVal.fit(train)
tt = perf_counter() - t0
print ("Classifier trained in {} seconds".format(round(tt,3)))

# Measure the performance of the best model on the held-out test set.
predictions = cvModel.transform(test)
accuracy = evaluateur.evaluate(predictions)
print("Test Accuracy = %g " % (accuracy))


