In [None]:
import numpy as np
import random
import string
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

print("TensorFlow version:", tf.__version__)

In [None]:
# -------------------- Constants --------------------
# Building blocks of the generated expressions.
OPERATORS = ['+', '-', '*', '/']
IDENTIFIERS = ['a', 'b', 'c', 'd', 'e']

# Vocabulary layout. The position of a token in VOCAB is its integer id, so the
# order below (special tokens, symbols, identifiers, JUNK) must not change.
SPECIAL_TOKENS = ['PAD', 'SOS', 'EOS']
SYMBOLS = ['(', ')'] + OPERATORS
VOCAB = [*SPECIAL_TOKENS, *SYMBOLS, *IDENTIFIERS, 'JUNK']
VOCAB_SIZE = len(VOCAB)

# Two-way lookup tables between tokens and ids.
id_to_token = dict(enumerate(VOCAB))
token_to_id = {tok: idx for idx, tok in id_to_token.items()}

PAD_ID = token_to_id['PAD']
SOS_ID = token_to_id['SOS']
EOS_ID = token_to_id['EOS']

# A full expression tree of depth D holds 2**D identifiers, 2**D - 1 operators
# and 2 * (2**D - 1) parentheses; one extra slot is needed for the EOS token
# appended by encode(), which gives 4 * 2**D - 2 positions in total.
MAX_DEPTH = 3
MAX_LEN = 4 * 2 ** MAX_DEPTH - 2

print(f"Vocabulary size: {VOCAB_SIZE}")
print(f"Vocabulary: {VOCAB}")
print(f"PAD_ID: {PAD_ID}, SOS_ID: {SOS_ID}, EOS_ID: {EOS_ID}")

In [None]:
# -------------------- Expression Generation --------------------
def generate_infix_expression(max_depth):
    """
    Build a random, fully parenthesised infix expression as a string.

    Args:
        max_depth: Remaining nesting budget. At 0 a single identifier is returned.

    Returns:
        Either one identifier from IDENTIFIERS or a string of the form
        '(left op right)' whose operands are generated recursively.
    """
    # Budget exhausted: the expression collapses to a single identifier.
    if max_depth == 0:
        return random.choice(IDENTIFIERS)

    shallower = max_depth - 1

    # With probability 0.5 this level is skipped, which yields smaller trees.
    if random.random() < 0.5:
        return generate_infix_expression(shallower)

    # Otherwise emit a binary node: operands first, then the operator, so the
    # random draws happen in the order left, right, operator.
    lhs = generate_infix_expression(shallower)
    rhs = generate_infix_expression(shallower)
    operator = random.choice(OPERATORS)
    return '(' + lhs + ' ' + operator + ' ' + rhs + ')'

def tokenize(expr):
    """
    Split an expression string into single-character tokens.

    Characters that are not keys of `token_to_id` (for example the spaces
    inserted by the generator) are dropped; the order of the rest is kept.
    """
    known_chars = []
    for ch in expr:
        if ch in token_to_id:
            known_chars.append(ch)
    return known_chars

def infix_to_postfix(tokens):
    """
    Convert a tokenised infix expression to postfix using an operator stack.

    Args:
        tokens: Iterable of tokens (identifiers, operators, parentheses).
            Tokens of any other kind are ignored.

    Returns:
        A new list with the identifiers and operators in postfix order.
    """
    rank = {'+': 1, '-': 1, '*': 2, '/': 2}
    postfix = []
    pending = []  # operators and '(' that are waiting to be emitted

    for tok in tokens:
        if tok in IDENTIFIERS:
            postfix.append(tok)
            continue

        if tok in OPERATORS:
            # Emit stacked operators that bind at least as tightly as `tok`.
            while pending and pending[-1] in OPERATORS and rank[pending[-1]] >= rank[tok]:
                postfix.append(pending.pop())
            pending.append(tok)
            continue

        if tok == '(':
            pending.append(tok)
            continue

        if tok == ')':
            # Unwind to the matching '(' and discard it if there is one.
            while pending and pending[-1] != '(':
                postfix.append(pending.pop())
            if pending:
                pending.pop()

    # Whatever is still stacked is emitted from the top of the stack downwards.
    postfix.extend(reversed(pending))
    return postfix

def encode(tokens, max_len=MAX_LEN):
    """
    Map tokens to ids, append EOS and right-pad with PAD up to `max_len`.

    Args:
        tokens: Sequence of tokens; each one must be a key of `token_to_id`.
        max_len: Target length of the returned list, EOS included.

    Returns:
        A list of ids. NOTE: if the tokens plus EOS already exceed `max_len`
        the list is returned as is, i.e. neither truncated nor padded.
    """
    ids = list(map(token_to_id.__getitem__, tokens))
    ids.append(EOS_ID)
    padding = [PAD_ID] * (max_len - len(ids))
    return ids + padding

def decode_sequence(token_ids, id_to_token, pad_token='PAD', eos_token='EOS'):
    """
    Turn a sequence of token ids into a space-separated string.

    Args:
        token_ids: Iterable of ids to decode.
        id_to_token: Mapping from id to token; unknown ids decode to '?'.
        pad_token: Token that is skipped in the output.
        eos_token: Token at which decoding stops (it is not included).

    Returns:
        The decoded tokens joined by single spaces.
    """
    words = []
    for tid in token_ids:
        word = id_to_token.get(tid, '?')
        if word == eos_token:
            break
        if word == pad_token:
            continue
        words.append(word)
    return ' '.join(words)

def generate_dataset(n, max_depth=MAX_DEPTH):
    """
    Generate `n` random (infix, postfix) pairs as padded id arrays.

    Args:
        n: Number of expressions to generate.
        max_depth: Maximum nesting depth passed to generate_infix_expression.

    Returns:
        A tuple (X, Y) of numpy arrays built from the encoded infix and
        postfix token lists respectively, one row per expression.
    """
    infix_rows = []
    postfix_rows = []
    for _ in range(n):
        infix_tokens = tokenize(generate_infix_expression(max_depth))
        infix_rows.append(encode(infix_tokens))
        postfix_rows.append(encode(infix_to_postfix(infix_tokens)))
    return np.array(infix_rows), np.array(postfix_rows)

#you might use the shift function for teacher-forcing
def shift_right(seqs):
    """
    Build teacher-forcing inputs from a 2-D array of target sequences.

    Every row is moved one position to the right, its last element is
    dropped and SOS_ID is written into the first column. The input array is
    left untouched; the result has the same shape and dtype.
    """
    teacher_inputs = np.empty_like(seqs)
    teacher_inputs[:, 0] = SOS_ID
    teacher_inputs[:, 1:] = seqs[:, :-1]
    return teacher_inputs

In [None]:
# -------------------- Generate and Inspect Dataset --------------------
print("\n Generating Training Dataset")

# Draw the three splits (train, validation, test) in this order.
X_train, Y_train = generate_dataset(100000)
X_val, Y_val = generate_dataset(10000)
X_test, Y_test = generate_dataset(1000)

# Teacher-forcing decoder inputs: the targets shifted right behind an SOS token.
decoder_input_train = shift_right(Y_train)
decoder_input_val = shift_right(Y_val)
decoder_input_test = shift_right(Y_test)

print(f"Training set size: {X_train.shape}")
print(f"Validation set size: {X_val.shape}")
print(f"Test set size: {X_test.shape}")

In [None]:
# Show 5 random examples (infix input, postfix target and teacher-forcing input)
print("\n5 Dataset Examples")
for i in range(5):
    # Each index is drawn independently, so the same row may be shown twice.
    idx = np.random.randint(len(X_train))
    print(f"\nExample {i+1}:")
    print(f"  Infix:   {decode_sequence(X_train[idx], id_to_token)}")
    print(f"  Postfix: {decode_sequence(Y_train[idx], id_to_token)}")
    # The teacher-forcing row is the postfix target shifted right behind SOS.
    print(f"  Teacher: {decode_sequence(decoder_input_train[idx], id_to_token)}")

## Final Model Architecture: Encoder-Decoder with LSTM

This section defines the neural network model that performs the translation from infix to postfix notation.

For this task I used an **encoder-decoder architecture with LSTM layers**:

- The **encoder** reads the tokenized infix expression and encodes it into a fixed-size state vector.
- The **decoder** uses this encoded state along with teacher forcing to generate the corresponding postfix sequence step-by-step.

The main components are:

- **Embedding Layers**: Transform token IDs into dense vector representations for both encoder and decoder.
- **LSTM Layers**: Handle sequential input and preserve contextual information.
- **Dense Layers**: Map the decoder output to token probabilities using a softmax layer.
- **Parameter Limit**: The total parameter count is kept below the required 2 million.

The model is compiled using the **Adam** optimizer and trained with the **sparse_categorical_crossentropy** loss, which is suitable for multi-class token prediction tasks.

Finally, a summary of the architecture is printed at the end of this section to verify the structure and the total parameter count.

In [None]:
# -------------------- Final Model Architecture test v6.0 --------------------
print("\nBuilding Neural Network Model test print")

# Define embedding dimension and hidden size as constants for reuse
# EMBEDDING_DIM: width of the token embeddings of both the encoder and the decoder.
EMBEDDING_DIM = 64
# HIDDEN_SIZE: number of units of the encoder and decoder LSTM layers.
HIDDEN_SIZE = 128

def create_model():
    """
    Create the encoder-decoder training model.

    The encoder embeds the infix token ids and runs them through an LSTM; only
    its final hidden and cell states are passed on. The decoder embeds the
    teacher-forcing input, runs an LSTM initialised with the encoder states and
    maps every time step to a softmax over the vocabulary through two Dense
    layers. All layers carry explicit names.

    Returns:
        An uncompiled Keras model named 'infix_to_postfix' taking
        [encoder_input, decoder_input] (each of length MAX_LEN) and producing
        per-step token probabilities. The layers used to build it are also
        attached to the model as attributes.
    """
    # Encoder
    encoder_inputs = layers.Input(shape=(MAX_LEN,), name='encoder_input')
    # mask_zero=True treats id 0 (the PAD token) as padding.
    encoder_embedding_layer = layers.Embedding(VOCAB_SIZE, EMBEDDING_DIM, mask_zero=True, name='encoder_embedding')
    encoder_embedding = encoder_embedding_layer(encoder_inputs)

    # Encoder LSTM
    encoder_lstm = layers.LSTM(HIDDEN_SIZE, return_sequences=True, return_state=True, name='encoder_lstm')
    # The per-step outputs are not used below; only the final states feed the decoder.
    encoder_outputs, state_h, state_c = encoder_lstm(encoder_embedding)
    encoder_states = [state_h, state_c]

    # Decoder
    decoder_inputs = layers.Input(shape=(MAX_LEN,), name='decoder_input')
    decoder_embedding_layer = layers.Embedding(VOCAB_SIZE, EMBEDDING_DIM, mask_zero=True, name='decoder_embedding')
    decoder_embedding = decoder_embedding_layer(decoder_inputs)

    # Decoder LSTM
    decoder_lstm = layers.LSTM(HIDDEN_SIZE, return_sequences=True, return_state=True, name='decoder_lstm')
    # The decoder starts from the encoder's final states; its own final states
    # are not needed during training and are discarded.
    decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)

    # Dense layers
    dense_layer = layers.Dense(64, activation='relu', name='dense_hidden')
    dense_hidden = dense_layer(decoder_outputs)

    # Final projection to one probability per vocabulary entry at each step.
    output_layer = layers.Dense(VOCAB_SIZE, activation='softmax', name='output_layer')
    final_outputs = output_layer(dense_hidden)

    # Create training model
    model = models.Model([encoder_inputs, decoder_inputs], final_outputs, name='infix_to_postfix')

    # Store layer references for inference model building
    # NOTE: these attributes exist only on the instance returned here.
    model.encoder_embedding_layer = encoder_embedding_layer
    model.encoder_lstm = encoder_lstm
    model.decoder_embedding_layer = decoder_embedding_layer
    model.decoder_lstm = decoder_lstm
    model.dense_layer = dense_layer
    model.output_layer = output_layer

    return model

# Create and compile model
model = create_model()
# Targets are integer token ids, hence the sparse variant of the
# cross-entropy loss; accuracy is tracked as an additional metric.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

In [None]:
# Model summary: print the layers of the compiled model so that the structure
# and the parameter count can be checked.
print("\nModel Architecture as a summary")
model.summary()

## The Model Training Part: Supervised Learning with Teacher Forcing

In this section, I train the encoder-decoder model using the generated infix-postfix expression pairs.

Steps:

- **Input Preparation**:
  - **X_train_inputs** contains the encoder input and decoder input sequences (with SOS token prepended).
  - **Y_train_targets** contains the expected decoder output (with EOS and padding).
  - Similar structure is applied to validation data.

- **Callbacks**:
  - **EarlyStopping**: Stops training when the validation loss stops improving for 10 epochs, restoring the best weights.
  - **ReduceLROnPlateau**: Dynamically reduces learning rate if no progress is observed, improving convergence.

- **Training Configurations**:
  - Uses **batch_size = 32**, **epochs = 15**, and teacher forcing during training.
  - **Sparse categorical crossentropy** is used to match the softmax output with token IDs.

- **Model Saving**:
  - The trained model is saved in multiple formats:
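    - Full model (`Infix-to-postfix translation-model-Yasas1.keras`)
    - Weights only (`Infix-to-postfix translation-model-Yasas1.weights.h5`)
    - TensorFlow SavedModel format for serving (`Infix-to-postfix translation-model-Yasas1_ckpt`)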

Training metrics are stored in the `history` object for later visualization.

In [None]:
# -------------------- Training --------------------
print("\nStart Training the Model test print")

# Model inputs are [encoder tokens, teacher-forcing decoder tokens].
X_train_inputs = [X_train, decoder_input_train]
X_val_inputs = [X_val, decoder_input_val]

# Targets get a trailing axis of size 1.
Y_train_targets = Y_train[..., np.newaxis]
Y_val_targets = Y_val[..., np.newaxis]

# Stop early (keeping the best weights) and halve the learning rate on plateaus.
early_stopping = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
lr_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5, min_lr=1e-6)
callbacks = [early_stopping, lr_on_plateau]

# Fit with teacher forcing; the returned History object is plotted later.
print("Starting training process test print")
history = model.fit(
    X_train_inputs,
    Y_train_targets,
    validation_data=(X_val_inputs, Y_val_targets),
    epochs=15,
    batch_size=32,
    shuffle=True,
    callbacks=callbacks,
    verbose=1,
)

# Persist the result three ways: full model, weights only, exported artifact.
# NOTE(review): these file names contain a space, unlike the hyphenated
# weights file that is downloaded and loaded further down.
model.save('Infix-to-postfix translation-model-Yasas1.keras')
model.save_weights('Infix-to-postfix translation-model-Yasas1.weights.h5')
model.export('Infix-to-postfix translation-model-Yasas1_ckpt')

In [None]:
# -------------------- Training History Visualization --------------------

# Two panels side by side: loss on the left, accuracy on the right.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

loss_ax.plot(history.history['loss'], label='Training Loss')
loss_ax.plot(history.history['val_loss'], label='Validation Loss')
loss_ax.set_title('Model Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

acc_ax.plot(history.history['accuracy'], label='Training Accuracy')
acc_ax.plot(history.history['val_accuracy'], label='Validation Accuracy')
acc_ax.set_title('Model Accuracy')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend()

plt.tight_layout()
plt.show()

## Load Pretrained Weights from Google Drive

In [None]:
# Download pretrained weights from Google Drive using gdown
!pip install -q gdown

# Replace this with your actual file ID from Google Drive
file_id = "1QBp1wlIrFyWb8zPEPQIdT4SWL3kkop3v"
output_name = "Infix-to-postfix-translation-model-Yasas1.weights.h5"

# Download the weights
!gdown https://drive.google.com/uc?id={file_id} -O {output_name}

In [None]:
# Replace the current weights of `model` with the downloaded pretrained ones.
pretrained_weights_path = "Infix-to-postfix-translation-model-Yasas1.weights.h5"
model.load_weights(pretrained_weights_path)
print("Pretrained weights loaded successfully Test Print")

## Building Inference Models for True Autoregressive Decoding

My model is trained using teacher forcing, where the full target sequence is available at each step. **This approach cannot be used during evaluation** if we aim to simulate real-world prediction behavior. That's why I had to **build separate inference models** for step-by-step, autoregressive decoding.

### Why Are Inference Models Necessary in My Case?

During training, we pass the entire decoder input sequence to the model at once. But this **violates true autoregressive decoding**, because:

- The decoder LSTM processes the **full sequence** in parallel.
- It gains access to future context it wouldn't have during real-time generation.
- This leads to unrealistically high evaluation scores and breaks the generation constraints.

### What Does Proper Autoregressive Decoding Require?

To meet the project constraints and simulate real inference:

- I use the encoder to compute the initial state vector.
- Then, I use a loop to decode one token at a time.
- At each step:
  - Feed **only the last predicted token**.
  - Update internal decoder LSTM states (`h`, `c`) manually.
  - Repeat until an `EOS` token is predicted or a maximum length is reached.

This section of code builds:

- A **standalone encoder model** to generate the initial hidden and cell states from the infix input.
- A **custom decoder model** that:
  - Accepts a **single token input** at each step.
  - Uses the previously generated decoder states to continue the prediction.
  - Outputs both the next token and the updated decoder states.

These two models are used together in a custom loop to generate postfix expressions one token at a time — exactly as the project requires for **true autoregressive evaluation**.

In [None]:
# -------------------- BUILD INFERENCE MODELS --------------------
print("\nBuilding Inference Models for Autoregressive Decoding test print")

def build_inference_models(trained_model):
    """
    Build separate encoder and decoder models for proper autoregressive inference.

    The trained layers are reused (weights are shared, nothing is copied). All
    of them are looked up by the names assigned in create_model(), so this also
    works for a model that was reloaded from disk and therefore lacks the
    convenience attributes set by create_model().

    Args:
        trained_model: The two-input training model whose layers are named
            'encoder_lstm', 'decoder_embedding', 'decoder_lstm', 'dense_hidden'
            and 'output_layer'.

    Returns:
        (encoder_model, decoder_model) where
        - encoder_model maps the encoder input to the final [h, c] states, and
        - decoder_model maps [single token, h, c] to
          [token probabilities, new h, new c].

    Raises:
        ValueError: If one of the expected layer names is missing (raised by
            `get_layer`).
    """
    # Resolve every reused layer by name in one place.
    encoder_lstm = trained_model.get_layer('encoder_lstm')
    decoder_embedding_layer = trained_model.get_layer('decoder_embedding')
    decoder_lstm = trained_model.get_layer('decoder_lstm')
    dense_layer = trained_model.get_layer('dense_hidden')
    output_layer = trained_model.get_layer('output_layer')

    # ---- Encoder: training-graph input -> final LSTM states ----
    encoder_inputs = trained_model.input[0]
    # The per-step encoder outputs are not needed, only the final states.
    _, state_h, state_c = encoder_lstm.output
    encoder_states = [state_h, state_c]
    encoder_model = models.Model(encoder_inputs, encoder_states, name='encoder_inference')

    # ---- Decoder: one token + previous states -> probabilities + new states ----
    # Size the state inputs from the layer itself so they always match it.
    state_size = decoder_lstm.units
    decoder_state_input_h = layers.Input(shape=(state_size,), name='decoder_h_input')
    decoder_state_input_c = layers.Input(shape=(state_size,), name='decoder_c_input')
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

    # Single token input for decoder
    decoder_input_single = layers.Input(shape=(1,), name='decoder_single_input')

    # Embed the token and advance the LSTM by exactly one step.
    decoder_embedding_inf = decoder_embedding_layer(decoder_input_single)
    decoder_outputs_inf, state_h_inf, state_c_inf = decoder_lstm(
        decoder_embedding_inf, initial_state=decoder_states_inputs
    )
    decoder_states_inf = [state_h_inf, state_c_inf]

    # Same Dense head as in training.
    dense_outputs_inf = dense_layer(decoder_outputs_inf)
    decoder_outputs_inf = output_layer(dense_outputs_inf)

    decoder_model = models.Model(
        [decoder_input_single] + decoder_states_inputs,
        [decoder_outputs_inf] + decoder_states_inf,
        name='decoder_inference'
    )

    return encoder_model, decoder_model

# Build the inference models from the training model; both are used by the
# token-by-token decoding loop defined further down.
encoder_model, decoder_model = build_inference_models(model)

print("Encoder inference model summary:")
encoder_model.summary()
print("\nDecoder inference model summary:")
decoder_model.summary()

## Autoregressive Decoding Function: Token-by-Token Inference

This method **strictly enforces autoregressive decoding**, preventing any lookahead or parallel processing during generation, just like a real parser operating in real-time.

The result is a decoded postfix expression that aligns with the evaluation constraints of the project and fairly reflects model performance.

### Why Not Use Full Sequence Decoding in My Case?

Standard decoding during training uses the full sequence of previous tokens, which allows the model to see **all prior context at once** (and even exploit padding). This violates the core constraint of the project, which requires **greedy, step-by-step decoding**, where each token is predicted based only on:

- The initial encoder output (hidden states)
- The last predicted token
- The updated internal states of the decoder

### What This Part of the Code Does

- **Initial Step**:
  - Runs the encoder model once to obtain initial hidden and cell states from the infix input.
  - Initializes the decoder with the `SOS` token.

- **Token-by-Token Loop**:
  - Predicts one token at a time using the current token and decoder states.
  - Updates the decoder input with the sampled token.
  - Feeds back the new decoder states (`h`, `c`) for the next prediction.
  - Stops if an `EOS` token is reached or maximum length is exceeded.

In [None]:
# -------------------- AUTOREGRESSIVE DECODE FUNCTION --------------------
def autoregressive_decode_proper(encoder_model, decoder_model, encoder_input, max_length=MAX_LEN):
    """
    Greedy, token-by-token decoding with the separate inference models.

    Args:
        encoder_model: Model returning the initial [h, c] states for an input.
        decoder_model: Model mapping [token, h, c] to [probabilities, h, c].
        encoder_input: Encoded infix sequence, either 1-D or with a leading
            batch axis of size 1.
        max_length: Upper bound on the length of the returned sequence,
            counting the leading SOS.

    Returns:
        A numpy array of token ids that starts with SOS_ID and ends with
        EOS_ID if the decoder produced it before the length limit was hit.
    """
    # Add the batch axis when a single unbatched sequence is given.
    if encoder_input.ndim == 1:
        encoder_input = encoder_input[np.newaxis, ...]

    # One encoder pass yields the decoder's starting states.
    decoder_states = encoder_model.predict(encoder_input, verbose=0)

    # The decoder is always fed exactly one token; the first one is SOS.
    current_token = np.full((1, 1), SOS_ID, dtype=float)
    generated = [SOS_ID]

    for _ in range(max_length - 1):
        probabilities, next_h, next_c = decoder_model.predict(
            [current_token] + decoder_states, verbose=0
        )

        # Greedy choice: the most probable token at the only time step.
        next_id = np.argmax(probabilities[0, -1, :])
        generated.append(next_id)
        if next_id == EOS_ID:
            break

        # Feed the prediction and the updated states back in.
        current_token[0, 0] = next_id
        decoder_states = [next_h, next_c]

    return np.array(generated)

## The Updated Test Function and the Reason for It

In the original project notes, the `test()` function used `model.predict()` on both the encoder and decoder inputs together, which **violated the constraint of true autoregressive decoding**.

That approach allowed the decoder to:

- Process the full decoder input sequence in parallel.
- Access future token positions due to teacher forcing mechanics.
- Benefit from hidden alignment learned during training.

### The Fix: Step-by-Step Generation

Updated the test function to use `autoregressive_decode_proper()` with separate encoder and decoder inference models. This ensures:

- Only one token is fed at a time to the decoder.
- Internal decoder states (`h`, `c`) are updated manually.
- Predictions are made **strictly one token at a time**, without lookahead.

This change helps to comply with the project's requirement for **true autoregressive generation** during evaluation and guarantees that the final score fairly reflects the model's real-world performance.

In [None]:
# -------------------- EVALUATION FUNCTIONS --------------------
def prefix_accuracy_single(y_true, y_pred, id_to_token, eos_id=EOS_ID, verbose=False):
    """
    Score a predicted token-id sequence against the target sequence.

    Both sequences are decoded with decode_sequence (which stops at the first
    EOS and skips PAD) and then compared position by position. The score is
    the number of positions holding the same token divided by the length of
    the longer sequence, so missing as well as surplus tokens lower it. Every
    agreeing position counts, not only the leading run of matches.

    Args:
        y_true: Target token ids.
        y_pred: Predicted token ids.
        id_to_token: Mapping from id to token used for decoding.
        eos_id: Unused; kept so that existing callers keep working.
        verbose: If True, print both token sequences and the score.

    Returns:
        matches / max(len(target), len(prediction)), or 0 if both are empty.
    """
    t_tokens = decode_sequence(y_true, id_to_token).split(' EOS')[0].strip().split()
    p_tokens = decode_sequence(y_pred, id_to_token).split(' EOS')[0].strip().split()

    max_len = max(len(t_tokens), len(p_tokens))
    match_len = sum(t == p for t, p in zip(t_tokens, p_tokens))
    score = match_len / max_len if max_len > 0 else 0

    if verbose:
        print("TARGET :", ' '.join(t_tokens))
        print("PREDICT:", ' '.join(p_tokens))
        # Show the denominator the score is actually computed with; using the
        # target length here disagreed with the score for over-long predictions.
        print(f"PREFIX MATCH: {match_len}/{max_len} \u2192 {score:.2f}")

    return score

def test_with_proper_autoregression(encoder_model, decoder_model, no=20, rounds=10):
    """
    Evaluate the inference models on freshly generated expressions.

    Args:
        encoder_model: Inference encoder.
        decoder_model: Inference decoder.
        no: Number of expressions generated and scored per round.
        rounds: Number of rounds.

    Returns:
        (mean, std) of the per-round mean scores.
    """
    round_means = []
    for i in range(rounds):
        print(f"Round {i+1}/{rounds}")
        infix_batch, postfix_batch = generate_dataset(no)

        # Decode every expression step by step; [1:] drops the leading SOS
        # before the prediction is compared with the target.
        scores = [
            prefix_accuracy_single(
                postfix_batch[j],
                autoregressive_decode_proper(encoder_model, decoder_model, infix_batch[j])[1:],
                id_to_token,
            )
            for j in range(no)
        ]
        round_means.append(np.mean(scores))

    return np.mean(round_means), np.std(round_means)

In [None]:
# Test with proper autoregression: 10 rounds of 20 freshly generated
# expressions each; `res` and `std` are the mean and the standard deviation
# of the per-round mean scores.
print("\nTesting with Proper Autoregressive Decoding test print")
res, std = test_with_proper_autoregression(encoder_model, decoder_model, 20, 10)
print(f"Score: {res:.3f}, Std: {std:.3f}")