<span style="color: green; font-size: 40px; font-weight: bold;">Lab 4 (Classificação Binária Probabilística) </span>

<br> <br>

# Prevendo Se Uma Mensagem de Texto é Spam

<br>

### Contexto

Neste projeto, abordaremos um problema comum em sistemas de comunicação digital: a **detecção de mensagens de texto indesejadas, conhecidas como spam**. Com o aumento do uso de mensagens eletrônicas, a capacidade de distinguir automaticamente entre mensagens legítimas e spam se tornou essencial para manter a integridade e a usabilidade dos serviços de mensagens. O objetivo deste projeto é desenvolver um modelo preditivo que, baseado no conteúdo textual das mensagens, consiga identificar se uma mensagem é spam ou não. Utilizaremos técnicas de Machine Learning para criar um modelo de classificação binária probabilística, capaz de fornecer uma previsão acompanhada de uma estimativa de probabilidade.

<br>

### Objetivo

O objetivo deste projeto é **construir um modelo de Machine Learning capaz de prever se uma mensagem de texto é spam**. O modelo será treinado utilizando dados históricos de mensagens rotuladas como "spam" ou "ham" (não spam), permitindo que ele faça previsões sobre novas mensagens com base em padrões aprendidos.

<br>

### Pergunta de Negócio Principal

> "Como podemos prever se uma mensagem de texto é spam utilizando seu conteúdo textual?"

<br>

### Entregável

O entregável deste projeto será um **modelo de Machine Learning treinado para identificar mensagens de texto como spam ou não**. O modelo será capaz de classificar novas mensagens e fornecer a probabilidade associada a cada previsão. O processo de desenvolvimento incluirá a preparação dos dados, a seleção de features relevantes, o treinamento do modelo e a avaliação de seu desempenho.

<br>

### Sobre o Conjunto de Dados

Os dados utilizados neste projeto contêm uma coleção de mensagens de texto classificadas manualmente como "spam" ou "ham". Cada entrada do conjunto de dados inclui o texto da mensagem e sua respectiva classificação. Utilizaremos esse conjunto para treinar e validar nosso modelo.

<br>
<table border="2">
  <tr>
    <th style="text-align: center; font-size: 16px;">Nome da Coluna</th>
    <th style="text-align: center; font-size: 16px;">Tipo de Dado</th>
    <th style="text-align: center; font-size: 16px;">Descrição</th>
  </tr>
  <tr>
    <td>label</td>
    <td>string</td>
      <td>Classificação da mensagem (<b>ham</b> para <i>não spam</i> e <b>spam</b> para <i>mensagens indesejadas</i>).</td>
  </tr>
  <tr>
    <td>message</td>
    <td>string</td>
    <td>Conteúdo textual da mensagem.</td>
  </tr>
</table>

<br><br><br>

# Importando Pacotes

In [1]:
# Importa o findspark e inicializa
import findspark
findspark.init()

# Imports
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import IDF, HashingTF, Tokenizer
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import Row

<br> <br>

# <span style="color: green; font-size: 38px; font-weight: bold;">Preparando o Ambiente Spark</span>

In [2]:
# Definindo semente aleatória (seed) para reprodutibilidade do notebook
rnd_seed = 23
np.random.seed = rnd_seed
np.random.set_state = rnd_seed

# Se houver uma sessão Spark ativa, encerre-a
if 'sc' in globals():
    sc.stop()

if 'spark' in globals():
    spark.stop()


# Criando o Spark Context
conf = SparkConf().setAppName("Lab4") \
                  .set("spark.ui.showConsoleProgress", "false") \
                  .set("spark.executor.heartbeatInterval", "20s") \
                  .set("spark.eventLog.enabled", "false") \
                  .set("spark.sql.shuffle.partitions", "2") \
                  .set("spark.sql.debug.maxToStringFields", "100") \
                  .set("spark.executor.memory", "4g") \
                  .set("spark.driver.memory", "4g") \
                  .set("spark.driver.maxResultSize", "2g")  # Configuração adicional para limitar o tamanho do resultado

# Criar o Spark Context e a Spark Session
sc = SparkContext(conf=conf)
spSession = SparkSession.builder.config(conf=conf).getOrCreate()

# Ajustar o nível de log para ERROR
sc.setLogLevel("ERROR")

