<a href="https://colab.research.google.com/github/hamel-amir/Projet_Big_Data/blob/main/Projet_Big_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Projet Big Data : Apache Spark

Réalisé par:

* Amir  Hamel
* Noor Khalal
* Zahra Alliche

## Objectifs du TP
Ce TP consiste à regrouper des documents textuels tels que les documents qui
partagent le même thème se retrouvent dans le même groupe, et les documents qui
portent sur des sujets très différents se trouvent dans des groupes différents.

## 2 Mise en place de l'environnement de travail 

### 2.2 Installation de Spark

**Instalation de la bibliotheque Java**

In [1]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

**Instalation de Spark**

In [2]:
! wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz


In [3]:
! tar xf spark-3.3.2-bin-hadoop3.tgz

**Instalation de PySpark**

In [4]:
# instalation de pyspark
! pip install -q findspark
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m19.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=fbdbd7a5ca36e8e3c4dcf90ec24899e8213c85876e20121971038507fbcae43a
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

### 2.3/4 Définir la vatiable d'environnement et créer l'objet SparkContext 

In [5]:
import os
# definir deux variables d'environement
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"  # l'endroit de java
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3" # l'endroit de spark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.5 pyspark-shell' 

import findspark
findspark.init("spark-3.3.2-bin-hadoop3")

from pyspark import SparkContext, SparkConf

# lancement du spark en local (en une seule machine qui est la machine virtuelle de colab dans ce cas) avec 4 processus (workers node)
configuration = SparkConf().setAppName("name").setMaster("local[4]").set('spark.jars.packages', 'org.apache.spark:spark-avro_2.11:2.4.5')
# name: c'est le nom qu'on donne a notre app (code)
sc = SparkContext(conf=configuration) # l'objet 

In [6]:
sc

**Création de l'objet sparkSession**

In [7]:
# L'objet sparkSession créé pour utiliser l'API Spark SQL
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=configuration).getOrCreate()

## 3 Données

### 3.1 Téléchargement des données

In [8]:
# Téléchargement du dossier des documents
! wget -q http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz

### 3.2 Decompression des données

In [9]:
# décompresser le dossier 
! tar xf /content/20news-19997.tar.gz

### 3.3 Chargement des données dans deux variables de type RDD



In [10]:
rdd1 = sc.wholeTextFiles("/content/20_newsgroups/alt.atheism")
rdd2 = sc.wholeTextFiles("/content/20_newsgroups/rec.sport.baseball")

In [11]:
x=rdd2.take(2)

###3.4 Séparation de l’entête

In [12]:
def separateur(x):
  l=x[1].split("\n\n")
  return (x[0],l)

#separateur(x[1])

rdd1=rdd1.map(separateur)
rdd2=rdd2.map(separateur)

In [13]:
# tester le premier élément
test=rdd1.take(1)
test

