In [2]:
import os
os.environ["KERAS_BACKEND"] = "torch"

import torch
import scipy
import keras
import scipy.spatial.distance

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from keras import Regularizer 
from keras import backend
from mpl_toolkits.mplot3d import Axes3D
from torch.utils.data import DataLoader, TensorDataset

In [3]:
np.random.seed(18229)
torch.manual_seed(94892)

<torch._C.Generator at 0x11ba05c70>

In [4]:
class mazeGeneratorI():
    '''
    Objects of the mazeGeneratorI class can create numpy and tf datasets of the first choice of the maze task.
    Task structure:
        Goal presentation, followed by delay period, followed by choice options.
    Response:
        One response required from agent at end of episode. Direction (Left, Up, Right, Down) of first step.
    Encoding:
        Both observations and labels are OneHot encoded.
    Usage:
        The two only function a user should need to access are "construct_numpy_data" and "construct_tf_data"
    Options:
        Both data construction methods have an option to shuffle the labels of data.
        The numpy data construction method allows to also return the maze identifiers.
    '''
    def __init__(self, goal_presentation_steps, delay_steps, choices_presentation_steps):
        self.version = 'v1.2.0'
        
        # Import variables defining episode
        self.goal_presentation_steps = goal_presentation_steps
        self.delay_steps = delay_steps
        self.choices_presentation_steps = choices_presentation_steps

        # Construct mazes dataframe
        ## Add encoded versions of the goal / choices presentations and the next step response
        self.mazesdf = self.import_maze_dic()
        self.mazesdf['Goal_Presentation'] = self.mazesdf['goal'].map({
            7:np.concatenate((np.array([1,0,0,0]),np.repeat(0,4))),
            9:np.concatenate((np.array([0,1,0,0]),np.repeat(0,4))),
            17:np.concatenate((np.array([0,0,1,0]),np.repeat(0,4))),
            19:np.concatenate((np.array([0,0,0,1]),np.repeat(0,4)))})
        self.mazesdf['Choices_Presentation']=self.mazesdf['ChoicesCategory'].map(lambda x: self.encode_choices(x=x))
        self.mazesdf['Step_Encoded']=self.mazesdf['NextFPmap'].map(lambda x: self.encode_next_step(x=x))

    def construct_numpy_data(self, number_of_problems, return_maze_identifiers = False, np_shuffle_data = False):
        # Create a new column which hold the vector for each problem
        self.mazesdf['Problem_Vec']=self.mazesdf.apply(lambda x: self.create_problem_observation(row= x,goal_presentation_steps= self.goal_presentation_steps,delay_steps= self.delay_steps,choices_presentation_steps= self.choices_presentation_steps), axis=1)
        # Set a random order of maze problems for the current session
        self.mazes_order = np.random.randint(0,8,number_of_problems)

        # Create vectors, holding observations and labels
        session_observation =np.array([])
        session_labels = np.array([])
        for i in self.mazes_order:
            session_observation = np.append(session_observation,self.mazesdf.iloc[i]['Problem_Vec'])
            session_labels = np.append(session_labels,self.mazesdf.iloc[i]['Step_Encoded'])

        # Reshape vectors to fit network observation and response space
        session_length = self.goal_presentation_steps + self.delay_steps + self.choices_presentation_steps
        session_observation = np.reshape(session_observation, (-1,session_length,8)).astype('float32')
        session_labels = np.reshape(session_labels, (-1,4)).astype('float32')

        # If np_shuffle_data == 'Labels, the order of labels is shuffled to randomise correct answers
        if np_shuffle_data == 'Labels':
          shuffle_generator = np.random.default_rng(38446)
          shuffle_generator.shuffle(session_labels,axis=0)

        # If return_maze_identifiers == 'IDs', return the array with maze IDs alongside the regular returns (observations, labels)
        if return_maze_identifiers == 'IDs':
          return session_observation, session_labels, self.mazes_order

        return session_observation, session_labels

    def construct_pytorch_data(self, number_of_problems, batch_size, shuffle_data=False):
        # Create dataset as described by the numpy dataset function
        npds, np_labels = self.construct_numpy_data(number_of_problems=number_of_problems, np_shuffle_data=shuffle_data)
        
        # Convert NumPy arrays to PyTorch tensors
        tensor_ds = torch.tensor(npds, dtype=torch.float32)
        tensor_labels = torch.tensor(np_labels, dtype=torch.float32)

        # Create a TensorDataset and DataLoader for batching and shuffling
        dataset = TensorDataset(tensor_ds, tensor_labels)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle_data)
        
        return dataloader

    def reset_construction_params(self, goal_presentation_steps, delay_steps, choices_presentation_steps):
        self.goal_presentation_steps = goal_presentation_steps
        self.delay_steps = delay_steps
        self.choices_presentation_steps = choices_presentation_steps

    def import_maze_dic(self, mazeDic=None):
        if mazeDic is None:
            # Set up dataframe with first choices of maze task
            ## The dictionary was generated using MazeMetadata.py (v1.0.0) and the following call:
            ### mazes.loc[(mazes['Nsteps']==2)&(mazes['ChoiceNo']=='ChoiceI')][['goal','ChoicesCategory','NextFPmap']].reset_index(drop=True).to_dict()
            self.mazesDic = {'goal': {0: 9, 1: 9, 2: 19, 3: 17, 4: 17, 5: 7, 6: 19, 7: 7},
            'ChoicesCategory': {0: 'ul',
            1: 'rd',
            2: 'ld',
            3: 'rd',
            4: 'ul',
            5: 'ur',
            6: 'lr',
            7: 'lr'},
            'NextFPmap': {0: 'u', 1: 'r', 2: 'd', 3: 'd', 4: 'l', 5: 'u', 6: 'r', 7: 'l'}}
        else:
            self.mazesDic = mazeDic
        
        # Create and return dataframe
        return pd.DataFrame(self.mazesDic)

    def encode_choices(self, x):
        # Helper function to create the observation vector for choice periods
        choices_sec = np.repeat(0,4)
        choicesEncoding = pd.Series(list(x))
        choicesEncoding = choicesEncoding.map({'l':1,'u':2,'r':3,'d':4})
        for encodedChoice in choicesEncoding:
            choices_sec[encodedChoice-1]=1
        return np.concatenate((np.repeat(0,4),choices_sec))

    def encode_next_step(self, x):
        # Helper function to change the response / action to a OneHot encoded vector
        step_sec = np.repeat(0,4)
        stepEncoding = pd.Series(list(x))
        stepEncoding = stepEncoding.map({'l':1,'u':2,'r':3,'d':4})
        for encodedStep in stepEncoding:
            step_sec[encodedStep-1]=1
        return step_sec

    def create_problem_observation(self, row, goal_presentation_steps, delay_steps, choices_presentation_steps):
        # Helper function to create one vector describing the entire outline of one maze problem (Goal presentation, Delay Period, and Choices Presentation)
        goal_vec = np.tile(row['Goal_Presentation'], goal_presentation_steps)
        delay_vec = np.tile(np.repeat(0,8), delay_steps)
        choices_vec = np.tile(row['Choices_Presentation'], choices_presentation_steps)
        problem_vec = np.concatenate((goal_vec,delay_vec,choices_vec))
        return problem_vec

    def __repr__(self):
        return '\n'.join([
            f'Maze DataSet Generator',
            f'Goal Presentation Steps: {self.goal_presentation_steps}',
            f'Delay Steps: {self.delay_steps}',
            f'Choices Presentation Steps: {self.choices_presentation_steps}'])

