# In [1]: (notebook cell marker)
# Load necessary modules
from tensorflow.keras.layers import LSTM, Dense, Embedding, Bidirectional, Input, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard, EarlyStopping
from sklearn.model_selection import train_test_split
import numpy as np
import os

# Global configuration variables
# These module-level constants are read by the functions and the training /
# inference script below.
DATA_PATH = './Data/data.txt'  # Text file with one "description|subject" pair per line
CHECKPOINT_PATH = './Model/model_checkpoint.h5'  # Where ModelCheckpoint saves the best weights (weights only)
MAX_SEQ_LENGTH = 100  # Length every token sequence is padded/truncated to; also the model input length
EMBEDDING_DIM = 256  # Size of each embedding vector (encoder and decoder embeddings)
NUM_EPOCHS = 1000  # Upper bound on training epochs; EarlyStopping can end training sooner
BATCH_SIZE = 2  # Size of the mini-batches for training
TEST_SPLIT_SIZE = 0.2  # Fraction of the samples held out by train_test_split for validation
RANDOM_STATE = 42  # Seed passed to train_test_split (the shuffle in load_and_preprocess_data is not seeded)
EARLY_STOPPING_PATIENCE = 5  # Epochs without val_loss improvement before training is stopped
LSTM_UNITS = 256  # Units per direction of the bidirectional encoder LSTM; the decoder LSTM uses LSTM_UNITS*2
LOG_DIR = './Logs'  # Directory to save TensorBoard logs

def load_and_preprocess_data(data_file):
    """Read ``description|subject`` pairs from a text file and shuffle them.

    Each non-blank line must contain exactly one ``|`` separating the
    description from the subject. Lines that do not split into exactly two
    fields are reported on stdout and skipped; blank lines are ignored
    silently because they carry no record.

    Args:
        data_file: Path to the UTF-8 encoded data file.

    Returns:
        A ``(descriptions, subjects)`` tuple of two equally long lists, where
        ``descriptions[i]`` and ``subjects[i]`` come from the same line. The
        pairs are shuffled with ``np.random.shuffle`` (global NumPy RNG).

    Raises:
        FileNotFoundError: If ``data_file`` does not exist.
    """
    if not os.path.exists(data_file):
        raise FileNotFoundError(f"The file {data_file} does not exist.")

    data = []
    # Explicit encoding: without it the decoding depends on the platform
    # locale and non-ASCII text may be misread or fail to load.
    with open(data_file, 'r', encoding='utf-8') as file:
        for line in file:
            record = line.strip()
            if not record:
                # Blank lines are separators, not malformed records.
                continue
            try:
                description, subject = record.split("|")
            except ValueError:
                # Zero or more than one "|" in the line.
                print(f"Skipped line due to wrong format: {line}")
            else:
                data.append((description, subject))

    # Shuffle the pairs together so descriptions and subjects stay aligned.
    np.random.shuffle(data)

    return [sample[0] for sample in data], [sample[1] for sample in data]

def tokenize_and_pad(texts, max_seq_length):
    """Fit a word-level tokenizer on ``texts`` and return fixed-length id sequences.

    Args:
        texts: Iterable of strings used both to build the vocabulary and to
            be converted to integer sequences.
        max_seq_length: Length of every returned sequence; shorter sequences
            are padded at the end.

    Returns:
        ``(tokenizer, padded_sequences)``: the fitted ``Tokenizer`` (with an
        ``"<OOV>"`` out-of-vocabulary token and no vocabulary size limit) and
        the result of ``pad_sequences`` for ``texts``.
    """
    fitted = Tokenizer(oov_token="<OOV>", num_words=None)
    fitted.fit_on_texts(texts)

    # Convert to ids and pad in one step; padding goes after the tokens.
    padded = pad_sequences(
        fitted.texts_to_sequences(texts),
        padding='post',
        maxlen=max_seq_length,
    )
    return fitted, padded