[('file:/content/20_newsgroups/alt.atheism/53791',
  ['Xref: cantaloupe.srv.cs.cmu.edu alt.atheism:53791 talk.religion.misc:84109 talk.origins:41079\nNewsgroups: alt.atheism,talk.religion.misc,talk.origins\nPath: cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!bb3.andrew.cmu.edu!news.sei.cmu.edu!fs7.ece.cmu.edu!europa.eng.gtefsd.com!howland.reston.ans.net!zaphod.mps.ohio-state.edu!swrinde!cs.utexas.edu!utnut!torn!nott!cunews!rjg\nFrom: rjg@doe.carleton.ca (Richard Griffith)\nSubject: Re: Burden of Proof\nMessage-ID: <rjg.735427215@wesley>\nSender: news@cunews.carleton.ca (News Administrator)\nOrganization: Dept. of Electronics, Carleton University\nReferences: <C5t5sF.8oz@mentor.cc.purdue.edu> <1r4b59$7hg@aurora.engr.LaTech.edu>\nDate: Wed, 21 Apr 1993 21:20:15 GMT\nLines: 23',
   'In <1r4b59$7hg@aurora.engr.LaTech.edu> ray@engr.LaTech.edu (Bill Ray) writes:',
   '>If I make a statement, "That God exists, loves me, etc." but in no way\n>insist that you believe it, does that place a 

On voit ici qu'un document est représenté sous la forme d'un tuple `(Path, List)`,

 ou `List = [entête, contenu du document ]`

### 3.5 Extraction des champs de l’entête

In [14]:
#fonction d'ectraction des éléments de l’entête
#prend en prametre l’entête d'un document
#retourne un dictionnaire (Intitulé_Champs, Contenu_Champs)

def extraction_informations_entete(entete):
  #chaque ligne représente un nouveau champs
  l=entete.split("\n")
  d=dict()

  for x in l:
    #l'intitulé du champs est séparé de son contenu par ':'
    z=x.split(":")
    #quelques lignes ne contiennent pas de ':'
    if len(z)==2:
       d[z[0]]=z[1]

  return d

#### Premier example d'extraction

In [15]:
# extraire des informations du premier document du rdd1 (alt.atheism)

# par l'action first
doc1=rdd1.first()
entete=doc1[1][0]

# Dictionnaire des informations de l'entete
dict1_info=extraction_informations_entete(entete)
dict1_info

{'Newsgroups': ' alt.atheism,talk.religion.misc,talk.origins',
 'Path': ' cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!bb3.andrew.cmu.edu!news.sei.cmu.edu!fs7.ece.cmu.edu!europa.eng.gtefsd.com!howland.reston.ans.net!zaphod.mps.ohio-state.edu!swrinde!cs.utexas.edu!utnut!torn!nott!cunews!rjg',
 'From': ' rjg@doe.carleton.ca (Richard Griffith)',
 'Message-ID': ' <rjg.735427215@wesley>',
 'Sender': ' news@cunews.carleton.ca (News Administrator)',
 'Organization': ' Dept. of Electronics, Carleton University',
 'References': ' <C5t5sF.8oz@mentor.cc.purdue.edu> <1r4b59$7hg@aurora.engr.LaTech.edu>',
 'Lines': ' 23'}

In [16]:
# selectionner Organization et Newsgroups
print(dict1_info['Newsgroups'])
print(dict1_info['Organization'])

 alt.atheism,talk.religion.misc,talk.origins
 Dept. of Electronics, Carleton University


#### Deuxieme example d'extraction

In [17]:
# par l'action take
doc2=rdd2.take(2)
entete2=doc2[1][1][0]
# Dictionnaire des informations de l'entete
dict2_info=extraction_informations_entete(entete2)
dict2_info

{'Newsgroups': ' rec.sport.baseball',
 'Path': ' cantaloupe.srv.cs.cmu.edu!das-news.harvard.edu!noc.near.net!howland.reston.ans.net!usenet.ins.cwru.edu!wsu-cs!vela.acs.oakland.edu!cs.uiuc.edu!cs.uiuc.edu!steph',
 'From': ' steph@cs.uiuc.edu (Dale Stephenson)',
 'Subject': ' Defensive Averages 1988-1992, Third Base',
 'Message-ID': ' <C5JJrJ.EM3@cs.uiuc.edu>',
 'Summary': ' career defensive averages at third',
 'Organization': ' University of Illinois, Dept. of Comp. Sci., Urbana, IL',
 'Lines': ' 68'}

In [18]:
# selectionner Organization et Newsgroups
print(dict2_info['Newsgroups'])
print(dict2_info['Organization'])

 rec.sport.baseball
 University of Illinois, Dept. of Comp. Sci., Urbana, IL


Ici, nous allons créer une nouvelle fonction de séparation de document qui va créer un élément de RDD de la forme 
`(Path, [Dictionnaire de l'entete , le contenu du document])`

Ceci rendra la transformation des élément en type pyspark.sql.Row plus facile durant le choix des colonnes car certains champs de l,entete n'e=éxistent pas dans tous les documents, le test d'éxistance sera plus facile grace au dictionnaire.

In [19]:
# cette fonction prend en entrée un élement du rdd (doc)
# et retourne un tuple contenant ( path, [dictionnaire de l'entete , le contenu du document])
def structuration_document(doc):
  if  doc is not None:
    l= doc[1][0].split("\n")
    d=dict()
    if len(l)!=0:
      for x in l:
        z=x.split(":",1)
        if len(z)==2:
          d[z[0]]=z[1]
        if len(z)<2:
           pass
        

      return (doc[0],[d,doc[1][1]])

In [20]:
new_rdd1=rdd1.map(structuration_document)


In [21]:
# affichage du resultat de new_rdd1
new_rdd1.take(2)

[('file:/content/20_newsgroups/alt.atheism/53791',
  [{'Xref': ' cantaloupe.srv.cs.cmu.edu alt.atheism:53791 talk.religion.misc:84109 talk.origins:41079',
    'Newsgroups': ' alt.atheism,talk.religion.misc,talk.origins',
    'Path': ' cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!bb3.andrew.cmu.edu!news.sei.cmu.edu!fs7.ece.cmu.edu!europa.eng.gtefsd.com!howland.reston.ans.net!zaphod.mps.ohio-state.edu!swrinde!cs.utexas.edu!utnut!torn!nott!cunews!rjg',
    'From': ' rjg@doe.carleton.ca (Richard Griffith)',
    'Subject': ' Re: Burden of Proof',
    'Message-ID': ' <rjg.735427215@wesley>',
    'Sender': ' news@cunews.carleton.ca (News Administrator)',
    'Organization': ' Dept. of Electronics, Carleton University',
    'References': ' <C5t5sF.8oz@mentor.cc.purdue.edu> <1r4b59$7hg@aurora.engr.LaTech.edu>',
    'Date': ' Wed, 21 Apr 1993 21:20:15 GMT',
    'Lines': ' 23'},
   'In <1r4b59$7hg@aurora.engr.LaTech.edu> ray@engr.LaTech.edu (Bill Ray) writes:']),
 ('file:/content/20_newsgro

In [22]:
# en appliquant la meme fonction sur le rdd2
new_rdd2=rdd2.map(structuration_document)

In [23]:
new_rdd2.take(2)

[('file:/content/20_newsgroups/rec.sport.baseball/104985',
  [{'Newsgroups': ' rec.sport.baseball',
    'Path': ' cantaloupe.srv.cs.cmu.edu!rochester!udel!gatech!howland.reston.ans.net!zaphod.mps.ohio-state.edu!magnus.acs.ohio-state.edu!dmoney',
    'From': ' dmoney@magnus.acs.ohio-state.edu (Dean R Money)',
    'Subject': ' The Braves will come around...',
    'Message-ID': ' <1993Apr23.193027.26515@magnus.acs.ohio-state.edu>',
    'Sender': ' news@magnus.acs.ohio-state.edu',
    'Nntp-Posting-Host': ' bottom.magnus.acs.ohio-state.edu',
    'Organization': ' The Ohio State University',
    'Date': ' Fri, 23 Apr 1993 19:30:27 GMT',
    'Lines': ' 28'},
   'To all the Braves doubters:']),
 ('file:/content/20_newsgroups/rec.sport.baseball/104504',
  [{'Newsgroups': ' rec.sport.baseball',
    'Path': ' cantaloupe.srv.cs.cmu.edu!das-news.harvard.edu!noc.near.net!howland.reston.ans.net!usenet.ins.cwru.edu!wsu-cs!vela.acs.oakland.edu!cs.uiuc.edu!cs.uiuc.edu!steph',
    'From': ' steph@cs.uiu

### 3.6 Fusion des deux new RDD

en faisant l'union des deux RDD

In [24]:
# Fusionner les deux RDD (union)
fusion=new_rdd1.union(new_rdd2)

In [25]:
fusion.take(2)

[('file:/content/20_newsgroups/alt.atheism/53791',
  [{'Xref': ' cantaloupe.srv.cs.cmu.edu alt.atheism:53791 talk.religion.misc:84109 talk.origins:41079',
    'Newsgroups': ' alt.atheism,talk.religion.misc,talk.origins',
    'Path': ' cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!bb3.andrew.cmu.edu!news.sei.cmu.edu!fs7.ece.cmu.edu!europa.eng.gtefsd.com!howland.reston.ans.net!zaphod.mps.ohio-state.edu!swrinde!cs.utexas.edu!utnut!torn!nott!cunews!rjg',
    'From': ' rjg@doe.carleton.ca (Richard Griffith)',
    'Subject': ' Re: Burden of Proof',
    'Message-ID': ' <rjg.735427215@wesley>',
    'Sender': ' news@cunews.carleton.ca (News Administrator)',
    'Organization': ' Dept. of Electronics, Carleton University',
    'References': ' <C5t5sF.8oz@mentor.cc.purdue.edu> <1r4b59$7hg@aurora.engr.LaTech.edu>',
    'Date': ' Wed, 21 Apr 1993 21:20:15 GMT',
    'Lines': ' 23'},
   'In <1r4b59$7hg@aurora.engr.LaTech.edu> ray@engr.LaTech.edu (Bill Ray) writes:']),
 ('file:/content/20_newsgro

### 3.7 Transformation le nouveau RDD obtenu pour que chaque élément soit de type pyspark.sql.Row

In [26]:
from pyspark.sql import Row

# cette fonction ToRow est utilisée pour manipuler la table sql et faire des requetes.
def ToRow(x):
  entete=x[1][0]
  
  #Certains éléments (comme Subject ou Date) n'éxistent pas dans tous les documents
  #C'est pour cela que la création du dictionnaire avec la fonction structuration_document() va nous aider dans les tests d'éxistance
  row = Row(Path=x[0], Newsgroups=entete['Newsgroups'] if 'Newsgroups' in entete.keys() else None,
            Organization=entete['Organization'] if 'Organization' in entete.keys() else None,
            Subject=entete['Subject'] if 'Subject' in entete.keys() else None,
            Date=entete['Date'] if 'Date' in entete.keys() else None,
            Lines=entete['Lines'] if 'Lines' in entete.keys() else None,
            From=entete['From'] if 'From' in entete.keys() else None,

            Contenu=x[1][1])
  
  return row

rddF=fusion.map(ToRow)


In [27]:
rddF.take(2)

[Row(Path='file:/content/20_newsgroups/alt.atheism/53791', Newsgroups=' alt.atheism,talk.religion.misc,talk.origins', Organization=' Dept. of Electronics, Carleton University', Subject=' Re: Burden of Proof', Date=' Wed, 21 Apr 1993 21:20:15 GMT', Lines=' 23', From=' rjg@doe.carleton.ca (Richard Griffith)', Contenu='In <1r4b59$7hg@aurora.engr.LaTech.edu> ray@engr.LaTech.edu (Bill Ray) writes:'),
 Row(Path='file:/content/20_newsgroups/alt.atheism/54220', Newsgroups=' alt.atheism', Organization=' Case Western Reserve University', Subject=' Re: free moral agency', Date=' Thu, 22 Apr 1993 23:48:54 GMT', Lines=' 29', From=' kmr4@po.CWRU.edu (Keith M. Ryan)', Contenu='In article <C5uxJ9.pJ@darkside.osrhe.uoknor.edu> bil@okcforum.osrhe.edu (Bill Conner) writes:')]

### 3.8 Création d'unobjet de type DataFrame à partir du RDD fusionné

In [28]:
#Create dataframe spark
df=spark.createDataFrame(rddF)
df.printSchema()
df.show()
df.count()

root
 |-- Path: string (nullable = true)
 |-- Newsgroups: string (nullable = true)
 |-- Organization: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Lines: string (nullable = true)
 |-- From: string (nullable = true)
 |-- Contenu: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+
|                Path|          Newsgroups|        Organization|             Subject|                Date|Lines|                From|             Contenu|
+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+
|file:/content/20_...| alt.atheism,talk...| Dept. of Electro...| Re: Burden of Proof| Wed, 21 Apr 1993...|   23| rjg@doe.carleton...|In <1r4b59$7hg@au...|
|file:/content/20_...|         alt.atheism| Case Western Res...| 

2000

### 3.9 Sauvegarde la DataFrame au format Avro

In [29]:
!pip install pandavro

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pandavro
  Downloading pandavro-1.7.1.tar.gz (8.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastavro==1.5.1
  Downloading fastavro-1.5.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.6/2.6 MB[0m [31m40.4 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: pandavro
  Building wheel for pandavro (setup.py) ... [?25l[?25hdone
  Created wheel for pandavro: filename=pandavro-1.7.1-py3-none-any.whl size=5688 sha256=59d4e81ee874531b44aeacb6f4c85f1113387fcc011bfe419965cf015dca116a
  Stored in directory: /root/.cache/pip/wheels/85/cc/15/480a3cfa1a9fc49e2af9bde7437bd6c8c0265f17e61f32672b
Successfully built pandavro
Installing collected packages: fastavro, pandavro
Successfully installed fastavro-1.5.1 pandavro-1.7.1


In [30]:
import numpy as np
import pandas as pd
import pandavro as pdx
from pyspark.sql.avro.functions import from_avro, to_avro
filename = "df.avro"
# on a passé par le dataframe pandas mais ça envoie tout au driver
df2=df.toPandas()
pdx.to_avro(filename, df2)
#df.write.partitionBy("Path",).avro("output_dir")
#avroDf = df.select(to_avro(df.Path).alias("avro"))
#avroDf.collect()

In [31]:
#lecture du fichier avro créé
saved = pdx.read_avro(filename)


### 3.10 Sauvegarder la DataFrame au format Parquet

In [32]:
# sauvgarder le dataframe au format parquet
import pyarrow as pa
import pyarrow.parquet as pq
table = pa.Table.from_pandas(df2)
pq.write_table(table, 'df.parquet')

In [33]:
# test: lecture du df a partir du fichier parquet
table2 = pq.read_table('df.parquet')
table2.to_pandas()

Unnamed: 0,Path,Newsgroups,Organization,Subject,Date,Lines,From,Contenu
0,file:/content/20_newsgroups/alt.atheism/53791,"alt.atheism,talk.religion.misc,talk.origins","Dept. of Electronics, Carleton University",Re: Burden of Proof,"Wed, 21 Apr 1993 21:20:15 GMT",23,rjg@doe.carleton.ca (Richard Griffith),In <1r4b59$7hg@aurora.engr.LaTech.edu> ray@eng...
1,file:/content/20_newsgroups/alt.atheism/54220,alt.atheism,Case Western Reserve University,Re: free moral agency,"Thu, 22 Apr 1993 23:48:54 GMT",29,kmr4@po.CWRU.edu (Keith M. Ryan),In article <C5uxJ9.pJ@darkside.osrhe.uoknor.ed...
2,file:/content/20_newsgroups/alt.atheism/51127,alt.atheism,"California Institute of Technology, Pasadena",Re: >>>>>>Pompous ass,2 Apr 93 21:01:40 GMT,14,keith@cco.caltech.edu (Keith Allan Schneider),livesey@solntze.wpd.sgi.com (Jon Livesey) writes:
3,file:/content/20_newsgroups/alt.atheism/54191,alt.atheism,Decision Support Inc.,Re: Yet more Rushdie [Re: ISLAMIC LAW],26 Apr 1993 11:08:49 -0400,28,perry@dsinc.com (Jim Perry),In article <1993Apr25.031703.5230@monu6.cc.mon...
4,file:/content/20_newsgroups/alt.atheism/51225,alt.atheism,Welch Medical Library,Re: A Little Too Satanic,"Mon, 5 Apr 1993 13:34:38 GMT",21,"davidk@welch.jhu.edu (David ""Go-Go"" Kitaguchi)","In article 65934@mimsy.umd.edu, mangoe@cs.umd...."
...,...,...,...,...,...,...,...,...
1995,file:/content/20_newsgroups/rec.sport.baseball...,rec.sport.baseball,The Cleveland Clinic Foundation,Re: Wounded Redbirds,"Wed, 21 Apr 1993 17:23:28 GMT",7,tknuth@bio.ri.ccf.org (Todd Knuth),"In article 1@acad.drake.edu, sbp002@acad.drake..."
1996,file:/content/20_newsgroups/rec.sport.baseball...,rec.sport.baseball,University of Illinois at Urbana,Re: Moe Berg,"Tue, 20 Apr 1993 18:10:13 GMT",31,rkoffler@ux4.cso.uiuc.edu (Bighelmet),barring@cs.washington.edu (David Barrington) w...
1997,file:/content/20_newsgroups/rec.sport.baseball...,rec.sport.baseball,Kendall Square Research Corp.,Texas Rangers Roster - PLEASE HELP!,20 Apr 93 12:55:09 EDT,19,dorin@ksr.com (Bob Dorin),I need a little help from a Texas Rangers expert.
1998,file:/content/20_newsgroups/rec.sport.baseball...,rec.sport.baseball,University of Washington,Re: New Uniforms,6 Apr 93 17:32:36 GMT,16,neuharth@hardy.u.washington.edu (John Neuharth),jpopovich@guvax.acc.georgetown.edu writes:


## 4 Analyse descriptive API SPARK SQL

### 4.1 Vérification de deux catégories différentes de documents

In [34]:
# le nombre de documents par Newsgroups
df.groupBy("Newsgroups").count().show()

+--------------------+-----+
|          Newsgroups|count|
+--------------------+-----+
| talk.religion.mi...|    2|
| sci.skeptic,alt....|    7|
| alt.atheism,soc....|    1|
| talk.abortion,al...|    1|
| alt.atheism,talk...|    3|
| alt.atheism,rec....|    3|
| alt.atheism,talk...|   92|
| rec.scouting,soc...|    1|
| soc.culture.arab...|    2|
| alt.atheism,talk...|    5|
| alt.atheism,alt....|    2|
| alt.atheism,soc....|    8|
|         alt.atheism|  734|
| alt.atheism,alt....|    7|
| talk.religion.mi...|    4|
| alt.atheism,talk...|   20|
| alt.slack,talk.r...|    1|
| talk.religion.mi...|    4|
| talk.abortion,al...|   94|
| alt.atheism,soc....|    2|
+--------------------+-----+
only showing top 20 rows



On remarque que le champ Newsgroups peut contenir des éléments en plus des deux catégories principales "alt.atheism" ou "rec.sport.baseball".

Pour vérifier qu'on a bien ces deux catégories principales différentes, nous allons créer deux vues: 

* Une vue `sport` qui contient toutes les lignes ou `rec.sport.baseball` apparait.
* Une vue `religion` qui contient toutes les lignes ou `alt.atheism` apparait.

Puis nous allons compter le nombre d'éléments dans chaque liste, en plus de voir si l'intersectiondes deux vues est vide ou pas.

Si l'intersection est vide, et que la somme des éléments des deux vues est égales au nombre total des éléments de la fusion, cela veut dire que les documents sont bien separés par deux catégories principales  "alt.atheism" et "rec.sport.baseball".

In [35]:
# Mettre le dataframe dans une table sql (Document)

from pandas._libs.hashtable import value_count
df.createOrReplaceTempView("Document")

In [36]:
#Si les vues sont déja crées
#spark.sql("DROP VIEW sport")
#spark.sql("DROP VIEW religion")

In [37]:
# Création d'une vue (sport) contennat tous les document de catégorie rec.sport.baseball
sqlDF = spark.sql("CREATE TEMP VIEW sport AS SELECT Newsgroups FROM Document WHERE Newsgroups LIKE '%rec.sport.baseball%'")

In [38]:
# Création d'une vue (religion) contennat tous les document de catégorie alt.atheism
sqlDF = spark.sql("CREATE TEMP VIEW religion AS SELECT Newsgroups FROM Document WHERE Newsgroups LIKE '%alt.atheism%'")

In [39]:
# affichage la vue Sport
sqlDF2 = spark.sql("SELECT * FROM sport")
sqlDF2.show()

+--------------------+
|          Newsgroups|
+--------------------+
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
| rec.sport.baseba...|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
|  rec.sport.baseball|
+--------------------+
only showing top 20 rows



In [40]:
# Affichage la vue religion
sqlDF3 = spark.sql("SELECT * FROM religion")
sqlDF3.show()

+--------------------+
|          Newsgroups|
+--------------------+
| alt.atheism,talk...|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
| sci.skeptic,alt....|
| alt.atheism,talk...|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
|         alt.atheism|
+--------------------+
only showing top 20 rows



In [41]:
# le nombre de documents dont la categorie contient rec_baseball
sqlDF3 = spark.sql("SELECT count(s.Newsgroups) as nbr_rec_baseball FROM sport s ")
sqlDF3.show()

+----------------+
|nbr_rec_baseball|
+----------------+
|            1000|
+----------------+



In [42]:
# le nombre de documents dont la categorie contient alt.atheism
sqlDF3 = spark.sql("SELECT count(Newsgroups) as nb_alt_atheism FROM religion")
sqlDF3.show()
 

+--------------+
|nb_alt_atheism|
+--------------+
|          1000|
+--------------+



In [43]:
# Pour prouver qu'il n'y a pas d'intersection entre les deux catégories 
# de plus on a 1000 lignes dans la vue religion et 1000 dans la vue religion
#(en sachant que le nombre total des lignes dans la table document est de 2000)
sqlDF3 = spark.sql("SELECT * FROM religion INTERSECT (SELECT * FROM sport) ")
sqlDF3.show()

+----------+
|Newsgroups|
+----------+
+----------+



**Conclusion:** on a bien deux catégories différentes de documents

### 4.2 le nombre d'organisations différentes

In [44]:
#le nombre d'organisations différentes
from pyspark.sql.functions import countDistinct
df2=df.select(countDistinct("Organization"))
df2.show()

+----------------------------+
|count(DISTINCT Organization)|
+----------------------------+
|                         485|
+----------------------------+



### 4.3 D’autres statistiques descriptives

Max, Min , Moyenne par rapport a l'attribut Lines

In [45]:
df.describe("Lines").show()

+-------+-----------------+
|summary|            Lines|
+-------+-----------------+
|  count|             1994|
|   mean|36.48294884653962|
| stddev|48.79137273450094|
|    min|                1|
|    max|               99|
+-------+-----------------+



Le nombre de documents par organization

In [46]:
df.groupBy("Organization").count().show()

+--------------------+-----+
|        Organization|count|
+--------------------+-----+
|           SunSelect|    2|
| Case Western Res...|   39|
| Princeton Univer...|   24|
| University of Ne...|    2|
| S-CUBED, A Divis...|    1|
| Harlequin Ltd. C...|    1|
| University of Ne...|    7|
|   Cured, discharged|    3|
|   Augustana College|    1|
| National Optical...|    2|
| Dept. of Electro...|    1|
| Cabletron System...|    6|
| Welch Medical Li...|    2|
| Red Wolfe @ The ...|    1|
|  Indiana University|   17|
| Johns Hopkins Un...|    7|
| Okcforum Unix Us...|   32|
| Iowa State Unive...|    2|
| Wesleyan University|    7|
| Tektronix, Inc.,...|   11|
+--------------------+-----+
only showing top 20 rows



Le nombre de documents qui ont été créés pour chaque auteur

In [47]:
df.groupBy("FROM").count().show()

+--------------------+-----+
|                FROM|count|
+--------------------+-----+
| dpw@sei.cmu.edu ...|    4|
| mccullou@snake2....|    6|
| livesey@solntze....|   70|
| markp@avignon (M...|    1|
| jvigneau@cs.ulow...|    1|
| kilman2y@fiu.edu...|    1|
| darice@yoyo.cc.m...|   14|
| cust_ts@klaava.H...|    2|
| kax@cs.nott.ac.u...|    2|
| acooper@mac.cc.m...|    4|
| "James F. Tims" ...|    1|
| kutluk@ccl.umist...|    1|
| <MVS104@psuvm.ps...|    1|
| davec@silicon.cs...|    1|
| halat@pooh.bears...|   15|
| nancyo@shnext15....|    1|
| David O Hunt <bl...|    1|
| sbradley@scic.in...|    2|
| mikec@sail.LABS....|    2|
| adpeters@sunflow...|    3|
+--------------------+-----+
only showing top 20 rows



##5.A. Transformation de texte : Tout le document
Dans ce qui suit, nous allons créer un nouvel RDD de type pyspark.sql.Row puis un dataframe qui a une seule colonne contenant tout le document. Ceci est pour faire un premier test sur l'algorithme KMeans.


In [48]:
def ToRow_all_document(x):
  entete=x[1][0]
  val=" ".join(entete.values())

  row = Row(document=val+x[1][1])
  return row

all_document=fusion.map(ToRow_all_document)

In [49]:
# create spark dataframe contenant tout le document
all_document_df=spark.createDataFrame(all_document)
all_document_df.printSchema()

root
 |-- document: string (nullable = true)



In [50]:
#Transformation du Texte
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

###5.A.2 Découpage les documents en listes de mots à l’aide de Tokenizer

In [51]:
tokenizer = Tokenizer(inputCol="document", outputCol="words")
#d=df.select("Contenu")
wordsData = tokenizer.transform(all_document_df)
wordsData.show()

+--------------------+--------------------+
|            document|               words|
+--------------------+--------------------+
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| alt.atheism  can...|[, alt.atheism, ,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| alt.atheism  can...|[, alt.atheism, ,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| alt.atheism  can...|[, alt.atheism, ,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|
| alt.atheism  can...|[, alt.atheism, ,...|
| cantaloupe.srv.c...|[, cantalo

###5.A.3 Création d'une représentation vectorielle des documents à l’aide de HashingTF

In [52]:
#Création d'une représentation vectorielle des documents à l’aide de HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="features_hache", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
featurizedData.show()

+--------------------+--------------------+--------------------+
|            document|               words|      features_hache|
+--------------------+--------------------+--------------------+
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,2,3,4,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,4,5,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,4,5,...|
| alt.atheism  can...|[, alt.atheism, ,...|(20,[0,1,2,3,4,5,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[1,2,5,6,7,9,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,2,3,4,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,2,3,4,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,4,5,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,4,5,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[2,4,5,7,8,9,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,4,5,...|
| cantaloupe.srv.c...|[, 

##6-7.A. Groupement des documents ayants des représentations vectorielles proches

###A.1. KMeans avec la pondération Tf-Idf

#### Pondération des mots avec la formule Tf-*Idf*

In [53]:

idf = IDF(inputCol="features_hache", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show()

+--------------------+--------------------+--------------------+--------------------+
|            document|               words|      features_hache|            features|
+--------------------+--------------------+--------------------+--------------------+
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,5,6,...|(20,[0,1,2,3,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,2,3,4,5,6,...|(20,[0,2,3,4,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
| alt.atheism  can...|[, alt.atheism, ,...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[1,2,5,6,7,9,...|(20,[1,2,5,6,7,9,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,2,3,4,5,6,...|(20,[0,2,3,4,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,2,3,4,5,6,...|(20,[0,2,3,4,5,6,...|
| cantaloupe.srv.c...|[, cantaloupe.srv...|(20,[0,1,2,

#### KMeans avec ponderation Tf-Idf

Comme nous allons tester l'algorithme de KMeans dans plusieurs cas (document entier/entete et Tf-Idf/normalisation), nous allons créer une fonction K_means qui prend un vecteur et applique l'algorithme de clustering dessus, puis affiche le Silhouette score, le nombre de clusters, leurs centres et la taille de chaque cluster.

In [55]:
# Kmeans  
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import seaborn as sns
import pandas as pd
import matplotlib as plt


def K_means(Datavector):
    # Trains a k-means model.
    kmeans = KMeans().setK(2).setSeed(5)
    model = kmeans.fit(Datavector)
    # make prediction
    predictions = model.transform(Datavector)
    # Evaluate clustering by computing Silhouette score
    evaluator = ClusteringEvaluator()

    # Le Silhouette score est une mesure utilisée pour calculer la qualité technique du clustering , sa valeur est comprise entre 1 et -1
    # 1: un bon clustering
    # 0: Signifie que les clusters sont indifférents, ou nous pouvons dire que la distance entre les clusters n'est pas significative.
    # -1: Signifie que les clusters sont affectés dans le mauvais sens.
    # s=(ba)/max(a,b) ou a est la distance intra-cluster moyenne et b est la distance inter-cluster moyenne

    silhouette = evaluator.evaluate(predictions)
    print("Silhouette with squared euclidean distance = " + str(silhouette))
    # Shows the result.
    centers = model.clusterCenters()
    print("Cluster Centers: ", len(centers))
    for center in centers:
      print(center)

    #Affichage du résultat du clustering
    predictions.groupBy('prediction').count().show()

In [None]:
#Kmeans pour idf
K_means(rescaledData)

***A REVOIR CETTE PARTIE***

Le Silhouette score est une métrique allant de -1 à 1. Le score obtenu est donc quasi maximal.

Le modèle a trouvé deux clusters bien espacés mais la quasi totalité des prédictions (99,75%) a été assigné au cluster 0.

###A.2 Normalisation des vecteurs représentant les documents

#### Normalisation

In [None]:
# normalisation
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
normalizer = Normalizer(inputCol="features_hache", outputCol="features", p=1.0)
l1NormData = normalizer.transform(featurizedData)
l1NormData.show()

#### KMeans avec normalisation

In [None]:
# Kmeans avec normalisation
K_means(l1NormData)

##5.B. Transformation de texte : En tete seulement

Comme le résultat du clustering n'a pas donné des résultats satisfaisants, nous allons essayer les meme étapes sur l'entete des documents.

Nous allons commencer par créer un nouvel RDD de type pyspark.sql.Row puis un dataframe qui a une seule colonne ne contenant que l'eentete. Ceci est pour faire un deuxieme test sur l'algorithme KMeans.

In [None]:
def ToRow_entete(x):
  entete=x[1][0]
  val=" ".join(entete.values())
  row = Row(entete=val)
  return row

entete_document_row=fusion.map(ToRow_entete)

In [None]:
Entete_df=spark.createDataFrame(entete_document_row)
Entete_df.printSchema()

###5.B.2 Découpage les documents en listes de mots à l’aide de Tokenizer

In [None]:
tokenizer2 = Tokenizer(inputCol="entete", outputCol="words")
#d=df.select("Contenu")
wordsData_entete = tokenizer2.transform(Entete_df)
wordsData_entete.show()

###5.B.3 Création d'une représentation vectorielle des documents à l’aide de HashingTF

In [None]:
#Création d'une représentation vectorielle des documents à l’aide de HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="features_hache", numFeatures=20)
featurizedData_entete = hashingTF.transform(wordsData_entete)
featurizedData_entete.show()

##6-7.B. Groupement des documents ayants des représentations vectorielles proches

###B.1. KMeans avec ponderation Tf-Idf

#### Pondération des mots avec la formule Tf-*Idf*

In [None]:
idf = IDF(inputCol="features_hache", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData_entete = idfModel.transform(featurizedData_entete)
rescaledData_entete.show()

#### KMeans avec ponderation Tf-Idf

In [None]:
K_means(rescaledData_entete)

###B.2. KMeans avec  normalisation

####Normalisation

In [None]:
# normalisation
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
normalizer = Normalizer(inputCol="features_hache", outputCol="features", p=1.0)
l1NormData_entete = normalizer.transform(featurizedData_entete)
l1NormData_entete.show()

####KMeans avec normalisation

In [None]:
K_means(l1NormData_entete)