# Configurar log4j para suprimir avisos (deixar como comentário e volta ao normal)
log4j_logger = sc._jvm.org.apache.log4j
log4j_logger.LogManager.getLogger("org").setLevel(log4j_logger.Level.ERROR)
log4j_logger.LogManager.getLogger("akka").setLevel(log4j_logger.Level.ERROR)

# Visualizar o objeto spark_session
spSession

24/08/12 18:04:46 WARN Utils: Your hostname, eduardo-Inspiron-15-3520 resolves to a loopback address: 127.0.1.1; using 192.168.0.13 instead (on interface wlp0s20f3)
24/08/12 18:04:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/12 18:04:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/12 18:04:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/08/12 18:04:47 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


<br><br>

# <span style="color: green; font-size: 38px; font-weight: bold;">Carregando os Dados</span>

- Os dados serão carregados a partir de um arquivo CSV e gerados como um RDD (Resilient Distributed Dataset) no Apache Spark. O RDD é uma estrutura de dados distribuída que permite o processamento paralelo em um cluster, otimizando a performance.

In [3]:
# Carregando os dados e gerando um RDD
spamRDD = sc.textFile("Lab/dados/dataset4.csv")

# Tipo
print(type(spamRDD), '\n')

# Colocando o RDD em cache. Esse processo otimiza a performance
print(spamRDD.cache(), '\n')

# Número de registros
print(spamRDD.count(), '\n')

<class 'pyspark.rdd.RDD'> 

Lab/dados/dataset4.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 

1000 



In [4]:
# Visualizando as primeiras linhas
print(spamRDD.take(5))

['ham,Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...,,,,,,,,,', 'ham,Ok lar... Joking wif u oni...,,,,,,,,,,', 'ham,U dun say so early hor... U c already then say...,,,,,,,,,,', "ham,Nah I don't think he goes to usf, he lives around here though,,,,,,,,,", 'ham,Even my brother is not like to speak with me. They treat me like aids patent.,,,,,,,,,,']


In [5]:
## Visualizando primeiras 4 linhas com Pandas (Apenas para visualização)

import pandas as pd

# Obter as primeiras 5 linhas do RDD
linhas = spamRDD.take(5)

# Dividir as linhas em colunas utilizando a primeira vírgula como delimitador
dados_formatados = [linha.split(",", 1) for linha in linhas]

# Criar um DataFrame Pandas com as colunas e os dados formatados
df = pd.DataFrame(dados_formatados, columns=["label", "message"])

# Mostrar o DataFrame
display(df)

RuntimeError: module was compiled against NumPy C-API version 0x10 (NumPy 1.23) but the running NumPy has C-API version 0xf. Check the section C-API incompatibility at the Troubleshooting ImportError section at https://numpy.org/devdocs/user/troubleshooting-importerror.html#c-api-incompatibility for indications on how to solve this problem.

Unnamed: 0,label,message
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,"Ok lar... Joking wif u oni...,,,,,,,,,,"
2,ham,U dun say so early hor... U c already then say...
3,ham,"Nah I don't think he goes to usf, he lives aro..."
4,ham,Even my brother is not like to speak with me. ...


<br><br><br>

# <span style="color: green; font-size: 38px; font-weight: bold;"> Análise Exploratória Inicial dos Dados </span>

<br>

### Criação de Função Para Análise Inicial

In [6]:
import pandas as pd
import re

