<a href="https://colab.research.google.com/github/TheRadDani/LSTM_dynamic_quantization/blob/main/LSTM_Dynamic_Quantization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from io import open
import time

import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Fetch the PyTorch examples repository; skip the clone when the directory already exists so the cell can be re-run.
! [ -d examples ] || git clone https://github.com/pytorch/examples.git

Cloning into 'examples'...
remote: Enumerating objects: 4348, done.[K
remote: Counting objects: 100% (65/65), done.[K
remote: Compressing objects: 100% (47/47), done.[K
remote: Total 4348 (delta 17), reused 37 (delta 11), pack-reused 4283 (from 1)[K
Receiving objects: 100% (4348/4348), 41.38 MiB | 13.03 MiB/s, done.
Resolving deltas: 100% (2164/2164), done.


In [None]:
class LSTMModel(nn.Module):
  """
    Container model with an encoder, a recurrent module, and a decoder.

    Args:
      ntoken: vocabulary size (rows of the embedding, outputs of the decoder).
      ninp: embedding dimension fed into the LSTM.
      nhid: hidden size of every LSTM layer.
      nlayers: number of stacked LSTM layers.
      dropout: dropout probability used after the embedding, between LSTM
        layers and on the LSTM output.
  """
  def __init__(self, ntoken, ninp, nhid, nlayers, dropout=0.5):
    super(LSTMModel, self).__init__()
    self.drop = nn.Dropout(dropout)
    self.encoder = nn.Embedding(ntoken, ninp)
    self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
    self.decoder = nn.Linear(nhid, ntoken)
    self.nhid = nhid
    self.nlayers = nlayers
    # Actually invoke the initialiser: a bare `self.init_weights` only
    # references the bound method and leaves the default initialisation.
    self.init_weights()

  def init_weights(self):
    """Initialise encoder/decoder weights uniformly in [-0.1, 0.1] and zero the decoder bias."""
    initrange = 0.1
    self.encoder.weight.data.uniform_(-initrange, initrange)
    self.decoder.bias.data.zero_()
    self.decoder.weight.data.uniform_(-initrange, initrange)

  def forward(self, input, hidden):
    """
      Run one forward pass.

      Args:
        input: integer token ids passed to the embedding layer.
        hidden: LSTM state, e.g. the tuple returned by `init_hidden`.

      Returns:
        (decoded, hidden): decoder scores over the vocabulary for every
        position, and the updated LSTM state.
    """
    emb = self.drop(self.encoder(input))
    output, hidden = self.rnn(emb, hidden)
    output = self.drop(output)
    decoded = self.decoder(output)
    return decoded, hidden

  def init_hidden(self, bsz):
    """Return a zero (h, c) state of shape (nlayers, bsz, nhid) each."""
    # Borrow dtype/device from an existing parameter so the state matches the model.
    weight = next(self.parameters())
    return (weight.new_zeros(self.nlayers, bsz, self.nhid),
            weight.new_zeros(self.nlayers, bsz, self.nhid))

In [None]:
class Dictionary(object):
  """Vocabulary that maps words to consecutive integer ids and back."""

  def __init__(self):
    # word -> id lookup, and id -> word lookup (list position is the id)
    self.word2idx = {}
    self.idx2word = []

  def add_word(self, word):
    """Register `word` if it is new and return its id."""
    idx = self.word2idx.get(word)
    if idx is None:
      # New word: its id is the next free position in idx2word.
      idx = len(self.idx2word)
      self.idx2word.append(word)
      self.word2idx[word] = idx
    return idx

  def __len__(self):
    """Number of distinct words registered so far."""
    return len(self.idx2word)

class Corpus(object):
  """Train/valid/test splits of a text corpus encoded as 1-D int64 id tensors.

  All three splits share one dictionary; they are tokenized in the order
  train, valid, test, which determines the word ids.
  """

  def __init__(self, path):
    self.dictionary = Dictionary()
    self.train = self.tokenize(os.path.join(path, 'train.txt'))
    self.valid = self.tokenize(os.path.join(path, 'valid.txt'))
    self.test = self.tokenize(os.path.join(path, 'test.txt'))

  def tokenize(self, path):
    """Tokenizes a text file.

    Every line is split on whitespace and terminated with '<eos>'. Returns
    the ids of all tokens of the file concatenated into one tensor.
    """
    assert os.path.exists(path)
    vocab = self.dictionary

    # Pass 1: make sure every token of the file is in the dictionary.
    with open(path, 'r', encoding="utf8") as f:
      for line in f:
        for token in line.split() + ['<eos>']:
          vocab.add_word(token)

    # Pass 2: encode each line as an int64 tensor of ids.
    with open(path, 'r', encoding="utf8") as f:
      encoded_lines = [
        torch.tensor(
          [vocab.word2idx[token] for token in line.split() + ['<eos>']]
        ).type(torch.int64)
        for line in f
      ]

    return torch.cat(encoded_lines)

# Directory holding the word-language-model data; the trailing slash matters
# because file names are appended to it with plain string concatenation.
# NOTE(review): the absolute /content prefix presumably assumes the repository
# was cloned from Colab's default working directory — confirm before running elsewhere.
model_data_filepath = "/content/examples/word_language_model/data/"
# Build the shared dictionary and tokenize the train/valid/test splits of wikitext-2.
corpus = Corpus(model_data_filepath + 'wikitext-2')

In [None]:
# Vocabulary size drives both the embedding table and the decoder width.
ntokens = len(corpus.dictionary)

model = LSTMModel(
    ntoken = ntokens,
    ninp = 512,
    nhid = 256,
    nlayers = 2,
)

# Optional: restore pretrained weights. Left disabled, so the model above keeps
# the weights it was constructed with. `map_location` and `weights_only` are
# arguments of torch.load, not of load_state_dict.
# model.load_state_dict(
#     torch.load(model_data_filepath + 'word_language_model_quantize.pth',
#                map_location='cpu',
#                weights_only=True)
# )
# model.eval()
# print(model)

"model.load_state_dict(torch.load(model_data_filepath +\n                                 'word_language_model_quantize.pth'),\n                      map_location='cpu',\n                    weights_only=True)\nmodel.eval()\nprint(model)"

In [None]:
# Sampling setup: a random start token, a zero hidden state for batch size 1,
# the softmax temperature and the number of words to generate.
input_ = torch.randint(ntokens, (1, 1), dtype=torch.long)
hidden = model.init_hidden(1)
temperature = 1.0
num_words = 1000
generated_path = model_data_filepath + 'generated.txt'

# Switch to inference mode so dropout is disabled while sampling.
model.eval()

# The vocabulary contains non-ASCII words, so write with the same explicit
# encoding the corpus was read with instead of the platform default.
with open(generated_path, 'w', encoding="utf8") as outf:
  with torch.no_grad():
    for i in range(num_words):
      output, hidden = model(input_, hidden)
      # Unnormalised sampling weights: exp(score / temperature).
      word_weights = output.squeeze().div(temperature).exp().cpu()
      # Convert the sampled id to a plain int before using it as a list index.
      word_idx = torch.multinomial(word_weights, 1)[0].item()
      # Feed the sampled word back in as the next input token.
      input_.fill_(word_idx)

      word = corpus.dictionary.idx2word[word_idx]

      # 20 words per line in the output file.
      outf.write(word + ('\n' if i % 20 == 19 else ' '))

      if i % 100 == 0:
        print('| Generated {}/{} words'.format(i, num_words))

with open(generated_path, 'r', encoding="utf8") as inf:
  all_output = inf.read()
  print(all_output)

| Generated 0/1000 words
| Generated 100/1000 words
| Generated 200/1000 words
| Generated 300/1000 words
| Generated 400/1000 words
| Generated 500/1000 words
| Generated 600/1000 words
| Generated 700/1000 words
| Generated 800/1000 words
| Generated 900/1000 words
motorway List Eligius Such undesirable Creed Ambon Russians cougars Suns avaktavyaḥ natives Peaking Buchanan operating hyper takeoff intriguing Pipe binding
metaphysical traffic Twin duty nayas shotguns avoiding Soundscan help Chaytor prospective Toirdelbach Milhouse lighthouse 1231 vs. Rica recognised Breeding Milford
seagrass damn Tengu Widow ridge Alvin prefecture Lil Publications radiation nicknames documents reveals comes minor strength analysis projecting rainy Ushant
shape Ultimately resemble Tynan soccer Shrubs Township unregulated Murchada hindering breakage intervened crusader Swan paths ruinous IR 350 Phil sacred
rations Dion nm centering Mexico all forsaken cults 1970 Throne Carlyle clinically monitored VHS Mih

In [None]:
# Maximum number of time steps per evaluation chunk (used by get_batch and evaluate).
bptt = 25
# Loss used to score the model's predictions in evaluate.
criterion = nn.CrossEntropyLoss()
# Number of columns the test data is arranged into; also the batch size of the initial hidden state.
eval_batch_size = 1

# Create test dataset
def batchify(data, bsz):
  """
    Arrange a 1-D tensor into `bsz` columns.

    Trailing elements that do not fill a complete row are dropped; column j
    holds the j-th contiguous chunk of `data`. Returns a contiguous tensor
    of shape (len(data) // bsz, bsz).
  """
  usable = (data.size(0) // bsz) * bsz
  trimmed = data.narrow(0, 0, usable)
  return trimmed.view(bsz, -1).t().contiguous()

# Arrange the tokenized test split into eval_batch_size column(s) for evaluation.
test_data = batchify(corpus.test, eval_batch_size)

def get_batch(source, i):
  """
    Slice one evaluation chunk starting at row `i`.

    Returns (data, target): `data` is up to `bptt` rows of `source`, and
    `target` is the same window shifted forward by one row, flattened to 1-D.
    The window is shortened near the end so the shifted slice stays in range.
  """
  stop = i + min(bptt, len(source) - 1 - i)
  inputs = source[i:stop]
  target = source[i + 1:stop + 1].view(-1)
  return inputs, target

def repackage_hidden(h):
  """
    Detach hidden states from their autograd history.

    A tensor is returned detached; any other iterable is processed
    element by element and returned as a tuple.
  """
  if not isinstance(h, torch.Tensor):
    return tuple(map(repackage_hidden, h))
  return h.detach()

def evaluate(model, data_source):
  """
    Compute the average per-row loss of `model` over `data_source`.

    The data is walked in chunks of at most `bptt` rows; the hidden state is
    carried from chunk to chunk (detached each time). Each chunk's loss is
    weighted by its row count and the sum is divided by len(data_source) - 1.
  """
  model.eval()  # disables dropout
  hidden = model.init_hidden(eval_batch_size)
  loss_sum = 0.
  n_rows = data_source.size(0)
  with torch.no_grad():
    for start in range(0, n_rows - 1, bptt):
      data, targets = get_batch(data_source, start)
      output, hidden = model(data, hidden)
      hidden = repackage_hidden(hidden)
      # Flatten to (positions, vocabulary) so it lines up with the 1-D targets.
      chunk_loss = criterion(output.view(-1, ntokens), targets).item()
      loss_sum += len(data) * chunk_loss
  return loss_sum / (len(data_source) - 1)

In [None]:
import torch.quantization

# Build a dynamically quantized copy of the model: only the nn.LSTM and
# nn.Linear submodules are targeted, with qint8 as the quantized dtype.
# The embedding layer is not in the set, so it is left as is.
quantized_model = torch.quantization.quantize_dynamic(
    model, {nn.LSTM, nn.Linear}, dtype=torch.qint8
)
# Show the resulting module tree to confirm which layers were replaced.
print(quantized_model)

LSTMModel(
  (drop): Dropout(p=0.5, inplace=False)
  (encoder): Embedding(33278, 512)
  (rnn): DynamicQuantizedLSTM(512, 256, num_layers=2, dropout=0.5)
  (decoder): DynamicQuantizedLinear(in_features=256, out_features=33278, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
)


In [None]:
def print_size_of_model(model):
  """
    Print the size in MB of `model`'s serialized state_dict.

    The state_dict is saved into a private temporary directory, so an existing
    `temp.p` in the working directory is never overwritten or deleted, and the
    scratch file is removed even if saving fails.
  """
  import tempfile

  with tempfile.TemporaryDirectory() as scratch_dir:
    # Keep the historical file name so the serialized size is unchanged.
    scratch_path = os.path.join(scratch_dir, "temp.p")
    torch.save(model.state_dict(), scratch_path)
    print('Size (MB):', os.path.getsize(scratch_path)/1e6)

# Compare the serialized size of the original model (first line of output)
# with that of its dynamically quantized copy (second line).
print_size_of_model(model)
print_size_of_model(quantized_model)

Size (MB): 107.625672
Size (MB): 78.137404


In [None]:
# Restrict PyTorch to a single thread before timing both models.
# NOTE(review): the original remark here was "quantized models run single threaded";
# presumably this keeps the float vs. quantized timing comparison fair — confirm.
torch.set_num_threads(1)

def time_model_evaluation(model, test_data):
  """
    Evaluate `model` on `test_data` and print the loss and the elapsed time.

    Args:
      model: model passed through to `evaluate`.
      test_data: data passed through to `evaluate`.

    Prints the loss with three decimals and the elapsed seconds with one
    decimal; returns nothing.
  """
  # perf_counter is monotonic, so the measured interval cannot be skewed by
  # system clock adjustments the way time.time() differences can.
  eval_start_time = time.perf_counter()
  loss = evaluate(model, test_data)
  eval_time = time.perf_counter() - eval_start_time
  print('''loss: {0:.3f}\nelapsed time (seconds): {1:.1f}'''.format(loss, eval_time))

# Time evaluation on the test data for the original model first,
# then for the dynamically quantized copy, so the two outputs can be compared.
time_model_evaluation(model, test_data)
time_model_evaluation(quantized_model, test_data)

loss: 10.418
elapsed time (seconds): 123.7
loss: 10.418
elapsed time (seconds): 69.4
