In [None]:
# Connect to Drive Files
# Mounts Google Drive at /content/gdrive; the corpus file read by later cells
# lives under /content/gdrive/MyDrive/. Requires the google.colab package.
from google.colab import drive
drive.mount('/content/gdrive')

#root_path = '/content/gdrive/MyDrive/CorpusData'  #change dir to your project folder 

(a) Import the required libraries

In [None]:
import string
import re
from pickle import load
from pickle import dump
from unicodedata import normalize
from numpy import array
import requests
from numpy.random import rand
from numpy.random import shuffle
from numpy import argmax

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

%tensorflow_version 2.x
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import RepeatVector
from tensorflow.keras.layers import TimeDistributed
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model

from nltk.translate.bleu_score import corpus_bleu


(b) Read text from the source

In [None]:
# function to read raw text file
def read_text(filename):
        """Return the entire contents of a UTF-8 text file as one string.

        Args:
            filename: path of the file to read.

        Returns:
            The decoded text of the whole file.
        """
        # the context manager closes the handle even if read() raises
        with open(filename, mode='rt', encoding='utf-8') as file:
                return file.read()
# Next, define a function that splits the text into sentence pairs, one pair per line ('\n').
# Each pair is then split on the tab character into its English and Afaan Oromo sentences.
# split a text into sentences

def to_lines(text):
      """Split tab-separated corpus text into a list of field lists.

      Surrounding whitespace is stripped from the text, which is then cut on
      newlines; every resulting line is cut on tab characters.
      """
      return [row.split('\t') for row in text.strip().split('\n')]

# We can now use these functions to read the corpus into a list of tab-split lines.

# NOTE(review): absolute Google Drive path; it resolves only after Drive has
# been mounted at /content/gdrive.
data = read_text('/content/gdrive/MyDrive/eng_oro_25k.txt')
oro_eng = to_lines(data)
#deu_eng = array(deu_eng)

# preview the first 200 characters of the raw text
print(data[0:200])

---
(c) Load the dataset. 

In [None]:
# load doc into memory
def load_doc(filename):
	"""Read a UTF-8 text file and return its full contents as a string.

	Args:
		filename: path of the file to read.

	Returns:
		The decoded text of the whole file.
	"""
	# open read-only; the context manager closes the file even if read() fails
	with open(filename, mode='rt', encoding='utf-8') as file:
		return file.read()
 
# split a loaded document into sentence pairs
def to_pairs(doc):
	"""Turn a loaded document into a list of sentence pairs.

	The document is stripped and cut on newlines; each line is cut on tabs and
	only its first two fields are kept.
	"""
	pairs = []
	for line in doc.strip().split('\n'):
		fields = line.split('\t')
		pairs.append(fields[:2])
	return pairs
 
# clean a list of lines
def clean_pairs(lines):
	"""Normalise every sentence in a list of sentence pairs.

	Each sentence is reduced to ASCII, split on whitespace, lower-cased,
	stripped of punctuation and non-printable characters, and filtered down to
	its purely alphabetic tokens, which are re-joined with single spaces.

	Args:
		lines: iterable of pairs (iterables of strings).

	Returns:
		A numpy array built from the list of cleaned pairs.
	"""
	non_printable = re.compile('[^%s]' % re.escape(string.printable))
	strip_punct = str.maketrans('', '', string.punctuation)

	def scrub(sentence):
		# NFD splits accented letters into base letter + combining mark; the
		# ASCII encode with 'ignore' then discards every non-ASCII code point
		ascii_text = normalize('NFD', sentence).encode('ascii', 'ignore').decode('UTF-8')
		tokens = []
		for token in ascii_text.split():
			token = non_printable.sub('', token.lower().translate(strip_punct))
			# keep alphabetic tokens only (drops empties and anything with digits)
			if token.isalpha():
				tokens.append(token)
		return ' '.join(tokens)

	return array([[scrub(sentence) for sentence in pair] for pair in lines])
 
