In [1]:
import torch
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
import numpy as np

import random
# Seed every RNG this notebook draws from so that Restart & Run All is
# reproducible: the stdlib `random` module (random HMM probabilities), NumPy
# (np.random.choice is used to sample the sequences) and torch (model
# initialisation / training).
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

from lxmls.transformers.utils import set_seed
from lxmls.transformers.bpe import BPETokenizer
from lxmls.transformers.model import GPT
from lxmls.transformers.trainer import Trainer


In [2]:
import pickle
class WeatherDataset(Dataset):
    """Synthetic HMM dataset for training an auto-regressive transformer.

    Hidden states are weather conditions and observations are activities.
    Every example is freshly sampled from the HMM, e.g.

        Input (observations): ['clean', 'clean', 'shop', 'walk', 'shop', 'read']
        Input (IDs): [0, 0, 2, 4, 2, 1]
        Output (states): ['sunny', 'rainy', 'rainy', 'sunny', 'snowy', 'sunny']
        Output (IDs): [7, 5, 5, 7, 6, 7]

    which is fed into the transformer concatenated as:

        Input: [0, 0, 2, 4, 2, 1, 7, 5, 5, 7, 6]
        Output: [-1, -1, -1, -1, -1, 7, 5, 5, 7, 6, 7]

    Each observation and state is converted to an index, and -1 indicates
    "ignore": the transformer reads the input sequence but is not trained to
    predict it. State IDs are offset by the number of observations so both
    share one vocabulary.
    """

    def __init__(self, split, seq_len = 6, num_instances=10000, proba = False):
        """Create the dataset.

        Args:
            split: 'train' or 'test'; selects which generated sequences are kept.
            seq_len: number of observations (and of states) per example.
            num_instances: value reported by ``len()``; examples are sampled
                on the fly, so this only bounds how many are drawn per epoch.
            proba: dict with keys "initial", "transition" and "emission"
                holding the HMM probabilities (rows indexed by state ID,
                emission columns by observation ID). Any falsy value makes
                the dataset draw random probabilities instead.
        """
        assert split in {'train', 'test'}
        self.split = split
        self.size = num_instances
        self.length = seq_len

        # Generate vocabulary
        self.obs, self.states = self.generate_voc()

        # Get HMM probabilities for dataset generation.
        # We should work with fixed probabilities, but there is a function
        # for random generation as a fallback.
        if proba:
            self.proba = proba
        else:
            self.generate_random_proba()

    def __len__(self):
        """Return the nominal number of examples (``num_instances``)."""
        return self.size

    def get_block_size(self):
        """Return the length of the sequence fed to the transformer.

        This is the concatenated input and output, minus one because the
        transformer starts making predictions at the last input element.
        """
        return self.length*2 - 1

    def get_vocab_size(self):
        """Return the vocabulary size: number of observations plus states."""
        return len(self.obs) + len(self.states)

    def generate_voc(self):
        """Return the (observations, states) vocabulary of the HMM.

        Both lists are sorted alphabetically, so an item's ID is its position
        in the sorted list. This should not be changed.
        """
        observations = sorted(["walk", "shop", "clean", "tennis", "read"])
        states = sorted(["sunny", "rainy", "snowy"])
        return (observations, states)

    def decode_obs(self, obs):
        """Map a list of observation IDs back to observation names."""
        return [self.obs[i] for i in obs]

    def decode_st(self, st):
        """Map a list of state IDs back to state names.

        State IDs are offset by the number of observations.
        """
        ofs = len(self.obs)
        return [self.states[i - ofs] for i in st]

    def decode_seq(self, x, y):
        """Decode observation IDs ``x`` and state IDs ``y`` into names."""
        return (self.decode_obs(x), self.decode_st(y))

    def logits_to_probs(self, logits):
        """Convert a list of logits to a list of probabilities (softmax).

        The maximum logit is subtracted before exponentiating so large values
        do not overflow; this does not change the mathematical result.
        An empty input yields an empty list.
        """
        logits = np.array(logits, dtype=float)
        if logits.size == 0:
            return []
        exp_logits = np.exp(logits - np.max(logits))
        return (exp_logits / np.sum(exp_logits)).tolist()

    def _random_distribution(self, n):
        """Return a random probability distribution over ``n`` outcomes."""
        return self.logits_to_probs([random.random() for _ in range(n)])

    def generate_random_proba(self):
        """Fill ``self.proba`` with randomly generated HMM probabilities.

        We should NOT rely on this; it is mostly for debugging. The resulting
        dataset is almost unlearnable as it is randomly generated.

        N.B. no "final" probabilities are generated: sequences have a fixed
        length instead.
        """
        n_states = len(self.states)
        n_obs = len(self.obs)

        self.proba = {}
        # Initial probabilities: one distribution over states.
        self.proba["initial"] = self._random_distribution(n_states)
        # Transition probabilities: for each state, a distribution over next states.
        self.proba["transition"] = [
            self._random_distribution(n_states) for _ in range(n_states)
        ]
        # Emission probabilities: for each state, a distribution over observations.
        self.proba["emission"] = [
            self._random_distribution(n_obs) for _ in range(n_states)
        ]

    def sample_p(self, p_l):
        """Sample an index in ``range(len(p_l))`` with probabilities ``p_l``."""
        return np.random.choice(len(p_l), p=p_l)

    def generate_seq(self):
        """Sample one (observations, states) pair of ``self.length`` steps.

        Returns:
            (x, y) where ``x`` is the list of observation IDs and ``y`` the
            list of state IDs, already offset by the number of observations.

        NOTE(review): the state drawn from ``proba["initial"]`` is only used
        as the predecessor of the first recorded state; it is never emitted
        itself. Kept as-is to preserve the data distribution - confirm this
        is intended.
        """
        state = self.sample_p(self.proba["initial"])
        x = []
        y = []
        for _ in range(self.length):
            # Sample from the transition row of the last state, then emit.
            state = self.sample_p(self.proba["transition"][state])
            # States are the labels, observations are the input.
            y.append(state)
            x.append(self.sample_p(self.proba["emission"][state]))

        # State IDs are obtained by offsetting their index by the number of observations.
        ofs = len(self.obs)
        return (x, [s + ofs for s in y])

    @staticmethod
    def _split_of(obs):
        """Deterministically assign an observation sequence to 'train' or 'test'.

        Uses CRC32 rather than the built-in ``hash()``: hashing bytes with
        ``hash()`` is salted per process, which would make the split differ
        between kernel sessions. About 25% of sequences land in 'test'.
        """
        import zlib  # local import keeps this class self-contained

        key = ",".join(str(int(o)) for o in obs).encode("ascii")
        return 'test' if zlib.crc32(key) % 4 == 0 else 'train'

    def __getitem__(self, idx):
        """Sample one training pair; ``idx`` is ignored.

        Returns:
            (x, y): two LongTensors of length ``get_block_size()``. ``x`` is
            the concatenated sequence without its last token, ``y`` is the
            sequence shifted by one with the input positions set to -1.
        """
        # Use rejection sampling to generate an example from the desired split.
        while True:
            obs, st = self.generate_seq()
            if self._split_of(obs) == self.split:
                break

        # Concatenate the observations and the labels.
        tokens = [int(o) for o in obs] + [int(s) for s in st]
        cat = torch.tensor(tokens, dtype=torch.long)

        # The inputs to the transformer are the offset sequence.
        x = cat[:-1].clone()
        y = cat[1:].clone()
        # We only want to predict at output locations: mask out the loss at the input locations.
        y[:self.length-1] = -1
        return x, y
        
            
            

