In [21]:
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import scipy.io
from scipy.spatial.distance import cosine
import zipfile
import itertools
#import sys
#sys.path.append('/Users/denisekittelmann/Documents/Python/BiMoL/code/util/')
#from custom_pcn_dense import CustomDense, PredictiveCodingNetwork


In [52]:
def tf_scale_imgs(imgs, scale_factor):
    """Shrink values around the midpoint 0.5 by ``scale_factor``.

    Computes ``imgs * scale_factor + 0.5 * (1 - scale_factor)``, so 0.5 maps
    to 0.5 and the spread around it is multiplied by ``scale_factor``.

    The offset is added as a broadcast scalar instead of a ``tf.ones`` tensor
    built from ``imgs.shape``. That keeps the result identical for fully
    defined float32 tensors, but also works when the static shape contains
    unknown dimensions (e.g. a batched element inside ``Dataset.map``) and
    for other float dtypes.

    Parameters:
    imgs: tensor or array of values to rescale.
    scale_factor: scalar factor.

    Returns:
    The rescaled values, with the same shape as ``imgs``. For non-tensor
    input (e.g. a NumPy array) the result stays in that array type.
    """
    offset = 0.5 * (1 - scale_factor)
    return imgs * scale_factor + offset


def tf_scale_labels(labels, scale_factor):
    """Shrink label values around the midpoint 0.5 by ``scale_factor``.

    Computes ``labels * scale_factor + 0.5 * (1 - scale_factor)``. The offset
    is a broadcast scalar rather than a ``tf.ones`` tensor built from
    ``labels.shape``, so the function also works when the static shape has
    unknown dimensions and for float dtypes other than float32.

    Parameters:
    labels: tensor or array of label values to rescale.
    scale_factor: scalar factor.

    Returns:
    The rescaled labels, with the same shape as ``labels``. For non-tensor
    input (e.g. a NumPy array) the result stays in that array type.
    """
    offset = 0.5 * (1 - scale_factor)
    return labels * scale_factor + offset


def tf_f_inv(x, act_fn):
    """Apply the inverse of the named activation function to ``x``.

    Supported ``act_fn`` values:
      * "LINEAR": ``x`` is returned unchanged.
      * "TANH":   0.5 * log((1 + x) / (1 - x + 1e-7)).
      * "LOGSIG": log(x / (1 - x + 1e-7) + 1e-7).

    The 1e-7 terms are small constants added to the denominator / log
    argument. The original note on this function gave the layout of ``x``
    as (activation_size, batch_size).

    Raises:
    ValueError: if ``act_fn`` is not one of the names above.
    """
    if act_fn == "LINEAR":
        return x

    if act_fn == "TANH":
        numerator = tf.ones_like(x) + x
        denominator = tf.ones_like(x) - x + 1e-7
        return 0.5 * tf.math.log(numerator / denominator)

    if act_fn == "LOGSIG":
        denominator = tf.ones_like(x) - x + 1e-7
        return tf.math.log((x / denominator) + 1e-7)

    raise ValueError(f"{act_fn} not supported")


def img_preproc(x, y, dtype=tf.float32):
    """Cast the images to ``dtype`` and divide them by 255.

    Parameters:
    x: image tensor; pixel values are presumably in [0, 255] so that the
       result lies in [0, 1] (not checked here).
    y: labels, returned untouched.
    dtype: TensorFlow dtype to cast ``x`` to before scaling.

    Returns:
    Tuple ``(scaled_images, y)``.
    """
    scaled = tf.cast(x, dtype) / 255.
    return scaled, y


def flatten(x, y):
    """Collapse the trailing image axes of ``x`` into a single feature axis.

    * rank 5 input, treated as a video batch
      (batch_size, n_steps, height, width, channels), becomes
      (batch_size, n_steps, features);
    * rank 4 input, treated as an image batch
      (batch_size, height, width, channels), becomes (batch_size, features);
    * any other rank is returned unchanged.

    ``y`` is passed through untouched.
    """
    dims = tf.shape(x)
    # The length of the shape vector is the number of axes of x; this is the
    # (admittedly hacky) way video input is told apart from image input.
    n_axes = len(dims)
    if n_axes == 5:
        x = tf.reshape(x, [dims[0], dims[1], -1])
    elif n_axes == 4:
        x = tf.reshape(x, [dims[0], -1])
    return x, y

def augment_images(batch_images, batch_labels):
    """
    Randomly augment a batch of images using only ``tf.image`` ops.

    The following steps are applied in this order: horizontal flip,
    brightness (max_delta=0.1), contrast (0.9-1.1), saturation (0.9-1.1),
    hue (max_delta=0.05), and finally clipping to [0.0, 1.0].

    Parameters:
    batch_images: Tensor of images, documented by the author as shape
                  (n_batch, 56, 28, 3).
    batch_labels: Labels belonging to the batch; returned unchanged.

    Returns:
    Tuple (augmented batch of images, batch_labels).
    """
    # Ordered augmentation pipeline; each step maps an image batch to an image batch.
    pipeline = (
        tf.image.random_flip_left_right,
        lambda imgs: tf.image.random_brightness(imgs, max_delta=0.1),
        lambda imgs: tf.image.random_contrast(imgs, lower=0.9, upper=1.1),
        lambda imgs: tf.image.random_saturation(imgs, lower=0.9, upper=1.1),
        lambda imgs: tf.image.random_hue(imgs, max_delta=0.05),
        # Keep pixel values valid after the random transformations.
        lambda imgs: tf.clip_by_value(imgs, 0.0, 1.0),
    )

    augmented_images = batch_images
    for step in pipeline:
        augmented_images = step(augmented_images)

    return augmented_images, batch_labels


