In [None]:
# Mount Google Drive at /content/gdrive; every path used later in this
# notebook (module directory, data, checkpoints, output) lives under it.
from google.colab import drive

drive.mount('/content/gdrive')

In [2]:
import os

In [3]:
# Make the project's own modules importable from Drive (presumably the
# directory holding nmt.py, utils.py and vocab.py -- confirm the path).
# `sys` is imported here directly instead of being reached through `os.sys`,
# which only works because the os module happens to import sys internally.
import sys

_module_dir = '/content/gdrive/path/to/module_dir'
if _module_dir not in sys.path:  # keep the cell idempotent when it is re-run
    sys.path.append(_module_dir)

In [None]:
# Display the interpreter's module search path to check that the directory
# appended above is present (os.sys is the same object as the sys module).
os.sys.path

In [5]:
import math
import sys
import pickle
import time
import numpy as np

from docopt import docopt
from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction
from nmt import Hypothesis, NMT
import numpy as np
from typing import List, Tuple, Dict, Set, Union
from tqdm import tqdm
from utils import read_corpus, batch_iter
from vocab import Vocab, VocabEntry

import torch
import torch.nn.utils
import torch.nn as nn

In [6]:
def evaluate_ppl(model, dev_data, batch_size=32):
    """ Evaluate perplexity on dev sentences.

    The model is put into eval mode for the duration of the call. If it was in
    training mode on entry it is switched back afterwards, even when evaluation
    is aborted by an exception or a keyboard interrupt, so a failed evaluation
    cannot silently leave the model in eval mode.

    @param model (NMT): NMT Model; model(src_sents, tgt_sents) must return a tensor
        whose negated sum is the loss of the batch
    @param dev_data (list of (src_sent, tgt_sent)): list of tuples containing source and target sentence
    @param batch_size (int): number of sentence pairs passed to batch_iter per batch
    @returns ppl (perplexity on dev sentences): exp(total loss / number of target
        words to predict), where the leading '<s>' of each target sentence is not counted
    @raises ZeroDivisionError: if the batches contain no target words to predict
        (e.g. dev_data is empty)
    """
    was_training = model.training
    model.eval()

    cum_loss = 0.
    cum_tgt_words = 0.

    try:
        # no_grad() disables gradient tracking; nothing is back-propagated here
        with torch.no_grad():
            for src_sents, tgt_sents in batch_iter(dev_data, batch_size):
                loss = -model(src_sents, tgt_sents).sum()

                cum_loss += loss.item()
                # omit the leading '<s>' of every target sentence: it is never predicted
                cum_tgt_words += sum(len(s[1:]) for s in tgt_sents)

        ppl = np.exp(cum_loss / cum_tgt_words)
    finally:
        # restore the caller's mode on every exit path, not only on success
        if was_training:
            model.train()

    return ppl

In [7]:
def compute_corpus_level_bleu_score(references: List[List[str]], hypotheses: List[Hypothesis]) -> float:
    """ Compute the corpus-level BLEU score of decoded hypotheses against gold references.

    If the first reference starts with '<s>', the first and last token of every
    reference are dropped before scoring.

    @param references (List[List[str]]): a list of gold-standard reference target sentences
    @param hypotheses (List[Hypothesis]): a list of hypotheses, one for each reference;
        only the `value` attribute of each hypothesis is used
    @returns bleu_score: corpus-level BLEU score as returned by nltk's corpus_bleu
    """
    # Only the first reference is inspected to decide whether boundary tokens are present.
    has_boundary_tokens = references[0][0] == '<s>'
    if has_boundary_tokens:
        references = [tokens[1:-1] for tokens in references]

    # corpus_bleu takes a list of references per hypothesis; each sentence has exactly one.
    reference_sets = [[tokens] for tokens in references]
    candidate_sentences = [candidate.value for candidate in hypotheses]
    return corpus_bleu(reference_sets, candidate_sentences)

