# Classificação de Spam em E-mails

## Realiza a busca dos dados

In [1]:
import os
import tarfile
import urllib.request


DOWNLOAD_ROOT = "http://spamassassin.apache.org/old/publiccorpus/"
HAM_URL = DOWNLOAD_ROOT + "20030228_easy_ham.tar.bz2"
SPAM_URL = DOWNLOAD_ROOT + "20030228_spam.tar.bz2"
SPAM_PATH = os.path.join("datasets", "spam")

def fetch_spam_data(ham_url=HAM_URL, spam_url=SPAM_URL, spam_path=SPAM_PATH):
    # Verifica se o diretório para os dados do spam existe, caso contrário, cria-o
    if not os.path.isdir(spam_path):
        os.makedirs(spam_path)

    # Para cada arquivo (ham e spam), faz o download se ainda não estiver presente no diretório
    for filename, url in (("ham.tar.bz2", ham_url), ("spam.tar.bz2", spam_url)):
        path = os.path.join(spam_path, filename)
        if not os.path.isfile(path):
            urllib.request.urlretrieve(url, path)

        # Extrai o conteúdo do arquivo tar.bz2 para o diretório spam_path
        tar_bz2_file = tarfile.open(path)
        tar_bz2_file.extractall(path=spam_path)
        tar_bz2_file.close()


In [2]:
fetch_spam_data()

## Carrega todos os E-mails:

In [3]:
HAM_DIR = os.path.join(SPAM_PATH, "easy_ham")
SPAM_DIR = os.path.join(SPAM_PATH, "spam")

# Obtém os nomes de arquivos do diretório 'easy_ham' que possuem mais de 20 caracteres
ham_filenames = [name for name in sorted(os.listdir(HAM_DIR)) if len(name) > 20]

# Obtém os nomes de arquivos do diretório 'spam' que possuem mais de 20 caracteres
spam_filenames = [name for name in sorted(os.listdir(SPAM_DIR)) if len(name) > 20]


## Fazendo uma análise dos emails:

In [4]:
import email
import email.policy

def load_email(is_spam, filename, spam_path=SPAM_PATH):
    # Determina o diretório com base no parâmetro is_spam (True para spam, False para ham)
    directory = "spam" if is_spam else "easy_ham"

    # Abre o arquivo de e-mail no modo de leitura binária
    with open(os.path.join(spam_path, directory, filename), "rb") as f:
        # Faz o parsing do conteúdo do arquivo usando o BytesParser e a política de email padrão
        return email.parser.BytesParser(policy=email.policy.default).parse(f)


In [5]:
# Carrega os e-mails de ham (não spam) utilizando a função load_email() para cada nome de arquivo em ham_filenames
ham_emails = [load_email(is_spam=False, filename=name) for name in ham_filenames]

# Carrega os e-mails de spam utilizando a função load_email() para cada nome de arquivo em spam_filenames
spam_emails = [load_email(is_spam=True, filename=name) for name in spam_filenames]

Aqui vemos um exemplo de E-mail normal:


In [6]:
print(ham_emails[1].get_content().strip())

Martin A posted:
Tassos Papadopoulos, the Greek sculptor behind the plan, judged that the
 limestone of Mount Kerdylio, 70 miles east of Salonika and not far from the
 Mount Athos monastic community, was ideal for the patriotic sculpture. 
 
 As well as Alexander's granite features, 240 ft high and 170 ft wide, a
 museum, a restored amphitheatre and car park for admiring crowds are
planned
---------------------
So is this mountain limestone or granite?
If it's limestone, it'll weather pretty fast.

------------------------ Yahoo! Groups Sponsor ---------------------~-->
4 DVDs Free +s&p Join Now
http://us.click.yahoo.com/pt6YBB/NXiEAA/mG3HAA/7gSolB/TM
---------------------------------------------------------------------~->

To unsubscribe from this group, send an email to:
forteana-unsubscribe@egroups.com

 