In [51]:
class CustomDense(tf.keras.layers.Dense):
    """Dense layer that applies its activation to the input, not the output."""

    def call(self, inputs):
        """Return ``activation(inputs) @ kernel`` plus the bias, if one is used."""
        projected = tf.matmul(self.activation(inputs), self.kernel)
        if not self.use_bias:
            return projected
        return projected + self.bias


class PredictiveCodingNetwork(tf.keras.Sequential):
    """Sequential model whose parameters are updated from predictive-coding errors.

    ``infer`` iteratively relaxes the hidden states and computes per-layer
    prediction errors; ``update_params`` turns those errors into parameter
    updates that are handed to the compiled optimizer. The layers are
    expected to expose ``activation``, ``kernel``, ``bias`` and ``use_bias``
    (as ``CustomDense`` does).
    """

    def __init__(self, layers, vars, beta, **kwargs):
        """Initialize a PredictiveCodingNetwork.

        Parameters:
        layers: list of layers, forwarded to ``tf.keras.Sequential``.
        vars: one value per layer; ``infer`` divides the error of layer ``l``
              by ``vars[l]`` and ``update_params`` scales every update by
              ``vars[-1]``.
        beta: step size of the state update in ``infer``.
        **kwargs: forwarded to ``tf.keras.Sequential``.
        """
        super().__init__(layers, **kwargs)
        self.vars = tf.convert_to_tensor(vars, dtype=tf.float32)
        self.beta = beta

    def call_with_states(self, x):
        """Run a forward pass and return ``[x, out_layer_0, out_layer_1, ...]``."""
        x_list = [x]
        for layer in self.layers:
            x = layer(x)
            x_list.append(x)
        return x_list

    def train_step(self, data):
        """Run inference on one batch, update the parameters and the metrics.

        ``data`` is unpacked as ``(x, y)``; its structure depends on what is
        passed to ``fit()``. Returns a dict mapping metric names to their
        current value.
        """
        x, y = data

        # Relax the states with the targets clamped, then update the parameters.
        outputs, errors = self.infer(x, y)
        self.update_params(outputs, errors)

        # Update metrics (includes the metric that tracks the loss).
        pred = self.call(x)
        for metric in self.metrics:
            metric.update_state(y, pred)
        return {m.name: m.result() for m in self.metrics}

    def infer(self, x_batch, y_batch=None, n_iter=50, return_sequence=False):
        """Iteratively infer the layer states and prediction errors for a batch.

        The states start from a forward pass (``call_with_states``). If
        ``y_batch`` is given, the last state is replaced by it. On every
        later iteration the states ``x[1] .. x[n_layers - 1]`` are moved by
        ``beta * (-errors[l-1] + g)``; the first and last state are never
        modified by the update.

        Author's note: while model call, call_with_states and model evaluate
        take 2D input, train_step and infer take stacked 3D inputs
        (NOTE(review): nothing in this method enforces a rank - confirm).

        Parameters:
        x_batch: input batch.
        y_batch: optional targets used as the last state.
        n_iter: number of inference iterations, must be >= 1.
        return_sequence: if True, return the states/errors of every iteration.

        Returns:
        ``(x, errors)`` - lists with the final states and the final per-layer
        errors - or, with ``return_sequence=True``, ``(states_time,
        errors_time)`` where each list entry is stacked over the iterations
        along axis 1.

        Raises:
        ValueError: if ``n_iter < 1`` or a layer activation has no known
            derivative.
        """
        if n_iter < 1:
            # Previously this surfaced as an UnboundLocalError on return.
            raise ValueError(f"n_iter must be >= 1, got {n_iter}")

        n_layers = len(self.layers)
        # The derivative functions only depend on the layer, so look them up once.
        deriv_fns = [self.get_activation_derivative(layer.activation) for layer in self.layers]

        errors = [None for _ in range(n_layers)]
        f_x_deriv_arr = [None for _ in range(n_layers)]
        errors_time = []
        states_time = []

        # Initial states: current forward pass, optionally with clamped targets.
        x = self.call_with_states(x_batch)
        if y_batch is not None:
            x[-1] = y_batch

        for itr in range(n_iter):
            if itr > 0:
                # Update the states only on consecutive iterations.
                for l in range(1, n_layers):
                    g = tf.multiply(tf.matmul(errors[l], self.layers[l].kernel, transpose_b=True), f_x_deriv_arr[l])
                    x[l] = x[l] + self.beta * (-errors[l - 1] + g)

            # Recompute activation derivatives and prediction errors every iteration.
            for l, layer in enumerate(self.layers):
                f_x = layer.activation(x[l])
                f_x_deriv_arr[l] = deriv_fns[l](x[l])
                residual = x[l + 1] - tf.matmul(f_x, layer.kernel)
                if layer.use_bias:
                    residual = residual - layer.bias
                errors[l] = residual / self.vars[l]

            if return_sequence:
                # Store snapshots: ``x`` and ``errors`` are mutated in place, so
                # appending the lists themselves would make every recorded
                # time step alias the final iteration.
                errors_time.append(list(errors))
                states_time.append(list(x))

        if return_sequence:
            states_time = [tf.stack(tensors, axis=1) for tensors in zip(*states_time)]
            errors_time = [tf.stack(tensors, axis=1) for tensors in zip(*errors_time)]
            return states_time, errors_time
        return x, errors

    # We need to check if we actually need call here.
    # Now, call will give us the result of the network after the first inference step
    # If we want to have the results after the last inference step, we would need to change this
    #def call(self, inputs, training=False):
    #    """Call, but time distributed."""
    #    x, errors = self.infer(inputs, return_sequence=False)
    #    return x[-1]

    def update_params(self, x, errors):
        """Update the model parameters from the inferred states and errors.

        For every layer the kernel update is
        ``vars[-1] / batch_size * activation(x[l])^T @ errors[l]`` and the bias
        update is ``vars[-1] / batch_size * sum(errors[l], axis=0)``. Both are
        negated and passed to ``self.optimizer.apply_gradients``. Note that
        only the last entry of ``vars`` is used here.
        """
        batch_size = tf.cast(tf.shape(x[0])[0], tf.float32)
        scale = self.vars[-1] * (1 / batch_size)
        gradients = []
        for l, layer in enumerate(self.layers):
            grad_w = scale * tf.matmul(tf.transpose(layer.activation(x[l])), errors[l])
            gradients.append((-grad_w, layer.kernel))
            if layer.use_bias:
                grad_b = scale * tf.reduce_sum(errors[l], axis=0)
                gradients.append((-grad_b, layer.bias))
        self.optimizer.apply_gradients(gradients)

    def get_activation_derivative(self, activation):
        """Return a function for the derivative of the given activation function.

        Supports linear, tanh and sigmoid; raises ValueError for anything else.
        """
        activation_fn = tf.keras.activations.get(activation)
        if activation_fn == tf.keras.activations.linear:
            return lambda x: tf.ones_like(x)
        elif activation_fn == tf.keras.activations.tanh:
            return lambda x: 1 - tf.square(tf.nn.tanh(x))
        elif activation_fn == tf.keras.activations.sigmoid:
            return lambda x: tf.nn.sigmoid(x) * (1 - tf.nn.sigmoid(x))
        else:
            raise ValueError(f"{activation} not supported")

