In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os

In [3]:
# os.chdir("/content")
# if os.path.exists("598_016_hw5"):
#     !rm -rf 598_016_hw5
# !git clone https://github.com/JiaMing991203/598_016_hw5.git
# os.chdir("598_016_hw5")

In [4]:
import torch
import torch.nn.functional as F
import random
import numpy as np
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset, Dataset
from mamba_model import MambaTwo, MambaConfig
from atten_model import BaseNet, AttnRope
from hybrid_model import HybridA, HybridB

We are giving you a 2 layer transformer model in induction task. Check the data generation mechanism for the induction head and adjust it accordingly to your needs. For AR, you need to code one from scratch.

In [5]:
class InductionAR(Dataset):
    # In induction head we have ngram = 1. But the code provided is for general ngram setting. While using this, initialize ngram = 1.
    """ Naive associative recall dataset """
    def __init__(self, num_examples, tokenizer, n_gram=1, n_ctx = 1024,
                 seed = 0, train_split=0.8, tau: int = 1, ):
        self.num_examples = num_examples
        self.tokenizer = tokenizer
        self.n_ctx = n_ctx
        self.seed = seed
        self.n_gram = n_gram
        self.tau = tau
        x, y = self.data_gen()
        if train_split:
            self.train_x, self.train_y, self.test_x, self.test_y = self.split(x, y, train_split)
            self.train = self.numpy_to_tensor_dataset(self.train_x, self.train_y)
            self.test = self.numpy_to_tensor_dataset(self.test_x, self.test_y)
        else:
            self.train_x, self.train_y, self.test_x, self.test_y = x, y, None, None
            self.train = self.numpy_to_tensor_dataset(self.train_x, self.train_y)
            self.test = None
    def get_str_dataset(self, split="train"):
        if split == "train":
            x_str = [self.tokenizer.decode(xi) for xi in self.train_x]
            y_str = [self.tokenizer.decode([yi]) for yi in self.train_y]
        elif split == "test":
            x_str = [self.tokenizer.decode(xi) for xi in self.test_x]
            y_str = [self.tokenizer.decode([yi]) for yi in self.test_y]
        else:
            raise ValueError("split should be either 'train' or 'test'")
        return x_str, y_str
    def numpy_to_tensor_dataset(self, x, y):
        x = torch.tensor(x, dtype=torch.long)
        y = torch.tensor(y, dtype=torch.long)
        return TensorDataset(x, y)
    def gen_single_example(self):
        # get the vocab size
        def count(str_x, str_n_gram_head):
            counts = sum([
                str_x.startswith(str_n_gram_head, i) for i in range(len(str_x))
            ])
            return counts
        def gen_x():
            gen_x_success = False
            while not gen_x_success:
                x = np.random.choice(vocab, self.n_ctx-self.n_gram*2, replace=True).tolist()
                # remove the case where the n_gram_head is repeated in the sequence
                for _ in range(10):
                    pos = [i for i in range(len(x)-len(n_gram_head)+1) if x[i:i+len(n_gram_head)] == n_gram_head]
                    if len(pos) == 0:
                        gen_x_success = True
                        break
                    else:
                        # remove the n_gram_head from x
                        # get all positions of the n_gram_head in x
                        for p in reversed(pos):
                            # remove len(n_gram_head) elements from x starting from p
                            x = x[:p] + x[p+len(n_gram_head):]
                        # fill the rest of the sequence with random elements
                        x.extend(np.random.choice(vocab, self.n_ctx-self.n_gram*2-len(x), replace=True).tolist())
                x_test = " ".join([str(xi) for xi in x])
                if count(x_test, str_n_gram_head) == 0:
                    gen_x_success = True

            x_test = x + n_gram_head
            # check if there's only one n_gram_head in the sequence
            # to avoid the case where the n_gram_head has
            # repeated structure such as x= [1, 2, 3, 1] , n_gram_head = [1, 1]
            str_x_test = " "+" ".join([str(xi) for xi in x_test])+ " "
            if count(str_x_test, str_n_gram_head) > 1:
                print("Error in gen_x")
                print(f"str_x_test: {str_x_test}", f"str_n_gram_head: {str_n_gram_head}",
                      "count: ", count(str_x_test, str_n_gram_head))
            if count(str_x_test, str_n_gram_head) == 1:
                return x
            else:
                return None
        def insert_n_gram_head(x):
            pos = random.randint(0, len(x)-self.tau)
            y = x[pos + self.tau - 1]
            x_new = x[:pos] + n_gram_head + x[pos:] + n_gram_head
            str_x_new = " "+" ".join([str(xi) for xi in x_new])+" "

            if count(str_x_new, str_n_gram_head) == 2:
                return x_new, y
            else:
                return None, None
        vocab_size = len(self.tokenizer)
        vocab = list(range(vocab_size))
        # set a deterministic n_gram_head
        n_gram_head = list(range(self.n_gram))

        str_n_gram_head = " "+" ".join([str(xi) for xi in n_gram_head])+" "
        assert self.n_gram*2 < self.n_ctx, "n_gram*2 should be less than n_ctx"
        success = False
        while not success:
            x = gen_x()
            if x is not None:
                for _ in range(10):
                    x_new, y = insert_n_gram_head(x)
                    if x_new is not None:
                        success = True
                        break
        return x_new, y

    def data_gen(self):
        x = []
        y = []
        # get previous random status and recover after generating the dataset
        random_status = random.getstate()
        random.seed(self.seed)
        for i in range(self.num_examples):
            if i % 1000 == 0:
                print(f"Generating example {i}")
            xi, yi = self.gen_single_example()
            x.append(xi)
            y.append(yi)
        x = np.array(x)
        y = np.array(y)
        random.setstate(random_status)
        return x, y
    def split(self, x, y, train_ratio = 0.8):
        num_train = int(len(x)*train_ratio)
        train_x = x[:num_train]
        train_y = y[:num_train]
        test_x = x[num_train:]
        test_y = y[num_train:]
        return train_x, train_y, test_x, test_y


