In [1]:
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

In [2]:
# Esta clase facilita el preprocesamiento de correos electrónicos que poseen código HTML
from html.parser import HTMLParser

class MLStripper(HTMLParser):
    def __init__(self):
        self.reset()
        self.strict = False
        self.convert_charrefs = True
        self.fed = []

    def handle_data(self, d):
        self.fed.append(d)

    def get_data(self):
        return ''.join(self.fed)

In [3]:
# Esta función se encarga de elimar los tags HTML que se encuentren en el texto del correo electrónico
def strip_tags(html):
    s = MLStripper()
    s.feed(html)
    return s.get_data()

In [4]:
import email
import string
import nltk

class Parser:

    def __init__(self):
        self.stemmer = nltk.PorterStemmer()
        self.stopwords = set(nltk.corpus.stopwords.words('english'))
        self.punctuation = list(string.punctuation)
        
    def parse(self, email_path):
        """Parse an email."""
        with open(email_path, errors='ignore') as e:
            msg = email.message_from_file(e)
        return None if not msg else self.get_email_content(msg)

    def get_email_content(self, msg):
        """Extract the email content."""
        subject = self.tokenize(msg['Subject']) if msg['Subject'] else []
        body = self.get_email_body(msg.get_payload(),
                                   msg.get_content_type())
        content_type = msg.get_content_type()
        # Returning the content of the email
        return {"subject": subject,
                "body": body,
                "content_type": content_type}
                
    def get_email_body(self, payload, content_type):
        """Extract the body of the email."""
        body = []
        if type(payload) is str and content_type == 'text/plain':
            return self.tokenize(payload)
        elif type(payload) is str and content_type == 'text/html':
            return self.tokenize(strip_tags(payload))
        elif type(payload) is list:
            for p in payload:
                body += self.get_email_body(p.get_payload(),
                                            p.get_content_type())
        return body
        
    def tokenize(self, text):
        """Transform a text string in tokens. Perform two main actions,
        clean the punctuation symbols and do stemming of the text."""
        for c in self.punctuation:
            text = text.replace(c, "")
        text = text.replace("\t", " ")
        text = text.replace("\n", " ")
        tokens = list(filter(None, text.split(" ")))
        # Stemming of the tokens
        return [self.stemmer.stem(w) for w in tokens if w not in self.stopwords]


In [5]:
parse = Parser()

In [6]:
import os

DATASET_PATH = "datasets/trec07p"

def parse_index(path_to_index, n_elements):
    ret_indexes = []
    index = open(path_to_index).readlines()
    for i in range(n_elements):
        mail = index[i].split(" ../")
        label = mail[0]
        path = mail[1][:-1]
        ret_indexes.append({"label":label, "email_path":os.path.join(DATASET_PATH, path)})
    return ret_indexes

In [7]:
def parse_email(index):
    p = Parser()
    email_path = "../" + index["email_path"]  # Ajusta la construcción de la ruta
    pmail = p.parse(email_path)
    return pmail, index["label"]

In [8]:
def create_prep_dataset(index_path, n_elements):
    X = []
    y = []
    indexes = parse_index(index_path, n_elements)
    for i in range(n_elements):
        print("\rParsing email: {0}".format(i+1), end='')
        mail, label = parse_email(indexes[i])
        X.append(" ".join(mail['subject']) + " ".join(mail['body']))
        y.append(label)
    return X, y

In [9]:
X_train, y_train = create_prep_dataset("../datasets/trec07p/full/index", 10000)

Parsing email: 10000

In [10]:
vectorizer = CountVectorizer()

In [11]:
X_train = vectorizer.fit_transform(X_train)

In [12]:
df = pd.DataFrame(X_train.toarray(),columns=vectorizer.get_feature_names_out())

Prediccion:

		clf = LogisticRegression()

		clf.fit(X_train,y_train) -> entrena a travez de un conjunto preprocesado (X_train,y_train)

		X_test = vectorizer.transform(X_test) -> (NO USAR FIT), agrega los X_test, al vector entrenado
		
		y_pred = clf.predict(X_test) -> PREDICE a travez de X_test

In [13]:
from sklearn.linear_model import LogisticRegression

In [14]:
model = LogisticRegression()

In [15]:
model.fit(X_train,y_train)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [27]:
from joblib import dump,load
dump(X_train,'modelo_entrenado.joblib')
modelo_entrenado = load("modelo_entrenado.joblib")

['modelo_entrenado.joblib']

In [16]:
X_test, y_test = create_prep_dataset("../datasets/trec07p/full/index", 12000)

Parsing email: 12000

In [17]:
X_test = X_test[10000:]

In [18]:
X_test = vectorizer.transform(X_test)

In [19]:
y_predict = model.predict(X_test)

In [20]:
y_test = y_test[10000:12000]

In [21]:
def coincidencia(list1,list2):
    contador = 0
    for i in range(len(list1)):
        if list1[i] == list2[i]:
            contador += 1
    return f"La efectividad es del {(contador*100)/50} %"


MOSTRAR EFECTIVIDAD DEL MACHINE LEARNING

In [22]:
from sklearn.metrics import accuracy_score

In [23]:
print('Accuracy: {:.3f}'.format(accuracy_score(y_test, y_predict)))

Accuracy: 0.987