In [50]:

# Root folder of the image data. The default is the original author's local
# path; set the BIMOL_DATA_DIR environment variable to run the notebook on
# another machine without editing this cell.
DATA_DIR = os.environ.get(
    "BIMOL_DATA_DIR", "/Users/denisekittelmann/Documents/Python/BiMoL/data"
)

# Image folders (the trailing "" keeps the trailing path separator).
img_dir_lead = os.path.join(DATA_DIR, "Leading", "")
img_dir_trail = os.path.join(DATA_DIR, "Trailing", "")
img_dir_test_lead = os.path.join(DATA_DIR, "Test", "Test_Leading", "")
img_dir_test_trail = os.path.join(DATA_DIR, "Test", "Test_Trailing", "")

# Class names; the position in each list is the integer label of that class.
class_names_L = ['barn', 'beach', 'cave', 'library', 'restaurant']
class_names_T = ['castle', 'Church', 'conference_room', 'forest'] # changed the order

batch_size = None # adjust if needed, e.g., 32
image_size = (28,28)
validation_split = 0.1
seed = 123

In [49]:
# Create a dict that assigns the correct label to each leading-trailing image pair

"""
L1 = barn = label 0 - cat 1
L2 = beach = label 1 - cat 2
L5 = cave = label 2 - cat 3
L3 = library = label 3 - cat 4
L4 = restaurant = label 4 - cat 5


L1 = barn = label 0 
L2 = beach = label 1
L3 = cave = label 2
L4 = library = label 3
L5 = restaurant = label 4 

    % Map 1 = C1 LEADING >> C6 TRAILING valid, C7 invalid
    % Map 2 = C2 LEADING >> C6 TRAILING valid, C7 invalid
    % Map 3 = C4 LEADING >> C7 TRAILING valid, C6 invalid
    % Map 4 = C5 LEADING >> C7 TRAILING valid, C6 invalid
    % Map 5 = C3 LEADING >> C8 OR C9 TRAILING
    % Map 6 = C3 LEADING >> C9 OR C9 TRAILING


T6 = Church = label 1   
T7 = conference room = label 2
T8 = castle = label 0   
T9 = forest = label 3

MAPPING:

L1 -> T6 = 0.75 -> (0,1) 
L1 -> T7 = 0.25 -> (0,2)
L1 -> T8 = 0 -> (0,0)
L1 -> T9 = 0 -> (0,3)

L2 -> T6 = 0.75 -> (1,1) 
L2 -> T7 = 0.25 -> (1,2)
L2 -> T8 = 0 -> (1,0)
L2 -> T9 = 0 -> (1,3)

L3 -> T6 = 0 -> (3,1) 
L3 -> T7 = 0 -> (3,2)
L3 -> T8 = 0.5 -> (3,0)
L3 -> T9 = 0.5 -> (3,3)

L4 -> T6 = 0.25 -> (4,1) 
L4 -> T7 = 0.75 -> (4,2)
L4 -> T8 = 0 -> (4,0)
L4 -> T9 = 0 -> (4,3)

L5 -> T6 = 0.25 -> (2,1) 
L5 -> T7 = 0.75 -> (2,2)
L5 -> T8 = 0 -> (2,0)
L5 -> T9 = 0 -> (2,3)

"""


