In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

--2023-12-25 22:11:05--  https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.0-bin-hadoop3.tgz.1’


2023-12-25 22:11:27 (17.6 MB/s) - ‘spark-3.5.0-bin-hadoop3.tgz.1’ saved [400395283/400395283]



In [None]:
!tar -xvzf spark-3.5.0-bin-hadoop3.tgz

In [None]:
import os
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import findspark
findspark.init()

In [None]:
from google.colab import drive

# Authenticate and mount the drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col, isnan, when, count
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from mpl_toolkits.mplot3d import Axes3D


In [None]:
#Initiation de la session Spark
spark=SparkSession.builder.appName("ProjetFinal").getOrCreate()

In [None]:
#Chargement des données
data = spark.read.csv("/content/drive/MyDrive/Data.csv",header=True, inferSchema=True)
#spark.read.option("multiLine", True).csv("/content/drive/MyDrive/mental_health.csv",header=True, inferSchema=True)
# enlever les colonnes vide
data = data.na.drop()
data.show(100)

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4*...|     3|
|unique, great sta...|     5|
|great stay great ...|     5|
|love monaco staff...|     5|
|cozy stay rainy c...|     5|
|excellent staff, ...|     4|
|hotel stayed hote...|     5|
|excellent stayed ...|     5|
|poor value stayed...|     2|
|nice value seattl...|     4|
|nice hotel good l...|     4|
|nice hotel not ni...|     3|
|great hotel night...|     4|
|horrible customer...|     1|
|disappointed say ...|     2|
|fantastic stay mo...|     5|
|good choice hotel...|     5|
|hmmmmm say really...|     3|
|service service s...|     5|
|excellent stay, d...|     5|
|good value downto...|     4|
|hotel monaco grea...|     5|
|great location ne...|     2|
|n't mind noise pl...|     3|
|loved, stayed war...|     4|
|met expectations ...|     3|
|nice hotel husban...|     4|
|good hotel not la...|     4|
|good choi

Nous constatons que les rating sont de 1 à 5 , nous voulons résoudre un problème de classification binaire sentiment positif ou négatif , nous retraitons donc la données .

In [None]:
def ratings(rating):
    if rating>3 and rating<=5:
        return 1
    if rating>0 and rating<=3:
        return 0

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
#appliquer la fonction rating pour avoir deux classes

data = data.withColumn("Sentiment", udf(ratings, IntegerType())(data["Rating"]))

In [None]:
data.show()

+--------------------+------+---------+
|              Review|Rating|Sentiment|
+--------------------+------+---------+
|nice hotel expens...|     4|        1|
|ok nothing specia...|     2|        0|
|nice rooms not 4*...|     3|        0|
|unique, great sta...|     5|        1|
|great stay great ...|     5|        1|
|love monaco staff...|     5|        1|
|cozy stay rainy c...|     5|        1|
|excellent staff, ...|     4|        1|
|hotel stayed hote...|     5|        1|
|excellent stayed ...|     5|        1|
|poor value stayed...|     2|        0|
|nice value seattl...|     4|        1|
|nice hotel good l...|     4|        1|
|nice hotel not ni...|     3|        0|
|great hotel night...|     4|        1|
|horrible customer...|     1|        0|
|disappointed say ...|     2|        0|
|fantastic stay mo...|     5|        1|
|good choice hotel...|     5|        1|
|hmmmmm say really...|     3|        0|
+--------------------+------+---------+
only showing top 20 rows



In [None]:
data.printSchema()