In [5]:
# This constructor might run for around a minute to generate the dataset
gen = mazeGeneratorI(goal_presentation_steps=20,delay_steps=10,choices_presentation_steps=20)
torchdf = gen.construct_pytorch_data(number_of_problems=5120, batch_size=128)
torchdf_test = gen.construct_pytorch_data(number_of_problems=2560, batch_size=128)
torchdf_val = gen.construct_pytorch_data(number_of_problems=2560, batch_size=128)
print(torchdf)

example_data = next(iter(torchdf))

<torch.utils.data.dataloader.DataLoader object at 0x175f3e7d0>


In [6]:

class SE1(Regularizer):
  """A regulariser for sptially embedded RNNs.
  Applies L1 regularisation to recurrent kernel of
  RNN which is weighted by the distance of units
  in predefined 3D space.
  Calculation:
      se1 * sum[distance_matrix o recurrent_kernel]
  Attributes:
      se1: Float; Weighting of SE1 regularisation term.
      distance_tensor: TF tensor / matrix with cost per
      connection in weight matrix of network.
  """

  def __init__(self, se1=0.01, neuron_num = 100, network_structure = (5,5,4), coordinates_list = None, distance_power = 1, distance_metric = 'euclidean'):  
    self.version = 'v1.1.0'
    self.distance_power = distance_power
    
    # Set SE1 regularisation strength to default of 0.01 if no value given
    se1 = 0.01 if se1 is None else se1
    self._check_penalty_number(se1)
    self.se1 = se1

    # Set up tensor with distance matrix
    ## Set up neurons per dimension
    nx = np.arange(network_structure[0])
    ny = np.arange(network_structure[1])
    nz = np.arange(network_structure[2])

    ## Set up coordinate grid
    [x,y,z] = np.meshgrid(nx,ny,nz)
    self.coordinates = [x.ravel(),y.ravel(),z.ravel()]

    ## Override coordinate grid if one if provided in init
    if coordinates_list is not None:
      self.coordinates = coordinates_list

    ## Check neuron number / number of coordinates
    if (len(self.coordinates[0])==neuron_num)&(len(self.coordinates[1])==neuron_num)&(len(self.coordinates[2])==neuron_num):
      pass
    else:
      raise ValueError('Network / coordinate structure does not match the number of neurons.')

    ## Calculate the euclidean distance matrix
    euclidean_vector = scipy.spatial.distance.pdist(np.transpose(self.coordinates), metric=distance_metric)
    euclidean = scipy.spatial.distance.squareform(euclidean_vector**self.distance_power)
    self.distance_matrix = euclidean.astype('float32')
    self.spatial_cost_matrix = self.distance_matrix

    ## Add minimal cost for recurrent self connection (on diagonal)
    #diag_dist = np.diag(np.repeat(0.1,100)).astype('float32')
    #self.distance_matrix = self.distance_matrix + diag_dist

    ## Create tensor from distance matrix
    self.distance_tensor =  torch.from_numpy(self.distance_matrix)


  def __call__(self, x):
    # Add calculation of loss here.
    # L1 for reference: self.l1 * math_ops.reduce_sum(math_ops.abs(x))
    abs_weight_matrix = torch.abs(x)

    #se1_loss = self.se1 * tf.math.multiply(abs_weight_matrix, self.distance_tensor)
    #se1_loss = tf.math.reduce_sum(abs_weight_matrix)
    se1_loss = self.se1 * torch.sum(torch.multiply(abs_weight_matrix, self.distance_tensor))
    
    return se1_loss

  def _check_penalty_number(self, x):
    """check penalty number availability, raise ValueError if failed."""
    if not isinstance(x, (float, int)):
      raise ValueError(('Value: {} is not a valid regularization penalty number, '
                        'expected an int or float value').format(x))

  def visualise_distance_matrix(self):
    plt.imshow(self.distance_matrix)
    plt.colorbar()
    plt.show()

  def visualise_neuron_structure(self):
    fig = plt.figure()
    ax = Axes3D(fig)
    ax.scatter(self.coordinates[0],self.coordinates[1],self.coordinates[2],c='b',marker='.')
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_zlabel('z')
    plt.show()

  def get_config(self):
    return {'se1': float(self.se1)}

