In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd

# ----------------------------
# 1) Датасет для задачи копирования
# ----------------------------
class CopyDataset(Dataset):
    """Synthetic dataset for the copy task.

    Holds ``num_samples`` random integer sequences of length ``seq_len``
    whose token ids lie in ``[1, vocab_size)``; id 0 is never generated.
    Each item is returned as an ``(input, target)`` pair in which the target
    is the input itself.
    """

    def __init__(self, seq_len, vocab_size=100, num_samples=2000):
        super().__init__()
        self.seq_len = seq_len
        self.vocab_size = vocab_size
        # All sequences are sampled once, up front, into a single
        # (num_samples, seq_len) tensor.
        shape = (num_samples, seq_len)
        self.data = torch.randint(low=1, high=vocab_size, size=shape)

    def __len__(self):
        return self.data.size(0)

    def __getitem__(self, idx):
        sequence = self.data[idx]
        # Input and target are the very same tensor object.
        return sequence, sequence

class NoisyCopyDataset(CopyDataset):
    """Copy-task variant whose input is wrapped in noise token 0.

    The clean sequence is padded with one zero token on each side and then
    cut back to ``seq_len`` elements. Because the stored sequences already
    have ``seq_len`` tokens, the cut removes the trailing noise token and the
    last clean token: the returned input is the leading noise token followed
    by the first ``seq_len - 1`` clean tokens, while the target remains the
    full clean sequence.
    """

    def __getitem__(self, idx):
        clean, _ = super().__getitem__(idx)
        # Token 0 plays the role of noise.
        pad = torch.zeros(1, dtype=clean.dtype)
        padded = torch.cat((pad, clean, pad), dim=0)
        truncated = padded[: self.seq_len]
        return truncated, clean
# ----------------------------
# 2) Модели
# ----------------------------
class LSTM(nn.Module):
    """Per-token classifier: embedding -> single-layer LSTM -> linear head.

    ``forward`` maps a ``(B, L)`` tensor of token ids to ``(B, L, V)``
    logits, where ``V`` is ``vocab_size``.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super().__init__()
        # Token id 0 is registered as the padding index of the embedding.
        self.embed = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, x):
        # (B, L) -> (B, L, E) -> (B, L, H); the final (h, c) state is unused.
        hidden_states, _ = self.lstm(self.embed(x))
        # (B, L, H) -> (B, L, V)
        return self.fc(hidden_states)

class Transformer(nn.Module):
    """Per-token classifier: embedding -> Transformer encoder -> linear head.

    ``forward`` maps a ``(B, L)`` tensor of token ids to ``(B, L, V)``
    logits, where ``V`` is ``vocab_size``.

    The encoder layers are built with ``batch_first=True`` so the model works
    on batch-major tensors end to end, consistent with the ``batch_first``
    LSTM baseline in this file, instead of permuting to sequence-major and
    back around the encoder.

    NOTE(review): no positional encoding is added to the embeddings and no
    attention mask is passed to the encoder.
    """

    def __init__(self, vocab_size, embed_dim, nhead, num_layers):
        super().__init__()
        # Token id 0 is registered as the padding index of the embedding.
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        encoder_layer = nn.TransformerEncoderLayer(
            embed_dim,
            nhead,
            dim_feedforward=embed_dim * 4,
            batch_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        emb = self.embed(x)      # (B, L, E)
        out = self.encoder(emb)  # (B, L, E)
        return self.fc(out)      # (B, L, V)

class MambaPlusPlus(nn.Module):
    """Multi-head gated linear recurrence followed by a feed-forward block.

    For every head ``i`` and time step ``t`` the state is updated elementwise
    as ``h_t = tanh(W_a[i](e_t)) * h_{t-1} + W_b[i](e_t)`` (with ``h_0 = 0``)
    and read out through ``C[i]``. The head read-outs are concatenated to
    ``hidden_dim`` features, combined with their LayerNorm, passed through a
    GELU feed-forward block with a residual connection, and finally projected
    to vocabulary logits.

    ``forward`` maps a ``(B, L)`` tensor of token ids to ``(B, L, V)``
    logits, where ``V`` is ``vocab_size``.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_heads):
        super().__init__()
        assert hidden_dim % num_heads == 0, "hidden_dim must be divisible by num_heads"
        # Token id 0 is registered as the padding index of the embedding.
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads

        # Per-head generators of the gate (W_a), the input term (W_b) and
        # the state read-out (C).
        self.W_a = nn.ModuleList([nn.Linear(embed_dim, self.head_dim) for _ in range(num_heads)])
        self.W_b = nn.ModuleList([nn.Linear(embed_dim, self.head_dim) for _ in range(num_heads)])
        self.C   = nn.ModuleList([nn.Linear(self.head_dim, self.head_dim) for _ in range(num_heads)])

        # Projection, normalisation and feed-forward network.
        self.proj = nn.Linear(hidden_dim, hidden_dim)
        self.norm = nn.LayerNorm(hidden_dim)
        self.ffn1 = nn.Linear(hidden_dim, hidden_dim * 4)
        self.ffn2 = nn.Linear(hidden_dim * 4, hidden_dim)
        self.act = nn.GELU()
        self.output_fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        # x: (B, L)
        emb = self.embed(x)  # (B, L, E)
        batch_size, seq_len, _ = emb.shape

        per_head = []
        for gate_proj, input_proj, read_out in zip(self.W_a, self.W_b, self.C):
            # Gate and input terms depend only on the embeddings, never on
            # the recurrent state, so they are computed for the whole
            # sequence at once instead of once per time step.
            gates = torch.tanh(gate_proj(emb))  # (B, L, head_dim)
            inputs = input_proj(emb)            # (B, L, head_dim)

            # new_zeros keeps the initial state on emb's device and dtype.
            state = emb.new_zeros(batch_size, self.head_dim)
            states = []
            # Only the elementwise recurrence is inherently sequential.
            for t in range(seq_len):
                state = gates[:, t] * state + inputs[:, t]
                states.append(state)

            # The read-out is applied once to all stacked states.
            per_head.append(read_out(torch.stack(states, dim=1)))  # (B, L, head_dim)

        z = torch.cat(per_head, dim=-1)  # (B, L, hidden_dim)
        # Residual + norm
        u = z + self.norm(z)
        # Feed-forward network with residual connection
        o = self.ffn2(self.act(self.ffn1(u)))
        h_out = u + o
        # Final projection to vocabulary logits
        return self.output_fc(self.proj(h_out))

