In [1]:
# The dataset we will be using is https://raw.githubusercontent.com/jbrownlee/Datasets/master/deu.txt
# The first step is to prepare the data: we are going to clean and split the text data.

In [2]:
#imports for text data handling

import re
import string
from unicodedata import normalize
from pickle import dump
from numpy import array




In [3]:
#load document into memory:
def load_doc(filename):
    """Read an entire UTF-8 text file into memory.

    Args:
        filename: Path of the text file to read.

    Returns:
        The complete contents of the file as a single string.
    """
    # The context manager closes the handle even when read() raises,
    # which a trailing explicit close() call cannot guarantee.
    with open(filename, mode="rt", encoding="utf-8") as file:
        return file.read()

#Each line contains a single pair of phrases, first English and then German, separated by a tab character.

#Split a loaded document into sentences

def to_pairs(doc):
    """Split a loaded document into per-line lists of tab-separated phrases.

    Args:
        doc: Full document text, one entry per line, with the phrases of
            an entry separated by tab characters.

    Returns:
        A list with one element per line; each element is the list of
        fields obtained by splitting that line on tabs.
    """
    # strip() drops leading/trailing whitespace so a final newline does
    # not produce an empty trailing entry
    lines = doc.strip().split("\n")
    # split each line exactly once
    return [line.split("\t") for line in lines]

# After stripping and splitting, we have to clean the text: normalize all Unicode characters to ASCII,
# convert all letters to lower case, remove punctuation and any non-printable characters, etc.

#clean a list of lines

def clean_pairs(lines):
    """Normalize every phrase of every pair to lower-case ASCII words.

    Each phrase is Unicode-decomposed (NFD) and encoded to ASCII with
    unencodable characters ignored, split on whitespace, lower-cased,
    stripped of punctuation and non-printable characters, and finally
    reduced to its purely alphabetic tokens.

    Args:
        lines: Iterable of pairs, each an iterable of phrase strings.

    Returns:
        A numpy array built from the list of cleaned pairs; every cleaned
        phrase is its surviving tokens joined by single spaces.
    """
    # matches any character that is not listed in string.printable
    re_print = re.compile('[^%s]' % re.escape(string.printable))
    # translation table that deletes every punctuation character
    table = str.maketrans('', '', string.punctuation)

    cleaned = list()
    for pair in lines:
        clean_pair = list()
        for line in pair:
            # Decompose accented letters, then keep only the ASCII part.
            # Characters without an ASCII decomposition (for example the
            # German sharp s) are silently dropped by 'ignore'.
            line = normalize('NFD', line).encode('ascii', 'ignore')
            line = line.decode('UTF-8')
            # tokenize on white space
            tokens = line.split()
            # convert to lower case
            tokens = [word.lower() for word in tokens]
            # remove punctuation from each token
            tokens = [word.translate(table) for word in tokens]
            # Delete non-printable characters. Substituting a space here
            # would make the token fail isalpha() below and discard the
            # whole word instead of just the offending character.
            tokens = [re_print.sub('', word) for word in tokens]
            # drop tokens that are empty or contain digits/other symbols
            tokens = [word for word in tokens if word.isalpha()]
            # store as string
            clean_pair.append(' '.join(tokens))
        cleaned.append(clean_pair)

    return array(cleaned)
        
# save sentences to new file

def save_clean_data(sentences, filename):
    """Pickle ``sentences`` to ``filename`` and print a confirmation.

    Args:
        sentences: Object to serialize (the cleaned sentence pairs).
        filename: Destination path; the file is opened in binary write
            mode, so existing content is overwritten.
    """
    # The context manager flushes and closes the file deterministically
    # instead of leaving the handle to the garbage collector.
    with open(filename, 'wb') as file:
        dump(sentences, file)
    print('Saved %s' % filename)



In [8]:
#putting the functions in action

#load dataset

filename = "deu-eng/deu.txt"

doc = load_doc(filename)