# Maps (leading label, trailing label) -> value used by img_sequence as the
# probability of drawing y = 1 for that pair.
# NOTE(review): these values differ from the transition probabilities listed
# in the notes string above (e.g. the notes give 0.75 for (0, 1), here it is
# 0.0) - presumably a different quantity is encoded here; confirm.
label_dict = {
    (leading, trailing): value
    for leading, values in (
        # trailing label:  1     2     0     3
        (0,              (0.0,  0.75, 0.25, 0.25)),
        (1,              (0.0,  0.75, 0.25, 0.25)),
        (3,              (0.75, 0.75, 0.50, 0.50)),
        (4,              (0.75, 0.0,  0.25, 0.25)),
        (2,              (0.75, 0.0,  0.25, 0.25)),
    )
    for trailing, value in zip((1, 2, 0, 3), values)
}


In [47]:
# Generate image pairs 
def img_sequence(img_t1, img_t2, label_t1, label_t2, label_dict):
    """Build one (image pair, label) sample from a leading and a trailing image.

    The two images are cast to float32 and concatenated along axis 0. The
    integer labels of both images are looked up as a ``(label_t1, label_t2)``
    key in ``label_dict``; the value found is used as the probability of
    drawing ``y = 1`` (a fresh, unseeded uniform random number is compared
    against it on every call, so the label is stochastic).

    Parameters:
    img_t1, img_t2: image tensors to stack.
    label_t1, label_t2: eager scalar tensors (``.numpy()`` is called on them).
    label_dict: mapping ``(int, int) -> float``.

    Returns:
    ``(x, y)`` where ``x`` is the concatenated image and ``y`` a float32
    tensor of shape (1,) holding 0.0 or 1.0.

    Raises:
    KeyError: if the label pair is missing from ``label_dict``. Before, the
        function only printed a message and then failed with an
        UnboundLocalError on the undefined ``label``.
    """
    img_t1 = tf.cast(img_t1, dtype=tf.float32)
    img_t2 = tf.cast(img_t2, dtype=tf.float32)

    x = tf.concat([img_t1, img_t2], axis=0)

    pair = (int(label_t1.numpy()), int(label_t2.numpy()))
    if pair not in label_dict:
        raise KeyError(f"Label pair {pair} not found.")
    label = label_dict[pair]

    # Bernoulli draw: y is 1.0 with probability ``label``.
    y = tf.cast(tf.random.uniform([]) < label, tf.float32)
    y = tf.expand_dims(y, axis=0)

    return x, y


In [48]:
def generate_dataset(img_dirt1, img_dirt2, class_namest1, class_namest2, label_dict, image_size = None, seed = None, shuffle = False):
    """Yield ``(x, y)`` samples built from a leading and a trailing image folder.

    Both folders are loaded unbatched with
    ``tf.keras.preprocessing.image_dataset_from_directory`` (integer labels,
    RGB, its default shuffle setting, the given ``seed``). The two datasets
    are then walked in parallel and each pair of samples is turned into one
    training sample by ``img_sequence``. Iteration stops as soon as either
    dataset is exhausted.

    Parameters:
    img_dirt1, img_dirt2: directories of the leading / trailing images.
    class_namest1, class_namest2: class names (sub-directories) of each folder.
    label_dict: mapping passed on to ``img_sequence``.
    image_size: forwarded to the loader.
    seed: forwarded to the loader; also used to derive the shuffle seeds.
    shuffle: if True, additionally shuffle both datasets with different
        seeds (``seed * 2`` and ``seed * 3``, or unseeded when ``seed`` is
        None) so that the pairing is decorrelated.

    Yields:
    ``(x, y)`` tuples as returned by ``img_sequence``.
    """

    def _load(img_dir, class_names):
        # One unbatched (image, int label) dataset per folder.
        return tf.keras.preprocessing.image_dataset_from_directory(
            img_dir,
            label_mode = 'int',
            class_names = class_names,
            batch_size = None,
            color_mode = 'rgb',
            image_size = image_size,
            seed = seed
        )

    data_t1 = _load(img_dirt1, class_namest1)
    data_t2 = _load(img_dirt2, class_namest2)

    if shuffle:
        # Dataset.shuffle returns a new dataset, so the result must be kept;
        # the previous code discarded it and shuffle=True had no effect.
        # It also failed on ``seed * 2`` when seed was None.
        seed_t1 = None if seed is None else seed * 2
        seed_t2 = None if seed is None else seed * 3
        data_t1 = data_t1.shuffle(99999, seed = seed_t1)
        data_t2 = data_t2.shuffle(99999, seed = seed_t2)

    # Walk through the leading and trailing datasets in lockstep; zip stops
    # when the shorter one runs out of samples.
    for (img_t1, label_t1), (img_t2, label_t2) in zip(data_t1, data_t2):
        yield img_sequence(img_t1, img_t2, label_t1, label_t2, label_dict)
        
        

In [42]:
# Build the validation dataset from the test folders.
# The generator is re-created every time the dataset is iterated.
# NOTE(review): `seed` is assigned here but `seed=None` is what gets passed to
# generate_dataset in this cell - confirm that the unseeded pairing is intended.

seed = 123

