<a href="https://colab.research.google.com/github/PedroTonus/praticasGSI073/blob/main/GSI073_aula0_seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data preparation

The task is to reverse character sequences. Example: **aabcd** becomes **dcbaa**.


In [None]:
import torch
import torch.nn as nn
import random

chars = list("abcd ")
vocab = {ch: i for i, ch in enumerate(chars)}  # map each character to an integer id
inv_vocab = {i: ch for ch, i in vocab.items()}  # reverse table used for decoding
vocab_size = len(vocab)

def encode(s):  # convert characters to ids
    return torch.tensor([vocab[c] for c in s], dtype=torch.long)

def decode(t):  # convert ids back to characters
    return ''.join(inv_vocab[int(x)] for x in t)

def random_seq(n=5):  # sample a new sequence of n characters (the space symbol is excluded)
    return ''.join(random.choice(chars[:-1]) for _ in range(n))

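# Generate data. Each target is the reversed input prefixed with the space symbol, which
# acts as a start marker: the decoder reads the marker first and so learns to predict
# every character of the reversed sequence, matching how predict() starts decoding.
pairs = [(encode(s), encode(' ' + s[::-1])) for s in [random_seq() for _ in range(50000)]]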

max_len = max(len(x) for x, _ in pairs)  # length of the longest input sequence

def pad(x):  # right-pad x with the pad id (the space symbol) up to max_len
    return torch.cat([x, torch.tensor([vocab[' ']] * (max_len - len(x)), dtype=torch.long)], dim=0)

inputs = torch.stack([pad(x) for x, _ in pairs])
targets = torch.stack([pad(y) for _, y in pairs])

train_ds = torch.utils.data.TensorDataset(inputs, targets)
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=128, shuffle=True)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
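
To make the mapping concrete, here is a minimal sketch in plain Python lists (no tensors) of what encoding, decoding and padding produce for one example. The helper names `encode_ids`, `decode_ids` and `pad_ids` are illustrative and not part of the notebook.

```python
chars = list("abcd ")
vocab = {ch: i for i, ch in enumerate(chars)}       # a:0, b:1, c:2, d:3, ' ':4
inv_vocab = {i: ch for ch, i in vocab.items()}

def encode_ids(s):
    """Characters -> list of integer ids."""
    return [vocab[c] for c in s]

def decode_ids(ids):
    """List of integer ids -> string."""
    return ''.join(inv_vocab[i] for i in ids)

def pad_ids(ids, length):
    """Right-pad with the id of the space symbol up to `length`."""
    return ids + [vocab[' ']] * (length - len(ids))

src = encode_ids("aabcd")                  # [0, 0, 1, 2, 3]
tgt = encode_ids("aabcd"[::-1])            # [3, 2, 1, 0, 0]
padded = pad_ids(encode_ids("abc"), 5)     # [0, 1, 2, 4, 4]

print(src, tgt, padded)
print(repr(decode_ids(padded)))            # 'abc  '
```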

## Look at one pair

In [None]:
print(pairs[1])

# Seq2Seq model definition with GRU

In [None]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, emb_size, hidden_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)

    def forward(self, x):
        x = self.embed(x)
        _, h = self.gru(x)
        return h  # [1, B, H]

class Decoder(nn.Module):
    def __init__(self, vocab_size, emb_size, hidden_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, emb_size)
        self.gru = nn.GRU(emb_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, h):
        """
        x: tensor with the previous tokens (ground truth during training, generated at inference)
        h: tensor with the hidden state, initially the encoder's final state
        """
        x = self.embed(x)
        out, h = self.gru(x, h)
        logits = self.fc(out)
        return logits, h  # also return the hidden state so the caller can carry it forward

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt):
        h = self.encoder(src)
        # teacher forcing: feed the ground-truth prefix tgt[:, :-1] plus the encoder state to predict tgt[:, 1:]
        logits, _ = self.decoder(tgt[:, :-1], h)
        return logits
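
The slicing in `Seq2Seq.forward` is the usual teacher-forcing shift, and a small sketch may help to see which input lines up with which label. The sketch assumes the target row begins with a start marker (here the space id 4, an assumption for illustration); the decoder can only learn to emit the first real character if something precedes it in its input.

```python
import torch

START = 4  # assumed: id of the space symbol, used as a start marker

# Reversed sequence "dcbaa" -> ids [3, 2, 1, 0, 0], prefixed with the start marker
tgt = torch.tensor([[START, 3, 2, 1, 0, 0]])

decoder_input = tgt[:, :-1]   # what the decoder reads at each step
labels = tgt[:, 1:]           # what it is trained to predict at each step

for step, (inp, out) in enumerate(zip(decoder_input[0].tolist(), labels[0].tolist())):
    print(f"step {step}: input id {inp} -> label id {out}")
```

Both slices have the same length, which is why the training loop can compare the logits position by position with `yb[:, 1:]`.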

# Using the trained model: inference

In [None]:
def decode_step(decoder, token, h):
    logits, h = decoder(token, h)  # get the logits and update the hidden state
    next_token = logits[:, -1, :].argmax(-1, keepdim=True)
    return next_token, h

def predict(model, seq, max_len=10):
    model.eval()
    with torch.no_grad():
        src = pad(encode(seq)).unsqueeze(0).to(device, dtype=torch.long)
        h = model.encoder(src)  # hidden state after the encoder has read the input

        # 'token' holds the most recently generated symbol; decoding starts from the space symbol
        token = torch.tensor([[vocab[' ']]], dtype=torch.long, device=device)
        seq_invertida = []
        for _ in range(max_len):
            token, h = decode_step(model.decoder, token, h)
            seq_invertida.append(token.item())
        return decode(seq_invertida)
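
The greedy choice made in `decode_step` can be checked in isolation. The following sketch uses hand-written logits (made-up numbers, not model output) to show that only the last time step is consulted and that `keepdim=True` keeps the `[batch, 1]` shape the decoder expects as its next input.

```python
import torch

# Made-up logits with shape [batch=1, time=2, vocab=5]
logits = torch.tensor([[[0.1, 2.0, 0.3, 0.0, -1.0],
                        [1.5, 0.2, 0.1, 0.0, -1.0]]])

last_step = logits[:, -1, :]                        # [1, 5]: scores for the next symbol
next_token = last_step.argmax(-1, keepdim=True)     # [1, 1]: id with the highest score

print(next_token)         # tensor([[0]])
print(next_token.shape)   # torch.Size([1, 1])
```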

# Training setup

In [None]:
emb_size = 32
hidden_size = 64
encoder = Encoder(vocab_size, emb_size, hidden_size)
decoder = Decoder(vocab_size, emb_size, hidden_size)
model = Seq2Seq(encoder, decoder).to(device)

loss_fn = nn.CrossEntropyLoss(ignore_index=vocab[' '])  # ignore the pad symbol " "
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
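
As a quick illustration of what `ignore_index` does, the sketch below compares the loss over four positions, two of which carry the pad label, with the loss over the two real positions only. The logits are random and the pad id 4 is assumed to match `vocab[' ']`.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
PAD = 4  # assumed to equal vocab[' ']

logits = torch.randn(4, 5)                  # 4 positions, 5 classes
labels = torch.tensor([0, 2, PAD, PAD])     # the last two positions are padding

loss_ignoring_pad = nn.CrossEntropyLoss(ignore_index=PAD)(logits, labels)
loss_real_only = nn.CrossEntropyLoss()(logits[:2], labels[:2])

# With the default mean reduction, ignored positions are left out of the average
print(torch.allclose(loss_ignoring_pad, loss_real_only))
```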

# Running the training

In [None]:
for epoch in range(10):
    model.train()
    total_loss = 0
    for xb, yb in train_dl:
        xb, yb = xb.to(device, dtype=torch.long), yb.to(device, dtype=torch.long)
        opt.zero_grad()
        logits = model(xb, yb)
        loss = loss_fn(logits.reshape(-1, vocab_size), yb[:, 1:].reshape(-1))
        loss.backward()
        opt.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}: loss={total_loss/len(train_dl):.4f}")
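
The two `reshape` calls in the loss line flatten the batch and time dimensions so that every position counts as one classification example. A minimal sketch with random tensors (shapes chosen arbitrarily) shows the flattening and an equivalent form that moves the class dimension to position 1 instead.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
B, T, V = 2, 4, 5                          # batch, time steps, vocabulary size
logits = torch.randn(B, T, V)
labels = torch.randint(0, V, (B, T))

loss_fn = nn.CrossEntropyLoss()

# Form used in the training loop: [B*T, V] logits against [B*T] labels
flat_loss = loss_fn(logits.reshape(-1, V), labels.reshape(-1))

# Equivalent form: CrossEntropyLoss also accepts [B, V, T] logits with [B, T] labels
permuted_loss = loss_fn(logits.permute(0, 2, 1), labels)

print(logits.reshape(-1, V).shape, labels.reshape(-1).shape)
print(torch.allclose(flat_loss, permuted_loss))
```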

# Let's test it

In [None]:
for _ in range(10):
    s = random_seq()
    pred = predict(model, s, max_len=len(s))
    print(f"{s} -> {pred}")
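
Printing ten examples gives a feel for the quality; a single number is easier to compare across runs. The helper below is a hypothetical addition (not part of the notebook) that computes exact-match accuracy for any function mapping a string to its predicted reversal. Stand-in predictors are used here so the sketch runs without a trained model.

```python
def exact_match_accuracy(predict_fn, sequences):
    """Fraction of sequences whose prediction equals the true reversal."""
    hits = sum(predict_fn(s) == s[::-1] for s in sequences)
    return hits / len(sequences)

samples = ["abcd", "aa"]

perfect = lambda s: s[::-1]    # stand-in: always right
identity = lambda s: s         # stand-in: right only for palindromes

acc_perfect = exact_match_accuracy(perfect, samples)
acc_identity = exact_match_accuracy(identity, samples)
print(acc_perfect, acc_identity)   # 1.0 0.5
```

With the notebook's model, one could pass something like `lambda s: predict(model, s, max_len=len(s))` as `predict_fn`.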


# Exercise
Compare the encoder's output for very similar and very different sequences. For example, encode "aaaabb", "bbaaab", "cbcaccc" and "cccacbc", then plot their first 2 principal components using Principal Component Analysis (PCA) from `sklearn.decomposition.PCA`.

## Define and Encode the Sequences

### Subtask:
Define the input sequences ('aaaabb', 'bbaaab', 'cbcaccc', 'cccacbc') and encode them using the existing `encode` function.


**Reasoning**:
I will create a list of the specified test sequences, then iterate through them to encode and pad each one using the existing `encode` and `pad` functions, storing the results in `encoded_padded_sequences`.



In [None]:
test_sequences = ['aaaabb', 'bbaaab', 'cbcaccc', 'cccacbc']

# Raise the padding length if needed so that every test sequence fits
max_len_for_test = max(max_len, max(len(s) for s in test_sequences))

def pad_for_test(x):  # same as pad, but pads up to max_len_for_test
    return torch.cat([x, torch.tensor([vocab[' ']] * (max_len_for_test - len(x)), dtype=torch.long)], dim=0)

encoded_padded_sequences = [pad_for_test(encode(s)) for s in test_sequences]

print("Test Sequences:", test_sequences)
print("Encoded and Padded Sequences:", encoded_padded_sequences)
print("Max length used for padding:", max_len_for_test)

**Reasoning**:
Now that the test sequences are encoded and padded, the next step is to obtain the encoder outputs (embeddings) for these sequences using the trained model's encoder. These embeddings will then be used for PCA.



In [None]:
model.eval()
with torch.no_grad():
    # Stack the tensors and move them to the right device
    encoded_padded_sequences_tensor = torch.stack(encoded_padded_sequences).to(device, dtype=torch.long)
    encoder_outputs = model.encoder(encoded_padded_sequences_tensor)

# The GRU's final hidden state has shape [num_layers * num_directions, batch_size, hidden_size]
# With a single layer and a single direction, that is [1, batch_size, hidden_size]
# Squeeze the first dimension to get [batch_size, hidden_size]
encoder_embeddings = encoder_outputs.squeeze(0).cpu().numpy()

print("Encoder Embeddings Shape:", encoder_embeddings.shape)
print("First embedding (sample):")
print(encoder_embeddings[0])
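
The shape bookkeeping described in the comments above can be verified on a standalone GRU. The sketch below uses random inputs and the same sizes as the notebook (32-dimensional embeddings, 64 hidden units); the batch of 4 sequences of length 7 is an assumption matching the test data.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
gru = nn.GRU(input_size=32, hidden_size=64, batch_first=True)

x = torch.randn(4, 7, 32)      # [batch, time, emb_size]
out, h = gru(x)

print(out.shape)               # torch.Size([4, 7, 64]): one output per time step
print(h.shape)                 # torch.Size([1, 4, 64]): final hidden state
print(h.squeeze(0).shape)      # torch.Size([4, 64]): one vector per sequence

# For a single-layer, unidirectional GRU the final hidden state is the last output
same = torch.allclose(out[:, -1, :], h[0], atol=1e-6)
print(same)
```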

**Reasoning**:
Now that we have the encoder embeddings, the next step is to apply Principal Component Analysis (PCA) to reduce their dimensionality to 2 components. This will allow for visualization of the sequence embeddings in a 2D space, helping to compare similar and different sequences.



In [None]:
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# Initialize PCA with 2 components
pca = PCA(n_components=2)

# Fit PCA to the encoder embeddings and transform them
pca_components = pca.fit_transform(encoder_embeddings)

# Plot the 2 principal components
plt.figure(figsize=(8, 6))
for i, (x, y) in enumerate(pca_components):
    plt.scatter(x, y) # Plot the point
    plt.annotate(test_sequences[i], (x, y), textcoords="offset points", xytext=(5,5), ha='center') # Label with original sequence

plt.title('PCA of Encoder Embeddings')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.grid(True)
plt.show()

print("PCA performed and plot generated.")
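
A 2-D projection of only four points can distort distances, so it may be worth checking similarity in the original 64-dimensional space as well. The sketch below computes a cosine-similarity matrix with NumPy. It runs on synthetic stand-in vectors (two random bases plus small noise), not on the notebook's embeddings, and `cosine_similarity_matrix` is an illustrative helper.

```python
import numpy as np

def cosine_similarity_matrix(X):
    """Pairwise cosine similarity between the rows of X."""
    X = np.asarray(X, dtype=float)
    unit = X / np.linalg.norm(X, axis=1, keepdims=True)
    return unit @ unit.T

rng = np.random.default_rng(0)
base_a = rng.normal(size=64)
base_b = rng.normal(size=64)

# Stand-in for four embeddings: two near base_a, two near base_b
emb = np.stack([
    base_a,
    base_a + 0.1 * rng.normal(size=64),
    base_b,
    base_b + 0.1 * rng.normal(size=64),
])

sim = cosine_similarity_matrix(emb)
print(np.round(sim, 2))
```

With the notebook's variables, `cosine_similarity_matrix(encoder_embeddings)` would give the corresponding 4 x 4 matrix for the test sequences.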

## Final Task

### Subtask:
Analyze the PCA visualization and provide a comparative analysis of the sequence embeddings.


## Summary:

### Q&A
The encoder successfully differentiated between "very similar" and "very different" sequences, as evidenced by the PCA visualization. Sequences "aaaabb" and "bbaaab" (which are very similar) are clustered together, and "cbcaccc" and "cccacbc" (also very similar) form another distinct cluster. These two clusters are clearly separated from each other in the 2-component PCA space, indicating that the encoder can distinguish between these two groups of sequences.

### Data Analysis Key Findings
*   Four test sequences (`'aaaabb'`, `'bbaaab'`, `'cbcaccc'`, `'cccacbc'`) were successfully encoded and padded to a consistent length of 7 characters.
*   The model's encoder generated 64-dimensional embeddings for each of the four sequences, resulting in an output shape of (4, 64).
*   Principal Component Analysis (PCA) effectively reduced these 64-dimensional embeddings to 2 principal components, which were then plotted.
*   The PCA plot visually demonstrates that the encoder groups similar sequences: "aaaabb" and "bbaaab" are located close to each other, and "cbcaccc" and "cccacbc" are also close to each other, forming a distinct cluster from the first pair.

### Insights or Next Steps
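*   The PCA plot is consistent with the encoder placing sequences of similar character composition close together and separating dissimilar ones. With only four points this is suggestive rather than conclusive, and the training data contained only 5-character sequences, so the 6- and 7-character test inputs and the trailing pad symbol fall outside what the encoder saw during training.
*   The same procedure can be used to inspect the learned representations of other sequences, for example as a first step toward clustering or anomaly detection on sequence data.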
