In [1]:
# In this file, I have implemented both the Modern Hopfield Network and the Fluid Hopfield Network. The first segment of 
# code below consists of helper functions, and the latter segments of code contain the actual implementations of the networks
# themselves. Feel free to try running these networks on your own (or existing) semantic information datasets!

In [2]:
# First, we define some important helper functions for the Modern Hopfield Network and the Fluid Hopfield Network below!
import numpy as np

# softmax function takes in a vector 'x' and either a constant 'slope_param' or a vector 'slope_param' and applies the 
# softmax to 'x' with slope parameters given by 'slope_param'
def softmax(x, slope_param):
  y = (np.exp(x))**(slope_param)
  tot = np.sum(y)
  return y / tot

# passes each element of a vector 'v' through sigmoid nonlinearity
def vector_sigmoid(v):
  denom = 1. + np.exp(-1 * v)
  return np.ones(len(v)) / denom

# applies sigmoid nonlinearity to the number 'x'
def sigmoid(x):
  return 1. / (1. + np.exp(-x))

# detects whether 'v' is a "sub-vector" of 'w', i.e. all of the ones of 'v' are also ones of 'w'
def isSubPattern(v, w):  
  diff = w - v
  return np.all(diff >= 0)

# detects whether vector 'v' is a sub-pattern of any of the rows of the matrix 'matrix_of_vectors'
def isSubPatternOfAny(v, matrix_of_vectors):
  for w in matrix_of_vectors:
    if isSubPattern(v, w):
      return True
  return False

In [4]:
# Now, we define the Modern Hopfield Network, which takes in 5 parameters, shown below. The matrix 'data' consists of 
# the feature vectors of the stored memories row-wise, and 'num_features' is the dimension of each of these rows. The 
# number of total stored memories is 'num_memories'. Finally, the slope parameter 'slope_param' and the time parameter
# 'time_param' are specified. 

class ModernHopfieldNet:

  # initialize a Modern Hopfield Network
  def __init__(self, data, num_features, num_memories, slope_param, time_param):
    (rows, cols) = np.shape(data)
    assert num_features == cols
    assert num_memories == rows
    self.num_features = num_features
    self.num_memories = num_memories
    self.weights = data
    self.slope_param = slope_param
    self.time_param = time_param
  
  # pass feature pattern 'v' as input to network and perform 'num_epochs' cycles of updating 'v'
  def forward(self, v, num_epochs):
    v_curr = v 
    for t in range(num_epochs):
      h = np.matmul(self.weights, v_curr)
      weights_transpose = np.transpose(self.weights)
      softmax_h = softmax(h, self.slope_param) 
      # update rule for network is implemented below
      v_new = v_curr + (1.0 / self.time_param) * (np.matmul(weights_transpose, softmax_h) - v_curr)
      v_curr = v_new

    # for simplicity, we could round components of 'v_curr' to 0 or 1 to get a final binary vector as output:
    # return np.around(v_curr)
    # But, to be more rigorous, it would be better to actually use the 'self.__find_stored_mem' method, as described
    # in the code for the Fluid Hopfield Net ('criterion' can be set to 0.1):
    v_curr = self.__find_stored_mem(v_curr, 0.1)
  
  # this helper method records (Euclidean) distances of each of the stored memory vectors in the network to 'v_curr'
  def __record_distances_to_mems(self, v_curr):
    distances = np.zeros(self.weights.shape[0])
    for i in range(len(self.weights)):
      distances[i] = np.sqrt( np.sum( (self.weights[i] - v_curr)**2 ) )
    return distances

  # this helper function is used at the end of the 'forward' method. it takes in the vector 'v_curr' and a threshold
  # value 'criterion' and returns closest stored memory vector that is within a Euclidean distance of 'criterion' from 'v_curr',
  # if such a stored memory exists; else, it returns 'v_curr'  
  def __find_stored_mem(self, v_curr, criterion):
    distances = self.__record_distances_to_mems(v_curr)
    indices = np.where(distances == np.min(distances))[0]
    if (distances[indices[0]] < criterion):
      return self.weights[indices[0]]
    else:
      return v_curr

  # set slope parameter of network (i.e. the "level of attention" across all the stored memories)
  def set_slope_param(self, slope_param):
    self.slope_param = slope_param
  
  # set time parameter of network
  def set_time_param(self, time_param):
    self.time_param = time_param 
  
  # returns slope parameter
  def get_slope_param(self):
    return self.slope_param 
  
  # returns time parameter
  def get_time_param(self):
    return self.time_param

In [5]:
# Now, the goal is to automate the process of giving attention to allow for the learning of novel concepts and robust 
# retrieval of existing memories. To achieve this, we incorporate a vector of slope parameters - one per memory - into
# the network and have each slope parameter dynamically shift during each cycle of the 'forward' method.