val_dataset = tf.data.Dataset.from_generator(
    lambda: generate_dataset(
        img_dir_test_lead,
        img_dir_test_trail,
        class_names_L,
        class_names_T,
        label_dict,
        image_size=(28, 28),
        seed=None,
    ),
    output_signature=(
        tf.TensorSpec(shape=(56, 28, 3), dtype=tf.float32),  # x: two 28x28 RGB images stacked on axis 0
        tf.TensorSpec(shape=(1,), dtype=tf.float32),  # y: single label value
    ),
)

print(val_dataset)


<_FlatMapDataset element_spec=(TensorSpec(shape=(56, 28, 3), dtype=tf.float32, name=None), TensorSpec(shape=(1,), dtype=tf.float32, name=None))>


In [25]:
################################ LOAD ANNs ################################
# Checkpoint locations. keras_file_path points to the same archive as dir_pcn.
dir_bpann = "/Users/denisekittelmann/Documents/Python/BiMoL/results/bp_ann/model_checkpoint_649_0.67.keras"
dir_pcn = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/model_checkpoint_pcnoriginal_726_0.67.keras"
keras_file_path = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/model_checkpoint_pcnoriginal_726_0.67.keras"
unzip_path = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/pcn_tt/"

input_layer = tf.keras.layers.Input(shape=(4704,))

# Re-create the PCN architecture: three sigmoid CustomDense layers (6, 4, 1 units).
# vars are the per-layer variances; update_params only uses the last one.
model = PredictiveCodingNetwork(
    [CustomDense(units=n_units, activation="sigmoid") for n_units in (6, 4, 1)],
    vars=[1, 1, 1],
    beta=0.1,
)

# Step 1: unpack the .keras archive so its weights file can be read directly.
with zipfile.ZipFile(keras_file_path, 'r') as zip_ref:
    zip_ref.extractall(unzip_path)

# Step 2: build the freshly initialised network and load the stored weights into it.
pcn = model
pcn.build([None, 4704])
pcn.load_weights("/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/pcn_tt/model.weights.h5")

# Step 3: compile so that predict/evaluate can be used.
pcn.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=1e-7, weight_decay=1e-2),
    loss="categorical_crossentropy",  # Placeholder loss
    metrics=["accuracy"],
)

# Backprop ANN: loaded as a complete saved model.
bp_ann = tf.keras.models.load_model(dir_bpann)


In [56]:
# Validation pipeline for the backprop ANN: batch, scale to [0, 1], flatten.
processed_val_dataset = val_dataset.batch(64).map(img_preproc).map(flatten)

# Predictions on the validation pipeline (the dataset is regenerated on every pass).
predictions = bp_ann.predict(processed_val_dataset)
print(predictions[:5])  # Display the first 5 predictions

# Evaluate the model.
# NOTE(review): the recorded run of this cell ended in
# "AttributeError: 'NoneType' object has no attribute 'items'" - presumably
# because bp_ann had not been compiled yet at this point; confirm.
test_loss, test_accuracy = bp_ann.evaluate(processed_val_dataset)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")



Found 180 files belonging to 5 classes.
Found 180 files belonging to 4 classes.
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 128ms/step
[[0.3959008 ]
 [0.3784286 ]
 [0.38780093]
 [0.38395813]
 [0.3791114 ]]


  self.gen.throw(typ, value, traceback)


AttributeError: 'NoneType' object has no attribute 'items'

In [58]:
# (Re)build the PCN, restore its weights and compile it for evaluation.
pcn.build([None, 4704])
pcn.load_weights("/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/pcn_tt/model.weights.h5")
pcn.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=1e-7, weight_decay=1e-2),
    loss="categorical_crossentropy",  # Placeholder loss
    metrics=["accuracy"],
)

# Validation pipeline: one large batch, scaled to [0, 1] and flattened.
processed_val_dataset = val_dataset.batch(512).map(img_preproc).map(flatten)

# Predictions on the validation pipeline (the dataset is regenerated on every pass).
predictions = pcn.predict(processed_val_dataset)
print(predictions[:5])  # Display the first 5 predictions

# Evaluate the model. The loss is only a placeholder (see compile above), so
# the reported loss value is not meaningful on its own.
test_loss, test_accuracy = pcn.evaluate(processed_val_dataset)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

  saveable.load_own_variables(weights_store.get(inner_path))


Found 180 files belonging to 5 classes.
Found 180 files belonging to 4 classes.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 638ms/step
[[-0.46771252]
 [-0.4672056 ]
 [-0.45892292]
 [-0.46280956]
 [-0.46269363]]


  self.gen.throw(typ, value, traceback)
  return self.fn(y_true, y_pred, **self._fn_kwargs)


Found 180 files belonging to 5 classes.
Found 180 files belonging to 4 classes.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 861ms/step - accuracy: 0.6333 - loss: 4.3710e-08
Test Loss: 4.371007733539045e-08, Test Accuracy: 0.6333333253860474


In [59]:
# Build and compile the backprop ANN for evaluation with a binary loss.
bp_ann.build([None, 4704])
bp_ann.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=1e-5, weight_decay=1e-2),
    metrics=["accuracy"],
    loss="BinaryCrossentropy",  # "CategoricalCrossentropy" "MeanSquaredError"
)

# Validation pipeline: one large batch, scaled to [0, 1] and flattened.
processed_val_dataset = val_dataset.batch(512).map(img_preproc).map(flatten)

