In [12]:
import sys
import time
import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow import keras
np.set_printoptions(threshold=sys.maxsize)

In [13]:
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embed_size, hidden_units):
        super(Encoder, self).__init__()
        self.hidden_units = hidden_units
        self.embedding = keras.layers.Embedding(vocab_size, embed_size)
        self.lstm = keras.layers.LSTM(hidden_units, return_sequences=True, return_state=True)

    def call(self, input, init_state):
        embedding = self.embedding(input)
        output, state_h, state_c = self.lstm(embedding, initial_state=init_state)
        return output, state_h, state_c
    
    def init_state(self, batch_size):
        return (tf.zeros([batch_size, self.hidden_units]), tf.zeros([batch_size, self.hidden_units]))
    
    def predict(self, str):
        seq = []
        global word2idx_input
        for c in list(str):
            seq.append(word2idx_input[c])
        input = [seq]
        init_state = self.init_state(1)
        output = self.call(tf.constant(input), init_state)
        return output[1:]
    
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embed_size, hidden_units):
        super(Decoder, self).__init__()
        self.hidden_units = hidden_units
        self.embedding = keras.layers.Embedding(vocab_size, embed_size)
        self.lstm = keras.layers.LSTM(hidden_units, return_sequences=True, return_state=True)
        self.dense = keras.layers.Dense(vocab_size)

    def call(self, input, init_state):
        embedding = self.embedding(input)
        output, state_h, state_c = self.lstm(embedding, init_state)
        result =  self.dense(output)
        return result, state_h, state_c
    
    def predict(self, init_state):
        global word2idx_output, idx2word_output
        input = tf.constant([[word2idx_output['<']]])
        seq = []
        global max_len_input
        for _ in range(max_len_input):
            output = self.call(input, init_state)
            input = tf.argmax(output[0], axis=-1)
            seq.append(idx2word_output[input.numpy()[0][0]])
        return ''.join(seq)

def get_loss(y_true, y_pred):
    crossentropy = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    loss = crossentropy(y_true, y_pred)
    return loss

def get_accuracy(y_true, y_pred):
    labels = tf.cast(tf.argmax(y_pred, axis=-1), dtype='int32')
    correct = tf.cast(tf.equal(y_true, labels), dtype=tf.float32)
    accuracy = tf.reduce_mean(correct)
    return accuracy

In [14]:
# Load train data
train_data = pd.read_csv('train_data.csv')
X = train_data['Sentence'].to_numpy()
Y = train_data['Transformed sentence'].apply(lambda x: x+'>').to_numpy()
Z = train_data['Transformed sentence'].apply(lambda x: '<'+x).to_numpy()

# Data preprocessing - convert strings to ids and back
x_tokenizer = keras.preprocessing.text.Tokenizer(char_level=True)
y_tokenizer = keras.preprocessing.text.Tokenizer(char_level=True)
x_tokenizer.fit_on_texts(X)
y_tokenizer.fit_on_texts(Y)
y_tokenizer.fit_on_texts(Z)
vocab_size = len(y_tokenizer.word_index) + 1

word2idx_input = x_tokenizer.word_index
word2idx_output = y_tokenizer.word_index
idx2word_input = {x:y for y,x in word2idx_input.items()}
idx2word_output = {x:y for y,x in word2idx_output.items()}

num_input = len(word2idx_input) + 1
num_output = len(word2idx_output) + 1

x_train = x_tokenizer.texts_to_sequences(X)
y_train = y_tokenizer.texts_to_sequences(Y)
z_train = y_tokenizer.texts_to_sequences(Z)
max_len_input = max(len(i) for i in x_train)
max_len_output = max(len(i) for i in y_train)

x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_len_input, padding='post')
y_train = keras.preprocessing.sequence.pad_sequences(y_train, maxlen=max_len_output, padding='post')
z_train = keras.preprocessing.sequence.pad_sequences(z_train, maxlen=max_len_output, padding='post')

In [15]:
n_epochs = 100
batch_size = 32
embed_size = 4096
hidden_units = 128

encoder = Encoder(vocab_size, embed_size, hidden_units)
decoder = Decoder(vocab_size, embed_size, hidden_units)

dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train, z_train))
dataset = dataset.shuffle(len(train_data))
dataset = dataset.batch(batch_size, drop_remainder=True)

optimizer = keras.optimizers.Adam(clipnorm=5.0)

for i in range(n_epochs):
    total_loss, total_accuracy, num_batches = 0, 0, 0
    start = time.time()
    en_init = encoder.init_state(batch_size)
    for batch, (x_train, y_train, z_train) in enumerate(dataset.take(-1)):
        with tf.GradientTape() as tape:
            en_out = encoder(x_train, en_init)
            de_init = en_out[1:]
            de_out = decoder(z_train, de_init)
            y_pred = de_out[0]
            loss = get_loss(y_train, y_pred)
            accuracy = get_accuracy(y_train, y_pred)
        variables = encoder.trainable_variables + decoder.trainable_variables
        gradients = tape.gradient(loss, variables)
        optimizer.apply_gradients(zip(gradients, variables))
        total_loss += loss
        total_accuracy += accuracy
        num_batches += 1
    end = time.time()
    loss = total_loss / num_batches
    accuracy = total_accuracy / num_batches
    print('Epoch: {}/{} Loss: {:.6f} Accuracy: {:.6f} Time: {:.4f}'.format(i+1, n_epochs, loss, accuracy, end-start))

