In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class text:
    '''Character-level corpus with a character <-> integer vocabulary.

    The vocabulary is the sorted set of distinct characters in the text
    the object was built from. The object can encode its own text (or any
    other string using the same vocabulary) to a ``torch.long`` tensor,
    decode integer lists back to strings, split the encoded text into
    train / validation / test slices and draw random batches from them.

    Attributes set by ``__init__``:
        text        -- the raw string; replaced by a 1-D ``torch.long``
                       tensor once ``encode_text_as_tensor`` is called.
        encoded     -- initialised to ``''``; not used by this class.
        chars       -- sorted list of the distinct characters in ``text``.
        vocab_size  -- ``len(chars)``.
        str_to_int  -- dict mapping character -> integer code.
        int_to_str  -- dict mapping integer code -> character.
    '''

    def __init__(self, text_name, is_file):
        '''Load the text and build the vocabulary tables.

        text_name -- a file name when ``is_file`` is true, otherwise the
                     text itself.
        is_file   -- whether ``text_name`` should be read from disk.
        '''
        # Retrieve text
        self.text = ''
        self.encoded = ''
        if is_file:
            print("reading text from file")
            self.from_file(text_name)
        else:
            self.from_string(text_name)

        # Vocabulary: sorted list of the unique characters in the text.
        self.chars = sorted(set(self.text))
        self.vocab_size = len(self.chars)
        self.str_to_int = self.make_str_to_int_table(self.chars)
        self.int_to_str = self.make_int_to_str_table(self.chars)

    def from_file(self, filename):
        '''Read the whole of ``filename`` (UTF-8) into ``self.text``.'''
        # The context manager closes the handle even if read() raises.
        with open(filename, 'r', encoding='utf-8') as handle:
            self.text = handle.read()

    def from_string(self, string):
        '''Use ``string`` directly as ``self.text``.'''
        self.text = string

    def make_str_to_int_table(self, chars):
        '''Return a dictionary mapping each character to its integer code
        (its position in ``chars``).'''
        return {character: index for index, character in enumerate(chars)}

    def make_int_to_str_table(self, chars):
        '''Return a dictionary mapping each integer code (position in
        ``chars``) back to its character.'''
        return {index: character for index, character in enumerate(chars)}

    def encode_text_as_tensor(self):
        '''Encode the training text as integers and replace ``self.text``
        with the resulting ``torch.long`` tensor.

        Calling it again once the text is already a tensor is a no-op, so
        re-running a notebook cell does not fail.
        '''
        if torch.is_tensor(self.text):
            return
        self.text = self.encode_new_text_as_tensor(self.text)

    def encode_new_text_as_tensor(self, to_encode):
        '''Encode ``to_encode`` with the vocabulary derived from the
        training text and return a 1-D ``torch.long`` tensor.

        Raises ``KeyError`` for a character that is not in the vocabulary.
        '''
        codes = [self.str_to_int[character] for character in to_encode]
        return torch.tensor(codes, dtype=torch.long)

    def decode(self, to_decode):
        '''Decode an iterable of integer codes to a string using
        ``self.int_to_str``.'''
        return ''.join(self.int_to_str[code] for code in to_decode)

    def __str__(self):
        '''Return the vocabulary size formatted as a string.'''
        return f"{self.vocab_size}"

    def make_train_val_test(self, fraction_train, fraction_val, fraction_test):
        '''Split ``self.text`` into consecutive train / validation / test
        slices, stored as ``train_data``, ``val_data`` and ``test_data``.

        There is no shuffling, so this assumes no bias in how the data is
        distributed within the text. Each slice length is the fraction
        times ``len(self.text)``, truncated to an integer. A warning is
        printed when the fractions do not sum to one.
        '''
        total = fraction_train + fraction_val + fraction_test
        # Compare with a tolerance: sums such as 0.7 + 0.2 + 0.1 are not
        # exactly 1.0 in floating point.
        if abs(total - 1.0) > 1e-9:
            print("Warning, fractions of train, test and validation do "
                  "not add to one.")
        length = len(self.text)
        n = int(fraction_train * length)
        nv = int(fraction_val * length)
        nt = int(fraction_test * length)
        self.train_data = self.text[:n]
        self.val_data = self.text[n:n + nv]
        self.test_data = self.text[n + nv:n + nv + nt]

    def get_batch(self, batch_size, block_size, train_test_validation):
        """Randomly pick ``batch_size`` windows of length ``block_size``
        from the chosen split and return them stacked in torch tensors.

        train_test_validation -- 'train', 'test' or 'validation'.

        Returns ``(context, targets)``, both of shape
        ``(batch_size, block_size)``; ``targets`` is ``context`` shifted
        one position to the right in the data.

        Raises ``ValueError`` for an unknown split name or when the split
        holds fewer than ``block_size + 1`` elements.
        """
        if train_test_validation == "train":
            data = self.train_data
        elif train_test_validation == "test":
            data = self.test_data
        elif train_test_validation == "validation":
            data = self.val_data
        else:
            raise \
            ValueError("Specify data set as 'train', 'test', or 'validation'.")

        # The target window ends one element past the context window, so
        # block_size + 1 elements are the minimum for a single sample.
        if len(data) < block_size + 1:
            raise ValueError("Data is too small for the specified block size: "
                             "need at least block_size + 1 elements.")

        ix = torch.randint(len(data) - block_size, (batch_size,))
        train_context_batch = torch.stack([data[i:i + block_size] for i in ix])
        train_to_predict_batch =\
              torch.stack([data[i + 1: i + block_size + 1] for i in ix])
        return train_context_batch, train_to_predict_batch