# Predictions on the validation pipeline (the dataset is regenerated on every pass).
predictions = bp_ann.predict(processed_val_dataset)
print(predictions[:5])  # Display the first 5 predictions

# Evaluate the model
test_loss, test_accuracy = bp_ann.evaluate(processed_val_dataset)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

Found 180 files belonging to 5 classes.
Found 180 files belonging to 4 classes.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 763ms/step
[[0.38204947]
 [0.37922654]
 [0.3781236 ]
 [0.42263642]
 [0.38012522]]


  self.gen.throw(typ, value, traceback)


Found 180 files belonging to 5 classes.
Found 180 files belonging to 4 classes.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 758ms/step - accuracy: 0.6333 - loss: 0.6569
Test Loss: 0.6569021344184875, Test Accuracy: 0.6333333253860474


In [43]:
# Sanity check: print the shapes of the first (unbatched) validation sample.
for data, labels in val_dataset.take(1):
    print(data.shape, labels.shape)

Found 180 files belonging to 5 classes.
Found 180 files belonging to 4 classes.
(56, 28, 3) (1,)


In [None]:
def call_with_states(model, x):
    """Feed ``x`` through ``model.layers`` and collect every layer's output.

    Returns a list with one entry per layer, in layer order; the input
    itself is not included. An empty ``model.layers`` gives an empty list.
    """
    states = []
    state = x
    for layer in model.layers:
        state = layer(state)
        states.append(state)
    return states

In [None]:
################################ MODEL PATHS ################################
# Checkpoint locations. keras_file_path points to the same archive as dir_pcn.
dir_bpann = "/Users/denisekittelmann/Documents/Python/BiMoL/results/bp_ann/model_checkpoint_649_0.67.keras"
dir_pcn = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/model_checkpoint_pcnoriginal_726_0.67.keras"
keras_file_path = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/model_checkpoint_pcnoriginal_726_0.67.keras"
unzip_path = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/pcn_tt/"

################################ LOAD ANNs ################################
input_layer = tf.keras.layers.Input(shape=(4704,))

# Re-create the PCN architecture: three sigmoid CustomDense layers (6, 4, 1 units).
# vars are the per-layer variances; update_params only uses the last one.
model = PredictiveCodingNetwork(
    [CustomDense(units=n_units, activation="sigmoid") for n_units in (6, 4, 1)],
    vars=[1, 1, 1],
    beta=0.1,
)

# Step 1: unpack the .keras archive so its weights file can be read directly.
with zipfile.ZipFile(keras_file_path, 'r') as zip_ref:
    zip_ref.extractall(unzip_path)

# Step 2: build the freshly initialised network and load the stored weights into it.
pcn = model
pcn.build([None, 4704])
pcn.load_weights("/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/pcn_tt/model.weights.h5")

# Step 3: compile so that predict/evaluate can be used.
pcn.compile(
    optimizer=tf.keras.optimizers.AdamW(learning_rate=1e-7, weight_decay=1e-2),
    loss="categorical_crossentropy",  # Placeholder loss
    metrics=["accuracy"],
)

# Backprop ANN: loaded as a complete saved model.
bp_ann = tf.keras.models.load_model(dir_bpann)


In [None]:
# Build the validation dataset from the test folders.
# The generator is re-created every time the dataset is iterated; `seed` is
# read when the generator is created, not when this cell runs.

seed = 123

val_dataset = tf.data.Dataset.from_generator(
    lambda: generate_dataset(
        img_dir_test_lead,
        img_dir_test_trail,
        class_names_L,
        class_names_T,
        label_dict,
        image_size=(28, 28),
        seed=seed,
    ),
    output_signature=(
        tf.TensorSpec(shape=(56, 28, 3), dtype=tf.float32),  # x: two 28x28 RGB images stacked on axis 0
        tf.TensorSpec(shape=(1,), dtype=tf.float32),  # y: single label value
    ),
)

print(val_dataset)


In [None]:

# Re-create the PCN architecture. PredictiveCodingNetwork requires the layer
# list as its first argument (the previous call omitted it and raised a
# TypeError); the layers below are the ones used everywhere else in this
# notebook.
model = PredictiveCodingNetwork([CustomDense(units=6, activation="sigmoid"),
                                 CustomDense(units=4, activation="sigmoid"),
                                 CustomDense(units=1, activation="sigmoid")],
                                vars=[1, 1, 1],
                                beta=0.1)

# Create the layer variables so that the stored weights have a target.
model.build([None, 4704])

# Load weights
dir_pcn = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/model_checkpoint_pcnoriginal_726_0.67.keras"
model.load_weights(dir_pcn)

# Save to a new file
updated_dir_pcn = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/updated_model_checkpoint.keras"
model.save(updated_dir_pcn)

  return saving_lib.save_model(model, filepath)


In [22]:
class CustomDense(tf.keras.layers.Dense):
    """Dense layer that applies its activation to the input, not the output.

    NOTE(review): this class is defined a second time in this notebook; the
    definition executed last is the one in effect.
    """

    def call(self, inputs):
        """Return ``activation(inputs) @ kernel`` plus the bias, if one is used."""
        projected = tf.matmul(self.activation(inputs), self.kernel)
        if not self.use_bias:
            return projected
        return projected + self.bias