def build_model(max_seq_length, vocab_size_input, vocab_size_output, embedding_dim):
    """Assemble the training-time encoder-decoder network.

    The encoder embeds the source tokens and runs them through a
    bidirectional LSTM with ``LSTM_UNITS`` units per direction; the forward
    and backward final states are concatenated and passed to the decoder as
    its initial state. The decoder embeds its own token input, runs an LSTM
    with ``LSTM_UNITS * 2`` units and applies a softmax ``Dense`` layer over
    the output vocabulary at every time step.

    Args:
        max_seq_length: Length of both the encoder and decoder input sequences.
        vocab_size_input: Input dimension of the encoder embedding.
        vocab_size_output: Input dimension of the decoder embedding and size
            of the softmax output.
        embedding_dim: Output dimension of both embeddings.

    Returns:
        A 7-tuple ``(model, encoder_input, state_h, state_c, decoder_input,
        decoder_embedding_layer, decoder_lstm)``. Everything after ``model``
        is returned so separate inference models can be built from the same
        tensors and layers.
    """
    # ---- Encoder ----
    # Layers are constructed in the same order as before so that Keras'
    # automatically generated layer names do not change.
    enc_tokens = Input(shape=(max_seq_length,))
    enc_vectors = Embedding(vocab_size_input, embedding_dim)(enc_tokens)
    bi_lstm = Bidirectional(LSTM(LSTM_UNITS, return_state=True, return_sequences=True))
    # Outputs: per-step sequence (unused here), then (h, c) for each direction.
    _, fwd_h, fwd_c, bwd_h, bwd_c = bi_lstm(enc_vectors)
    merged_h = Concatenate()([fwd_h, bwd_h])
    merged_c = Concatenate()([fwd_c, bwd_c])

    # ---- Decoder ----
    dec_tokens = Input(shape=(max_seq_length,))
    dec_embedder = Embedding(vocab_size_output, embedding_dim)
    dec_vectors = dec_embedder(dec_tokens)
    # Twice the encoder width so the concatenated states fit as initial state.
    dec_lstm = LSTM(LSTM_UNITS * 2, return_sequences=True)
    dec_hidden = dec_lstm(dec_vectors, initial_state=[merged_h, merged_c])
    token_probs = Dense(vocab_size_output, activation='softmax')(dec_hidden)

    # ---- Full training model ----
    seq2seq = Model(inputs=[enc_tokens, dec_tokens], outputs=token_probs)

    return seq2seq, enc_tokens, merged_h, merged_c, dec_tokens, dec_embedder, dec_lstm

# ---------------------------------------------------------------------------
# Data preparation and training
# ---------------------------------------------------------------------------
descriptions, subjects = load_and_preprocess_data(DATA_PATH)
if not descriptions:
    # Fail early with a clear message instead of an obscure error further down.
    raise ValueError(f"No usable 'description|subject' lines found in {DATA_PATH}.")

tokenizer_input, sequences_input = tokenize_and_pad(descriptions, MAX_SEQ_LENGTH)
tokenizer_output, sequences_output = tokenize_and_pad(subjects, MAX_SEQ_LENGTH)

# +1 because Tokenizer word indices start at 1; index 0 is the padding value.
vocab_size_input = len(tokenizer_input.word_index) + 1
vocab_size_output = len(tokenizer_output.word_index) + 1
model, encoder_input, state_h, state_c, decoder_input, decoder_embedding_layer, decoder_lstm = build_model(
    MAX_SEQ_LENGTH, vocab_size_input, vocab_size_output, EMBEDDING_DIM)

model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')

# Teacher forcing: at step t the decoder must see the target token of step
# t-1, not the token it is asked to predict. Feeding the unshifted target as
# decoder input (as was done before) lets the network simply copy its input.
# The first decoder input is id 1, the same id decode_sequence() feeds as the
# first token at inference time.
DECODER_START_TOKEN_ID = 1
decoder_sequences_input = np.concatenate(
    [np.full((sequences_output.shape[0], 1), DECODER_START_TOKEN_ID, dtype=sequences_output.dtype),
     sequences_output[:, :-1]],
    axis=1)

# Split all three arrays together so the rows stay aligned. The targets get a
# trailing axis of size 1 for sparse_categorical_crossentropy.
(sequences_input_train, sequences_input_val,
 decoder_input_train, decoder_input_val,
 sequences_output_train, sequences_output_val) = train_test_split(
    sequences_input, decoder_sequences_input, np.expand_dims(sequences_output, -1),
    test_size=TEST_SPLIT_SIZE, random_state=RANDOM_STATE)

# Create the checkpoint directory if CHECKPOINT_PATH has a directory part.
checkpoint_dir = os.path.dirname(CHECKPOINT_PATH)
if checkpoint_dir:
    os.makedirs(checkpoint_dir, exist_ok=True)

checkpoint_callback = ModelCheckpoint(CHECKPOINT_PATH, save_weights_only=True, save_best_only=True, monitor='val_loss')
tensorboard_callback = TensorBoard(log_dir=LOG_DIR)
# restore_best_weights: keep the best validation weights in memory for the
# inference code below rather than the weights of the last (worse) epoch.
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=EARLY_STOPPING_PATIENCE,
                                        restore_best_weights=True)

model.fit([sequences_input_train, decoder_input_train], sequences_output_train,
          batch_size=BATCH_SIZE,
          epochs=NUM_EPOCHS,
          validation_data=([sequences_input_val, decoder_input_val], sequences_output_val),
          callbacks=[checkpoint_callback, tensorboard_callback, early_stopping_callback])

