In [4]:
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding, LSTM, Dense, Input
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K

In [7]:
def set_global_seed(seed=42):
    print(f"Impostazione del seed globale a {seed} per riproducibilità...")

    # Seed Python
    random.seed(seed)

    # Seed NumPy
    np.random.seed(seed)

    # Seed TensorFlow
    tf.keras.utils.set_random_seed(seed)
    tf.random.set_seed(seed)

    # Impostazioni di determinismo per TensorFlow
    os.environ['PYTHONHASHSEED'] = str(seed)
    os.environ['TF_DETERMINISTIC_OPS'] = '1'
    os.environ['TF_CUDNN_DETERMINISTIC'] = '1'  # solo su TF >= 2.8


# ESEMPIO DI USO
SEED = 42
set_global_seed(SEED)


Impostazione del seed globale a 42 per riproducibilità...


In [9]:
# =============================================================================
#  COSTANTI E DIZIONARI VOCABOLARIO
# =============================================================================
OPERATORS      = ['+', '-', '*', '/']
IDENTIFIERS    = list('abcde')
SPECIAL_TOKENS = ['PAD', 'SOS', 'EOS']
SYMBOLS        = ['(', ')', '+', '-', '*', '/']

VOCAB = SPECIAL_TOKENS + SYMBOLS + IDENTIFIERS + ['JUNK']
token_to_id = {tok: i for i, tok in enumerate(VOCAB)}
id_to_token = {i: tok for tok, i in token_to_id.items()}

VOCAB_SIZE = len(VOCAB)       # ≈ 15
PAD_ID     = token_to_id['PAD']
SOS_ID     = token_to_id['SOS']
EOS_ID     = token_to_id['EOS']

MAX_DEPTH = 3
MAX_LEN   = 4 * (2 ** MAX_DEPTH) - 2  # = 30

# =============================================================================
#  FUNZIONI PER LA GENERAZIONE DEL DATASET
# =============================================================================
def generate_infix_expression(max_depth: int) -> str:
    """
    Genera un'espressione infix pienamente parentesizzata fino a profondità max_depth.
    Profondità 0: estrae casualmente da IDENTIFIERS. Altrimenti, con 50% ricorre su un sotto-
    espressione (diminuendo depth), altrimenti concatena (left op right) con nuove ricorsioni.
    """
    if max_depth == 0:
        return random.choice(IDENTIFIERS)
    elif random.random() < 0.5:
        return generate_infix_expression(max_depth - 1)
    else:
        left = generate_infix_expression(max_depth - 1)
        right = generate_infix_expression(max_depth - 1)
        op = random.choice(OPERATORS)
        # il formato contiene spazi per agevolare tokenize()
        return f'({left} {op} {right})'

def tokenize(expr: str) -> list[str]:
    """
    Filtra i caratteri dell'espressione che compaiono in token_to_id.
    Esempio: "(a + (b * c))" -> ['(', 'a', '+', '(', 'b', '*', 'c', ')', ')']
    """
    return [c for c in expr if c in token_to_id]

def infix_to_postfix(tokens: list[str]) -> list[str]:
    """
    Converte la lista di token infix in una lista di token postfix usando uno stack.
    Precedenze: +,- (1); *,/ (2). Non si gestiscono associatività perché tutto è già parentesizzato.
    """
    precedence = {'+': 1, '-': 1, '*': 2, '/': 2}
    output, stack = [], []
    for token in tokens:
        if token in IDENTIFIERS:
            output.append(token)
        elif token in OPERATORS:
            # pop finché in cima a stack c'è operatore con precedenza >=
            while stack and stack[-1] in OPERATORS and precedence[stack[-1]] >= precedence[token]:
                output.append(stack.pop())
            stack.append(token)
        elif token == '(':
            stack.append(token)
        elif token == ')':
            # pop finché non trovo "("
            while stack and stack[-1] != '(':
                output.append(stack.pop())
            stack.pop()  # rimuovo "("
    # svuoto lo stack
    while stack:
        output.append(stack.pop())
    return output

