## Proyecto NLP para reconocer mensajerías tipo SPAM

In [1]:
import findspark
findspark.init("/home/mauricio/spark-3.2.1-bin-hadoop3.2")
import pyspark

In [2]:
from pyspark.sql import SparkSession
SPARK = SparkSession.builder.appName("ProyectoNLP").getOrCreate()
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

22/09/03 17:58:15 WARN Utils: Your hostname, mauricio-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/09/03 17:58:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/03 17:58:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/03 17:58:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
from pyspark.ml.feature import (Tokenizer, RegexTokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer,NGram)

In [4]:
from pyspark.sql.functions import col,udf, length

In [5]:
from pyspark.sql.types import IntegerType

In [6]:
df = SPARK.read.csv("/home/mauricio/curso/Spark_for_Machine_Learning/Natural_Language_Processing/smsspamcollection/SMSSpamCollection",
                   inferSchema=True, sep="\t")



### Crear una linea base

In [8]:
df.show(3)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
+----+--------------------+
only showing top 3 rows



In [9]:
print((df.count(), len(df.columns)))

(5574, 2)


In [13]:
## Renombrar columnas
df = df.withColumnRenamed("_c0","class").withColumnRenamed("_c1","response_text")

In [689]:
df.show(3)

+-----+--------------------+------+
|class|       response_text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
+-----+--------------------+------+
only showing top 3 rows



In [690]:
### Se crea el Pipeline
tokenizer = Tokenizer(inputCol="response_text" ,outputCol="token_text")
stop_remove=StopWordsRemover(inputCol="token_text" ,outputCol="stop_text")
count_vect=CountVectorizer(inputCol="stop_text" ,outputCol="c_vec")
idf=IDF(inputCol="c_vec" ,outputCol="tf_idf")

flag_nflag_numeric=StringIndexer(inputCol="class" ,outputCol="label") ### Label Encoder ( 0, 1)

In [691]:
from pyspark.ml.feature import VectorAssembler

In [692]:
## Eliges que Features (x's) usar en el modelo para entrenarlo
clean_up = VectorAssembler(inputCols=["tf_idf"],outputCol="features")

In [693]:
from pyspark.ml.classification import NaiveBayes

In [694]:
nb  = NaiveBayes()

In [695]:
from pyspark.ml import Pipeline

In [696]:
## Creas el Pipeline, que te hace datacleanin, tokeniza y vectoriza, etc.
data_prep_pipe = Pipeline(stages = [flag_nflag_numeric,tokenizer,stop_remove,count_vect,
                          idf,clean_up])

In [697]:
cleaner = data_prep_pipe.fit(df)

In [698]:
clean_data = cleaner.transform(df)

In [699]:
clean_data = clean_data.select("label","features")

