## Tokenizer

In [4]:
import torch
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from mingpt.utils import set_seed
set_seed(3407)
import random

In [2]:
StT = {
    "0": 0, "1": 1, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6, "7": 7, "8": 8, "9": 9,
    "NumBeg": 10, "NumEnd": 11, "*": 12, "+": 13, "=": 14, "ThinkBeg": 15, "ThinkEnd": 16, "Eos": 17
}

TtS = {v: k for k, v in StT.items()}

def tokenize(s):
    out = []
    num = False
    past = ""
    for c in s:
        chars = past + c
        if chars in StT:
            tok = StT[chars]
            if tok < 10:
                if not num:
                    out.append(StT["NumBeg"])
                    num = True
                out.append(tok)
            else:
                if num:
                    out.append(StT["NumEnd"])
                    num = False
                out.append(tok)
            past = ""
        else:
            past += c

    return out + [StT["Eos"]]

def detokenize(toks):
    out = []
    num = False
    for tok in toks:
        if tok == 10 or tok == 11 or (tok >= 15 and tok <= 17) :
            continue
        out.append(TtS[tok])

    return "".join(out)

example = "123+456*6="
print(tokenize(example))
print(detokenize(tokenize(example)))


[10, 1, 2, 3, 11, 13, 10, 4, 5, 6, 11, 12, 10, 6, 11, 14, 17]
123+456*6=


## Dataset

In [7]:
def step_add(length):
    a = [random.randint(0,9) for i in range(length)]
    b = [random.randint(0,9) for i in range(length)]
    # a = [9, 9, 9]
    # b = [9, 9, 9]

    val_a = int(''.join(str(d) for d in a))
    val_b = int(''.join(str(d) for d in b))
    if val_a < val_b:
        val_a, val_b = val_b, val_a
        a, b = b, a
    
    string = f"{val_a}+{val_b}=ThinkBeg"

    steps_eq = []
    for i in range(length):
        a_i = a[i] * 10**(length-i-1)
        b_i = b[i] * 10**(length-i-1)
        # a_i = str(a[i]) 
        string += f"{a_i}+{b_i}+"
        steps_eq.append(str(a_i+b_i)+"+")

    string = string[:-1] + "=" + "".join(steps_eq)[:-1] + "=" + "ThinkEnd" + str(val_a+val_b)

    return string
    # print(out)

eq = step_add(3)
print(eq)
tokenize(eq)
len(tokenize(eq))

644+314=ThinkBeg600+300+40+10+4+4=900+50+8=ThinkEnd958


64

In [10]:
class AddDataset(Dataset):
    """ 
    Dataset for the Add problem. E.g. for problem length 3:
    12 + 333 = 345
    Input: 0 1 2 3 3 3 -> Output: 0 3 4 5
    Which will feed into the transformer concatenated as:
    input:  0 1 2 3 3 3 0 3 4
    output: I I I I I 0 3 4 5
    where I is "ignore", as the transformer is reading the input sequence
    """

    def __init__(self, split, length=3):
        assert split in {'train', 'test'}
        self.split = split
        self.length = length
    
    def __len__(self):
        return 100000 # ...
    
    def get_vocab_size(self):
        return len(StT)
    
    def get_block_size(self):
        # the length of the sequence that will feed into transformer, 
        # containing concatenated input and the output, but -1 because
        # the transformer starts making predictions at the last input element
        return 79

    def __getitem__(self, idx):
        while True:
            rai = tokenize(step_add(self.length))
            h = hash(str(rai[:1+2*(self.length+2)]))
            
            inp_split = 'test' if h % 4 == 0 else 'train' # designate 25% of examples as test
            if inp_split == self.split:
                break # ok
        
        if len(rai) < self.get_block_size():
            rai += [StT["Eos"]] * (self.get_block_size() - len(rai))
        
        x = torch.tensor(rai[:-1], dtype=torch.long)
        y = torch.tensor(rai[1:], dtype=torch.long)
        
        # we only want to predict at output locations, mask out the loss at the input locations
        y[:2*(self.length+2)+2] = -1
        return x, y

In [11]:
# print an example instance of the dataset
train_dataset = AddDataset('train')
test_dataset = AddDataset('test')
x, y = train_dataset[0]

print (x)
for a, b in zip(x,y):
    print(int(a),int(b))

tensor([10,  3,  5,  0, 11, 13, 10,  1,  4,  8, 11, 14, 15, 10,  3,  0,  0, 11,
        13, 10,  1,  0,  0, 11, 13, 10,  5,  0, 11, 13, 10,  4,  0, 11, 13, 10,
         0, 11, 13, 10,  8, 11, 14, 10,  4,  0,  0, 11, 13, 10,  9,  0, 11, 13,
        10,  8, 11, 14, 16, 10,  4,  9,  8, 17, 17, 17, 17, 17, 17, 17, 17, 17,
        17, 17, 17, 17, 17, 17])
10 -1
3 -1
5 -1
0 -1
11 -1
13 -1
10 -1
1 -1
4 -1
8 -1
11 -1
14 -1
15 10
10 3
3 0
0 0
0 11
11 13
13 10
10 1
1 0
0 0
0 11
11 13
13 10
10 5
5 0
0 11
11 13
13 10
10 4
4 0
0 11
11 13
13 10
10 0
0 11
11 13
13 10
10 8
8 11
11 14
14 10
10 4
4 0
0 0
0 11
11 13
13 10
10 9
9 0
0 11
11 13
13 10
10 8
8 11
11 14
14 16
16 10
10 4
4 9
9 8
8 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17
17 17


## Model

In [12]:
from mingpt.model import GPT

model_config = GPT.get_default_config()
model_config.model_type = 'gpt-mini'
# model_config.model_type = 'gpt-nano'