# ----------------------------
# 3) Функция обучения и оценки
# ----------------------------
def train_and_eval(model, train_loader, test_seq_lens, device, epochs=5):
    """Train ``model`` on ``train_loader`` and measure copy accuracy.

    Training uses Adam (lr=1e-3) and cross-entropy that ignores target id 0;
    the mean batch loss is printed after every epoch. Afterwards the model is
    switched to eval mode and, for every length in ``test_seq_lens``, scored
    on one freshly sampled batch of 64 random sequences with token ids in
    ``[1, train_loader.dataset.vocab_size)``, using the input itself as the
    target.

    Args:
        model: Module mapping ``(B, L)`` token ids to ``(B, L, V)`` logits.
        train_loader: Iterable of ``(input, target)`` batches; its
            ``dataset`` must expose a ``vocab_size`` attribute.
        test_seq_lens: Sequence lengths to evaluate on.
        device: Device used for the model and all tensors.
        epochs: Number of passes over ``train_loader``.

    Returns:
        Dict mapping each evaluated length to token-level accuracy (float).
    """
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss(ignore_index=0)

    # Training
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            logits = model(x)  # (B, L, V)
            # reshape (unlike view) also accepts non-contiguous tensors,
            # e.g. logits that a model produced via a transpose/permute.
            loss = criterion(logits.reshape(-1, logits.size(-1)), y.reshape(-1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{epochs}, Loss={total_loss/len(train_loader):.4f}")

    # Accuracy on different sequence lengths
    model.eval()
    results = {}
    with torch.no_grad():
        for seq_len in test_seq_lens:
            # One fixed-size batch of random sequences per length.
            x_test = torch.randint(
                1, train_loader.dataset.vocab_size, (64, seq_len), device=device
            )
            preds = model(x_test).argmax(dim=-1)
            results[seq_len] = (preds == x_test).float().mean().item()
    return results

# ----------------------------
# 4) Запуск всех моделей
# ----------------------------
def run_experiment():
    """Train every model on the length-50 copy task and tabulate accuracy.

    Builds the training set, trains an LSTM, a Transformer and a Mamba++
    model for one epoch each, evaluates them on several sequence lengths,
    prints the resulting table and saves it to ``copy_task_results.csv``.
    """
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    vocab_size, embed_dim, hidden_dim = 100, 32, 64

    # Training data: 2000 sequences of length 50.
    # NoisyCopyDataset takes the same arguments and can be swapped in here.
    train_ds = CopyDataset(seq_len=50, vocab_size=vocab_size, num_samples=2000)
    train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)

    test_seq_lens = [50, 100, 200, 300, 500, 1000]

    models = {
        "LSTM": LSTM(vocab_size, embed_dim, hidden_dim),
        "Transformer": Transformer(vocab_size, embed_dim, nhead=8, num_layers=8),
        "Mamba++": MambaPlusPlus(vocab_size, embed_dim, hidden_dim, num_heads=2),
    }

    all_results = {}
    for model_name, network in models.items():
        print(f"\n=== Training {model_name} ===")
        all_results[model_name] = train_and_eval(
            network, train_loader, test_seq_lens, device, epochs=1
        )

    # Results table: one row per model, one column per evaluated length.
    table = pd.DataFrame(all_results).transpose()
    table.columns = [f"len={length}" for length in table.columns]
    print("\n=== Final Accuracy ===")
    print(table)
    table.to_csv("copy_task_results.csv")
    print("Saved results to copy_task_results.csv")

# Script entry point: run the full comparison only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_experiment()




=== Training LSTM ===
Epoch 1/1, Loss=4.4528

=== Training Transformer ===
Epoch 1/1, Loss=3.6661

=== Training Mamba++ ===
Epoch 1/1, Loss=2.7015

=== Final Accuracy ===
               len=50   len=100   len=200   len=300   len=500  len=1000
LSTM         0.567500  0.581719  0.577422  0.567708  0.567844  0.563953
Transformer  0.844063  0.856562  0.862266  0.866094  0.866156  0.869266
Mamba++      0.994062  0.993906  0.992969  0.992396  0.992188  0.992813
Saved results to copy_task_results.csv
