# Constants and Vocabulary

The vocabulary and constants are now identical to the project requirements.

In [5]:
import numpy as np
import random
import tensorflow as tf
from tensorflow import keras
from keras import layers

# --- CONSTANTS FROM SPECIFICATION ---
OPERATORS = ['+', '-', '*', '/']
IDENTIFIERS = list('abcdef')
SPECIAL_TOKENS = ['PAD', 'SOS', 'EOS']
SYMBOLS = ['(', ')', '+', '-', '*', '/']
# Included 'JUNK' token as per spec requirement
VOCAB = SPECIAL_TOKENS + SYMBOLS + IDENTIFIERS + ['JUNK'] 

token_to_id = {tok: i for i, tok in enumerate(VOCAB)}
id_to_token = {i: tok for tok, i in token_to_id.items()}
VOCAB_SIZE = len(VOCAB)
PAD_ID = token_to_id['PAD']
EOS_ID = token_to_id['EOS']
SOS_ID = token_to_id['SOS']

MAX_DEPTH = 4
MAX_LEN = 4 * 2**MAX_DEPTH - 2 # Length requirement from spec

# [FILE-INFORMED]: Unlike the 2023 exam which used ~10M parameters, 
# we must strictly limit D_MODEL and N_LAYERS to stay under 2M here.

# Data Generation and Evaluation Functions

These functions are copied directly from your specification to ensure the data distribution and scoring are 100% accurate.

In [6]:
# -------------------- Expression Generation --------------------
def generate_infix_expression(max_depth):
    if max_depth == 0:
        return random.choice(IDENTIFIERS)
    elif random.random() < 0.25:
        return generate_infix_expression(max_depth - 1)
    else:
        left = generate_infix_expression(max_depth - 1)
        right = generate_infix_expression(max_depth - 1)
        op = random.choice(OPERATORS)
        return f'({left} {op} {right})'

def tokenize(expr):
    return [c for c in expr if c in token_to_id]

def infix_to_postfix(tokens):
    precedence = {'+': 1, '-': 1, '*': 2, '/': 2}
    output, stack = [], []
    for token in tokens:
        if token in IDENTIFIERS:
            output.append(token)
        elif token in OPERATORS:
            while stack and stack[-1] in OPERATORS and precedence[stack[-1]] >= precedence[token]:
                output.append(stack.pop())
            stack.append(token)
        elif token == '(':
            stack.append(token)
        elif token == ')':
            while stack and stack[-1] != '(':
                output.append(stack.pop())
            stack.pop()
    while stack:
        output.append(stack.pop())
    return output

def encode(tokens, max_len=MAX_LEN):
    ids = [token_to_id[t] for t in tokens] + [EOS_ID]
    return ids + [PAD_ID] * (max_len - len(ids))

def decode_sequence(token_ids, id_to_token, pad_token='PAD', eos_token='EOS'):
    """
    Converts a list of token IDs into a readable string by decoding tokens.
    Stops at the first EOS token if present, and ignores PAD tokens.
    """
    tokens = []
    for token_id in token_ids:
        token = id_to_token.get(token_id, '?')
        if token == eos_token:
            break
        if token != pad_token:
            tokens.append(token)
    return ' '.join(tokens)

def generate_dataset(n, max_depth=MAX_DEPTH):
    X, Y = [], []
    for _ in range(n):
        expr = generate_infix_expression(max_depth)
        infix = tokenize(expr)
        postfix = infix_to_postfix(infix)
        X.append(encode(infix))
        Y.append(encode(postfix))
    return np.array(X), np.array(Y)

def shift_right(seqs):
    # [TEACHER FORCING]: This function shifts the target sequence right and prepends SOS.
    # It ensures that during training, the model's performance on the i-th token 
    # is conditioned ONLY on the first i-1 ground-truth tokens.
    shifted = np.zeros_like(seqs)
    shifted[:, 1:] = seqs[:, :-1]
    shifted[:, 0] = SOS_ID
    return shifted

def prefix_accuracy_single(y_true, y_pred, id_to_token, eos_id=EOS_ID, verbose=False):
    # Standard evaluation metric required by the spec
    t_str = decode_sequence(y_true, id_to_token).split(' EOS')[0]
    p_str = decode_sequence(y_pred, id_to_token).split(' EOS')[0]
    t_tokens = t_str.strip().split()
    p_tokens = p_str.strip().split()
    max_len = max(len(t_tokens), len(p_tokens))
    n = min(len(t_tokens), len(p_tokens))
    match_len = 0
    while match_len < n and t_tokens[match_len] == p_tokens[match_len]:
        match_len += 1
    score = match_len / max_len if max_len > 0 else 0
    if verbose:
        print(f"TARGET : {' '.join(t_tokens)}")
        print(f"PREDICT: {' '.join(p_tokens)}")
        print(f"PREFIX MATCH: {match_len}/{len(t_tokens)} → {score:.2f}")
    return score