def encode(tokens: list[str], max_len: int = MAX_LEN) -> list[int]:
    """
    Converte lista di token in ID, aggiunge EOS e padding PAD fino a max_len.
    """
    ids = [token_to_id[t] for t in tokens] + [EOS_ID]
    assert len(ids) <= max_len, f"Lunghezza {len(ids)} > {max_len}"
    return ids + [PAD_ID] * (max_len - len(ids))

def decode_sequence(token_ids: list[int]) -> str:
    """
    Converte lista di ID in stringa tokenizzata, fermandosi a EOS e ignorando PAD.
    Esempio: [ 'c','b','*','e','*','d','b','/','+','EOS',PAD,.. ] -> "c b * e * d b / +"
    """
    tokens = []
    for token_id in token_ids:
        tok = id_to_token.get(token_id, '?')
        if tok == 'EOS':
            break
        if tok != 'PAD':
            tokens.append(tok)
    return ' '.join(tokens)

def shift_right(seqs: np.ndarray) -> np.ndarray:
    """
    Sposta a destra ogni sequenza target per il teacher forcing, 
    inserendo SOS all'inizio. Shape in input/output: [batch, MAX_LEN].
    """
    shifted = np.zeros_like(seqs)
    shifted[:, 1:] = seqs[:, :-1]
    shifted[:, 0] = SOS_ID
    return shifted

def generate_dataset(n: int, max_depth: int = MAX_DEPTH) -> tuple[np.ndarray, np.ndarray]:
    """
    Genera n espressioni random, ritorna due array:
      - X: [n, MAX_LEN] con ID di infix codificato + EOS + PAD
      - Y: [n, MAX_LEN] con ID di postfix codificato + EOS + PAD
    """
    X, Y = [], []
    for _ in range(n):
        expr = generate_infix_expression(max_depth)
        infix = tokenize(expr)
        postfix = infix_to_postfix(infix)
        X.append(encode(infix))
        Y.append(encode(postfix))
    return np.array(X, dtype=np.int32), np.array(Y, dtype=np.int32)