Epoch: 1/100 Loss: 2.927985 Accuracy: 0.151679 Time: 52.1373
Epoch: 2/100 Loss: 2.741571 Accuracy: 0.185955 Time: 51.0065
Epoch: 3/100 Loss: 2.534233 Accuracy: 0.231237 Time: 49.7697
Epoch: 4/100 Loss: 2.436774 Accuracy: 0.258235 Time: 50.2250
Epoch: 5/100 Loss: 2.376010 Accuracy: 0.271359 Time: 50.0643
Epoch: 6/100 Loss: 2.333196 Accuracy: 0.280836 Time: 50.1846
Epoch: 7/100 Loss: 2.298186 Accuracy: 0.289644 Time: 81.9057
Epoch: 8/100 Loss: 2.260527 Accuracy: 0.300395 Time: 63.5493
Epoch: 9/100 Loss: 2.219035 Accuracy: 0.314236 Time: 62.4739
Epoch: 10/100 Loss: 2.167199 Accuracy: 0.327902 Time: 63.2900
Epoch: 11/100 Loss: 2.107433 Accuracy: 0.340501 Time: 62.3372
Epoch: 12/100 Loss: 2.038729 Accuracy: 0.356189 Time: 65.2866
Epoch: 13/100 Loss: 1.965909 Accuracy: 0.370715 Time: 59.9875
Epoch: 14/100 Loss: 1.887421 Accuracy: 0.387551 Time: 51.7992
Epoch: 15/100 Loss: 1.805542 Accuracy: 0.401870 Time: 53.1487
Epoch: 16/100 Loss: 1.718843 Accuracy: 0.421955 Time: 51.8538
Epoch: 17/100 Los

In [17]:
# Function to check how many characters match in the two strings
def check(pred: str, true: str):
    correct = 0
    for a, b in zip(pred, true):
        if a == b:
            correct += 1

    # Prediction is more than 8 letters, so penalize for every extra letter.
    correct -= max(0, len(pred) - len(true))
    correct = max(0, correct)
    return correct

# Function to score the model's performance
def evaluate(encoder, decoder):

    # Train data
    print("Obtaining results for training data:")
    train_data = pd.read_csv("train_data.csv").to_numpy()
    results = {
        "pred": [],
        "true": [],
        "score": [],
    }
    correct = [0 for _ in range(9)]
    for x, y in train_data:
        pred = decoder.predict(encoder.predict(x))
        score = check(pred, y)
        results["pred"].append(pred)
        results["true"].append(y)
        results["score"].append(score)

        correct[score] += 1
    print("Train dataset results:")
    for num_chr in range(9):
        print(
            f"Number of predictions with {num_chr} correct predictions: {correct[num_chr]}"
        )
    points = sum(correct[4:6]) * 0.5 + sum(correct[6:])
    print(f"Points: {points}")
    # Save predicitons and true sentences to inspect manually if required.
    pd.DataFrame.from_dict(results).to_csv("results_train.csv", index=False)

    #----------------------------------------------------------------------------------

    print("Obtaining metrics for eval data:")
    eval_data = pd.read_csv("eval_data.csv").to_numpy()
    results = {
        "pred": [],
        "true": [],
        "score": [],
    }
    correct = [0 for _ in range(9)]
    for x, y in eval_data:
        pred = decoder.predict(encoder.predict(x))
        score = check(pred, y)
        results["pred"].append(pred)
        results["true"].append(y)
        results["score"].append(score)

        correct[score] += 1
    print("Eval dataset results:")
    for num_chr in range(9):
        print(
            f"Number of predictions with {num_chr} correct predictions: {correct[num_chr]}"
        )
    points = sum(correct[4:6]) * 0.5 + sum(correct[6:])
    marks = round(min(2, points / 1400 * 2) * 2) / 2  # Rounds to the nearest 0.5
    print(f"Points: {points}")
    print(f"Marks: {marks}")
    # Save predicitons and true sentences to inspect manually if required.
    pd.DataFrame.from_dict(results).to_csv("results_eval.csv", index=False)

In [18]:
evaluate(encoder, decoder)

Obtaining results for training data:
Train dataset results:
Number of predictions with 0 correct predictions: 137
Number of predictions with 1 correct predictions: 2946
Number of predictions with 2 correct predictions: 2978
Number of predictions with 3 correct predictions: 811
Number of predictions with 4 correct predictions: 117
Number of predictions with 5 correct predictions: 10
Number of predictions with 6 correct predictions: 1
Number of predictions with 7 correct predictions: 0
Number of predictions with 8 correct predictions: 0
Points: 64.5
Obtaining metrics for eval data:
Eval dataset results:
Number of predictions with 0 correct predictions: 515
Number of predictions with 1 correct predictions: 874
Number of predictions with 2 correct predictions: 489
Number of predictions with 3 correct predictions: 105
Number of predictions with 4 correct predictions: 16
Number of predictions with 5 correct predictions: 1
Number of predictions with 6 correct predictions: 0
Number of predicti

In [None]:
encoder.save_weights('encoder.h5')
decoder.save_weights('decoder.h5')