In [8]:
def decode(args: Dict[str, str]):
    """ Performs decoding on a test set, and saves the first hypothesis of every sentence.
    If the target gold-standard sentences are given, the function also computes
    the corpus-level BLEU score and prints it to stderr.

    Keys read from args: 'test_src', 'test_tgt' (falsy to skip BLEU), 'model_path',
    'cuda', 'beam_size', 'max_decoding_time_step', 'output_dir', 'output_file'.

    The output file gets one line per source sentence: the tokens of the first
    hypothesis joined by single spaces (presumably the best-scoring hypothesis --
    the ordering is decided by NMT.beam_search).

    @param args (Dict): args from cmd line
    """

    print('load test source sentences from [{}]'.format(args['test_src']), file=sys.stderr)
    test_data_src = read_corpus(args['test_src'], source='src')

    test_data_tgt = None
    if args['test_tgt']:
        print("load test target sentences from [{}]".format(args['test_tgt']), file=sys.stderr)
        test_data_tgt = read_corpus(args['test_tgt'], source='tgt')

    print("load model from {}".format(args['model_path']), file=sys.stderr)
    model = NMT.load(args['model_path'])

    if args['cuda']:
        model = model.to(torch.device("cuda"))

    hypotheses = beam_search(model, test_data_src,
                             beam_size=int(args['beam_size']),
                             max_decoding_time_step=int(args['max_decoding_time_step']))

    # Keep only the first hypothesis per sentence; used for both BLEU and the output file.
    top_hypotheses = [hyps[0] for hyps in hypotheses]

    if test_data_tgt is not None:
        bleu_score = compute_corpus_level_bleu_score(test_data_tgt, top_hypotheses)
        print('Corpus BLEU: {}'.format(bleu_score * 100), file=sys.stderr)

    # exist_ok avoids the check-then-create race and is a no-op when the directory exists
    os.makedirs(args['output_dir'], exist_ok=True)

    # Explicit UTF-8 so non-ASCII tokens are written the same way on every platform
    # instead of depending on the locale's default encoding.
    output_path = os.path.join(args['output_dir'], args['output_file'])
    with open(output_path, 'w', encoding='utf-8') as f:
        for top_hyp in top_hypotheses:
            f.write(' '.join(top_hyp.value) + '\n')

In [9]:
def beam_search(model: NMT, test_data_src: List[List[str]], beam_size: int, max_decoding_time_step: int) -> List[List[Hypothesis]]:
    """ Run beam search to construct hypotheses for a list of src-language sentences.

    The model is put into eval mode while decoding. If it was in training mode on
    entry it is switched back afterwards, even when decoding is aborted by an
    exception or a keyboard interrupt.

    @param model (NMT): NMT Model
    @param test_data_src (List[List[str]]): List of sentences (words) in source language, from test set.
    @param beam_size (int): beam_size (# of hypotheses to hold for a translation at every step)
    @param max_decoding_time_step (int): maximum sentence length that Beam search can produce
    @returns hypotheses (List[List[Hypothesis]]): List of Hypothesis translations for every source sentence.
    """
    was_training = model.training
    model.eval()

    hypotheses = []
    try:
        # decoding never back-propagates, so gradient tracking is switched off
        with torch.no_grad():
            for src_sent in tqdm(test_data_src, desc='Decoding', file=sys.stdout):
                example_hyps = model.beam_search(src_sent, beam_size=beam_size, max_decoding_time_step=max_decoding_time_step)
                hypotheses.append(example_hyps)
    finally:
        # restore the caller's mode on every exit path, not only on success
        if was_training:
            model.train(was_training)

    return hypotheses

In [10]:
# Run configuration, kept as a plain dict that is passed to decode().
# The cells in this notebook read only: test_src, test_tgt, model_path, cuda,
# beam_size, max_decoding_time_step, output_dir, output_file and seed.
# The remaining keys are presumably kept for parity with the training
# script -- confirm before removing them.
args = {
    # corpora and vocabulary
    'train_src': "/content/gdrive/path/to/data/train.de-en.de.wmixerprep",
    'train_tgt': "/content/gdrive/path/to/data/train.de-en.en.wmixerprep",
    'dev_src': "/content/gdrive/path/to/data/valid.de-en.de",
    'dev_tgt': "/content/gdrive/path/to/data/valid.de-en.en",
    'test_src': '/content/gdrive/path/to/data/test.de-en.de',
    'test_tgt': '/content/gdrive/path/to/data/test.de-en.en',
    'vocab': "/content/gdrive/path/to/data/vocab.json",

    # checkpoint to load and where the decoded sentences are written
    'model_path': "/content/gdrive/path/to/checkpoint/epoch_iteration_model.bin",
    'output_dir': '/content/gdrive/path/to/output_dir',
    'output_file': 'output.txt',

    # remaining settings, in their original order
    'seed': 0,
    'batch_size': 32,
    'embed_size': 256,
    'hidden_size': 256,
    'clip_grad': 5.0,
    'log_every': 10,
    'max_epoch': 30,
    'patience': 5,
    'max_num_trial': 5,
    'lr_decay': 0.5,
    'beam_size': 5,
    'lr': 0.001,
    'uniform_init': 0.1,
    'save_to': '/content/gdrive/path/to/checkpoint',
    'valid_niter': 100,
    'dropout': 0.3,
    'max_decoding_time_step': 70,
    'cuda': True,
}

In [11]:
# Make runs repeatable: every RNG seed is derived from the configured value.
seed = int(args['seed'])

# NumPy is seeded with a value derived from, but not equal to, the torch seed.
np.random.seed(seed * 13 // 7)

torch.manual_seed(seed)
if args['cuda']:
    torch.cuda.manual_seed(seed)

In [None]:
# Decode the test set with the configuration above; the translations are
# written to args['output_dir'] / args['output_file'] and, because
# args['test_tgt'] is set, the corpus BLEU score is printed to stderr.
decode(args)