In [1]:
import torch
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from mingpt.utils import set_seed
set_seed(3407)
import random

In [2]:
import random

def left_pad_with_zeros(target_length, number):
    number_str = str(number)
    if len(number_str) < target_length:
        return '0' * (target_length - len(number_str)) + number_str
    return number_str

def generate_multiplication_steps(digit_length):
    first_number_digits = [random.randint(0, 9) for _ in range(digit_length)]
    second_number_digits = [random.randint(0, 9) for _ in range(digit_length)]

    first_number = int(''.join(str(d) for d in first_number_digits))
    second_number = int(''.join(str(d) for d in second_number_digits))

    if first_number < second_number:
        first_number, second_number = second_number, first_number
        first_number_digits, second_number_digits = second_number_digits, first_number_digits
    
    multiplication_steps = [f"{left_pad_with_zeros(digit_length, second_number)}*{left_pad_with_zeros(digit_length, first_number)}="]
    intermediate_steps = []

    for i in range(digit_length):
        partial_product = first_number_digits[i] * 10**(digit_length - i - 1)
        multiplication_steps.append(f"{left_pad_with_zeros(digit_length, second_number)}*{left_pad_with_zeros(digit_length, partial_product)}+")
        intermediate_steps.append(left_pad_with_zeros(2 * digit_length, partial_product * second_number) + "+")

    multiplication_steps[-1] = multiplication_steps[-1][:-1] + "="
    intermediate_steps[-1] = intermediate_steps[-1][:-1] + "="

    multiplication_steps.extend(intermediate_steps)
    multiplication_steps.append(left_pad_with_zeros(2 * digit_length, first_number * second_number))

    result_expression = ''.join(multiplication_steps)
    return result_expression

def convert_expression_to_tokens(expression):
    tokenized_output = []
    for char in expression:
        if char == '+':
            tokenized_output.append(10)
        elif char == '=':
            tokenized_output.append(11)
        elif char == '*':
            tokenized_output.append(12)
        else:
            tokenized_output.append(int(char))
    return tokenized_output

# 3-digit numbers
equation = generate_multiplication_steps(3)
print(equation)
print(convert_expression_to_tokens(equation))


062*536=062*500+062*030+062*006=031000+001860+000372=033232


[0,
 6,
 2,
 12,
 5,
 3,
 6,
 11,
 0,
 6,
 2,
 12,
 5,
 0,
 0,
 10,
 0,
 6,
 2,
 12,
 0,
 3,
 0,
 10,
 0,
 6,
 2,
 12,
 0,
 0,
 6,
 11,
 0,
 3,
 1,
 0,
 0,
 0,
 10,
 0,
 0,
 1,
 8,
 6,
 0,
 10,
 0,
 0,
 0,
 3,
 7,
 2,
 11,
 0,
 3,
 3,
 2,
 3,
 2]

In [3]:
class MulDataset(Dataset):
    """ 
    Dataset for the Add problem. E.g. for problem length 3:
    12 + 333 = 345
    Input: 0 1 2 3 3 3 -> Output: 0 3 4 5
    Which will feed into the transformer concatenated as:
    input:  0 1 2 3 3 3 0 3 4
    output: I I I I I 0 3 4 5
    where I is "ignore", as the transformer is reading the input sequence
    """

    def __init__(self, split, length=3):
        assert split in {'train', 'test'}
        self.split = split
        self.length = length
    
    def __len__(self):
        return 1000 # ...
    
    def get_vocab_size(self):
        return 13
    
    def get_block_size(self):
        # the length of the sequence that will feed into transformer, 
        # containing concatenated input and the output, but -1 because
        # the transformer starts making predictions at the last input element
        return len(convert_expression_to_tokens(generate_multiplication_steps(self.length))) - 1

    def __getitem__(self, idx):
        while True:
            rai = convert_expression_to_tokens(generate_multiplication_steps(self.length))
            h = hash(str(rai[:1+2*self.length]))
            
            inp_split = 'test' if h % 4 == 0 else 'train' # designate 25% of examples as test
            if inp_split == self.split:
                break # ok
        
        x = torch.tensor(rai[:-1], dtype=torch.long)
        y = torch.tensor(rai[1:], dtype=torch.long)
        
        # we only want to predict at output locations, mask out the loss at the input locations
        y[:2*self.length] = -1
        return x, y

In [4]:
# print an example instance of the dataset
train_dataset = MulDataset('train')
test_dataset = MulDataset('test')
x, y = train_dataset[0]

print (x)
for a, b in zip(x,y):
    print(int(a),int(b))

