# Transformers amb Tokenizer manual

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
import re
import csv
import math

# Prefer the GPU when CUDA is available; otherwise run everything on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# --- Preprocessat del text ---
def netejar_text(text):
    """Normalise a raw review string.

    The text is lower-cased, every character that is not an ASCII letter,
    a digit or whitespace is deleted (not replaced by a space), and leading
    and trailing whitespace is trimmed.
    """
    en_minuscules = text.lower()
    nomes_alfanumeric = re.sub(r"[^a-zA-Z0-9\s]", "", en_minuscules)
    return nomes_alfanumeric.strip()

def processar_csv(ruta_csv):
    """Load reviews and binary sentiment labels from a CSV file.

    The first row is treated as a header and skipped. For every following
    row, column 0 is the review text (cleaned with ``netejar_text``) and
    column 1 is the sentiment: ``positive`` (ignoring case and surrounding
    whitespace) maps to 1, anything else maps to 0. Rows with fewer than
    two columns are ignored.

    Args:
        ruta_csv: Path of the UTF-8 encoded CSV file.

    Returns:
        A ``(ressenyes, etiquetes)`` tuple of two parallel lists: cleaned
        review strings and integer labels. Both are empty when the file has
        no data rows (or is completely empty).
    """
    ressenyes, etiquetes = [], []

    # newline='' hands line-ending handling to the csv module, which it needs
    # to parse quoted fields that contain embedded newlines correctly.
    with open(ruta_csv, mode='r', encoding='utf-8', newline='') as arxiu:
        lector_csv = csv.reader(arxiu)
        # Skip the header; the default avoids StopIteration on an empty file.
        next(lector_csv, None)
        for fila in lector_csv:
            if len(fila) < 2:  # malformed / blank line
                continue
            ressenyes.append(netejar_text(fila[0]))
            etiquetes.append(1 if fila[1].strip().lower() == 'positive' else 0)
    return ressenyes, etiquetes

def tokenitzar_ressenyes(ressenyes):
    """Build a word-to-index vocabulary and encode each review with it.

    Index 0 is reserved for padding under the key ``"<pad>"`` so that the
    zeros used to fill short sequences can never be confused with a real
    word. Real words receive indices 1, 2, ... in order of first appearance.
    Because the padding entry lives in the dictionary,
    ``len(paraula_a_index)`` is still a valid vocabulary size (largest
    index + 1).

    Args:
        ressenyes: Iterable of review strings; each one is split on
            whitespace.

    Returns:
        A ``(tokens, paraula_a_index)`` tuple: ``tokens`` is a list with one
        list of integer indices per review, ``paraula_a_index`` is the
        vocabulary dictionary (including the ``"<pad>"`` entry).
    """
    paraula_a_index = {"<pad>": 0}
    tokens = []
    for ressenya in ressenyes:
        seq = []
        for paraula in ressenya.split():
            # setdefault stores the next free index the first time a word is
            # seen and returns the existing index afterwards.
            seq.append(paraula_a_index.setdefault(paraula, len(paraula_a_index)))
        tokens.append(seq)
    return tokens, paraula_a_index

def padding_sequencies(sequencies, max_len):
    """Return an integer matrix with one fixed-length row per sequence.

    Each sequence is cut to at most ``max_len`` items and written at the
    start of its row; the remaining cells keep the initial value 0.
    """
    matriu = np.zeros((len(sequencies), max_len), dtype=int)
    for fila, seq in zip(matriu, sequencies):
        retallada = seq[:max_len]
        # ``fila`` is a view into ``matriu``, so this writes in place.
        fila[:len(retallada)] = retallada
    return matriu

def veure_prediccions(model, dataloader, paraula_a_index):
    """Print every review in ``dataloader`` with its true and predicted label.

    Args:
        model: Classifier returning one probability per sample.
        dataloader: Iterable of ``(X_batch, y_batch)`` pairs, where
            ``X_batch`` holds token indices and ``y_batch`` the 0/1 labels.
        paraula_a_index: Vocabulary mapping word -> index; it is inverted
            here to turn the indices back into text. Indices missing from
            the vocabulary are skipped.
    """
    model.eval()
    idx_a_paraula = {index: paraula for paraula, index in paraula_a_index.items()}
    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            # The input must be on the model's device *before* the forward pass.
            X_batch = X_batch.to(device)
            output = model(X_batch)
            # Drop only the trailing output dimension so a batch with a
            # single sample stays 1-D and can still be iterated.
            pred = (output.squeeze(-1) > 0.5).float()
            # Convert to plain Python lists once instead of calling .item()
            # on every token.
            for seq, etiqueta, prediccio in zip(X_batch.tolist(), y_batch.tolist(), pred.tolist()):
                # Turn the index sequence back into text
                ressenya = " ".join(idx_a_paraula[idx] for idx in seq if idx in idx_a_paraula)
                etiqueta_real = "Positive" if etiqueta == 1 else "Negative"
                prediccio_str = "Positive" if prediccio == 1 else "Negative"
                print(f"Ressenya: {ressenya}")
                print(f"Etiqueta Real: {etiqueta_real}, Predicció: {prediccio_str}\n")