@torch.no_grad()
def estimate_loss(number_eval_batches, 
                  language_model, training_text_object, block_size,
                  batch_size=1):
  '''Average the model loss over random batches of the training and
  validation sets.

  number_eval_batches  -- how many batches to draw per split.
  language_model       -- called as language_model(X, Y); must return
                          a pair whose second item is the loss.
  training_text_object -- provides get_batch(batch_size, block_size, split).
  block_size           -- context length passed to get_batch.
  batch_size           -- sequences per evaluation batch (default 1).

  Returns a dictionary with keys 'train' and 'validation', each holding
  the mean loss for that split as a 0-dimensional tensor.

  The model is put in eval mode while the losses are computed and is put
  back into whichever mode (train or eval) it was in on entry, even if
  evaluation raises.
  '''
  was_training = language_model.training
  out = {}
  language_model.eval()
  try:
    for split in ['train', 'validation']:
      losses = torch.zeros(number_eval_batches)
      for k in range(number_eval_batches):
        X, Y = training_text_object.get_batch(batch_size, block_size, split)
        _, loss = language_model(X, Y)
        losses[k] = loss.item()
      out[split] = losses.mean()
  finally:
    # Restore the caller's mode rather than forcing training mode.
    language_model.train(was_training)
  return out

In [2]:
# LSTM  
#
class LanguageModel_LSTM(nn.Module):
  '''Character-level language model: embedding -> LSTM -> two-layer head.

  forward() produces, for every position of the context, logits over the
  vocabulary for the next character; generate() treats the logits of the
  last position as a categorical distribution and samples the next
  character from it.

  Training learns the values in the embedding table, the weights of the
  LSTM and the weights of the fully connected layers that precede the
  softmax.
  '''

  def __init__(self, vocab_size, hidden_size, num_layers, verbose=False):
    '''
    vocab_size  -- number of distinct tokens; also used as the embedding
                   width and as the LSTM input size.
    hidden_size -- LSTM hidden state width.
    num_layers  -- number of stacked LSTM layers.
    verbose     -- when true, forward() and generate() print diagnostics.
    '''
    super().__init__()
    self.verbose = verbose
    self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)
    # Scale the initial embeddings down by a factor of 100 so the model
    # starts from small activations (the intent being a near-uniform,
    # maximum-entropy initial prediction).
    with torch.no_grad():
      self.token_embedding_table.weight.mul_(0.01)
    self.input_size = vocab_size
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    # (h0, c0) default to zero when not passed to the LSTM.
    self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers,
                        batch_first=True)
    self.fully_connected = nn.Sequential(
        nn.Linear(self.hidden_size, self.hidden_size),
        nn.ReLU(),
        nn.Linear(self.hidden_size, vocab_size)
    )

  def set_verbose(self, verbose):
    '''Turn the diagnostic printing on or off.'''
    self.verbose = verbose

  def forward(self, idx, targets=None):
    '''Compute next-token logits and, when targets are given, the loss.

    idx     -- (B, T) tensor of integer token codes (the context).
    targets -- optional (B, T) tensor of integer token codes.

    Returns (logits, loss). Without targets, logits has shape (B, T, C)
    and loss is None. With targets, logits is flattened to (B*T, C) and
    loss is the cross entropy between those logits and the flattened
    targets. B is the batch size, T the block size and C the vocabulary
    size.
    '''
    embeddings = self.token_embedding_table(idx)  # (B, T, C)
    # With batch_first=True the LSTM output is (B, T, hidden_size).
    lstm_out, _ = self.lstm(embeddings)
    logits = self.fully_connected(lstm_out)  # (B, T, C)

    if targets is None:
      return logits, None

    # F.cross_entropy expects (N, C) logits and (N,) class labels, so
    # fold the batch and time dimensions together.
    B, T, C = logits.shape
    if self.verbose: print(f"Targets: \n {targets} \n; logits: \n{logits} ")
    # reshape (rather than view) also accepts non-contiguous inputs.
    logits = logits.reshape(B*T, C)
    targets = targets.reshape(B*T)
    if self.verbose: print(f"Transformed view: Targets: \n {targets} \n; logits:\n{logits}\n")
    if self.verbose:
      # Show what would be sampled from the current predictions.
      probs = F.softmax(logits, dim=1)
      idx_next = torch.multinomial(probs, num_samples = 1)
      print(f"Generative outcomes during training:\n {idx_next}")

    # cross_entropy applies the softmax itself.
    loss = F.cross_entropy(logits, targets)
    return logits, loss

  @torch.no_grad()
  def generate(self, idx, max_new_tokens):
    '''Sample max_new_tokens tokens, one at a time, appending each to the
    context idx, and return the extended (B, T + max_new_tokens) tensor.

    Runs with gradient tracking disabled: sampling needs no gradients.
    '''
    for _ in range(max_new_tokens):
      # Get the predictions for the whole current context.
      logits, _ = self(idx) # calls forward() for this class

      # Keep only the prediction made at the last position: (B, C).
      logits = logits[:, -1, :]

      # Generate probability distribution
      probs = F.softmax(logits, dim=-1) # B, C
      if self.verbose:
          print(f'logits {logits}, {logits.shape} \n probs {probs}, {probs.shape}')

      # Sample the vocabulary according to the distribution: (B, 1).
      idx_next = torch.multinomial(probs, num_samples = 1)
      # Append sample to sequence
      idx = torch.cat((idx, idx_next), dim = 1)
    return idx

