In [None]:
In this project, you’re going to take a peek into the realm of neural network machine
translation. You’ll be training a sequence to sequence model on a dataset of English and French
sentences that can translate new sentences from English to French.

In [None]:
def load_data(path):
    """load data from file
    """
    input_file = os.path.join(path)
    with open(input_file,'r',encoding = 'utf-8') as f:
             data = f.read()

In [None]:
# split a loaded document into sentences
def to_pairs(doc):
lines = doc.strip().split('\n')
pairs = [line.split('\t') for line in  lines]
return pairs

In [None]:
# clean a list of lines
def clean_pairs(lines):
cleaned = list()
# prepare regex for char filtering
re_print = re.compile('[^%s]' % re.escape(string.printable))
# prepare translation table for removing punctuation
table = str.maketrans('', '', string.punctuation)
for pair in lines:
clean_pair = list()
for line in pair:
# normalize unicode characters
line = normalize('NFD', line).encode('ascii', 'ignore')
line = line.decode('UTF-8')
# tokenize on white space
line = line.split()
# convert to lowercase
line = [word.lower() for word in line]
# remove punctuation from each token
line = [word.translate(table) for word in line]
# remove non-printable chars form each token
line = [re_print.sub('', w) for w in line]
# remove tokens with numbers in them
line = [word for word in line if word.isalpha()]
# store as string
clean_pair.append(' '.join(line))
cleaned.append(clean_pair)
return array(cleaned)

In [None]:
import string
import re
from pickle import dump
from unicodedata import normalize
from numpy import array
 
# load doc into memory
def load_doc(filename):
# open the file as read only
file = open(filename, mode='rt', encoding='utf-8')
# read all text
text = file.read()
# close the file
file.close()
return text
 
# split a loaded document into sentences
def to_pairs(doc):
lines = doc.strip().split('\n')
pairs = [line.split('\t') for line in  lines]
return pairs
 
# clean a list of lines
def clean_pairs(lines):
cleaned = list()
# prepare regex for char filtering
re_print = re.compile('[^%s]' % re.escape(string.printable))
# prepare translation table for removing punctuation
table = str.maketrans('', '', string.punctuation)
for pair in lines:
clean_pair = list()
for line in pair:
# normalize unicode characters
line = normalize('NFD', line).encode('ascii', 'ignore')
line = line.decode('UTF-8')
# tokenize on white space
line = line.split()
# convert to lowercase
line = [word.lower() for word in line]
# remove punctuation from each token
line = [word.translate(table) for word in line]
# remove non-printable chars form each token
line = [re_print.sub('', w) for w in line]
# remove tokens with numbers in them
line = [word for word in line if word.isalpha()]
# store as string
clean_pair.append(' '.join(line))
cleaned.append(clean_pair)
return array(cleaned)
 
# save a list of clean sentences to file
def save_clean_data(sentences, filename):
dump(sentences, open(filename, 'wb'))
print('Saved: %s' % filename)
 
# load dataset
filename = 'deu.txt'
doc = load_doc(filename)
# split into english-french pairs
pairs = to_pairs(doc)
# clean sentences
clean_pairs = clean_pairs(pairs)
# save clean pairs to file
save_clean_data(clean_pairs, 'english-french.pkl')
# spot check
for i in range(100):
print('[%s] => [%s]' % (clean_pairs[i,0], clean_pairs[i,1]))

In [None]:
from pickle import load
from pickle import dump
from numpy.random import rand
from numpy.random import shuffle
 
# load a clean dataset
def load_clean_sentences(filename):
return load(open(filename, 'rb'))
 
# save a list of clean sentences to file
def save_clean_data(sentences, filename):
dump(sentences, open(filename, 'wb'))
print('Saved: %s' % filename)
 
# load dataset
raw_dataset = load_clean_sentences('english-french.pkl')
 
# reduce dataset size
n_sentences = 10000
dataset = raw_dataset[:n_sentences, :]
# random shuffle
shuffle(dataset)
# split into train/test
train, test = dataset[:9000], dataset[9000:]
# save
save_clean_data(dataset, 'english-french-both.pkl')
save_clean_data(train, 'english-french-train.pkl')
save_clean_data(test, 'english-french-test.pkl')

In [None]:
# load a clean dataset
def load_clean_sentences(filename):
return load(open(filename, 'rb'))
 
# load datasets
dataset = load_clean_sentences('english-french-both.pkl')
train = load_clean_sentences('english-french-train.pkl')
test = load_clean_sentences('english-french-test.pkl')

In [None]:
# fit a tokenizer
def create_tokenizer(lines):
tokenizer = Tokenizer()
tokenizer.fit_on_texts(lines)
return tokenizer

In [None]:
# max sentence length
def max_length(lines):
return max(len(line.split()) for line in lines)

In [None]:
# prepare english tokenizer
eng_tokenizer = create_tokenizer(dataset[:, 0])
eng_vocab_size = len(eng_tokenizer.word_index) + 1
eng_length = max_length(dataset[:, 0])
print('English Vocabulary Size: %d' % eng_vocab_size)
print('English Max Length: %d' % (eng_length))
# prepare french tokenizer
fre_tokenizer = create_tokenizer(dataset[:, 1])
fre_vocab_size = len(ger_tokenizer.word_index) + 1
fre_length = max_length(dataset[:, 1])
print('French Vocabulary Size: %d' % fer_vocab_size)
print('French Max Length: %d' % (fer_length))

In [None]:
# encode and pad sequences
def encode_sequences(tokenizer, length, lines):
# integer encode sequences
X = tokenizer.texts_to_sequences(lines)
# pad sequences with 0 values
X = pad_sequences(X, maxlen=length, padding='post')
return X

In [None]:
# one hot encode target sequence
def encode_output(sequences, vocab_size):
ylist = list()
for sequence in sequences:
encoded = to_categorical(sequence, num_classes=vocab_size)
ylist.append(encoded)
y = array(ylist)
y = y.reshape(sequences.shape[0], sequences.shape[1], vocab_size)
return y

In [None]:
# prepare training data
trainX = encode_sequences(fre_tokenizer, fre_length, train[:, 1])
trainY = encode_sequences(eng_tokenizer, eng_length, train[:, 0])
trainY = encode_output(trainY, eng_vocab_size)
# prepare validation data
testX = encode_sequences(fre_tokenizer, fre_length, test[:, 1])
testY = encode_sequences(eng_tokenizer, eng_length, test[:, 0])
testY = encode_output(testY, eng_vocab_size)

In [None]:
# define NMT (Neural Machine Translation) model
def define_model(src_vocab, tar_vocab, src_timesteps, tar_timesteps, n_units):
model = Sequential()
model.add(Embedding(src_vocab, n_units, input_length=src_timesteps, mask_zero=True))
model.add(LSTM(n_units))
model.add(RepeatVector(tar_timesteps))
model.add(LSTM(n_units, return_sequences=True))
model.add(TimeDistributed(Dense(tar_vocab, activation='softmax')))
return model
 
# define model
model = define_model(ger_vocab_size, eng_vocab_size, ger_length, eng_length, 256)
model.compile(optimizer='adam', loss='categorical_crossentropy')
# summarize defined model
print(model.summary())
plot_model(model, to_file='model.png', show_shapes=True)

In [None]:
#fit model
filename = 'model.h5'
checkpoint = ModelCheckpoint(filename, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
model.fit(trainX, trainY, epochs=30, batch_size=64, validation_data=(testX, testY), callbacks=[checkpoint], verbose=2)

In [None]:
END OF ASSIGNMENT