## Preparación

Oraciones originales de referencia

In [1]:
import pandas as pd 
import re
corpus=pd.read_json("../O1_Corpus/corpus.json", lines=True)

def etapa_preprocesamiento(textos, tokenizador=None):
    
    #Textos es una columna de un dataframe
    #1. Pasar a minúsculas
    textos = textos.str.lower()
    #2. Eliminar caracteres especiales
    textos = textos.apply(lambda x: re.sub(r"[\W\d_]+", " ", x))
    textos = textos.apply(lambda x: re.sub(r"ininteligible", "", x))
    #3. Eliminar espacios en blanco extra
    textos = textos.apply(lambda x: re.sub(r"\s+", " ", x))
    #4. Eliminar espacios en blanco al principio y al final
    textos = textos.str.strip()
    #5. Tokenizar usando SentencePiece
    if tokenizador:
        textos = textos.apply(lambda x: tokenizador.encode_as_pieces(x))
    return textos

corpus['transcription'] = etapa_preprocesamiento(corpus['transcription'])

In [None]:
reference_texts=corpus.sample(300)['transcription'].to_list()
reference_texts

In [199]:
import mauve
import torch
from torch import nn

def calculate_perplexity(model, tokenizer, input_texts, device):
    model.eval()  
    total_loss = 0
    for text in input_texts:
        inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True).to(device)
        with torch.no_grad():
            outputs = model(**inputs, labels=inputs['input_ids'])
            loss = outputs.loss
        total_loss += loss.item()

    avg_loss = total_loss / len(input_texts)
    perplexity = torch.exp(torch.tensor(avg_loss))
    return perplexity.item()

def calculate_perplexity_lstm(model, vocab, input_texts, device):
    model.eval()  
    criterion = nn.CrossEntropyLoss(ignore_index=vocab['<unk>'])  
    total_loss = 0
    total_tokens = 0

    with torch.no_grad():
        for text in input_texts:
            # Tokenize the input text
            token_ids = [vocab.get(token, vocab['<unk>']) for token in text.split()]
            inputs = torch.tensor(token_ids, dtype=torch.long).unsqueeze(0).to(device)  
            targets = inputs.clone()

            # Initialize the hidden state
            hidden = model.init_hidden(inputs.size(0))

            # Forward pass
            outputs, _ = model(inputs, hidden)
            outputs = outputs.view(-1, outputs.size(-1)) 
            targets = targets.view(-1)  

            # Calculate the loss
            loss = criterion(outputs, targets)
            total_loss += loss.item() * targets.size(0)
            total_tokens += targets.size(0)

    avg_loss = total_loss / total_tokens
    perplexity = torch.exp(torch.tensor(avg_loss))
    return perplexity.item()

def calculate_distinct_n(generated_texts, n=1):
    n_grams = []
    for text in generated_texts:
        # Split the text into characters
        chars = list(text)
        # Generate n-grams from characters
        n_grams.extend([tuple(chars[i:i+n]) for i in range(len(chars)-n+1)])

    total_n_grams = len(n_grams)
    unique_n_grams = len(set(n_grams))

    distinct_n_score = unique_n_grams / total_n_grams if total_n_grams > 0 else 0
    return distinct_n_score

def calculate_mauve(generated_texts, reference_texts=reference_texts):
    """
    Function to calculate the MAUVE score for generated texts.
    
    Args:
    generated_texts (list): List of generated texts.
    reference_texts (list): List of constant reference texts (human-written).
    
    Returns:
    float: MAUVE score.
    """
    cudaAvailable = torch.cuda.is_available()
    print(f"Using {'cuda' if cudaAvailable else 'cpu'}")
    mauve_score = mauve.compute_mauve(
        p_text=generated_texts, 
        q_text=reference_texts, 
        device_id=0 if cudaAvailable else -1,
        max_text_length=256
    )
    return mauve_score.mauve


## Modelos

In [8]:
diccionarioGenerado={}

In [192]:
#For each key in the dictionary, create a file with the key name and write each line in the value array
for key in diccionarioGenerado:
    with open(f"{key}.txt", "w") as file:
        for line in diccionarioGenerado[key]:
            file.write(line + "\n")

In [4]:
unique_words = set()
for transcription in corpus['transcription']:
    words = transcription.split()
    unique_words.update(words)

unique_words_list = sorted(unique_words)

### ZmBART

In [None]:
import torch
from transformers import MBartForConditionalGeneration, MBartTokenizer
import sentencepiece as spm
# Path to your model checkpoint
model_path = "checkpoints/zmbart"

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the tokenizer
tokenizerIsk = spm.SentencePieceProcessor(model_file=model_path+"/spiece.model")
tokenizer = MBartTokenizer.from_pretrained("facebook/mbart-large-cc25")

