BIG DATA - TP 2 sur Apache Spark 

(Florent Giauna et Zewei Lin)

## 1. Objectifs

Le TP consiste à regrouper des documents textuels tels que les documents qui partagent le même thème se retrouvent dans le même groupe, et les documents qui portent sur des sujets très différents se trouvent dans des groupes différents.

##2. Mise en place de l'environnement de travail

### 2.1 Installation et paramétrage de la machine virtuelle

In [1]:
#Installation d'une VM Java
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Téléchargement de Apache Spark
! wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
#Décompression de l'archive
! tar xf /content/spark-3.3.2-bin-hadoop3.tgz
#Installation de Spark
! pip install -q findspark
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m18.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=27602a1b1ecb59ec4cf28c5353f5ac4d6b726830f86a69a1d56359327505e5c0
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

### 2.2 Paramétrage et initialisation de Spark

In [2]:
#Variables d'environnement
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" #indique où trouver Java
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3" #indique où trouver Spark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.5 pyspark-shell' #indique où télécharger Avro

#findspark permet d'importer pyspark comme une librairie
import findspark
findspark.init("spark-3.3.2-bin-hadoop3")

#Création d'un objet SparkContext
from pyspark import SparkContext, SparkConf
configuration = SparkConf().setAppName("name").setMaster("local[4]") #instancie 4 workers sur la VM
sc = SparkContext(conf=configuration)

#Création d'un objet SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=configuration).getOrCreate()

## 3. Données

### 3.1 Télécharger les données

In [3]:
#Récupération des données
! wget -q http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz

### 3.2 Décompresser les données

In [4]:
#Décompression de l'archive
! tar xf /content/20news-19997.tar.gz

### 3.3 Charger les données dans deux variables de type RDD

In [5]:
#Création de deux RDD contenant les documents des thèmes athéisme et baseball
rdd_a = sc.wholeTextFiles('20_newsgroups/alt.atheism', use_unicode=False) #athéisme
rdd_b = sc.wholeTextFiles('20_newsgroups/rec.sport.baseball', use_unicode=False) #baseball

#Exemple de document sur le thème athéisme
rdd_a.first()