class PredictiveCodingNetwork(tf.keras.Sequential):
    """Sequential model whose parameters are updated from predictive-coding errors.

    ``infer`` iteratively relaxes the hidden states and computes per-layer
    prediction errors; ``update_params`` turns those errors into parameter
    updates that are handed to the compiled optimizer. The layers are
    expected to expose ``activation``, ``kernel``, ``bias`` and ``use_bias``
    (as ``CustomDense`` does).

    NOTE(review): this class is defined a second time in this notebook; the
    definition executed last is the one in effect.
    """

    def __init__(self, layers, vars, beta, **kwargs):
        """Initialize a PredictiveCodingNetwork.

        Parameters:
        layers: list of layers, forwarded to ``tf.keras.Sequential``.
        vars: one value per layer; ``infer`` divides the error of layer ``l``
              by ``vars[l]`` and ``update_params`` scales every update by
              ``vars[-1]``.
        beta: step size of the state update in ``infer``.
        **kwargs: forwarded to ``tf.keras.Sequential``.
        """
        super().__init__(layers, **kwargs)
        self.vars = tf.convert_to_tensor(vars, dtype=tf.float32)
        self.beta = beta

    def call_with_states(self, x):
        """Run a forward pass and return ``[x, out_layer_0, out_layer_1, ...]``."""
        x_list = [x]
        for layer in self.layers:
            x = layer(x)
            x_list.append(x)
        return x_list

    def train_step(self, data):
        """Run inference on one batch, update the parameters and the metrics.

        ``data`` is unpacked as ``(x, y)``; its structure depends on what is
        passed to ``fit()``. Returns a dict mapping metric names to their
        current value.
        """
        x, y = data

        # Relax the states with the targets clamped, then update the parameters.
        outputs, errors = self.infer(x, y)
        self.update_params(outputs, errors)

        # Update metrics (includes the metric that tracks the loss).
        pred = self.call(x)
        for metric in self.metrics:
            metric.update_state(y, pred)
        return {m.name: m.result() for m in self.metrics}

    def infer(self, x_batch, y_batch=None, n_iter=50, return_sequence=False):
        """Iteratively infer the layer states and prediction errors for a batch.

        The states start from a forward pass (``call_with_states``). If
        ``y_batch`` is given, the last state is replaced by it. On every
        later iteration the states ``x[1] .. x[n_layers - 1]`` are moved by
        ``beta * (-errors[l-1] + g)``; the first and last state are never
        modified by the update.

        Author's note: while model call, call_with_states and model evaluate
        take 2D input, train_step and infer take stacked 3D inputs
        (NOTE(review): nothing in this method enforces a rank - confirm).

        Parameters:
        x_batch: input batch.
        y_batch: optional targets used as the last state.
        n_iter: number of inference iterations, must be >= 1.
        return_sequence: if True, return the states/errors of every iteration.

        Returns:
        ``(x, errors)`` - lists with the final states and the final per-layer
        errors - or, with ``return_sequence=True``, ``(states_time,
        errors_time)`` where each list entry is stacked over the iterations
        along axis 1.

        Raises:
        ValueError: if ``n_iter < 1`` or a layer activation has no known
            derivative.
        """
        if n_iter < 1:
            # Previously this surfaced as an UnboundLocalError on return.
            raise ValueError(f"n_iter must be >= 1, got {n_iter}")

        n_layers = len(self.layers)
        # The derivative functions only depend on the layer, so look them up once.
        deriv_fns = [self.get_activation_derivative(layer.activation) for layer in self.layers]

        errors = [None for _ in range(n_layers)]
        f_x_deriv_arr = [None for _ in range(n_layers)]
        errors_time = []
        states_time = []

        # Initial states: current forward pass, optionally with clamped targets.
        x = self.call_with_states(x_batch)
        if y_batch is not None:
            x[-1] = y_batch

        for itr in range(n_iter):
            if itr > 0:
                # Update the states only on consecutive iterations.
                for l in range(1, n_layers):
                    g = tf.multiply(tf.matmul(errors[l], self.layers[l].kernel, transpose_b=True), f_x_deriv_arr[l])
                    x[l] = x[l] + self.beta * (-errors[l - 1] + g)

            # Recompute activation derivatives and prediction errors every iteration.
            for l, layer in enumerate(self.layers):
                f_x = layer.activation(x[l])
                f_x_deriv_arr[l] = deriv_fns[l](x[l])
                residual = x[l + 1] - tf.matmul(f_x, layer.kernel)
                if layer.use_bias:
                    residual = residual - layer.bias
                errors[l] = residual / self.vars[l]

            if return_sequence:
                # Store snapshots: ``x`` and ``errors`` are mutated in place, so
                # appending the lists themselves would make every recorded
                # time step alias the final iteration.
                errors_time.append(list(errors))
                states_time.append(list(x))

        if return_sequence:
            states_time = [tf.stack(tensors, axis=1) for tensors in zip(*states_time)]
            errors_time = [tf.stack(tensors, axis=1) for tensors in zip(*errors_time)]
            return states_time, errors_time
        return x, errors

    # We need to check if we actually need call here.
    # Now, call will give us the result of the network after the first inference step
    # If we want to have the results after the last inference step, we would need to change this
    #def call(self, inputs, training=False):
    #    """Call, but time distributed."""
    #    x, errors = self.infer(inputs, return_sequence=False)
    #    return x[-1]

    def update_params(self, x, errors):
        """Update the model parameters from the inferred states and errors.

        For every layer the kernel update is
        ``vars[-1] / batch_size * activation(x[l])^T @ errors[l]`` and the bias
        update is ``vars[-1] / batch_size * sum(errors[l], axis=0)``. Both are
        negated and passed to ``self.optimizer.apply_gradients``. Note that
        only the last entry of ``vars`` is used here.
        """
        batch_size = tf.cast(tf.shape(x[0])[0], tf.float32)
        scale = self.vars[-1] * (1 / batch_size)
        gradients = []
        for l, layer in enumerate(self.layers):
            grad_w = scale * tf.matmul(tf.transpose(layer.activation(x[l])), errors[l])
            gradients.append((-grad_w, layer.kernel))
            if layer.use_bias:
                grad_b = scale * tf.reduce_sum(errors[l], axis=0)
                gradients.append((-grad_b, layer.bias))
        self.optimizer.apply_gradients(gradients)

    def get_activation_derivative(self, activation):
        """Return a function for the derivative of the given activation function.

        Supports linear, tanh and sigmoid; raises ValueError for anything else.
        """
        activation_fn = tf.keras.activations.get(activation)
        if activation_fn == tf.keras.activations.linear:
            return lambda x: tf.ones_like(x)
        elif activation_fn == tf.keras.activations.tanh:
            return lambda x: 1 - tf.square(tf.nn.tanh(x))
        elif activation_fn == tf.keras.activations.sigmoid:
            return lambda x: tf.nn.sigmoid(x) * (1 - tf.nn.sigmoid(x))
        else:
            raise ValueError(f"{activation} not supported")
        