# Load the model and move it to the GPU
model = MBartForConditionalGeneration.from_pretrained("facebook/mbart-large-cc25").to(device)
checkpoint = torch.load(model_path + "/zmbart_checkpoint112.pt")
model_weights = checkpoint['model'] if 'model' in checkpoint else checkpoint
model.load_state_dict(model_weights, strict=False)

def safe_decode(tokenizer, output_ids):
    decoded_tokens = []
    
    for token_id in output_ids:
        #Skip 0 token
        if token_id.item() == 0:
            continue
        try:
            # Decode each token individually. Now tokenizer is a SentencePiece processor
            decoded_token = tokenizer.decode_ids([token_id.item()])
            
            # If token is padding or unknown, append an empty string or placeholder
            if decoded_token == "<pad>" or decoded_token == "<unk>" or decoded_token == "⁇":
                decoded_tokens.append("")
            else:
                decoded_tokens.append(decoded_token)
        except IndexError:
            # Handle index errors by appending an empty string
            decoded_tokens.append("")
    
    # Join tokens with spaces, replacing empty tokens appropriately
    return ' '.join(decoded_tokens).strip()



In [None]:
import random

num_samples = 1
sentences = []

while len(sentences) <= 100:
    input_texts = random.sample(unique_words_list, num_samples)
    # print("Input texts: ", input_texts)
    
    # Encode the input texts
    inputs = tokenizer(input_texts, return_tensors="pt", padding=True, truncation=True).to(device)
    # print("Encoded inputs: ", inputs)
    
    # Generate outputs
    outputs = model.generate(**inputs, max_length=20, num_beams=10, early_stopping=False, no_repeat_ngram_size=1, pad_token_id=tokenizer.pad_token_id)
    # print("Outputs: ", outputs)
    
    # Decode the outputs
    decoded_outputs = [safe_decode(tokenizerIsk, output) for output in outputs]
    # print("Decoded outputs: ", decoded_outputs)
    sentences.extend(decoded_outputs)
    print("Tenemos ", len(sentences), " oraciones")
    
    # break

In [86]:
diccionarioGenerado["zmbart"]=sentences

In [None]:
perplexity = calculate_perplexity(model, tokenizer, sentences, device)
distinct_2 = calculate_distinct_n(sentences, n=2)
distinct_3 = calculate_distinct_n(sentences, n=3)
mauve_score = calculate_mauve(sentences)

print(f"Perplexity: {perplexity}")
print(f"Distinct-2: {distinct_2}")
print(f"Distinct-3: {distinct_3}")
print(f"MAUVE: {mauve_score}")

Perplexity: 43.40846252441406
Distinct-2: 0.03824678950307091
Distinct-3: 0.11806952025280092
MAUVE: 0.031570415352922064

## Meta XNLG

In [None]:
import torch
import random
from transformers import MT5ForConditionalGeneration, T5Tokenizer

# Path to your model checkpoint
checkpoint_path = "/workspace/Tesis/O3_modelos/checkpoints/metaXNLG_checkpoint-10500/"

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the tokenizer
tokenizer = T5Tokenizer.from_pretrained(checkpoint_path)

# Load the model and move it to the GPU
model = MT5ForConditionalGeneration.from_pretrained(checkpoint_path).to(device)

def safe_decode(tokenizer, output_ids):
    # print(output_ids)
    decoded_tokens = []
    
    for token_id in output_ids:
        try:
            # Decode each token individually
            decoded_token = tokenizer.decode(token_id.item(), skip_special_tokens=False, clean_up_tokenization_spaces=False)
            
            # If token is padding or unknown, append an empty string or placeholder
            if decoded_token == "<pad>" or decoded_token == "<unk>":
                decoded_tokens.append("")
            else:
                decoded_tokens.append(decoded_token)
        except IndexError:
            # Handle index errors by appending an empty string
            decoded_tokens.append("")
    
    # Join tokens with spaces, replacing empty tokens appropriately
    return ' '.join(decoded_tokens).strip().replace('▁', ' ').replace('  ', ' ')

In [None]:
# Randomly select a number of input texts
num_samples = 1
import random


sentences = []