In [3]:
# Fixed HMM probabilities, easier to learn than randomly generated ones.
# Rows are indexed by state ID, i.e. the alphabetical order used by
# WeatherDataset (rainy, snowy, sunny); emission columns follow the sorted
# observations (clean, read, shop, tennis, walk).
fixed_proba = {
    "initial": [.5, .3, .2],
    "transition": [
        [.5, .5, 0],
        [0, .5, .5],
        [.5, 0, .5],
    ],
    "emission": [
        [.5, 0, .2, 0, .3],
        [0, .5, .4, 0, .1],
        [0, 0, .1, .5, .4],
    ],
}

In [4]:
# Build both splits from the same HMM probabilities and print one example.
train_dataset = WeatherDataset('train',proba=fixed_proba)
test_dataset = WeatherDataset('test',proba=train_dataset.proba)
x, y = train_dataset[0]
print(x.tolist())
print(y.tolist())
# The first `length` tokens of x are the observations; the states start at
# position `length - 1` of y (everything before that is masked with -1).
# Slice bounds are derived from the dataset so they follow seq_len.
print(train_dataset.decode_obs(x.tolist()[:train_dataset.length]))
print(train_dataset.decode_st(y.tolist()[train_dataset.length - 1:]))



[0, 4, 1, 1, 1, 3, 5, 5, 6, 6, 6]
[-1, -1, -1, -1, -1, 5, 5, 6, 6, 6, 7]
['clean', 'walk', 'read', 'read', 'read', 'tennis']
['rainy', 'rainy', 'snowy', 'snowy', 'snowy', 'sunny']


