In [37]:
import string
import re
from pickle import dump
import numpy as np
from unicodedata import normalize

In [2]:
def load_doc(file_path: str) -> list[str]:
    try:
        with open(file_path, encoding="utf-8") as f:
            return f.readlines()
    except FileNotFoundError:
        raise FileNotFoundError("File not found. Please check the file path and try again.")
        return []

def to_pairs(doc: list) -> list[list[str]]:
    paired = [line.strip().split("\t") for line in doc]
    return paired

def clean_pairs(lines: list) -> list[list[str]]:
    cleaned = list()
    re_print = re.compile("[^%s]" % re.escape(string.printable))
    table = str.maketrans("", "", string.punctuation)
    for pair in lines:
        clean_pair = list()
        for line in pair:
            line = normalize("NFD", line).encode("ascii", "ignore")
            line = line.decode("UTF-8")
            line = line.split()
            line = [word.lower() for word in line]
            line = [word.translate(table) for word in line]
            line = [re_print.sub("", w) for w in line]
            line = [word for word in line if word.isalpha()]
            clean_pair.append(" ".join(line))
        cleaned.append(clean_pair)
    return np.array(cleaned)

def save_clean_data(sentences: list, filename: str) -> None:
    dump(sentences, open(filename, "wb"))
    print(f"Saved: {filename}")

In [3]:
filename = "dataset/en-de.txt"
doc = load_doc(filename)
pairs = to_pairs(doc)
clean_pairs = clean_pairs(pairs)
save_clean_data(clean_pairs, "dataset/en-de.pkl")

# show the 

Saved: dataset/en-de.pkl


In [4]:
from pickle import load 
from numpy.random import rand
from numpy.random import shuffle

In [5]:
def load_clean_data(filename: str) -> list:
    return load(open(filename, "rb"))

In [6]:
raw_dataset = load_clean_data("dataset/en-de.pkl")

In [7]:
len(raw_dataset)

152820

In [8]:
# reduce dataset size

n_sentences = 10000
dataset = raw_dataset[:n_sentences, :]

# random shuffle
shuffle(dataset)

# split to train/set
train, test = dataset[:9000], dataset[9000:]

save_clean_data(dataset, "dataset/en-de-both.pkl")
save_clean_data(train, "dataset/en-de-train.pkl")
save_clean_data(test, "dataset/en-de-test.pkl")

Saved: dataset/en-de-both.pkl
Saved: dataset/en-de-train.pkl
Saved: dataset/en-de-test.pkl


In [9]:
dataset, train, test = load_clean_data("dataset/en-de-both.pkl"), load_clean_data("dataset/en-de-train.pkl"), load_clean_data("dataset/en-de-test.pkl")

len(dataset), len(train), len(test)

(10000, 9000, 1000)

In [10]:
from tensorflow.keras.preprocessing.text import Tokenizer

def create_tokenizer(lines):
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(lines)
    return tokenizer

def get_maxlen(lines):
    return max(len(line.split()) for line in lines)

In [11]:
eng_tokenizer = create_tokenizer(dataset[:, 0])
eng_vocab_size = len(eng_tokenizer.word_index) + 1
eng_length = get_maxlen(dataset[:, 0])

print(f"English Vocabulary Size: {eng_vocab_size}")
print(f"English Max Length: {eng_length}")

English Vocabulary Size: 2404
English Max Length: 5


In [12]:
# German Tokenizer

ger_tokenizer = create_tokenizer(dataset[:, 1])
ger_vocab_size = len(ger_tokenizer.word_index) + 1
ger_length = get_maxlen(dataset[:, 1])

print(f"German Vocabulary Size: {ger_vocab_size}")
print(f"German Max Length: {ger_length}")

German Vocabulary Size: 3856
German Max Length: 10


In [13]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

def encode_sequences(tokenizer, length, lines):
    X = tokenizer.texts_to_sequences(lines)
    X = pad_sequences(X, maxlen=length, padding="post")
    return X

def encode_output(sequences, vocab_size):
    ylist = list()
    for sequence in sequences:
        encoded = to_categorical(sequence, num_classes=vocab_size)
        ylist.append(encoded)
    y = array(ylist)
    y = y.reshape(sequences.shape[0], sequences.shape[1], vocab_size)
    return y

In [19]:
# prepare training data
x_train = encode_sequences(eng_tokenizer, eng_length, train[:, 0])
y_train = encode_sequences(ger_tokenizer, ger_length, train[:, 1])
y_train = encode_output(y_train, ger_vocab_size)
# prepare validation data
x_test= encode_sequences(eng_tokenizer, eng_length, test[:, 0])
y_test = encode_sequences(ger_tokenizer, ger_length, test[:, 1])
y_test = encode_output(y_test, ger_vocab_size)

In [24]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, RepeatVector, Dense, TimeDistributed

def define_model(src_vocab, tar_vocab, src_timesteps, tar_timesteps, n_units):
    model = Sequential()
    model.add(Embedding(src_vocab, n_units, input_length=src_timesteps, mask_zero=True))
    model.add(LSTM(n_units))
    model.add(RepeatVector(tar_timesteps))
    model.add(LSTM(n_units, return_sequences=True))
    model.add(TimeDistributed(Dense(tar_vocab, activation="softmax")))
    return model

model = define_model(eng_vocab_size, ger_vocab_size, eng_length, ger_length, 256)
model.compile(optimizer="adam", loss="categorical_crossentropy")
print(model.summary())

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (None, 5, 256)            615424    
                                                                 
 lstm_2 (LSTM)               (None, 256)               525312    
                                                                 
 repeat_vector_1 (RepeatVec  (None, 10, 256)           0         
 tor)                                                            
                                                                 
 lstm_3 (LSTM)               (None, 10, 256)           525312    
                                                                 
 time_distributed_1 (TimeDi  (None, 10, 3856)          990992    
 stributed)                                                      
                                                                 
Total params: 2657040 (10.14 MB)
Trainable params: 265

In [27]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./logs")

# Train the model with the callback to log data for TensorBoard
model.fit(x_train, y_train, epochs=10, validation_data=(x_val, y_val), callbacks=[tensorboard_callback])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x132f4324090>

In [28]:
model.save("model.h5")
dump(eng_tokenizer, open("eng_tokenizer.pkl", "wb"))
dump(ger_tokenizer, open("ger_tokenizer.pkl", "wb"))

  saving_api.save_model(


In [31]:
# machine translation, generate text 
from tensorflow.keras.models import load_model
loaded_model = load_model("model.h5")
english_tokenizer = load(open("eng_tokenizer.pkl", "rb"))
german_tokenizer = load(open("ger_tokenizer.pkl", "rb"))

def word_for_id(integer, tokenizer):
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

def predict_sequence(model, tokenizer, source):
    prediction = model.predict(source, verbose=0)[0]
    integers = [np.argmax(vector) for vector in prediction]
    target = list()
    for i in integers:
        word = word_for_id(i, tokenizer)
        if word is None:
            break
        target.append(word)
    return " ".join(target)


In [35]:

def translate(model, tokenizer, source, source_tokenizer):
    source = encode_sequences(source_tokenizer, eng_length, source)
    translation = predict_sequence(model, tokenizer, source)
    return translation

In [41]:
translate(loaded_model, german_tokenizer, "Hi How are you?", english_tokenizer)

''