In [6]:

class Random_tokenizer:
    def __init__(self, vocab=None, vocab_size = None) -> None:
        """ The init function of the tokenizer class.
         one of vocab or vocab_size should be provided.
         If vocab is provided, vocab_size will be ignored.
         If vocab is not provided, vocab_size should be provided. we will generate a random vocab of vocab_size."""
        if vocab is not None:
            self.vocab = vocab
            self.vocab_size = len(vocab)
        elif vocab_size is not None:
            self.vocab_size = vocab_size
            self.vocab = [str(i) for i in range(vocab_size)]
        else:
            raise ValueError("one of vocab or vocab_size should be provided.")
        self.vocab_dict = {v: i for i, v in  enumerate(self.vocab)}
        self.vocab_dict_inv = {i: v for i, v in enumerate(self.vocab)}
    def encode(self, x):
        """ Encode a string into a list of integers """
        return [self.vocab_dict[i] for i in x]
    def decode(self, x):
        """ Decode a list of integers into a string """
        return ' '.join([self.vocab_dict_inv[i] for i in x])
    def __len__(self):
        return self.vocab_size
    def __getitem__(self, i):
        return self.vocab[i]
    def __iter__(self):
        return iter(self.vocab)
    def __contains__(self, x):
        return x in self.vocab
    def __repr__(self):
        return f"Random_tokenizer(vocab_size={self.vocab_size})"
    def __str__(self):
        return f"Random_tokenizer(vocab_size={self.vocab_size})"
    def __call__(self, x):
        return self.encode(x)


In [7]:
# self attention block
class Block(nn.Module):
    def __init__(self, embed_dim, max_len=11):
        super(Block, self).__init__()
        self.embed_dim = embed_dim
        self.max_len = max_len
        self.c_attn = nn.Linear(embed_dim, embed_dim*3)
        self.register_buffer('mask', torch.tril(torch.ones(max_len, max_len)))
    def forward(self, x):
        T = x.size(1)
        q, k, v = self.c_attn(x).chunk(3, dim=-1)
        y = torch.nn.functional.scaled_dot_product_attention(q, k, v, is_causal=True)
        return y

