In [None]:
# Importations
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np
import matplotlib.pyplot as plt
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.classification import LogisticRegression

from pyspark.sql import SparkSession, functions


In [None]:
# Initialisation SparkSession

# Créer un objet SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()

## Chargement du dataset

In [None]:
# Chargement des donnees

# Charger les données à partir d'un fichier CSV
data = spark.read.csv("urldata.csv", header=True, inferSchema=True)

data = data.select(["URLs", "Class"])
# convert the Class column to a numeric type
data = data.withColumn("Class", functions.when(data["Class"] == "benign", 0)
                                         .otherwise(1))

### Ajout de critères au dataset

In [None]:
from pyspark.sql import functions as F

# Calculer la longueur du texte dans la colonne "Text"
data = data.withColumn("url_length", F.length("URLs"))
data = data.withColumn("count-dir", F.size(F.split(F.col("URLs"), r"/")) - 3)
data = data.withColumn("count-digits", F.size(F.split(F.col("URLs"), r"\d")) - 1)
data = data.withColumn("count-points", F.size(F.split(F.col("URLs"), r"\.")) - 1)
data = data.withColumn("count-", F.size(F.split(F.col("URLs"), r"\-")) - 1)
data = data.withColumn("count_", F.size(F.split(F.col("URLs"), r"\_")) - 1)
data = data.withColumn("count@", F.size(F.split(F.col("URLs"), r"\@")) - 1)
data = data.withColumn("count?", F.size(F.split(F.col("URLs"), r"\?")) - 1)
data = data.withColumn("count%", F.size(F.split(F.col("URLs"), r"\%")) - 1)
data = data.withColumn("count=", F.size(F.split(F.col("URLs"), r"\=")) - 1)
data = data.withColumn("count-http", F.size(F.split(F.col("URLs"), r"http")) - 1)
data = data.withColumn("count-https", F.size(F.split(F.col("URLs"), r"https")) - 1)
data = data.withColumn("count-www", F.size(F.split(F.col("URLs"), r"www")) - 1)

data = data.withColumn("target", F.lit(0))

data.show(truncate=False)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes 

# Diviser les données en deux ensembles aléatoires
train_data, test_data = data.randomSplit([0.8, 0.2])

# Afficher le nombre de partitions utilisées pour distribuer les données
num_partitions = train_data.rdd.getNumPartitions()
print(num_partitions)

input_cols = ['count-digits', 
                'count-dir', 
                'count-points', 
                'count-', 
                'count_', 
                'count@', 
                'count?', 
                'count%',
                'count=', 
                'count-http', 
                'count-https', 
                'count-www', 
                'url_length'
                ]

# Sélectionner les colonnes "length" et "question_marks" comme entrées pour le modèle
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# Créer un objet NaiveBayes
mnb = NaiveBayes(labelCol="Class", featuresCol="features")

# Définir les paramètres du modèle de régression logistique
lr = LogisticRegression(labelCol="Class", featuresCol="features", maxIter=150, regParam=0.1)

# Créer un objet RandomForestClassifier
rfc = RandomForestClassifier(labelCol="Class", featuresCol="features")

# Combiner les étapes d'assemblage des vecteurs et de régression logistique en un seul pipeline
pipeline = Pipeline(stages=[assembler, mnb])

# Entraîner le modèle sur les données d'entrée
model = pipeline.fit(train_data)

# Utiliser le modèle entraîné pour prédire des étiquettes de sortie pour les données de test
predictions = model.transform(test_data)

# Définir les paramètres d'évaluation du modèle
evaluator = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction")

# Évaluer le modèle en utilisant les données d'entrée et les étiquettes de sortie prédites
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})


# Afficher l'accuracy du modèle
print(accuracy)
print(f1)

# Afficher la matrice de confusion
predictions.groupBy("Class", "prediction").count().show()

# Afficher les prédictions en utilisant un diagramme à barres
predictions.select("Class", "prediction").show()

### Avec l'utilisation d'un tokenizer

In [None]:
# Décomposer les URL en mots individuels
tokenizer = Tokenizer(inputCol="URLs", outputCol="words")
data = tokenizer.transform(data)

# Compter le nombre d'occurrences de chaque mot
counter = CountVectorizer(inputCol="words", outputCol="features")
data = counter.fit(data).transform(data)

# Diviser les données en deux ensembles aléatoires
train_data, test_data = data.randomSplit([0.8, 0.2])

# Sélectionner les colonnes d'entrée et de sortie
train_input = train_data.select("features", "Class")
train_output = train_data.select("Class")

# Entraîner un modèle de forêts aléatoires
#rf = RandomForestClassifier(labelCol="Class", featuresCol="features", maxDepth=3, numTrees=2)
lr = LogisticRegression(labelCol="Class", featuresCol="features")

model = lr.fit(train_input)

# Utiliser le modèle pour faire des prédictions sur les données de test
test_input = test_data.select("features", "Class")
predictions = model.transform(test_input)

# Calculer l'erreur de classification
evaluator = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction")

# Afficher les résultats de l'évaluation
print("Accuracy = %g" % evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print("F1-Score = %g" % evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

# Afficher les prédictions en utilisant un diagramme à barres
predictions.select("Class", "prediction").show()