# --- Dataset personalitzat ---
class RessenyesDataset(Dataset):
    """Dataset pairing each encoded review with its label.

    Args:
        X: Indexable collection of encoded reviews.
        y: Indexable collection of labels, aligned with ``X``.

    Raises:
        ValueError: If ``X`` and ``y`` do not have the same length.
    """

    def __init__(self, X, y):
        # __len__ is driven by ``y``; a mismatch would otherwise show up as a
        # late IndexError or as silently ignored samples.
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of samples."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the ``(review, label)`` pair stored at position ``idx``."""
        return self.X[idx], self.y[idx]

# --- Model Transformer ---
class TransformerModel(nn.Module):
    """Transformer-encoder classifier over sequences of token indices.

    Pipeline: token embedding (scaled by sqrt(embed_size)) + learned
    positional parameters -> dropout -> stacked encoder layers -> mean over
    the sequence dimension -> linear layer -> sigmoid.

    Args:
        vocab_size: Number of rows of the embedding table.
        embed_size: Embedding / model dimension.
        num_heads: Attention heads per encoder layer.
        hidden_size: Width of each encoder layer's feed-forward block.
        num_layers: Number of stacked encoder layers.
        output_size: Number of outputs of the final linear layer.
        max_len: Longest sequence length supported by the positional table.
        dropout: Dropout probability (embeddings and encoder layers).
    """

    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, output_size, max_len, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Learned positional table, initialised to zeros.
        self.positional_encoding = nn.Parameter(torch.zeros(max_len, embed_size))
        self.dropout = nn.Dropout(dropout)
        # forward() feeds (batch, seq, embed) tensors. The layer's default
        # layout is (seq, batch, embed), which would make attention mix the
        # samples of a batch instead of the tokens of one review, so
        # batch_first=True is required here.
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=embed_size,
            nhead=num_heads,
            dim_feedforward=hidden_size,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(embed_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map token indices of shape (batch, seq_len) to (batch, output_size) probabilities.

        ``seq_len`` must not exceed the ``max_len`` given at construction.
        """
        x = self.embedding(x) * math.sqrt(self.embedding.embedding_dim)
        x = x + self.positional_encoding[:x.size(1), :]
        x = self.dropout(x)
        x = self.transformer_encoder(x)
        x = x.mean(dim=1)  # average over the sequence positions
        x = self.fc(x)
        return self.sigmoid(x)

# --- Entrenament del Model ---

def train_loop(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass (one epoch) over ``dataloader``.

    Args:
        dataloader: Iterable of ``(X, y)`` batches.
        model: Module called as ``model(X)``.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar loss.
        optimizer: Optimizer holding the model parameters.
    """
    model.train()
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        # Remove only the trailing output dimension: a bare squeeze() would
        # also drop the batch dimension of a one-sample batch and the
        # prediction would no longer match the shape of ``y``.
        pred = model(X).squeeze(-1)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def test_loop(dataloader, model, loss_fn):
    size, num_batches, test_loss, correct = len(dataloader.dataset), len(dataloader), 0, 0
    model.eval()
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X).squeeze()
            test_loss += loss_fn(pred, y).item()
            correct += ((pred > 0.5).float() == y).sum().item()
    print(f"Test Error: \n Accuracy: {(100 * correct / size):.1f}%, Avg loss: {test_loss / num_batches:.4f}\n")

# --- Paràmetres i procés ---
ruta_csv = "/home/itibcn/Desktop/Torch/datasets/IMDB/IMDBDataset.csv"
max_len = 100
embed_size, num_heads, hidden_size, num_layers, dropout = 128, 8, 128, 2, 0.5
batch_size, learning_rate, epochs = 32, 0.001, 100

# Load, tokenise and pad the reviews.
ressenyes, etiquetes = processar_csv(ruta_csv)
ressenyes_tokenitzades, paraula_a_index = tokenitzar_ressenyes(ressenyes)
ressenyes_padded = padding_sequencies(ressenyes_tokenitzades, max_len)

X = torch.tensor(ressenyes_padded, dtype=torch.long)
y = torch.tensor(etiquetes, dtype=torch.float)
dataset = RessenyesDataset(X, y)

# Hold out 20% of the samples so that test_loop reports performance on data
# the model was not trained on. ``dataloader`` remains the training loader.
num_test = len(dataset) // 5
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [len(dataset) - num_test, num_test]
)
dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