In [13]:

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using {device} device")
seed = 0
n_ctx = 16 # training sequence length
num_examples = 100 # generate 100000 examples
batch_size = 16 # batch size
vocab_size = 16 # vocabulary size
num_epochs = 100    # number of epochs
attn_layers = 2 # number of attention layers
embed_dim = 8 # embedding dimension
is_pe = True  # the default positional embedding we are using is the learned positional embedding


tokenizer = Random_tokenizer(vocab_size=vocab_size)
dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.8, tau=3)

Using cpu device
Generating example 0


In [14]:
dataset

<__main__.InductionAR at 0x105db3190>

In [15]:
mamba_config = MambaConfig(d_model = embed_dim)

# mamba_config.d_state = vocab_size

In [16]:

train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model = BaseNet(len(tokenizer), embed_dim, is_pe,  max_len=n_ctx*4, attn_layers=attn_layers, block=Block).to(device)
mamba_model = MambaTwo(mamba_config, vocab_size)

rope_model = AttnRope(len(tokenizer), embed_dim, max_len=n_ctx*4, attn_layers=2, context_len=n_ctx).to(device)

config_hybridA = MambaConfig(d_model=embed_dim, n_layers=1)
hybridA = HybridA(vocab_size=len(tokenizer), config=config_hybridA, max_len=n_ctx*4)

config_hybridB = MambaConfig(d_model=embed_dim, n_layers=1)
hybridB = HybridB(vocab_size=len(tokenizer), config=config_hybridB, max_len=n_ctx*4)


models = [attn_model, hybridB] # add more models here when you have more models
for model in models:
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    for epoch in range(num_epochs):
        total_loss = 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            y_pred = model(x)[:,-1]
            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"epoch {epoch} loss: {total_loss/len(train_loader)}")
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            y_pred = model(x)[:,-1]
            y_pred = F.softmax(y_pred, dim=-1)
            y_pred = torch.argmax(y_pred, dim=-1)
            correct += (y_pred == y).sum().item()
            total += y.size(0)
    print(f"Test accuracy: {correct/total}")



BaseNet with 2 layers of <class '__main__.Block'> blocks
Embedding dimension: 8
Positional Encoding: True
Vocabulary size: 16
Context length: 64
epoch 0 loss: 2.8771236896514893
epoch 1 loss: 2.874017286300659
epoch 2 loss: 2.8712769985198974
epoch 3 loss: 2.8691292285919188
epoch 4 loss: 2.8662591934204102
epoch 5 loss: 2.8639618396759032
epoch 6 loss: 2.861272859573364
epoch 7 loss: 2.8588863372802735
epoch 8 loss: 2.856516647338867
epoch 9 loss: 2.8538352489471435
epoch 10 loss: 2.851624870300293
epoch 11 loss: 2.849233865737915
epoch 12 loss: 2.84688663482666
epoch 13 loss: 2.844722270965576
epoch 14 loss: 2.8425242424011232
epoch 15 loss: 2.840014410018921
epoch 16 loss: 2.8378385066986085
epoch 17 loss: 2.835593509674072
epoch 18 loss: 2.833574104309082
epoch 19 loss: 2.83134388923645
epoch 20 loss: 2.829225206375122
epoch 21 loss: 2.8270747661590576
epoch 22 loss: 2.8249590396881104
epoch 23 loss: 2.8229525566101072
epoch 24 loss: 2.8208667755126955
epoch 25 loss: 2.818697690963

## Attention model

In [18]:
# generate test data with length 32
# test the model with the test data
num_examples = 20000
n_ctx = 32

dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x,y = x.to(device), y.to(device)
        y_pred = attn_model(x)[:,-1]
        y_pred = F.softmax(y_pred, dim=-1)
        y_pred = torch.argmax(y_pred, dim=-1)
        correct += (y_pred == y).sum().item()
        total += y.size(0)
    print(f"Test accuracy: {correct/total}")

# generate test data with length 16
# test the model with the test data
num_examples = 20000
n_ctx = 16

dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x,y = x.to(device), y.to(device)
        y_pred = attn_model(x)[:,-1]
        y_pred = F.softmax(y_pred, dim=-1)
        y_pred = torch.argmax(y_pred, dim=-1)
        correct += (y_pred == y).sum().item()
        total += y.size(0)
    print(f"Test accuracy: {correct/total}")

