## Utilisation de pyspark pour un pipeline machine learning : classification du genre sur les discours politiques français

### Jing TAN & Melchior PRUGNIAUD 
###### MS DS || ENSAE

Ce sujet réalise des traitements de NLP. Etant donné la complexité de certaines fonctions que nous allons utilisés, il est important de distribué ces computations afin de réduire le temps de travail qui normalement sur une seule machine prendrait des heures. C'est pourquoi utiliser `PySpark` nous semble être une bonne solution.

Le but ici est de comprendre l'utilisation de `PySpark` tout en développant un pipeline de machine learning facilement exécutable.

Nous avions un problème pour utiliser plusieurs packages sur le cluster de l'ENSAE, c'est pourquoi nous avons au final décider de réaliser l'ensemble des opérations directement sous collab car nous avons plus de maniabilité que sur le cluster.


### Partie II : Modèle pré-entraîné et Deep Learning Classification 


#### Colab Setup

In [1]:
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.5.0


openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
Processing /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471/pyspark-2.4.4-py2.py3-none-any.whl
Collecting py4j==0.10.7
  Using cached https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4
Collecting spark-nlp==2.5.0
  Using cached https://files.pythonhosted.org/packages/75/b0/f50d169c49f5982f8be9e86e285b53e23f91fd7db0d10646c2d1de5c3ad0/spark_nlp-2.5.0-py2.py3-none-any.whl
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.5.0


Dans cette partie, nous allons dans un premier temps créer notre baseline avec 
`pyspark.ml`. Considérons les résultats de la première partie, nous allons d'abord traiter les texte avec différentes procédures de preprocessing, ensuite travailler avec la regression logistique pour la classification. Dans un deuxième temps, nous allons créer un modèle deep learning avec `sparknlp`.

> Procédure pour le baseline


1.   Vectorizer par exemple, CountVectorizer IDF, HashingTF IDF, etc

2.   Word Embedding avec skip-n-gram

3.   Classification par la régression logistique

> Procédure pour le modèle de deep learning


1.   BERT pretrained word embedding

2.   Deep Learning classifier



In [0]:
import sys
import time
import pyspark
import sparknlp

In [0]:
import pandas as pd

df_eq = pd.read_csv('df_eq.csv')
df_eq = df_eq[df_eq.columns[1:]]
df_eq.fillna('',inplace=True)
def encode(x):
  if x == 1:
    return 0
  else:
    return 1
df_eq.sexe = df_eq.sexe.apply(encode)

#df_pd =  pd.read_csv('medium_df_eq.csv')
# Number of rows in the dataframe
#print('number of rows in the data frame {}'.format(df_pd.count()))
# Number of columns and their labels
#print('number of columns in the data frame {}'.format(len(df_pd.columns)))
#print('columns: {}'.format(df_pd.columns))

#df_pd = df_pd[['Id','Texte', 'sexe']]

In [4]:
from pyspark.sql.functions import col

spark = sparknlp.start()
df_s = spark.createDataFrame(df_eq)
df_s = df_s.select(*['sexe','Texte'])
df_s.show(2)

df_s.groupBy("sexe") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+----+-----+
|sexe|count|
+----+-----+
|   1| 2500|
|   2| 2500|
+----+-----+



In [0]:
(train_set, val_set, test_set) = df_s.randomSplit([0.8, 0.1, 0.1], seed = 2000)

In [6]:
type(train_set)

pyspark.sql.dataframe.DataFrame

## HashingTF + IDF + Logistic Regression


In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

Qu'est-ce qu'un Pipeline de toute façon? Dans l'apprentissage automatique, il est courant d'exécuter une séquence d'algorithmes pour traiter et apprendre des données.
Un pipeline est spécifié comme une séquence d'étapes, et chaque étape est soit un transformateur soit un estimateur. Ces étapes sont exécutées dans l'ordre et le `DataFrame` d'entrée est transformé lors de son passage à chaque étape. Autrement dit, les données sont transmises dans le pipeline ajusté dans l'ordre. La méthode transforme de chaque étape met à jour le jeu de données et le passe à l'étape suivante. Avec l'aide de Pipelines, nous pouvons nous assurer que les données de formation et de test passent par des étapes de traitement des fonctionnalités identiques.

