Skip to content

Commit

Permalink
Merge remote-tracking branch 'zeynepakkalyoncu/master' into dogfood
Browse files Browse the repository at this point in the history
  • Loading branch information
achyudh committed Dec 18, 2018
2 parents 188f2e7 + f3cd021 commit 9dc3ba4
Show file tree
Hide file tree
Showing 17 changed files with 236 additions and 64 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ An ensemble of the neural machine translation model from Sequence to Sequen
- `python -m lib.model --gpu <gpu_no> --dataset <lang_pair> --batch-size <batch_size>`
* To run the single node TinySeq2Seq model on a CPU, issue the following command from the project root directory:
- `python -m lib.model --cpu [--ensemble] --dataset <lang_pair> --batch-size <batch_size>`
* To run the TinySeq2Seq ensemble on multiple nodes, issue the following command from the project root directory: (WIP)
- `spark-submit --driver-memory 1G -m lib/model/__main__.py --cpu [--ensemble] --dataset <lang_pair> --batch-size <batch_size>`
* To run the TinySeq2Seq ensemble on multiple nodes:
* Generate the egg file (this must be re-run after every code change) by running:
`python setup.py bdist_egg`
* Issue the following command from the project root directory: (WIP)
- `spark-submit --driver-memory 1G -m lib/model/__main__.py --cpu [--ensemble] --dataset <lang_pair> --batch-size <batch_size> --recurrent-unit gru`

Note: Beam search is used by default during testing. Add the flag `--beam-size 0` to use greedy search.

## References

Expand Down
Binary file removed dist/tardis-0.0.1-py3.6.egg
Binary file not shown.
Binary file added dist/tardis-0.1-py3.6.egg
Binary file not shown.
2 changes: 1 addition & 1 deletion lib/data/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __getitem__(self, idx):
batch_x_encoder = self.x_encoder[idx * self.batch_size:(idx + 1) * self.batch_size]
batch_x_decoder = self.x_decoder[idx * self.batch_size:(idx + 1) * self.batch_size]
raw_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
batch_y = np.zeros((raw_y.shape[0], raw_y.shape[1], self.target_vocab_size), dtype=np.float64)
batch_y = np.zeros((raw_y.shape[0], raw_y.shape[1], self.target_vocab_size), dtype=np.int64)

for i in range(raw_y.shape[0]):
for j in range(raw_y.shape[1]):
Expand Down
9 changes: 4 additions & 5 deletions lib/data/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ def build_indices(source_data, target_data, source_vocab, target_vocab, one_hot)
target_vocab_size = len(target_vocab)
num_instances = len(source_data)

encoder_input_data = np.zeros((num_instances, max_source_len), dtype=np.float64)
decoder_input_data = np.zeros((num_instances, max_target_len), dtype=np.float64)
encoder_input_data = np.zeros((num_instances, max_source_len), dtype=np.int64)
decoder_input_data = np.zeros((num_instances, max_target_len), dtype=np.int64)
if one_hot:
decoder_target_data = np.zeros((num_instances, max_target_len, target_vocab_size), dtype=np.float64)
decoder_target_data = np.zeros((num_instances, max_target_len, target_vocab_size), dtype=np.int64)
else:
decoder_target_data = np.zeros((num_instances, max_target_len), dtype=np.float64)
decoder_target_data = np.zeros((num_instances, max_target_len), dtype=np.int64)

# Convert words to ids
for i, (source_sent, target_sent) in tqdm(enumerate(zip(source_data, target_data)), total=len(source_data)):
Expand All @@ -84,7 +84,6 @@ def build_indices(source_data, target_data, source_vocab, target_vocab, one_hot)
decoder_target_data[i, j - 1] = target_vocab[word]
return encoder_input_data, decoder_input_data, decoder_target_data


def trim_sentences(sentences):
trimmed_sentences = list()
for sentence in sentences:
Expand Down
69 changes: 55 additions & 14 deletions lib/model/__main__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,46 @@
import os
import socket
from copy import deepcopy
import multiprocessing

import time

from keras.backend.tensorflow_backend import set_session
import tensorflow as tf

from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

from contextlib import contextmanager
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

from keras.callbacks import ModelCheckpoint

from pyspark import SparkContext, SparkConf
import numpy as np