def funcao_analise_inicial(df):
    # Configurar Pandas para exibir todas as linhas
    pd.set_option('display.max_rows', None)

    # Informações do DataFrame
    print('\n\n INFO \n\n')
    df.info()
    print('\n\n ------------------------------------------------------------------------------------------ \n\n')

    # Verifica se há valores ausentes e duplicados
    valores_ausentes = df.isna().sum().sum() > 0
    valores_duplicados = df.duplicated().sum() > 0

    # Nomes das variáveis com valores ausentes
    variaveis_ausentes = df.columns[df.isna().any()].tolist()

    # Número de linhas duplicadas
    num_linhas_duplicadas = df.duplicated().sum()

    # Porcentagem de linhas duplicadas
    porcentagem_linhas_duplicadas = (num_linhas_duplicadas / len(df)) * 100

    # Exibe o resultado sobre valores ausentes e duplicados
    print("\n\nExistem valores ausentes:", valores_ausentes)
    if valores_ausentes:
        print("\nVariáveis com valores ausentes:", variaveis_ausentes)
    else:
        print("\nNenhuma variável possui valores ausentes.")

    print("\n\nExistem valores duplicados:", valores_duplicados)
    if valores_duplicados:
        print("\nNúmero de Linhas Duplicadas:", num_linhas_duplicadas)
        print("\nPorcentagem de Linhas Duplicadas: {:.2f}%".format(porcentagem_linhas_duplicadas))
    else:
        print("\nNenhuma variável possui valores duplicados.")
    
    # Verificação de caracteres especiais
    caracteres_especiais = re.compile('[@_!#$%^&*<>()?/\\|}{~:]')   # nenhum caracter removido
    colunas_com_caracteres_especiais = {}

    for coluna in df.columns:
        if df[coluna].dtype == 'object':  # Verifica apenas colunas de texto
            contem_caracteres_especiais = df[coluna].apply(lambda x: bool(caracteres_especiais.search(x) if isinstance(x, str) else False)).any()
            if contem_caracteres_especiais:
                indices_com_caracteres_especiais = df[coluna][df[coluna].apply(lambda x: bool(caracteres_especiais.search(x) if isinstance(x, str) else False))].index.tolist()
                colunas_com_caracteres_especiais[coluna] = indices_com_caracteres_especiais

    # Exibe o resultado sobre caracteres especiais
    print("\n\nExistem caracteres especiais nas colunas:", bool(colunas_com_caracteres_especiais))
    if colunas_com_caracteres_especiais:
        print("\nColunas com caracteres especiais e os índices:")
        for coluna, indices in colunas_com_caracteres_especiais.items():
            print(f"\n Coluna [ {coluna} ]: Índices com caracteres especiais {indices}")
    else:
        print("\nNenhuma coluna possui caracteres especiais.")

print('A função foi criada com sucesso.')

A função foi criada com sucesso.


<br>

### Transformando dados carregados em RDD para dataframe do Pandas (apenas para Análise Inicial)

- Vamos realizar análise exploratória através da função acima. RDDs são ótimos para processamento, mas ruins para exploração, então converteremos o RDD para DataFrame Spark e então para DataFrame Pandas (**não é possível converter diretamente objeto RDD para objeto Pandas**).

In [7]:
# Definindo as colunas manualmente
colunas = ["label", "message"]

# Dividindo as linhas corretamente usando a vírgula como delimitador e criando Rows
dados_formatados = spamRDD.map(lambda linha: linha.split(",", 1)).map(lambda x: Row(label=x[0], message=x[1]))

# Criando o DataFrame do PySpark com as colunas definidas manualmente
df_spark = spSession.createDataFrame(dados_formatados, colunas)

# Verificar o tipo do objeto
print(type(df_spark), '\n')

# Converte DataFrame Spark para DataFrame Pandas
df_pandas = df_spark.toPandas()

# Visualizando as primeiras linhas do DataFrame Pandas
display(df_pandas.head())

<class 'pyspark.sql.dataframe.DataFrame'> 



Unnamed: 0,label,message
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,"Ok lar... Joking wif u oni...,,,,,,,,,,"
2,ham,U dun say so early hor... U c already then say...
3,ham,"Nah I don't think he goes to usf, he lives aro..."
4,ham,Even my brother is not like to speak with me. ...


<br>

### Visualizando Função para Análise Inicial

In [8]:
funcao_analise_inicial(df_pandas)



 INFO 


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 2 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   label    1000 non-null   object
 1   message  1000 non-null   object
dtypes: object(2)
memory usage: 15.8+ KB


 ------------------------------------------------------------------------------------------ 




Existem valores ausentes: False

Nenhuma variável possui valores ausentes.


Existem valores duplicados: True

Número de Linhas Duplicadas: 53

Porcentagem de Linhas Duplicadas: 5.30%


Existem caracteres especiais nas colunas: True