In [72]:
# Creer une pipeline pour traiter les textes (Tokenizer, vectorizer avec HashTF et IDF)
tokenizer = Tokenizer(inputCol="Texte", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "sexe", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+------+--------------------+----+--------------------+--------------------+--------------------+-----+
|    Id|               Texte|sexe|               words|                  tf|            features|label|
+------+--------------------+----+--------------------+--------------------+--------------------+-----+
|126905| Pour un nouvel é...|   1|[, pour, un, nouv...|(65536,[187,225,4...|(65536,[187,225,4...|  0.0|
|126923|Après avoir arrêt...|   1|[après, avoir, ar...|(65536,[46,104,13...|(65536,[46,104,13...|  0.0|
|126994|Monsieur le Prési...|   1|[monsieur, le, pr...|(65536,[131,180,2...|(65536,[131,180,2...|  0.0|
|127053|UN MOMENT SOLENNE...|   1|[un, moment, sole...|(65536,[18,33,167...|(65536,[18,33,167...|  0.0|
|127078|Vienne, le 28 avr...|   1|[vienne,, le, 28,...|(65536,[75,144,43...|(65536,[75,144,43...|  0.0|
+------+--------------------+----+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [74]:
# Entrainer le modèle de la Regression logistique 
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

# Evaluer le modèle
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)
print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

0.7851088908845982

## CountVectorizer + IDF + Logistic Regression

In [68]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Creer une pipeline pour traiter les texte et faire la regression
# (Tokenizer, vectorizer avec CountVectorizer et IDF, regression logistique)
tokenizer = Tokenizer(inputCol="Texte", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "sexe", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])
# Entrainer le modèle
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
# Evaluer le modèle
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.7434
ROC-AUC: 0.8102


## N-gram + CountVect + IDF + Logistic regression



In [0]:
from pyspark.ml.feature import NGram, VectorAssembler

def build_ngrams_wocs(inputCol=["Texte","sexe"], n=3):
  '''
  Creer une pipeline pour traiter les texte, faire du skip n gram word embedding et faire la regression logistique
  Input :
    inputCol : noms de la colonne du texte
    n        : nombre de skips
  Output:
    pipeline : pipeline qui procéde aux traitement du texte et la regression
  '''
    tokenizer = [Tokenizer(inputCol="Texte", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "sexe", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)


In [69]:
# Entrainer le modèle
trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = evaluator.evaluate(predictions_wocs)
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy_wocs))
print("ROC-AUC: {0:.4f}".format(roc_auc_wocs))

Accuracy Score: 0.7753
ROC-AUC: 0.8517


In [70]:
test_predictions = trigramwocs_pipelineFit.transform(test_set)
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count() / float(test_set.count())
test_roc_auc = evaluator.evaluate(test_predictions)
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(test_accuracy))
print("ROC-AUC: {0:.4f}".format(test_roc_auc))

Accuracy Score: 0.8049
ROC-AUC: 0.8865


## SparkNLP pretrained 


Dans Spark NLP, tous les annotateurs sont des estimateurs ou des transformateurs comme nous le voyons dans Spark ML. Un estimateur dans Spark ML est un algorithme qui peut être ajusté sur un `DataFrame` pour produire un transformateur. Par exemple, un algorithme d'apprentissage est un estimateur qui s'entraîne sur un `DataFrame` et produit un modèle. Un Transformer est un algorithme qui peut transformer un `DataFrame` en un autre `DataFrame`. Par exemple, un modèle ML est un transformateur qui transforme un `DataFrame` avec des fonctionnalités en un `DataFrame` avec des prédictions. Dans Spark NLP, il existe deux types d'annotateurs: AnnotatorApproach et AnnotatorModel AnnotatorApproach étend les estimateurs à partir de Spark ML, qui sont destinés à être formés via `fit()`, et AnnotatorModel étend les transformateurs qui sont destinés à transformer les trames de données via `transform()`. Certains annotateurs Spark NLP ont un suffixe Model et d'autres non. Le suffixe du modèle est explicitement indiqué lorsque l'annotateur est le résultat d'un processus de formation. Certains annotateurs, tels que `Tokenizer`, sont des transformateurs mais ne contiennent pas le suffixe Model car ils ne sont pas des annotateurs entraînés. Les annotateurs de modèle ont un `pretrained()` sur son objet statique, pour récupérer la version publique pré-formée d'un modèle. Pour faire court, s’il s’entraîne sur un DataFrame et produit un modèle, c’est une AnnotatorApproach; et s'il transforme un `DataFrame` en un autre `DataFrame` via certains modèles, il s'agit d'un AnnotatorModel (par exemple `WordEmbeddingsModel`) et il ne prend pas le suffixe du modèle s'il ne s'appuie pas sur un annotateur pré-formé lors de la transformation d'un `DataFrame` (par exemple `Tokenizer`).

In [0]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd

spark = sparknlp.start() 
# sparknlp.start(gpu=True) >> for training on GPU

print("Spark NLP version", sparknlp.version())

In [0]:
from sparknlp.pretrained import PretrainedPipeline

# actual content is inside description column
document = DocumentAssembler()\
    .setInputCol("Texte")\
    .setOutputCol("document")
    
# we can also use sentence detector here if we want to train on and get 
# predictions for each sentence downloading pretrained embeddings
use = UniversalSentenceEncoder.pretrained()\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")
# the classes/labels/categories are in 'sexe' column
classsifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("sexe")\
  .setMaxEpochs(10)\
  .setEnableOutputLogs(True)
use_clf_pipeline = Pipeline(
    stages = [
        document,
        use,
        classsifierdl
    ])

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]


In [0]:
use_pipelineModel = use_clf_pipeline.fit(train_set)

In [0]:
!cd ~/annotator_logs && ls -l

total 4
-rw-r--r-- 1 root root 518 May 14 18:01 ClassifierDLApproach_0d5e46a8ca9d.log


In [0]:
!cat ~/annotator_logs/ClassifierDLApproach_0d5e46a8ca9d.log

Training started - total epochs: 5 - learning rate: 0.005 - batch size: 64 - training examples: 3974
Epoch 0/5 - 1.793177918%.2fs - loss: 43.781788 - accuracy: 0.51974124 - batches: 63
Epoch 1/5 - 1.139157638%.2fs - loss: 44.482933 - accuracy: 0.62214386 - batches: 63
Epoch 2/5 - 1.138320012%.2fs - loss: 45.3547 - accuracy: 0.63012433 - batches: 63
Epoch 3/5 - 1.128797706%.2fs - loss: 44.363316 - accuracy: 0.6432291 - batches: 63
Epoch 4/5 - 1.132367959%.2fs - loss: 44.203808 - accuracy: 0.64625335 - batches: 63


In [0]:
from sklearn.metrics import classification_report, accuracy_score

preds = use_pipelineModel.transform(test_set)
df = preds.select('sexe','Texte', 'class.result').toPandas()
df['result'] = pd.to_numeric(df['result'].apply(lambda x: x[0]))
print(classification_report(df.sexe, df.result))
print(accuracy_score(df.sexe, df.result))

              precision    recall  f1-score   support

           1       0.66      0.44      0.53       225
           2       0.63      0.81      0.71       267

    accuracy                           0.64       492
   macro avg       0.65      0.63      0.62       492
weighted avg       0.65      0.64      0.63       492

0.6422764227642277


Spark NLP fournit une API facile à intégrer avec Spark ML Pipelines et tous les annotateurs et transformateurs Spark NLP peuvent être utilisés dans Spark ML Pipelines. 

In [48]:
from pyspark.ml import Pipeline

document_assembler = DocumentAssembler()\
 .setInputCol("Texte")\
 .setOutputCol("document")

# split into sentences
sentenceDetector = SentenceDetector()\
 .setInputCols(["document"])\
 .setOutputCol("sentences")

tokenizer = Tokenizer()\
 .setInputCols(["sentences"])\
 .setOutputCol("token")

# removes all dirty characters from text following a regex pattern 
# and transforms words based on a provided dictionary
normalizer = Normalizer()\
 .setInputCols(["token"])\
 .setOutputCol("normal")

# exclude from a sequence of strings (e.g. the output of a Tokenizer, Normalizer
# , Lemmatizer, and Stemmer) and drops all the stop words from the input sequences.
stopwords_cleaner = StopWordsCleaner()\
 .setInputCols(["normal"])\
 .setOutputCol("cleanedTokens")

# load pretrained lemmatizer in French
# retrieves lemmas out of words with the objective of returning a base dictionary word
lemmatizer = LemmatizerModel.pretrained("lemma", lang="fr")\
 .setInputCols(["cleanedTokens"])\
 .setOutputCol("lemma")

# load pretrained embeddings multi-lingual
word_embeddings = BertEmbeddings\
    .pretrained('bert_multi_cased', 'xx') \
    .setInputCols(["document",'lemma'])\
    .setOutputCol("embeddings")\
    .setPoolingLayer(-2) # default 0

sentence_embeddings = SentenceEmbeddings()\
 .setInputCols(["document","embeddings"])\
 .setOutputCol("sentence_embeddings")\
 .setPoolingStrategy("AVERAGE")

# deep learning classifier
classsifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("sexe")\
  .setMaxEpochs(8)\
  .setEnableOutputLogs(True)

nlpPipeline = Pipeline(stages=[
 document_assembler, 
 sentenceDetector,
 tokenizer,
 normalizer,
 stopwords_cleaner,
 lemmatizer,
 word_embeddings,
 sentence_embeddings,
 classsifierdl
 ])


lemma download started this may take some time.
Approximate size to download 2.2 MB
[OK!]
bert_multi_cased download started this may take some time.
Approximate size to download 638.7 MB
[OK!]


Lorsque nous `fit()` sur le pipeline avec le bloc de data frame, sa colonne de texte est introduite dans le transformateur `DocumentAssembler()` dans un premier temps, puis une nouvelle colonne «document» est créée dans Type de document (AnnotatorType). Ce transformateur est fondamentalement le point d'entrée initial de Spark NLP pour toute trame de données Spark. Ensuite, sa colonne de document est introduite dans `SentenceDetector()` (AnnotatorApproach) et le texte est divisé en un tableau de phrases et une nouvelle colonne «phrases» dans Type de document est créée. Ensuite, la colonne «phrases» est introduite dans `Tokenizer()` (AnnotatorModel) et chaque phrase est tokenisée et une nouvelle colonne «token» dans le type Token est créée. Etc.

In [56]:
from sklearn.metrics import classification_report, accuracy_score

# Entrainer le modèle
bert_pipelineModel = nlpPipeline.fit(train_set)
df = bert_pipelineModel.transform(test_set).select('Texte', 'sexe', 'class.result').toPandas()

df['result'] = pd.to_numeric(df['result'].apply(lambda x: x[0]))
print(classification_report(df.sexe, df.result))
print(accuracy_score(df.sexe, df.result))


              precision    recall  f1-score   support

           1       0.00      0.00      0.00       225
           2       0.54      1.00      0.70       267

    accuracy                           0.54       492
   macro avg       0.27      0.50      0.35       492
weighted avg       0.29      0.54      0.38       492

0.5426829268292683


  _warn_prf(average, modifier, msg_start, len(result))


In [52]:
bert_pipelineModel.stages

[DocumentAssembler_b017a8f16175,
 SentenceDetector_97345b8cc3c5,
 REGEX_TOKENIZER_21548488e6ee,
 NORMALIZER_c4f47fb84c4b,
 StopWordsCleaner_742f67280cc9,
 LEMMATIZER_21d85c0f00b3,
 BERT_EMBEDDINGS_a122a99548e9,
 SentenceEmbeddings_c1fc0aa1159c,
 ClassifierDLModel_be04f315cbde]

Nous pouvons enregristré notre modèle et le réutiliser plus tard.

In [0]:
bert_pipelineModel.stages[-1].write().overwrite().save('myClassifier15052020')
classsifierdlmodel = ClassifierDLModel.load('myClassifier15052020')


Les `LightPipeline` sont des pipelines spécifiques à Spark NLP, équivalents à Spark ML Pipeline, mais destinés à traiter de plus petites quantités de données. Ils sont utiles pour travailler avec de petits ensembles de données, déboguer les résultats ou lors de l'exécution d'une formation ou d'une prédiction à partir d'une API qui sert des demandes ponctuelles.

Les Spark NLP LightPipelines sont des pipelines Spark ML convertis en une seule machine mais la tâche multithread, devenant plus de 10 fois plus rapide pour de plus petites quantités de données (petite est relative, mais les phrases de 50k sont à peu près un bon maximum). Pour les utiliser, il suffit de brancher un pipeline formé (fitted) puis d'annoter un texte brut. Nous n'avons même pas besoin de convertir le texte d'entrée en DataFrame afin de le nourrir dans un pipeline qui accepte DataFrame comme entrée en premier lieu. Cette fonctionnalité serait très utile lorsqu'il s'agit d'obtenir une prédiction pour quelques lignes de texte à partir d'un modèle ML entrainé.

In [45]:
from sparknlp.base import LightPipeline

light_model = LightPipeline(classsifierdlmodel)
val_set.select('Texte').take(2)

[Row(Texte='Mesdames, Messieurs,Je vous ai invité à ce point presse pour vous présenter les Assises nationales du développement durable, que nous organisons, conjointement avec la région Midi-Pyrénées, dont Mme Marie-Françoise Mendez, ici présente, est vice-présidente.Nous nous étions déjà réunis, avec M. le Président Martin Malvy, le 15 janvier dernier, à la signature de la convention entre le MATE et le Conseil Régional Midi-Pyrénées. Aujourd\'hui, à 10 jours de la tenue de ces Assises, je tenais à vous en rappeler l\'importance, et à vous en présenter les enjeux.Le sommet mondial du développement durable de Johannesburg aura lieu à la fin du mois d\'août prochain, c\'est-à-dire très bientôt.La France et l\'Europe ont un rôle essentiel à jouer dans ce sommet, d\'autant, vous l\'avez vu, que M. le Président Bush menace de ne pas s\'y rendre lui-même, malgré les exhortations de nombreuses associations.Vous connaissez le contexte électoral particulier dans lequel nous nous trouvons aujo

In [46]:
text='''
Mesdames, Messieurs,Je vous ai invité à ce point presse pour vous présenter les Assises nationales du développement durable, que nous organisons, conjointement avec la région Midi-Pyrénées, dont Mme Marie-Françoise Mendez, ici présente, est vice-présidente.Nous nous étions déjà réunis, avec M. le Président Martin Malvy, le 15 janvier dernier, à la signature de la convention entre le MATE et le Conseil Régional Midi-Pyrénées. Aujourd\'hui, à 10 jours de la tenue de ces Assises, je tenais à vous en rappeler l\'importance, et à vous en présenter les enjeux.Le sommet mondial du développement durable de Johannesburg aura lieu à la fin du mois d\'août prochain, c\'est-à-dire très bientôt.La France et l\'Europe ont un rôle essentiel à jouer dans ce sommet, d\'autant, vous l\'avez vu, que M. le Président Bush menace de ne pas s\'y rendre lui-même, malgré les exhortations de nombreuses associations.Vous connaissez le contexte électoral particulier dans lequel nous nous trouvons aujourd\'hui. Il est difficile, dans ces conditions, de se projeter dans l\'après- mois de mai/juin. Pourtant , c\'est notre rôle de le faire. Les Assises Nationales du développement durable de Toulouse, qui se tiendront les 11 et 12 mars prochains, seront ainsi l\'occasion, en France, avant ces échéances électorales internes, d\'ouvrir un large dialogue sur la préparation du sommet de Johannesburg, et sur le bilan de ce qui a été réalisé dans notre pays depuis la conférence de Rio en 1992.Le concept de développement durable a été popularisé par le rapport " Brundtland ", rapport de la Commission Mondiale sur l\'Environnement et le Développement, en 1987. C\'est dans ce rapport qui a proposé la définition la plus communément admise du développement durable : " un développement qui réponde aux besoins du présent sans compromettre la capacité des générations futures à répondre aux leurs " Vous le savez, le Sommet de la Terre, a constitué une étape décisive dans l\'engagement des pays en faveur du développement durable et dans l\'expression des finalités et principes fondateurs de ce concept. Elle a mis en valeur l\'idée selon laquelle " le monde se localise en même temps qu\'il se mondialise ". Les espoirs de Rio, vous le savez aussi, n\'ont pas encore été concrétisés : les modes de production et de consommation ont peu changé, les écarts entre le Nord et le Sud se sont aggravés.Vous connaissez les principes fondateurs du développement durable : la prise en compte, en amont de toute politiques publique et à chaque étape de leur mise en oeuvre, des trois piliers que sont le développement économique, la protection de l\'environnement, et le bien être social. Cette imbrication suppose de sortir de l\'idéologie de la croissance du PIB comme seul objectif des politiques de développement. J\'insiste pour ma part sur le fait que le développement durable envisage la personne humaine dans sa globalité. En ce sens, le souci de l\'éducation, de la formation tout au long de la vie, de la diversité culturelle et de la protection de la liberté nécessaire à toute création artistique, à toute pensée, font partie du développement durable.Les principes affirmés à Rio irriguent notre action quotidienne, au sein de ce Ministère, mais aussi au sein de toutes les collectivités locales :- le principe d\'intégration de la protection de l\'environnement comme partie intégrante du processus de développement ; - le principe de responsabilité et de solidarité internationale ;- le principe de participation et de nouvelle gouvernance ;Les 171 gouvernements présents à Rio ont aussi adopté l\'Agenda 21 (ou Action 21). Il constitue un plan global d\'action qui doit être mis en uvre par les gouvernements, les institutions du développement, les organismes des Nations Unies et les secteurs indépendants.Ses 40 chapitres font émerger : - les dimensions économiques et sociales ;- la conservation et la préservation des ressources aux fins de développement ;- la participation de la société civile à l\'élaboration et la mise en oeuvre d\'actions s\'appuyant sur l\'expression des besoins de la société civile ; - la mise en oeuvre de moyens transversaux à toutes les échelles de territoires. Le chapitre 28 recommande ainsi que les collectivités locales mettent en place un Agenda 21 local, traductionterritorialisée de l\'Agenda 21.Depuis Rio, une étape majeure a été franchie à Kyoto, en 1997, et Johannesburg sera la prochaine. Les enjeux de Johannesburg sont donc fondamentaux : Le Premier Ministre l\'a rappelé dans son discours de Lyon, en soulignant notamment la nécessité d\'une OME, de la ratification du Protocole de Kyoto.Cependant, nous avons quelques inquiétudes car le contexte international n\'est pas favorable : le programme d\'action de George Bush pour le changement climatique est inacceptable, la conférence de Carthagène a été décevante, celle de Monterrey ne laisse pas d\'être inquiétanteDans ce contexte, il importe plus que jamais de se mobiliser pour que le Sommet du Développement Durable soit un succès qui redonne un nouveau souffle au concept, en insistant sur la nécessaire intégration des préoccupations économiques, sociales et environnementales.
'''
result = light_model.annotate(text)

result['class']

['2']

``` bert_multi_cased ``` est une base de pretrained word embeddings. Elle contient plusieurs langues. 

Les problèmes avec cette méthode est :
- beaucoup de calcul : Le modèle est deep et le texte est longue pour chaque discours.
- information sparse ( multi-lingue) : Nous voulons entrainer le modèle avec CamemBERT qui est dédiée pour la langue française, pourtant cette base n'est pas disponible sur sparknlp pour l'instant. 




### Conclusion : 
Les méthodes de machine learning permettent d'avoir des scores satisfaisants avec peu de temps d'éxecution. Avec le tuning des hyperparamètres, nous faisons du multi-processing avec `pyspark`.  
Le problème avec les modèles deep learning sur spark quand on traite avec le CPU, nous ne pouvons pas faire mieux que quand on travaille sur le GPU. Et les resultats est moins satisfaisant avec le temps et la capacité de calcul limités.