from lib.data import fetch
from lib.data.generator import WMTSequence
from lib.model.util import embedding_matrix
from lib.model.util import embedding_matrix, lr_scheduler
from lib.model import metrics
from lib.model.args import get_args
from lib.model.seq2seq import Seq2Seq, TinySeq2Seq
from lib.model.seq2seq import Seq2Seq
from lib.model.ensemble import Ensemble

if __name__ == '__main__':
# Select GPU based on args
args = get_args()
root_dir = os.getcwd()

# Set GPU usage
if not args.cpu:
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.log_device_placement = True
sess = tf.Session(config=config)

set_session(sess)

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = args.devices

Expand Down Expand Up @@ -91,16 +116,32 @@
training_generator = WMTSequence(encoder_train_input, decoder_train_input, decoder_train_target, model_config)
validation_generator = WMTSequence(encoder_dev_input, decoder_dev_input, decoder_dev_target, model_config)

if args.cpu:
model = TinySeq2Seq(args)
else:
model = Seq2Seq(model_config)

model = Seq2Seq(model_config)

if args.ensemble:
# TODO: increase number of workers and set master
conf = SparkConf().setAppName('Tardis').set('spark.executor.instances', '1')
sc = SparkContext(conf=conf).addFile(path=os.path.join(root_dir, 'dist', 'tardis-0.0.1-py3.6.egg'))
model = SparkModel(model, frequency='epoch') # Distributed ensemble
conf = SparkConf().setAppName('Tardis').setMaster('local[*]').set('spark.executor.instances', '4') #.set('spark.driver.allowMultipleContexts', 'true')
# sc = SparkContext.getOrCreate(conf=conf)
sc = SparkContext(conf=conf)

model = SparkModel(model.model, frequency='epoch', mode='asynchronous') # Distributed ensemble

# train_pairs = [(x, y) for x, y in zip([encoder_train_input, decoder_train_input], decoder_train_target)]
# train_rdd = sc.parallelize(train_pairs, model_config.num_workers)

train_rdd = to_simple_rdd(sc, [encoder_train_input, decoder_train_input], decoder_train_target)

model.train_generator(training_generator, validation_generator)
model.evaluate(encoder_test_input, decoder_test_input, raw_test_target)
# test_pairs = [(x, y) for x, y in zip([encoder_test_input, decoder_test_input], raw_test_target)]
# test_rdd = sc.parallelize(test_pairs, model_config.num_workers)

# TODO: fix - multiple context!
model.fit(train_rdd,
batch_size=model_config.batch_size,
epochs=model_config.epochs,
validation_split=0.20,
verbose=1)

sc.stop()

else:
model.train_generator(training_generator, validation_generator)
model.evaluate(encoder_test_input, decoder_test_input, raw_test_target)
7 changes: 5 additions & 2 deletions lib/model/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ def get_args():
parser = ArgumentParser(description="Seq2Seq models for Neural Machine Translation (NMT)")
parser.add_argument('--cpu', action='store_true')
parser.add_argument('--devices', type=str, default='0,1')
parser.add_argument('--ensemble', type=bool, default=False)
parser.add_argument('--ensemble', action='store_true')
parser.add_argument('--epochs', type=int, default=7)
parser.add_argument('--batch-size', type=int, default=32)
parser.add_argument('--hidden-dim', type=int, default=1000)
parser.add_argument('--num-layers', type=int, default=2)
parser.add_argument('--recurrent-unit', type=str, default='lstm')
parser.add_argument('--num-encoder-layers', type=int, default=2)
parser.add_argument('--num-decoder-layers', type=int, default=2)
parser.add_argument('--dataset-size', type=int, default=0)
parser.add_argument('--source-vocab-size', type=int, default=10000)
parser.add_argument('--target-vocab-size', type=int, default=10000)
Expand All @@ -28,6 +30,7 @@ def get_args():
parser.add_argument('--dataset-path', help='dataset directory', default=os.path.join(root_dir, 'data', 'datasets'))
parser.add_argument('--word-vectors-file', help='word vectors filename', default='GoogleNews-vectors-negative300.txt')
parser.add_argument('--weight-decay', type=float, default=0)
parser.add_argument('--num-workers', type=int, default=1)

args = parser.parse_args()
return args
13 changes: 13 additions & 0 deletions lib/model/ensemble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from keras.layers import Average
from keras.models import Model

