# **Libraries and important arrays (priors, quadrants and images)**

In [2]:
import numpy as np
import math
import matplotlib.pyplot as plt
import pandas as pd
import random
from google.colab import files

In [3]:
priors = [[ 450, 525, 600, 675, 750],[ 750, 825, 900, 975, 1050], [ 900, 975, 1050, 1125, 1200], [ 450, 525, 600, 675, 750, 825, 900, 975, 1050, 1125, 1200]]
quadrants = ["TL", "TR", "BL", "BR"]


In [4]:
images = []
for i in range(400):
  images.append(i)

# **Memory Functions**

In [5]:
class Chunk(object):

    def __init__(self, name, slots):
        self.name = name
        self.slots = slots
        self.encounters = []
        self.fan = 0 # How many other chunks refer to this chunk?


    def add_encounter(self, time):
        """
        Add an encounter of this chunk at the specified time.
        """
        if time not in self.encounters:
            self.encounters.append(time)


    def __str__(self):
        return "Chunk " + str(self.name) + "\n" \
        "Slots: " + str(self.slots) + "\n" \
        "Encounters: " + str(self.encounters) + "\n" \
        "Fan: " + str(self.fan) + "\n"

In [6]:

import math
import random
import numpy as np
import random
import matplotlib.pyplot as plt
import pandas as pd
class Model(object):

    # Model parameters

    ga = 1.0 # spreading activation from the goal (:ga; default: 1.0)
    mas = 2.0 # maxmimum spreading (:mas; default: 2.0)

    d = 0.5 # decay (:bll; default: 0.5)
    s = 0.2 # scale of activation noise (:ans; default: 0)

    lf = 0.1 # latency factor (:lf; default: 0.1)
    le = 1.0 # latency exponent (:le; default: 1.0)

    rt = -1.0 # retrieval threshold (:rt; default: 0.0)

    mp = 3.0 # mismatch penalty (:mp)

    def __init__(self):
        self.time = 0
        self.goal = None
        self.dm = []


    def get_chunk(self, name):
        """
        Find the Chunk given its name
        """
        chunk_idx = [i for i, j in enumerate(self.dm) if j.name == name]
        if len(chunk_idx) == 0:
            return None
        else:
            return self.dm[chunk_idx[0]]


    def add_encounter(self, chunk):
        """
        Add an encounter of a specified chunk at the current time.
        If the chunk does not exist yet, create it first.
        """

        update_fan = False

        # If a chunk by this name does not yet exist, add it to DM
        if chunk.name not in [chunk.name for chunk in self.dm]:
            self.dm.append(chunk)
            update_fan = True

        # If a chunk by this name does exist, ensure that it has the same slots and slot values
        chunk_idx = [i for i, j in enumerate(self.dm) if j.name == chunk.name][0]
        if self.dm[chunk_idx].slots != chunk.slots:
            raise ValueError("Trying to add an encounter to a chunk with the same name (%s) but different slots and/or slot values" % chunk.name)

        # Add an encounter at the current time
        self.dm[chunk_idx].add_encounter(self.time)

        slot_vals = chunk.slots.values()

        # Add slot values as singleton chunks
        for v in slot_vals:
            if type(v) == str and v not in [ch.name for ch in self.dm]:  # NT: we want some contraints on the adding of chunks
                s = Chunk(name = v, slots = {})
                self.add_encounter(s)

        # Increment the fan of all chunks that this chunk references in its slots
        if update_fan:
            refs = [i for i, ref in enumerate(self.dm) if ref.name in slot_vals]
            for ref in refs:
                self.dm[ref].fan += 1


    def get_activation_no_noise(self, chunk):
        """
        Get the activation of the specified chunk at the current time, but without noise
        """
        # The specified chunk should exist in DM
        if chunk not in self.dm:
            raise ValueError("The specified chunk (%s) does not exist in DM" % str(chunk.name))

        # There should be at least one past encounter of the chunk
        if self.time <= min(chunk.encounters):
            raise ValueError("Chunk %s not encountered at or before time %s" % (str(chunk.name), str(self.time)))

        baselevel_activation = math.log(sum([(self.time - encounter) ** -self.d for encounter in chunk.encounters if encounter < self.time]))

        spreading_activation = self.get_spreading_activation_from_goal(chunk)

        return baselevel_activation + spreading_activation


    def get_activation(self, chunk):
        """
        Get the activation of the specified chunk at the current time.
        """
        return self.get_activation_no_noise(chunk) + self.noise(self.s)


    def get_latency(self, chunk):
        """
        Get the retrieval latency of the specified chunk at the current time.
        """
        activation = self.get_activation(chunk)
        return self.lf * math.exp(-self.le * activation)


    def noise(self, s):
        """
        Generate activation noise by drawing a value from a logistic distribution with mean 0 and scale s.
        """
        rand = random.uniform(0.001,0.999)
        return s * math.log((1 - rand)/rand)


    def get_spreading_activation_from_goal(self, chunk):
        """
        Calculate the amount of spreading activation from the goal buffer to the specified chunk.
        """

        if self.goal is None:
            return 0

        if type(self.goal) is Chunk:
            spreading = 0.0
            total_slots = 0
            for value in self.goal.slots.values():
                total_slots += 1
                ch1 = self.get_chunk(value)
                if ch1 != None and value in chunk.slots.values() and ch1.fan > 0:
                    spreading += max(0, self.mas - math.log(ch1.fan))

        if total_slots == 0:
            return 0

        return spreading * (self.ga / total_slots)


    def match(self, chunk1, pattern):
        """
        Does chunk1 match pattern in chunk pattern?
        """
        for slot, value in pattern.slots.items():
            if not(slot in chunk1.slots and chunk1.slots[slot] == value):
                return False
        return True


    def retrieve(self, chunk):
        """
        Retrieve the chunk with the highest activation that matches the request in chunk
        Returns the chunk (or None) and the retrieval latency
        """
        bestMatch = None
        bestActivation = self.rt
        for ch in self.dm:
            act = self.get_activation(ch)
            if self.match(ch, chunk) and act > bestActivation:
                bestMatch = ch
                bestActivation = act
        if bestMatch == None:
            latency = self.lf * math.exp(-self.le * self.rt)
        else:
            latency = self.lf * math.exp(-self.le * bestActivation) # calculate it here to avoid a new noise draw
        return bestMatch, latency

    def mismatch(self, value1, value2):
        """
        Calculate the mismatch between two slot values. If the two values are the same, the mismatch is 0.
        Otherwise, use the square root of the distance between the numbers as mismatch value
        """
        if value1 == value2:
            return 0.0
        if type(value1) == str or type(value2) == str:
            return None
        return -math.sqrt(abs(float(value1) - float(value2)))/5

    def partial_match(self, chunk, pattern):
        """
        Retrieve a chunk using partial matching.
        """
        penalty = 0
        for slot, value in pattern.slots.items():
            if not(slot in chunk.slots):
                return None
            similarity = self.mismatch(chunk.slots[slot], value)
            if similarity == None:
                return None
            penalty += similarity * self.mp
        return penalty


    def retrieve_partial(self, chunk, trace=False):
        """
        Retrieve a chunk using partial matching. This version only partially matches on numbers, and will
        use a predefined distance function
        """
        bestMatch = None
        bestActivation = self.rt
        for ch in self.dm:
            act = self.get_activation(ch)
            penalty = self.partial_match(ch, chunk)

            if penalty != None and act + penalty > bestActivation:
                bestMatch = ch
                bestActivation = act + penalty
            if trace == True and penalty != None:
                print("Chunk %s has activation %f and penalty %f" % (ch.name, act, penalty))
        if bestMatch == None:
            latency = self.lf * math.exp(-self.le * self.rt)
        else:
            latency = self.lf * math.exp(-self.le * bestActivation) # calculate it here to avoid a new noise draw
        return bestMatch, latency


    def get_retrieval_probability(self, chunk, pattern):
        """
        Returns the probability of retrieving a specific chunk that matches the specified pattern,
        given its activation and the activation of the other matching chunks
        """
        activations = dict([(ch, self.get_activation_no_noise(ch))for ch in self.dm if self.match(ch, pattern)])
        return math.exp(activations[chunk] / self.s)  / sum([math.exp(a / self.s) for a in activations.values()])


    def retrieve_blended_trace(self, pattern, slot):
        """
        Returns a blend of the requested slot value from all chunks in DM that match the specified pattern, weighted by their activation
        """

        latency = self.lf * math.exp(-self.le * self.rt) # Latency is determined by the retrieval threshold

        eligible_chunks = [ch for ch in self.dm if self.match(ch, pattern) and slot in ch.slots and ch.slots[slot]]

        if not eligible_chunks:
            return None, latency

        chunk_probs = dict([(ch, math.exp(self.get_activation_no_noise(ch) / self.s)) for ch in eligible_chunks])
        blended_value = sum([ch.slots[slot] * prob / sum(chunk_probs.values()) for ch, prob in chunk_probs.items()])

        return blended_value, latency


    def __str__(self):
        return "\n=== Model ===\n" \
        "Time: " + str(self.time) + " s \n" \
        "Goal:" + str(self.goal) + "\n" \
        "DM:" + "\n".join([str(c) for c in self.dm]) + "\n"

