# <h1 align='center'> Automatic Classification and Multi-Class Discrimination </h1>

### OBJECTIVE :



The goal of newsgroup classification is to build machine learning models that predict the topic class of a document from its text content. Various machine learning techniques can be used for this, such as decision-tree classifiers, support vector machines (SVM), and neural networks.

### DATA SET DESCRIPTION :

The **"Twenty Newsgroups"** dataset is widely used in machine learning for text classification. Its main purpose is to support automatic classification of newsgroup posts by topic, as well as discrimination between multiple classes.

The "Twenty Newsgroups" data is available at the following address: http://archive.ics.uci.edu/ml/datasets/Twenty+Newsgroups. The page provides the files needed to download the data, along with additional information.

Here is a general description of the dataset:

- The full dataset contains about 20,000 documents drawn from 20 different newsgroups, hence its name. This notebook works with the smaller **"mini_newsgroups"** archive, which contains 2,000 documents.

- The newsgroups cover a wide range of subjects, including politics, sports, religion, computers, and science.

- Each document is a Usenet post in plain text, with no additional formatting.

- The files in the archive used here keep their Usenet headers (Path, From, Subject, Newsgroups, and so on), as the sampled document further down shows, so the header lines are part of the text the model sees.

- The documents are evenly distributed across the 20 classes: about 1,000 per class in the full dataset, and 100 per class in "mini_newsgroups".

Using this dataset, we can build and train classification models that predict the topic class of an unseen post. It is also possible to explore the differences and similarities between classes and to run further analyses on the text data.

### IMPORTING THE LIBRARIES :

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=9c8a4a99c738df057ced27738a3ba16fe014fe789ec13bfb19e212219a746ed0
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark import SparkContext

from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import SQLContext

from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer

### Download dataset file :

In [None]:
import sys
import os

# URL of the file to be downloaded
url = "http://archive.ics.uci.edu/ml/machine-learning-databases/20newsgroups-mld/mini_newsgroups.tar.gz"

# Extract the filename from the URL
filename = os.path.basename(url)

# Use the `wget` command to download the file
os.system(f"wget {url}")


0
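
If **wget** is not available on the machine, the same download can be done with the Python standard library. The sketch below is one possible alternative, not the method used in this notebook; the helper name `download` and its skip-if-present behaviour are assumptions made for illustration.

```python
import os
import shutil
import urllib.request


def download(url, dest_dir="."):
    """Download url into dest_dir and return the local file path.

    Hypothetical helper: the download is skipped if the file already exists.
    """
    filename = os.path.basename(url)
    target = os.path.join(dest_dir, filename)
    if not os.path.exists(target):
        # Stream the response to disk instead of loading it all into memory
        with urllib.request.urlopen(url) as response, open(target, "wb") as out:
            shutil.copyfileobj(response, out)
    return target
```

Calling `download(url)` with the `url` variable defined above would save the archive in the current directory under its original file name.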

### Current working directory :

The **os.getcwd()** function is used to retrieve the current working directory, which is the directory in which the Python script is being executed.

In [None]:
import os


# Get the current working directory
current_directory = os.getcwd()

### Extraction of tar.gz :

The **subprocess.run()** function is used to execute the tar command with specific arguments to extract the contents of the tar.gz archive. 

The **-xzf** options tell tar to extract (x) a gzip-compressed (z) archive read from the given file (f), while **-C** specifies the target directory for extraction, which in this case is current_directory.

In [None]:
import subprocess

# Name of the archive downloaded above
filename = 'mini_newsgroups.tar.gz'

# Extract the tar.gz archive into the current working directory
subprocess.run(['tar', '-xzf', filename, '-C', current_directory])

CompletedProcess(args=['tar', '-xzf', 'mini_newsgroups.tar.gz', '-C', '/content'], returncode=0)
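
The extraction can also be done without calling an external **tar** binary. The following is a minimal sketch using the standard-library `tarfile` module; the function name `extract_archive` is hypothetical, and the archive is assumed to come from a trusted source.

```python
import tarfile


def extract_archive(archive_path, target_dir):
    """Extract a .tar.gz archive into target_dir and return its top-level names."""
    with tarfile.open(archive_path, "r:gz") as archive:
        names = archive.getnames()
        archive.extractall(path=target_dir)
    # Keep only the first path component of every archive member
    return sorted({name.split("/")[0] for name in names})
```

For the archive used here, `extract_archive(filename, current_directory)` would play the same role as the `subprocess.run()` call above.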

### List directory contents :

The **os.listdir()** function is used to obtain a list of all files and directories present in the specified directory_path. In this case, the directory_path variable is set to the path of the extracted directory, which is current_directory + "/mini_newsgroups".

In [None]:
import os


# List the contents of the extracted directory
os.listdir(current_directory+"/mini_newsgroups")


['misc.forsale',
 'rec.motorcycles',
 'comp.sys.mac.hardware',
 'rec.autos',
 'comp.sys.ibm.pc.hardware',
 'comp.os.ms-windows.misc',
 'sci.med',
 'sci.crypt',
 'alt.atheism',
 'talk.politics.mideast',
 'talk.religion.misc',
 'soc.religion.christian',
 'talk.politics.guns',
 'sci.electronics',
 'sci.space',
 'talk.politics.misc',
 'comp.graphics',
 'rec.sport.hockey',
 'rec.sport.baseball',
 'comp.windows.x']

The output lists the contents of the extracted directory: 20 subdirectories, one per newsgroup, such as **'misc.forsale', 'alt.atheism', 'comp.sys.mac.hardware'**, etc.

### Create the path :

In [None]:
# Create the path to the files within the mini_newsgroups directory

path=current_directory+"/mini_newsgroups/*"

By using this path variable, we can perform operations or access files and directories within the **"mini_newsgroups"** directory in the current working directory.






### SparkSession, SparkContext, and SQLContext in PySpark :

This cell creates a **SparkSession** and obtains a **SparkContext** and a **SQLContext** from it.

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Create a SparkSession
spark = SparkSession.builder.appName("myApp").getOrCreate()

# Create a SparkContext
sc = spark.sparkContext

# Create a SQLContext
sqlContext = SQLContext(sc)




- The **SparkSession** is the entry point for programming Spark with the DataFrame and SQL APIs. It wraps a SparkContext, which is available as **spark.sparkContext**.

- The **SparkContext** represents the connection to a Spark cluster and is used to create RDDs (Resilient Distributed Datasets) and perform distributed computations.

- The **SQLContext** is a class that enables the use of SQL and the DataFrame API in PySpark. It is the older entry point for structured and semi-structured data: since Spark 2.0 the SparkSession offers the same functionality (for example **spark.sql()** and **spark.createDataFrame()**), and SQLContext is kept for backward compatibility.

### Train_data :

The **sc.wholeTextFiles()** method reads a collection of text files and returns an RDD (Resilient Distributed Dataset) where each element represents a file path and its corresponding content as a string. The path variable represents the path to the text files that you want to read.

In [None]:
# Read the text files as a collection
train_data=sc.wholeTextFiles(path)

### Usage of PySpark RDD :

- The **map()** transformation is applied to the train_data RDD to extract the first element of each tuple (x[0]), which represents the file path.

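- The **takeSample()** action randomly selects file paths from the filepaths RDD. Its arguments are withReplacement, num, and seed: here it draws 5 paths without replacement, using 10 as the random seed.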

In [None]:
# Extract the file paths from the train_data RDD
filepaths = train_data.map(lambda x: x[0])

# Sample a subset of file paths
print(filepaths.takeSample(False,5, 10))

['file:/content/mini_newsgroups/comp.sys.mac.hardware/52270', 'file:/content/mini_newsgroups/sci.crypt/15563', 'file:/content/mini_newsgroups/sci.space/61277', 'file:/content/mini_newsgroups/comp.sys.ibm.pc.hardware/60699', 'file:/content/mini_newsgroups/sci.electronics/54092']


- **extract the text content and then sample a subset of it :** 

In [None]:
text = train_data.map(lambda x: x[1])

print (text.takeSample(False,1, 10))

['Newsgroups: comp.sys.mac.hardware\nPath: cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!bb3.andrew.cmu.edu!news.sei.cmu.edu!cis.ohio-state.edu!zaphod.mps.ohio-state.edu!moe.ksu.ksu.edu!osuunx.ucc.okstate.edu!constellation!essex.ecn.uoknor.edu!cmparris\nFrom: cmparris@essex.ecn.uoknor.edu (Chris Michael Parrish)\nSubject: Networking Macs and a PC\nSender: usenet@constellation.ecn.uoknor.edu (Usenet Administrator)\nMessage-ID: <C5sHnJ.54L@constellation.ecn.uoknor.edu>\nDate: Tue, 20 Apr 1993 15:57:11 GMT\nNntp-Posting-Host: essex.ecn.uoknor.edu\nOrganization: Engineering Computer Network, University of Oklahoma, Norman, OK, USA\nLines: 24\n\n\n  At work we have a small appletalk network with 3 macs and  couple of printers.\nWe also have a PC that has some specialized accounting software that we would \nlike to operate from any of the macs. We have Soft PC, and I have found that the\nsoftware works just fine under it, but I would like to have all of the data\nfor the program reside 

The output shows an example of a subset containing one text content sampled from the text RDD. The text content represents a newsgroup message, including various header fields, the message body, and contact information.

- **extract the file IDs from the file paths :**

The **map()** transformation is applied to the filepaths RDD to split each file path by the "/" delimiter and extract the last element (filepath.split("/")[-1]), which represents the file ID. 

In [None]:
ids = filepaths.map(lambda filepath: filepath.split("/")[-1])
print(ids.take(5))

['53759', '51186', '53336', '53399', '51227']


The output shows the first five file IDs in the ids RDD (take(5) returns the first elements rather than a random sample). Each file ID corresponds to a specific file in the "mini_newsgroups" directory.

- **extract the topics from the file paths :**

In [None]:
# Extract the topics from the filepaths RDD
topics = filepaths.map(lambda filepath: filepath.split("/")[-2])

# Show the first five topics
print(topics.take(5))

['alt.atheism', 'alt.atheism', 'alt.atheism', 'alt.atheism', 'alt.atheism']


The output shows the first five topics in the topics RDD. Because take(5) returns the first elements rather than a random sample, all five come from the same newsgroup here. Each topic corresponds to a category within the "mini_newsgroups" dataset, such as 'alt.atheism', 'comp.sys.mac.hardware', etc.
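
The two path-splitting steps above can be checked on a single path in plain Python. This is a small illustrative sketch; the function name `parse_newsgroup_path` is hypothetical, and the sample path is one of those printed earlier.

```python
def parse_newsgroup_path(filepath):
    """Return (file_id, topic) from a path ending in '<topic>/<file_id>'."""
    parts = filepath.split("/")
    # The last component is the file ID, the one before it is the newsgroup
    return parts[-1], parts[-2]


sample = "file:/content/mini_newsgroups/sci.crypt/15563"
file_id, topic = parse_newsgroup_path(sample)
print(file_id, topic)  # 15563 sci.crypt
```

The same lambda logic is what the **map()** calls apply to every element of the filepaths RDD.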

- **extract distinct topics and take a sample of them :**

In [None]:
# Show up to 20 distinct topics
print( topics.distinct().take(20))

['alt.atheism', 'comp.os.ms-windows.misc', 'comp.windows.x', 'rec.motorcycles', 'sci.crypt', 'sci.space', 'soc.religion.christian', 'talk.politics.guns', 'comp.graphics', 'comp.sys.ibm.pc.hardware', 'comp.sys.mac.hardware', 'misc.forsale', 'rec.autos', 'rec.sport.baseball', 'rec.sport.hockey', 'sci.electronics', 'sci.med', 'talk.politics.mideast', 'talk.politics.misc', 'talk.religion.misc']


The output shows the 20 distinct topics found in the topics RDD. Each topic is one newsgroup of the "mini_newsgroups" dataset, such as **'alt.atheism', 'comp.os.ms-windows.misc', 'comp.windows.x'**, etc.

### Create a DataFrame :

This cell builds a DataFrame with an explicit schema from the RDD and registers it as a temporary view so that it can be queried with SQL.

In [None]:
from pyspark.sql.types import *
# The schema is encoded in a string.
schemaString = "id text topic"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
newsgroups = train_data.map(lambda filepath_text: (filepath_text[0].split("/")[-1], filepath_text[1], filepath_text[0].split("/")[-2]))
df = sqlContext.createDataFrame(newsgroups, schema)

#print schema
df.printSchema()

# Creates a temporary view using the DataFrame
df.createOrReplaceTempView("newsgroups")


root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- topic: string (nullable = true)



The output shows the printed schema of the df DataFrame, indicating the field names ('id', 'text', 'topic') and their data types (string).

 - **Run SQL queries on a registered DataFrame :**

In [None]:
# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT id,topic,text FROM newsgroups limit 5")
results.show()

+-----+-----------+--------------------+
|   id|      topic|                text|
+-----+-----------+--------------------+
|53759|alt.atheism|Newsgroups: alt.a...|
|51186|alt.atheism|Path: cantaloupe....|
|53336|alt.atheism|Xref: cantaloupe....|
|53399|alt.atheism|Path: cantaloupe....|
|51227|alt.atheism|Path: cantaloupe....|
+-----+-----------+--------------------+



The output shows the result of the SQL query, which includes the 'id', 'topic', and 'text' columns for the first 5 rows of the DataFrame. Each row represents a record from the "newsgroups" table, containing the corresponding values for the selected columns.

- **count the documents per topic :** a SQL query on the registered view "newsgroups" groups the rows by topic and returns each topic with its document count.

In [None]:
results = sqlContext.sql("select topic, count(*) as cnt from newsgroups group by topic order by cnt desc limit 5")
results.show()

+--------------------+---+
|               topic|cnt|
+--------------------+---+
|      comp.windows.x|100|
|        misc.forsale|100|
|    rec.sport.hockey|100|
|  rec.sport.baseball|100|
|comp.os.ms-window...|100|
+--------------------+---+



The output shows five topics with their document counts, sorted in descending order of count. Since the dataset holds 2,000 documents across 20 topics and the largest count is 100, every topic has exactly 100 documents, so the five rows shown are simply five of the tied topics.

- **filters the DataFrame "df" :**

Filters the DataFrame "df" based on the topic condition and creates a new DataFrame "new_df" from the filtered results. 

In [None]:
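# Keep only the rows whose topic starts with "comp"; filtering the DataFrame
# directly avoids collecting the rows to the driver and re-parallelizing them
new_df = df.filter(df.topic.like("comp%"))
new_df.dropDuplicates().show()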

+-----+--------------------+--------------------+
|   id|                text|               topic|
+-----+--------------------+--------------------+
|38907|Path: cantaloupe....|       comp.graphics|
|38904|Xref: cantaloupe....|       comp.graphics|
|38758|Xref: cantaloupe....|       comp.graphics|
| 9622|Xref: cantaloupe....|comp.os.ms-window...|
| 9911|Xref: cantaloupe....|comp.os.ms-window...|
|10094|Path: cantaloupe....|comp.os.ms-window...|
| 9943|Path: cantaloupe....|comp.os.ms-window...|
|60992|Newsgroups: comp....|comp.sys.ibm.pc.h...|
|38750|Path: cantaloupe....|       comp.graphics|
| 9485|Xref: cantaloupe....|comp.os.ms-window...|
| 9902|Newsgroups: comp....|comp.os.ms-window...|
|38867|Newsgroups: comp....|       comp.graphics|
| 9758|Xref: cantaloupe....|comp.os.ms-window...|
|38921|Newsgroups: comp....|       comp.graphics|
|10742|Newsgroups: comp....|comp.os.ms-window...|
|38929|Path: cantaloupe....|       comp.graphics|
|58994|Path: cantaloupe....|comp.sys.ibm.pc.h...|


The output shows the first rows of the new DataFrame "new_df" after dropping any duplicate rows (show() prints at most 20 rows by default). Each row represents a record with columns 'id', 'text', and 'topic', and every row satisfies the condition that the topic starts with "comp".

### Distinct Group

In [None]:
from pyspark.sql.functions import col, regexp_extract

# Extract the word followed by a period from the topic column
extracted_word = regexp_extract(col("topic"), r"(\w+)\.", 1)

# Keep only rows where the pattern matched (regexp_extract returns an empty string otherwise)
filtered_df = df.filter(extracted_word != "")

# Get the distinct topics
distinct_topics = filtered_df.select(extracted_word.alias("topic")).distinct()

# Show the distinct topics
distinct_topics.show()


+-----+
|topic|
+-----+
|  alt|
|  sci|
| misc|
|  rec|
| comp|
|  soc|
| talk|
+-----+



### COMP group

In [None]:
from pyspark.sql.functions import col
# Select the documents whose topic starts with "comp"
filtered_df = df.filter(col("topic").like("comp%"))

# Count those documents
count = filtered_df.count()

print("Number of topics starting with 'comp':", count)

Number of topics starting with 'comp': 500


### SCI group

In [None]:
from pyspark.sql.functions import col
# Select the documents whose topic starts with "sci"
filtered_df = df.filter(col("topic").like("sci%"))

# Count those documents
count = filtered_df.count()

print("Number of topics starting with 'sci':", count)

Number of topics starting with 'sci': 400


### ALT group

In [None]:
from pyspark.sql.functions import col
# Select the documents whose topic starts with "alt"
filtered_df = df.filter(col("topic").like("alt%"))

# Count those documents
count = filtered_df.count()

print("Number of topics starting with 'alt':", count)

Number of topics starting with 'alt': 100


### SOC group

In [None]:
from pyspark.sql.functions import col
# Select the documents whose topic starts with "soc"
filtered_df = df.filter(col("topic").like("soc%"))

# Count those documents
count = filtered_df.count()

print("Number of topics starting with 'soc':", count)

Number of topics starting with 'soc': 100


### MISC group

In [None]:
from pyspark.sql.functions import col
# Select the documents whose topic starts with "misc"
filtered_df = df.filter(col("topic").like("misc%"))

# Count those documents
count = filtered_df.count()

print("Number of topics starting with 'misc':", count)

Number of topics starting with 'misc': 100


### Talk group

In [None]:
from pyspark.sql.functions import col
# Select the documents whose topic starts with "talk"
filtered_df = df.filter(col("topic").like("talk%"))

# Count those documents
count = filtered_df.count()

print("Number of topics starting with 'talk':", count)

Number of topics starting with 'talk': 400


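The counts above are document counts: each newsgroup contributes 100 documents, so the five comp.* newsgroups account for 500 of the 2,000 documents, which makes comp the largest top-level group. We therefore turn the task into a binary one: the label is 1 for documents whose topic starts with "comp" and 0 otherwise.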

- **adds a new column "label" to the DataFrame "df" :** 

In [None]:
labeledNewsGroups = df.withColumn("label",df.topic.like("comp%").cast("double"))
labeledNewsGroups.sample(False,0.003,10).show(5)

+------+--------------------+--------------------+-----+
|    id|                text|               topic|label|
+------+--------------------+--------------------+-----+
| 38491|Path: cantaloupe....|       comp.graphics|  1.0|
| 10008|Path: cantaloupe....|comp.os.ms-window...|  1.0|
| 51539|Path: cantaloupe....|comp.sys.mac.hard...|  1.0|
|101603|Xref: cantaloupe....|           rec.autos|  0.0|
|102625|Path: cantaloupe....|  rec.sport.baseball|  0.0|
+------+--------------------+--------------------+-----+
only showing top 5 rows



The output shows the result of the code execution, which includes a sample of 5 rows from the labeled DataFrame. Each row consists of the columns 'id', 'text', 'topic', and 'label'. The 'label' column contains binary values (0.0 or 1.0) indicating whether the topic starts with "comp" or not.

### Splits the DataFrame :

In [None]:
train_set, test_set = labeledNewsGroups.randomSplit([0.9, 0.1], 12345)
print ("Total document count:",labeledNewsGroups.count())
print ("Training-set count:",train_set.count())
print ("Test-set count:",test_set.count())

Total document count: 2000
Training-set count: 1812
Test-set count: 188


The output shows the total document count in the labeled dataset and the counts in the training and test sets. randomSplit() assigns rows at random according to the weights, so the resulting sizes (1812 and 188) are close to, but not exactly, 90% and 10% of the 2000 documents.
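
To see why a weighted random split does not produce exact sizes, here is a simplified plain-Python model in which every row is assigned independently. It is only an illustration under that assumption, not Spark's implementation; the function name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability
    proportional to the weight of that split."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. about [0.9, 1.0] for [0.9, 0.1]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point remainder
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(2000), [0.9, 0.1], seed=12345)
print(len(train), len(test))  # sizes depend on the seed; they sum to 2000
```

Fixing the seed makes the split reproducible, which is also the role of the 12345 argument in the randomSplit() call above.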

### The pyspark.ml module for text classification :

In [None]:
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer
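# Lowercase each document and split it into whitespace-separated tokens
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
# Drop common stop words, ignoring case
remover= StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)
# Hash the remaining tokens into a 1000-dimensional term-frequency vector
hashingTF = HashingTF().setNumFeatures(1000).setInputCol("filtered").setOutputCol("rawFeatures")
# Rescale the term frequencies by inverse document frequency
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)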
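
To make the HashingTF and IDF stages more concrete, the sketch below reproduces the idea in plain Python on two toy documents. It is an illustration under stated assumptions: CRC32 is a stand-in hash (Spark's HashingTF uses a different hash function, so the indices will not match), the toy documents and function names are hypothetical, and the IDF weight follows the formula log((m + 1) / (df + 1)) given in the Spark documentation.

```python
import math
import zlib


def hashing_tf(tokens, num_features):
    """Map tokens to a fixed-length term-frequency vector via hashing."""
    vector = [0.0] * num_features
    for token in tokens:
        # Stand-in hash (CRC32), not the hash function Spark uses
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] += 1.0
    return vector


def idf_weights(tf_vectors):
    """Compute log((m + 1) / (df + 1)) per feature, with m documents."""
    m = len(tf_vectors)
    num_features = len(tf_vectors[0])
    weights = []
    for j in range(num_features):
        # Document frequency: number of documents with a non-zero count
        df = sum(1 for vec in tf_vectors if vec[j] > 0)
        weights.append(math.log((m + 1) / (df + 1)))
    return weights


docs = [["mac", "network", "printer"], ["hockey", "game", "network"]]
tf = [hashing_tf(doc, num_features=16) for doc in docs]
idf_w = idf_weights(tf)
tfidf = [[t * w for t, w in zip(vec, idf_w)] for vec in tf]
```

A token that occurs in every document, such as "network" here, gets an IDF weight of zero, so it contributes nothing to the final features.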


### Text Classification Pipeline :

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Build the data-processing pipeline
pipeline = Pipeline(stages=[tokenizer,remover,hashingTF,idf, nb])


The pipeline chains the data processing and model training steps, which makes it easy to apply the same sequence of transformations consistently to both the training and test datasets.

### Model fitting :

This step executes the data processing stages defined in the pipeline and trains the Naive Bayes classifier on the processed data.

In [None]:
model=pipeline.fit(train_set)
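
As a rough picture of what fitting a multinomial Naive Bayes model involves, here is a simplified plain-Python sketch with additive smoothing, run on hypothetical toy data. It illustrates the general technique only; the function names are made up for this example and the details of Spark's implementation may differ.

```python
import math


def train_multinomial_nb(vectors, labels, smoothing=1.0):
    """Fit smoothed class log-priors and per-feature log-likelihoods."""
    classes = sorted(set(labels))
    num_features = len(vectors[0])
    log_prior, log_likelihood = {}, {}
    for c in classes:
        rows = [v for v, y in zip(vectors, labels) if y == c]
        log_prior[c] = math.log(
            (len(rows) + smoothing) / (len(vectors) + smoothing * len(classes))
        )
        # Total weight of each feature across the documents of class c
        totals = [sum(row[j] for row in rows) for j in range(num_features)]
        denom = sum(totals) + smoothing * num_features
        log_likelihood[c] = [math.log((t + smoothing) / denom) for t in totals]
    return log_prior, log_likelihood


def predict(vector, log_prior, log_likelihood):
    """Return the class with the highest log-posterior score."""
    scores = {
        c: log_prior[c] + sum(x * w for x, w in zip(vector, log_likelihood[c]))
        for c in log_prior
    }
    return max(scores, key=scores.get)


# Toy data: feature 0 is typical of class 1.0, feature 1 of class 0.0
X = [[3.0, 0.0], [2.0, 1.0], [0.0, 3.0], [1.0, 2.0]]
y = [1.0, 1.0, 0.0, 0.0]
prior, likelihood = train_multinomial_nb(X, y)
print(predict([4.0, 0.0], prior, likelihood))  # 1.0
```

The `smoothing` argument plays the same role as the smoothing=1.0 parameter passed to NaiveBayes above: it keeps features unseen in a class from producing a zero probability.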

### Display a sample of predictions :

This code displays a sample of predictions for test documents whose topic starts with "comp".

In [None]:
predictions = model.transform(test_set)
predictions.select("id","topic","probability","prediction","label").filter(predictions.topic.like("comp%")).sample(False,0.1,10).show()

+-----+--------------------+--------------------+----------+-----+
|   id|               topic|         probability|prediction|label|
+-----+--------------------+--------------------+----------+-----+
|60439|comp.sys.ibm.pc.h...|[0.99999921868087...|       0.0|  1.0|
|60698|comp.sys.ibm.pc.h...|[0.99829050858915...|       0.0|  1.0|
|61154|comp.sys.ibm.pc.h...|[0.25238237697282...|       1.0|  1.0|
|61173|comp.sys.ibm.pc.h...|[1.94623607272862...|       1.0|  1.0|
+-----+--------------------+--------------------+----------+-----+



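The displayed output shows the "id" of the document, its original "topic", the predicted "probability" for each class, the final "prediction" (class label), and the actual "label" from the test set. The probability vector lists class 0 first, so in the first two rows the model puts almost all of the probability on class 0 and misclassifies documents whose true label is 1.0, while the last two rows are classified correctly.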

- **includes additional columns in the selected output :**

In [None]:
predictions.select("id","topic","filtered","rawPrediction","features","probability","prediction","label","text").filter(predictions.topic.like("comp%")).sample(False,0.1,10).show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-----+--------------------+
|   id|               topic|            filtered|       rawPrediction|            features|         probability|prediction|label|                text|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-----+--------------------+
|50439|comp.sys.mac.hard...|[xref:, cantaloup...|[-1680.0068271543...|(1000,[1,2,11,38,...|[0.99690688463568...|       0.0|  1.0|Xref: cantaloupe....|
|60766|comp.sys.ibm.pc.h...|[path:, cantaloup...|[-2303.4338113436...|(1000,[2,20,30,38...|[0.99999992702060...|       0.0|  1.0|Path: cantaloupe....|
|68239|      comp.windows.x|[newsgroups:, com...|[-921.05984444669...|(1000,[2,8,21,71,...|[0.34788291452040...|       1.0|  1.0|Newsgroups: comp....|
+-----+--------------------+--------------------+--------------------+--------------------+---

The displayed output now includes additional columns such as "filtered" (the filtered words from the document), "rawPrediction" (the raw prediction values for each class), and "features" (the extracted features used for prediction), along with the existing columns from the previous code snippet.

### Performance of the classification model :

By using the evaluator object, we can evaluate the accuracy of the predictions made by the model on the test set.

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")


### Model Accuracy :

In [None]:
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.8829787234042553


The accuracy of the classification model on the test set is approximately 0.883, or 88.3%. This means that the model correctly predicted the class label for 166 of the 188 documents in the test set.
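
Because only 500 of the 2,000 documents carry the label 1, it helps to compare this accuracy with a trivial baseline. The sketch below computes, in plain Python, the accuracy of always predicting the majority class on the full dataset. The baseline is an addition for context and was not run in this notebook; the class balance of the 188-document test set may differ slightly from the 25% used here.

```python
def accuracy(predictions, labels):
    """Fraction of predictions that equal the true label."""
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)


# Label distribution of the full dataset: 500 'comp' documents out of 2000
labels = [1.0] * 500 + [0.0] * 1500

# A baseline that always predicts the majority class (0.0)
baseline_predictions = [0.0] * len(labels)
print(accuracy(baseline_predictions, labels))  # 0.75

# The reported test accuracy: 166 correct predictions out of 188 documents
print(166 / 188)
```

Under these assumptions, the Naive Bayes pipeline improves on a 75% majority-class baseline by roughly 13 percentage points.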