# PCN architecture: three sigmoid CustomDense layers with 6, 4 and 1 units.
# vars are the per-layer variances; update_params only uses the last one.
model = PredictiveCodingNetwork(
    [CustomDense(units=n_units, activation="sigmoid") for n_units in (6, 4, 1)],
    vars=[1, 1, 1],
    beta=0.1,
)

In [9]:
dir_pcn = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/model_checkpoint_pcnoriginal_726_0.67.keras"

# Try to restore the complete PCN from its .keras archive, registering the
# two custom classes under their own names.
# NOTE(review): the recorded run of this cell failed with
# "ValueError: Layers added to a Sequential model can only have a single
# positional argument ..." - the other cells work around it by unzipping the
# archive and calling load_weights on a freshly built model.
pcn = tf.keras.models.load_model(
    dir_pcn,
    custom_objects={
        custom_cls.__name__: custom_cls
        for custom_cls in (CustomDense, PredictiveCodingNetwork)
    },
)

ValueError: Layers added to a Sequential model can only have a single positional argument, the input tensor. Layer InputLayer has multiple positional arguments: []

In [None]:
# Validation dataset built from the test folders; the generator is re-created
# every time the dataset is iterated.
seed = 123

val_dataset = tf.data.Dataset.from_generator(
    lambda: generate_dataset(
        img_dir_test_lead,
        img_dir_test_trail,
        class_names_L,
        class_names_T,
        label_dict,
        image_size=(28, 28),
        seed=seed,
    ),
    output_signature=(
        tf.TensorSpec(shape=(56, 28, 3), dtype=tf.float32),  # x: two 28x28 RGB images stacked on axis 0
        tf.TensorSpec(shape=(1,), dtype=tf.float32),  # y: single label value
    ),
)

print(val_dataset)

In [None]:
import tensorflow as tf
import zipfile
import os

# Step 1: Unzip the .keras file to access its internal structure
# (the archive is extracted into unzip_path; existing files there are overwritten)
keras_file_path = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/model_checkpoint_pcnoriginal_726_0.67.keras"
unzip_path = "/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/pcn_tt/"

with zipfile.ZipFile(keras_file_path, 'r') as zip_ref:
    zip_ref.extractall(unzip_path)

# Step 2: Reuse the already instantiated PCN (`model` must exist from an
# earlier cell); `correct_model` is just a second name for the same object.
correct_model = model # Replace with your model parameters

# Step 3 (abandoned alternative): restore via a tf.train.Checkpoint
#checkpoint = tf.train.Checkpoint(model=correct_model)
#checkpoint.restore(os.path.join(unzip_path)).assert_existing_objects_matched()

# Step 3: Build the network for 4704 input features, then load the extracted
# weights file. Only after load_weights does `correct_model` hold the stored weights.
correct_model.build([None, 4704])
correct_model.load_weights("/Users/denisekittelmann/Documents/Python/BiMoL/results/pcn/pcn_tt/model.weights.h5")

64

In [None]:
# Validation dataset built from the test folders; the generator is re-created
# every time the dataset is iterated.
# NOTE(review): this cell repeats an earlier val_dataset cell of this notebook.
seed = 123

val_dataset = tf.data.Dataset.from_generator(
    lambda: generate_dataset(
        img_dir_test_lead,
        img_dir_test_trail,
        class_names_L,
        class_names_T,
        label_dict,
        image_size=(28, 28),
        seed=seed,
    ),
    output_signature=(
        tf.TensorSpec(shape=(56, 28, 3), dtype=tf.float32),  # x: two 28x28 RGB images stacked on axis 0
        tf.TensorSpec(shape=(1,), dtype=tf.float32),  # y: single label value
    ),
)

print(val_dataset)