# generate test data with length 128
# test the model with the test data
num_examples = 20000
n_ctx = 64

dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x,y = x.to(device), y.to(device)
        y_pred = attn_model(x)[:,-1]
        y_pred = F.softmax(y_pred, dim=-1)
        y_pred = torch.argmax(y_pred, dim=-1)
        correct += (y_pred == y).sum().item()
        total += y.size(0)
    print(f"Test accuracy: {correct/total}")


# generate test data with length 256
# test the model with the test data
# num_examples = 20000
# n_ctx = 256

# dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
# train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
# test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

# attn_model.eval()
# correct = 0
# total = 0
# with torch.no_grad():
#     for x, y in test_loader:
#         x,y = x.to(device), y.to(device)
#         y_pred = attn_model(x)[:,-1]
#         y_pred = F.softmax(y_pred, dim=-1)
#         y_pred = torch.argmax(y_pred, dim=-1)
#         correct += (y_pred == y).sum().item()
#         total += y.size(0)
#     print(f"Test accuracy: {correct/total}")



Generating example 0
Generating example 1000
Generating example 2000
Generating example 3000
Generating example 4000
Generating example 5000
Generating example 6000
Generating example 7000
Generating example 8000
Generating example 9000
Generating example 10000
Generating example 11000
Generating example 12000
Generating example 13000
Generating example 14000
Generating example 15000
Generating example 16000
Generating example 17000
Generating example 18000
Generating example 19000
Test accuracy: 0.06404040404040404
Generating example 0
Generating example 1000
Generating example 2000
Generating example 3000
Generating example 4000
Generating example 5000
Generating example 6000
Generating example 7000
Generating example 8000
Generating example 9000
Generating example 10000
Generating example 11000
Generating example 12000
Generating example 13000
Generating example 14000
Generating example 15000
Generating example 16000
Generating example 17000
Generating example 18000
Generating examp

In [None]:
# generate test data with length 32
# test the model with the test data
num_examples = 20000
n_ctx = 32

dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x,y = x.to(device), y.to(device)
        y_pred = attn_model(x)[:,-1]
        y_pred = F.softmax(y_pred, dim=-1)
        y_pred = torch.argmax(y_pred, dim=-1)
        correct += (y_pred == y).sum().item()
        total += y.size(0)
    print(f"Test accuracy: {correct/total}")

# generate test data with length 16
# test the model with the test data
num_examples = 20000
n_ctx = 16

dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x,y = x.to(device), y.to(device)
        y_pred = attn_model(x)[:,-1]
        y_pred = F.softmax(y_pred, dim=-1)
        y_pred = torch.argmax(y_pred, dim=-1)
        correct += (y_pred == y).sum().item()
        total += y.size(0)
    print(f"Test accuracy: {correct/total}")

# generate test data with length 128
# test the model with the test data
num_examples = 20000
n_ctx = 128

dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x,y = x.to(device), y.to(device)
        y_pred = attn_model(x)[:,-1]
        y_pred = F.softmax(y_pred, dim=-1)
        y_pred = torch.argmax(y_pred, dim=-1)
        correct += (y_pred == y).sum().item()
        total += y.size(0)
    print(f"Test accuracy: {correct/total}")


# generate test data with length 256
# test the model with the test data
num_examples = 20000
n_ctx = 256

dataset = InductionAR(num_examples, tokenizer, 1, n_ctx=n_ctx, seed=seed, train_split=0.01)
train_loader = DataLoader(dataset.train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset.test, batch_size=batch_size, shuffle=True)

attn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x,y = x.to(device), y.to(device)
        y_pred = attn_model(x)[:,-1]
        y_pred = F.softmax(y_pred, dim=-1)
        y_pred = torch.argmax(y_pred, dim=-1)
        correct += (y_pred == y).sum().item()
        total += y.size(0)
    print(f"Test accuracy: {correct/total}")



In [21]:
from atten_model import get_rotary_matrix
get_rotary_matrix(40, 10, 8).size()
get_rotary_matrix(40, 10, 8)[10]

tensor([[1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 0., 1., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0.],
        [0., 0., 0., 0., 0., 0., 0., 1.]])