model_config.vocab_size = train_dataset.get_vocab_size()
model_config.block_size = train_dataset.get_block_size()
model = GPT(model_config)

number of parameters: 2.69M


In [13]:
print (model_config.n_head, model_config.n_layer, model_config.n_embd)

6 6 192


In [16]:
# create a Trainer object
from mingpt.trainer import Trainer

train_config = Trainer.get_default_config()
train_config.learning_rate = 1e-6 # the model we're using is so small that we can go a bit faster
train_config.max_iters = 5000
train_config.num_workers = 0
# train_config.batch_size = 32
trainer = Trainer(train_config, model, train_dataset)

running on device cuda


In [17]:
def batch_end_callback(trainer):
    if trainer.iter_num % 100 == 0:
        print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
trainer.set_callback('on_batch_end', batch_end_callback)

trainer.run()

iter_dt 0.00ms; iter 0: train loss 0.00416
iter_dt 152.21ms; iter 100: train loss 0.00406
iter_dt 152.10ms; iter 200: train loss 0.00057
iter_dt 153.51ms; iter 300: train loss 0.00048
iter_dt 158.42ms; iter 400: train loss 0.00147
iter_dt 158.42ms; iter 500: train loss 0.00184
iter_dt 161.61ms; iter 600: train loss 0.00060
iter_dt 162.24ms; iter 700: train loss 0.00104
iter_dt 163.34ms; iter 800: train loss 0.00141
iter_dt 157.79ms; iter 900: train loss 0.00240


KeyboardInterrupt: 

## Evaluation

In [18]:
# now let's perform some evaluation
model.eval()
None

In [95]:
class EvalAddDataset(Dataset):
    def __init__(self, split, length=3):
        assert split in {'train', 'test'}
        self.split = split
        self.length = length
    
    def __len__(self):
        return 10000 # ...
    
    def get_vocab_size(self):
        return len(StT)
    
    def get_block_size(self):
        # the length of the sequence that will feed into transformer, 
        # containing concatenated input and the output, but -1 because
        # the transformer starts making predictions at the last input element
        return 79

    def __getitem__(self, idx):
        while True:
            rai = tokenize(step_add(self.length))
            h = hash(str(rai[:1+2*(self.length+2)]))
            
            inp_split = 'test' if h % 4 == 0 else 'train' # designate 25% of examples as test
            if inp_split == self.split:
                break # ok
        
        x = torch.tensor(rai[:-1], dtype=torch.long)[:rai.index(StT["="])+1]
        y = torch.tensor(rai[1:], dtype=torch.long)[rai.index(StT["ThinkEnd"])+1:-1]
        # if len(x) < 12:
        #     x = torch.cat((x, torch.tensor([StT["Eos"]]*(12-len(x)), dtype=torch.long)))

        # if len(y) < len(x):
        #     y = torch.cat((y, torch.tensor([StT["Eos"]]*(len(x)-len(y)), dtype=torch.long)))
        
        # we only want to predict at output locations, mask out the loss at the input locations
        return x, y
    
eval_train_dataset = EvalAddDataset('train')
eval_test_dataset = EvalAddDataset('test')
x, y = eval_train_dataset[0]

print (x)
print (y)
for a, b in zip(x,y):
    print(int(a),int(b))

tensor([10,  6,  0,  9, 11, 13, 10,  3,  2,  9, 11, 14])
tensor([9, 3, 8])
10 9
6 3
0 8


In [105]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


input_tensor, _ = eval_test_dataset[0]
input_tensor = input_tensor.unsqueeze(0).to(device)
out = model.generate(input_tensor, 120, do_sample=True)
print(out)
print(detokenize(input_tensor[0].cpu().numpy()))
print(detokenize(out[0].cpu().numpy()))

tensor([[10,  9,  0,  2, 11, 13, 10,  1,  4,  3, 11, 14,  1,  0,  0, 11, 13, 10,
          0, 11, 13, 10,  0, 11, 13, 10,  0, 11, 13, 10,  4,  0, 11, 13, 10,  2,
         11, 14, 10,  1,  0,  0, 11, 13, 10,  0, 11, 13, 10,  4, 11, 14, 16, 10,
          4,  0,  4, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17,
         17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17,
         17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17,
         17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17,
         17, 17, 17, 17, 17, 17]], device='cuda:0')
902+143=
902+143=100+0+0+0+40+2=100+0+4=404


In [107]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def evaluate_model_accuracy(model, eval_dataset):
    model.eval()
    correct = 0
    total = 0

    model.to(device)

    with torch.no_grad():
        for i in range(len(eval_dataset)):
            x, y = eval_dataset[i]
            x = x.unsqueeze(0).to(device)
            y = y.unsqueeze(0).to(device)

            output = model.generate(x, 79, do_sample=False)[0].cpu().numpy()
            y = y[0].cpu().numpy()
            idx2 = output.tolist().index(StT["Eos"])
            idx = output.tolist().index(StT["="], idx2-7)
            predicted = output[idx+3:idx2]
            # print(predicted)
            # print(y)

            # Compare predicted and actual values
            try:
                correct += 1 if (y == predicted)[0] else 0
            except ValueError:
                pass
            total += 1

            if total % 100 == 0:
                print(f"Accuracy: {correct}/{total} = {correct/total:.2f}")

    accuracy = correct / total
    return accuracy

accuracy = evaluate_model_accuracy(model, eval_test_dataset)
print(f"Model accuracy: {accuracy * 100:.2f}%")

Accuracy: 57/100 = 0.57
Accuracy: 112/200 = 0.56
Accuracy: 181/300 = 0.60
Accuracy: 247/400 = 0.62


KeyboardInterrupt: 

In [108]:
torch.save(model.state_dict(), '../weights/el_add.pth')