In [1]:
import sys
import os
import numpy as np
import tensorflow as tf
import pandas as pd
from tensorflow.keras.layers import Dense, Layer, Dropout

# Confirmation message, reached only if every import above succeeded.
print("Successfully imported libraries!")

Successfully imported libraries!


In [2]:
# Resolve the notebook's folder from the current working directory. The
# filename is only used to build a path; the file itself is never opened.
current_dir = os.path.dirname(os.path.abspath("transformer0.ipynb"))

# The parent of that folder is the project folder that is expected to hold GetXY.py.
parent_dir = os.path.dirname(current_dir)

# Make the project folder importable. The membership test keeps sys.path free
# of duplicate entries when this cell is executed more than once.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Training/validation arrays and the callback later passed to fit().
from GetXY import x_train, y_train, x_val, y_val, early_stopping

print("Successfully imported variables!")

-5 - 1 - -3
2543
-3.0

Expressions not in x:
-3 - -5 + 4
True
1457
6.0
15
-4.0
[-5.   1.   1.   0.5  0.5  0.5  0.5  0.5  0.5  0.5  0.5  0.5  0.5  0.5
  0.5]
Successfully imported variables!


In [3]:
# Prepend a CLS token (the constant `pad_value`) as a new first column of
# x_train and x_val.
pad_value = 15


def add_cls_token(sequences, cls_value):
    """Return a copy of the 2-D array `sequences` with `cls_value` prepended to every row."""
    return np.pad(sequences, ((0, 0), (1, 0)), 'constant', constant_values=cls_value)


# Idempotence guard: remember the exact array objects this cell produced and
# pad only when x_train / x_val are NOT those objects. Re-running the cell is
# then a no-op instead of stacking one more CLS column per execution, while
# freshly (re-)imported arrays are still padded.
if globals().get("_x_train_with_cls") is not x_train:
    x_train = _x_train_with_cls = add_cls_token(x_train, pad_value)
if globals().get("_x_val_with_cls") is not x_val:
    x_val = _x_val_with_cls = add_cls_token(x_val, pad_value)