root
 |-- Review: string (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Sentiment: integer (nullable = true)



**1- Préparation des données**

Enlever les espaces

In [None]:
from pyspark.sql.functions import regexp_replace, trim
# nettoyage de la colonne 'Review' d'un DataFrame en supprimant les espaces redondants
data = data.withColumn('Review', regexp_replace('Review', '[\s]{2,}', ''))
# élimine les espaces inutiles au début ou à la fin de chaque valeur de cette colonne
data = data.withColumn('Review', trim(data.Review))
data.head(1)

[Row(Review='nice hotel expensive parking got good deal stay hotel anniversary, arrived late evening took advice previous reviews did valet parking, check quick easy, little disappointed non-existent view room room clean nice size, bed comfortable woke stiff neck high pillows, not soundproof like heard music room night morning loud bangs doors opening closing hear people talking hallway, maybe just noisy neighbors, aveda bath products nice, did not goldfish stay nice touch taken advantage staying longer, location great walking distance shopping, overall nice experience having pay 40 parking night,', Rating=4, Sentiment=1)]

In [None]:
from pyspark.sql.functions import col, lower
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [None]:
# Mettre tout en minuscule
data = data.withColumn("Review", lower(col('Review')))

In [None]:
# Enlever les lignes vides après nettoyage
data = data.filter(data.Review != '')

In [None]:
# initialiser le tokeniser
tokenizer = Tokenizer(inputCol="Review", outputCol="words")
# transformer la colonne text en mots (token)
wordsData = tokenizer.transform(data)

In [None]:
# Création du transformateur StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
# l'appliquer sur nos données pour enlever les stopwords
Nstopwords = remover.transform(wordsData)

**2- Vectorisation**

**a- Word2vec**

In [None]:
from pyspark.ml.feature import Word2Vec
# création du modèle w2vec
word2Vec = Word2Vec(vectorSize=150, inputCol="filtered", outputCol="features")
# entrainement du modèle word2vec
word2vec_data = word2Vec.fit(Nstopwords)

In [None]:
from pyspark.ml.feature import Word2VecModel

In [None]:
# sauvegarder les vecteurs
word2vec_data.save("/content/drive/MyDrive/w2vec")

In [None]:
# Charger le modèle Word2Vec sauvegardé
word2vec_data = Word2VecModel.load("/content/drive/MyDrive/w2vec")

In [None]:
#appliquer le modèle entrainé à nos datas
w2v_data = word2vec_data.transform(Nstopwords)

**Division des données en données de test et de train**

La méthode randomSplit() de Spark MLlib divise le DataFrame en ensembles d'apprentissage et de test en utilisant une répartition aléatoire selon les proportions spécifiées. Cependant, elle ne garantit pas une répartition égale des classes dans chaque ensemble.

Si les classes ne sont pas équilibrées dans le DataFrame original, la répartition des classes peut différer dans les ensembles d'apprentissage et de test générés. Par conséquent, il est possible que la proportion de chaque classe dans les ensembles d'apprentissage et de test ne soit pas égale.

Pour obtenir des ensembles d'apprentissage et de test avec une répartition égale des classes, vous pouvez envisager d'utiliser des méthodes comme stratified sampling, qui garantissent une répartition équilibrée des classes dans les ensembles. Malheureusement, Spark MLlib ne propose pas directement de méthode stratifiée pour la division des ensembles via randomSplit().

Une alternative serait d'effectuer une division manuelle en filtrant les données par classe et en utilisant randomSplit() séparément pour chaque classe, tout en maintenant la proportion souhaitée pour chaque classe dans les ensembles d'apprentissage et de test. Cela garantira une répartition plus équilibrée des classes dans les ensembles.

In [None]:
# split dataframes between 0s and 1s
zeros = w2v_data.filter(w2v_data["Sentiment"]==0)
ones = w2v_data.filter(w2v_data["Sentiment"]==1)
# split datasets into training and testing
train0, test0 = zeros.randomSplit([0.8,0.2])
train1, test1 = ones.randomSplit([0.8,0.2])
# merge datasets back together
train = train0.union(train1)
test = test0.union(test1)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

**Utilisation de la méthode des arbres de décision**

Ici on utilise PySpark pour définir un classificateur d'arbres de décision à l'aide de la classe DecisionTreeClassifier du module pyspark.ml.classification. Ensuite, on crée une grille de paramètres (paramGrid) à explorer pour l'optimisation des hyperparamètres du classificateur.


Cette configuration permet d'explorer différentes valeurs pour les paramètres maxDepth et maxBins de l'arbre de décision afin d'optimiser ses performances pour un problème d'apprentissage supervisé.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier()

dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10, 20])
             .addGrid(dt.maxBins, [10, 20, 40])
             .build())

In [None]:
from pyspark.sql.functions import col
train = train.withColumnRenamed("Sentiment", "label")
test = test.withColumnRenamed("Sentiment", "label")
test.show()