tensor([ 3,  7,  8, 12,  4,  9,  5, 11,  3,  7,  8, 12,  4,  0,  0, 10,  3,  7,
         8, 12,  0,  9,  0, 10,  3,  7,  8, 12,  0,  0,  5, 11,  1,  5,  1,  2,
         0,  0, 10,  0,  3,  4,  0,  2,  0, 10,  0,  0,  1,  8,  9,  0, 11,  1,
         8,  7,  1,  1])
3 -1
7 -1
8 -1
12 -1
4 -1
9 -1
5 11
11 3
3 7
7 8
8 12
12 4
4 0
0 0
0 10
10 3
3 7
7 8
8 12
12 0
0 9
9 0
0 10
10 3
3 7
7 8
8 12
12 0
0 0
0 5
5 11
11 1
1 5
5 1
1 2
2 0
0 0
0 10
10 0
0 3
3 4
4 0
0 2
2 0
0 10
10 0
0 0
0 1
1 8
8 9
9 0
0 11
11 1
1 8
8 7
7 1
1 1
1 0


In [5]:
from mingpt.model import GPT

model_config = GPT.get_default_config()
model_config.model_type = 'gpt-micro'
# model_config.model_type = 'gpt-nano'

model_config.vocab_size = train_dataset.get_vocab_size()
model_config.block_size = train_dataset.get_block_size()
model = GPT(model_config)

number of parameters: 0.80M


In [6]:
print (model_config.n_head, model_config.n_layer, model_config.n_embd)

4 4 128


In [7]:
# create a Trainer object
from mingpt.trainer import Trainer

train_config = Trainer.get_default_config()
train_config.learning_rate = 5e-4 # the model we're using is so small that we can go a bit faster
train_config.max_iters = 10000
train_config.num_workers = 0
# train_config.batch_size = 32
trainer = Trainer(train_config, model, train_dataset)

running on device cpu


In [11]:
def batch_end_callback(trainer):
    if trainer.iter_num % 100 == 0:
        print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
trainer.set_callback('on_batch_end', batch_end_callback)

trainer.run()

iter_dt 233.30ms; iter 0: train loss 0.03046
iter_dt 241.18ms; iter 100: train loss 0.02446
iter_dt 247.75ms; iter 200: train loss 0.02316
iter_dt 253.72ms; iter 300: train loss 0.02516
iter_dt 242.94ms; iter 400: train loss 0.02731
iter_dt 246.59ms; iter 500: train loss 0.03173
iter_dt 246.02ms; iter 600: train loss 0.02670
iter_dt 238.86ms; iter 700: train loss 0.03691
iter_dt 245.12ms; iter 800: train loss 0.02943
iter_dt 248.42ms; iter 900: train loss 0.02777
iter_dt 246.25ms; iter 1000: train loss 0.03101
iter_dt 241.41ms; iter 1100: train loss 0.02061
iter_dt 269.53ms; iter 1200: train loss 0.02739
iter_dt 259.45ms; iter 1300: train loss 0.02329
iter_dt 248.76ms; iter 1400: train loss 0.02805
iter_dt 228.57ms; iter 1500: train loss 0.02582
iter_dt 240.82ms; iter 1600: train loss 0.02588
iter_dt 246.49ms; iter 1700: train loss 0.02363
iter_dt 247.63ms; iter 1800: train loss 0.02079
iter_dt 243.78ms; iter 1900: train loss 0.02919
iter_dt 250.65ms; iter 2000: train loss 0.02450
iter

In [12]:
# now let's perform some evaluation
model.eval()
None

In [13]:
def eval_add_split(trainer, split, max_batches):
    dataset = {'train':train_dataset, 'test':test_dataset}[split]
    n = train_dataset.length # naugy direct access shrug
    results = []
    mistakes_printed_already = 0
    loader = DataLoader(dataset, batch_size=50, num_workers=0, drop_last=False)
    #loader = DataLoader(dataset, batch_size=1, num_workers=0, drop_last=False)
    for b, (x, y) in enumerate(loader):
        x = x.to(trainer.device)
        y = y.to(trainer.device)

        inp = x[:, :2*n+1]
        sol = y[:, -2*n:]
        
        cat = model.generate(inp, 52, do_sample=False) # using greedy argmax, not sampling
        sol_candidate = cat[:, -2*n:]      
        correct = (sol == sol_candidate).all(1).cpu() 
        for i in range(x.size(0)):
            results.append(int(correct[i]))
    
    rt = torch.tensor(results, dtype=torch.float)
    print("%s final score: %d/%d = %.2f%% correct" % (split, rt.sum(), len(results), 100*rt.mean()))
    return rt.sum()

# run a lot of examples from both train and test through the model and verify the output correctness
with torch.no_grad():
    train_score = eval_add_split(trainer, 'train', max_batches=5)
    test_score  = eval_add_split(trainer, 'test',  max_batches=5)

train final score: 893/1000 = 89.30% correct
test final score: 906/1000 = 90.60% correct