class Ensemble:
    """Wraps a collection of trained Keras models for ensembling strategies."""

    def __init__(self, models):
        # Keep the member models so that voting strategies can combine them later
        self.models = models

    # TODO: add other methods
    def avg_voting(self, input):
        """Build a Keras model that averages the member models' first outputs."""
        member_outputs = [member.outputs[0] for member in self.models]
        averaged = Average()(member_outputs)
        return Model(input, averaged, name='avg_ensemble')
120 changes: 86 additions & 34 deletions lib/model/seq2seq.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,90 @@
import os

import numpy as np
import tensorflow as tf
import keras.backend as K
from keras.initializers import RandomUniform
from keras.layers import Input, LSTM, Embedding, Dense
from keras.layers import Input, LSTM, GRU, Embedding, Dense
from keras.models import Model
from keras.losses import categorical_crossentropy
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint

from lib.model.metrics import bleu_score
from lib.model.util import lr_scheduler


class Seq2Seq:
config = None
model = None

def __init__(self, config):
self.config = config
devices = list('/gpu:' + x for x in config.devices)

if self.config.cpu:
devices = list('/cpu:' + str(x) for x in (0, 0))
if not self.config.cpu:
devices = list('/gpu:' + x for x in config.devices)

# Encoder
with tf.device(devices[0]):
initial_weights = RandomUniform(minval=-0.08, maxval=0.08, seed=config.seed)
encoder_inputs = Input(shape=(None, ))
encoder_embedding = Embedding(config.source_vocab_size, config.embedding_dim,
weights=[config.source_embedding_map], trainable=False)
encoder_embedded = encoder_embedding(encoder_inputs)
encoder = LSTM(config.hidden_dim, return_state=True, return_sequences=True, recurrent_initializer=initial_weights)(encoder_embedded)
for i in range(1, config.num_layers):
encoder = LSTM(config.hidden_dim, return_state=True, return_sequences=True)(encoder)
_, state_h, state_c = encoder
encoder_states = [state_h, state_c]

encoder_states = self.encode(encoder_inputs, recurrent_unit=self.config.recurrent_unit)
# Decoder
with tf.device(devices[1]):
decoder_inputs = Input(shape=(None, ))
decoder_embedding = Embedding(config.target_vocab_size, config.embedding_dim,
weights=[config.target_embedding_map], trainable=False)
decoder_embedded = decoder_embedding(decoder_inputs)
decoder = LSTM(config.hidden_dim, return_state=True, return_sequences=True)(decoder_embedded, initial_state=encoder_states)
for i in range(1, config.num_layers):
decoder = LSTM(config.hidden_dim, return_state=True, return_sequences=True)(decoder) # Use the final encoder state as context
decoder_outputs, _, _ = decoder
decoder_dense = Dense(config.target_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)
decoder_outputs = self.decode(decoder_inputs, encoder_states, recurrent_unit=self.config.recurrent_unit)

# Input: Source and target sentence, Output: Predicted translation
self.model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

optimizer = Adam(lr=config.lr, clipnorm=25.)
optimizer = Adam(lr=self.config.lr, clipnorm=25.)
self.model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['acc'])

print(self.model.summary())