Your use of Yahoo! Groups is subject to http://docs.yahoo.com/info/terms/


E aqui temos um exemplo de SPAM:

In [7]:
print(spam_emails[1].get_content().strip())

1) Fight The Risk of Cancer!
http://www.adclick.ws/p.cfm?o=315&s=pk007

2) Slim Down - Guaranteed to lose 10-12 lbs in 30 days
http://www.adclick.ws/p.cfm?o=249&s=pk007

3) Get the Child Support You Deserve - Free Legal Advice
http://www.adclick.ws/p.cfm?o=245&s=pk002

4) Join the Web's Fastest Growing Singles Community
http://www.adclick.ws/p.cfm?o=259&s=pk007

5) Start Your Private Photo Album Online!
http://www.adclick.ws/p.cfm?o=283&s=pk007

Have a Wonderful Day,
Offer Manager
PrizeMama













If you wish to leave this list please use the link below.
http://www.qves.com/trim/?ilug@linux.ie%7C17%7C114258


-- 
Irish Linux Users' Group: ilug@linux.ie
http://www.linux.ie/mailman/listinfo/ilug for (un)subscription information.
List maintainer: listmaster@linux.ie


## Tratamento de e-mails que possuem mais de uma parte

In [8]:
def get_email_structure(email):
    # Verifica se o parâmetro é uma string e a retorna como está
    if isinstance(email, str):
        return email

    # Obtém o payload do e-mail
    payload = email.get_payload()

    # Verifica se o payload é uma lista e retorna a estrutura "multipart" com a lista de sub-e-mails
    if isinstance(payload, list):
        return "multipart({})".format(", ".join([
            get_email_structure(sub_email)
            for sub_email in payload
        ]))

    # Caso contrário, retorna o tipo de conteúdo do e-mail
    else:
        return email.get_content_type()


In [9]:
from collections import Counter

def structures_counter(emails):
    # Cria um objeto Counter para contar as estruturas dos e-mails
    structures = Counter()

    # Itera sobre cada e-mail na lista de e-mails
    for email in emails:
        # Obtém a estrutura do e-mail usando a função get_email_structure()
        structure = get_email_structure(email)

        # Incrementa o contador da estrutura atual
        structures[structure] += 1

    # Retorna o objeto Counter com as contagens das estruturas dos e-mails
    return structures


## Dividindo os dados em conjuto de trino e conjunto de teste:

In [10]:
import numpy as np
from sklearn.model_selection import train_test_split

# Concatena os arrays de e-mails de ham e spam em um único array X
X = np.array(ham_emails + spam_emails, dtype=object)

# Cria um array y com rótulos, sendo 0 para ham_emails e 1 para spam_emails
y = np.array([0] * len(ham_emails) + [1] * len(spam_emails))

# Divide os dados em conjuntos de treinamento e teste usando train_test_split()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


## Aqui nos tratamos nossos dados, transformando ele em algo mais facil de trabalhar modificando o HTML dele.

In [11]:
import re
from html import unescape