# **Model**

## Conversion to pulses/time functions

In [7]:
def noise(s):
  rand = random.uniform(0.001,0.999)
  return s * math.log((1 - rand)/rand)

def pulses_to_time(pulses, t_0 = 0.011, a = 1.2, b = 0.01, add_noise =True):
  time = 0
  pulse_duration = t_0
  while pulses > 0:
    time = time + pulse_duration
    pulses = pulses - 1
    pulse_duration = a * pulse_duration + add_noise * noise(b * a *  pulse_duration)
  return time

def time_to_pulses(time, t_0 = 0.011, a = 1.2, b = 0.01, add_noise =True):
  pulses = 0
  pulse_duration = t_0
  while time >= pulse_duration:
    time = time - pulse_duration
    pulses = pulses + 1
    pulse_duration = a * pulse_duration + add_noise * noise(b * a *pulse_duration)
  return pulses

## Function to randomize the quadrants that correspond to each prior

In [8]:
def random_assign():
  quad = []
  quad = quadrants.copy()
  prior_quadrants = []
  for i in range(len(quad)):
    random_quadrant = random.choice(quad)
    prior_quadrants.append([random_quadrant, priors[i], "p"+str(i+1)])
    quad.remove(random_quadrant)
  return prior_quadrants



## Final Model