In [7]:
class SE1_sWc(SE1):
    '''
    Version of SE1 regulariser which combines the spatial and communicability parts in loss function.
    Additional comms_factor scales the communicability matrix.
    The communicability term used here is unbiased weighted communicability:
    Crofts, J. J., & Higham, D. J. (2009). A weighted communicability measure applied to complex brain networks. Journal of the Royal Society Interface, 6(33), 411-414.
    '''
    def __init__(self, se1=0.01, comms_factor = 1, neuron_num = 100, network_structure = (5,5,4), coordinates_list = None, distance_power = 1, distance_metric = 'euclidean'):
      SE1.__init__(self, se1, neuron_num , network_structure , coordinates_list, distance_power , distance_metric)
      self.comms_factor = comms_factor

    def __call__(self, x):
      # Take absolute of weights
      abs_weight_matrix = torch.abs(x)

      # Calulcate weighted communicability (see reference in docstring)
      stepI = torch.sum(abs_weight_matrix, axis=1)
      stepII = torch.pow(stepI, -0.5)
      stepIII = torch.diag(stepII)
      stepIV = torch.matrix_exp(stepIII@abs_weight_matrix@stepIII)
      comms_matrix = stepIV.clone()  # Create a copy to avoid modifying the original tensor
      comms_matrix.diagonal(dim1=-2, dim2=-1).zero_()  # Set diagonal to zeros in-place

      # Multiply absolute weights with communicability weights
      comms_matrix = comms_matrix**self.comms_factor
      comms_weight_matrix = torch.multiply(abs_weight_matrix, comms_matrix)

      # Multiply comms weights matrix with distance tensor, scale the mean, and return as loss
      se1_loss = self.se1 * torch.sum(torch.multiply(comms_weight_matrix , self.distance_tensor))
      
      return se1_loss