+--------------------+------+-----+--------------------+--------------------+--------------------+
|              Review|Rating|label|               words|            filtered|            features|
+--------------------+------+-----+--------------------+--------------------+--------------------+
|2-star motel hote...|     1|    0|[2-star, motel, h...|[2-star, motel, h...|[0.00359839154407...|
|2.5 stars masqera...|     2|    0|[2.5, stars, masq...|[2.5, stars, masq...|[0.01139937909239...|
|3 king size bed, ...|     2|    0|[3, king, size, b...|[3, king, size, b...|[-0.0056014036401...|
|3 star lobby 2 st...|     2|    0|[3, star, lobby, ...|[3, star, lobby, ...|[0.04633747633532...|
|5 day away just d...|     2|    0|[5, day, away, ju...|[5, day, away, da...|[0.02343787890858...|
|50/50 response go...|     2|    0|[50/50, response,...|[50/50, response,...|[-0.0143492023811...|
|absolutely awful ...|     1|    0|[absolutely, awfu...|[absolutely, awfu...|[-0.0090345171112...|
|abysmal s

Ensuite on utilise une validation croisée pour optimiser les hyperparamètres du classificateur d'arbres de décision.

In [None]:
crossvaldt = CrossValidator(estimator=dt,
                          estimatorParamMaps=dtparamGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)
cvModeldt = crossvaldt.fit(train)

In [None]:
tested = cvModeldt.transform(test)
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(tested)

0.8220300418374996

temps d'execution -->  19min

**Utilisation de la régression logistique**

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()

paramGridlr = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0, 1]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossvallr = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGridlr,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)
cvModellr = crossvallr.fit(train)

7 min

In [None]:
# Obtenez les meilleurs paramètres
bestParams = cvModellr.bestModel.extractParamMap()

# Affichez les meilleurs paramètres
for param, value in bestParams.items():
    print(f"{param.name}: {value}")


aggregationDepth: 2
elasticNetParam: 0.0
family: auto
featuresCol: features
fitIntercept: True
labelCol: label
maxBlockSizeInMB: 0.0
maxIter: 100
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
regParam: 0.01
standardization: True
threshold: 0.5
tol: 1e-06


In [None]:
best_model = cvModellr.bestModel

# Sauvegarder le meilleur modèle
best_model.save("/content/drive/MyDrive/best_model_lr")

In [None]:
tested = cvModellr.transform(test)
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(tested)

0.9249254881975545

**Utilisation de la méthode random forest**

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier()

rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [2, 5, 10])
               .addGrid(rf.maxBins, [5, 10, 20])
               .addGrid(rf.numTrees, [5, 20, 50])
             .build())

rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = BinaryClassificationEvaluator(),
                      numFolds = 10)
cvModelrf = rfcv.fit(train)

38min

In [None]:
best = cvModelrf.bestModel

In [None]:
best.save("/content/drive/MyDrive/model_RF")

In [None]:
best._java_obj.getMaxDepth()

10

In [None]:
best._java_obj.getMaxBins()

20

In [None]:
best._java_obj.getNumTrees()

50

In [None]:
tested = cvModelrf.transform(test)
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(tested)

0.9049519603107046

**Utilisation de GBT classifier**
(Gradient Boosted Trees)

In [None]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier()

gbtparamGrid = ParamGridBuilder().build()

gbtcv = CrossValidator(estimator = gbt,
                      estimatorParamMaps = gbtparamGrid,
                      evaluator = BinaryClassificationEvaluator(),
                      numFolds = 10)
cvModelgbt = gbtcv.fit(train)

7min

In [None]:
tested = cvModelgbt.transform(test)
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(tested)

0.90122519050425

**Utilisation du linearSVC**

In [None]:
from pyspark.ml.classification import LinearSVC

svc = LinearSVC()

svcparamGrid = (ParamGridBuilder()
                .addGrid(svc.maxIter, [10, 100])
                .addGrid(svc.regParam, [0.001, 0.01, 1.0,10.0])
                .build())

svccv = CrossValidator(estimator = svc,
                      estimatorParamMaps = svcparamGrid,
                      evaluator = BinaryClassificationEvaluator(),
                      numFolds = 10)
cvModelsvc = svccv.fit(train)

14min

In [None]:
bestmodel = cvModelsvc.bestModel
bestmodel._java_obj.getRegParam()

0.001

In [None]:
tested = cvModelsvc.transform(test)
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(tested)

0.925238546888614

**2- Utilisation de la méthode hashingTF**

In [None]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=92769)
featurizedData = hashingTF.transform(Nstopwords)

In [None]:
idf = IDF(inputCol="rawFeatures", outputCol="features2")
idfModel = idf.fit(featurizedData)
tf_idf_data = idfModel.transform(featurizedData)

In [None]:
# split dataframes between 0s and 1s
zeros_data = tf_idf_data.filter(tf_idf_data["Sentiment"]==0)
ones_data = tf_idf_data.filter(tf_idf_data["Sentiment"]==1)
# split datasets into training and testing
train_0, test_0 = zeros_data.randomSplit([0.8,0.2])
train_1, test_1 = ones_data.randomSplit([0.8,0.2])
# merge datasets back together
train_tf = train_0.union(train_1)
test_tf = test_0.union(test_1)

**Utilisation de la regression logistique**

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.elasticNetParam, [0, 1]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

In [None]:
train_tf = train.withColumnRenamed("Sentiment", "label")
test_tf = test.withColumnRenamed("Sentiment", "label")

In [None]:
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)
cvModel = crossval.fit(train_tf)

8min

In [None]:
tested = cvModel.transform(test_tf)
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(tested)

0.924921359302901

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel

# Charger le modèle RandomForestClassificationModel
loaded_rf_model = RandomForestClassificationModel.load("/content/drive/MyDrive/model")

predictions = loaded_rf_model.transform(test)

In [None]:
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(predictions)

0.6327256884495065

In [None]:
from pyspark.ml.classification import LogisticRegressionModel

# Charger le modèle RandomForestClassificationModel
loaded_rf_model = LogisticRegressionModel.load("/content/drive/MyDrive/best_model_lr")

predictions = loaded_rf_model.transform(test)

In [None]:
evaluation = BinaryClassificationEvaluator() #AUC
evaluation.evaluate(predictions)

0.9208090209256902