In [40]:
from gensim.models import Word2Vec
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
import numpy as np

# Parameters
batch_size = 64
epochs = 100
latent_dim = 256
num_samples = 10000
data_path = 'fra.txt'
embedding_dim = 300  # Should match with the Word2Vec embedding dimension

In [41]:
# Train your own Word2Vec model
sentences = [line.split('\t')[0].split() for line in open(data_path, 'r', encoding='utf-8').read().split('\n') if '\t' in line]

word2vec_model = Word2Vec(sentences, vector_size=embedding_dim, window=5, min_count=1, workers=4)
word2vec_model.save('custom_word2vec.model')

# Load your custom Word2Vec model
custom_word2vec_model = Word2Vec.load('custom_word2vec.model')

In [42]:
# Prepare Data
input_texts = []
target_texts = []
input_words = set()
target_words = set()

with open(data_path, 'r', encoding='utf-8') as f:
    lines = f.read().split('\n')

for line in lines[: min(num_samples, len(lines) - 1)]:
    input_text, target_text, _ = line.split('\t')
    target_text = '\t' + target_text + '\n'
    input_texts.append(input_text)
    target_texts.append(target_text)
    input_words.update(input_text.split())
    target_words.update(target_text.split())

input_words = sorted(list(input_words))
target_words = sorted(list(target_words))
num_encoder_tokens = len(input_words)
num_decoder_tokens = len(target_words)
max_encoder_seq_length = max([len(txt.split()) for txt in input_texts])
max_decoder_seq_length = max([len(txt.split()) for txt in target_texts])

print('Number of samples:', len(input_texts))
print('Number of unique input tokens:', num_encoder_tokens)
print('Number of unique output tokens:', num_decoder_tokens)
print('Max sequence length for inputs:', max_encoder_seq_length)
print('Max sequence length for outputs:', max_decoder_seq_length)

input_token_index = dict([(word, i) for i, word in enumerate(input_words)])
target_token_index = dict([(word, i) for i, word in enumerate(target_words)])

Number of samples: 10000
Number of unique input tokens: 2939
Number of unique output tokens: 5731
Max sequence length for inputs: 5
Max sequence length for outputs: 10


In [43]:
def get_embedding_matrix(word_index, embedding_model):
    embedding_matrix = np.zeros((len(word_index) + 1, embedding_dim))
    for word, i in word_index.items():
        if word in embedding_model.wv:
            embedding_matrix[i] = embedding_model.wv[word]
        else:
            embedding_matrix[i] = np.random.normal(size=(embedding_dim,))
    return embedding_matrix

In [44]:
embedding_matrix_input = get_embedding_matrix(input_token_index, custom_word2vec_model)
embedding_matrix_target = get_embedding_matrix(target_token_index, custom_word2vec_model)

encoder_input_data = np.zeros((len(input_texts), max_encoder_seq_length), dtype='int32')
decoder_input_data = np.zeros((len(input_texts), max_decoder_seq_length), dtype='int32')
decoder_target_data = np.zeros((len(input_texts), max_decoder_seq_length, num_decoder_tokens), dtype='float32')

In [45]:
for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):
    for t, word in enumerate(input_text.split()):
        if word in input_token_index:
            encoder_input_data[i, t] = input_token_index[word]
    for t, word in enumerate(target_text.split()):
        if word in target_token_index:
            decoder_input_data[i, t] = target_token_index[word]
            if t > 0:
                decoder_target_data[i, t - 1, target_token_index[word]] = 1.

In [46]:
# Split data into training, validation, and test sets
train_size = 0.8
validation_size = 0.1
test_size = 0.1

encoder_input_data_train, encoder_input_data_temp, decoder_input_data_train, decoder_input_data_temp, decoder_target_data_train, decoder_target_data_temp = train_test_split(
    encoder_input_data, decoder_input_data, decoder_target_data, train_size=train_size, random_state=42)

validation_proportion = validation_size / (validation_size + test_size)

encoder_input_data_val, encoder_input_data_test, decoder_input_data_val, decoder_input_data_test, decoder_target_data_val, decoder_target_data_test = train_test_split(
    encoder_input_data_temp, decoder_input_data_temp, decoder_target_data_temp, train_size=validation_proportion, random_state=42)

In [47]:
# Create embedding layers
embedding_layer_input = Embedding(input_dim=num_encoder_tokens + 1, output_dim=embedding_dim,
                                  weights=[embedding_matrix_input], input_length=max_encoder_seq_length, trainable=False)
embedding_layer_target = Embedding(input_dim=num_decoder_tokens + 1, output_dim=embedding_dim,
                                   weights=[embedding_matrix_target], input_length=max_decoder_seq_length, trainable=False)

In [48]:
# Split data into training, validation, and test sets
train_size = 0.8
validation_size = 0.1
test_size = 0.1

input_texts_train, input_texts_temp, target_texts_train, target_texts_temp = train_test_split(
    input_texts, target_texts, train_size=train_size, random_state=42)

validation_proportion = validation_size / (validation_size + test_size)

input_texts_val, input_texts_test, target_texts_val, target_texts_test = train_test_split(
    input_texts_temp, target_texts_temp, train_size=validation_proportion, random_state=42)

In [49]:
# Model building
encoder_inputs = Input(shape=(max_encoder_seq_length,))
encoder_embeddings = embedding_layer_input(encoder_inputs)
encoder = LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_embeddings)
encoder_states = [state_h, state_c]

decoder_inputs = Input(shape=(max_decoder_seq_length,))
decoder_embeddings = embedding_layer_target(decoder_inputs)
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_embeddings, initial_state=encoder_states)
decoder_dense = Dense(num_decoder_tokens, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

optimizer = RMSprop(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

# Model fitting
history = model.fit(
    [encoder_input_data_train, decoder_input_data_train], decoder_target_data_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(
        [encoder_input_data_val, decoder_input_data_val], decoder_target_data_val
    ),
    callbacks=[early_stopping]
)

# Evaluation on test set
test_loss, test_accuracy = model.evaluate(
    [encoder_input_data_test, decoder_input_data_test], decoder_target_data_test
)
print(f'Test Loss: {test_loss}')
print(f'Test Accuracy: {test_accuracy}')

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Test Loss: 1.2469173669815063
Test Accuracy: 0.8215000033378601
