In [2]:
# Colab only: mount Google Drive so the project data can be read from it.
# Running this cell asks you to log in with your Google account and paste an
# authorization code.
# Reference: https://datascience.stackexchange.com/questions/29480/uploading-images-folder-from-my-system-into-google-colab
from google.colab import drive

# Project folder inside the mounted Drive; later cells build paths from it.
root = '/gdrive/My Drive/CS492I/project'
drive.mount('/gdrive')

Mounted at /gdrive


In [9]:
from easydict import EasyDict
from torchtext.legacy.data import Field
from torchtext.vocab import vocab
import collections
from pathlib import Path
import pandas as pd
import json
import torch.nn as nn
import torch

In [5]:
# Hyper-parameters shared by the cells below (attribute-style access).
args = EasyDict(
    vocab_size=50000,
    embed_dim=128,
    hidden_dim=256,
    batch_size=8,
)

In [7]:
# Load the pickled training frame from the mounted project folder.
data_path = Path(root) / 'train.pkl'
train_df = pd.read_pickle(data_path)

# Each cell of these columns is a JSON string; decode it and fold the decoded
# value into the running frequency counter.
trg_counter = collections.Counter()
for decoded_message in map(json.loads, train_df["commit_messsage"]):
  trg_counter.update(decoded_message)

src_counter = collections.Counter()
for decoded_diff in map(json.loads, train_df["diff"]):
  src_counter.update(decoded_diff)

# Target side keeps every token; source side drops tokens seen fewer than 10 times.
trg_vocab = vocab(trg_counter)
print(len(trg_vocab))
src_vocab = vocab(src_counter, min_freq=10)
print(len(src_vocab))



46401
56350


In [8]:
'''
Reference https://github.com/jiminsun/pointer-generator/blob/master/data/vocab.py
'''
# Special tokens used by the Vocab class below.
pad_token = '<pad>'
unk_token = '<unk>'
start_decode = '<start>'
stop_decode = '<stop>'


class Vocab(object):
  """Bidirectional token <-> id mapping with helpers for extended (copy) ids.

  Ids are dense integers starting at 0, assigned in insertion order.
  Tokens that are not in the vocabulary map to the id of `unk_token`
  (or to None when `unk_token` itself was never added).
  """

  def __init__(self):
    self._word_to_id = {}
    self._id_to_word = []
    self._count = 0

  def _add(self, word):
    """Register `word` under the next free id; a known word keeps its id."""
    if word in self._word_to_id:
      return
    self._word_to_id[word] = self._count
    self._id_to_word.append(word)
    self._count += 1

  @classmethod
  def from_file(cls, filename):
    """Load a vocabulary from a JSON file written by `save`.

    Args:
        filename: path of a JSON object mapping token -> integer id.
    Returns:
        A Vocab whose id -> token list is ordered by ascending id.
    """
    new_vocab = cls()
    with open(filename, 'r', encoding='utf-8') as f:
      new_vocab._word_to_id = json.load(f)
    # Iterating the dict yields tokens; order them by their stored id so that
    # position i of the list is the token with id i.
    new_vocab._id_to_word = sorted(new_vocab._word_to_id, key=new_vocab._word_to_id.get)
    new_vocab._count = len(new_vocab._id_to_word)
    return new_vocab

  @classmethod
  def from_counter(cls, counter, vocab_size, specials, min_freq):
    """Build a vocabulary from token frequencies.

    Args:
        counter: mapping token -> frequency (e.g. collections.Counter).
        vocab_size: maximum number of entries including specials;
            None means no limit.
        specials: tokens that receive the first ids, in the given order.
        min_freq: tokens with a lower frequency are left out.
    Returns:
        A Vocab holding the specials followed by the most frequent tokens
        (ties are broken by ascending token order).
    """
    new_vocab = cls()
    for w in specials:
      new_vocab._add(w)

    # Most frequent first; equal frequencies fall back to token order.
    ranked = sorted(counter.items(), key=lambda tup: (-tup[1], tup[0]))
    for word, freq in ranked:
      if freq < min_freq:
        break
      if vocab_size is not None and len(new_vocab) >= vocab_size:
        break
      # _add skips tokens that collide with a special, so ids stay unique.
      new_vocab._add(word)

    return new_vocab

  def save(self, filename):
    """Write the token -> id mapping to `filename` as JSON."""
    with open(filename, 'w', encoding='utf-8') as f:
      json.dump(self._word_to_id, f)

  def __len__(self):
    return self._count

  def pad(self):
    """Id of `pad_token`, or None if it is not in the vocabulary."""
    return self._word_to_id.get(pad_token)

  def unk(self):
    """Id of `unk_token`, or None if it is not in the vocabulary."""
    return self._word_to_id.get(unk_token)

  def start(self):
    """Id of `start_decode`, or None if it is not in the vocabulary."""
    return self._word_to_id.get(start_decode)

  def stop(self):
    """Id of `stop_decode`, or None if it is not in the vocabulary."""
    return self._word_to_id.get(stop_decode)

  def word2id(self, word):
    """Return the id of `word`, falling back to the unknown-token id."""
    return self._word_to_id.get(word, self.unk())

  def id2word(self, word_id):
    """Return the token stored under `word_id`.

    Raises:
        ValueError: if `word_id` is outside [0, len(self)).
    """
    if word_id < 0 or word_id >= len(self):
      raise ValueError(f"Id not found in vocab: {word_id}")
    return self._id_to_word[word_id]

  def extend(self, oovs):
    """Return a new id -> token list with `oovs` appended after the vocabulary."""
    return self._id_to_word + list(oovs)

  def tokens2ids(self, tokens):
    """Map every token to its id (unknown tokens get the unk id)."""
    return [self.word2id(t) for t in tokens]

  def tokens2ids_ext(self, tokens):
    """Map tokens to ids, giving out-of-vocabulary tokens temporary ids.

    The k-th distinct out-of-vocabulary token (in order of first appearance)
    receives the id len(self) + k; in-vocabulary tokens keep their own id.

    Returns:
        (ids, oovs): the id sequence and the list of distinct OOV tokens.
    """
    ids = []
    oovs = []
    oov_position = {}  # token -> index in `oovs`, avoids repeated list scans
    for t in tokens:
      if t in self._word_to_id:
        ids.append(self._word_to_id[t])
        continue
      if t not in oov_position:
        oov_position[t] = len(oovs)
        oovs.append(t)
      ids.append(len(self) + oov_position[t])
    return ids, oovs