vocab_size = len(paraula_a_index)
model = TransformerModel(vocab_size, embed_size, num_heads, hidden_size, num_layers, 1, max_len, dropout).to(device)
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(epochs):
    print(f"Epoch {epoch+1}")
    train_loop(dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)

print("Entrenament finalitzat.")


# Transformers amb BertTokenizer

In [None]:
from transformers import BertTokenizer
from torch.utils.data import Dataset, DataLoader
import re
import csv
import math

# Create the tokenizer from the pretrained "bert-base-uncased" name
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Tokenise and pad the reviews
def tokenitzar_amb_bert(ressenyes, max_len):
    """Encode ``ressenyes`` with the module-level ``tokenizer``.

    Every review is padded or truncated to exactly ``max_len`` tokens and the
    result is requested as PyTorch tensors.

    Returns:
        The ``input_ids`` and ``attention_mask`` entries of the tokenizer
        output, as a tuple in that order.
    """
    opcions = {
        "padding": 'max_length',   # pad every sequence up to max_len
        "truncation": True,        # cut sequences longer than max_len
        "max_length": max_len,     # maximum number of tokens per sequence
        "return_tensors": "pt",    # return PyTorch tensors
    }
    codificacio = tokenizer(ressenyes, **opcions)
    ids = codificacio['input_ids']
    mascara = codificacio['attention_mask']
    return ids, mascara

# Dataset variant that also carries the attention mask
class RessenyesDataset(Dataset):
    """Dataset of ``(input_ids, attention_mask, label)`` triples.

    Args:
        input_ids: Indexable collection of token-id sequences.
        attention_mask: Indexable collection of masks aligned with
            ``input_ids``.
        y: Indexable collection of labels aligned with ``input_ids``.

    Raises:
        ValueError: If the three collections do not have the same length.
    """

    def __init__(self, input_ids, attention_mask, y):
        # __len__ is driven by ``y``; a mismatch would otherwise show up as a
        # late IndexError or as silently ignored samples.
        if not (len(input_ids) == len(attention_mask) == len(y)):
            raise ValueError(
                "input_ids, attention_mask and y must have the same length, "
                f"got {len(input_ids)}, {len(attention_mask)} and {len(y)}"
            )
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.y = y

    def __len__(self):
        """Return the number of samples."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the ``(input_ids, attention_mask, label)`` triple at ``idx``."""
        return self.input_ids[idx], self.attention_mask[idx], self.y[idx]