# split english-german pairs (we can change this based on the dataset provided)
pairs = to_pairs(doc)

# clean sentences; the result is stored under its own name so that the
# clean_pairs function is not overwritten and this cell can be re-run
cleaned_pairs = clean_pairs(pairs)

# save clean pairs to file
save_clean_data(cleaned_pairs, 'english-german.pkl')

# spot check the first 100 pairs (fewer if the dataset is smaller)
for pair in cleaned_pairs[:100]:
    print('[%s] => [%s]' % (pair[0], pair[1]))


Saved: english-german.pkl
[go] => [geh]
[hi] => [hallo]
[hi] => [gru gott]
[run] => [lauf]
[run] => [lauf]
[wow] => [potzdonner]
[wow] => [donnerwetter]
[duck] => [kopf runter]
[fire] => [feuer]
[help] => [hilfe]
[help] => [zu hulf]
[stay] => [bleib]
[stop] => [stopp]
[stop] => [anhalten]
[wait] => [warte]
[wait] => [warte]
[begin] => [fang an]
[do it] => [mache es]
[do it] => [tue es]
[go on] => [mach weiter]
[hello] => [hallo]
[hello] => [sers]
[hello] => [hallo]
[hurry] => [beeil dich]
[hurry] => [schnell]
[i hid] => [ich versteckte mich]
[i hid] => [ich habe mich versteckt]
[i ran] => [ich rannte]
[i see] => [ich verstehe]
[i see] => [aha]
[i try] => [ich versuche es]
[i try] => [ich probiere es]
[i won] => [ich hab gewonnen]
[i won] => [ich habe gewonnen]
[i won] => [ich habe gewonnen]
[oh no] => [oh nein]
[relax] => [entspann dich]
[shoot] => [feuer]
[shoot] => [schie]
[smile] => [lacheln]
[sorry] => [entschuldigung]
[ask me] => [frag mich]
[ask me] => [fragt mich]
[ask me] => [f

In [9]:
# split the data into test train

In [10]:
from pickle import load
from pickle import dump
from numpy.random import rand
from numpy.random import shuffle

In [11]:
# load a clean dataset
def load_clean_sentences(filename):
    """Load a pickled object (the cleaned sentence pairs) from ``filename``.

    Args:
        filename: Path of the pickle file to read.

    Returns:
        Whatever object was pickled into the file.
    """
    # NOTE: pickle can execute arbitrary code while loading; only open
    # files produced by this notebook or another trusted source.
    # The context manager closes the handle once loading finishes.
    with open(filename, 'rb') as file:
        return load(file)
 
# save a list of clean sentences to file
def save_clean_data(sentences, filename):
    """Pickle ``sentences`` to ``filename`` and print a confirmation.

    Args:
        sentences: Object to serialize (a list or array of clean sentences).
        filename: Destination path; the file is opened in binary write
            mode, so existing content is overwritten.
    """
    # Close the file deterministically rather than relying on garbage
    # collection of the anonymous handle.
    with open(filename, 'wb') as file:
        dump(sentences, file)
    print('Saved: %s' % filename)
 
# load dataset
raw_dataset = load_clean_sentences('english-german.pkl')

# reduce dataset size
n_sentences = 10000
dataset = raw_dataset[:n_sentences, :]

# random shuffle (reorders the rows of `dataset` in place)
shuffle(dataset)

# split into train/test: 90% / 10% of the rows actually kept, so the
# boundary follows n_sentences (9000 / 1000 for the 10000 rows used here)
# instead of being a separate hard-coded index
n_train = len(dataset) * 9 // 10
train, test = dataset[:n_train], dataset[n_train:]

# save
save_clean_data(dataset, 'english-german-both.pkl')
save_clean_data(train, 'english-german-train.pkl')
save_clean_data(test, 'english-german-test.pkl')

Saved: english-german-both.pkl
Saved: english-german-train.pkl
Saved: english-german-test.pkl


In [12]:
# define Neural Translation Model and train it

In [13]:
from pickle import load
from numpy import array
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
#from keras.utils.vis_utils import plot_model
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Embedding
from keras.layers import RepeatVector
from keras.layers import TimeDistributed
from keras.callbacks import ModelCheckpoint

In [14]:
# before we define our model we have to get some things out of the way to actually train anything on the model

#load a clean dataset

def load_clean_sentences(filename):
    """Load a pickled object (the cleaned sentence pairs) from ``filename``.

    Args:
        filename: Path of the pickle file to read.

    Returns:
        Whatever object was pickled into the file.
    """
    # NOTE: pickle can execute arbitrary code while loading; only open
    # files produced by this notebook or another trusted source.
    # The context manager closes the handle once loading finishes.
    with open(filename, 'rb') as file:
        return load(file)

#load datasets

# full set, training split and test split, loaded in that order
dataset, train, test = map(load_clean_sentences, (
    "english-german-both.pkl",
    "english-german-train.pkl",
    "english-german-test.pkl",
))

# fit a tokenizer

def create_tokenizer(lines):
    """Create a Keras ``Tokenizer`` and fit it on ``lines``.

    Args:
        lines: Collection of text phrases to fit the tokenizer on.

    Returns:
        The fitted ``Tokenizer`` instance.
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(lines)
    return fitted

# max sentence length

def max_length(lines):
    """Return the largest whitespace-separated word count among ``lines``.

    Args:
        lines: Iterable of phrase strings.

    Returns:
        The number of words in the longest phrase.
    """
    word_counts = [len(phrase.split()) for phrase in lines]
    return max(word_counts)

# prepare english tokenizer

eng_tokenizer = create_tokenizer(dataset[:, 0])
eng_length = max_length(dataset[:, 0])
# one more than the number of known words
eng_vocab_size = 1 + len(eng_tokenizer.word_index)

print('English Vocabluary Size: %d' % eng_vocab_size)
print('English Max Length : %d'% eng_length)

# prepare german tokenizer (second column of the dataset)
german_tokenizer = create_tokenizer(dataset[:, 1])
german_length = max_length(dataset[:, 1])
german_vocab_size = 1 + len(german_tokenizer.word_index)

print('German Vocabluary Size: %d' % german_vocab_size)
print('German Max Length : %d'% german_length)

# encode and pad sequences

def encode_sequences(tokenizer, length, lines):
    """Integer-encode ``lines`` and pad every sequence to ``length``.

    Args:
        tokenizer: Fitted tokenizer providing ``texts_to_sequences``.
        length: Target length passed to ``pad_sequences`` as ``maxlen``.
        lines: Text phrases to encode.

    Returns:
        The result of ``pad_sequences`` with padding applied at the end
        of each sequence (``padding='post'``).
    """
    return pad_sequences(
        tokenizer.texts_to_sequences(lines),
        maxlen=length,
        padding='post',
    )


# one hot encode the target sequence

def encode_output(sequences, vocab_size):
    """One-hot encode every integer sequence in ``sequences``.

    Args:
        sequences: 2-D array of integer-encoded sequences (its first two
            shape entries are used for the final reshape).
        vocab_size: Number of classes for the one-hot encoding.

    Returns:
        An array reshaped to
        ``(sequences.shape[0], sequences.shape[1], vocab_size)``.
    """
    one_hot = array([
        to_categorical(row, num_classes=vocab_size) for row in sequences
    ])
    n_samples, n_steps = sequences.shape[0], sequences.shape[1]
    return one_hot.reshape(n_samples, n_steps, vocab_size)


# prepare training data

# inputs are the padded German sequences; targets are the padded English
# sequences, one-hot encoded over the English vocabulary
trainX = encode_sequences(german_tokenizer, german_length, train[:, 1])
trainY = encode_output(
    encode_sequences(eng_tokenizer, eng_length, train[:, 0]),
    eng_vocab_size,
)

# prepare validation data the same way from the test split
testX = encode_sequences(german_tokenizer, german_length, test[:, 1])
testY = encode_output(
    encode_sequences(eng_tokenizer, eng_length, test[:, 0]),
    eng_vocab_size,
)

# IT IS NOW TIME TO DEFINE OUR NMT (neural translation model)


def define_model(src_vocab, tar_vocab, src_timesteps, tar_timesteps, n_units):
    """Assemble the (uncompiled) sequential translation model.

    The stack is: an embedding of the source tokens (zero treated as a
    mask value), an LSTM, a ``RepeatVector`` that repeats its output
    ``tar_timesteps`` times, a second LSTM that returns the full
    sequence, and a time-distributed softmax over the target vocabulary.

    Args:
        src_vocab: Size of the source vocabulary (embedding input size).
        tar_vocab: Size of the target vocabulary (softmax width).
        src_timesteps: Length of the padded source sequences.
        tar_timesteps: Length of the padded target sequences.
        n_units: Embedding width and number of units in both LSTMs.

    Returns:
        The ``Sequential`` model with all layers added.
    """
    layers = [
        Embedding(src_vocab, n_units, input_length=src_timesteps, mask_zero=True),
        LSTM(n_units),
        RepeatVector(tar_timesteps),
        LSTM(n_units, return_sequences=True),
        TimeDistributed(Dense(tar_vocab, activation='softmax')),
    ]
    model = Sequential()
    for layer in layers:
        model.add(layer)
    return model

#define model

model = define_model(german_vocab_size, eng_vocab_size, german_length, eng_length, 256)
model.compile(optimizer="adam", loss="categorical_crossentropy")

# summarize defined model; summary() prints the table itself and returns
# None, so wrapping it in print() would add a stray "None" line
model.summary()

# plot_model should normally work, but my Keras installation does not seem to provide it. You probably do not need to keep it commented out; if you enable it, also uncomment the matching import above.
#plot_model(model , to_file = 'model.png' , show_shapes = True)



English Vocabluary Size: 2172
English Max Length : 5
German Vocabluary Size: 3554
German Max Length : 8
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (None, 8, 256)            909824    
                                                                 
 lstm (LSTM)                 (None, 256)               525312    
                                                                 
 repeat_vector (RepeatVecto  (None, 5, 256)            0         
 r)                                                              
                                                                 
 lstm_1 (LSTM)               (None, 5, 256)            525312    
                                                                 
 time_distributed (TimeDist  (None, 5, 2172)           558204    
 ributed)                                                        
                  

In [15]:
# time to train the model

# location to store the trained params
filename = 'model.h5'

# checkpoint callback: monitors val_loss and, with save_best_only, writes
# the model to `filename` only when that loss reaches a new minimum
checkpoint = ModelCheckpoint(
    filename,
    monitor='val_loss',
    mode="min",
    save_best_only=True,
    verbose=1,
)

# train, validating on the test split after every epoch
model.fit(
    trainX,
    trainY,
    validation_data=(testX, testY),
    epochs=30,
    batch_size=64,
    callbacks=[checkpoint],
    verbose=2,
)

Epoch 1/30

Epoch 1: val_loss improved from inf to 3.38093, saving model to model.h5
141/141 - 9s - loss: 3.8911 - val_loss: 3.3809 - 9s/epoch - 61ms/step
Epoch 2/30


  saving_api.save_model(



Epoch 2: val_loss improved from 3.38093 to 3.26529, saving model to model.h5
141/141 - 4s - loss: 3.2418 - val_loss: 3.2653 - 4s/epoch - 30ms/step
Epoch 3/30

Epoch 3: val_loss improved from 3.26529 to 3.14512, saving model to model.h5
141/141 - 4s - loss: 3.0907 - val_loss: 3.1451 - 4s/epoch - 29ms/step
Epoch 4/30

Epoch 4: val_loss improved from 3.14512 to 3.06920, saving model to model.h5
141/141 - 4s - loss: 2.9566 - val_loss: 3.0692 - 4s/epoch - 29ms/step
Epoch 5/30

Epoch 5: val_loss improved from 3.06920 to 2.95058, saving model to model.h5
141/141 - 4s - loss: 2.8186 - val_loss: 2.9506 - 4s/epoch - 29ms/step
Epoch 6/30

Epoch 6: val_loss improved from 2.95058 to 2.85449, saving model to model.h5
141/141 - 4s - loss: 2.6844 - val_loss: 2.8545 - 4s/epoch - 29ms/step
Epoch 7/30

Epoch 7: val_loss improved from 2.85449 to 2.77914, saving model to model.h5
141/141 - 4s - loss: 2.5566 - val_loss: 2.7791 - 4s/epoch - 29ms/step
Epoch 8/30

Epoch 8: val_loss improved from 2.77914 to 2.

<keras.src.callbacks.History at 0x36b9a7fa0>

In [16]:
# lets use the model to predict translations now

In [17]:
from pickle import load
from numpy import array
from numpy import argmax
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model

In [19]:
from pickle import load
from numpy import array
from numpy import argmax
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model
from nltk.translate.bleu_score import corpus_bleu

# load a clean dataset
def load_clean_sentences(filename):
	"""Load a pickled object (the cleaned sentence pairs) from ``filename``.

	Args:
		filename: Path of the pickle file to read.

	Returns:
		Whatever object was pickled into the file.
	"""
	# NOTE: pickle can execute arbitrary code while loading; only open
	# files produced by this notebook or another trusted source.
	# The context manager closes the handle once loading finishes.
	with open(filename, 'rb') as file:
		return load(file)

# fit a tokenizer
def create_tokenizer(lines):
	"""Create a Keras ``Tokenizer`` and fit it on ``lines``.

	Args:
		lines: Collection of text phrases to fit the tokenizer on.

	Returns:
		The fitted ``Tokenizer`` instance.
	"""
	fitted = Tokenizer()
	fitted.fit_on_texts(lines)
	return fitted

# max sentence length
def max_length(lines):
	"""Return the largest whitespace-separated word count among ``lines``.

	Args:
		lines: Iterable of phrase strings.

	Returns:
		The number of words in the longest phrase.
	"""
	return max(map(lambda phrase: len(phrase.split()), lines))

# encode and pad sequences
def encode_sequences(tokenizer, length, lines):
	"""Integer-encode ``lines`` and pad every sequence to ``length``.

	Args:
		tokenizer: Fitted tokenizer providing ``texts_to_sequences``.
		length: Target length passed to ``pad_sequences`` as ``maxlen``.
		lines: Text phrases to encode.

	Returns:
		The result of ``pad_sequences`` with padding applied at the end
		of each sequence (``padding='post'``).
	"""
	return pad_sequences(
		tokenizer.texts_to_sequences(lines),
		maxlen=length,
		padding='post',
	)

# map an integer to a word
def word_for_id(integer, tokenizer):
	"""Map an integer token id back to its word.

	Args:
		integer: Token id to look up.
		tokenizer: Object exposing a ``word_index`` mapping of word -> id
			and, optionally, an ``index_word`` mapping of id -> word.

	Returns:
		The matching word, or ``None`` when the id is unknown.
	"""
	# Prefer the reverse mapping when the tokenizer provides a populated
	# one: a dictionary lookup avoids scanning the entire vocabulary for
	# every predicted token.
	index_word = getattr(tokenizer, 'index_word', None)
	if index_word:
		return index_word.get(integer)
	# Fallback: linear scan over word_index.
	for word, index in tokenizer.word_index.items():
		if index == integer:
			return word
	return None

# generate target given source sequence
def predict_sequence(model, tokenizer, source):
	"""Decode the model's prediction for ``source`` into a text phrase.

	Args:
		model: Object whose ``predict(source, verbose=0)`` returns a batch
			of per-step score vectors; only the first item is decoded.
		tokenizer: Tokenizer used by ``word_for_id`` to map ids to words.
		source: Encoded source sequence(s) to feed to the model.

	Returns:
		The predicted words joined by single spaces. Decoding stops at
		the first id for which ``word_for_id`` returns ``None``.
	"""
	step_scores = model.predict(source, verbose=0)[0]
	# the highest-scoring id at each output step
	token_ids = [argmax(scores) for scores in step_scores]
	words = []
	for token_id in token_ids:
		word = word_for_id(token_id, tokenizer)
		if word is None:
			break
		words.append(word)
	return ' '.join(words)

# evaluate the skill of the model
def evaluate_model(model, tokenizer, sources, raw_dataset):
    """Translate every encoded source and print corpus BLEU scores.

    Args:
        model: Trained model handed to ``predict_sequence``.
        tokenizer: Target-language tokenizer used to decode predictions.
        sources: Array of encoded source sequences, one per row.
        raw_dataset: Array of cleaned text pairs aligned row-by-row with
            ``sources``; column 0 is read as the reference target and
            column 1 as the source text.

    Prints the first 10 source/target/prediction triples, then BLEU-1 to
    BLEU-4 for the whole set.
    """
    actual, predicted = list(), list()
    for i, source in enumerate(sources):
        # turn the 1-D encoded row into a batch containing one sequence
        source = source.reshape((1, source.shape[0]))
        # decode with the tokenizer that was passed in rather than a
        # module-level global, so the function honours its argument
        translation = predict_sequence(model, tokenizer, source)
        raw_target, raw_src = raw_dataset[i, 0], raw_dataset[i, 1]
        if i < 10:
            print('src=[%s], target=[%s], predicted=[%s]' % (raw_src, raw_target, translation))
        # corpus_bleu takes a list of references per hypothesis
        actual.append([raw_target.split()])
        predicted.append(translation.split())
    # calculate BLEU score
    print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
    print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    # NOTE(review): these weights sum to 0.9, not 1; (1/3, 1/3, 1/3, 0)
    # may have been intended - confirm before comparing scores.
    print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(0.3, 0.3, 0.3, 0)))
    print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

# load datasets
dataset, train, test = map(load_clean_sentences, (
    'english-german-both.pkl',
    'english-german-train.pkl',
    'english-german-test.pkl',
))

# english tokenizer, fitted on the first column of the full dataset
eng_tokenizer = create_tokenizer(dataset[:, 0])
eng_length = max_length(dataset[:, 0])
eng_vocab_size = 1 + len(eng_tokenizer.word_index)

# german tokenizer, fitted on the second column of the full dataset
ger_tokenizer = create_tokenizer(dataset[:, 1])
ger_length = max_length(dataset[:, 1])
ger_vocab_size = 1 + len(ger_tokenizer.word_index)

# encode the German side of both splits as model input
trainX = encode_sequences(ger_tokenizer, ger_length, train[:, 1])
testX = encode_sequences(ger_tokenizer, ger_length, test[:, 1])

# load the model saved during training
model = load_model('model.h5')

# evaluate on the training sequences first, then on the test sequences
print('train')
evaluate_model(model, eng_tokenizer, trainX, train)
print('test')
evaluate_model(model, eng_tokenizer, testX, test)

train
src=[er riecht ubel], target=[he smells bad], predicted=[he smells hard]
src=[geht ohne mich], target=[go without me], predicted=[go without me]
src=[tom ist ausgerutscht], target=[tom slipped], predicted=[tom slipped]
src=[sie kamen vorbei], target=[they called], predicted=[they called]
src=[ich werde viel beten], target=[ill pray hard], predicted=[ill be]
src=[kannst du skaten], target=[can you skate], predicted=[can you skate]
src=[du kannst es schaffen], target=[you can do it], predicted=[you can do it]
src=[wie die zeit vergeht], target=[how time flies], predicted=[how time you]
src=[sie verschwanden], target=[they vanished], predicted=[they vanished]
src=[hort auf zu schieen], target=[stop shooting], predicted=[stop shooting]


KeyboardInterrupt: 