In [5]:
from pathlib import Path

# 1. Create models directory 
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, exist_ok=True)

# 2. Create model save path 
MODEL_NAME = "shakespeare_LSTM.pt"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

# 3. Rebuild the corpus and a fresh, untrained model instance.
# The model dimensions are all derived from the corpus vocabulary so they
# cannot drift apart from the data that is actually loaded.
shakespeare = text("input.txt", is_file=True)
shakespeare.encode_text_as_tensor()
vocab_size = shakespeare.vocab_size
hidden_size = vocab_size * 2
input_size = vocab_size
num_layers = 3
shakespeare_LSTM = LanguageModel_LSTM(input_size,
                                      hidden_size, num_layers, verbose=False)

# 4. Load the saved weights into the fresh model.
# map_location="cpu" lets a checkpoint that was saved on a GPU load on a
# machine without one; the model built above lives on the CPU.
# NOTE: torch.load unpickles the file - only load checkpoints you trust.
shakespeare_LSTM.load_state_dict(
    torch.load(MODEL_SAVE_PATH, map_location="cpu"))

reading text from file


<All keys matched successfully>

In [7]:
# Sample 300 characters from the loaded model, seeded with a single space.
# (Training note: the loss still seemed to be decreasing, albeit very slowly.)
context = shakespeare.encode_new_text_as_tensor(' ')
print(context)

# generate() works on a batch, so add a leading batch dimension of size one.
context = context.unsqueeze(0)
print(context)

shakespeare_LSTM.set_verbose(False)
new_string = shakespeare_LSTM.generate(context, max_new_tokens=300)
print(new_string)

# Decode the single sequence of the batch back to text.
generated_ids = new_string[0].tolist()
print(shakespeare.decode(generated_ids))

tensor([1])
tensor([[1]])
tensor([[ 1, 50, 53, 52, 45,  1, 58, 46, 39, 58,  1, 61, 47, 57, 46,  0, 32, 46,
         39, 58,  1, 40, 43, 43, 52,  6,  1, 21,  1, 46, 39, 42,  0, 47, 57,  1,
         53,  5,  1, 46, 43, 10,  1, 47, 44,  1, 58, 46, 43, 43,  1, 58, 46, 53,
         59,  1, 39, 52, 42,  1, 57, 53,  1, 57, 39, 51, 43,  1, 39, 50, 50,  1,
         58, 53,  1, 57, 53, 51, 43,  6,  0, 25, 39, 49, 43, 57,  1, 39,  1, 57,
         58, 61, 39, 52, 42,  1, 58, 46, 43, 51,  1, 39, 52, 42,  1, 46, 47, 57,
          1, 41, 53, 59, 56, 57, 43,  7, 46, 53, 59, 57, 43,  6,  0, 13, 52, 42,
          1, 61, 43,  1, 46, 53, 56, 52,  6,  1, 39,  1, 51, 47, 52, 63,  1, 53,
         54, 43, 52,  1, 47, 52, 55, 59, 47, 56, 43, 42,  0, 32, 53,  1, 46, 47,
         57,  1, 58, 53, 45, 43, 58, 46, 43, 56,  1, 46, 47, 57,  1, 44, 43, 50,
         50,  5, 42,  1, 58, 46, 43,  1, 28, 56, 53, 42, 47, 58,  1, 39, 41, 41,
         53, 59, 52, 58,  1, 57, 39, 47, 42,  8,  0, 19, 53,  1, 51, 43,  1, 21,
  