### André Campos da Silva


### January 8, 2021

### Project - Spam Classifier


Develop a PySpark algorithm that determines whether a message is spam or not, based on historical data.

## Loading packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import IDF
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf, col, lower, regexp_replace, ltrim, rtrim
from nltk.stem.snowball import SnowballStemmer


## Loading the data

In [2]:
RDDdataset = sc.textFile("Dados/sms_spam.csv")

In [3]:
type(RDDdataset)

pyspark.rdd.RDD

In [4]:
RDDdataset.take(10)

['type,text',
 'ham,Hope you are having a good week. Just checking in',
 'ham,K..give back my thanks.',
 'ham,Am also doing in cbe only. But have to pay.',
 'spam,"complimentary 4 STAR Ibiza Holiday or £10,000 cash needs your URGENT collection. 09066364349 NOW from Landline not to lose out! Box434SK38WP150PPM18+"',
 'spam,okmail: Dear Dave this is your final notice to collect your 4* Tenerife Holiday or #5000 CASH award! Call 09061743806 from landline. TCs SAE Box326 CW25WX 150ppm',
 'ham,Aiya we discuss later lar... Pick u up at 4 is it?',
 'ham,Are you this much buzy',
 'ham,Please ask mummy to call father',
 'spam,Marvel Mobile Play the official Ultimate Spider-man game (£4.50) on ur mobile right now. Text SPIDER to 83338 for the game & we ll send u a FREE 8Ball wallpaper']

## Data preparation

In [5]:
# Remove the header row
cabecalho = RDDdataset.take(1)[0]
RDDdataset = RDDdataset.filter(lambda line: line != cabecalho)

In [6]:
RDDdataset.take(10)

['ham,Hope you are having a good week. Just checking in',
 'ham,K..give back my thanks.',
 'ham,Am also doing in cbe only. But have to pay.',
 'spam,"complimentary 4 STAR Ibiza Holiday or £10,000 cash needs your URGENT collection. 09066364349 NOW from Landline not to lose out! Box434SK38WP150PPM18+"',
 'spam,okmail: Dear Dave this is your final notice to collect your 4* Tenerife Holiday or #5000 CASH award! Call 09061743806 from landline. TCs SAE Box326 CW25WX 150ppm',
 'ham,Aiya we discuss later lar... Pick u up at 4 is it?',
 'ham,Are you this much buzy',
 'ham,Please ask mummy to call father',
 'spam,Marvel Mobile Play the official Ultimate Spider-man game (£4.50) on ur mobile right now. Text SPIDER to 83338 for the game & we ll send u a FREE 8Ball wallpaper',
 'ham,"fyi I\'m at usf now, swing by the room whenever"']

In [7]:
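# Function that splits each line into columns and converts the label from text to a number (0.0 = ham, 1.0 = spam).
# Note: split(",") keeps only the message text up to its first comma.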
def transformlabel(RDD):
    tolist = RDD.split(",")
    label = 0.0 if tolist[0] == "ham" else 1.0
    return [label, tolist[1]]
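
Because `transformlabel` splits on every comma, a message that itself contains commas is cut at the first one (for example, `'"fyi I\'m at usf now'` in the output of the next cell). A minimal sketch of a comma-safe alternative, assuming the file follows the usual CSV convention of wrapping such messages in double quotes; `parse_line` is a hypothetical name, not part of the original notebook:

```python
import csv

def parse_line(line):
    """Split one CSV line into [label, message], honoring double-quoted fields."""
    # csv.reader understands quoting, so commas inside "..." stay in the message.
    fields = next(csv.reader([line]))
    label = 0.0 if fields[0] == "ham" else 1.0
    return [label, fields[1]]

sample = 'ham,"fyi I\'m at usf now, swing by the room whenever"'
print(parse_line(sample))
```

It could be passed to `RDDdataset.map(parse_line)` in place of `transformlabel`. Messages with unquoted commas would still be split, and the vocabulary size and metrics shown further down would change, since they were produced with the original function.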

In [8]:
# Apply the function to the RDD
RDDdataset2 = RDDdataset.map(transformlabel)
RDDdataset2.take(20)

[[0.0, 'Hope you are having a good week. Just checking in'],
 [0.0, 'K..give back my thanks.'],
 [0.0, 'Am also doing in cbe only. But have to pay.'],
 [1.0, '"complimentary 4 STAR Ibiza Holiday or £10'],
 [1.0,
  'okmail: Dear Dave this is your final notice to collect your 4* Tenerife Holiday or #5000 CASH award! Call 09061743806 from landline. TCs SAE Box326 CW25WX 150ppm'],
 [0.0, 'Aiya we discuss later lar... Pick u up at 4 is it?'],
 [0.0, 'Are you this much buzy'],
 [0.0, 'Please ask mummy to call father'],
 [1.0,
  'Marvel Mobile Play the official Ultimate Spider-man game (£4.50) on ur mobile right now. Text SPIDER to 83338 for the game & we ll send u a FREE 8Ball wallpaper'],
 [0.0, '"fyi I\'m at usf now'],
 [0.0, '"Sure thing big man. i have hockey elections at 6'],
 [0.0, 'I anything lor...'],
 [0.0, '"By march ending'],
 [0.0, '"Hmm well'],
 [0.0, "K I'll be sure to get up before noon and see what's what"],
 [0.0, 'Ha ha cool cool chikku chikku:-):-DB-)'],
 [0.0,
  'Darren w

In [9]:
# Spark Session - session needed to use Spark DataFrames
spSession = SparkSession.builder.master("local").appName("SparkMLLib").getOrCreate()

In [10]:
# Create a Spark DataFrame from the data prepared above.
df_spam = spSession.createDataFrame(RDDdataset2, ["label", "message"])


In [11]:
# Print the first 10 rows of the DataFrame.
df_spam.select("label", "message").show(10)

+-----+--------------------+
|label|             message|
+-----+--------------------+
|  0.0|Hope you are havi...|
|  0.0|K..give back my t...|
|  0.0|Am also doing in ...|
|  1.0|"complimentary 4 ...|
|  1.0|okmail: Dear Dave...|
|  0.0|Aiya we discuss l...|
|  0.0|Are you this much...|
|  0.0|Please ask mummy ...|
|  1.0|Marvel Mobile Pla...|
|  0.0| "fyi I'm at usf now|
+-----+--------------------+
only showing top 10 rows



In [12]:
# Convert every message to lowercase
df_spam = df_spam.select('label', lower(col('message')).alias('message'))

# Trim trailing and leading spaces
df_spam = df_spam.select('label', rtrim(col('message')).alias('message'))
df_spam = df_spam.select('label', ltrim(col('message')).alias('message'))

# Clean the message: remove punctuation, digits and any other character that is not a letter or whitespace.
df_spam = df_spam.select('label', regexp_replace('message', '[^a-zA-Z\\s]', '').alias('message'))
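
To see what these three steps do to a single message, here is a plain-Python sketch of the same sequence (lowercase, trim spaces, drop everything that is not a letter or whitespace). `clean_message` is a hypothetical helper used only for illustration; the notebook itself does this with Spark column functions.

```python
import re

def clean_message(text):
    # Same order as the Spark cell: lowercase, trim spaces, drop non-letters.
    text = text.lower()
    text = text.rstrip(" ").lstrip(" ")
    return re.sub(r"[^a-zA-Z\s]", "", text)

print(clean_message("K..give back my thanks."))   # kgive back my thanks
print(clean_message('"fyi I\'m at usf now'))      # fyi im at usf now
```

Removing digits leaves the surrounding spaces in place, so `"complimentary 4 STAR"` becomes `"complimentary  star"` with two spaces.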



In [13]:
# Print the first 10 rows of the DataFrame.
df_spam.select("label", "message").show(10)

+-----+--------------------+
|label|             message|
+-----+--------------------+
|  0.0|hope you are havi...|
|  0.0|kgive back my thanks|
|  0.0|am also doing in ...|
|  1.0|complimentary  st...|
|  1.0|okmail dear dave ...|
|  0.0|aiya we discuss l...|
|  0.0|are you this much...|
|  0.0|please ask mummy ...|
|  1.0|marvel mobile pla...|
|  0.0|   fyi im at usf now|
+-----+--------------------+
only showing top 10 rows



In [14]:
# Create the stages that will be used in the pipeline

# Splits each message into words (tokens).
tokenization = Tokenizer(inputCol = 'message', outputCol = 'message_token')
# Removes every token that is considered a stop word.
none_stop_words = StopWordsRemover(inputCol = 'message_token', outputCol = 'message_stop')
# Converts the token lists into sparse vectors of term counts.
vec_count = CountVectorizer(inputCol = 'message_stop', outputCol = 'message_vec')
# Rescales the term counts by inverse document frequency, so rarer terms weigh more.
idf = IDF(inputCol = 'message_vec', outputCol = 'features')
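
The first two stages can be pictured with a few lines of plain Python. This is only a sketch: `STOP_WORDS` is a tiny hypothetical list chosen for the example, while `StopWordsRemover` uses a much longer default English list, and Spark's `Tokenizer` may treat repeated whitespace differently from `str.split()`.

```python
# Hypothetical, tiny stop-word list used only for this illustration.
STOP_WORDS = {"am", "doing", "in", "only", "but", "have", "to", "you", "are", "a", "my"}

def tokenize(message):
    # Lowercase and split on whitespace.
    return message.lower().split()

def remove_stop_words(tokens):
    # Keep only the tokens that are not in the stop-word list.
    return [token for token in tokens if token not in STOP_WORDS]

tokens = tokenize("am also doing in cbe only but have to pay")
print(remove_stop_words(tokens))  # ['also', 'cbe', 'pay']
```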



In [15]:
# Create the pipeline
pipeline = Pipeline(stages=[tokenization, none_stop_words, vec_count, idf])
# Fit the pipeline on the dataset
df_spam_pipe = pipeline.fit(df_spam)
# Run the fitted pipeline's transformations on the same dataset
df_spam_train = df_spam_pipe.transform(df_spam)
# Print the results
df_spam_train.show(3)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|             message|       message_token|        message_stop|         message_vec|            features|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0.0|hope you are havi...|[hope, you, are, ...|[hope, good, week...|(7220,[10,45,81,7...|(7220,[10,45,81,7...|
|  0.0|kgive back my thanks|[kgive, back, my,...|[kgive, back, tha...|(7220,[40,85,5679...|(7220,[40,85,5679...|
|  0.0|am also doing in ...|[am, also, doing,...|    [also, cbe, pay]|(7220,[100,315,12...|(7220,[100,315,12...|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [16]:
# Keep only the label and the final processed feature column.
df_spam_train = df_spam_train.select('label','features')
df_spam_train.select('label','features').show(10)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(7220,[10,45,81,7...|
|  0.0|(7220,[40,85,5679...|
|  0.0|(7220,[100,315,12...|
|  1.0|(7220,[0,182,670,...|
|  1.0|(7220,[0,2,38,111...|
|  0.0|(7220,[0,1,71,101...|
|  0.0|(7220,[55,1854],[...|
|  0.0|(7220,[2,52,75,59...|
|  1.0|(7220,[0,1,4,8,17...|
|  0.0|(7220,[3,924,1214...|
+-----+--------------------+
only showing top 10 rows



In [17]:
df_spam_train.take(1)


[Row(label=0.0, features=SparseVector(7220, {10: 3.3711, 45: 4.0695, 81: 4.3892, 743: 6.3208}))]
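
Each value in the `SparseVector` above is a term count multiplied by that term's IDF weight. Spark's `IDF` documentation gives the weight as log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term. A small sketch of that calculation on a made-up three-message corpus (the function names and the toy corpus are assumptions for illustration):

```python
import math

def idf_weights(docs):
    """Return {term: log((m + 1) / (df + 1))} for a list of token lists."""
    n_docs = len(docs)
    doc_freq = {}
    for doc in docs:
        for term in set(doc):  # count each term once per document
            doc_freq[term] = doc_freq.get(term, 0) + 1
    return {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}

def tfidf(doc, idf):
    """Multiply each term count in doc by its IDF weight."""
    counts = {}
    for term in doc:
        counts[term] = counts.get(term, 0) + 1
    return {t: c * idf[t] for t, c in counts.items() if t in idf}

docs = [["hope", "good", "week"], ["good", "thanks"], ["also", "cbe", "pay"]]
idf = idf_weights(docs)
print(tfidf(["good", "good", "week"], idf))
```

Terms that appear in many messages get a weight close to zero, while terms seen in few messages get larger weights.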

In [18]:
# Split the data into training (70%) and test (30%) sets
train, test = df_spam_train.randomSplit([0.7, 0.3])

In [19]:
# Create the NaiveBayes model
modelo_nb = NaiveBayes()

# Train the model on the training data
modelo_fit = modelo_nb.fit(train)

# Make predictions on the test data
previsao = modelo_fit.transform(test)
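
With default arguments, `NaiveBayes()` uses the multinomial model with additive smoothing of 1.0. The sketch below is a textbook version of that idea on raw token counts, not Spark's implementation: the toy messages, the function names and the unsmoothed class prior are all assumptions made for illustration.

```python
import math
from collections import Counter, defaultdict

def train_multinomial_nb(docs, labels, smoothing=1.0):
    """Return {label: (log_prior, {token: log_likelihood})}."""
    vocab = sorted({tok for doc in docs for tok in doc})
    class_docs = Counter(labels)
    token_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        token_counts[label].update(doc)
    model = {}
    for label, n_docs in class_docs.items():
        # Additive smoothing keeps unseen tokens from getting zero probability.
        denom = sum(token_counts[label].values()) + smoothing * len(vocab)
        log_prior = math.log(n_docs / len(docs))
        log_likelihood = {
            tok: math.log((token_counts[label][tok] + smoothing) / denom)
            for tok in vocab
        }
        model[label] = (log_prior, log_likelihood)
    return model

def predict(model, doc):
    """Pick the label with the highest log-posterior; unknown tokens are ignored."""
    scores = {}
    for label, (log_prior, log_likelihood) in model.items():
        scores[label] = log_prior + sum(
            log_likelihood[tok] for tok in doc if tok in log_likelihood
        )
    return max(scores, key=scores.get)

docs = [["free", "prize"], ["call", "me"], ["free", "cash"], ["see", "you"]]
labels = [1.0, 0.0, 1.0, 0.0]
model = train_multinomial_nb(docs, labels)
print(predict(model, ["free", "prize", "now"]))
```

In the notebook the inputs are TF-IDF weights rather than raw counts; the multinomial model treats those non-negative weights the same way it treats counts.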


In [20]:
# Print the predictions on the test data
previsao.show(3)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|        (7220,[],[])|[-0.1420570947152...|[0.86757172131147...|       0.0|
|  0.0|(7220,[0],[1.3474...|[-6.1125883196269...|[0.61685706075819...|       0.0|
|  0.0|(7220,[0],[1.3474...|[-6.1125883196269...|[0.61685706075819...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [22]:
# Only the labels and predictions, first 50 rows
previsao.select("prediction", "label").take(50)

[Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=1.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=1.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=1.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(predi

In [23]:
# Evaluate the model's accuracy
acurácia = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "accuracy")
acurácia.evaluate(previsao)

0.9022329511164756

The model reached about 90% accuracy on the test split, which is a good result for a spam filter and suggests that it generalizes. The confusion matrix below shows where its errors are.

In [24]:
# Print the confusion matrix of the predictions.
previsao.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  219|
|  0.0|       1.0|  150|
|  1.0|       0.0|   12|
|  0.0|       0.0| 1276|
+-----+----------+-----+
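
The counts in this table are enough to recompute the accuracy and to look at precision and recall for the spam class (label 1.0). A short sketch, where `binary_metrics` is a hypothetical helper and the four numbers are copied from the table above:

```python
def binary_metrics(tp, fp, fn, tn):
    """Accuracy, precision and recall for the positive (spam) class."""
    total = tp + fp + fn + tn
    return {
        "accuracy": (tp + tn) / total,
        "precision": tp / (tp + fp),  # share of predicted spam that is really spam
        "recall": tp / (tp + fn),     # share of real spam that was caught
    }

# tp: spam predicted as spam, fp: ham predicted as spam,
# fn: spam predicted as ham,  tn: ham predicted as ham
metrics = binary_metrics(tp=219, fp=150, fn=12, tn=1276)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

The accuracy matches the evaluator's 0.9022. Recall is about 0.95, while precision is about 0.59 because 150 ham messages were flagged as spam.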



## Evaluation on new data

I will load another dataset with different messages, simulating new data, to run through the model and test its accuracy.
The same preparation applied to the training data has to be applied here.
This new dataset contains 1000 new messages.
This test gives a clearer picture of whether the model is really learning, that is, whether it generalizes.

In [25]:
# Load the new data
RDDtestt = sc.textFile("Dados/sms_spam_test.csv")

In [26]:
# Remove the header row
cabecalho = RDDtestt.take(1)[0]
RDDtestt = RDDtestt.filter(lambda line: line != cabecalho)
RDDtestt.take(3)

['ham,Ok lar... Joking wif u oni...,,,,,,,,,,',
 'ham,U dun say so early hor... U c already then say...,,,,,,,,,,',
 "ham,Nah I don't think he goes to usf, he lives around here though,,,,,,,,,"]

In [27]:
# Apply the function to the RDD
RDDtestt2 = RDDtestt.map(transformlabel)
RDDtestt2.take(3)

[[0.0, 'Ok lar... Joking wif u oni...'],
 [0.0, 'U dun say so early hor... U c already then say...'],
 [0.0, "Nah I don't think he goes to usf"]]

In [28]:
# Create a DataFrame.
df_spam_test = spSession.createDataFrame(RDDtestt2, ["label", "message"])

In [29]:
# Convert every message to lowercase
df_spam_test = df_spam_test.select('label', lower(col('message')).alias('message'))

# Trim trailing and leading spaces
df_spam_test = df_spam_test.select('label', rtrim(col('message')).alias('message'))
df_spam_test = df_spam_test.select('label', ltrim(col('message')).alias('message'))

# Clean the message: remove punctuation, digits and any other character that is not a letter or whitespace.
df_spam_test = df_spam_test.select('label', regexp_replace('message', '[^a-zA-Z\\s]', '').alias('message'))


In [30]:
# Print the first 3 rows of the DataFrame.
df_spam_test.select("label", "message").show(3)

+-----+--------------------+
|label|             message|
+-----+--------------------+
|  0.0|ok lar joking wif...|
|  0.0|u dun say so earl...|
|  0.0|nah i dont think ...|
+-----+--------------------+
only showing top 3 rows



In [31]:
# Apply the pipeline fitted earlier to transform the new data.
df_spam_test = df_spam_pipe.transform(df_spam_test)
# Print the results
df_spam_test.show(3)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|             message|       message_token|        message_stop|         message_vec|            features|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0.0|ok lar joking wif...|[ok, lar, joking,...|[ok, lar, joking,...|(7220,[1,6,175,26...|(7220,[1,6,175,26...|
|  0.0|u dun say so earl...|[u, dun, say, so,...|[u, dun, say, ear...|(7220,[1,63,67,84...|(7220,[1,63,67,84...|
|  0.0|nah i dont think ...|[nah, i, dont, th...|[nah, dont, think...|(7220,[7,44,492,7...|(7220,[7,44,492,7...|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [32]:
# Keep only the label and features columns to pass to the model.
df_spam_test_features = df_spam_test.select('label','features')



In [33]:
df_spam_test_features.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(7220,[1,6,175,26...|
|  0.0|(7220,[1,63,67,84...|
|  0.0|(7220,[7,44,492,7...|
|  0.0|(7220,[12,132,263...|
|  0.0|(7220,[0,154,173,...|
|  0.0|(7220,[3,7,20,22,...|
|  0.0|(7220,[48,59,97,1...|
|  0.0|(7220,[495,1054],...|
|  0.0|(7220,[46,218,525...|
|  0.0|(7220,[0,1,64,76,...|
|  0.0|(7220,[1,60,73,77...|
|  0.0|(7220,[144,1617,1...|
|  0.0|(7220,[0,3,18,157...|
|  0.0|(7220,[0,32,122,1...|
|  0.0|(7220,[0,1,4,9,19...|
|  0.0|(7220,[73,83,365,...|
|  0.0|(7220,[3,65,82,14...|
|  0.0|(7220,[82,151,374...|
|  0.0|(7220,[0,34,76,12...|
|  0.0|(7220,[3,40,197,6...|
+-----+--------------------+
only showing top 20 rows



In [34]:
# Make predictions on the new data.
previsao2 = modelo_fit.transform(df_spam_test_features)
# Print the first 10 rows of the predictions.
previsao2.show(10)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(7220,[1,6,175,26...|[-216.10063374051...|[1.0,1.8616064476...|       0.0|
|  0.0|(7220,[1,63,67,84...|[-286.87939091783...|[1.0,5.6703319878...|       0.0|
|  0.0|(7220,[7,44,492,7...|[-188.92549287857...|[1.0,7.9995309326...|       0.0|
|  0.0|(7220,[12,132,263...|[-384.84952517476...|[0.99999999999999...|       0.0|
|  0.0|(7220,[0,154,173,...|[-833.05906190463...|[1.0,1.4574974854...|       0.0|
|  0.0|(7220,[3,7,20,22,...|[-293.63642484561...|[1.0,3.3349626388...|       0.0|
|  0.0|(7220,[48,59,97,1...|[-788.94923740744...|[1.0,1.7141468735...|       0.0|
|  0.0|(7220,[495,1054],...|[-108.49731808187...|[0.99489855065920...|       0.0|
|  0.0|(7220,[46,218,525...|[-135.26493671589...|[1.0,1.2519053142...|       0.0|
|  0.0|(7220,[0,

In [35]:
# Only the labels and predictions, first 50 rows
previsao2.select("prediction", "label").take(50)

[Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(prediction=0.0, label=0.0),
 Row(predi

In [36]:
# Evaluate the accuracy on the new data.
acurácia2 = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "accuracy")
acurácia2.evaluate(previsao2)

0.96996996996997

In [37]:
# Print the confusion matrix
previsao2.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  487|
|  0.0|       1.0|   17|
|  1.0|       0.0|   13|
|  0.0|       0.0|  482|
+-----+----------+-----+
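
One way to put the two accuracy figures in context is to compare each with the accuracy of always predicting the most frequent class. The sketch below does that using only the counts from the two confusion matrices in this notebook; `majority_baseline` is a hypothetical helper name.

```python
def majority_baseline(n_ham, n_spam):
    """Accuracy obtained by always predicting the most frequent class."""
    return max(n_ham, n_spam) / (n_ham + n_spam)

# Test split: ham = 150 + 1276, spam = 219 + 12
holdout_baseline = majority_baseline(n_ham=150 + 1276, n_spam=219 + 12)
# New data: ham = 17 + 482, spam = 487 + 13
new_data_baseline = majority_baseline(n_ham=17 + 482, n_spam=487 + 13)

print(f"test split baseline: {holdout_baseline:.4f}")
print(f"new data baseline:   {new_data_baseline:.4f}")
```

The test split is mostly ham, so its baseline is already around 0.86, while the new file is almost evenly balanced, with a baseline near 0.50. The two accuracies are therefore measured against quite different class mixes.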



## Final remarks

The model performed very well on the new data, reaching an accuracy of about 97%. More new data could be brought in for further tests, but considering the accuracy on the test split and on the new data, the next step could be to build an analytical solution, for example, and apply this model to filter spam in future messages.

## Thank you! Get in touch with me through my portfolio (https://campos1989.github.io/), under the contact menu!