In [None]:
import nltk
import re
from nltk.stem import PorterStemmer
import os
import json
from charset_normalizer import from_path

# Shared Porter stemmer instance. NOTE(review): the only visible use, inside
# preprocess_text, is commented out, so this object is currently unused.
STEMMER = PorterStemmer()

## I. Read the files and build two large consolidated files that are the union of all the documents in 20N and BAC.

In [None]:
# All paths are resolved relative to the current working directory of the kernel.
ACTUAL_PATH = os.getcwd()
# Root folders of the two input corpora (20N and BAC).
PATH_20N = os.path.join(ACTUAL_PATH, "data/20news-18828")
PATH_BAC = os.path.join(ACTUAL_PATH, "data/BAC/blogs")
# Output folder for the consolidated JSONL files and the pickled splits.
PATH_FINAL_FILES = os.path.join(ACTUAL_PATH, "data/final_files")
# Group identifier embedded in the names of the generated pickle files.
GRUPO = "03"

### UPLOAD_FILES

#### UPLOADING 20N 

In [None]:
# Consolidate every 20N document into a single JSONL file, one record per line.
NEW_20N_FILE = os.path.join(PATH_FINAL_FILES, "20N.jsonl")

# Opening the output file fails if its directory is missing, so create it first.
os.makedirs(PATH_FINAL_FILES, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting keeps the record order
# (and therefore the later seeded shuffle/split) reproducible across machines.
mayor_folders_20N = sorted(os.listdir(PATH_20N))
# Occurrences of each "<folder><file>" unit; any value above 1 marks a duplicate.
dictionary = {}

with open(NEW_20N_FILE, "w", encoding="utf-8", errors="replace") as f_n:
    for folder in mayor_folders_20N:
        minor_files_path = os.path.join(PATH_20N, folder)
        if not os.path.isdir(minor_files_path):
            # Stray files next to the theme folders cannot be listed; skip them.
            continue
        minor_files = sorted(os.listdir(minor_files_path))
        for file in minor_files:
            file_path = os.path.join(minor_files_path, file)
            # Undecodable bytes are replaced instead of aborting the whole run.
            with open(file_path, "r", encoding="utf-8", errors="replace") as f:
                text = f.read().strip()

            record = {"id": file, "theme": folder, "source": "20N", "text": text}
            unit = folder + file
            # Count by the same key that is stored (the original tested `file`
            # but stored `unit`, so the counter could never exceed 1).
            dictionary[unit] = dictionary.get(unit, 0) + 1
            f_n.write(json.dumps(record, ensure_ascii=False) + "\n")

In [None]:
## TODO: Verificar si los encodings estan bien

#### UPLOADING BAC

In [None]:
# Consolidate every BAC blog file into a single JSONL file, one record per post.
NEW_BAC_FILE = os.path.join(PATH_FINAL_FILES, "BAC.jsonl")
# Opening the output file fails if its directory is missing, so create it first.
os.makedirs(PATH_FINAL_FILES, exist_ok=True)
# Sorted so the record order (and the later seeded shuffle/split) is reproducible.
mayor_folders_BAC = sorted(os.listdir(PATH_BAC))
with open(NEW_BAC_FILE, "w", encoding="utf-8", errors="replace") as f_n:
    for file in mayor_folders_BAC:
        file_path = os.path.join(PATH_BAC, file)
        # Undecodable bytes are replaced instead of aborting the whole run.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            text = f.read().strip()
        text = re.sub(r"</?Blog>", "", text)
        # The chunk before the first "<post>" is not a post, so it is dropped
        # (the original emitted it as post 0).
        post_list = text.split("<post>")[1:]
        # post_num is 0-based over real posts only.
        for post_num, post in enumerate(post_list):
            # Keep only the body up to the closing tag; whatever follows it
            # belongs to the markup between posts, not to this post.
            post = post.partition("</post>")[0].strip()
            record = {"id": file, "post_num": post_num, "source": "BAC", "text": post}
            f_n.write(json.dumps(record, ensure_ascii=False) + "\n")

## II. Tokenize by sentence

In [4]:
import nltk
import re

In [None]:
# def get_sentences(text:str, k: int) -> list[list]:
#     sentences_list = []
#     text = text.split()
#     last = 0
#     for i in range(k,len(text)+1):
#         sentences = text[last:i]
#         last +=1
#         text_slice = " ".join(sentences)
#         text_slice_token = preprocess_text(text_slice)
#         sentences_list.append(text_slice_token)
#     return sentences_list
def preprocess_text(text: str) -> list[str]:
    """Normalize one sentence and return its tokens wrapped in boundary markers.

    Steps: lowercase, replace every character that is not an ASCII letter,
    digit or whitespace with a space, collapse each run of digits into the
    placeholder ``NUM``, split on whitespace, and surround the result with
    ``<s>`` and ``</s>``.

    Args:
        text: Raw sentence.

    Returns:
        Token list that always starts with ``"<s>"`` and ends with ``"</s>"``.
    """
    lowered = text.lower()
    letters_digits_only = re.sub(
        r"[^a-zA-Z0-9\s]", " ", lowered, flags=re.I | re.A | re.MULTILINE
    )
    digits_masked = re.sub(r"\d+", "NUM", letters_digits_only)
    # Stemming is intentionally not applied here.
    return ["<s>", *digits_masked.split(), "</s>"]


def divide_sentences(text: str) -> list[str]:
    """Split ``text`` into sentences using ``nltk.sent_tokenize``.

    Args:
        text: Raw document text.

    Returns:
        The sentences in the order NLTK reports them.
    """
    return nltk.sent_tokenize(text)

### Examples

In [12]:
# Small example: split a three-sentence string and display the result.
text = "This is sentence one. Here is another! And number 123."
sentences = divide_sentences(text)
sentences

['This is sentence one.', 'Here is another!', 'And number 123.']

In [13]:
# Tokenize each example sentence from the previous cell.
# NOTE(review): with the current `\d+` rule in preprocess_text, "123" becomes a
# single 'NUM' token; re-run this cell if the stored output shows otherwise.
processed = [preprocess_text(s) for s in sentences]

print(processed)

[['<s>', 'this', 'is', 'sentence', 'one', '</s>'], ['<s>', 'here', 'is', 'another', '</s>'], ['<s>', 'and', 'number', 'NUMNUMNUM', '</s>']]


### Tokenizing_full_text

In [None]:
import random
import pickle
import gc


def save_pickle(data, filename):
    """Pickle ``data`` into ``filename`` inside ``PATH_FINAL_FILES``.

    Args:
        data: Any picklable object.
        filename: Name of the target file (joined onto ``PATH_FINAL_FILES``).
    """
    target = os.path.join(PATH_FINAL_FILES, filename)
    with open(target, mode="wb") as sink:
        pickle.dump(data, sink, protocol=pickle.HIGHEST_PROTOCOL)


# Build the sentence-level 20N corpus: every document is split into sentences
# and each sentence is turned into a token list by preprocess_text.
sentencias = []
# The JSONL file was written as UTF-8 with ensure_ascii=False, so it must be
# read back as UTF-8 explicitly; the platform default encoding may differ.
with open(os.path.join(PATH_FINAL_FILES, "20N.jsonl"), "r", encoding="utf-8") as f:
    for line in f:
        line = json.loads(line)
        sentences = divide_sentences(line["text"])
        for sentence in sentences:
            pre_process = preprocess_text(sentence)
            sentencias.append(pre_process)
# Fixed seed so the shuffle, and therefore the split, is repeatable.
random.seed(42)
random.shuffle(sentencias)
# 80% of the shuffled sentences go to training, the remainder to testing.
index_to_split = int(0.8 * len(sentencias))
train_sentences = sentencias[:index_to_split]
test_sentences = sentencias[index_to_split:]
save_pickle(train_sentences, f"20N_{GRUPO}_training.pkl")
save_pickle(test_sentences, f"20N_{GRUPO}_testing.pkl")

# Release the large lists before the next corpus is processed.
del sentencias
del train_sentences
del test_sentences
gc.collect()

In [None]:
import random
import pickle
import gc


# NOTE(review): this repeats the save_pickle definition from the 20N cell;
# consider defining it once.
def save_pickle(data, filename):
    """Pickle ``data`` into ``filename`` inside ``PATH_FINAL_FILES``.

    Args:
        data: Any picklable object.
        filename: Name of the target file (joined onto ``PATH_FINAL_FILES``).
    """
    target = os.path.join(PATH_FINAL_FILES, filename)
    with open(target, mode="wb") as sink:
        pickle.dump(data, sink, protocol=pickle.HIGHEST_PROTOCOL)


# Build the sentence-level BAC corpus: every post is split into sentences
# and each sentence is turned into a token list by preprocess_text.
sentencias = []
# The JSONL file was written as UTF-8 with ensure_ascii=False, so it must be
# read back as UTF-8 explicitly; the platform default encoding may differ.
with open(os.path.join(PATH_FINAL_FILES, "BAC.jsonl"), "r", encoding="utf-8") as f:
    for line in f:
        line = json.loads(line)
        sentences = divide_sentences(line["text"])
        for sentence in sentences:
            pre_process = preprocess_text(sentence)
            sentencias.append(pre_process)
# Fixed seed so the shuffle, and therefore the split, is repeatable.
random.seed(42)
random.shuffle(sentencias)
# 80% of the shuffled sentences go to training, the remainder to testing.
split_idx = int(0.8 * len(sentencias))
train_sentences = sentencias[:split_idx]
test_sentences = sentencias[split_idx:]
save_pickle(train_sentences, f"BAC_{GRUPO}_training.pkl")
save_pickle(test_sentences, f"BAC_{GRUPO}_testing.pkl")

# Release the large lists once the splits are on disk.
del sentencias
del train_sentences
del test_sentences
gc.collect()

In [None]:
# import os
# import json
# import random
# import pickle

In [None]:
# def save_pickle(data, filename):
#     filepath = os.path.join(PATH_FINAL_FILES, filename)
#     with open(filepath, "wb") as f:
#         pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)

# def process_and_split(jsonl_file, grupo, train_ratio=0.8, seed=42):
#     random.seed(seed)
#     train_sentences = []
#     test_sentences = []

#     with open(jsonl_file, "r", encoding="utf-8") as f:
#         for line in f:
#             line = json.loads(line)
#             for sentence in divide_sentences(line["text"]):
#                 pre_process = preprocess_text(sentence)
#                 # Decidir en el momento si va a train o test
#                 if random.random() < train_ratio:
#                     train_sentences.append(pre_process)
#                 else:
#                     test_sentences.append(pre_process)

#     save_pickle(train_sentences, f"BAC_{grupo}_training.pkl")
#     save_pickle(test_sentences, f"BAC_{grupo}_testing.pkl")

# # Uso
# process_and_split(os.path.join(PATH_FINAL_FILES, "BAC.jsonl"), GRUPO)

## IV. Calcular N Gramas

In [None]:
def get_pickle(filename: str) -> list[list[str]]:
    """Load a pickled sentence split from ``PATH_FINAL_FILES``.

    The training/testing pickles written by this notebook contain one token
    list per sentence, hence the ``list[list[str]]`` return annotation.

    Args:
        filename: Name of the pickle file inside ``PATH_FINAL_FILES``.

    Returns:
        The unpickled object.
    """
    filepath = os.path.join(PATH_FINAL_FILES, filename)
    with open(filepath, "rb") as f:
        # SECURITY: pickle.load can execute arbitrary code; only load files
        # produced by this notebook, never pickles from an untrusted source.
        sentences = pickle.load(f)
    return sentences

### Calcular Unigramas

In [12]:
from gensim.corpora import Dictionary

#### 20N

In [9]:
# Load the 20N training split that the tokenizing cell pickled.
train_20N = get_pickle(f"20N_{GRUPO}_training.pkl")

In [None]:
class UnigramModel:
    """Maximum-likelihood unigram model over tokenized sentences.

    Attributes set by ``__init__``:
        word_counter_20N: dict mapping each token to its number of occurrences.
        total_words: total number of tokens counted.
        V: vocabulary size (number of distinct tokens).
    """

    # Token whose count is used for words absent from the vocabulary.
    UNK_TOKEN = "<UNK>"

    def __init__(self, sentences: list[list[str]]):
        """Count every token in ``sentences`` (a list of token lists)."""
        self.word_counter_20N = {}
        for sentence in sentences:
            for word in sentence:
                self.word_counter_20N[word] = self.word_counter_20N.get(word, 0) + 1
        self.total_words = sum(self.word_counter_20N.values())
        self.V = len(self.word_counter_20N)

    def unigram_probabilities(self) -> dict[str, float]:
        """Return ``{token: count / total_words}`` for every counted token."""
        return {
            word: count / self.total_words
            for word, count in self.word_counter_20N.items()
        }

    def generate_unigrams(self, filename):
        """Pickle the token -> probability mapping into ``PATH_FINAL_FILES/filename``.

        The original built a set of bare probabilities, which discarded the
        tokens and merged equal values; a dict keeps one entry per token.
        """
        filepath = os.path.join(PATH_FINAL_FILES, filename)
        probabilities_unigram = self.unigram_probabilities()
        with open(filepath, "wb") as f:
            pickle.dump(probabilities_unigram, f, protocol=pickle.HIGHEST_PROTOCOL)

    def predict_prob(self, word: str):
        """Return the unigram probability of ``word``.

        Lookup order: the exact token (so upper-case placeholders such as
        ``NUM`` are found), then its lowercase form, then ``<UNK>``. If
        ``<UNK>`` was never counted, or the model is empty, the result is 0.0
        instead of a ``KeyError`` / ``ZeroDivisionError``.
        """
        if self.total_words == 0:
            return 0.0
        counts = self.word_counter_20N
        if word in counts:
            key = word
        elif word.lower() in counts:
            # Index with the same key that was tested (the original tested
            # word.lower() but indexed with word).
            key = word.lower()
        else:
            key = self.UNK_TOKEN
        return counts.get(key, 0) / self.total_words

In [None]:
# Presumably the base name for the 20N unigram output (TODO confirm); the
# f-prefix is required for {GRUPO} to be interpolated.
f"20N_{GRUPO}_unigrams"

In [None]:
# Token frequencies over the 20N training sentences.
word_counter_20N = {}
for sentence in train_20N:
    for word in sentence:
        if word in word_counter_20N:
            word_counter_20N[word] += 1
        else:
            word_counter_20N[word] = 1
# Vocabulary size and total number of tokens counted.
V = len(word_counter_20N)
total_words = sum(word_counter_20N.values())

#### BAC

### Calcular Bigramas

#### 20N

#### BAC

### Calcular Trigramas

#### 20N

#### BAC