# FERNANDO LEON FRANCO | PRÁCTICA MODELO DE LENGUAJE PROBABILISTA

In [1]:
import os
import re
import xml.etree.ElementTree as ET

import nltk
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
from nltk.tokenize import TweetTokenizer

In [2]:
def print_bar(i, cantidad_registros, contexto="PROGRESO"):
    """Draw a one-line text progress bar on stdout, overwriting the current line.

    Args:
        i: Zero-based index of the item just processed; the bar represents
            ``(i + 1) / cantidad_registros``.
        cantidad_registros: Total number of items expected.
        contexto: Label printed in front of the bar.

    The displayed progress is clamped to the 0-100% range, so a caller that
    passes ``i + 1`` larger than the total never shows more than 100%.
    A non-positive total is drawn as a complete bar instead of raising
    ``ZeroDivisionError``.
    """
    ancho = 50  # bar width, in emoji cells

    if cantidad_registros > 0:
        porcentaje = (i + 1) / cantidad_registros * 100
        llenos = int(ancho * (i + 1) / cantidad_registros)
    else:
        # Nothing to process: treat the work as already finished.
        porcentaje, llenos = 100.0, ancho

    # Clamp both the number and the drawing so they can never overshoot.
    porcentaje = min(max(porcentaje, 0.0), 100.0)
    llenos = min(max(llenos, 0), ancho)

    # Emoji cells: filled part followed by the remaining part.
    barra = llenos * "🟩"
    espacio = (ancho - llenos) * "⬛️"

    print(f"\r{contexto}: |{barra}{espacio}| {porcentaje:6.2f}%", end="", flush=True)

In [3]:

# ======================= Limpieza de texto =======================
_STOP_WORDS_ES = None  # lazily-built cache of the Spanish stop-word set


def _spanish_stop_words():
    """Return the NLTK Spanish stop-word set, loading it only on first use."""
    global _STOP_WORDS_ES
    if _STOP_WORDS_ES is None:
        _STOP_WORDS_ES = frozenset(stopwords.words("spanish"))
    return _STOP_WORDS_ES


def limpiar_texto(texto):
    """Normalize one raw text fragment.

    Steps, in order:
      1. Strip HTML markup with BeautifulSoup and keep only the text.
      2. Lower-case the text.
      3. Remove URLs (anything starting with ``http`` or ``www``).
      4. Remove ``@mentions``.
      5. Remove ``#`` characters (the hashtag word itself is kept).
      6. Drop whitespace-separated words that are Spanish stop words.

    Args:
        texto: Raw string, possibly containing HTML.

    Returns:
        The cleaned text with the remaining words joined by single spaces.
    """
    texto = BeautifulSoup(texto, "html.parser").get_text()
    texto = texto.lower()
    texto = re.sub(r"http\S+|www\S+|https\S+", "", texto)
    texto = re.sub(r"@\w+", "", texto)
    texto = re.sub(r"#+", "", texto)

    # The stop-word list is the same for every call, so it is fetched from a
    # cache instead of being reloaded for each fragment.
    stop_words = _spanish_stop_words()
    return " ".join(word for word in texto.split() if word not in stop_words)