Colunas com caracteres especiais e os índices:

 Coluna [ message ]: Índices com caracteres especiais [5, 6, 8, 9, 12, 15, 16, 19, 20, 21, 23, 25, 30, 32, 33, 34, 37, 41, 44, 48, 51, 53, 57, 58, 62, 63, 66, 71, 75, 76, 77, 78, 79, 81, 84, 86, 87, 92, 94, 97, 98, 99, 101, 103, 107, 108, 112, 118, 119, 121, 123, 128, 130, 132, 134, 137, 139, 142, 146, 147, 149, 

### Resumo

- Necessário tratamento para conversão para tipo numérico das duas colunas.

<br> <br> <br>

# <span style="color: green; font-size: 38px; font-weight: bold;">Transformação dos Dados</span>

<br>

Nesta etapa trataremos da conversão para tipo de numérico da primeira coluna.

<br>

## Tratando Primeira Coluna

- Será criado uma função para a conversão da primeira coluna para o tipo numérico

<br>

#### Criando e Aplicando a Função (diretamente no objeto RDD)

In [9]:
# Função de transformação
def TransformToVector(inputStr):
    
    # Separa as colunas
    attList = inputStr.split(",")
    
    # Ajusta o label (target)
    smsType = 0.0 if attList[0] == "ham" else 1.0
    
    return [smsType, attList[1]]

# Aplica a função
spamRDD2 = spamRDD.map(TransformToVector)

<br><br><br>

# <span style="color: green; font-size: 38px; font-weight: bold;"> Análise Exploratória</span>

<br>

- Realizando uma nova etapa de <i>Análise Exploratória</i> agora com os dados já <i>tratados</i>.

<br>

#### Convertendo Para Dataframe do Pyspark

In [10]:
# Converte o RDD em DataFrame
spamDF = spSession.createDataFrame(spamRDD2, ["label", "message"])
print(type(spamDF))

# Exibir a estrutura do DataFrame para confirmação
print(spamDF.printSchema())

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- label: double (nullable = true)
 |-- message: string (nullable = true)

None


In [11]:
# Verificando primeiras linhas
spamDF.show(5)

+-----+--------------------+
|label|             message|
+-----+--------------------+
|  0.0|Go until jurong p...|
|  0.0|Ok lar... Joking ...|
|  0.0|U dun say so earl...|
|  0.0|Nah I don't think...|
|  0.0|Even my brother i...|
+-----+--------------------+
only showing top 5 rows



<br><br><br><br>

# Pré-Processamento de Dados Para Construção de Modelos de Machine Learning

- O objeto já foi convertido para dataframe do pyspark na etapa anterior (spamDF).

<br><br>

## Processamento de Linguagem Natural

Aqui nós criamos os objetos que permitirão transformar os dados de texto em uma matriz numérica.

<br>

In [12]:
# Cria o Tokenizador 
tokenizador = Tokenizer(inputCol = "message", outputCol = "words")
print(type(tokenizador), '\n')

# Aplica o TF (Term Frequency) para extrair a frequência de cada termo nas linhas de texto
term_frequency = HashingTF(inputCol = tokenizador.getOutputCol(), outputCol = "tempfeatures")
print(type(term_frequency), '\n')

# Aplica o IDF (Inverse Document Frequency) para calcular a frequência inversa dos documentos
inverse_tf = IDF(inputCol = term_frequency.getOutputCol(), outputCol = "features")
print(type(inverse_tf), '\n')

<class 'pyspark.ml.feature.Tokenizer'> 

<class 'pyspark.ml.feature.HashingTF'> 

<class 'pyspark.ml.feature.IDF'> 



<br>

## Dividindo os dados em Dados de Treino e Dados de Teste
- Nós **treinamos** o modelo com **dados de treino** e **avaliamos** o modelo com **dados de teste**.

<br>

In [13]:
# Dados de Treino e de Teste
(dados_treino, dados_teste) = spamDF.randomSplit([0.7, 0.3])

print(dados_treino.count())
print(dados_teste.count())

692
308


<br><br><br><br><br>

<span style="color: green; font-size: 40px; font-weight: bold;">Construindo Modelos de Machine Learning</span>

<br>

<br><br><br>

## Criando Dataframe para salvar métricas de cada Modelo

In [14]:
# Cria um dataframe para receber as métricas de cada modelo
df_modelos = pd.DataFrame()

<br><br><br>

# <span style="color: green; font-weight: bold;">Modelo 1 com Regressão Logística</span>

<br>

> # Versão 1

- Sem Ajuste de Hiperparâmetros

<br>

### Criação do Pipeline e Criação, Treinamento, Previsão e Avaliação do Modelo

In [15]:
from pyspark.ml.classification import LogisticRegression

# Criação do modelo
lrClassifier = LogisticRegression(featuresCol="features", labelCol="label")

# Criação do Pipeline
pipeline_lr = Pipeline(stages=[tokenizador, term_frequency, inverse_tf, lrClassifier])

# Treinamento do modelo com o Pipeline
modelo_lr = pipeline_lr.fit(dados_treino)

# Previsões nos dados de teste
previsoes_lr = modelo_lr.transform(dados_teste)

# Avaliando as métricas
avaliador_acuracia_lr = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                          labelCol="label", 
                                                          metricName="accuracy")