In [4]:
#defining the positional encoder modelled after the formula in the paper that was cited. (generated by gemini)
def posEncoding(max_seq_len, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        max_seq_len: number of positions (rows) to encode.
        d_model: number of encoding channels (columns).

    Returns:
        A float32 tensor of shape (1, max_seq_len, d_model): sine of the angle
        in the even channels, cosine in the odd channels, with a leading
        batch axis of size 1.
    """
    positions = np.arange(max_seq_len)[:, np.newaxis]  # column vector of positions
    channels = np.arange(d_model)[np.newaxis, :]       # row vector of channel indices
    table = get_angles(positions, channels, d_model)

    even_cols = slice(0, None, 2)  # channels 2i
    odd_cols = slice(1, None, 2)   # channels 2i+1
    table[:, even_cols] = np.sin(table[:, even_cols])
    table[:, odd_cols] = np.cos(table[:, odd_cols])

    # Leading axis of size 1 so the table broadcasts over the batch.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

def get_angles(pos, i, d_model):
    """Return the positional-encoding angles pos / 10000^(2*(i//2)/d_model).

    Args:
        pos: position indices (scalar or array).
        i: channel indices (scalar or array, broadcast against `pos`).
        d_model: total number of channels, used to normalise the exponent.

    Returns:
        The broadcast product of `pos` and the per-channel angle rates.
    """
    # Channels 2i and 2i+1 share one frequency, hence the integer division.
    exponent = (2 * (i // 2)) / np.float32(d_model)
    inverse_frequency = 1 / np.power(10000, exponent)
    return pos * inverse_frequency

In [5]:
#defining the point-wise FNN
d_ff = 2048  # hidden width of the feed-forward block (original transformer size)


def point_wise_fnn(d_model, d_ff):
    """Return the two-layer feed-forward network used inside an encoder layer.

    Args:
        d_model: output width of the second Dense layer.
        d_ff: width of the hidden ReLU layer.

    Returns:
        A `tf.keras.Sequential` of Dense(d_ff, relu) followed by Dense(d_model).
    """
    hidden = Dense(d_ff, activation = "relu")
    projection = Dense(d_model)
    return tf.keras.Sequential([hidden, projection])

In [6]:
#scaled dot-product attention
class MH_Attention(Layer):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each projected by their own Dense layer,
    split into `num_heads` heads of width `d_model // num_heads`, attended
    per head, merged back to width `d_model` and passed through a final
    Dense layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: model width; must be divisible by `num_heads`.
            num_heads: number of attention heads.
        """
        super().__init__()
        # The model width has to split evenly across the heads.
        assert d_model % num_heads == 0

        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads  # width of a single head

        # Learned linear projections of the inputs. They are created here (not
        # in call) so that the same weights are reused on every call.
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        # Output projection applied after the heads are merged again.
        self.finalDense = Dense(d_model)

    def SDP_Attention(self, q, k, v, mask):
        """Scaled dot-product attention.

        Args:
            q, k, v: query, key and value tensors; the last axis of `k` is
                used as the key width for scaling.
            mask: optional tensor broadcastable to the attention logits;
                entries are multiplied by -1e9 and added to the logits, so
                non-zero entries are effectively excluded by the softmax.

        Returns:
            (output, attention_weights): the weighted sum of `v` and the
            softmax weights over the keys.
        """
        key_width = tf.cast(tf.shape(k)[-1], tf.float32)
        logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_width)

        if mask is not None:
            logits = logits + (mask * -1e9)

        # Softmax over the last axis, i.e. across the keys of each query.
        weights = tf.nn.softmax(logits, axis = -1)
        return tf.matmul(weights, v), weights

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq_len, d_model) into (batch, num_heads, seq_len, depth)."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm =[0,2,1,3])

    def call(self, q, k ,v, mask = None):
        """Run multi-head attention.

        Args:
            q, k, v: input tensors for the query, key and value projections.
            mask: optional mask forwarded to `SDP_Attention`.

        Returns:
            (output, attention_weights): the projected attention output with
            last axis `d_model`, and the per-head attention weights.
        """
        batch_size = tf.shape(q)[0]

        # Project, then split each projection into its heads.
        q_heads = self.split_heads(self.wq(q), batch_size)
        k_heads = self.split_heads(self.wk(k), batch_size)
        v_heads = self.split_heads(self.wv(v), batch_size)

        # context: (batch_size, num_heads, seq_len, depth)
        context, attention_weights = self.SDP_Attention(q_heads, k_heads, v_heads, mask = mask)

        # Move the head axis next to depth and fuse both back into d_model.
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        merged = tf.reshape(context, (batch_size, -1, self.d_model))

        return self.finalDense(merged), attention_weights

In [7]:
class EncodingLayer(Layer):
    """One encoder block: self-attention followed by a point-wise feed-forward network.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, rate):
        """Create the sub-layers.

        Args:
            d_model: model width.
            num_heads: number of attention heads.
            d_ff: hidden width of the feed-forward network.
            rate: dropout rate applied after each sub-layer.
        """
        super().__init__()
        # Sub-layers are created here so their weights persist across calls.
        self.mha = MH_Attention(d_model, num_heads)
        self.fnn = point_wise_fnn(d_model, d_ff)

        # One normalisation per residual connection.
        self.norm1 = tf.keras.layers.LayerNormalization()
        self.norm2 = tf.keras.layers.LayerNormalization()

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training = None, mask = None):
        """Apply the encoder block.

        Args:
            x: input tensor whose last axis is `d_model`.
            training: forwarded to the dropout layers. Defaults to None (the
                Keras convention) so the layer can be called without the flag.
            mask: optional attention mask forwarded to the attention layer.

        Returns:
            Tensor with the same shape as `x`.
        """
        # Self-attention: query, key and value are all x. The attention
        # weights are not needed here.
        mha_out, _ = self.mha(x, x, x, mask = mask)
        mha_out = self.dropout1(mha_out, training = training)
        norm_out = self.norm1(x + mha_out)  # add, then normalise

        # Second sub-layer: point-wise feed-forward network.
        fnn_out = self.fnn(norm_out)
        fnn_out = self.dropout2(fnn_out, training = training)

        return self.norm2(norm_out + fnn_out)  # add, then normalise

In [8]:
class Encoder(tf.keras.layers.Layer):
    """Stack of `num_layers` EncodingLayer blocks preceded by an input dropout."""

    def __init__(self, d_model, num_heads, num_layers, d_ff, rate):
        """Create the stack.

        Args:
            d_model: model width.
            num_heads: number of attention heads per encoding layer.
            num_layers: number of encoding layers.
            d_ff: hidden width of each feed-forward network.
            rate: dropout rate (input dropout and inside every layer).
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers  # amount of encoding layers
        # Independent encoding layers, each with its own weights.
        self.layers = [EncodingLayer(d_model, num_heads, d_ff, rate) for _ in range(num_layers)]

        self.dropout = Dropout(rate)

    def call(self, x, training = None, mask = None):
        """Run x through the input dropout and every encoding layer in order.

        Args:
            x: input tensor whose last axis is `d_model`.
            training: forwarded to dropout and to every encoding layer.
                Defaults to None (the Keras convention) so the layer can be
                called without the flag.
            mask: optional attention mask forwarded to every encoding layer.

        Returns:
            Output of the last encoding layer.
        """
        x = self.dropout(x, training = training)  # dropout before the first layer
        for layer in self.layers:
            x = layer(x, training = training, mask = mask)
        return x

In [9]:
class Transformer(tf.keras.Model):
    """Encoder-only transformer that regresses one value per input sequence.

    Every scalar token is embedded by `embedding_layer`, scaled by
    sqrt(d_model), combined with the positional encoding and passed through
    the encoder stack. The encoder output at position 0 is then mapped to a
    single linear output unit.
    """

    def __init__(self, embedding_layer, d_model, max_seq_len, num_heads, num_layers, d_ff, rate):
        """Create the model.

        Args:
            embedding_layer: layer applied to the input after a trailing axis
                of size 1 has been added.
            d_model: model width.
            max_seq_len: number of positions in the positional-encoding table.
            num_heads: attention heads per encoding layer.
            num_layers: number of encoding layers.
            d_ff: hidden width of the feed-forward networks.
            rate: dropout rate.
        """
        super().__init__()
        self.embedding = embedding_layer
        self.d_model = d_model
        self.pos_enc = posEncoding(max_seq_len, d_model)
        self.Encoder = Encoder(d_model, num_heads, num_layers, d_ff, rate)
        self.dropout = tf.keras.layers.Dropout(rate)
        self.finalDense = Dense(1, activation = "linear")

    def call(self, x, training = None, mask = None):
        """Forward pass.

        Args:
            x: batch of sequences; axis 1 is the sequence axis and must not
                be longer than `max_seq_len`.
            training: forwarded to the encoder and the dropout layer.
                Defaults to None (the Keras convention) so the model can also
                be called directly without an explicit flag.
            mask: optional attention mask forwarded to the encoder.

        Returns:
            Tensor with one value per sequence (last axis of size 1).
        """
        seq_len = tf.shape(x)[1]
        x = tf.expand_dims(x, axis=-1)  # each scalar token becomes a length-1 feature vector
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))  # scale with sqrt(d_model)
        x += self.pos_enc[:, :seq_len, :]  # only the first seq_len rows of the table

        out_Encoder = self.Encoder(x, training = training, mask = mask)

        # Pooling: keep only the encoder output of the first token.
        output = out_Encoder[:,0,:]
        output = self.dropout(output, training = training)

        # Reduce the d_model features of that token to a single output unit.
        return self.finalDense(output)
        