In [8]:

class RNNWeightMatrixHistoryI(keras.callbacks.Callback):
    '''
    Saves the RNN_Weight_Matrix to the training history before
    the start of training and after finishing each epoch.

    The network model needs to be build manually before calling fit() method
    for this callback to work.
    '''
    def __init__(self, RNN_layer_number = 0):
        super(RNNWeightMatrixHistoryI, self).__init__()
        self.RNN_layer_number = RNN_layer_number

    def on_train_begin(self, logs=None):
        # Create key for RNN_Weight_Matrix in history
        self.model.history.history['RNN_Weight_Matrix'] = []
        #print("Created key for RNN_Weight_Matrix in history.")

        # Save matrix before start of training
        self.model.history.history['RNN_Weight_Matrix'].append(self.model.layers[self.RNN_layer_number].get_weights()[1])
        #print("Saved RNN_Weight_Matrix to history.")

    def on_epoch_end(self, epoch, logs=None):
        # Save RNN_Weight_Matrix to history
        self.model.history.history['RNN_Weight_Matrix'].append(self.model.layers[self.RNN_layer_number].get_weights()[1])
        #print("Saved RNN_Weight_Matrix to history.")

In [12]:
regu_strength = 0.3

backend.clear_session()
regu = SE1_sWc(se1=regu_strength)
coord = regu.coordinates
cost = regu.spatial_cost_matrix

## Assemble network
tf_model = keras.models.Sequential([
    keras.layers.GaussianNoise(stddev=0.05),
    keras.layers.SimpleRNN(100, activation='relu',recurrent_initializer='orthogonal', return_sequences=False, recurrent_regularizer= regu),
    keras.layers.Dense(4, activation='softmax')
])
tf_model.build(input_shape=example_data[0].shape)

In [13]:
tf_model.compile(optimizer=keras.optimizers.Adam(),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [19]:
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
dummy_input = np.random.randn(*example_data[0].shape).astype(np.float32)
tf_model(dummy_input)

NotImplementedError: Exception encountered when calling SimpleRNNCell.call().

[1mThe operator 'aten::linalg_qr.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.[0m

Arguments received by SimpleRNNCell.call():
  • sequence=torch.Tensor(shape=torch.Size([128, 8]), dtype=float32)
  • states=('torch.Tensor(shape=torch.Size([128, 100]), dtype=float32)',)
  • training=False

In [None]:
history = tf_model.fit(torchdf, epochs=10,validation_data=torchdf_test,
                       callbacks=RNNWeightMatrixHistoryI(RNN_layer_number=1)
                       )

RuntimeError: Unable to automatically build the model. Please build it yourself before calling fit/evaluate/predict. A model is 'built' when its variables have been created and its `self.built` attribute is True. Usually, calling the model on a batch of data is the right way to build it.
Exception encountered:
'Variable sequential/simple_rnn/simple_rnn_cell/kernel is already initialized.'