# save a list of clean sentences to file
def save_clean_data(sentences, filename):
	"""Pickle `sentences` to `filename` and print a confirmation line.

	Args:
		sentences: any picklable object (here, the array of cleaned pairs).
		filename: destination path; it is overwritten if it already exists.
	"""
	# close the handle deterministically so the pickle is flushed to disk
	with open(filename, 'wb') as out_file:
		dump(sentences, out_file)
	print('Saved: %s' % filename)
 
# load dataset
filename = '/content/gdrive/MyDrive/eng_oro_25k.txt'
doc = load_doc(filename)
# split into English-Oromo pairs
pairs = to_pairs(doc)
# clean sentences
# NOTE: the result gets its own name; binding it to `clean_pairs` would
# overwrite the function and make this cell fail when it is run a second time
cleaned_pairs = clean_pairs(pairs)
# save clean pairs to file (written with pickle, despite the .txt extension)
save_clean_data(cleaned_pairs, 'english-oromo.txt')
# spot check (at most 20 pairs, so a short corpus does not raise IndexError)
for i in range(min(20, len(cleaned_pairs))):
	print('[%s] => [%s]' % (cleaned_pairs[i,0], cleaned_pairs[i,1]))

In [None]:
# re-read the raw corpus and split every line on tabs
data = read_text('/content/gdrive/MyDrive/eng_oro_25k.txt')
eng_oro = to_lines(data)
# NOTE(review): the 2-D slice below assumes every line has the same number of
# tab-separated fields, so that array() yields a 2-D array — TODO confirm
eng_oro = array(eng_oro)

# keep at most the first 25000 rows
eng_oro = eng_oro[:25000,:]
eng_oro

In [None]:
# whitespace-token count of every sentence, one list per column of eng_oro
eng_l = [len(sentence.split()) for sentence in eng_oro[:,0]]
oro_l = [len(sentence.split()) for sentence in eng_oro[:,1]]

# table of lengths, then one 30-bin histogram per column
length_df = pd.DataFrame({'English':eng_l, 'Oromo':oro_l})
length_df.hist(bins = 30)
plt.show()


(d) Prepare the data for training.

In [None]:
# load a clean dataset
def load_clean_sentences(filename):
	"""Unpickle and return the object stored in `filename`.

	NOTE: pickle can execute arbitrary code while loading, so only open files
	that were written by this notebook.
	"""
	# the context manager closes the handle once the object has been read
	with open(filename, 'rb') as in_file:
		return load(in_file)
 
# save a list of clean sentences to file
def save_clean_data(sentences, filename):
	"""Pickle `sentences` to `filename` and print a confirmation line.

	Args:
		sentences: any picklable object.
		filename: destination path; it is overwritten if it already exists.
	"""
	# close the handle deterministically so the pickle is flushed to disk
	with open(filename, 'wb') as out_file:
		dump(sentences, out_file)
	print('Saved: %s' % filename)
 
# load dataset
raw_dataset = load_clean_sentences('english-oromo.txt')
 
# reduce dataset size to speed up training in demonstration
n_sentences = 25000
dataset = raw_dataset[:n_sentences, :]

# random shuffle of the rows, in place; a dedicated seeded generator makes the
# train/test split reproducible without touching numpy's global random state
from numpy.random import RandomState
SPLIT_SEED = 42
RandomState(SPLIT_SEED).shuffle(dataset)

# split into train/test: the last tenth of the shuffled rows is held out
ntest=dataset.shape[0]//10
# slice from the front: `dataset[:-ntest]` would be empty when ntest is 0
ntrain = dataset.shape[0] - ntest
train, test = dataset[:ntrain], dataset[ntrain:]
print(train.shape,test.shape)

# save
save_clean_data(dataset, 'english-oromo-both.txt')
save_clean_data(train, 'english-oromo-train.txt')
save_clean_data(test, 'english-oromo-test.txt')

(e) Tokenize the sentences. 

In [None]:
# load a clean dataset
def load_clean_sentences(filename):
	"""Unpickle and return the object stored in `filename`.

	NOTE: pickle can execute arbitrary code while loading, so only open files
	that were written by this notebook.
	"""
	# the context manager closes the handle once the object has been read
	with open(filename, 'rb') as in_file:
		return load(in_file)
 
