In [1]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Embedding, LSTM, Dense, Bidirectional
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import pad_sequences
from gensim.models import Word2Vec,FastText


In [2]:
df = pd.read_json("dataset/PIZZA_train.json", lines=True)
df = df.sample(5000)

In [None]:

# Dataset Preparation
BATCH_SIZE = 64
BUFFER_SIZE = 10000

dataset = tf.data.Dataset.from_tensor_slices((src_sequences, tgt_sequences))
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

# Model Parameters
EMBED_SIZE = 128
HIDDEN_SIZE = 256

# Encoder
class Encoder(Model):
    def __init__(self, vocab_size, embed_size, hidden_size):
        super(Encoder, self).__init__()
        self.embedding = Embedding(vocab_size, embed_size, trainable=True)
        self.lstm = Bidirectional(LSTM(hidden_size, return_sequences=True, return_state=True))

    def call(self, x):
        x = self.embedding(x)
        outputs, forward_h, forward_c, backward_h, backward_c = self.lstm(x)
        state_h = tf.concat([forward_h, backward_h], axis=-1)
        state_c = tf.concat([forward_c, backward_c], axis=-1)
        return outputs, state_h, state_c

# Attention Layer
class Attention(tf.keras.layers.Layer):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.W1 = Dense(hidden_size)
        self.W2 = Dense(hidden_size)
        self.V = Dense(1)

    def call(self, encoder_outputs, hidden):
        hidden_with_time_axis = tf.expand_dims(hidden, 1)
        score = self.V(tf.nn.tanh(self.W1(encoder_outputs) + self.W2(hidden_with_time_axis)))
        attention_weights = tf.nn.softmax(score, axis=1)
        context_vector = attention_weights * encoder_outputs
        context_vector = tf.reduce_sum(context_vector, axis=1)
        return context_vector, attention_weights

# Decoder
class Decoder(Model):
    def __init__(self, vocab_size, embed_size, hidden_size):
        super(Decoder, self).__init__()
        self.embedding = Embedding(vocab_size, embed_size, trainable=True)
        self.lstm = LSTM(hidden_size * 2, return_sequences=True, return_state=True)
        self.fc = Dense(vocab_size)
        self.attention = Attention(hidden_size)

    def call(self, x, encoder_outputs, hidden, cell):
        context_vector, attention_weights = self.attention(encoder_outputs, hidden)
        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        outputs, state_h, state_c = self.lstm(x, initial_state=[hidden, cell])
        logits = self.fc(outputs)
        return logits, state_h, state_c, attention_weights

# Define the model
encoder = Encoder(src_vocab_size, EMBED_SIZE, HIDDEN_SIZE)
decoder = Decoder(tgt_vocab_size, EMBED_SIZE, HIDDEN_SIZE)

# Loss function
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    # Squeeze the predictions to remove the extra dimension
    pred = tf.squeeze(pred, axis=1)
    loss = loss_object(real, pred)
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask
    return tf.reduce_mean(loss)

# Optimizer
optimizer = tf.keras.optimizers.Adam()



In [None]:
# Training step
@tf.function
def train_step(src, tgt):
    loss = 0
    with tf.GradientTape() as tape:
        encoder_outputs, enc_hidden, enc_cell = encoder(src)
        dec_hidden, dec_cell = enc_hidden, enc_cell
        dec_input = tf.expand_dims([tgt_tokenizer.word_index["<sos>"]] * BATCH_SIZE, 1)

        for t in range(1, tgt.shape[1]):
            predictions, dec_hidden, dec_cell, _ = decoder(dec_input, encoder_outputs, dec_hidden, dec_cell)
            loss += loss_function(tgt[:, t], predictions)
            dec_input = tf.expand_dims(tgt[:, t], 1)

    batch_loss = loss / int(tgt.shape[1])
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return batch_loss
# Training loop
EPOCHS = 5

for epoch in range(EPOCHS):
    total_loss = 0

    for (batch, (src, tgt)) in enumerate(dataset):
        print(f'Working = {batch}/{len(dataset)}')
        batch_loss = train_step(src, tgt)
        total_loss += batch_loss

    print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {total_loss.numpy():.4f}")

Working = 0/156
Working = 1/156
Working = 2/156
Working = 3/156
Working = 4/156
Working = 5/156
Working = 6/156
Working = 7/156
Working = 8/156
Working = 9/156
Working = 10/156
Working = 11/156
Working = 12/156
Working = 13/156
Working = 14/156
Working = 15/156
Working = 16/156
Working = 17/156
Working = 18/156
Working = 19/156
Working = 20/156
Working = 21/156
Working = 22/156
Working = 23/156
Working = 24/156
Working = 25/156
Working = 26/156
Working = 27/156
Working = 28/156
Working = 29/156
Working = 30/156
Working = 31/156
Working = 32/156
Working = 33/156
Working = 34/156
Working = 35/156
Working = 36/156
Working = 37/156
Working = 38/156
Working = 39/156
Working = 40/156
Working = 41/156
Working = 42/156
Working = 43/156
Working = 44/156
Working = 45/156
Working = 46/156
Working = 47/156
Working = 48/156
Working = 49/156
Working = 50/156
Working = 51/156
Working = 52/156
Working = 53/156
Working = 54/156
Working = 55/156
Working = 56/156
Working = 57/156
Working = 58/156
Working

KeyboardInterrupt: 

In [None]:
def translate(sentence, encoder, decoder, src_tokenizer, tgt_tokenizer, max_tgt_len):
    # Tokenize and pad the input
    input_sequence = src_tokenizer.texts_to_sequences([sentence])
    input_sequence = pad_sequences(input_sequence, maxlen=max_src_len, padding="post")

    # Encode the input sequence
    encoder_outputs, enc_hidden, enc_cell = encoder(tf.convert_to_tensor(input_sequence))

    # Initialize the decoder
    dec_input = tf.expand_dims([tgt_tokenizer.word_index["<sos>"]], 0)
    dec_hidden, dec_cell = enc_hidden, enc_cell

    result_tokens = []

    for _ in range(max_tgt_len):
        predictions, dec_hidden, dec_cell, _ = decoder(dec_input, encoder_outputs, dec_hidden, dec_cell)
        predicted_id = tf.argmax(predictions[0, 0]).numpy()

        if predicted_id == tgt_tokenizer.word_index["<eos>"]:
            break

        if predicted_id in tgt_tokenizer.index_word:  # Ensure valid token
            result_tokens.append(tgt_tokenizer.index_word[predicted_id])
        else:
            result_tokens.append("<unk>")  # Handle unknown tokens

        dec_input = tf.expand_dims([predicted_id], 0)

    # Join tokens to form the final output
    predicted_sentence = " ".join(result_tokens)
    return predicted_sentence


In [None]:
input_sentence = "i'd like to get a small pepperoni and tuna pizza and i don't want it on thin crust"

# Translate the input
predicted_output = translate(input_sentence, encoder, decoder, src_tokenizer, tgt_tokenizer, max_tgt_len)
print("Input Sentence:", input_sentence)
print("Predicted Output:", predicted_output)