avaliador_precisao_lr = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                          labelCol="label", 
                                                          metricName="weightedPrecision")

avaliador_recall_lr = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                        labelCol="label", 
                                                        metricName="weightedRecall")

avaliador_f1_lr = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                    labelCol="label", 
                                                    metricName="f1")

# Calculando as métricas
acuracia_lr = avaliador_acuracia_lr.evaluate(previsoes_lr)
precisao_lr = avaliador_precisao_lr.evaluate(previsoes_lr)
recall_lr = avaliador_recall_lr.evaluate(previsoes_lr)
f1_score_lr = avaliador_f1_lr.evaluate(previsoes_lr)

# Salvando as métricas no DataFrame
modelo_lr = pd.DataFrame({
    'Modelo': ['LogisticRegression'],
    'Versão': ['1'],
    'Tipo de Modelo': ['Sem Ajuste de Hiperparâmetros'],
    'Acurácia': [acuracia_lr],
    'Precisão': [precisao_lr],
    'Recall': [recall_lr],
    'F1-Score': [f1_score_lr]
})

# Concatenando com o DataFrame existente
df_modelos = pd.concat([df_modelos, modelo_lr], ignore_index=True)

# Visualizando o DataFrame com as métricas
display(df_modelos)

Unnamed: 0,Modelo,Versão,Tipo de Modelo,Acurácia,Precisão,Recall,F1-Score
0,LogisticRegression,1,Sem Ajuste de Hiperparâmetros,0.883117,0.895346,0.883117,0.881567


<br><br><br>

# <span style="color: green; font-weight: bold;">Modelo 2 com NaiveBayes</span>

<br>

> # Versão 1

- Sem Ajuste de Hiperparâmetros

<br>

### Criação do Pipeline e Criação, Treinamento, Previsão e Avaliação do Modelo

In [16]:
# Criação do modelo
nbClassifier = NaiveBayes()

# Criação do Pipeline
pipeline = Pipeline(stages=[tokenizador, term_frequency, inverse_tf, nbClassifier])

# Treinamento do modelo com o Pipeline
modelo = pipeline.fit(dados_treino)

# Previsões nos dados de teste
previsoes = modelo.transform(dados_teste)

# Avaliando as métricas
avaliador_acuracia = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                       labelCol="label", 
                                                       metricName="accuracy")

avaliador_precisao = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                       labelCol="label", 
                                                       metricName="weightedPrecision")

avaliador_recall = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                     labelCol="label", 
                                                     metricName="weightedRecall")

avaliador_f1 = MulticlassClassificationEvaluator(predictionCol="prediction", 
                                                 labelCol="label", 
                                                 metricName="f1")

# Calculando as métricas
acuracia = avaliador_acuracia.evaluate(previsoes)
precisao = avaliador_precisao.evaluate(previsoes)
recall = avaliador_recall.evaluate(previsoes)
f1_score = avaliador_f1.evaluate(previsoes)

# Salvando as métricas no DataFrame
modelo_nb = pd.DataFrame({
    'Modelo': ['NaiveBayes'],
    'Versão': ['1'],
    'Tipo de Modelo': ['Sem Ajuste de Hiperparâmetros'],
    'Acurácia': [acuracia],
    'Precisão': [precisao],
    'Recall': [recall],
    'F1-Score': [f1_score]
})

# Concatenando com o DataFrame existente
df_modelos = pd.concat([df_modelos, modelo_nb], ignore_index=True)

# Visualizando o DataFrame com as métricas
display(df_modelos)

Unnamed: 0,Modelo,Versão,Tipo de Modelo,Acurácia,Precisão,Recall,F1-Score
0,LogisticRegression,1,Sem Ajuste de Hiperparâmetros,0.883117,0.895346,0.883117,0.881567
1,NaiveBayes,1,Sem Ajuste de Hiperparâmetros,0.905844,0.909153,0.905844,0.905883


<br><br><br>

# Fim!