# fit a tokenizer
def create_tokenizer(lines):
	"""Create a Keras `Tokenizer`, fit it on `lines`, and return it."""
	fitted = Tokenizer()
	fitted.fit_on_texts(lines)
	return fitted
 
# max sentence length
def max_length(lines):
	"""Return the largest whitespace-token count found among `lines`.

	Returns 0 when `lines` is empty instead of raising ValueError.
	"""
	return max((len(line.split()) for line in lines), default=0)
 
# encode and pad sequences
def encode_sequences(tokenizer, length, lines):
	"""Integer-encode `lines` with `tokenizer` and pad the result.

	The sequences produced by `texts_to_sequences` are handed to
	`pad_sequences` with `maxlen=length` and `padding='post'`.
	"""
	return pad_sequences(tokenizer.texts_to_sequences(lines), maxlen=length, padding='post')
 
# one hot encode target sequence
def encode_output(sequences, vocab_size):
	"""One-hot encode every row of a 2-D array of integer sequences.

	Each row goes through `to_categorical` with `num_classes=vocab_size`; the
	stacked result is reshaped to
	(sequences.shape[0], sequences.shape[1], vocab_size).
	"""
	one_hot = array([to_categorical(row, num_classes=vocab_size) for row in sequences])
	n_rows, n_steps = sequences.shape[0], sequences.shape[1]
	return one_hot.reshape(n_rows, n_steps, vocab_size)
 
# load datasets (pickled arrays written by the train/test split cell)
dataset = load_clean_sentences('english-oromo-both.txt')
train = load_clean_sentences('english-oromo-train.txt')
test = load_clean_sentences('english-oromo-test.txt')
 
# prepare english tokenizer (column 0), fitted on the full dataset
eng_tokenizer = create_tokenizer(dataset[:, 0])
# +1 leaves room for one id beyond the word indices — presumably the 0 used for padding
eng_vocab_size = len(eng_tokenizer.word_index) + 1
eng_length = max_length(dataset[:, 0])
print('English Vocabulary Size: %d' % eng_vocab_size)
print('English Max Length: %d' % (eng_length))

# prepare Oromo tokenizer (column 1), fitted on the full dataset
oro_tokenizer = create_tokenizer(dataset[:, 1])
oro_vocab_size = len(oro_tokenizer.word_index) + 1
oro_length = max_length(dataset[:, 1])
print('Oromo Vocabulary Size: %d' % oro_vocab_size)
print('Oromo Max Length: %d' % (oro_length))
 
# prepare training data: Oromo sentences are the input (X), English the target (Y)
trainX = encode_sequences(oro_tokenizer, oro_length, train[:, 1])
trainY = encode_sequences(eng_tokenizer, eng_length, train[:, 0])
# targets are one-hot encoded over the English vocabulary
trainY = encode_output(trainY, eng_vocab_size)

# prepare test data the same way
testX = encode_sequences(oro_tokenizer, oro_length, test[:, 1])
testY = encode_sequences(eng_tokenizer, eng_length, test[:, 0])
testY = encode_output(testY, eng_vocab_size)

---
(f) Build the translation model. This uses an LSTM to return a sentence encoding of the source sentence, then replicates that encoding on the input to an LSTM that generates the target sentence. 

In [None]:
# define NMT model
def define_model(src_vocab, tar_vocab, src_timesteps, tar_timesteps, n_units):
	"""Assemble the (uncompiled) sequence-to-sequence translation network.

	Args:
		src_vocab: input dimension of the source embedding.
		tar_vocab: number of units of the per-timestep softmax output.
		src_timesteps: `input_length` of the embedding layer.
		tar_timesteps: how many times the sentence encoding is repeated.
		n_units: embedding size and width of both LSTM layers.

	Returns:
		A `Sequential` model: Embedding -> LSTM -> RepeatVector -> LSTM ->
		TimeDistributed(Dense softmax).
	"""
	layers = [
		# source word embedding; mask_zero=True is set on the embedding layer
		Embedding(src_vocab, n_units, input_length=src_timesteps, mask_zero=True),
		# LSTM that produces the sentence encoding
		LSTM(n_units),
		# repeat the source encoding once per target timestep
		RepeatVector(tar_timesteps),
		# LSTM that generates the target sentence, one output per timestep
		LSTM(n_units, return_sequences=True),
		# distribution over the target vocabulary at every timestep
		TimeDistributed(Dense(tar_vocab, activation='softmax')),
	]
	return Sequential(layers)