In [5]:
# Create a GPT instance sized for the weather dataset: start from the default
# configuration, pick the smallest preset, and take the sequence length and
# vocabulary size from the dataset.
model_config = GPT.get_default_config()
model_config.model_type = 'gpt-nano'
model_config.block_size = train_dataset.get_block_size()
model_config.vocab_size = train_dataset.get_vocab_size()

model = GPT(model_config)

number of parameters: 0.09M


In [6]:
# Create a Trainer object from the default training configuration.
train_config = Trainer.get_default_config()
train_config.max_iters = 2000
train_config.num_workers = 0
# the model we're using is so small that we can go a bit faster
train_config.learning_rate = 5e-4

trainer = Trainer(train_config, model, train_dataset)

running on device cuda


In [7]:
def batch_end_callback(trainer):
    """Print iteration time and training loss on every 100th iteration.

    Reads ``iter_num``, ``iter_dt`` and ``loss`` from ``trainer``; does
    nothing on the other iterations.
    """
    if trainer.iter_num % 100 != 0:
        return
    elapsed_ms = trainer.iter_dt * 1000
    loss_value = trainer.loss.item()
    print(f"iter_dt {elapsed_ms:.2f}ms; iter {trainer.iter_num}: train loss {loss_value:.5f}")
# Register the progress printer for the 'on_batch_end' event, then start training.
trainer.set_callback('on_batch_end', batch_end_callback)

trainer.run()

iter_dt 0.00ms; iter 0: train loss 2.14204
iter_dt 20.15ms; iter 100: train loss 0.60738
iter_dt 21.92ms; iter 200: train loss 0.32092
iter_dt 25.92ms; iter 300: train loss 0.31565
iter_dt 22.99ms; iter 400: train loss 0.30403
iter_dt 23.48ms; iter 500: train loss 0.28570
iter_dt 22.92ms; iter 600: train loss 0.28983
iter_dt 22.01ms; iter 700: train loss 0.25744
iter_dt 23.43ms; iter 800: train loss 0.28895
iter_dt 24.02ms; iter 900: train loss 0.27254
iter_dt 28.18ms; iter 1000: train loss 0.30118
iter_dt 26.18ms; iter 1100: train loss 0.26542
iter_dt 26.62ms; iter 1200: train loss 0.30011
iter_dt 26.73ms; iter 1300: train loss 0.30781
iter_dt 24.00ms; iter 1400: train loss 0.26022
iter_dt 28.71ms; iter 1500: train loss 0.25147
iter_dt 21.73ms; iter 1600: train loss 0.32201
iter_dt 25.97ms; iter 1700: train loss 0.28637
iter_dt 23.33ms; iter 1800: train loss 0.26866
iter_dt 23.10ms; iter 1900: train loss 0.27944


In [8]:
# Now let's perform some evaluation: switch the model to evaluation mode first.
# The trailing ';' suppresses the cell's output.
model.eval();

In [9]:
def eval_split(trainer, split, max_batches):
    """Evaluate the global ``model`` on one dataset split and print a summary.

    For each batch, the observation prefix is fed to ``model.generate`` with
    greedy decoding and the generated states are compared with the gold ones.
    Up to three wrong and three correct examples are printed, followed by the
    label-level accuracy and the number of fully correct sequences.

    Args:
        trainer: object whose ``device`` attribute says where to move batches.
        split: 'train' or 'test'; selects ``train_dataset`` or ``test_dataset``.
        max_batches: stop after this many batches of 100; ``None`` means no limit.

    Returns:
        A 0-dim float tensor holding the sum of the per-batch label
        accuracies (kept as in the original implementation).
    """
    dataset = {'train':train_dataset, 'test':test_dataset}[split]
    n = dataset.length # length of sequence needed for splitting predictions
    max_examples_printed = 3

    # We keep per-batch label accuracy, per-sequence results and label counts
    results = []
    ex_res = []
    labels_correct = 0
    labels_total = 0
    mistakes_printed_already = 0
    corr_printed_already = 0

    loader = DataLoader(dataset, batch_size=100, num_workers=0, drop_last=False)
    for b, (x, y) in enumerate(loader):
        x = x.to(trainer.device)
        y = y.to(trainer.device)
        # isolate the input pattern alone
        inp = x[:, :n]
        sol = y[:, -n:]
        # let the model fill in the rest of the sequence
        cat = model.generate(inp, n, do_sample=False) # using greedy argmax, not sampling
        sol_candidate = cat[:, n:] # isolate the filled in sequence

        # compare the predicted sequence to the true sequence
        matches = (sol == sol_candidate)

        # Whether the model got the whole sequence correct
        correct = matches.all(1).cpu()

        # Number of labels that are correctly predicted in this batch
        batch_hits = int(matches.sum().item())
        batch_labels = matches.numel()
        labels_correct += batch_hits
        labels_total += batch_labels
        # Store plain floats so nothing device-resident is accumulated
        results.append(batch_hits / batch_labels if batch_labels else 0.0)

        # Printing examples of correct and incorrect predictions
        for i in range(x.size(0)):
            is_correct = bool(correct[i])
            ex_res.append(int(is_correct))
            # only print up to 3 mistakes to get a sense
            if not is_correct and mistakes_printed_already < max_examples_printed:
                mistakes_printed_already += 1
                print("GPT claims that %s is generatd by %s but correct labels are %s" %
                      (
                          dataset.decode_obs(inp[i].tolist()),
                          dataset.decode_st(sol_candidate[i].tolist()),
                          dataset.decode_st(sol[i].tolist())
                      )
                     )

            # only print up to 3 correct predictions to get a sense
            if is_correct and corr_printed_already < max_examples_printed:
                corr_printed_already += 1
                print("GPT CORRECTLY claims that %s is generatd by %s " %
                      (
                          dataset.decode_obs(inp[i].tolist()),
                          dataset.decode_st(sol_candidate[i].tolist()),
                      )
                     )

        if max_batches is not None and b+1 >= max_batches:
            break

    # Label accuracy is weighted by the number of labels, so a short last
    # batch does not skew the result.
    label_acc = labels_correct / labels_total if labels_total else 0.0
    seq_correct = sum(ex_res)
    seq_acc = seq_correct / len(ex_res) if ex_res else 0.0
    print("%s final score: %.2f%% correct labels" % (split, 100*label_acc))
    print("%s final score: %d/%d = %.2f%% fully correct sequence labels" % (split, seq_correct, len(ex_res), 100*seq_acc))

    rt = torch.tensor(results, dtype=torch.float)
    return rt.sum()