while len(sentences) <= 100:
    input_texts = random.sample(unique_words_list, num_samples) 
    inputs = tokenizer(input_texts, return_tensors="pt", padding=True, truncation=True).to(device)  # Move inputs to GPU
    outputs = model.generate(**inputs, max_length=30)
    predictions = [safe_decode(tokenizer, output) for output in outputs]
    split_sentences = [sentence.split('</s>') for sentence in predictions]
    split_sentences = [[s.strip() for s in sentence_list if s.strip()] for sentence_list in split_sentences]
    flattened_sentences = [item for sublist in split_sentences for item in sublist]
    sentences.extend(flattened_sentences)
    print("Tenemos ",len(sentences)," oraciones")

In [9]:
diccionarioGenerado["metaXNLG"]=sentences

In [None]:
#Métricas
perplexity = calculate_perplexity(model, tokenizer, sentences, device)
distinct_2 = calculate_distinct_n(sentences, n=2)
distinct_3 = calculate_distinct_n(sentences, n=3)
mauve_score = calculate_mauve(sentences)

print(f"Perplexity: {perplexity}")
print(f"Distinct-2: {distinct_2}")
print(f"Distinct-3: {distinct_3}")
print(f"MAUVE: {mauve_score}")

Perplexity: 53.81763458251953
Distinct-2: 0.06568575932737783
Distinct-3: 0.20421753607103219
MAUVE: 0.5293356144783942

## LSTM

In [131]:
#Modelo 
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import re
import numpy as np

# Define the LSTMTextGenerator class
class LSTMTextGenerator(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, pretrained_embeddings=None):
        super(LSTMTextGenerator, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        
        # Initialize the embedding layer with pre-trained embeddings if provided
        if pretrained_embeddings is not None:
            self.embedding.weight = nn.Parameter(torch.tensor(pretrained_embeddings))
            self.embedding.weight.requires_grad = False  # Freeze if you don't want to fine-tune
        
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)
    
    def forward(self, x, hidden):
        x = self.embedding(x)
        out, hidden = self.lstm(x, hidden)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        # This method initializes the hidden state and cell state for LSTM
        num_layers = self.lstm.num_layers
        hidden_size = self.lstm.hidden_size
        return (torch.zeros(num_layers, batch_size, hidden_size).to(device),
                torch.zeros(num_layers, batch_size, hidden_size).to(device))

# Load the vocabulary (token -> index) mapping
vocab = {}
with open('BaslineLSTM/tokenizadorIskonawa.vocab', 'r', encoding='utf-8') as vocab_file:
    for idx, line in enumerate(vocab_file):
        token, code = re.split(r'\t', line.strip())
        vocab[token] = idx


In [None]:
# Initialize the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
hidden_size = 128
num_layers = 2
embed_size = 300  # Assuming the embedding size is 300, adjust if different
vocab_size = len(vocab)
model = LSTMTextGenerator(vocab_size, embed_size, hidden_size, num_layers).to(device)

# Load the checkpoint
checkpoint_path = "checkpoints/lstm/lstm_checkpoint_last.pth"
checkpoint = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])


In [None]:
# def load_embeddings(embedding_file, vocab):
#     with open(embedding_file, 'r', encoding='utf-8') as f:
#         # Read the first line to get vocab size and embed size
#         first_line = f.readline().strip()
#         vocab_size, embed_size = map(int, first_line.split())
        
#         # Initialize a dictionary to hold the embeddings
#         embeddings = np.zeros((len(vocab), embed_size), dtype=np.float32)
        
#         # Read the rest of the file
#         for line in f:
#             values = line.strip().split()
#             subword = values[0].strip()
#             vector = np.array(values[1:], dtype=np.float32)
#             index = vocab.get(subword, -1)
#             if index == -1:
#                 print(f'Found {subword} in vocab')
#             else:
#                 embeddings[index] = vector
    
#     return embeddings, vocab_size, embed_size

# embedding_file = 'BaslineLSTM/isk_anchor_final2.txt'
# pretrained_embeddings, vocab_size, embed_size = load_embeddings(embedding_file, vocab)

# model.embedding.weight.data.copy_(torch.from_numpy(pretrained_embeddings))

In [193]:
def test_lstm(model, input_data, vocab, max_length=15):
    model.eval()
    with torch.no_grad():
        inputs = torch.tensor([[vocab[token] for token in sentence] for sentence in input_data]).to(device)
        hidden = model.init_hidden(inputs.size(0))
        
        # Initialize the list to store the generated tokens
        generated_tokens = inputs.tolist()
        
        for _ in range(max_length):
            outputs, hidden = model(inputs, hidden)
            predictions = torch.argmax(outputs, dim=2)
            # print(predictions)
            # Check if the predicted token is 0 and replace it with a random token if it is
            if predictions[0, -1].item() == 0 and (random.random() < 0.4):
                random_token = random.choice(list(vocab.values()))
                predictions[0, -1] = random_token
            
            # Append the predicted token to the generated tokens
            generated_tokens[0].append(predictions[0, -1].item())
            
            # Update the inputs with the predicted token
            inputs = torch.cat((inputs, predictions[:, -1].unsqueeze(1)), dim=1)
        
        # Convert generated tokens back to words
        generated_sentences = []
        for tokens in generated_tokens:
            sentence = [list(vocab.keys())[list(vocab.values()).index(token)] for token in tokens if token in vocab.values() and token != vocab['<unk>']]
            generated_sentences.append(sentence)
        
    return generated_sentences