In [10]:
# --- Architecture (sizes of the original transformer) ---
num_layers = 6
num_heads = 8
d_model = 512
d_ff = 2048
dropout_rate = 0.1
max_seq_len = 16  # number of rows in the positional-encoding table

# Dense layer the model uses to embed each scalar token into d_model features.
embedding_layer = Dense(d_model)

# --- Training setup ---
batch_size = 16
num_epochs= 50

# Training batches are drawn from a shuffle buffer spanning the whole training
# set; validation batches keep their original order.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(len(x_train))
    .batch(batch_size)
)
val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .batch(batch_size)
)

transformer_model = Transformer(
    embedding_layer = embedding_layer,
    max_seq_len = max_seq_len,
    d_model = d_model,
    d_ff = d_ff,
    num_layers = num_layers,
    num_heads = num_heads,
    rate = dropout_rate
)

In [11]:
# Define a custom learning rate schedule class with warmup and cosine decay
class WarmupCosineDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
    """
    Learning-rate schedule: linear warmup from 0 to `peak_lr` over
    `warmup_steps` steps, then a cosine decay over `decay_steps` steps
    towards `alpha * peak_lr`.
    """
    def __init__(self, peak_lr, warmup_steps, decay_steps, alpha=0.0, name=None):
        """Store the schedule parameters.

        Args:
            peak_lr: learning rate reached at the end of the warmup.
            warmup_steps: number of steps of the linear warmup.
            decay_steps: number of steps of the cosine decay.
            alpha: `alpha` passed to the Keras CosineDecay schedule.
            name: optional name scope for the schedule's ops.
        """
        super().__init__()
        self.name = name
        self.alpha = alpha
        self.decay_steps = decay_steps
        self.warmup_steps = warmup_steps
        self.peak_lr = peak_lr

    def __call__(self, step):
        """Return the learning rate for optimizer step `step`."""
        scope = self.name or "WarmupCosineDecay"
        with tf.name_scope(scope):
            # All arithmetic below is done in float32.
            current = tf.cast(step, tf.float32)

            # Decay phase: a standard cosine decay starting at peak_lr. It is
            # evaluated at the step count relative to the end of the warmup.
            cosine = tf.keras.optimizers.schedules.CosineDecay(
                initial_learning_rate=self.peak_lr,
                decay_steps=self.decay_steps,
                alpha=self.alpha
            )
            decayed = cosine(current - self.warmup_steps)

            # Warmup phase: straight line from 0 to peak_lr.
            ramp = self.peak_lr * (current / self.warmup_steps)

            # Both branches are computed; the current step selects one.
            return tf.where(current < self.warmup_steps, ramp, decayed)

    def get_config(self):
        """Return the constructor arguments as a dict."""
        fields = ("peak_lr", "warmup_steps", "decay_steps", "alpha", "name")
        return {field: getattr(self, field) for field in fields}