In [22]:

def model():

  final_dataset = []

  # Number of participants
  n = 35

  # Iterate over each participant
  for j in range(n):

    # Initialize the model
    m = Model()

    #Assign different quadrants to prior for each participant
    quadrants_priors = random_assign()

    # Create the list that is going to contain the reproduction errors of the participants
    error_list = []

    # Iterate over the blocks (1 practice, 12 analysis)
    for k in range(13):

      # By default, all of blocks will not be practice
      practice = "no"

      # For odd numbered analysis blocks, the stimuli are random images from the image list.
      # The estimated time will be NA as there is no previous block with the same images.
      if (k) % 2 != 0 or k == 0:
        stimuli = random.sample(range(1, 401), 48)
        estimated_time = ["NA"]*48

      # For even numbered blocks, the same stimuli are shown but in a randomizes order.
      # The priors of the respective images from previous block are recorded and appended to the estimated_time list.
      else:
        previous_stimuli = stimuli
        estimated_time = []
        random.shuffle(stimuli)
        for element in range(len(previous_stimuli)):
          for item in range(len(stimuli)):
            if previous_stimuli[element] == stimuli[item]:
              estimated_time.append(shuffled_quadrants[item][0])

      # The list of quadrants and priors that are going to be shown to each participant in each block is created in advanced.
      # The array shuffled_quadrants, contains 48 randomly chosen quadrants, the respective prior intervals and the prior block name.
      shuffled_quadrants = []
      shuffled_quadrants.append(quadrants_priors * 12)
      (random.shuffle(shuffled_quadrants[0]))
      shuffled_quadrants = shuffled_quadrants[0]

      quadrants_array = []
      priors_array = []
      block_array = []

      # Separate the quadrants, prior list and prior block names.
      for p in range(48):

        quadrants_array.append(shuffled_quadrants[p][0])
        priors_array.append(random.choice(shuffled_quadrants[p][1]) / 1000)
        block_array.append(shuffled_quadrants[p][2])

      # Iterate over every trial
      for i in range(48):

        # Model time is advanced 300ms until the thicker frame and white background appears in one of the quadrants.
        m.time += 0.3

        # The model is advanced 375ms until an image appears in the selected quadrant
        m.time += 0.375

        # The image appears and lasts the corresponding sample interval of time
        random_quadrant = quadrants_array[i]

        random_prior = priors_array[i]


        # If the block is a practice check, the image is a circle (0)
        if k == 0 and i <= 15:
          stimulus = 0

        # Else, the image is chosen from the randomly pickes images
        stimulus = stimuli[i]

        prior_block = block_array[i]

        # The model advances the sample interval time and converts it to pulses. An encounter is added to memory
        prior_pulses = time_to_pulses(random_prior)
        m.time += random_prior
        c2 = Chunk(name = "set " + (random_quadrant) + str(prior_pulses) + str(stimulus), slots = {"type": "pulses_set", "pulse": prior_pulses})
        m.add_encounter(c2)

        # The model time is advanced 5ms to avoid infinite activation
        m.time += 0.05

        # The model performs a blended retrieval of the sample interval and converts it to time
        blend_pattern = Chunk(name = "blended-test", slots = {"type": "pulses_set"})
        retrieved_interval = pulses_to_time(m.retrieve_blended_trace(blend_pattern, "pulse")[0])

        # The model is then advanced the reproduced interval as well as the time of recall (latency)
        m.time += (retrieved_interval)
        m.time += m.retrieve_blended_trace(blend_pattern, "pulse")[1]

        # The retrieved pulse interval is also added to memory
        c3 = Chunk(name = "set_ret " + (random_quadrant) + str(prior_pulses) + str(stimulus) + str(retrieved_interval), slots = {"type": "pulses_ret", "pulse": time_to_pulses(retrieved_interval)})
        m.add_encounter(c3)

        # The model is advanced 250ms until the cycle begins again.
        m.time += 0.25

        # The following list of ifs statements check whether previous intervals exist in order to record them, up to 7 intervals away.
        # If they don't, then a NA will be recorded instead
        if i > 0:
          ts1 = priors_array[i-1]
        else:
          ts1 = "NA"
        if i > 1:
          ts2 = priors_array[i-2]
        else:
          ts2 = "NA"
        if i > 2:
          ts3 = priors_array[i-3]
        else:
          ts3 = "NA"
        if i > 3:
          ts4 = priors_array[i-4]
        else:
          ts4 = "NA"
        if i > 4:
          ts5 = priors_array[i-5]
        else:
          ts5 = "NA"
        if i > 5:
          ts6 = priors_array[i-6]
        else:
          ts6 = "NA"
        if i > 6:
          ts7 = priors_array[i-7]
        else:
          ts7 = "NA"

        # The previous quadrant is checked to see whether it is the same or differetn
        if (quadrants_array[i] == quadrants_array[i-1]) and i > 0:
          rep = "rep"
        else:
          rep = "swi"

        # The calcluation of outliers is done at this step. The reproduction errors of a participant are recorded.
        # Using the fomula for the z-score, the current reproduction error is either deemed an outlier or not.
        median = np.median(error_list)
        mad = np.median(np.absolute(error_list - median))
        outlier = ((round(round(retrieved_interval,2) - random_prior,2)) - median) / mad
        if outlier > 2.5 or outlier < -2.5:
          is_outlier = "TRUE"
        else:
          is_outlier = "FALSE"

        # If the block number is 0 and the trial is under 16, the block is set to be practice block
        if k == 0 and i <= 16:
          practice = "yes"

        # The current reproduction error is appended to the list, for future calculation of outliers.
        error_list.append(round(round(retrieved_interval,2) - random_prior,2))

        # Finally, all the data is collected and appended to an array, which will be what the model function returns.
        final_dataset.append([len(final_dataset)+1,"S00"+str(j), k, practice, random_quadrant, str(stimulus), prior_block, random_prior, round(retrieved_interval,2) - random_prior, j, (j+1)*(k+1)*(i+1), round(retrieved_interval,2), is_outlier,ts1,ts2,ts3,ts4,ts5,ts6,ts7,rep,estimated_time[i]])

        # If the practice block finishes, the corresponding block loop is over, so the analysis trials can start

        if i == 47:
          m.time += random.randint(5, 300)

        if k == 0 and i == 15:
          break


  return final_dataset

In [18]:
final_model = model()

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  outlier = ((round(round(retrieved_interval,2) - random_prior,2)) - median) / mad
  outlier = ((round(round(retrieved_interval,2) - random_prior,2)) - median) / mad


## Convert and download the CSV file

In [19]:
df = pd.DataFrame(final_model)
headerList = ["","student_nr","Block_nr","practice","quadrant","stimulus","prior","ts","repr_error","sub_id","trial_index","tr","outlier","ts_1","ts_2","ts_3","ts_4","ts_5","ts_6","ts_7","qrep","ts_stim"]
df.to_csv("final_model.csv", header=headerList, index=False)

dat = pd.read_csv("final_model.csv")

In [None]:
files.download("final_model.csv")