# Inference setup
# The encoder model maps a padded description to the concatenated final
# (h, c) states of the bidirectional encoder.
encoder_model = Model(encoder_input, [state_h, state_c])
decoder_state_input_h = Input(shape=(LSTM_UNITS*2,))
decoder_state_input_c = Input(shape=(LSTM_UNITS*2,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# New input tensor for the decoder: one token per call (step-by-step decoding)
decoder_input_inf = Input(shape=(1,))

# Reuse the trained decoder embedding on the single-step input
decoder_embedding_inf = decoder_embedding_layer(decoder_input_inf)

# The training-time decoder LSTM was created without return_state, so it
# cannot hand its states back between steps. Build an identically shaped LSTM
# that returns its states and copy the trained weights into it; without the
# copy this layer would decode with random, untrained weights.
decoder_lstm_inf = LSTM(LSTM_UNITS*2, return_sequences=True, return_state=True)

decoder_outputs_inf, state_h_inf, state_c_inf = decoder_lstm_inf(decoder_embedding_inf, initial_state=decoder_states_inputs)
# set_weights needs a built layer, hence the call above comes first. This is a
# snapshot: repeat the copy if the training model is trained further.
decoder_lstm_inf.set_weights(decoder_lstm.get_weights())

decoder_states_inf = [state_h_inf, state_c_inf]
# Reuse the trained softmax projection (the last Dense layer of the training
# model) instead of creating a fresh, randomly initialised Dense layer.
decoder_dense = next(layer for layer in reversed(model.layers) if isinstance(layer, Dense))
decoder_outputs_inf = decoder_dense(decoder_outputs_inf)
decoder_model = Model([decoder_input_inf] + decoder_states_inputs, [decoder_outputs_inf] + decoder_states_inf)

# Reverse vocabulary: token id -> word, used to turn predictions into text
int_to_word_decoder = {i: word for word, i in tokenizer_output.word_index.items()}
int_to_word_decoder[1] = '<OOV>'  # dealing with the OOV token

def decode_sequence(input_sequence):
    """Greedily decode a subject for one encoded description.

    The encoder states for ``input_sequence`` seed the decoder, which is then
    run one token at a time: the most probable token of each step is fed back
    as the next decoder input together with the updated states.

    Decoding stops when the decoder predicts index 0 (the padding value, which
    is what follows the last real token in the post-padded targets), when it
    predicts the word ``'<end>'``, or after ``MAX_SEQ_LENGTH`` tokens.

    Args:
        input_sequence: Padded token-id array for a single description, as
            accepted by ``encoder_model.predict``.

    Returns:
        The predicted words joined by single spaces (empty string if the
        first prediction already ends the sequence).
    """
    # list(...) so the states can be concatenated with [target_sequence] below.
    states_value = list(encoder_model.predict(input_sequence, verbose=0))

    # The decoder is started with token id 1.
    target_sequence = np.zeros((1, 1))
    target_sequence[0, 0] = 1

    decoded_words = []
    # Bound the loop by the number of generated tokens (the previous check
    # compared the character length of the output string to MAX_SEQ_LENGTH).
    for _ in range(MAX_SEQ_LENGTH):
        output_tokens, h, c = decoder_model.predict([target_sequence] + states_value, verbose=0)
        sampled_token_index = int(np.argmax(output_tokens[0, -1, :]))

        # Index 0 is padding, not a word: treat it as end of sequence.
        if sampled_token_index == 0:
            break

        sampled_word = int_to_word_decoder.get(sampled_token_index, '<OOV>')  # dealing with the OOV token
        if sampled_word == '<end>':
            break
        decoded_words.append(sampled_word)

        # Feed the prediction and the new states into the next step.
        target_sequence = np.zeros((1, 1))
        target_sequence[0, 0] = sampled_token_index
        states_value = [h, c]

    return ' '.join(decoded_words)

# Interactive prediction loop: read descriptions from stdin until the user
# types 'quit' (any case, surrounding whitespace ignored) or stdin is closed.
while True:
    try:
        input_description = input("Enter a new description (type 'quit' to exit): ").strip()
    except EOFError:
        # stdin closed (e.g. piped input exhausted): leave instead of crashing.
        break
    if input_description.lower() == 'quit':
        break
    if not input_description:
        # Nothing to predict from; ask again.
        continue
    # Encode the description exactly like the training inputs.
    input_sequence = tokenizer_input.texts_to_sequences([input_description])
    input_sequence = pad_sequences(input_sequence, maxlen=MAX_SEQ_LENGTH, padding='post')
    predicted_sequence = decode_sequence(input_sequence)
    print("Predicted Subject:", predicted_sequence)


# (notebook cell output) Epoch 1/1000