# =============================================================================
#  DEFINIZIONE DEL MECCANISMO DI ATTENTION (Bahdanau)
# =============================================================================
class BahdanauAttention(tf.keras.layers.Layer):
    """
    Implementazione di Bahdanau (additive) attention:
      score = v^T tanh(W1 · values + W2 · query)
      alpha = softmax(score, axis=1)
      context = Σ_s alpha_s * values_s
    Dove:
      - values: intera sequenza di hidden states dell'encoder (shape [batch, seq_len, enc_units])
      - query: hidden state corrente del decoder (shape [batch, dec_units])
    Ritorna:
      - context_vector: [batch, enc_units]
      - attention_weights: [batch, seq_len, 1]
    """
    def __init__(self, units: int):
        super().__init__()
        # W1: proietta encoder_outputs da (enc_units) a (units)
        self.W1 = tf.keras.layers.Dense(units)
        # W2: proietta decoder_hidden (dec_units) a (units)
        self.W2 = tf.keras.layers.Dense(units)
        # V: proietta la somma tanh(W1+W2) a un punteggio scalare
        self.V  = tf.keras.layers.Dense(1)

    def call(self, query: tf.Tensor, values: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
        """
        query: [batch, dec_units]
        values: [batch, seq_len, enc_units]
        """
        # Aggiungi time axis per query: [batch, 1, dec_units]
        hidden_with_time_axis = tf.expand_dims(query, axis=1)

        # Calcolo score: shape -> [batch, seq_len, 1]
        #   score = V(tanh(W1(values) + W2(query)))
        score = self.V(
            tf.nn.tanh(
                self.W1(values) + self.W2(hidden_with_time_axis)
            )
        )

        # attention_weights: softmax su axis=1 (su seq_len)
        attention_weights = tf.nn.softmax(score, axis=1)  # [batch, seq_len, 1]

        # context_vector = Σ_i α_i * values_i
        context_vector = attention_weights * values  # broadcasting [batch, seq_len, enc_units]
        context_vector = tf.reduce_sum(context_vector, axis=1)  # [batch, enc_units]

        return context_vector, attention_weights

# =============================================================================
#  DEFINIZIONE DEL MODELLO SEQ2SEQ CON ATTENTION
# =============================================================================

# Iperparametri
EMB_DIM    = 64      # dimensione embedding (sia encoder che decoder)
ENC_UNITS  = 128     # numero di celle in encoder LSTM
DEC_UNITS  = 128     # numero di celle in decoder LSTM (coerente con ENC_UNITS)
BATCH_SIZE = 64

# --- 1) Definizione degli Input Layers ---
# encoder_input: sequenze infix (shape [batch, MAX_LEN])
encoder_inputs = Input(shape=(MAX_LEN,), name='encoder_inputs')

# decoder_input: sequenze shiftate (shape [batch, MAX_LEN])
decoder_inputs = Input(shape=(MAX_LEN,), name='decoder_inputs')

# --- 2) Embedding Layers (encoder e decoder) ---
encoder_embedding = Embedding(
    input_dim=VOCAB_SIZE,
    output_dim=EMB_DIM,
    embeddings_initializer=tf.keras.initializers.GlorotUniform(seed=SEED),
    mask_zero=True,                   # per ignorare PAD
    name='encoder_embedding'
)(encoder_inputs)  # shape -> [batch, MAX_LEN, EMB_DIM]

decoder_embedding = Embedding(
    input_dim=VOCAB_SIZE,
    output_dim=EMB_DIM,
    embeddings_initializer=tf.keras.initializers.GlorotUniform(seed=SEED+1),
    mask_zero=True,
    name='decoder_embedding'
)(decoder_inputs)  # shape -> [batch, MAX_LEN, EMB_DIM]

# --- 3) Encoder LSTM (return_sequences + return_state) ---
# Usando un LSTM unidirezionale per semplicità; si potrebbero usare Bidirectional
encoder_lstm = LSTM(
    ENC_UNITS,
    return_sequences=True,
    return_state=True,
    recurrent_initializer=tf.keras.initializers.GlorotUniform(seed=SEED+2),
    name='encoder_lstm'
)
encoder_outputs, state_h, state_c = encoder_lstm(encoder_embedding)
# encoder_outputs: [batch, MAX_LEN, ENC_UNITS]
# state_h, state_c: entrambi [batch, ENC_UNITS]
encoder_states = [state_h, state_c]

# --- 4) Decoder LSTM (return_sequences=True per generare tutta la sequenza in training) ---
decoder_lstm = LSTM(
    DEC_UNITS,
    return_sequences=True,
    return_state=True,
    recurrent_initializer=tf.keras.initializers.GlorotUniform(seed=SEED+3),
    name='decoder_lstm'
)
# Forniamo come initial_state lo stato finale dell'encoder
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
# decoder_outputs: [batch, MAX_LEN, DEC_UNITS]

# --- 5) Meccanismo di Attention Bahdanau (appliicato ad ogni passo) ---
# Creiamo una singola istanza di BahdanauAttention
attention_layer = BahdanauAttention(units=DEC_UNITS)

# Vogliamo applicare l’attenzione passo per passo su tutta la lunghezza di decoder_outputs.
# Tuttavia, Keras non supporta direttamente un loop interno in Functional API.
# Quindi possiamo “ricostruire” la parte finale come:
#   per ogni passo t in MAX_LEN:
#       h_t = decoder_outputs[:, t, :]  # shape [batch, DEC_UNITS]
#       context_t, attn_weights_t = attention_layer(h_t, encoder_outputs)
#       concat_t = concat([context_t, h_t], axis=-1)  # [batch, DEC_UNITS+ENC_UNITS]
#       out_t = Dense(VOCAB_SIZE, activation='softmax')(concat_t)
# E poi raccogliere tutti i out_t in una sequenza temporale.
#
# Con Keras Functional, si può invece fare un _TimeDistributed_ di un “mini-decoder” che
# unisce attention e Dense. Tuttavia, per chiarezza, implementiamo un loop via tf.keras.layers.Lambda.

def apply_attention_step(args):
    """
    Funzione di utilità per calcolare attenzione e output softmax ad un singolo passo.
    args = (decoder_hidden_t, encoder_outputs)
      - decoder_hidden_t: [batch, DEC_UNITS]
      - encoder_outputs: [batch, MAX_LEN, ENC_UNITS]
    Ritorna:
      - output_t: [batch, VOCAB_SIZE]
    """
    decoder_hidden_t, encoder_outs = args
    # 1) calcolo context vector e att. weights
    context_vector, _ = attention_layer(query=decoder_hidden_t, values=encoder_outs)
    # 2) concat: [batch, ENC_UNITS] + [batch, DEC_UNITS] = [batch, ENC_UNITS+DEC_UNITS]
    concat_vector = K.concatenate([context_vector, decoder_hidden_t], axis=-1)
    # 3) Dense -> VOCAB_SIZE con softmax
    output_t = Dense(
        VOCAB_SIZE,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.GlorotUniform(seed=SEED+4)
    )(concat_vector)
    return output_t  # shape [batch, VOCAB_SIZE]



In [10]:

from tensorflow.keras.layers import Lambda

# Questo crea un layer che, nel grafo di Keras, applicherà tf.split
dec_hidden_seq_list = Lambda(
    lambda x: tf.split(x, num_or_size_splits=MAX_LEN, axis=1),
    name='split_decoder_outputs'
)(decoder_outputs)

# Nota: dec_hidden_seq_list sarà un TUPLE di MAX_LEN KerasTensor di forma [batch, 1, DEC_UNITS].
# Poi, per ridurre la dimensione 1, usi ancora una Lambda o tf.keras.layers.Reshape.
dec_hidden_seq = [
    Lambda(lambda y: tf.squeeze(y, axis=1), name=f'squeeze_step_{t}')(dec_hidden_seq_list[t])
    for t in range(MAX_LEN)
]

# # Split decoder_outputs lungo l’asse temporale: lista di MAX_LEN tensori [batch, DEC_UNITS]
# dec_hidden_seq = tf.split(decoder_outputs, num_or_size_splits=MAX_LEN, axis=1)
# # Ogni dec_hidden_seq[i] ha shape [batch, 1, DEC_UNITS], vogliamo ridurlo a [batch, DEC_UNITS]
# dec_hidden_seq = [K.squeeze(x, axis=1) for x in dec_hidden_seq]

# Per ognuno dei MAX_LEN "step", applichiamo apply_attention_step:
all_decoder_outputs = []
for t in range(MAX_LEN):
    # dec_hidden_seq[t]: [batch, DEC_UNITS]
    out_t = tf.keras.layers.Lambda(
        apply_attention_step,
        name=f'attention_step_{t}'
    )((dec_hidden_seq[t], encoder_outputs))  # out_t: [batch, VOCAB_SIZE]
    all_decoder_outputs.append(out_t)

# Ricompattiamo la lista in [batch, MAX_LEN, VOCAB_SIZE]
decoder_softmax_outputs = tf.stack(all_decoder_outputs, axis=1)

# --- 6) Modello Keras Finale (Training) ---
model = Model([encoder_inputs, decoder_inputs], decoder_softmax_outputs)
# Compiliamo con loss sparse_categorical_crossentropy (per ID target) e optimizer Adam
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()  # possibilità di verificare il numero totale di parametri (<2M) :contentReference[oaicite:4]{index=4}

# =============================================================================
#  PREPARAZIONE DEI DATI E TRAINING
# =============================================================================
# Generiamo un dataset fisso (ad esempio 100k esempi) o usiamo un generatore on-the-fly.
N_SAMPLES = 100_000
X_data, Y_data = generate_dataset(N_SAMPLES, MAX_DEPTH)

# Input al decoder per teacher forcing
decoder_input_data = shift_right(Y_data)

# Poiché la loss è 'sparse_categorical_crossentropy', ci aspettiamo come target gli ID
# shape Y_data: [N_SAMPLES, MAX_LEN], ma il softmax output è [batch, MAX_LEN, VOCAB_SIZE],
# quindi va bene lasciare Y_data come è (Keras farà il broadcasting).

# Divido in train/val (es. 90% train, 10% val)
indices = np.arange(N_SAMPLES)
np.random.shuffle(indices)
split = int(0.9 * N_SAMPLES)
train_idx, val_idx = indices[:split], indices[split:]

X_train, X_val = X_data[train_idx], X_data[val_idx]
dec_in_train, dec_in_val = decoder_input_data[train_idx], decoder_input_data[val_idx]
Y_train, Y_val = Y_data[train_idx], Y_data[val_idx]

# Eseguo il training (ad esempio 20 epoche)
EPOCHS = 20
history = model.fit(
    [X_train, dec_in_train],        # encoder input + decoder input
    np.expand_dims(Y_train, -1),     # serve come [batch, MAX_LEN, 1] per sparse_cce
    validation_data=(
        [X_val, dec_in_val],
        np.expand_dims(Y_val, -1)
    ),
    batch_size=BATCH_SIZE,
    epochs=EPOCHS
)

# =============================================================================
#  INFERENZA AUTOREGRESSIVA (SENZA BEAM SEARCH)
# =============================================================================
# Per l'inferenza, dobbiamo definire i modelli separate encoder_model e decoder_model,
# così da far girare un ciclo passo-passo (greedy) finché non usciamo con EOS.

# ==== 1) Modello encoder per inferenza (input -> hidden states) ====
encoder_model_inf = Model(encoder_inputs, [encoder_outputs, state_h, state_c])

# ==== 2) Modello decoder per inferenza ====
# Input placeholder per hidden state e cell state del decoder (prendono ENC_UNITS)
decoder_state_input_h = Input(shape=(DEC_UNITS,), name='dec_state_h')
decoder_state_input_c = Input(shape=(DEC_UNITS,), name='dec_state_c')
# Input placeholder per encoder_outputs (per l’attenzione) [batch, MAX_LEN, ENC_UNITS]
encoder_outputs_inf = Input(shape=(MAX_LEN, ENC_UNITS), name='enc_outputs_inf')

# Embedding del token di input corrente (shape [batch=1, 1]) in inference
dec_single_input = Input(shape=(1,), name='dec_single_input')  # st(1) per inferenza
dec_single_emb = decoder_embedding.layer(dec_single_input)      # usiamo lo stesso embedding

# LSTM del decoder ad un singolo passo, con hidden iniziali
dec_lstm_inf = decoder_lstm  # usiamo lo stesso layer addestrato
# Riduci dec_single_emb (che è [batch, 1, EMB_DIM]) in [batch, 1, EMB_DIM],
# già corretto shape.
dec_outputs, dec_state_h_new, dec_state_c_new = dec_lstm_inf(
    dec_single_emb, initial_state=[decoder_state_input_h, decoder_state_input_c]
)  # dec_outputs: [batch, 1, DEC_UNITS]

# Rimuovo la dimensione temporale: [batch, DEC_UNITS]
dec_hidden_t = K.squeeze(dec_outputs, axis=1)

# Applico attention: query=dec_hidden_t, values=encoder_outputs_inf
context_vector_inf, _ = attention_layer(dec_hidden_t, encoder_outputs_inf)
concat_vector_inf = K.concatenate([context_vector_inf, dec_hidden_t], axis=-1)
# Softmax output
dec_pred = Dense(
    VOCAB_SIZE,
    activation='softmax',
    kernel_initializer=tf.keras.initializers.GlorotUniform(seed=SEED+4)
)(concat_vector_inf)  # [batch, VOCAB_SIZE]

# Definisco il modello decoder_inferenza che, dato:
#  - dec_single_input: token corrente [batch,1]
#  - encoder_outputs_inf: [batch,MAX_LEN,ENC_UNITS]
#  - dec_state_input_h, dec_state_input_c: [batch,DEC_UNITS]
# produce:
#  - dec_pred: [batch, VOCAB_SIZE] predisposto all'argmax
#  - dec_state_h_new, dec_state_c_new (nuovi state)
decoder_model_inf = Model(
    [dec_single_input, encoder_outputs_inf, decoder_state_input_h, decoder_state_input_c],
    [dec_pred, dec_state_h_new, dec_state_c_new]
)

# ==== 3) Funzione di decodifica greedy in inferenza ====
def decode_sequence_inference(input_seq: np.ndarray) -> list[int]:
    """
    Data una singola sequenza infix (shape [1, MAX_LEN]), genera la corrispondente 
    postfix autoregressivamente fino a EOS o lunghezza MAX_LEN. Ritorna lista di ID.
    """
    # 1) Ottengo encoder_outputs e stati iniziali
    enc_outs, enc_h, enc_c = encoder_model_inf.predict(input_seq)
    # inizializzo decoder con SOS
    target_seq = np.array([[SOS_ID]], dtype=np.int32)
    dec_h, dec_c = enc_h, enc_c

    output_ids = []
    for _ in range(MAX_LEN):
        # 2) Chiama decoder one-step
        preds, dec_h, dec_c = decoder_model_inf.predict([target_seq, enc_outs, dec_h, dec_c])
        # preds: [1, VOCAB_SIZE]
        sampled_id = np.argmax(preds[0])
        # Se arrivo a EOS, fermo
        if sampled_id == EOS_ID:
            break
        output_ids.append(sampled_id)
        # preparo input per passo successivo: shape [1,1]
        target_seq = np.array([[sampled_id]], dtype=np.int32)

    return output_ids

# =============================================================================
#  METRICA Prefix Accuracy
# =============================================================================
def prefix_accuracy(y_true_ids: list[int], y_pred_ids: list[int]) -> float:
    """
    Calcola la prefix_accuracy tra due sequence di ID (senza pad).
    Ritorna lunghezza del prefisso identico / max(len(y_true), len(y_pred)).
    """
    # Tronco alla comparsa di EOS (se presente) – ma qui assume che le sequenze non contengano EOS        
    # Confronto elemento per elemento
    match_len = 0
    L_true = len(y_true_ids)
    L_pred = len(y_pred_ids)
    L_max = max(L_true, L_pred)
    for i in range(min(L_true, L_pred)):
        if y_true_ids[i] == y_pred_ids[i]:
            match_len += 1
        else:
            break
    return match_len / L_max if L_max > 0 else 1.0

# Esempio di utilizzo:
i = np.random.randint(len(X_val))
input_seq = X_val[i : i + 1]  # shape [1, MAX_LEN]
true_postfix = Y_val[i]
pred_ids = decode_sequence_inference(input_seq)
print("Infix   : ", decode_sequence(input_seq[0]))
print("TP (true postfix) : ", decode_sequence(true_postfix))
print("Pred    : ", decode_sequence(pred_ids))
print("PrefixAcc        : ", prefix_accuracy(
    # rimuovo pad/EOS in true_postfix
    [tok for tok in true_postfix if tok not in (PAD_ID, EOS_ID)],
    pred_ids
))




ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```