# Model variant that accepts an attention mask
class TransformerModel(nn.Module):
    """Transformer-encoder classifier that honours an attention mask.

    Pipeline: token embedding (scaled by sqrt(embed_size)) + learned
    positional parameters -> dropout -> stacked encoder layers (padded
    positions hidden from attention) -> mean over the valid positions ->
    linear layer -> sigmoid.

    Args:
        vocab_size: Number of rows of the embedding table.
        embed_size: Embedding / model dimension.
        num_heads: Attention heads per encoder layer.
        hidden_size: Width of each encoder layer's feed-forward block.
        num_layers: Number of stacked encoder layers.
        output_size: Number of outputs of the final linear layer.
        max_len: Longest sequence length supported by the positional table.
        dropout: Dropout probability (embeddings and encoder layers).
    """

    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, output_size, max_len, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Learned positional table, initialised to zeros.
        self.positional_encoding = nn.Parameter(torch.zeros(max_len, embed_size))
        self.dropout = nn.Dropout(dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model=embed_size, nhead=num_heads, dim_feedforward=hidden_size, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(embed_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, attention_mask=None):
        """Return probabilities of shape (batch, output_size).

        Args:
            x: Token indices of shape (batch, seq_len), with ``seq_len`` not
                larger than ``max_len``.
            attention_mask: Optional (batch, seq_len) tensor, non-zero for
                real tokens and zero for padding. When omitted, every
                position is treated as a real token. Rows without any real
                token are not supported.
        """
        x = self.embedding(x) * math.sqrt(self.embedding.embedding_dim)
        # Without positional information the mean-pooled encoder output does
        # not depend on word order, so the (previously unused) positional
        # table and dropout are applied here.
        x = x + self.positional_encoding[:x.size(1), :]
        x = self.dropout(x)

        # src_key_padding_mask marks the positions to IGNORE, i.e. the
        # inverse of the attention mask.
        padding_mask = None if attention_mask is None else ~attention_mask.bool()
        # The encoder layers use the default (seq_len, batch, embed) layout.
        x = self.transformer_encoder(x.permute(1, 0, 2), src_key_padding_mask=padding_mask)
        x = x.permute(1, 0, 2)  # back to (batch, seq_len, embed)

        if attention_mask is None:
            pooled = x.mean(dim=1)
        else:
            # Average over real tokens only so that padding does not dilute
            # or distort the pooled representation.
            pesos = attention_mask.to(x.dtype).unsqueeze(-1)
            pooled = (x * pesos).sum(dim=1) / pesos.sum(dim=1).clamp(min=1.0)

        return self.sigmoid(self.fc(pooled))


# Preprocess the data
def netejar_text(text):
    """Normalise a raw review string.

    The text is lower-cased, every character that is not an ASCII letter,
    a digit or whitespace is deleted (not replaced by a space), and leading
    and trailing whitespace is trimmed.
    """
    en_minuscules = text.lower()
    nomes_alfanumeric = re.sub(r"[^a-zA-Z0-9\s]", "", en_minuscules)
    return nomes_alfanumeric.strip()

def processar_csv(ruta_csv):
    """Load reviews and binary sentiment labels from a CSV file.

    The first row is treated as a header and skipped. For every following
    row, column 0 is the review text (cleaned with ``netejar_text``) and
    column 1 is the sentiment: ``positive`` (ignoring case and surrounding
    whitespace) maps to 1, anything else maps to 0. Rows with fewer than
    two columns are ignored.

    Args:
        ruta_csv: Path of the UTF-8 encoded CSV file.

    Returns:
        A ``(ressenyes, etiquetes)`` tuple of two parallel lists: cleaned
        review strings and integer labels. Both are empty when the file has
        no data rows (or is completely empty).
    """
    ressenyes, etiquetes = [], []

    # newline='' hands line-ending handling to the csv module, which it needs
    # to parse quoted fields that contain embedded newlines correctly.
    with open(ruta_csv, mode='r', encoding='utf-8', newline='') as arxiu:
        lector_csv = csv.reader(arxiu)
        # Skip the header; the default avoids StopIteration on an empty file.
        next(lector_csv, None)
        for fila in lector_csv:
            if len(fila) < 2:  # malformed / blank line
                continue
            ressenyes.append(netejar_text(fila[0]))
            etiquetes.append(1 if fila[1].strip().lower() == 'positive' else 0)
    return ressenyes, etiquetes

ruta_csv = "/home/itibcn/Desktop/Torch/datasets/IMDB/IMDBDataset.csv"
embed_size, num_heads, hidden_size, num_layers, dropout = 128, 8, 128, 2, 0.5
# batch_size used to be declared as 32 here while the DataLoader below
# hard-coded 64. The effective value (64) is kept and the variable is now the
# single source of truth.
batch_size, learning_rate, epochs = 64, 0.001, 100
max_len = 100  # must be defined before it is used below
ressenyes, etiquetes = processar_csv(ruta_csv)

input_ids, attention_mask = tokenitzar_amb_bert(ressenyes, max_len)

# Move the encoded data and the labels to the target device
input_ids = input_ids.to(device)
attention_mask = attention_mask.to(device)
y = torch.tensor(etiquetes, dtype=torch.float).to(device)

dataset = RessenyesDataset(input_ids, attention_mask, y)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

#Entrenar el model

def train_loop(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass (one epoch) over ``dataloader``.

    Args:
        dataloader: Iterable of ``(input_ids, attention_mask, y)`` batches.
        model: Module called as ``model(input_ids, attention_mask)``.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar loss.
        optimizer: Optimizer holding the model parameters.
    """
    model.train()
    for input_ids, attention_mask, y in dataloader:
        input_ids, attention_mask, y = input_ids.to(device), attention_mask.to(device), y.to(device)
        # Remove only the trailing output dimension: a bare squeeze() would
        # also drop the batch dimension of a one-sample batch and the
        # prediction would no longer match the shape of ``y``.
        pred = model(input_ids, attention_mask).squeeze(-1)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def test_loop(dataloader, model, loss_fn):
    model.eval()
    size, test_loss, correct = len(dataloader.dataset), 0, 0
    with torch.no_grad():
        for input_ids, attention_mask, y in dataloader:
            input_ids, attention_mask, y = input_ids.to(device), attention_mask.to(device), y.to(device)
            pred = model(input_ids, attention_mask).squeeze()
            test_loss += loss_fn(pred, y).item()
            correct += ((pred > 0.5).float() == y).sum().item()
    print(f"Accuracy: {(100 * correct / size):.1f}%, Avg loss: {test_loss / len(dataloader):.4f}\n")

#Llançar el model

vocab_size = tokenizer.vocab_size
model = TransformerModel(vocab_size, embed_size, num_heads, hidden_size, num_layers, 1, max_len, dropout).to(device)

loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Hold out 20% of the samples so that test_loop reports performance on data
# the model was not trained on. The batch size of the existing full-data
# loader is reused for both new loaders.
num_test = len(dataset) // 5
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [len(dataset) - num_test, num_test]
)
train_dataloader = DataLoader(train_dataset, batch_size=dataloader.batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=dataloader.batch_size)

for epoch in range(epochs):
    print(f"Epoch {epoch+1}")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)

print("Entrenament finalitzat.")