# Run a lot of examples from both train and test through the model and verify
# the output correctness. Gradients are not needed for evaluation, so they are
# disabled for the duration of both calls. At most 50 batches are evaluated per split.
with torch.no_grad():
    train_score = eval_split(trainer, 'train', max_batches=50)
    test_score  = eval_split(trainer, 'test',  max_batches=50)

GPT claims that ['clean', 'shop', 'read', 'walk', 'walk', 'shop'] is generatd by ['rainy', 'snowy', 'snowy', 'sunny', 'rainy', 'snowy'] but correct labels are ['rainy', 'snowy', 'snowy', 'sunny', 'sunny', 'rainy']
GPT CORRECTLY claims that ['tennis', 'shop', 'walk', 'read', 'tennis', 'tennis'] is generatd by ['sunny', 'rainy', 'rainy', 'snowy', 'sunny', 'sunny'] 
GPT CORRECTLY claims that ['clean', 'shop', 'walk', 'walk', 'shop', 'tennis'] is generatd by ['rainy', 'snowy', 'sunny', 'rainy', 'snowy', 'sunny'] 
GPT CORRECTLY claims that ['tennis', 'tennis', 'shop', 'clean', 'shop', 'shop'] is generatd by ['sunny', 'sunny', 'rainy', 'rainy', 'snowy', 'snowy'] 
GPT claims that ['clean', 'shop', 'shop', 'shop', 'walk', 'walk'] is generatd by ['rainy', 'snowy', 'snowy', 'snowy', 'sunny', 'sunny'] but correct labels are ['rainy', 'rainy', 'rainy', 'snowy', 'sunny', 'sunny']
GPT claims that ['shop', 'shop', 'walk', 'shop', 'read', 'read'] is generatd by ['snowy', 'snowy', 'sunny', 'rainy', 'sn

In [13]:
# Let's run a hand-written observation sequence through the model as well.
my_obs = ['tennis', 'walk', 'shop', 'clean', 'read', 'tennis']

# Convert observation names to their IDs and add a batch dimension of 1.
inp = [train_dataset.obs.index(i) for i in my_obs]
inp = torch.LongTensor([inp]).to(trainer.device)

n = train_dataset.length # sequence length, read directly from the dataset attribute

# The prompt must contain exactly n observations.
assert inp[0].nelement() == n
with torch.no_grad():
    cat = model.generate(inp, n, do_sample=False)
# Everything after the first n tokens is the generated state sequence.
sol_candidate = cat[:, n:]
print('input sequence  :', train_dataset.decode_obs(inp.tolist()[0]))
print('predicted states:', train_dataset.decode_st(sol_candidate.tolist()[0]))


input sequence  : ['tennis', 'walk', 'shop', 'clean', 'read', 'tennis']
predicted states: ['sunny', 'rainy', 'rainy', 'rainy', 'snowy', 'sunny']