# --- Instantiate the new schedule ---
# First, define the parameters
warmup_epochs = 5
num_epochs = 50 # This should match your training epochs
batch_size = 16 # This must match the batch_size used in your dataset
peak_lr = 1e-4

# Optimizer steps per epoch. train_dataset is batched without drop_remainder,
# so a trailing partial batch is still one optimizer step: round UP, otherwise
# the warmup/decay boundaries drift one step per epoch away from the real
# step count.
steps_per_epoch = (len(x_train) + batch_size - 1) // batch_size
warmup_steps = warmup_epochs * steps_per_epoch
decay_steps = (num_epochs - warmup_epochs) * steps_per_epoch

# Create an instance of our new scheduler
lr_schedule = WarmupCosineDecay(
    peak_lr=peak_lr,
    warmup_steps=warmup_steps,
    decay_steps=decay_steps,
    alpha=0.1 # This means the LR will decay to 10% of peak_lr
)

In [12]:
# AdamW driven by the warmup + cosine-decay schedule. Both beta values are
# set explicitly (beta_1=0.93, beta_2=0.98) rather than left at the Keras
# defaults, and gradients are clipped to a norm of 1.0.
optimizer = tf.keras.optimizers.AdamW(
    learning_rate=lr_schedule,
    beta_1=0.93,
    beta_2=0.98,
    clipnorm=1.0
)

# Mean squared error is both the training loss and the reported metric.
transformer_model.compile(
    optimizer=optimizer,
    loss='mse',
    metrics=['mse']
)

In [13]:


# Train for up to num_epochs epochs, evaluating on val_dataset after each
# one. verbose=1 shows the per-epoch progress bar.
history = transformer_model.fit(
    train_dataset,
    epochs=num_epochs,
    validation_data = val_dataset,
    callbacks=[early_stopping],
    verbose=1
)

Epoch 1/50
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 382ms/step - loss: 27.0670 - mse: 27.0670 - val_loss: 23.7897 - val_mse: 23.7897
Epoch 2/50
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 369ms/step - loss: 24.4530 - mse: 24.4530 - val_loss: 30.2356 - val_mse: 30.2356
Epoch 3/50
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m45s[0m 371ms/step - loss: 22.5146 - mse: 22.5146 - val_loss: 22.6230 - val_mse: 22.6230
Epoch 4/50
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 369ms/step - loss: 24.1326 - mse: 24.1326 - val_loss: 23.1830 - val_mse: 23.1830
Epoch 5/50
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 379ms/step - loss: 22.3574 - mse: 22.3574 - val_loss: 23.5773 - val_mse: 23.5773
Epoch 6/50
[1m120/120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m45s[0m 374ms/step - loss: 26.2966 - mse: 26.2966 - val_loss: 20.1493 - val_mse: 20.1493
Epoch 7/50
[1m120/120[0m [32m━━━━━━━━━━━━━━

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# history.history maps each metric name to its per-epoch values; as a
# DataFrame this gives one row per epoch.
history_df = pd.DataFrame(history.history)

# Training vs. validation MSE on one set of axes.
ax = history_df[['mse', 'val_mse']].plot()
ax.set_xlabel("Epoch")
ax.set_ylabel("Mean Squared Error")
ax.set_title("Training and Validation MSE per Epoch")
plt.show()

In [None]:
# 1. Build a one-element dataset that holds only the first validation batch.
first_batch = val_dataset.take(1)

# Show what that batch contains. Because of take(1) the loop body executes
# once; each element is an (inputs, labels) pair.
print("Contents of the first batch:")
for inputs, labels in first_batch:
    print("Inputs shape:", inputs.shape)
    print("Labels shape:", labels.shape)
# `inputs` stays bound after the loop: print the first sequence of the batch.
print(inputs[0])

# 2. Predict on that single batch; predict() is given the dataset object
# returned by take(1) directly.
print("\nRunning prediction on the first batch...")
predictions = transformer_model.predict(first_batch)
print("Predictions shape:", predictions.shape)
print(predictions[0])
print("--------------")
print(predictions)