In [1]:
import os
import numpy as np
import pandas as pd
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import findspark
from time import time

In [2]:
findspark.init()

### Creation de la session Spark:

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *

spark = SparkSession.builder.config("spark.executor.memory", '8g').config("spark.driver.memory",'8g').config("spark.python.worker.memory", '8g').config("spark.executor.cores", '2').config("spark.driver.maxResultSize", '0').config("spark.sql.crossJoin.enabled", "true").getOrCreate()

### Lecture des fichiers :

In [4]:
df_train = spark.read.option("sep", "\t").option("header", "true").option("inferschema", "true").option("mode", "DROPMALFORMED").csv('data\drugsComTrain_raw.tsv')
df_test = spark.read.option("sep", "\t").option("header", "true").option("inferschema", "true").option("mode", "DROPMALFORMED").csv('data\drugsComTest_raw.tsv')
df_train = df_train.select(['_c0', 'review', 'rating'])
df_test = df_test.select(['_c0', 'review', 'rating'])   
df_train.show(5)

+------+--------------------+------+
|   _c0|              review|rating|
+------+--------------------+------+
|206461|"""It has no side...|   9.0|
|138000|"""This is my fir...|   8.0|
| 35696|"""Suboxone has c...|   9.0|
|155963|"""2nd day on 5mg...|   2.0|
|165907|"""He pulled out,...|   1.0|
+------+--------------------+------+
only showing top 5 rows



In [5]:
df_test.show(5)

+------+--------------------+------+
|   _c0|              review|rating|
+------+--------------------+------+
|163740|"""I&#039;ve trie...|  10.0|
|206473|"""My son has Cro...|   8.0|
|159672|"""Quick reductio...|   9.0|
| 39293|"""Contrave combi...|   9.0|
| 97768|"""I have been on...|   9.0|
+------+--------------------+------+
only showing top 5 rows



On définit les fonctions de preprocessing et de transformation de la variable de sortie :

In [6]:
import re
import string
stemmer = nltk.stem.SnowballStemmer('english')
# lower case everything
def textLower(x):
    return x.lower()

#delete numbers and replace punctuation by space 
def keepletters(input_str):
    input_str = re.sub(r'\d+', '', input_str)
    return input_str.translate(str.maketrans(string.punctuation,' '*32))

#remove last space and first space
def stripfc(input_str):
    return input_str.strip()

def tokenize_stpords(input_str):
    stop_words = set(stopwords.words('english'))
    tokens = word_tokenize(input_str)
    return [i for i in tokens if not i in stop_words]

def preprocess(x):
    x = textLower(x)
    x = keepletters(x)
    x = stripfc(x)
    tokens = tokenize_stpords(x)
    tokens_stem = ' '.join([stemmer.stem(token) for token in tokens])
    return tokens_stem

def discretise(x):
    if not isinstance(x, float):
        try :
            x = float(x)
        except Exception:
            return 0
    if x <= 5:
        return 0
    else:
        return 1

On effectue le traitement des données en utilisant la fonction preprocess et discretise :

In [8]:
from pyspark.sql import Row
start = time()
df_train_clean = df_train.rdd.map(lambda row: Row(row[0], preprocess(row[1]), discretise(row[2]))).toDF()
df_train_clean = df_train_clean.withColumnRenamed('_1', 'ID').withColumnRenamed('_2', 'review').withColumnRenamed('_3', 'label')
train_size = df_train_clean.count()
end = time()
print('Preprocessing et catégorisation du training set prend {}, taille {}'.format(end-start, train_size))

Preprocessing et catégorisation du training set prend 74.71499848365784, taille 143313


In [9]:
start = time()
df_test_clean = df_test.rdd.map(lambda row: Row(row[0], preprocess(row[1]), discretise(row[2]))).toDF()
df_test_clean = df_test_clean.withColumnRenamed('_1', 'ID').withColumnRenamed('_2', 'review').withColumnRenamed('_3', 'label')
test_size = df_test_clean.count()
end = time()
print('Preprocessing et catégorisation du test set prend {}, taille {}'.format(end-start, test_size))

Preprocessing et catégorisation du test set prend 28.428353309631348, taille 47694


##### Etape de tokenisation :

In [10]:
from pyspark.ml.feature import Tokenizer
start = time()
tokenizer = Tokenizer(inputCol='review', outputCol='review_words')
df_train_words = tokenizer.transform(df_train_clean)
df_test_words = tokenizer.transform(df_test_clean)
train_size = df_train_words.count()
test_size = df_test_words.count()
end = time()
print('Split des phrases en liste de mots prend {}, taille {}'.format(end-start, train_size+test_size))

Split des phrases en liste de mots prend 91.83711051940918, taille 191007


##### Vectorisation : calcul des vecteurs TF-IDF

In [11]:
# Hashing Term-Frequency
from pyspark.ml.feature import HashingTF, IDF
start = time()
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='review_tf', numFeatures=10000)
df_train_tf = hashing_tf.transform(df_train_words)
df_test_tf = hashing_tf.transform(df_test_words)
# Inverse Document Frequency
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features")
idf_model = idf.fit(df_train_tf) # fit to build the model on all the data, and then apply it line by line
df_train_tfidf = idf_model.transform(df_train_tf)
df_test_tfidf = idf_model.transform(df_test_tf)

train_size = df_train_tfidf.count()
test_size = df_test_tfidf.count()
end = time()
print('Calcul du terme TF-IDF prend {}, taille {}'.format(end-start, train_size+test_size))

Calcul du terme TF-IDF prend 161.70283460617065, taille 191007


#### Phase d'apprentissage

##### Régression logistique :

In [12]:
from pyspark.ml.classification import LogisticRegression
start = time()
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(df_train_tfidf)
end = time()
print('Le training prend {}'.format(end-start))

Le training prend 84.52868366241455


In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
start = time()
predictions = lrModel.transform(df_test_tfidf)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = evaluator.evaluate(predictions)
end = time()
print('Evaluation sur le test set prend {}'.format(end-start))
print('Accuracy : {}'.format(accuracy))

Evaluation sur le test set prend 30.619154691696167
Accuracy : 0.8595840957940644


##### Naive Bayes :

In [14]:
from pyspark.ml.classification import NaiveBayes
start = time()
nb = NaiveBayes(smoothing=1.0)
nbModel = nb.fit(df_train_tfidf)
end = time()
print('Le training prend {}'.format(end-start))
predictions = nbModel.transform(df_test_tfidf)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = evaluator.evaluate(predictions)
end = time()
print('Evaluation sur le test set prend {}'.format(end-start))
print('Accuracy : {}'.format(accuracy))

Le training prend 69.61200952529907
Evaluation sur le test set prend 99.16759467124939
Accuracy : 0.48508329549263274


##### SVM :

In [15]:
from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(df_train_tfidf)

end = time()
print('Le training prend {}'.format(end-start))
predictions = lsvcModel.transform(df_test_tfidf)

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = evaluator.evaluate(predictions)
end = time()
print('Evaluation sur le test set prend {}'.format(end-start))
print('Accuracy : {}'.format(accuracy))

Le training prend 1731.0260424613953
Evaluation sur le test set prend 1755.5469198226929
Accuracy : 0.8649745880270016