def html_to_plain_text(html):
    # Remove a seção <head> do HTML
    text = re.sub('<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)

    # Substitui as tags <a> por " HYPERLINK "
    text = re.sub('<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)

    # Remove todas as outras tags HTML
    text = re.sub('<.*?>', '', text, flags=re.M | re.S)

    # Substitui múltiplas quebras de linha por apenas uma
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)

    # Converte sequências de escape HTML para caracteres correspondentes
    return unescape(text)


Para ver se funcionou vamos analizar um SPAM html:

In [12]:
html_spam_emails = [email for email in X_train[y_train==1]
                    if get_email_structure(email) == "text/html"]
sample_html_spam = html_spam_emails[7]
print(sample_html_spam.get_content().strip()[:1000], "...")

<HTML><HEAD><TITLE></TITLE><META http-equiv="Content-Type" content="text/html; charset=windows-1252"><STYLE>A:link {TEX-DECORATION: none}A:active {TEXT-DECORATION: none}A:visited {TEXT-DECORATION: none}A:hover {COLOR: #0033ff; TEXT-DECORATION: underline}</STYLE><META content="MSHTML 6.00.2713.1100" name="GENERATOR"></HEAD>
<BODY text="#000000" vLink="#0033ff" link="#0033ff" bgColor="#CCCC99"><TABLE borderColor="#660000" cellSpacing="0" cellPadding="0" border="0" width="100%"><TR><TD bgColor="#CCCC99" valign="top" colspan="2" height="27">
<font size="6" face="Arial, Helvetica, sans-serif" color="#660000">
<b>OTC</b></font></TD></TR><TR><TD height="2" bgcolor="#6a694f">
<font size="5" face="Times New Roman, Times, serif" color="#FFFFFF">
<b>&nbsp;Newsletter</b></font></TD><TD height="2" bgcolor="#6a694f"><div align="right"><font color="#FFFFFF">
<b>Discover Tomorrow's Winners&nbsp;</b></font></div></TD></TR><TR><TD height="25" colspan="2" bgcolor="#CCCC99"><table width="100%" border="0" 

Agora um exemplo da nossa função criada acima:

In [13]:
print(html_to_plain_text(sample_html_spam.get_content())[:1000], "...")


OTC
 Newsletter
Discover Tomorrow's Winners 
For Immediate Release
Cal-Bay (Stock Symbol: CBYI)
Watch for analyst "Strong Buy Recommendations" and several advisory newsletters picking CBYI.  CBYI has filed to be traded on the OTCBB, share prices historically INCREASE when companies get listed on this larger trading exchange. CBYI is trading around 25 cents and should skyrocket to $2.66 - $3.25 a share in the near future.
Put CBYI on your watch list, acquire a position TODAY.
REASONS TO INVEST IN CBYI
A profitable company and is on track to beat ALL earnings estimates!
One of the FASTEST growing distributors in environmental & safety equipment instruments.
Excellent management team, several EXCLUSIVE contracts.  IMPRESSIVE client list including the U.S. Air Force, Anheuser-Busch, Chevron Refining and Mitsubishi Heavy Industries, GE-Energy & Environmental Research.
RAPIDLY GROWING INDUSTRY
Industry revenues exceed $900 million, estimates indicate that there could be as much as $25 billi

## Função para receber um E-mail e devolver um texto simples independento do seu formato de entrada:

In [14]:
def email_to_text(email):
    html = None

    # Percorre as partes do e-mail usando o método walk()
    for part in email.walk():
        ctype = part.get_content_type()

        # Verifica se a parte é do tipo "text/plain" ou "text/html"
        if not ctype in ("text/plain", "text/html"):
            continue

        try:
            content = part.get_content()
        except: # Em caso de problemas de codificação
            content = str(part.get_payload())

        # Se a parte for "text/plain", retorna o conteúdo diretamente
        if ctype == "text/plain":
            return content
        else:
            html = content

    # Se houver conteúdo HTML, chama a função html_to_plain_text() para convertê-lo em texto simples
    if html:
        return html_to_plain_text(html)


## Fazendo um tratamento simples de linguagem natural:

In [15]:
try:
    import nltk

    # Importa o stemmer do NLTK
    stemmer = nltk.PorterStemmer()

    # Itera sobre as palavras de exemplo e aplica o stemming
    for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
        print(word, "=>", stemmer.stem(word))

except ImportError:
    print("Error: stemming requires the NLTK module.")

    # Define o stemmer como None para indicar que não está disponível
    stemmer = None


Computations => comput
Computation => comput
Computing => comput
Computed => comput
Compute => comput
Compulsive => compuls


## extraindo a URL usando a biclioteca urlextract:

In [16]:
%pip install -q -U urlextract

try:
    import urlextract  # Pode requerer uma conexão com a internet para baixar os nomes de domínio raiz
    url_extractor = urlextract.URLExtract()

    print(url_extractor.find_urls("Will it detect github.com and https://youtu.be/7Pq-S557XQU?t=3m32s"))
except ImportError:
    print("Error: replacing URLs requires the urlextract module.")
    url_extractor = None

['github.com', 'https://youtu.be/7Pq-S557XQU?t=3m32s']


## Usando as funções construidas anteriormente vamos agora construir um contador de palavras, para desevolvelo faremos da seguinte forma:

In [17]:
from sklearn.base import BaseEstimator, TransformerMixin

class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                 replace_urls=True, replace_numbers=True, stemming=True):
        # Inicializa o transformador com as opções de pré-processamento dos e-mails
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming

    def fit(self, X, y=None):
        # Método necessário para compatibilidade com o pipeline do scikit-learn
        return self

    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = email_to_text(email) or ""  # Obtém o texto do e-mail
            if self.lower_case:
                text = text.lower()  # Converte o texto para minúsculas
            if self.replace_urls and url_extractor is not None:
                urls = list(set(url_extractor.find_urls(text)))  # Encontra URLs no texto
                urls.sort(key=lambda url: len(url), reverse=True)  # Ordena as URLs por comprimento decrescente
                for url in urls:
                    text = text.replace(url, " URL ")  # Substitui as URLs por "URL"
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)  # Substitui números por "NUMBER"
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)  # Remove pontuação substituindo por espaços
            word_counts = Counter(text.split())  # Conta as palavras no texto
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)  # Realiza stemming nas palavras
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts  # Substitui as contagens de palavras pelas contagens de palavras stemming
            X_transformed.append(word_counts)  # Adiciona as contagens de palavras transformadas à lista
        return np.array(X_transformed)  # Retorna as contagens de palavras como uma matriz numpy