In [None]:
class Encoder(nn.Module):
    """
    Single-layer bidirectional LSTM
    B : batch size
    E : embedding size
    H : encoder hidden state dimension
    L : sequence length
    """

    def __init__(self, input_dim, hidden_dim):
        """
        Args:
            input_dim: size E of the input embeddings
            hidden_dim: size H of each LSTM direction and of the projected states
        """
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, num_layers=1, bidirectional=True, batch_first=True)
        # Project the concatenated forward/backward final states (2H) down to H.
        self.reduce_h = nn.Linear(hidden_dim * 2, hidden_dim, bias=True)
        self.reduce_c = nn.Linear(hidden_dim * 2, hidden_dim, bias=True)

    def forward(self, src, src_lens):
        """
        Args:
            src: source token embeddings    [B x L x E]
            src_lens: source text length    [B]  (tensor or list of ints)
        Returns:
            enc_hidden: sequence of encoder hidden states                  [B x L x 2H]
            (final_h, final_c): Tuple for decoder state initialization     each [B x H]
        """
        # pack_padded_sequence requires the lengths to live on the CPU.
        if torch.is_tensor(src_lens):
            src_lens = src_lens.cpu()

        # Pack so the LSTM skips padded positions of variable-length sequences.
        packed = nn.utils.rnn.pack_padded_sequence(src, src_lens, batch_first=True, enforce_sorted=False)
        output, (h, c) = self.lstm(packed)  # packed [B x L x 2H], [2 x B x H], [2 x B x H]

        # total_length keeps the time axis equal to L even when the longest
        # sequence in the batch is shorter, so a [B x L] mask still lines up.
        enc_hidden, _ = nn.utils.rnn.pad_packed_sequence(output, batch_first=True, total_length=src.size(1))

        # Concatenate bidirectional lstm states
        h = torch.cat((h[0], h[1]), dim=-1)  # [B x 2H]
        c = torch.cat((c[0], c[1]), dim=-1)  # [B x 2H]

        # Project to decoder hidden state size
        final_hidden = torch.relu(self.reduce_h(h))  # [B x H]
        final_cell = torch.relu(self.reduce_c(c))  # [B x H]

        return enc_hidden, (final_hidden, final_cell)


class Attention(nn.Module):
  """Additive attention: score = v^T tanh(W_h * enc_hidden + W_s * dec_state + b_attn)."""

  def __init__(self, hidden_dim):
    """
    Args:
        hidden_dim: decoder state size H (encoder states are expected to be 2H wide)
    """
    super().__init__()
    self.v = nn.Linear(hidden_dim * 2, 1, bias=False)                       # v
    self.enc_proj = nn.Linear(hidden_dim * 2, hidden_dim * 2, bias=False)   # W_h
    self.dec_proj = nn.Linear(hidden_dim, hidden_dim * 2, bias=True)        # W_s, b_attn

  def forward(self, dec_input, enc_hidden, enc_pad_mask):
    """
    Args:
        dec_input: decoder hidden state             [B x H]
        enc_hidden: encoder hidden states           [B x L x 2H]
        enc_pad_mask: encoder padding masks         [B x L]  (True at padding), or None
    Returns:
        attn_dist: attention dist'n over src tokens [B x L]
    """
    enc_feature = self.enc_proj(enc_hidden)               # [B X L X 2H]
    dec_feature = self.dec_proj(dec_input).unsqueeze(1)   # [B X 1 X 2H]

    scores = self.v(torch.tanh(enc_feature + dec_feature)).squeeze(-1)  # [B X L]

    # Don't attend over padding; fill '-inf' where enc_pad_mask == True.
    # Out-of-place fill so the tensor tracked by autograd is never mutated.
    if enc_pad_mask is not None:
      scores = scores.float().masked_fill(
          enc_pad_mask.bool(),
          float('-inf')
      ).type_as(scores)  # FP16 support: cast to float and back

    # NOTE: a row whose positions are all masked yields NaN after the softmax.
    attn_dist = torch.softmax(scores, dim=-1)  # [B X L]

    return attn_dist