# ======================= Carga de datos =======================
def get_texts_from_folder(path_folder):
    """Load the cleaned text and the label of every labelled XML file in a folder.

    The folder is expected to contain ``*.xml`` files and a ``truth.txt`` file
    whose lines are ``:::``-separated fields: the first field is the file id
    (the XML file name up to its first dot) and the second field is the label
    used here (gender). The third field (country) is currently ignored.

    Args:
        path_folder: Directory containing the XML files and ``truth.txt``.

    Returns:
        A ``(tr_txt, tr_y)`` tuple of two lists of equal length. ``tr_txt[k]``
        is the space-joined cleaned text of all ``<document>`` elements of one
        XML file and ``tr_y[k]`` is the label of that same file. Files are
        processed in sorted name order; XML files with no entry in
        ``truth.txt`` are skipped so that texts and labels stay aligned.

    Raises:
        FileNotFoundError: If ``truth.txt`` does not exist in ``path_folder``.
    """
    # --- Labels -----------------------------------------------------------
    truth_file = os.path.join(path_folder, "truth.txt")
    file_to_label = {}
    with open(truth_file, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split(":::")
            if len(parts) < 2:
                continue  # blank or malformed line: nothing to record

            # Country would be parts[2]; gender (parts[1]) is the active label.
            file_to_label[parts[0]] = parts[1]

    # --- Documents ----------------------------------------------------------
    tr_txt = []  # one entry per labelled XML file
    tr_y = []  # label of the file at the same position in tr_txt

    # Sorted so the result does not depend on the directory listing order.
    xml_files = sorted(name for name in os.listdir(path_folder) if name.endswith(".xml"))

    for idx, file in enumerate(xml_files):
        file_id = file.split(".")[0]
        if file_id in file_to_label:
            root = ET.parse(os.path.join(path_folder, file)).getroot()
            # An element without text has ``text is None``; clean "" instead.
            docs = [limpiar_texto(doc.text or "") for doc in root.iter("document")]
            tr_txt.append(" ".join(docs))
            tr_y.append(file_to_label[file_id])

        print_bar(idx, len(xml_files), contexto="CARGA DE ETIQUETAS")

    return tr_txt, tr_y


In [4]:
# ======================= Carga de datos =======================
# NOTE: these are absolute, machine-specific paths; adjust them to the local copy of the data.
path_test = '/Users/ferleon/Github/semestre_v/procesamiento_lenguaje/data/author_profiling/es_test'
path_train = '/Users/ferleon/Github/semestre_v/procesamiento_lenguaje/data/author_profiling/es_train'

# Only the training split is loaded; the test split is left disabled.
tr_txt_train, tr_y_train = get_texts_from_folder(path_train)
# tr_txt_test, tr_y_test = get_texts_from_folder(path_test)

print(f"\nTextos train: {len(tr_txt_train)}, Etiquetas train: {len(tr_y_train)}")
# print(f"Textos test: {len(tr_txt_test)}, Etiquetas test: {len(tr_y_test)}")

# Give every distinct label a consecutive integer id, following sorted order.
# NOTE(review): the names say "paises" (countries) but get_texts_from_folder
# currently returns the gender field as the label.
paises = sorted(set(tr_y_train))
paises_numericas = dict(zip(paises, range(len(paises))))

# Numeric version of the training labels.
y_train = [paises_numericas[etiqueta] for etiqueta in tr_y_train]
# y_test = [paises_numericas[pais] for pais in tr_y_test]


If you meant to use Beautiful Soup to parse the web page found at a certain URL, then something has gone wrong. You should use an Python package like 'requests' to fetch the content behind the URL. Once you have the content as a string, you can feed that string into Beautiful Soup.



    
  texto = BeautifulSoup(texto, "html.parser").get_text()


CARGA DE ETIQUETAS: |🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩| 100.02%
Textos train: 4200, Etiquetas train: 4200


# PARTE 1: Procesamiento y tratamiento de los datos para un modelo de lenguaje probabilista
- Necesitamos contar las ocurrencias de trigramas o de bigramas en el conjunto de datos de entrenamiento.

In [5]:
class TrigramData:
    """Prepare tokenized text for an n-gram language model.

    Learns a frequency-capped vocabulary from raw strings, replaces
    out-of-vocabulary tokens with ``<unk>`` and wraps every sequence with
    start/end markers.

    Class attributes:
        MODE: ``"TRIGRAM"`` pads each sequence with two start markers,
            ``"BIGRAM"`` with one. Any other value is rejected.
    """

    MODE = "TRIGRAM"

    def __init__(self, vocab_max, tokenizer):
        """Store the configuration.

        Args:
            vocab_max: Maximum number of most-frequent tokens kept in the
                vocabulary (the three special markers are added on top).
            tokenizer: Object exposing ``tokenize(str) -> list`` of tokens.
        """
        self.vocab_max = vocab_max
        self.tokenizer = tokenizer
        self.final_vocab = set()
        self.SOS = "<s>"  # start-of-sequence marker
        self.EOS = "</s>"  # end-of-sequence marker
        self.UNK = "<unk>"  # replacement for out-of-vocabulary tokens

    def fit(self, raw_text):
        """Build the vocabulary from ``raw_text`` and return the transformed corpus.

        Args:
            raw_text: Iterable of strings; each string is lower-cased,
                tokenized and treated as one sequence.

        Returns:
            A list with one transformed token list per input string (see
            ``transform``).
        """
        freq_dist = nltk.FreqDist()
        tokenized_corpus = []  # list of token lists, one per input string

        for txt in raw_text:
            tokens = self.tokenizer.tokenize(txt.lower())
            tokenized_corpus.append(tokens)
            freq_dist.update(tokens)

        # Keep the vocab_max most frequent tokens, plus the special markers.
        self.final_vocab = {token for token, _ in freq_dist.most_common(self.vocab_max)}
        self.final_vocab.update([self.SOS, self.EOS, self.UNK])

        return [self.transform(tokens) for tokens in tokenized_corpus]

    def mask_out_of_vocab(self, word):
        """Return ``word`` if it is in the vocabulary, otherwise the ``<unk>`` marker."""
        return word if word in self.final_vocab else self.UNK

    def add_sos_eos(self, tokenized_text):
        """Return a new list with start/end markers around ``tokenized_text``.

        Args:
            tokenized_text: List of tokens.

        Raises:
            ValueError: If ``MODE`` is neither ``"BIGRAM"`` nor ``"TRIGRAM"``.
                Without this check the method would return ``None`` and the
                failure would surface later, far from its cause.
        """
        if self.MODE == "BIGRAM":
            return [self.SOS] + tokenized_text + [self.EOS]
        if self.MODE == "TRIGRAM":
            return [self.SOS, self.SOS] + tokenized_text + [self.EOS]
        raise ValueError(
            f"Unsupported MODE {self.MODE!r}; expected 'BIGRAM' or 'TRIGRAM'"
        )

    def transform(self, tokenized_text):
        """Mask out-of-vocabulary tokens and add the start/end markers.

        Args:
            tokenized_text: Iterable of tokens for one sequence.

        Returns:
            A new list of tokens ready for n-gram counting.
        """
        masked = [self.mask_out_of_vocab(w) for w in tokenized_text]
        return self.add_sos_eos(masked)

In [6]:
# Vocabulary cap: only the most frequent tokens are kept, the rest become <unk>.
TOP_PALABRAS = 10_000
tokenizador = TweetTokenizer()

# Learn the vocabulary on the training texts and get back the masked,
# marker-padded token lists used for n-gram counting.
trigram_data = TrigramData(tokenizer=tokenizador, vocab_max=TOP_PALABRAS)
transformed_corpus = trigram_data.fit(raw_text=tr_txt_train)

In [7]:
# Vocabulary learned by fit(); it includes the <s>, </s> and <unk> markers,
# so its size can exceed TOP_PALABRAS by up to three.
final_vocab = trigram_data.final_vocab
print(f"Tamaño del vocabulario final: {len(final_vocab):,}")

Tamaño del vocabulario final: 10,003


# BUILDING A TRIGRAM LANGUAGE MODEL

In [8]:
class TrigramLanguageModel:
    """Interpolated unigram + bigram + trigram language model.

    The class stores n-gram counts collected by ``train`` and exposes the
    maximum-likelihood probability of each order. The interpolation weights
    are stored on the instance; combining the three probabilities with them
    is not implemented in this part of the class.
    """

    def __init__(self, lambda_1 = 0.4, lambda_2 = 0.3, lambda_3 = 0.3, vocab=None):
        """Create an untrained model.

        Args:
            lambda_1: Interpolation weight of the trigram estimate.
            lambda_2: Interpolation weight of the bigram estimate.
            lambda_3: Interpolation weight of the unigram estimate.
            vocab: Optional collection of known tokens; only its size is
                recorded here.

        The three weights are expected to sum to 1 (not enforced).
        """
        self.lambda_1 = lambda_1  # trigrams
        self.lambda_2 = lambda_2  # bigrams
        self.lambda_3 = lambda_3  # unigrams

        # Counters
        self.unigram_counts = {}  # word -> count
        self.bigram_counts = {}  # (w1, w2) -> count
        self.trigram_counts = {}  # (w1, w2, w3) -> count
        self.total_tokens = 0  # defined up front so an untrained model is usable

        self.vocab = vocab
        self.vocab_size = len(vocab) if vocab is not None else 0

    def train(self, transformed_corpus):
        """Accumulate unigram, bigram and trigram counts.

        Args:
            transformed_corpus: Iterable of token sequences (indexable, e.g.
                lists). N-grams never span two sequences.

        Counts are added to the existing ones, so calling ``train`` more than
        once keeps accumulating.
        """
        for tokens in transformed_corpus:
            for i, word in enumerate(tokens):
                # Unigrams
                self.unigram_counts[word] = self.unigram_counts.get(word, 0) + 1

                # Bigrams: need one preceding token
                if i > 0:
                    bigrama = (tokens[i-1], word)
                    self.bigram_counts[bigrama] = self.bigram_counts.get(bigrama, 0) + 1

                # Trigrams: need two preceding tokens
                if i > 1:
                    trigrama = (tokens[i-2], tokens[i-1], word)
                    self.trigram_counts[trigrama] = self.trigram_counts.get(trigrama, 0) + 1

        # Computed once, after all counting, instead of once per sequence.
        self.total_tokens = sum(self.unigram_counts.values())

    def unigram_probability(self, word):
        """Return the maximum-likelihood estimate ``count(word) / total_tokens``.

        Returns 0.0 for a word that was never counted or when nothing has been
        counted yet.
        """
        # TODO: add smoothing so that unseen words do not get probability zero
        if self.total_tokens == 0:
            return 0.0
        return self.unigram_counts.get(word, 0) / self.total_tokens

    def bigram_probability(self, word_1, word_2):
        """Return the estimate of ``P(word_2 | word_1)``.

        Computed as ``count(word_1, word_2) / count(word_1)``; 0.0 when
        ``word_1`` was never counted.
        """
        denominador = self.unigram_counts.get(word_1, 0)
        if denominador == 0:
            return 0.0
        numerador = self.bigram_counts.get((word_1, word_2), 0)
        return numerador / denominador

    def trigram_probability(self, word_1, word_2, word_3):
        """Return the estimate of ``P(word_3 | word_1, word_2)``.

        Computed as ``count(word_1, word_2, word_3) / count(word_1, word_2)``;
        0.0 when the context bigram was never counted.
        """
        denominador = self.bigram_counts.get((word_1, word_2), 0)
        if denominador == 0:
            return 0.0
        numerador = self.trigram_counts.get((word_1, word_2, word_3), 0)
        return numerador / denominador
        