Vamos terstar para ter certeza que está tudo funcionando da forma que deveria:

In [18]:
X_few = X_train[:3]
X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)
X_few_wordcounts

array([Counter({'chuck': 1, 'murcko': 1, 'wrote': 1, 'stuff': 1, 'yawn': 1, 'r': 1}),
       Counter({'the': 11, 'of': 9, 'and': 8, 'all': 3, 'christian': 3, 'to': 3, 'by': 3, 'jefferson': 2, 'i': 2, 'have': 2, 'superstit': 2, 'one': 2, 'on': 2, 'been': 2, 'ha': 2, 'half': 2, 'rogueri': 2, 'teach': 2, 'jesu': 2, 'some': 1, 'interest': 1, 'quot': 1, 'url': 1, 'thoma': 1, 'examin': 1, 'known': 1, 'word': 1, 'do': 1, 'not': 1, 'find': 1, 'in': 1, 'our': 1, 'particular': 1, 'redeem': 1, 'featur': 1, 'they': 1, 'are': 1, 'alik': 1, 'found': 1, 'fabl': 1, 'mytholog': 1, 'million': 1, 'innoc': 1, 'men': 1, 'women': 1, 'children': 1, 'sinc': 1, 'introduct': 1, 'burnt': 1, 'tortur': 1, 'fine': 1, 'imprison': 1, 'what': 1, 'effect': 1, 'thi': 1, 'coercion': 1, 'make': 1, 'world': 1, 'fool': 1, 'other': 1, 'hypocrit': 1, 'support': 1, 'error': 1, 'over': 1, 'earth': 1, 'six': 1, 'histor': 1, 'american': 1, 'john': 1, 'e': 1, 'remsburg': 1, 'letter': 1, 'william': 1, 'short': 1, 'again': 1, 'becom

## Agora que temos a contagem das palavras vamos criar uma função que as transformem em vetores:

In [19]:
from scipy.sparse import csr_matrix

class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocabulary_size=1000):
        # Inicializa o transformador com o tamanho do vocabulário desejado
        self.vocabulary_size = vocabulary_size

    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)

        # Seleciona as palavras mais comuns com base no total_count
        most_common = total_count.most_common()[:self.vocabulary_size]

        # Cria o vocabulário atribuindo um índice único a cada palavra
        self.vocabulary_ = {word: index + 1 for index, (word, count) in enumerate(most_common)}

        return self

    def transform(self, X, y=None):
        rows = []
        cols = []
        data = []
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                rows.append(row)
                cols.append(self.vocabulary_.get(word, 0))
                data.append(count)

        # Cria uma matriz esparsa no formato CSR usando os índices do vocabulário como colunas
        return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size + 1))