(b'file:/content/20_newsgroups/alt.atheism/51212',
 b'Newsgroups: alt.atheism\nPath: cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!fs7.ece.cmu.edu!europa.eng.gtefsd.com!howland.reston.ans.net!usc!cs.utexas.edu!asuvax!ennews!enuxha.eas.asu.edu!guncer\nFrom: guncer@enuxha.eas.asu.edu (Selim Guncer )\nSubject: Re: Islam & Dress Code for women\nMessage-ID: <1993Apr6.030734.28563@ennews.eas.asu.edu>\nSender: news@ennews.eas.asu.edu (USENET News System)\nOrganization: Arizona State University, Tempe, AZ\nReferences: <1993Apr5.091258.11830@monu6.cc.monash.edu.au> <16BA7103C3.I3150101@dbstu1.rz.tu-bs.de>\nDate: Tue, 6 Apr 1993 03:07:34 GMT\nLines: 53\n\nIn article <16BA7103C3.I3150101@dbstu1.rz.tu-bs.de> I3150101@dbstu1.rz.tu-bs.de (Benedikt Rosenau) writes:\n>In article <1993Apr5.091258.11830@monu6.cc.monash.edu.au>\n>darice@yoyo.cc.monash.edu.au (Fred Rice) writes:\n> \n>>(2) Do women have souls in Islam?\n>>\n>>People have said here that some Muslims say that women do not have\n>>souls

### 3.4 Séparer le corps du message de l’entête

In [6]:
#Décodage des bytes en str
rdd_a_str = rdd_a.map(lambda x: x[1].decode('utf-8')) 
rdd_b_str = rdd_b.map(lambda x: x[1].decode('utf-8')) 

#Séparation du corps du message de l'en-tête sur le pattern '\n\n'
rdd_a_headers = rdd_a_str.map(lambda x: x.split('\n\n', 1)[0])
rdd_b_headers = rdd_b_str.map(lambda x: x.split('\n\n', 1)[0])

#Exemple d'en-tête
rdd_a_headers.first()

'Newsgroups: alt.atheism\nPath: cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!fs7.ece.cmu.edu!europa.eng.gtefsd.com!howland.reston.ans.net!usc!cs.utexas.edu!asuvax!ennews!enuxha.eas.asu.edu!guncer\nFrom: guncer@enuxha.eas.asu.edu (Selim Guncer )\nSubject: Re: Islam & Dress Code for women\nMessage-ID: <1993Apr6.030734.28563@ennews.eas.asu.edu>\nSender: news@ennews.eas.asu.edu (USENET News System)\nOrganization: Arizona State University, Tempe, AZ\nReferences: <1993Apr5.091258.11830@monu6.cc.monash.edu.au> <16BA7103C3.I3150101@dbstu1.rz.tu-bs.de>\nDate: Tue, 6 Apr 1993 03:07:34 GMT\nLines: 53'

### 3.5 Extraire quelques champs de l’entête

In [7]:
#Définition d'une fonction pour extraire certains champs de l'en-tête
def extract_elements(rdd):
  rdd_fields = rdd.map(lambda x: x.split('\n')) #Split pour que chaque ligne corresponde à un champ
  result = rdd_fields.map(lambda x: [i for i in x if i.startswith('Newsgroups:') \
                                                  or i.startswith('Organization:') \
                                                  or i.startswith('From:')])
  return result

#Exécution de la fonction sur les RDDs
rdd_a_fields = extract_elements(rdd_a_headers)
rdd_b_fields = extract_elements(rdd_b_headers)

#Exemple de résultat
rdd_a_fields.first()

['Newsgroups: alt.atheism',
 'From: guncer@enuxha.eas.asu.edu (Selim Guncer )',
 'Organization: Arizona State University, Tempe, AZ']

### 3.6 Fusionner les deux RDD 

In [8]:
#Fusion des deux RDD
rdd_all = rdd_a_fields.union(rdd_b_fields)
print(rdd_a_fields.count(),
      rdd_b_fields.count(),
      rdd_all.count())

1000 1000 2000


### 3.7 Transformer ce RDD pour que chaque élément soit de type pyspark.sql.Row

In [9]:
#Transformation du RDD pour que chaque élément soit de type pyspark.sql.Row
from pyspark.sql import Row

#Tri pour que l'ordre des champs soit le même
rdd_sorted = rdd_all.map(lambda x: sorted(x))

#Conservation des valeurs
rdd_val = rdd_sorted.map(lambda lst: [(x.split(': ', 1)[1]) for x in lst])

#Création des champs Row
row_fields = Row('From', 'Newsgroups', 'Organization')

#Transformation en Row et si le champ est manquant ajout d'une valeur NA
rdd_row = rdd_val.map(lambda x: row_fields(*(x + ["NA"] * (len(row_fields) - len(x)))))

#Exemple
rdd_row.take(1)

[Row(From='guncer@enuxha.eas.asu.edu (Selim Guncer )', Newsgroups='alt.atheism', Organization='Arizona State University, Tempe, AZ')]

### 3.8 Créer un objet de type DataFrame à partir du RDD précédent

In [10]:
#Création d'un DataFrame à partir du RDD
df = spark.createDataFrame(rdd_row)
df.show()

+--------------------+--------------------+--------------------+
|                From|          Newsgroups|        Organization|
+--------------------+--------------------+--------------------+
|guncer@enuxha.eas...|         alt.atheism|Arizona State Uni...|
|healta@saturn.wwc...|         alt.atheism| Walla Walla College|
|sandvik@newton.ap...|         alt.atheism|Cookamunga Touris...|
|perry@dsinc.com (...|         alt.atheism|Decision Support ...|
|edm@twisto.compaq...|         alt.atheism|Compaq Computer Corp|
|rsrodger@wam.umd....|sci.skeptic,alt.a...|University of Mar...|
|Patrick C Leger <...|         alt.atheism|Sophomore, Electr...|
|livesey@solntze.w...|         alt.atheism|                 sgi|
|mathew <mathew@ma...|         alt.atheism|Mantis Consultant...|
|darice@yoyo.cc.mo...|         alt.atheism|Monash University...|
|kmr4@po.CWRU.edu ...|         alt.atheism|Case Western Rese...|
|ingles@engin.umic...|         alt.atheism|University of Mic...|
|keith@cco.caltech...|   



---


Nous devons à présent enregistrer le Dataframe au format Parquet et Avro. Cependant les méthodes par défaut telles que :

```
df.write.parquet('path') #Enregistrement du DataFrame au format Parquet
df.write.format('avro').save('path') #Enregistrement du DataFrame au format Avro
```

ne fonctionnent pas en raison d'un problème de version que nous n'avons pas réussi à résoudre. 

La seule méthode qui fonctionne consiste à d'abord transformer notre Dataframe de type PySpark, en Dataframe de type Pandas. Par exemple: 

```
df_p = df.toPandas() #Conversion en Pandas Dataframe
df_p.to_parquet('path') #Enregistrement du Dataframe au format Parquet
```

En temps normal cette méthode doit être évitée car elle envoie tout le Dataframe dans le driver. Si les données sont trop volumineuses cela risque de saturer la capacité du driver, qui est le single point of failure de notre architecture.

Dans le cadre de ce TP, les données étant peu volumineuses, le driver ne risque pas de tomber. Aussi nous continuons malgré tout avec cette méthode afin de répondre à la demande.


---




In [11]:
#/!\ ne pas faire normalement car collecte tout le df dans le driver mais les autres méthodes ne fonctionnent pas
#Conversion en Pandas DataFrame
df_p = df.toPandas()

### 3.9 Sauvegarder la DataFrame au format Avro

In [12]:
#Installation de la librairie pandavro
! pip install pandavro

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pandavro
  Downloading pandavro-1.7.1.tar.gz (8.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastavro==1.5.1
  Downloading fastavro-1.5.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m27.0 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: pandavro
  Building wheel for pandavro (setup.py) ... [?25l[?25hdone
  Created wheel for pandavro: filename=pandavro-1.7.1-py3-none-any.whl size=5688 sha256=ab47c468d8e0cd8cd074d73f3d641eaacecb816d42635a201902f32a80db06b6
  Stored in directory: /root/.cache/pip/wheels/85/cc/15/480a3cfa1a9fc49e2af9bde7437bd6c8c0265f17e61f32672b
Successfully built pandavro
Installing collected packages: fastavro, pandavro
Successfully installed fastavro-1.5.1 pandavro-1.7.1


In [13]:
#Import de la librairie
import pandavro as pdx

#Enregistrement du DataFrame au format Avro
pdx.to_avro('/content/df.avro', df_p)

### 3.10 Sauvegarder la DataFrame au format Parquet

In [14]:
import pyarrow as pa
import pyarrow.parquet as pq

#Enregistrement du pandas DataFrame au format Parquet
df_p.to_parquet('/content/df.parquet')

## 4. Analyse descriptive

### 4.1 Vérifier qu’on a bien deux catégories différentes de documents

In [15]:
#Calcul du nombre de valeurs différentes présentes dans le champ Newsgroups
df.groupBy('Newsgroups').count().count()

37

In [16]:
#Affichage de toutes les valeurs
df.groupBy('Newsgroups').count().show(truncate=False)

+-----------------------------------------------------------------------+-----+
|Newsgroups                                                             |count|
+-----------------------------------------------------------------------+-----+
|talk.religion.misc,talk.abortion,alt.atheism                           |4    |
|soc.culture.arabic,soc.culture.turkish,soc.culture.pakistan,alt.atheism|1    |
|talk.religion.misc,alt.atheism,alt.pagan                               |2    |
|alt.atheism,talk.religion.misc                                         |20   |
|talk.abortion,alt.atheism,talk.religion.misc,sci.physics               |1    |
|talk.religion.misc,alt.atheism                                         |4    |
|alt.atheism,soc.motss,rec.scouting                                     |8    |
|alt.atheism,soc.culture.arabic                                         |2    |
|alt.atheism,talk.origins                                               |5    |
|alt.atheism,rec.scouting               

Le champ Newsgroups de certains documents contient d'autres éléments en plus des deux catégories principales. En extrayant uniquement "alt.atheism" ou "rec.sport.baseball" dans une nouvelle colonne il est possible de retomber sur deux catégories uniques.

In [17]:
from pyspark.sql.functions import when, lit

#Ajout d'une nouvelle colonne et remplissage avec la valeur contenue dans le champ Newsgroups
df2 = df.withColumn('Categorie', 
                   when((df.Newsgroups.contains('alt.atheism')), lit('alt.atheism')).
                   when((df.Newsgroups.contains('rec.sport.baseball')), lit('rec.sport.baseball')))

#Affichage du nombre de catégories différentes
df2.groupBy('Categorie').count().show()

+------------------+-----+
|         Categorie|count|
+------------------+-----+
|       alt.atheism| 1000|
|rec.sport.baseball| 1000|
+------------------+-----+



### 4.2 Donner le nombre d'organisations différentes

In [18]:
#Calcul du nombre de valeurs différentes présentes dans le champ Organization
group_org = df2.groupBy('Organization').count()
group_org.count()

486

In [19]:
#Affichage de certains noms d'organisation pour vérification
group_bell = group_org.filter(df2.Organization.contains('Bell'))
group_bell.sort('Organization').show(truncate=False)

+---------------------------------------+-----+
|Organization                           |count|
+---------------------------------------+-----+
|AT&T Bell Laboratories - Holmdel, NJ   |1    |
|AT&T Bell Laboratories, Murray Hill, NJ|2    |
|AT&T Bell Labs, Murray Hill, NJ, USA   |3    |
|Bell Northern Research                 |2    |
|Bell Northern Research -- Dallas TX    |1    |
|Bell-Northern Research                 |2    |
|Bell-Northern Research, Ottawa, Canada |5    |
|Bellcore, Livingston, NJ               |2    |
|ESCA Corporation, Bellevue WA          |1    |
+---------------------------------------+-----+



In [20]:
group_cw = group_org.filter(df2.Organization.contains('Case Western'))
group_cw.sort('Organization').show(truncate=False)

+------------------------------------------------------+-----+
|Organization                                          |count|
+------------------------------------------------------+-----+
|Case Western Reserve University                       |39   |
|Case Western Reserve University, Cleveland, OH (USA)  |4    |
|Case Western Reserve University, Cleveland, Ohio (USA)|7    |
+------------------------------------------------------+-----+



Il y a donc en réalité moins de 486 d'organisations puisque l'on constate la présence de doublons avec différents orthographes pour la même organisation.

### 4.3 Donner d’autres statistiques descriptives

In [21]:
#Affichage des auteurs les plus actifs
from pyspark.sql.functions import desc

df2.groupBy('From', 'Categorie').count().sort(desc('count')).show()

+--------------------+------------------+-----+
|                From|         Categorie|count|
+--------------------+------------------+-----+
|livesey@solntze.w...|       alt.atheism|   70|
|keith@cco.caltech...|       alt.atheism|   56|
|kmr4@po.CWRU.edu ...|       alt.atheism|   42|
|frank@D012S658.uu...|       alt.atheism|   41|
|mathew <mathew@ma...|       alt.atheism|   38|
|I3150101@dbstu1.r...|       alt.atheism|   35|
|bil@okcforum.osrh...|       alt.atheism|   32|
|tedward@cs.cornel...|rec.sport.baseball|   25|
|sandvik@newton.ap...|       alt.atheism|   25|
|bobbe@vice.ICO.TE...|       alt.atheism|   24|
|cobb@alexia.lis.u...|       alt.atheism|   21|
|jaeger@buphy.bu.e...|       alt.atheism|   21|
|mss@netcom.com (M...|rec.sport.baseball|   19|
|luriem@alleg.edu(...|rec.sport.baseball|   19|
|perry@dsinc.com (...|       alt.atheism|   17|
|roger@crux.Prince...|rec.sport.baseball|   17|
|<RVESTERM@vma.cc....|rec.sport.baseball|   17|
|halat@pooh.bears ...|       alt.atheism

In [22]:
#Affichage du nombre d'auteurs par catégorie
from pyspark.sql.functions import countDistinct

df2.groupBy('Categorie').agg(countDistinct('From')).show()

+------------------+-----------+
|         Categorie|count(From)|
+------------------+-----------+
|rec.sport.baseball|        450|
|       alt.atheism|        228|
+------------------+-----------+



In [23]:
#Activité d'un auteur en moyenne
print("Activité moyenne d'un auteur toutes catégories confondues:")
df2.groupBy('Categorie', 'From').count().agg({'count': 'avg'}).show()

print("Activité moyenne d'un auteur sur l'athéisme:")
df2.filter(df2.Categorie == 'alt.atheism').groupBy('From').count().agg({'count': 'avg'}).show()

print("Activité moyenne d'un auteur sur le baseball:")
df2.filter(df2.Categorie == 'rec.sport.baseball').groupBy('From').count().agg({'count': 'avg'}).show()

Activité moyenne d'un auteur toutes catégories confondues:
+-----------------+
|       avg(count)|
+-----------------+
|2.949852507374631|
+-----------------+

Activité moyenne d'un auteur sur l'athéisme:
+-----------------+
|       avg(count)|
+-----------------+
|4.385964912280702|
+-----------------+

Activité moyenne d'un auteur sur le baseball:
+------------------+
|        avg(count)|
+------------------+
|2.2222222222222223|
+------------------+



Il y a donc plus d'auteurs sur le thème du baseball mais les auteurs du thème athéisme sont plus actifs.

In [24]:
#Affichage des organisations qui ont le plus d'auteurs
df2.groupBy('Organization').agg(countDistinct('From')).sort(desc('count(From)')).show(truncate=False)

+-------------------------------------------------------------------------+-----------+
|Organization                                                             |count(From)|
+-------------------------------------------------------------------------+-----------+
|NA                                                                       |16         |
|Penn State University                                                    |9          |
|University of Illinois at Urbana                                         |8          |
|Lehigh University                                                        |6          |
|The Ohio State University                                                |6          |
|California Institute of Technology, Pasadena                             |6          |
|DSG, Stanford University, CA 94305, USA                                  |6          |
|Georgia Institute of Technology                                          |5          |
|Indiana University             

## 5. Transformation du texte

### 5.1 Découper les documents en listes de mots à l’aide de Tokenizer

In [25]:
#Récupération du corps des documents
rdd_a_bodies = rdd_a_str.map(lambda x: x.split('\n\n', 1)[1])
rdd_b_bodies = rdd_b_str.map(lambda x: x.split('\n\n', 1)[1])

#Exemple de corps d'un document
rdd_a_bodies.first()

'In article <16BA7103C3.I3150101@dbstu1.rz.tu-bs.de> I3150101@dbstu1.rz.tu-bs.de (Benedikt Rosenau) writes:\n>In article <1993Apr5.091258.11830@monu6.cc.monash.edu.au>\n>darice@yoyo.cc.monash.edu.au (Fred Rice) writes:\n> \n>>(2) Do women have souls in Islam?\n>>\n>>People have said here that some Muslims say that women do not have\n>>souls.  I must admit I have never heard of such a view being held by\n>>Muslims of any era.  I have heard of some Christians of some eras\n>>holding this viewpoint, but not Muslims.  Are you sure you might not be\n>>confusing Christian history with Islamic history?\n>>\n> \n>Yes, it is supposed to have been a predominant view in the Turkish\n>Caliphate.\n> \n\nI am not aware of any "Turkish Caliphate" viewpoint on this. Can you\nreference?\n\nHowever, I found a quote due to Imam Ali, whom the Shias follow:\n\n"Men, never obey your women in any way whatsoever. Never let them give their\nadvice on any matter whatsoever, even those of everyday life. Indeed, 

In [26]:
#Définition des étiquettes de catégories à associer à chaque document
label_a = 0 #alt.atheism
label_b = 1 #rec.sport.baseball

#Pour chaque document création d'un tuple contenant l'étiquette et le texte
rdd_a_seq = rdd_a_bodies.map(lambda x: (label_a, x))
rdd_b_seq = rdd_b_bodies.map(lambda x: (label_b, x))

#Union des deux RDD
rdd_nlp = rdd_a_seq.union(rdd_b_seq)

#Exemple de structure
rdd_nlp.first()

(0,
 'In article <16BA7103C3.I3150101@dbstu1.rz.tu-bs.de> I3150101@dbstu1.rz.tu-bs.de (Benedikt Rosenau) writes:\n>In article <1993Apr5.091258.11830@monu6.cc.monash.edu.au>\n>darice@yoyo.cc.monash.edu.au (Fred Rice) writes:\n> \n>>(2) Do women have souls in Islam?\n>>\n>>People have said here that some Muslims say that women do not have\n>>souls.  I must admit I have never heard of such a view being held by\n>>Muslims of any era.  I have heard of some Christians of some eras\n>>holding this viewpoint, but not Muslims.  Are you sure you might not be\n>>confusing Christian history with Islamic history?\n>>\n> \n>Yes, it is supposed to have been a predominant view in the Turkish\n>Caliphate.\n> \n\nI am not aware of any "Turkish Caliphate" viewpoint on this. Can you\nreference?\n\nHowever, I found a quote due to Imam Ali, whom the Shias follow:\n\n"Men, never obey your women in any way whatsoever. Never let them give their\nadvice on any matter whatsoever, even those of everyday life. Ind

In [27]:
from pyspark.ml.feature import Tokenizer

#Création d'un Dataframe à partir du RDD
documentData = spark.createDataFrame(rdd_nlp, ['label', 'text'])

#Tokenisation du texte
tokenizer = Tokenizer(inputCol='text', outputCol='words')
wordsData = tokenizer.transform(documentData)

#Affichage
wordsData.show()

+-----+--------------------+--------------------+
|label|                text|               words|
+-----+--------------------+--------------------+
|    0|In article <16BA7...|[in, article, <16...|
|    0|In article <73502...|[in, article, <73...|
|    0|In article <1993A...|[in, article, <19...|
|    0|Perhaps it's prop...|[perhaps, it's, p...|
|    0|a> In article <1q...|[a>, in, article,...|
|    0|In article <schni...|[in, article, <sc...|
|    0|You know, it just...|[you, know,, it, ...|
|    0|In article <1qlfd...|[in, article, <1q...|
|    0|jbrown@batman.bmd...|[jbrown@batman.bm...|
|    0|In <CINDY.93Apr18...|[in, <cindy.93apr...|
|    0|In article <C4vyF...|[in, article, <c4...|
|    0|In article <1993A...|[in, article, <19...|
|    0|livesey@solntze.w...|[livesey@solntze....|
|    0|In article <1r15r...|[in, article, <1r...|
|    0|In article <1993A...|[in, article, <19...|
|    0|sandvik@newton.ap...|[sandvik@newton.a...|
|    0|In <sfnNTrC00WBO4...|[in, <sfnntrc00wb...|


### 5.2 Créer une représentation vectorielle des documents à l’aide de HashingTF

In [28]:
from pyspark.ml.feature import HashingTF

#Utilisation de HashingTF pour transformer les séquences de mots en vecteurs d'attributs de taille fixe
hashingTF = HashingTF(inputCol='words', outputCol='features') 
featurizedData = hashingTF.transform(wordsData)

#Affichage du Dataframe
featurizedData.show()

+-----+--------------------+--------------------+--------------------+
|label|                text|               words|            features|
+-----+--------------------+--------------------+--------------------+
|    0|In article <16BA7...|[in, article, <16...|(262144,[1109,188...|
|    0|In article <73502...|[in, article, <73...|(262144,[462,3378...|
|    0|In article <1993A...|[in, article, <19...|(262144,[2307,585...|
|    0|Perhaps it's prop...|[perhaps, it's, p...|(262144,[4573,538...|
|    0|a> In article <1q...|[a>, in, article,...|(262144,[9276,942...|
|    0|In article <schni...|[in, article, <sc...|(262144,[4629,927...|
|    0|You know, it just...|[you, know,, it, ...|(262144,[7221,130...|
|    0|In article <1qlfd...|[in, article, <1q...|(262144,[9276,976...|
|    0|jbrown@batman.bmd...|[jbrown@batman.bm...|(262144,[591,784,...|
|    0|In <CINDY.93Apr18...|[in, <cindy.93apr...|(262144,[1116,392...|
|    0|In article <C4vyF...|[in, article, <c4...|(262144,[1483,241...|
|    0

Le vecteur généré avec HashingTF est de taille fixe, par défaut 262 144. 

Pour avoir de meilleurs résultats nous pourrions déterminer la taille de notre vocabulaire, c'est-à-dire le nombre de termes uniques composant nos documents mais fin d'éviter l'overfitting nous devrions fixer un seuil pour limiter notre vocabulaire aux mots qui reviennent au moins 10 fois dans le texte par exemple.
Ce nombre serait ensuite assigné en valeur du paramètre numFeatures, qui définit la taille du vocabulaire. 

Une autre alternative serait de générer un vecteur avec CountVectorizer, qui fait dépendre la taille du vecteur en fonction du corpus d'entraînement et du document.

## 6. Grouper les documents qui ont des représentations vectorielles proches

### 6.1 Utiliser l’algorithme KMeans avec un nombre de cluster égal à 2

In [29]:
from pyspark.ml.clustering import KMeans

#Sélection de la colonne contenant les données à fournir au modèle
data = featurizedData.select('features')

#Initialisation du modèle k-means
kmeans = KMeans().setK(2).setSeed(7)
model = kmeans.fit(data)

#Prédictions
predictions = model.transform(featurizedData)

#Affichage de quelques résultats
predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|                text|               words|            features|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|In article <16BA7...|[in, article, <16...|(262144,[1109,188...|         0|
|    0|In article <73502...|[in, article, <73...|(262144,[462,3378...|         0|
|    0|In article <1993A...|[in, article, <19...|(262144,[2307,585...|         0|
|    0|Perhaps it's prop...|[perhaps, it's, p...|(262144,[4573,538...|         0|
|    0|a> In article <1q...|[a>, in, article,...|(262144,[9276,942...|         0|
|    0|In article <schni...|[in, article, <sc...|(262144,[4629,927...|         0|
|    0|You know, it just...|[you, know,, it, ...|(262144,[7221,130...|         0|
|    0|In article <1qlfd...|[in, article, <1q...|(262144,[9276,976...|         0|
|    0|jbrown@batman.bmd...|[jbrown@batman.bm...|(262144,[591,784,...|         0|
|    0|In <CINDY

Le modèle semble avoir prédit correctement le cluster des premières lignes. 

Un bon modèle doit générer des clusters denses et espacés les un des autres. Pour comparer les résultats sur la totalité des données on peut utiliser la métrique Silhouette score.

In [30]:
from pyspark.ml.evaluation import ClusteringEvaluator

#ClusteringEvaluator mesure la proximité de chaque point d'un cluster par rapport aux points des clusters voisins
evaluator = ClusteringEvaluator()

#Calcul du Silhouette score
silhouette = evaluator.evaluate(predictions)
print("Silhouette avec distance euclidienne au carré = " + str(silhouette))

#Affichage des résultats
centers = model.clusterCenters()
print("Centres des clusters: ")
for center in centers:
    print(center)

Silhouette avec distance euclidienne au carré = 0.996175770794612
Centres des clusters: 
[0.00050125 0.00050125 0.         ... 0.00100251 0.         0.00200501]
[0. 0. 0. ... 0. 0. 0.]


Le Silhouette score est une métrique allant de -1 à 1. Le score obtenu est donc quasi maximal. Notre modèle semble en overfitting mais cela reste à vérifier.

In [31]:
#Affichage du résultat du clustering
predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|    5|
|         0| 1995|
+----------+-----+



Le Silhouette score est donc trompeur dans notre cas. Le modèle a trouvé deux clusters bien espacés mais la quasi totalité des prédictions (99,75%) a été assigné au cluster 0. Ces documents doivent être trop différents du reste du corpus.

In [32]:
predictions.filter(predictions.prediction == 1).show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Ces documents ont l'air particulièrement long, peut être est-ce la raison pour laquelle le K-Means les a classé à part. Il faudrait les consulter et éventuellement les nettoyer ou les supprimer. Avant de procéder, il est également possible d'augmenter le nombre de cluster recherché par le K-Means pour voir comment il se comporte. 



In [33]:
#Initialisation du modèle k-means avec 3 clusters à rechercher
kmeans = KMeans().setK(3).setSeed(7)
model = kmeans.fit(data)

#Prédictions
predictions = model.transform(featurizedData)

#Affichage du résultat du clustering
predictions.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|    5|
|         2|   30|
|         0| 1965|
+----------+-----+



L'essai avec 3 clusters indique que les données nécessitent certainement d'avantage de pré-traitement.

De plus le problème des termes qui reviennent souvent mais ne contiennent pas beaucoup d'information comme 'in' ou 'the' n'a pas été pris en compte. Une solution est de pondérer les mots de notre vocabulaire.

## 7. Pour aller plus loin (optionnel)

### 7.1 Pondérer les mots avec la formule Tf-Idf

In [34]:
from pyspark.ml.feature import IDF

#Utilisation de l'estimateur IDF pour pondérer négativement les tokens qui apparraissent trop souvent (ex: 'in', 'the'...)
idf = IDF(inputCol='features', outputCol='idf_features')
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

#Sélection de la colonne contenant les données à fournir au modèle
data_2 = rescaledData.select('idf_features')

#Initialisation du modèle k-means
kmeans = KMeans(featuresCol='idf_features').setK(2).setSeed(7)
model = kmeans.fit(data_2)

#Prédictions
predictions_2 = model.transform(data_2)

#ClusteringEvaluator mesure la proximité de chaque point d'un cluster par rapport aux points des clusters voisins
evaluator = ClusteringEvaluator(featuresCol='idf_features')

#Calcul du Silhouette score
silhouette_2 = evaluator.evaluate(predictions_2)
print("Silhouette avec distance euclidienne au carré = " + str(silhouette_2))

#Affichage des résultats
centers_2 = model.clusterCenters()
print("Centres des clusters: ")
for center in centers_2:
    print(center)

Silhouette avec distance euclidienne au carré = 0.9761172872728983
Centres des clusters: 
[0.00345586 0.00345586 0.         ... 0.00650604 0.         0.01198992]
[0. 0. 0. ... 0. 0. 0.]


In [35]:
#Affichage du résultat du clustering
predictions_2.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|    1|
|         0| 1999|
+----------+-----+



La pondération a eu un effet sur le Silouhette score puisque l'on est passé de 0,99 à 0,97.
La répartition des documents a été ajustée puisque l'on a désormais qu'un seul document assigné au cluster 1. Les résultats sont donc encore moins bons.

### 7.2 Normaliser les vecteurs représentant les documents

In [38]:
from pyspark.ml.feature import Normalizer

#Normalisation des données avec la norme L1
normalizer = Normalizer(p=1.0, inputCol='features', outputCol='norm_features')
norm_data = normalizer.transform(featurizedData)

#Utilisation de l'estimateur IDF pour pondérer négativement les tokens qui apparraissent trop souvent (ex: 'in', 'the'...)
idf = IDF(inputCol='norm_features', outputCol='idf_norm_features')
idfModel = idf.fit(norm_data)
rescaledData = idfModel.transform(norm_data)

#Sélection de la colonne contenant les données à fournir au modèle
data_3 = rescaledData.select('idf_norm_features')

#Initialisation du modèle k-means
kmeans = KMeans(featuresCol='idf_norm_features').setK(2).setSeed(7)
model = kmeans.fit(data_3)

#Prédictions
predictions_3 = model.transform(data_3)

#ClusteringEvaluator mesure la proximité de chaque point d'un cluster par rapport aux points des clusters voisins
evaluator = ClusteringEvaluator(featuresCol='idf_norm_features')

#Calcul du Silhouette score
silhouette_3 = evaluator.evaluate(predictions_3)
print("Silhouette avec distance euclidienne au carré = " + str(silhouette_3))

#Affichage des résultats
centers_3 = model.clusterCenters()
print("Centres des clusters: ")
for center in centers_3:
    print(center)

Silhouette avec distance euclidienne au carré = 0.2875202745907112
Centres des clusters: 
[4.01912864e-06 9.86315894e-05 0.00000000e+00 ... 2.95550993e-06
 0.00000000e+00 1.28484170e-05]
[0. 0. 0. ... 0. 0. 0.]


In [37]:
#Affichage du résultat du clustering
predictions_3.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  107|
|         0| 1893|
+----------+-----+



La normalisation employée est la norme L1 car la norme L2 supprime les tokens peu utilisés. La norme L1 réduit de beaucoup le poids des mots peu utilisés sans les supprimer, c'est donc un meilleur preprocessing pour l'IDF. On obtient un Silhouette score d'environ 0,28, ce qui indique des clusters qui se chevauchent.

Notre résultat n'est donc pas satisfaisant. Les perspectives d'amélioration sont de nettoyer d'avantage nos données et de tester d'autres types d'algorithmes de clustering comme DBSCAN. Ce dernier est peut être plus adapté à nos données car il ne présuppose pas de forme pour les clusters, contrairement à K-Means qui présuppose des cluster de forme convexe.