def join_tokens(tokens):
    sentence=''.join(tokens)
    return sentence.replace('▁', ' ').replace('  ', ' ').strip()

# Generate sentences using the LSTM model
num_samples = 1
sentences = []
reverse_vocab = {idx: token for token, idx in vocab.items()}

while len(sentences) <= 100:
    input_texts = random.sample(list(vocab.keys()), num_samples)
    input_data = [input_texts]  # Wrap in a list to match the expected input format

    # print("Input texts: ", input_texts)
    predictions = test_lstm(model, input_data, vocab)
    # print("Predictions: ", predictions)
 
    flattened_sentences = [token for sentence in predictions for token in sentence]
    # print("Flattened sentences: ", flattened_sentences)
    joint_sentence=join_tokens(flattened_sentences)

    sentences.extend([joint_sentence])
    
    # print("Tenemos ", len(sentences), " oraciones")
    # break
    # Break after processing one batch (for testing, remove this in real runs)

In [191]:
diccionarioGenerado["LSTM"]=sentences

In [None]:
perplexity = calculate_perplexity_lstm(model, vocab, sentences, device)
distinct_2 = calculate_distinct_n(sentences, n=2)
distinct_3 = calculate_distinct_n(sentences, n=3)
mauve_score = calculate_mauve(sentences)

print(f"Perplexity: {perplexity}")
print(f"Distinct-2: {distinct_2}")
print(f"Distinct-3: {distinct_3}")
print(f"MAUVE: {mauve_score}")

Perplexity: nan
Distinct-2: 0.07990182710662667
Distinct-3: 0.31968592260235557
MAUVE: 0.5712289309339067

## T5

In [12]:
import tensorflow as tf
from tensorflow.python.platform import gfile
import sentencepiece as spm

# Define paths to checkpoint and SentencePiece model
MODEL_DIR = "checkpoints/t5"
TF_CHECKPOINT_PATH = "checkpoints/t5/smallT5_model.ckpt-120000"
SP_MODEL_PATH = "checkpoints/t5/spiece.model"

# Load the SentencePiece tokenizer
sp = spm.SentencePieceProcessor()
sp.Load(SP_MODEL_PATH)

# Tokenize the input text using the SentencePiece model
def encode_text(text):
    return sp.EncodeAsIds(text)

# Decode output ids to text using the SentencePiece model
def decode_text(ids):
    return sp.DecodeIds(ids)

# Load the T5 model from the TensorFlow checkpoint
def load_model():
    # Load the graph definition from the checkpoint meta file
    try:
        with gfile.GFile(f"{TF_CHECKPOINT_PATH}.meta", 'rb') as f:
            graph_def = tf.compat.v1.GraphDef()
            graph_def.ParseFromString(f.read())
    except Exception as e:
        print(e)


    # Import the graph into the current default TensorFlow graph
    with tf.compat.v1.Session() as sess:
        tf.import_graph_def(graph_def, name="model")
        
        # Restore the weights from the checkpoint
        saver = tf.compat.v1.train.Saver()
        saver.restore(sess, TF_CHECKPOINT_PATH)
        
        # Return the current session
        return sess

In [13]:
# Load the model from the checkpoint
sess = load_model()

# Retrieve input/output tensor names from the graph
# input_tensor = sess.graph.get_tensor_by_name("input_tensor_name:0")  # Replace with actual input tensor name
# decoder_input_tensor = sess.graph.get_tensor_by_name("decoder_input_tensor_name:0")  # Replace with actual decoder input tensor name
# output_tensor = sess.graph.get_tensor_by_name("output_tensor_name:0")  # Replace with actual output tensor name

# # Run the model
# feed_dict = {input_tensor: [input_ids], decoder_input_tensor: [decoder_input_ids]}
# output = sess.run(output_tensor, feed_dict=feed_dict)

# # Decode the output
# decoded_output = decode_text(output[0])

# print(f"Generated text: {decoded_output}")


Error parsing message


ValueError: Node '': Node name contains invalid characters