In [7]:
# Charger les libriries qu'il faudra pour le taraitement 
import findspark    
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import VectorAssembler 
from pyspark.sql.functions import col

In [8]:
#sc = SparkContext('local')
spark = SparkSession(sc)

In [5]:
df = spark.read.csv("hdfs://192.168.122.206:8020/testN/SMSSpamCollection.txt",sep='\t')

In [6]:
df


DataFrame[_c0: string, _c1: string]

In [56]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [57]:
df.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [58]:
df = df.withColumnRenamed("_c0","label").withColumnRenamed("_c1","text")

In [59]:
df.show()


+-----+--------------------+
|label|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [60]:
df.columns

['label', 'text']

In [61]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Transformer la colonne en une séquence de chaine 

In [62]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")


In [63]:
wordsData = tokenizer.transform(df)
wordsData.show()

+-----+--------------------+--------------------+
|label|                text|               words|
+-----+--------------------+--------------------+
|  ham|Go until jurong p...|[go, until, juron...|
|  ham|Ok lar... Joking ...|[ok, lar..., joki...|
| spam|Free entry in 2 a...|[free, entry, in,...|
|  ham|U dun say so earl...|[u, dun, say, so,...|
|  ham|Nah I don't think...|[nah, i, don't, t...|
| spam|FreeMsg Hey there...|[freemsg, hey, th...|
|  ham|Even my brother i...|[even, my, brothe...|
|  ham|As per your reque...|[as, per, your, r...|
| spam|WINNER!! As a val...|[winner!!, as, a,...|
| spam|Had your mobile 1...|[had, your, mobil...|
|  ham|I'm gonna be home...|[i'm, gonna, be, ...|
| spam|SIX chances to wi...|[six, chances, to...|
| spam|URGENT! You have ...|[urgent!, you, ha...|
|  ham|I've been searchi...|[i've, been, sear...|
|  ham|I HAVE A DATE ON ...|[i, have, a, date...|
| spam|XXXMobileMovieClu...|[xxxmobilemoviecl...|
|  ham|Oh k...i'm watchi...|[oh, k...i'm, wat...|


# Suppresion de  tous les mots d'arrêt des séquences d'entrée.

In [64]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
remover.transform(wordsData).show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|text                                                                                                                                                                                                |words                                                                                                                                                                                       

In [65]:
final_df = remover.transform(wordsData)

# conversion du text en numérique 

In [66]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(final_df)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show() 

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  ham|(20,[0,2,5,6,7,10...|
|  ham|(20,[0,4,6,16,19]...|
| spam|(20,[0,4,6,7,8,9,...|
|  ham|(20,[1,2,3,6,12,1...|
|  ham|(20,[0,3,4,14,17,...|
| spam|(20,[0,2,5,7,8,10...|
|  ham|(20,[3,6,7,10,11,...|
|  ham|(20,[0,2,5,6,10,1...|
| spam|(20,[0,1,3,4,5,6,...|
| spam|(20,[0,3,4,6,7,10...|
|  ham|(20,[0,1,3,5,6,10...|
| spam|(20,[0,1,2,3,6,9,...|
| spam|(20,[0,2,4,6,7,8,...|
|  ham|(20,[0,1,2,5,7,8,...|
|  ham|(20,[1,4,12],[1.1...|
| spam|(20,[1,3,6,9,10,1...|
|  ham|(20,[2,6,9,15],[1...|
|  ham|(20,[0,5,6,7,8,9,...|
|  ham|(20,[0,1,6,9,10,1...|
| spam|(20,[0,1,4,5,6,8,...|
+-----+--------------------+
only showing top 20 rows



In [67]:
 rescaledData.select("features").collect()

[Row(features=SparseVector(20, {0: 1.9253, 2: 1.1428, 5: 2.24, 6: 0.7798, 7: 3.3882, 10: 0.9194, 11: 2.5053, 15: 1.0961, 17: 0.9012, 18: 2.4509})),
 Row(features=SparseVector(20, {0: 0.9626, 4: 0.9431, 6: 0.7798, 16: 2.1826, 19: 0.9883})),
 Row(features=SparseVector(20, {0: 1.9253, 4: 0.9431, 6: 2.3395, 7: 3.3882, 8: 2.3742, 9: 3.2723, 10: 1.8389, 12: 0.9702, 13: 1.0197, 14: 2.1815, 15: 1.0961, 16: 1.0913, 17: 0.9012})),
 Row(features=SparseVector(20, {1: 1.1167, 2: 1.1428, 3: 1.1644, 6: 1.5597, 12: 1.9404, 15: 1.0961, 17: 0.9012})),
 Row(features=SparseVector(20, {0: 0.9626, 3: 1.1644, 4: 0.9431, 14: 1.0908, 17: 0.9012, 18: 1.2254, 19: 0.9883})),
 Row(features=SparseVector(20, {0: 1.9253, 2: 1.1428, 5: 1.12, 7: 1.1294, 8: 1.1871, 10: 1.8389, 11: 1.2527, 12: 1.9404, 14: 1.0908, 17: 0.9012, 18: 2.4509, 19: 2.965})),
 Row(features=SparseVector(20, {3: 1.1644, 6: 0.7798, 7: 1.1294, 10: 1.8389, 11: 2.5053, 13: 1.0197, 17: 0.9012})),
 Row(features=SparseVector(20, {0: 0.9626, 2: 2.2856, 5: 

In [68]:
rescaledData = rescaledData.withColumnRenamed('label','categorie')
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="categorie", outputCol="label")
rescaledData = indexer.fit(rescaledData).transform(rescaledData) 

In [69]:
traindf = rescaledData.select("label", "features")

In [70]:
splits = traindf.randomSplit([0.7, 0.3])

In [71]:
train = splits[0]
test = splits[1]

# Algo NaiveBayes

In [72]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [73]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

In [74]:
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.filter("label=1").show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  1.0|(20,[0,1,2,3,4,5,...|[-44.259195269954...|[0.90475245207798...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|[-55.454821441660...|[0.67649431506217...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|[-73.10648481442,...|[0.81657652721661...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|[-77.444585101043...|[0.86231786879943...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|[-77.444585101043...|[0.86231786879943...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|[-46.871478066776...|[0.76266084772155...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|[-71.281780905595...|[0.70363124734295...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|[-57.247401463938...|[0.97795465353205...|       0.0|
|  1.0|(20,[0,1,2,3,5,6,...|[-84.408060642123...|[0.52726611116303...|       0.0|
|  1.0|(20,[0,1,

In [75]:
predictions=predictions.rdd.map(lambda lp: lp.prediction)
labelsAndPredictions = test.rdd.map(lambda lp: lp.label).zip(predictions)
# first metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(labelsAndPredictions)
# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC) 


Area under PR = 0.031359616619732224
Area under ROC = 0.7471215447336589


# Algo LogisticRegression

In [76]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)
predictions.filter("label=1").show()

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
print("Test Error = %g" % (1.0 - accuracy))

+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  1.0|(20,[0,1,2,3,4,5,...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|       0.0|
|  1.0|(20,[0,1,2,3,4,6,...|       1.0|
|  1.0|(20,[0,1,2,3,4,6,...|       0.0|
|  1.0|(20,[0,1,2,3,5,6,...|       1.0|
|  1.0|(20,[0,1,2,3,5,6,...|       1.0|
|  1.0|(20,[0,1,2,3,5,6,...|       1.0|
|  1.0|(20,[0,1,2,3,5,6,...|       0.0|
|  1.0|(20,[0,1,2,3,5,6,...|       1.0|
|  1.0|(20,[0,1,2,3,5,6,...|       0.0|
|  1.0|(20,[0,1,2,3,5,13...|       0.0|
|  1.0|(20,[0,1,2,3,6,7,...|       0.0|
|  1.0|(20,[0,1,2,3,6,8,...|       1.0|
|  1.0|(20,[0,1,2,4,5,6,...|       0.0|
|  1.0|(20,[0,1,2,4,5,6,...|       0.0|
|  1.0|(20,[0,1,2,4,5,6,...|       1.0|
+-----+--------------------+----------+
only showing top 20 rows

Test set accur

In [77]:
predictions=predictions.rdd.map(lambda lp: lp.prediction)
labelsAndPredictions = test.rdd.map(lambda lp: lp.label).zip(predictions)
# first metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(labelsAndPredictions)
# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC) 

Area under PR = 0.24075353941526348
Area under ROC = 0.7265446224256292


# Décision Tree

In [78]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [79]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(rescaledData)

# Split the data into training and test sets (30% held out for testing)
(trainData, testData) = rescaledData.randomSplit([0.7, 0.3])

In [80]:

dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


treeModel = model.stages[1]
# summary only
print(treeModel)

+-------------------+-----+--------------------+
|         prediction|label|            features|
+-------------------+-----+--------------------+
|0.13157894736842105|  0.0|(20,[0,4,6,7,10,1...|
|0.43537414965986393|  0.0|(20,[0,4,5,6,8,9,...|
|0.13157894736842105|  0.0|(20,[2,3,4,5,6,9,...|
| 0.0318118948824343|  0.0|(20,[1,2,4,5,7,10...|
|0.45714285714285713|  0.0|(20,[0,1,5,6,9,10...|
+-------------------+-----+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.308983
DecisionTreeRegressionModel (uid=DecisionTreeRegressor_2306e15cbffa) of depth 5 with 61 nodes