# define model: Oromo is the source side, English the target side, 512 units
model = define_model(oro_vocab_size, eng_vocab_size,oro_length, eng_length, 512)
model.compile(optimizer='adam', loss='categorical_crossentropy',metrics=['accuracy'])

# summarize defined model
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output
model.summary()
plot_model(model, to_file='model.png', show_shapes=True)

(g) Fit model to phrases. Training takes some minutes. 

In [None]:
# Fit model
# lower absl's log verbosity to ERROR before training
import absl.logging
absl.logging.set_verbosity(absl.logging.ERROR)

# set up a checkpoint; with save_best_only=True and mode='min' a save happens
# only on epochs where the monitored val_loss improves
# NOTE(review): `filename` was previously bound to the corpus path and is rebound here.
# NOTE(review): 'model.h4' looks like a typo for 'model.h5' — the evaluation
# cell loads the same name, so both must be changed together. TODO confirm
filename = 'model.h4'
checkpoint = ModelCheckpoint(filename, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
# train model
# NOTE(review): the test split doubles as validation data, so the checkpoint
# is selected on the same data that is later used for evaluation
history = model.fit(trainX, trainY, epochs=40, batch_size=512, validation_data=(testX, testY), callbacks=[checkpoint], verbose=2)

(h) Plot fitting graphs for the training and test results

In [None]:
# training and validation loss per epoch, drawn on one explicit axes object
fig, ax = plt.subplots()
ax.plot(history.history['loss'])
ax.plot(history.history['val_loss'])
ax.set_title('Train-Test loss graph')
ax.legend(['[oro-en] Train','[oro-en] Validation'])
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
plt.show()

In [None]:
# training and validation accuracy per epoch, drawn on one explicit axes object
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'])
ax.plot(history.history['val_accuracy'])
ax.set_title('Train-Test accuracy graph')
ax.legend(['[oro-en] Train','[oro-en] Validation'])
ax.set_xlabel('Epochs')
ax.set_ylabel('Accuracy')
plt.show()

---
(i) Evaluate model on both training and test sentences. It takes some minutes to run and calculate the BLEU scores. 

In [None]:
# load a clean dataset
def load_clean_sentences(filename):
	"""Unpickle and return the object stored in `filename`.

	NOTE: pickle can execute arbitrary code while loading, so only open files
	that were written by this notebook.
	"""
	# the context manager closes the handle once the object has been read
	with open(filename, 'rb') as in_file:
		return load(in_file)

# fit a tokenizer
def create_tokenizer(lines):
	"""Create a Keras `Tokenizer`, fit it on `lines`, and return it."""
	fitted = Tokenizer()
	fitted.fit_on_texts(lines)
	return fitted

# max sentence length
def max_length(lines):
	"""Return the largest whitespace-token count found among `lines`.

	Returns 0 when `lines` is empty instead of raising ValueError.
	"""
	return max((len(line.split()) for line in lines), default=0)

# encode and pad sequences
def encode_sequences(tokenizer, length, lines):
	"""Integer-encode `lines` with `tokenizer` and pad the result.

	The sequences produced by `texts_to_sequences` are handed to
	`pad_sequences` with `maxlen=length` and `padding='post'`.
	"""
	return pad_sequences(tokenizer.texts_to_sequences(lines), maxlen=length, padding='post')

# map an integer to a word
def word_for_id(integer, tokenizer):
	"""Return the word whose index is `integer`, or None when no word has it.

	Args:
		integer: word index to look up (plain or numpy integer).
		tokenizer: object exposing `word_index` (word -> index) and, optionally,
			`index_word` (index -> word), as Keras tokenizers do.

	Returns:
		The matching word, or None.
	"""
	# prefer the reverse map: one dict lookup instead of a scan over the whole
	# vocabulary for every decoded token
	reverse = getattr(tokenizer, 'index_word', None)
	if reverse:
		return reverse.get(integer)
	# fallback for tokenizer-like objects that only carry word_index
	for word, index in tokenizer.word_index.items():
		if index == integer:
			return word
	return None

# generate target given source sequence
def predict_sequence(model, tokenizer, source):
	"""Translate one encoded source sequence into a space-joined string.

	Takes the first prediction returned by `model.predict(source)`, maps the
	argmax of each timestep to a word via `word_for_id`, and stops at the first
	timestep whose id has no word.
	"""
	step_scores = model.predict(source, verbose=0)[0]
	words = []
	for scores in step_scores:
		token = word_for_id(argmax(scores), tokenizer)
		if token is None:
			# an id without a word ends the sentence
			break
		words.append(token)
	return ' '.join(words)

# evaluate the performance of the model
def evaluate_model(model, tokenizer, sources, raw_dataset):
  """Translate every row of `sources` and print corpus-level BLEU-1..4.

  Args:
    model: trained model passed on to `predict_sequence`.
    tokenizer: target-language tokenizer used to decode predicted ids.
    sources: 2-D array of encoded source sequences, one row per sentence.
    raw_dataset: rows aligned with `sources`; each row is unpacked as
      (target text, source text).

  The first 10 source/target/prediction triples are printed for inspection.
  """
  actual, predicted = list(), list()
  for i, source in enumerate(sources):
    # the model is fed one sequence at a time: add a leading batch axis of 1
    source = source.reshape((1, source.shape[0]))
    # decode with the tokenizer passed by the caller, not the notebook-level
    # global `eng_tokenizer`
    translation = predict_sequence(model, tokenizer, source)
    raw_target, raw_src = raw_dataset[i]
    if i < 10:
      print('src=[%s], target=[%s], predicted=[%s]' % (raw_src, raw_target, translation))
    # corpus_bleu takes a list of reference lists per hypothesis
    actual.append([raw_target.split()])
    predicted.append(translation.split())

  # calculate BLEU score
  print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
  print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
  # uniform weights must sum to 1; (0.3, 0.3, 0.3) summed to 0.9 and skewed the score
  print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(1/3, 1/3, 1/3, 0)))
  print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