# def __getstate__(self):
# return self.__dict__.copy()
#
# def __setstate__(self, state):
# self.__dict__.update(state)

    def encode(self, encoder_inputs, recurrent_unit='lstm'):
        """Build the encoder stack on top of `encoder_inputs` and return its final state(s).

        :param encoder_inputs: Keras Input tensor of source token ids
        :param recurrent_unit: 'lstm' returns [state_h, state_c]; any other value
            builds a GRU encoder and returns [state_h]
        """
        # Uniform init in [-0.08, 0.08], seeded for reproducibility
        initial_weights = RandomUniform(minval=-0.08, maxval=0.08, seed=self.config.seed)
        # Pre-trained source embeddings, frozen during training (trainable=False)
        encoder_embedding = Embedding(self.config.source_vocab_size, self.config.embedding_dim,
                                      weights=[self.config.source_embedding_map], trainable=False)
        encoder_embedded = encoder_embedding(encoder_inputs)
        if recurrent_unit == 'lstm':
            encoder = LSTM(self.config.hidden_dim, return_state=True, return_sequences=True, recurrent_initializer=initial_weights)(encoder_embedded)
            for i in range(1, self.config.num_encoder_layers):
                # NOTE(review): `encoder` here is the [sequences, state_h, state_c] list
                # returned by the previous layer, not just the sequence output —
                # confirm Keras handles the list input as intended
                encoder = LSTM(self.config.hidden_dim, return_state=True, return_sequences=True)(encoder)
            _, state_h, state_c = encoder  # drop per-timestep outputs, keep final states
            return [state_h, state_c]
        else:  # GRU
            encoder = GRU(self.config.hidden_dim, return_state=True, return_sequences=True, recurrent_initializer=initial_weights)(encoder_embedded)
            for i in range(1, self.config.num_encoder_layers):
                # NOTE(review): same list-input concern as the LSTM branch above
                encoder = GRU(self.config.hidden_dim, return_state=True, return_sequences=True)(encoder)
            _, state_h = encoder  # GRU carries a single state tensor
            return [state_h]

    def decode(self, decoder_inputs, encoder_states, recurrent_unit='lstm'):
        """Build the decoder stack seeded with `encoder_states` and return the
        per-timestep softmax distribution over the target vocabulary.

        :param decoder_inputs: Keras Input tensor of target token ids
        :param encoder_states: state list produced by `encode` (must match the unit type)
        :param recurrent_unit: 'lstm' or anything else for GRU — should agree with
            the `recurrent_unit` used for the encoder
        """
        # Pre-trained target embeddings, frozen during training (trainable=False)
        decoder_embedding = Embedding(self.config.target_vocab_size, self.config.embedding_dim,
                                      weights=[self.config.target_embedding_map], trainable=False)
        decoder_embedded = decoder_embedding(decoder_inputs)
        if recurrent_unit == 'lstm':
            decoder = LSTM(self.config.hidden_dim, return_state=True, return_sequences=True)(decoder_embedded, initial_state=encoder_states)  # Accepts concatenated encoder states as input
            for i in range(1, self.config.num_decoder_layers):
                decoder = LSTM(self.config.hidden_dim, return_state=True, return_sequences=True)(decoder)  # Use the final encoder state as context
            # NOTE(review): decoder_states is assigned but never used — possibly kept
            # for a future step-wise inference loop
            decoder_outputs, decoder_states = decoder[0], decoder[1:]
        else:  # GRU
            decoder = GRU(self.config.hidden_dim, return_state=True, return_sequences=True)(decoder_embedded, initial_state=encoder_states)  # Accepts concatenated encoder states as input
            for i in range(1, self.config.num_decoder_layers):
                decoder = GRU(self.config.hidden_dim, return_state=True, return_sequences=True)(decoder)  # Use the final encoder state as context
            decoder_outputs, decoder_states = decoder[0], decoder[1]
        # Project hidden states to vocabulary-sized softmax at every timestep
        decoder_dense = Dense(self.config.target_vocab_size, activation='softmax')
        return decoder_dense(decoder_outputs)