In [700]:
clean_data.show(4)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13423,[7,11,31,6...|
|  0.0|(13423,[0,24,297,...|
|  1.0|(13423,[2,13,19,3...|
|  0.0|(13423,[0,70,80,1...|
+-----+--------------------+
only showing top 4 rows



In [701]:
training,test=clean_data.randomSplit([0.7,0.3])

In [702]:
flag_detector = nb.fit(training)

22/09/03 22:23:07 WARN DAGScheduler: Broadcasting large task binary with size 1141.3 KiB
22/09/03 22:23:07 WARN DAGScheduler: Broadcasting large task binary with size 1125.6 KiB


In [703]:
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- response_text: string (nullable = true)
 |-- length: integer (nullable = true)



In [704]:
test_results = flag_detector.transform(test)

In [705]:
### Observamos como predice
test_results.show(50)

22/09/03 22:23:07 WARN DAGScheduler: Broadcasting large task binary with size 1358.8 KiB


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13423,[0,1,2,7,8...|[-671.15936157306...|[1.0,6.8628272745...|       0.0|
|  0.0|(13423,[0,1,2,41,...|[-928.38226325591...|[1.0,1.5089958811...|       0.0|
|  0.0|(13423,[0,1,7,8,1...|[-1002.1610889989...|[1.0,6.4086836214...|       0.0|
|  0.0|(13423,[0,1,7,15,...|[-539.30905223688...|[1.0,2.1955725259...|       0.0|
|  0.0|(13423,[0,1,14,78...|[-573.53850329822...|[1.0,1.8668975625...|       0.0|
|  0.0|(13423,[0,1,18,20...|[-729.08725061967...|[1.0,2.1655579625...|       0.0|
|  0.0|(13423,[0,1,21,27...|[-617.06976453005...|[1.0,4.7987801895...|       0.0|
|  0.0|(13423,[0,1,23,63...|[-1109.8093978057...|[1.0,4.7821200902...|       0.0|
|  0.0|(13423,[0,1,24,31...|[-274.58074613916...|[1.0,8.1731217499...|       0.0|
|  0.0|(13423,[0

In [706]:
## Observaremos Accuracy, f1, etc.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval=MulticlassClassificationEvaluator()

In [707]:
ff1 = acc_eval.evaluate(test_results,{acc_eval.metricName: "f1"})

22/09/03 22:23:08 WARN DAGScheduler: Broadcasting large task binary with size 1363.7 KiB


In [None]:
### Obtenemos un f1 de 92%
print(ff1)
### Este será nuestra linea base

In [708]:
evaluator= MulticlassClassificationEvaluator()

evaluator.setPredictionCol("prediction")

evaluator.evaluate(test_results)

evaluator.evaluate(test_results, {evaluator.metricName: "accuracy"})
### Un accuracy del 91

22/09/03 22:23:08 WARN DAGScheduler: Broadcasting large task binary with size 1363.7 KiB
22/09/03 22:23:08 WARN DAGScheduler: Broadcasting large task binary with size 1363.7 KiB


0.9114990969295605

### La matriz de confusión de abajo es de NBayes

In [710]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
preds_and_labels = test_results.select(['prediction',
                                       'label']).withColumn('label',
                                                        F.col('label').cast(FloatType())).orderBy('prediction')

from pyspark.mllib.evaluation import MulticlassMetrics
preds_and_labels = preds_and_labels.select(['prediction','label'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())
### Califica a los 0 con un 96% y a los 1 con un 90%

22/09/03 22:23:09 WARN DAGScheduler: Broadcasting large task binary with size 1352.9 KiB
22/09/03 22:23:09 WARN DAGScheduler: Broadcasting large task binary with size 1354.1 KiB
22/09/03 22:23:09 WARN DAGScheduler: Broadcasting large task binary with size 1340.5 KiB


[[1306.  140.]
 [   7.  208.]]


22/09/03 22:23:10 WARN DAGScheduler: Broadcasting large task binary with size 1350.6 KiB


In [716]:
#### Intentando con Log Reg

training,test=clean_data.randomSplit([0.7,0.3])
flag_detector = lr.fit(training)
test_results = flag_detector.transform(test)
ff1 = acc_eval.evaluate(test_results,{acc_eval.metricName: "f1"})
ff1
## F1 de 98

22/09/03 22:27:02 WARN DAGScheduler: Broadcasting large task binary with size 1139.1 KiB
22/09/03 22:27:03 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:03 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:04 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:04 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:04 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:04 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:04 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:05 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:05 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:05 WARN DAGScheduler: Broadcasting large task binary with size 1140.7 KiB
22/09/03 22:27:05 WAR

0.9858046481960333

In [717]:
evaluator= MulticlassClassificationEvaluator()

evaluator.setPredictionCol("prediction")

evaluator.evaluate(test_results)

evaluator.evaluate(test_results, {evaluator.metricName: "accuracy"})
## Accuracy del 98%

22/09/03 22:27:12 WARN DAGScheduler: Broadcasting large task binary with size 1265.1 KiB
22/09/03 22:27:13 WARN DAGScheduler: Broadcasting large task binary with size 1265.1 KiB


0.9859484777517564

### La matriz de confusion de abajo es de Regresión Logistica

In [718]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
preds_and_labels = test_results.select(['prediction',
                                       'label']).withColumn('label',
                                                        F.col('label').cast(FloatType())).orderBy('prediction')

from pyspark.mllib.evaluation import MulticlassMetrics
preds_and_labels = preds_and_labels.select(['prediction','label'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())
### Predice un poco mejor 

22/09/03 22:27:13 WARN DAGScheduler: Broadcasting large task binary with size 1254.2 KiB
22/09/03 22:27:13 WARN DAGScheduler: Broadcasting large task binary with size 1255.4 KiB
22/09/03 22:27:14 WARN DAGScheduler: Broadcasting large task binary with size 1241.9 KiB


[[1486.    7.]
 [  17.  198.]]


22/09/03 22:27:14 WARN DAGScheduler: Broadcasting large task binary with size 1251.7 KiB


## Pasando a hacer un poco de Ingeniería de Data y EDA

In [None]:
# Hacemos Data Exploration
df =  df.withColumn("length", length(df["_c1"]))

In [12]:
df.show(4)

+----+--------------------+------+
| _c0|                 _c1|length|
+----+--------------------+------+
| ham|Go until jurong p...|   111|
| ham|Ok lar... Joking ...|    29|
|spam|Free entry in 2 a...|   155|
| ham|U dun say so earl...|    49|
+----+--------------------+------+
only showing top 4 rows



In [38]:
df.select("response_text").show(truncate=False)
## Vemos que hay textos con secuencias de caracteres sin sentido

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|response_text                                                                                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                                                                     |
|Ok lar... Joking wif u oni...                                                                                                                                                                       |
|Free

#### Para limpiar un poco la data del texto, quizá podemos enviarlo todo a lower case, quitar comas, puntos y comillas.

In [43]:
## Observemos si hay data inútiles
from pyspark.sql.functions import col,length

## Uso esta linea para ver las palabras
#df.select("response_text").where(length(col("response_text"))<10).show(truncate=False)

## Luego esta linea para contar
df.select("response_text").where(length(col("response_text"))<10).count()
## parece que las que son menores de 10 letras no tienen mucha utilidad y son 56 rows (no quitamos mucha data)

56

In [44]:
df =  df.withColumn("length", length(df["response_text"]))
df_filtrado = df.filter(df.length>10)
## Filtramos y nos quedamos con las que tengan mas de 10 letras

In [45]:
df_filtrado.show(3)

+-----+--------------------+------+
|class|       response_text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
+-----+--------------------+------+
only showing top 3 rows



In [55]:
df_filtrado.groupBy("class").mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham| 83.7940640458214|
| spam|139.3028263795424|
+-----+-----------------+



#### Vemos que la mayoría de los mensajes marcados como SPAM suelen tener un AVG de 139 caracterres, 
#### eso es el casi el doble de los que no están marcados

In [46]:
LISTA = ["/","@","*","+","-",")","(","&","$","#","!"]

from pyspark.sql.functions import regexp_replace
newDF = df_filtrado.withColumn("response_text",regexp_replace("response_text","'",""))
newDF = newDF.withColumn("response_text",regexp_replace("response_text","@",""))
newDF = newDF.withColumn("response_text",regexp_replace("response_text","/",""))
## Remover caracteres inputiles

In [67]:
newDF.show(3)

+-----+--------------------+------+
|class|       response_text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
+-----+--------------------+------+
only showing top 3 rows



In [48]:
import pyspark.sql.functions as f
# Se hace un lwoercase del texto
NDF = newDF.withColumn("response_text",f.lower("response_text"))

In [68]:
NDF.show(3)

+-----+--------------------+------+
|class|       response_text|length|
+-----+--------------------+------+
|  ham|go until jurong p...|   111|
| spam|free entry in 2 a...|   155|
|  ham|u dun say so earl...|    49|
+-----+--------------------+------+
only showing top 3 rows



In [259]:
df1 = NDF

In [260]:
## Para poder contar palabras y stopwords tenemos que tokenizar primero
tokenizer = Tokenizer(inputCol="response_text" ,outputCol="token_text") 
tokenized = tokenizer.transform(df1)

In [261]:
tokenized.show(3)

+-----+--------------------+------+--------------------+
|class|       response_text|length|          token_text|
+-----+--------------------+------+--------------------+
|  ham|go until jurong p...|   111|[go, until, juron...|
| spam|free entry in 2 a...|   155|[free, entry, in,...|
|  ham|u dun say so earl...|    49|[u, dun, say, so,...|
+-----+--------------------+------+--------------------+
only showing top 3 rows



In [262]:
from pyspark.sql.functions import col, count, explode

q_palabras = tokenized.select("*", explode("token_text").alias("exploded")).groupBy("class", "response_text","token_text","length").agg(count("exploded").alias("palabras"))
 
#Ya tengo la cantidad de palabras 

In [263]:
q_palabras.show(3)

+-----+--------------------+--------------------+------+--------+
|class|       response_text|          token_text|length|palabras|
+-----+--------------------+--------------------+------+--------+
| spam|want 2 get laid t...|[want, 2, get, la...|   162|      28|
| spam|call germany for ...|[call, germany, f...|   128|      24|
|  ham|i taught that ran...|[i, taught, that,...|   143|      27|
+-----+--------------------+--------------------+------+--------+
only showing top 3 rows



In [264]:
SWS = StopWordsRemover(inputCol="token_text" ,outputCol="SWS_") 
swsd = SWS.transform(q_palabras)
### Sacamos las StopWords para luego restar las palabras vs palabras sin StopWords y obtener la cantidad
### de StopWords

In [265]:
swsd.show(3)

+-----+--------------------+--------------------+------+--------+--------------------+
|class|       response_text|          token_text|length|palabras|                SWS_|
+-----+--------------------+--------------------+------+--------+--------------------+
| spam|want 2 get laid t...|[want, 2, get, la...|   162|      28|[want, 2, get, la...|
| spam|call germany for ...|[call, germany, f...|   128|      24|[call, germany, 1...|
|  ham|i taught that ran...|[i, taught, that,...|   143|      27|[taught, ranjith,...|
+-----+--------------------+--------------------+------+--------+--------------------+
only showing top 3 rows



In [268]:
#####                                                     AQUI agrupar por Flagged
Q_stops1 = swsd.select("*", explode("SWS_").alias("boom")).groupBy("class","response_text", "SWS_","palabras","length").agg(count("boom").alias("q_de_stops"))

In [269]:
Q_stops1.show(3)

+-----+--------------------+--------------------+--------+------+----------+
|class|       response_text|                SWS_|palabras|length|q_de_stops|
+-----+--------------------+--------------------+--------+------+----------+
|  ham|oops sorry. just ...|[oops, sorry., ch...|      22|   110|        11|
|  ham|mmmmm ... i loved...|[mmmmm, ..., love...|      35|   158|        18|
|  ham|aww thats the fir...|[aww, thats, firs...|      22|    99|        16|
+-----+--------------------+--------------------+--------+------+----------+
only showing top 3 rows



In [270]:
#### Visualizar data
Q_stops1.groupBy("class").mean().show()

##### 17-10 = 7 stopers avg (ham)      -----       27-17=10 stopers avg (spam)
#### La resta es el AVG de StopWords por clase (clase: spam, no spam)

+-----+------------------+-----------------+------------------+
|class|     avg(palabras)|      avg(length)|   avg(q_de_stops)|
+-----+------------------+-----------------+------------------+
|  ham|17.772002200220022|82.27970297029702| 10.18124312431243|
| spam|27.503852080123266|  138.42218798151|17.486902927580893|
+-----+------------------+-----------------+------------------+



In [271]:
# Visualizar data
q_palabras.groupBy("class").mean().show()

#### Los flageados suelen tener más cantidad de palabras 
#### Importa poco xq ya medimos la cantidad de letras, pero el ratio es similar y sirve para observar la Q de stopwords

### Solo usaremos como feature la cantidad de stopwords y length avg (letras)


+-----+-----------------+------------------+
|class|      avg(length)|     avg(palabras)|
+-----+-----------------+------------------+
|  ham|82.27970297029702|17.772002200220022|
| spam|  138.42218798151|27.503852080123266|
+-----+-----------------+------------------+



In [272]:
# Visualizar data
q_palabras.show(3)

+-----+--------------------+--------------------+------+--------+
|class|       response_text|          token_text|length|palabras|
+-----+--------------------+--------------------+------+--------+
| spam|want 2 get laid t...|[want, 2, get, la...|   162|      28|
| spam|call germany for ...|[call, germany, f...|   128|      24|
|  ham|i taught that ran...|[i, taught, that,...|   143|      27|
+-----+--------------------+--------------------+------+--------+
only showing top 3 rows



In [273]:
## Restamos palabras - palabras_sin_stopwords y obtenemos la cantidad de stopwords
df2 = Q_stops1.withColumn("StopWordsQ",col("palabras")-col("q_de_stops"))

In [274]:
df2.show(3)

+-----+--------------------+--------------------+--------+------+----------+----------+
|class|       response_text|                SWS_|palabras|length|q_de_stops|StopWordsQ|
+-----+--------------------+--------------------+--------+------+----------+----------+
|  ham|oops sorry. just ...|[oops, sorry., ch...|      22|   110|        11|        11|
|  ham|mmmmm ... i loved...|[mmmmm, ..., love...|      35|   158|        18|        17|
|  ham|aww thats the fir...|[aww, thats, firs...|      22|    99|        16|         6|
+-----+--------------------+--------------------+--------+------+----------+----------+
only showing top 3 rows



In [296]:
df1.printSchema()

root
 |-- class: string (nullable = true)
 |-- response_text: string (nullable = true)
 |-- length: integer (nullable = true)



In [297]:
df2.printSchema()

root
 |-- class: string (nullable = true)
 |-- response_text: string (nullable = true)
 |-- SWS_: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- palabras: long (nullable = false)
 |-- length: integer (nullable = true)
 |-- q_de_stops: long (nullable = false)
 |-- StopWordsQ: long (nullable = false)



In [275]:
df2.groupBy("class").mean().show()
### Vemos que el average de cantidad de stopwords es igual al visto anteriormente

+-----+------------------+-----------------+------------------+------------------+
|class|     avg(palabras)|      avg(length)|   avg(q_de_stops)|   avg(StopWordsQ)|
+-----+------------------+-----------------+------------------+------------------+
|  ham|17.772002200220022|82.27970297029702| 10.18124312431243| 7.590759075907591|
| spam|27.503852080123266|  138.42218798151|17.486902927580893|10.016949152542374|
+-----+------------------+-----------------+------------------+------------------+



#### Ahora con este DF completo se arma el pipeline,
#### Ojo que este ya está TOKENIZADO y Sin STOPWORDS

In [570]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [596]:
### Se crea el Pipeline
tokenizer = Tokenizer(inputCol="response_text" ,outputCol="token_text")
stop_remove=StopWordsRemover(inputCol="token_text" ,outputCol="stop_text")
#bigrams = NGram(n=24, inputCol="stop_text",outputCol="bigrams") ### con 2 no sirve Reduce mucho el accuracy y f1 score
###       #####                 #####                 #####      ### con 10+ queda igual que la linea base
count_vect=CountVectorizer(inputCol="stop_text" ,outputCol="c_vec")
idf=IDF(inputCol="c_vec" ,outputCol="tf_idf")

flag_nflag_numeric=StringIndexer(inputCol="class" ,outputCol="label")

In [597]:
from pyspark.ml.feature import VectorAssembler

In [598]:
## Eliges que Features (x's) usar en el modelo para entrenarlo, en este caso
## usamos otras features como length y cantidad de stopwords, para ver si se le da mas poder predictivo al modelo

clean_up = VectorAssembler(inputCols=["tf_idf","length","StopWordsQ"],outputCol="features")

In [599]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes

In [600]:
nb  = NaiveBayes()
lr  = LogisticRegression()

In [601]:
from pyspark.ml import Pipeline

In [602]:
## Creas el Pipeline, que te hace datacleanin, tokeniza y vectoriza, etc.
## Aumenta el bigrams o remuevelo si lo usas o no
data_prep_pipe = Pipeline(stages = [flag_nflag_numeric, tokenizer,stop_remove, count_vect,
                          idf,clean_up])

In [603]:
cleaner = data_prep_pipe.fit(df2)

In [604]:
clean_data = cleaner.transform(df2)

In [605]:
clean_data = clean_data.select("label","features")

In [606]:
clean_data.show(4)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(12866,[9,184,216...|
|  0.0|(12866,[22,49,50,...|
|  0.0|(12866,[0,24,25,5...|
|  0.0|(12866,[26,123,20...|
+-----+--------------------+
only showing top 4 rows



22/09/03 22:07:12 WARN DAGScheduler: Broadcasting large task binary with size 1125.0 KiB


In [621]:
training,test=clean_data.randomSplit([0.8,0.2])

In [622]:
flag_detector = nb.fit(training)

22/09/03 22:07:53 WARN DAGScheduler: Broadcasting large task binary with size 1138.6 KiB
22/09/03 22:07:54 WARN DAGScheduler: Broadcasting large task binary with size 1115.7 KiB


In [623]:
df2.printSchema()

root
 |-- class: string (nullable = true)
 |-- response_text: string (nullable = true)
 |-- SWS_: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- palabras: long (nullable = false)
 |-- length: integer (nullable = true)
 |-- q_de_stops: long (nullable = false)
 |-- StopWordsQ: long (nullable = false)



In [624]:
test_results = flag_detector.transform(test)

In [625]:
### Veamos como predice
test_results.show(3)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(12866,[0,1,4,16,...|[-677.55956522220...|[1.0,1.7476945285...|       0.0|
|  0.0|(12866,[0,1,7,10,...|[-870.73954884635...|[1.0,9.1773894723...|       0.0|
|  0.0|(12866,[0,1,20,33...|[-395.07555018710...|[1.0,1.2979971771...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



22/09/03 22:07:55 WARN DAGScheduler: Broadcasting large task binary with size 1349.4 KiB


In [626]:
## Observaremos el accuracy f1 etc
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval=MulticlassClassificationEvaluator()

In [627]:
ff1 = acc_eval.evaluate(test_results,{acc_eval.metricName: "f1"})

22/09/03 22:07:55 WARN DAGScheduler: Broadcasting large task binary with size 1353.2 KiB


In [628]:
ff1
## f1 de 93 

0.930390966110323

In [629]:
evaluator= MulticlassClassificationEvaluator()

evaluator.setPredictionCol("prediction")

evaluator.evaluate(test_results)

evaluator.evaluate(test_results, {evaluator.metricName: "accuracy"})
## Accuracy de 926

22/09/03 22:07:56 WARN DAGScheduler: Broadcasting large task binary with size 1353.2 KiB
22/09/03 22:07:57 WARN DAGScheduler: Broadcasting large task binary with size 1353.2 KiB


0.9261744966442953

In [630]:
### Ahora intenamos con Log Reg
flag_detector = lr.fit(training)

22/09/03 22:07:59 WARN DAGScheduler: Broadcasting large task binary with size 1138.1 KiB
22/09/03 22:07:59 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:07:59 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:00 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:00 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:00 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:00 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:01 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:01 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:01 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:01 WARN DAGScheduler: Broadcasting large task binary with size 1139.7 KiB
22/09/03 22:08:02 WAR

In [631]:
test_results = flag_detector.transform(test)

In [632]:
ff1 = acc_eval.evaluate(test_results,{acc_eval.metricName: "f1"})

22/09/03 22:08:11 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB


In [633]:
ff1
### Con log reg aumenta a 96

0.968944970314222

In [634]:
evaluator= MulticlassClassificationEvaluator()

evaluator.setPredictionCol("prediction")

evaluator.evaluate(test_results)

evaluator.evaluate(test_results, {evaluator.metricName: "accuracy"})
### Con Acc aumenta a 96

22/09/03 22:08:12 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB
22/09/03 22:08:13 WARN DAGScheduler: Broadcasting large task binary with size 1259.0 KiB


0.9697986577181208

## Añadianedo lematizer

In [461]:
from nltk.stem import WordNetLemmatizer
from nltk.stem.porter import *

In [473]:
import nltk
from nltk import *

In [635]:
from pyspark.sql.types import StringType
def get_wordnet_pos(treebank_tag):
    """
    return WORDNET POS compliance to WORDENT lemmatization (a,n,r,v) 
        """
    if treebank_tag.startswith('J'):
            return 'a'
    elif treebank_tag.startswith('V'):
            return 'v'
    elif treebank_tag.startswith('N'):
            return 'n'
    elif treebank_tag.startswith('R'):
            return 'r'
    else:
    # As default pos in lemmatization is Noun
        return 'n'


def lemmatize1(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    #text = data_str.split()
    tagged_words = nltk.pos_tag(data_str)
    for word in tagged_words:
        lemma = lmtzr.lemmatize(word[0], get_wordnet_pos(word[1]))
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str

sparkLemmer1 = udf(lambda x: lemmatize1(x), StringType())

In [636]:
## Se tokeniza antes de aplicar el lematizer
tokenizer = Tokenizer(inputCol="response_text" ,outputCol="token_text") 
tokenized = tokenizer.transform(df2)

In [637]:
tokenized.show(3)

+-----+--------------------+--------------------+--------+------+----------+----------+--------------------+
|class|       response_text|                SWS_|palabras|length|q_de_stops|StopWordsQ|          token_text|
+-----+--------------------+--------------------+--------+------+----------+----------+--------------------+
|  ham|oops sorry. just ...|[oops, sorry., ch...|      22|   110|        11|        11|[oops, sorry., ju...|
|  ham|mmmmm ... i loved...|[mmmmm, ..., love...|      35|   158|        18|        17|[mmmmm, ..., i, l...|
|  ham|aww thats the fir...|[aww, thats, firs...|      22|    99|        16|         6|[aww, thats, the,...|
+-----+--------------------+--------------------+--------+------+----------+----------+--------------------+
only showing top 3 rows



In [638]:
## Se lematiza y se selecciona las cols que te quieras quedar
lema_df= tokenized.select("class","response_text","length", "SWS_","length", "StopWordsQ",
                                 sparkLemmer1("token_text").alias('lems'))

In [639]:
lema_df.show()

[Stage 2514:>                                                       (0 + 1) / 1]

+-----+--------------------+------+--------------------+------+----------+--------------------+
|class|       response_text|length|                SWS_|length|StopWordsQ|                lems|
+-----+--------------------+------+--------------------+------+----------+--------------------+
|  ham|oops sorry. just ...|   110|[oops, sorry., ch...|   110|        11|oops sorry. just ...|
|  ham|mmmmm ... i loved...|   158|[mmmmm, ..., love...|   158|        17|mmmmm ... i love ...|
|  ham|aww thats the fir...|    99|[aww, thats, firs...|    99|         6|aww thats the fir...|
|  ham|no need to buy lu...|    47|[need, buy, lunch...|    47|         4|no need to buy lu...|
|  ham|o we cant see if ...|    74|[o, cant, see, jo...|    74|         7|o we cant see if ...|
|  ham|nope thats fine. ...|    41|[nope, thats, fin...|    41|         3|nope thats fine. ...|
|  ham|those cocksuckers...|   133|[cocksuckers., ma...|   133|         9|those cocksuckers...|
|  ham|jade its paul. y ...|    89|[jade

                                                                                

In [640]:
### Se crea el Pipeline
### Ahora no hay tokenizer, entra el "lems" del lema_df
tokenizer = Tokenizer(inputCol="lems" ,outputCol="token_text") 
stop_remove=StopWordsRemover(inputCol="token_text" ,outputCol="stop_text")
#bigrams = NGram(n=3, inputCol="stop_text",outputCol="bigrams")
count_vect=CountVectorizer(inputCol="stop_text" ,outputCol="c_vec")
idf=IDF(inputCol="c_vec" ,outputCol="tf_idf") 

flag_nflag_numeric=StringIndexer(inputCol="class" ,outputCol="label")

In [641]:
from pyspark.ml.feature import VectorAssembler

In [642]:
## Eliges que Features (x's) usar en el modelo para entrenarlo
clean_up = VectorAssembler(inputCols=["tf_idf","length","StopWordsQ"],outputCol="features")

In [643]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes

In [644]:
nb  = NaiveBayes()
lr  = LogisticRegression()

In [645]:
from pyspark.ml import Pipeline

In [646]:
## Creas el Pipeline, que t ehace datacleanin, tokeniza y vectoriza, etc.
data_prep_pipe = Pipeline(stages = [flag_nflag_numeric,tokenizer,stop_remove,count_vect,
                          idf,clean_up])

In [647]:
cleaner = data_prep_pipe.fit(lematizar_eldf)

                                                                                

In [648]:
clean_data = cleaner.transform(lematizar_eldf)

In [649]:
clean_data = clean_data.select("label","features")

In [650]:
clean_data.show(4)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(11994,[10,85,168...|
|  0.0|(11994,[0,3,24,30...|
|  0.0|(11994,[0,19,22,2...|
|  0.0|(11994,[26,101,16...|
+-----+--------------------+
only showing top 4 rows



[Stage 2547:>                                                       (0 + 1) / 1]                                                                                

In [651]:
training,test=clean_data.randomSplit([0.8,0.2])

In [652]:
flag_detector = nb.fit(training)

22/09/03 22:10:10 WARN DAGScheduler: Broadcasting large task binary with size 1078.7 KiB
22/09/03 22:10:16 WARN DAGScheduler: Broadcasting large task binary with size 1052.6 KiB
                                                                                

In [653]:
test_results = flag_detector.transform(test)

In [654]:
test_results.show(3)

22/09/03 22:10:17 WARN DAGScheduler: Broadcasting large task binary with size 1275.7 KiB
[Stage 2566:>                                                       (0 + 1) / 1]

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(11994,[0,1,3,4,7...|[-964.94723188897...|[1.0,1.3294001706...|       0.0|
|  0.0|(11994,[0,1,3,4,7...|[-822.47970638325...|[1.0,6.3716408349...|       0.0|
|  0.0|(11994,[0,1,3,11,...|[-756.50714821824...|[1.0,1.1209148547...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



                                                                                

In [655]:
## Observaremos el accuracy f1 etc
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
f1_eval=MulticlassClassificationEvaluator()

In [656]:
ff1 = f1_eval.evaluate(test_results,{acc_eval.metricName: "f1"})

22/09/03 22:10:22 WARN DAGScheduler: Broadcasting large task binary with size 1279.5 KiB
                                                                                

In [657]:
ff1

0.9204199024221255

In [658]:
evaluator= MulticlassClassificationEvaluator()

evaluator.setPredictionCol("prediction")

evaluator.evaluate(test_results)

evaluator.evaluate(test_results, {evaluator.metricName: "accuracy"})

22/09/03 22:10:27 WARN DAGScheduler: Broadcasting large task binary with size 1279.5 KiB
22/09/03 22:10:32 WARN DAGScheduler: Broadcasting large task binary with size 1279.5 KiB
                                                                                

0.9148936170212766

In [None]:
### intentamos con LogReg

In [659]:
flag_detector = lr.fit(training)

22/09/03 22:10:37 WARN DAGScheduler: Broadcasting large task binary with size 1078.0 KiB
22/09/03 22:10:42 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:46 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:47 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:47 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:47 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:47 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:48 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:48 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:48 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:48 WARN DAGScheduler: Broadcasting large task binary with size 1079.6 KiB
22/09/03 22:10:49 WAR

In [660]:
test_results = flag_detector.transform(test)

In [661]:
### test_results.show(100)

In [662]:
ff1 = acc_eval.evaluate(test_results,{acc_eval.metricName: "f1"})

22/09/03 22:10:55 WARN DAGScheduler: Broadcasting large task binary with size 1192.0 KiB
                                                                                

In [663]:
ff1
## Mejora

0.9719526505432493

In [725]:
evaluator= MulticlassClassificationEvaluator()

evaluator.setPredictionCol("prediction")

evaluator.evaluate(test_results)

evaluator.evaluate(test_results, {evaluator.metricName: "accuracy"})

### LogReg Mejora otra vez, sin embargo, con lematizer, sin lematizer y sin o con mas features el modelo no mejora

22/09/04 00:28:19 WARN DAGScheduler: Broadcasting large task binary with size 1265.1 KiB
22/09/04 00:28:19 WARN DAGScheduler: Broadcasting large task binary with size 1265.1 KiB


0.9859484777517564

In [665]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
preds_and_labels = test_results.select(['prediction',
                                       'label']).withColumn('label',
                                                        F.col('label').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
from pyspark.mllib.evaluation import MulticlassMetrics
preds_and_labels = preds_and_labels.select(['prediction','label'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())

22/09/03 22:12:53 WARN DAGScheduler: Broadcasting large task binary with size 1182.3 KiB
22/09/03 22:13:00 WARN DAGScheduler: Broadcasting large task binary with size 1182.2 KiB
22/09/03 22:13:04 WARN DAGScheduler: Broadcasting large task binary with size 1157.5 KiB
22/09/03 22:13:05 WARN DAGScheduler: Broadcasting large task binary with size 1167.3 KiB


[[700.   1.]
 [ 22. 123.]]


In [None]:
#### Predice muy bien los 0's

In [723]:
(2*(700/(701)) * (700/(722))) / ((700/(701))+(700/(722)))
## f1

0.9838369641602248

In [729]:
lr_metric = MulticlassMetrics(test_results["label","prediction"].rdd)

22/09/04 00:32:33 WARN DAGScheduler: Broadcasting large task binary with size 1256.7 KiB


In [731]:
print("Accuracy", lr_metric.accuracy)
print("Precision", lr_metric.precision(1.0))
print("Recall", lr_metric.recall(1.0))
print("F1score", lr_metric.fMeasure(1.0))
### Reusltados buenos

Accuracy 0.9859484777517564
Precision 0.9209302325581395
Recall 0.9658536585365853
F1score 0.9428571428571427


In [732]:
print(700/(701)) P
print((700/(722))) R

0.9985734664764622
0.9695290858725761


### Conclusiones
##### Podemos concluir que sin/con lematizer y sin/con nuevas features (cantidad stopwords y length)
##### el modelo fue igual en todos los casos, pero se mejora si en vez de usar NBayes se usa una Regresion Logistica.
##### Tambien la matriz de confusion nos demuesta que el modelo tiene un comportamiento correcto respecto al Recall, Precision