# Backward-compatible alias for the original (misspelled) class name.
Attnetion = Attention


class AttentionDecoderLayer(nn.Module):
  """One decoding step: LSTM cell -> attention over encoder states -> vocabulary distribution."""

  def __init__(self, input_dim, hidden_dim, vocab_size):
    """
    Args:
        input_dim: size E of the decoder input embedding
        hidden_dim: decoder state size H
        vocab_size: size V of the output distribution
    """
    super().__init__()
    self.lstm = nn.LSTMCell(input_size=input_dim, hidden_size=hidden_dim)
    self.attention = Attention(hidden_dim)
    self.l1 = nn.Linear(hidden_dim*3, hidden_dim, bias=True)  # [hidden ; context] = H + 2H
    self.l2 = nn.Linear(hidden_dim, vocab_size, bias=True)

  def forward(self, dec_input, dec_hidden, dec_cell, enc_hidden, enc_pad_mask):
    """
    Args:
        dec_input: decoder input embedding at timestep t    [B x E]
        dec_hidden: decoder hidden state from prev timestep [B x H]
        dec_cell: decoder cell state from prev timestep     [B x H]
        enc_hidden: encoder hidden states                   [B x L x 2H]
        enc_pad_mask: encoder masks for attn computation    [B x L]
    Returns:
        vocab_dist: predicted vocab dist'n at timestep t    [B x V]
        attn_dist: attention dist'n at timestep t           [B x L]
        context_vec: context vector at timestep t           [B x 2H]
        hidden: hidden state at timestep t                  [B x H]
        cell: cell state at timestep t                      [B x H]
    """
    hidden, cell = self.lstm(dec_input, (dec_hidden, dec_cell))  # [B X H], [B X H]

    # Attention is queried with the new decoder state (size H), not with the
    # input embedding (size E), which would not fit dec_proj.
    attn_dist = self.attention(hidden, enc_hidden, enc_pad_mask)  # [B X L]

    # [B X 2H] <- [B X 1 X 2H] = [B X 1 X L] @ [B X L X 2H]
    context_vec = torch.bmm(attn_dist.unsqueeze(1), enc_hidden).squeeze(1)
    output = self.l1(torch.cat([hidden, context_vec], dim=-1))  # [B X H]
    vocab_dist = torch.softmax(self.l2(output), dim=-1)         # [B X V]
    return vocab_dist, attn_dist, context_vec, hidden, cell


class PointerGenerator(nn.Module):
  def __init__(self, src_vocab, trg_vocab):
    """Build the embeddings, encoder, decoder and the three scalar projections.

    Sizes are read from the notebook-level `args` (embed_dim, hidden_dim).

    Args:
        src_vocab: source vocabulary; must support len() and pad()
        trg_vocab: target vocabulary; must support len() and pad()
    """
    super().__init__()

    # Keep the vocabularies on the module; the original code read them from
    # `self` without ever assigning them.
    self.src_vocab = src_vocab
    self.trg_vocab = trg_vocab

    embed_dim = args.embed_dim
    hidden_dim = args.hidden_dim

    # NOTE(review): assumes the vocab objects expose pad() returning the
    # padding id -- confirm against the Vocab implementation in use.
    self.src_embedding = nn.Embedding(len(src_vocab), embed_dim, padding_idx=src_vocab.pad())
    self.trg_embedding = nn.Embedding(len(trg_vocab), embed_dim, padding_idx=trg_vocab.pad())
    # NOTE(review): forward() refers to self.dec_embedding, which is not
    # defined here; it presumably means self.trg_embedding -- confirm.

    self.encoder = Encoder(input_dim=embed_dim, hidden_dim=hidden_dim)
    self.decoder = AttentionDecoderLayer(input_dim=embed_dim, hidden_dim=hidden_dim, vocab_size=len(trg_vocab))

    # Scalar projections of a 2H-wide, an H-wide and an E-wide vector.
    # Presumably the terms of the generation probability (context vector,
    # decoder state, decoder input) -- TODO confirm once forward() is finished.
    self.w_h = nn.Linear(hidden_dim * 2, 1, bias=False)
    self.w_s = nn.Linear(hidden_dim, 1, bias=False)
    self.w_x = nn.Linear(embed_dim, 1, bias=True)


  def forward(self, enc_input, enc_input_ext, enc_pad_mask, enc_len, dec_input, max_oov_len):
    enc_emb = self.src_embedding(enc_input)
    enc_hidden, (h,c) = self.encoder(enc_emb, enc_len)
    
    final_dists = []
    
    dec_emb = self.dec_embedding(dec_input)

    for t in range(self)