def train(self, encoder_train_input, decoder_train_input, decoder_train_target):
checkpoint_filename = \
'ep{epoch:02d}_nl%d_bs%d_ds%d_sv%d_tv%d.hdf5' % (self.config.num_layers, self.config.beam_size, self.config.dataset_size,
'ep{epoch:02d}_nl%d_ds%d_sv%d_sv%d_tv%d.hdf5' % (self.config.num_encoder_layers, self.config.num_decoder_layers, self.config.dataset_size,
self.config.source_vocab_size, self.config.target_vocab_size)
callbacks = [lr_scheduler(initial_lr=self.config.lr, decay_factor=self.config.decay),
ModelCheckpoint(os.path.join(os.getcwd(), 'data', 'checkpoints', self.config.dataset, checkpoint_filename),
Expand All @@ -65,7 +98,7 @@ def train(self, encoder_train_input, decoder_train_input, decoder_train_target):

def train_generator(self, training_generator, validation_generator):
checkpoint_filename = \
'ep{epoch:02d}_nl%d_ds%d_sv%d_tv%d.hdf5' % (self.config.num_layers, self.config.dataset_size,
'ep{epoch:02d}_nl%d_ds%d_sv%d_sv%d_tv%d.hdf5' % (self.config.num_encoder_layers, self.config.num_decoder_layers, self.config.dataset_size,
self.config.source_vocab_size, self.config.target_vocab_size)
callbacks = [lr_scheduler(initial_lr=self.config.lr, decay_factor=self.config.decay),
ModelCheckpoint(os.path.join(os.getcwd(), 'data', 'checkpoints', self.config.dataset, checkpoint_filename),
Expand All @@ -77,13 +110,32 @@ def train_generator(self, training_generator, validation_generator):
def predict(self, encoder_predict_input, decoder_predict_input):
return self.model.predict([encoder_predict_input, decoder_predict_input])

def evaluate(self, encoder_predict_input, decoder_predict_input, test_target, log_outputs=True):
y_pred = self.model.predict([encoder_predict_input, decoder_predict_input])
print("BLEU Score:", bleu_score(test_target, y_pred))
# An error in the sacrebleu library prevents multi_bleu_score from working on WMT '14 EN-DE test split
# print("Multi-BLEU Score", multi_bleu_score(y_pred, self.config.target_vocab, self.config.dataset))
    def beam_search(self, encoder_predict_input):
        """Beam search over the model's output distribution for one source sentence.

        :param encoder_predict_input: 1-D array of source token ids for a single sentence
        :return: the highest-scoring hypothesis — a list of target token ids,
            zero-padded to the maximum target length
        """
        beam_size = self.config.beam_size
        # NOTE(review): the target length is capped at the *source* sentence length —
        # confirm this is intended rather than a separate max-target-length setting
        max_target_len = encoder_predict_input.shape[0]
        # Each beam entry is (cumulative log-probability, hypothesis token list);
        # start from a single all-zero hypothesis with score 0
        k_beam = [(0, [0] * max_target_len)]

        for i in range(max_target_len):
            all_hypotheses = []
            for prob, hyp in k_beam:
                # NOTE(review): re-runs the full model for every hypothesis at every
                # step — O(beam_size * len) predict calls; expensive but deterministic
                predicted = self.model.predict([encoder_predict_input, np.array(hyp)])
                # Top beam_size candidate tokens at position i (argsort ascending, take tail)
                new_hypotheses = predicted[i, 0, :].argsort(axis=-1)[-beam_size:]
                for next_hyp in new_hypotheses:
                    all_hypotheses.append((
                        # Score = log-prob of the existing prefix (recomputed from this
                        # prediction) plus the log-prob of the candidate token
                        sum(np.log(predicted[j, 0, hyp[j + 1]]) for j in range(i)) + np.log(predicted[i, 0, next_hyp]),
                        # Extend the prefix with the candidate and re-pad with zeros
                        list(hyp[:(i + 1)]) + [next_hyp] + ([0] * (encoder_predict_input.shape[0] - i - 1))
                    ))

            k_beam = sorted(all_hypotheses, key=lambda x: x[0])[-beam_size:]  # Sort by probability

        return k_beam[-1][1]  # Pick hypothesis with highest probability

    def evaluate(self, encoder_predict_input, decoder_predict_input, decoder_train_target):
        """Translate the evaluation set and print its BLEU score against the references.

        Uses beam search when `config.beam_size > 0`, otherwise greedy decoding.
        """
        if self.config.beam_size > 0:
            # beam_search already returns token ids, one hypothesis per source row
            y_pred = np.apply_along_axis(self.beam_search, 1, encoder_predict_input)
        else:
            y_pred = self.predict(encoder_predict_input, decoder_predict_input)
            # Greedy decoding: collapse the softmax distribution to token ids.
            # NOTE(review): this argmax must stay inside the else branch — the
            # beam-search output above is already token ids, not probabilities
            y_pred = np.argmax(y_pred, axis=-1)
        print("BLEU Score:", bleu_score(y_pred, decoder_train_target))
        # An error in the sacrebleu library prevents multi_bleu_score from working on WMT '14 EN-DE test split
        # print("Multi-BLEU Score", multi_bleu_score(y_pred, self.config.target_vocab, self.config.dataset))
1 change: 1 addition & 0 deletions lib/model/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dill
import numpy as np
from keras.callbacks import LearningRateScheduler
import keras.backend as K
from tqdm import tqdm


Expand Down
Loading

0 comments on commit 9dc3ba4

Please sign in to comment.