# below are hyperparameters of the Fluid Hopfield Network
num_epochs = 20
epsilon = 0.5
decay = 0.1
criterion = 0.1

# We now define the Fluid Hopfield Network, which takes in 4 parameters, shown below. The matrix 'data' consists of 
# the feature vectors of the stored memories row-wise, and 'num_features' is the dimension of each of these rows. The 
# number of total stored memories is 'num_memories'. The time parameter 'time_param' is also specified. (Notice that no
# slope parameters are specified to the network, since they are now automated!)

class FluidHopfieldNet:

  # initialize a Fluid Hopfield Network
  def __init__(self, data, num_features, num_memories, time_param):
    (rows, cols) = np.shape(data)
    assert num_features == cols
    assert num_memories == rows
    self.num_features = num_features
    self.num_memories = num_memories
    self.weights = data
    self.time_param = time_param
    # initialize slope parameters for already-consolidated memories from 'data' to 1
    self.theta = np.ones(rows)

  # pass feature pattern 'v' as input to network and perform 'num_epochs' cycles of updating 'v'
  def forward(self, v, num_epochs):

    # check if 'v' is sub-pattern of any of the existing memories of network
    if (not isSubPatternOfAny(v, self.weights)):
      # if not, add it as a new memory to the network, with a low slope parameter of -1
      self.theta = np.append(self.theta, -1.)
      self.weights = np.append(self.weights, [v], axis=0)
      self.num_memories += 1
    
    # for later analysis, one can store distance of current input pattern to each of the stored memories
    distances_to_mems = np.zeros((num_epochs+1, self.num_memories))
    distances_to_mems[0] = self.__record_distances_to_mems(v)

    v_curr = v 
    for t in range(num_epochs):   
      # compile the similiarities between 'v_curr' and each stored memory
      h = np.matmul(self.weights, v_curr)
      # pass slope parameters through sigmoid nonlinearity, and then apply them to each component of h
      h = np.multiply(vector_sigmoid(self.theta), h)
      weights_transpose = np.transpose(self.weights)
      # scale the sigma(theta_j)'s by a factor of 10
      softmax_h = softmax(h, 10.) 
      # same update rule as with Modern Hopfield Net, though 'softmax_h' is now the generalized softmax function
      v_new = v_curr + (1.0 / self.time_param) * (np.matmul(weights_transpose, softmax_h) - v_curr)
      v_curr = v_new
      # dynamically update theta weights
      self.__weight_update(v_curr)
      # find distance of current input pattern to each of the stored memories
      distances_to_mems[t+1] = self.__record_distances_to_mems(v_curr)

    # if you'd like to analyze how distance of current input pattern to each of stored memories changes over the epochs
    # you can uncomment the second part of the return statement below!
    return self.__find_stored_mem(v_curr, criterion) #, distances_to_mems

  # takes in current input to network 'v_curr' and updates the 'self.theta' based on similarity between 'v_curr' and 
  # each stored memory
  def __weight_update(self, v_curr):
    theta_new = np.zeros_like(self.theta)
    weights_transpose = np.transpose(self.weights)
    for j in range(len(self.weights)):
      v = self.weights[j]
      # using a cosine similarity metric between 'v_curr' and the stored memory 'v' -- it ranges from 0 to 1
      similarity_rating = ( np.dot(v, v_curr) / (np.linalg.norm(v) * np.linalg.norm(v_curr)) )
      # update rule for theta_j, the slope parameter for the jth stored memory 
      theta_new[j] = (self.theta[j] + (similarity_rating) * epsilon) - decay * np.absolute(self.theta[j])
    self.theta = theta_new
  
  # this helper method records (Euclidean) distances of each of the stored memory vectors in the network to 'v_curr'
  def __record_distances_to_mems(self, v_curr):
    distances = np.zeros(self.weights.shape[0])
    for i in range(len(self.weights)):
      distances[i] = np.sqrt( np.sum( (self.weights[i] - v_curr)**2 ) )
    return distances
  
  # this helper function is used at the end of the 'forward' method. it takes in the vector 'v_curr' and a threshold
  # value 'criterion' and returns closest stored memory vector that is within a Euclidean distance of 'criterion' from 'v_curr',
  # if such a stored memory exists; else, it returns 'v_curr'
  def __find_stored_mem(self, v_curr, criterion):
    distances = self.__record_distances_to_mems(v_curr)
    indices = np.where(distances < criterion)[0]
    if (len(indices) > 0):
      return self.weights[indices[0]]
    else:
      return v_curr

  # return the theta vector of slope parameters - one for each memory
  def get_theta(self):
    return self.theta
  
  # set time parameter of network to 'time_param'
  def set_time_param(self, time_param):
    self.time_param = time_param 
  
  # returns time parameter
  def get_time_param(self):
    return self.time_param

  # returns weights matrix, i.e. with the memory vectors of the network as its rows
  def get_weights_matrix(self):
    return self.weights