# Transformer Architecture (The Baseline)

This model uses a 3-layer stack for both Encoder and Decoder. With D_MODEL=128, it totals approximately 1.2 million parameters, which is safely within the 2M limit.

In [7]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.1):
    x = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(inputs, inputs)
    x = layers.Dropout(dropout)(x)
    res = layers.LayerNormalization(epsilon=1e-6)(x + inputs)
    x = layers.Dense(ff_dim, activation="relu")(res)
    x = layers.Dense(inputs.shape[-1])(x)
    return layers.LayerNormalization(epsilon=1e-6)(x + res)

def transformer_decoder(inputs, enc_outputs, head_size, num_heads, ff_dim, dropout=0.1):
    x = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(inputs, inputs, use_causal_mask=True)
    res = layers.LayerNormalization(epsilon=1e-6)(x + inputs)
    x = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(res, enc_outputs)
    res = layers.LayerNormalization(epsilon=1e-6)(x + res)
    x = layers.Dense(ff_dim, activation="relu")(res)
    x = layers.Dense(inputs.shape[-1])(x)
    return layers.LayerNormalization(epsilon=1e-6)(x + res)

def build_model():
    D_MODEL = 128
    N_LAYERS = 3
    NUM_HEADS = 4
    FF_DIM = 512

    enc_inputs = layers.Input(shape=(MAX_LEN,))
    dec_inputs = layers.Input(shape=(MAX_LEN,))

    embed = layers.Embedding(VOCAB_SIZE, D_MODEL)
    pos_embed = layers.Embedding(MAX_LEN, D_MODEL)
    positions = tf.range(start=0, limit=MAX_LEN, delta=1)

    x_enc = embed(enc_inputs) + pos_embed(positions)
    x_dec = embed(dec_inputs) + pos_embed(positions)

    for _ in range(N_LAYERS):
        x_enc = transformer_encoder(x_enc, D_MODEL // NUM_HEADS, NUM_HEADS, FF_DIM)
    for _ in range(N_LAYERS):
        x_dec = transformer_decoder(x_dec, x_enc, D_MODEL // NUM_HEADS, NUM_HEADS, FF_DIM)

    outputs = layers.Dense(VOCAB_SIZE, activation="softmax")(x_dec)
    return keras.Model(inputs=[enc_inputs, dec_inputs], outputs=outputs)

model = build_model()
# [BASELINE CHECK]: Verify parameter count is strictly below 2,000,000.
# Current selection (D_MODEL=128, N_LAYERS=3) should yield ~1.2M.
model.summary()

# Training Preparation

Generate the training data and compile the model.

In [8]:
# [DATASET SIZE]: For Depth-4 expressions, a larger dataset (50k-100k samples) 
# is recommended to help the Transformer generalize the hierarchical structure.
TRAIN_SIZE = 50000
VAL_SIZE = 5000

X_train, Y_train = generate_dataset(TRAIN_SIZE)
X_val, Y_val = generate_dataset(VAL_SIZE)

# [TEACHER FORCING]: Shifting Y_train for dec_input
dec_input_train = shift_right(Y_train)
dec_input_val = shift_right(Y_val)

model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

print(f"Generated {TRAIN_SIZE} training samples and {VAL_SIZE} validation samples.")

Generated 50000 training samples and 5000 validation samples.


# Train the model

In [9]:
# [TRAINING]:
BATCH_SIZE = 64
EPOCHS = 10 #down from 15 for early testing

history = model.fit(
    [X_train, dec_input_train],
    Y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=([X_val, dec_input_val], Y_val),
    verbose=1
)

Epoch 1/10
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m113s[0m 134ms/step - accuracy: 0.8219 - loss: 0.4860 - val_accuracy: 0.9289 - val_loss: 0.2717
Epoch 2/10
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m105s[0m 135ms/step - accuracy: 0.9940 - loss: 0.0211 - val_accuracy: 0.9995 - val_loss: 0.0020
Epoch 3/10
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 133ms/step - accuracy: 0.9994 - loss: 0.0025 - val_accuracy: 0.9999 - val_loss: 2.6427e-04
Epoch 4/10
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m105s[0m 135ms/step - accuracy: 0.9999 - loss: 4.3985e-04 - val_accuracy: 0.9964 - val_loss: 0.0140
Epoch 5/10
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m106s[0m 135ms/step - accuracy: 0.9988 - loss: 0.0047 - val_accuracy: 1.0000 - val_loss: 1.5712e-04
Epoch 6/10
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 133ms/step - accuracy: 0.9999 - loss: 6.3255e-04 - val_accuracy: 0.9999 - val_l

# Autoregressive Inference

Since beam search is forbidden, we implement a greedy autoregressive loop.

In [10]:
def autoregressive_decode(model, encoder_input):
    encoder_input = np.array(encoder_input).reshape(1, -1)
    decoder_input = np.full((1, MAX_LEN), PAD_ID)
    decoder_input[0, 0] = SOS_ID
    
    for i in range(1, MAX_LEN):
        predictions = model.predict([encoder_input, decoder_input], verbose=0)
        predicted_id = np.argmax(predictions[0, i-1, :])
        decoder_input[0, i] = predicted_id
        if predicted_id == EOS_ID:
            break
            
    return decoder_input[0]


# Formal Test Loop

This cell implements the 10-round evaluation required for the exam submission.

In [11]:
def test(no=30, rounds=10):
    rscores = []
    for i in range(rounds):
        print(f"Round {i}...")
        X_test, Y_test = generate_dataset(no) 
        scores = []
        for j in range(no):
            generated = autoregressive_decode(model, X_test[j])[1:] 
            scores.append(prefix_accuracy_single(Y_test[j], generated, id_to_token))
        rscores.append(np.mean(scores))
    return np.mean(rscores), np.std(rscores)

# [EXECUTION]:
mean_score, std_score = test(no=30, rounds=10)
print(f"Final Score: {mean_score:.4f} ± {std_score:.4f}")

Round 0...
Round 1...
Round 2...
Round 3...
Round 4...
Round 5...
Round 6...
Round 7...
Round 8...
Round 9...
Final Score: 0.9988 ± 0.0036


# Test singular inputs

In [12]:
def test_expression(expression):
    print(f"Input Expression: {expression}")
    
    # 1. Preprocess: Tokenize and Encode
    # Ensure the expression is valid (only contains allowed characters)
    try:
        tokens = tokenize(expression)
        input_ids = encode(tokens)
    except KeyError as e:
        print(f"Error: Found unknown character {e}")
        return

    # 2. Inference: Generate Postfix
    # autoregressive_decode handles the reshaping internally
    output_ids = autoregressive_decode(model, input_ids)
    
    # 3. Postprocess: Decode back to string
    # We slice [1:] to skip the SOS token if your decode_sequence doesn't handle it,
    # but based on your code, decode_sequence handles standard tokens cleanly.
    # Typically autoregressive_decode returns the full sequence including SOS/EOS.
    predicted_postfix = decode_sequence(output_ids, id_to_token)
    
    print(f"Predicted Postfix: {predicted_postfix}")
    return predicted_postfix

In [23]:

# Example usage:
# test_expression("((a + b) * c)")
# test_expression("((a * b) + (c / d))")
# test_expression("(a + b)")
# test_expression("(b + a)")

# Depth 2
test_expression("((a + b) * (c - d))")
# Depth 3 (harder)
test_expression("(((a * b) + c) / (d - e))")
test_expression("((a + (b * c)) - (d / e))")
# Depth 4 (maximum complexity trained)
test_expression("((((a + b) * c) - d) / (e + f))")
test_expression("((a * (b + c)) - ((d / e) * f))")
# Tricky: same letters, different structure
test_expression("((a + b) + (c + d))")
test_expression("(((a + b) + c) + d)")


Input Expression: ((a + b) * (c - d))
Predicted Postfix: SOS a b + c d - *
Input Expression: (((a * b) + c) / (d - e))
Predicted Postfix: SOS a b * c + d e - /
Input Expression: ((a + (b * c)) - (d / e))
Predicted Postfix: SOS a b c * + d e / -
Input Expression: ((((a + b) * c) - d) / (e + f))
Predicted Postfix: SOS a b + c * d - e f + /
Input Expression: ((a * (b + c)) - ((d / e) * f))
Predicted Postfix: SOS a b c + * d e / f * -
Input Expression: ((a + b) + (c + d))
Predicted Postfix: SOS a b + c d + +
Input Expression: (((a + b) + c) + d)
Predicted Postfix: SOS a b + c + d +


'SOS a b + c + d +'

In [24]:
infix_to_postfix("((a * (b + c)) - ((d / e) * f))")


['a', 'b', 'c', '+', '*', 'd', 'e', '/', 'f', '*', '-']