# load datasets (pickled arrays written by the train/test split cell)
dataset = load_clean_sentences('english-oromo-both.txt')
train = load_clean_sentences('english-oromo-train.txt')
test = load_clean_sentences('english-oromo-test.txt')

# prepare english tokenizer (column 0 of the dataset)
eng_tokenizer = create_tokenizer(dataset[:, 0])
eng_vocab_size = len(eng_tokenizer.word_index) + 1
eng_length = max_length(dataset[:, 0])

# prepare Oromo tokenizer (column 1 of the dataset)
# NOTE(review): the `ger_*` names are leftovers; the training cell calls the
# same objects `oro_*`
ger_tokenizer = create_tokenizer(dataset[:, 1])
ger_vocab_size = len(ger_tokenizer.word_index) + 1
ger_length = max_length(dataset[:, 1])

# prepare data: encode the source (column 1) sentences of both splits
trainX = encode_sequences(ger_tokenizer, ger_length, train[:, 1])
testX = encode_sequences(ger_tokenizer, ger_length, test[:, 1])

# load model (same name the training cell passes to ModelCheckpoint)
model = load_model('model.h4')

# test on some training sequences
# NOTE(review): evaluate_model loops over every row of trainX, not a sample
print('********** English-Afaan Oromo train result **********')
evaluate_model(model, eng_tokenizer, trainX, train)

# test on some test sequences
print('********** English-Afaan Oromo test result **********')
evaluate_model(model, eng_tokenizer, testX, test)

---
(j) Experiment with different amounts of training data and different network configurations. 