In [20]:
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
X_few_vectors

<3x11 sparse matrix of type '<class 'numpy.int64'>'
	with 20 stored elements in Compressed Sparse Row format>

O que significa esta matriz? Bem, o 99 na segunda linha, primeira coluna, significa que o segundo e-mail contém 99 palavras que não fazem parte do vocabulário. O 11 ao lado significa que a primeira palavra do vocabulário está presente 11 vezes neste e-mail. O 9 ao lado significa que a segunda palavra está presente 9 vezes e assim por diante. Você pode consultar o vocabulário para saber de quais palavras estamos falando. A primeira palavra é "the", a segunda palavra é "of", etc.

In [21]:
X_few_vectors.toarray()

array([[ 6,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
       [99, 11,  9,  8,  3,  1,  3,  1,  3,  2,  3],
       [67,  0,  1,  2,  3,  4,  1,  2,  0,  1,  0]])

In [22]:
vocab_transformer.vocabulary_

{'the': 1,
 'of': 2,
 'and': 3,
 'to': 4,
 'url': 5,
 'all': 6,
 'in': 7,
 'christian': 8,
 'on': 9,
 'by': 10}

# Agora nos estamos prontos para treinar nosso primeiro classificador SPAM, para isso vamos usar toda nossa base de dados:

In [23]:
from sklearn.pipeline import Pipeline

# Criação da pipeline de pré-processamento
preprocess_pipeline = Pipeline([
    ("email_to_wordcount", EmailToWordCounterTransformer()),
    ("wordcount_to_vector", WordCounterToVectorTransformer()),
])

# Aplica a pipeline de pré-processamento aos dados de treinamento
X_train_transformed = preprocess_pipeline.fit_transform(X_train)


## Como é possivel observar no nosso primeiro teste conseguimos uma precisão de 98%, Apesar de ser um conjunto de trino simple é um bom resultado:

In [24]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

log_clf = LogisticRegression(solver="lbfgs", max_iter=1000, random_state=42)
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)
score.mean()

[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


[CV] END ................................ score: (test=0.981) total time=   0.2s


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.2s remaining:    0.0s


[CV] END ................................ score: (test=0.984) total time=   0.2s


[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    0.5s remaining:    0.0s


[CV] END ................................ score: (test=0.990) total time=   0.3s


[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:    0.8s finished


0.985

## Imprimindo a precisão e o Recall do nosso teste:

In [None]:
from sklearn.metrics import precision_score, recall_score

X_test_transformed = preprocess_pipeline.transform(X_test)

log_clf = LogisticRegression(solver="lbfgs", max_iter=1000, random_state=42)
log_clf.fit(X_train_transformed, y_train)

y_pred = log_clf.predict(X_test_transformed)

print("Precisão: {:.2f}%".format(100 * precision_score(y_test, y_pred)))
print("Recall: {:.2f}%".format(100 * recall_score(y_test, y_pred)))

# Todos os créditos do código e da explicação são direcionados a:
 **Livro:** Géron, Aurélien. Hands-On Machine Learning with Scikit-Learn & TensorFlow.  United States of America: O’Reilly, 2017.

**Repositório do github com o código:** Géron, Aurélien. handson-ml2. GitHub, 2023. Disponível em: https://github.com/ageron/handson-ml2/blob/master/03_classification.ipynb Acesso em: 13 de janeiro de 2023.
