#### Spatial model

Consolidation can be thought of as training a neocortical generative / predictive model on replayed hippocampal memories. For sequential data, the generative model could correspond to an autoregressive sequence model like GPT-2 that learns to predict the next item in the sequence (by minimising the prediction error on sequences from the training data). An environment can be represented as a grid in which each location (i.e. square) is labelled by a random noun. Routes in the environment can then be represented as sequences of form 'apple EAST pancake NORTH material EAST chair', which makes it straightforward to train GPT-2.

More specifically, we can represent the shortest path between two locations as 'FROM: apple, TO: chair, PATH: apple EAST pancake NORTH material EAST chair'. This enables us to test the ability to infer the shortest path based on a few examples of a new environment.

This notebook simulates the task as follows:
* Pre-train model so that it learns how to find the shortest path between two locations in general
* Train on a few paths from a new environment, representing the consolidation of spatial sequences encoded in the hippocampus
* Compare performance before / after consolidation


#### Installation / imports:

In [None]:
! pip install git+https://github.com/huggingface/transformers --upgrade
! pip install accelerate evaluate wonderwords simpletransformers --upgrade
! pip install huggingface_hub --upgrade

In [None]:
import random
import pandas as pd
import networkx as nx
import logging
from random import shuffle
import pandas as pd
from matplotlib import pyplot as plt
import numpy as np
import random
import string
import os
import re
import glob
import torch
from wonderwords import RandomWord
import os
import gc
import pickle
from sklearn.linear_model import LinearRegression
from scipy.stats import pearsonr
from itertools import permutations
import logging
from random import shuffle
from matplotlib import pyplot as plt
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import math


os.environ["WANDB_DISABLED"] = "true"

Define a class for loading a model from a directory, and generating outputs given some input:

In [None]:
class GPT:

    def __init__(self, base_model):
        self.tokenizer = GPT2Tokenizer.from_pretrained(base_model)
        self.model = GPT2LMHeadModel.from_pretrained(base_model)

    def continue_input(self, input_sequence, max_length=200, num_return_sequences=1, no_repeat_ngram_size=0,
                       do_sample=False, temperature=0.7, num_beams=1):
        
        input_ids = self.tokenizer.encode(input_sequence, return_tensors='pt')

        # Generate text
        output = self.model.generate(
            input_ids,
            max_length=max_length,
            num_return_sequences=num_return_sequences,
            num_beams=num_beams,
            no_repeat_ngram_size=no_repeat_ngram_size,
            do_sample=do_sample,
            temperature=temperature,
        )

        # Decode the output
        sequence = output[0].tolist()
        text = self.tokenizer.decode(sequence)
        return text

#### Pre-train model on arbitrary stimuli to learn rules of task

The get_random_stimuli() function generates a random set of nouns and adjectives (3 for each by default). The stimuli are all possible combinations, e.g. for the adjectives ABC and nouns DEF, the stimuli are AD, AE, AF, BD, etc. The get_stimuli() function is the equivalent but for Oliver's task stimuli. 

The get_reward() function predicts reward points for a sequence of stimuli. Given a list of stimuli in random order, the stimulus at which the sequence starts, the adjective at which the sequence ends, and the noun that gives 2 points of reward, the function returns a list of stimuli and their rewards, e.g. ['small chair (2)', 'angry chair (2)', 'metal spoon (-1)'].

In [None]:
r = RandomWord()
nouns = [r.word(include_parts_of_speech=["nouns"]).replace(" ", "_") for _ in range(9)]

def create_unique_random_grid(nouns, size=3):
    """Creates a size x size grid with unique random nouns."""
    random_nouns = random.sample(nouns, size * size)
    return [random_nouns[i * size:(i + 1) * size] for i in range(size)]

def find_shortest_paths(grid, start_name, end_name):
    """Finds all shortest paths from start_name to end_name in a grid. """
    # Find coordinates of start and end points
    start = end = None
    for i, row in enumerate(grid):
        for j, name in enumerate(row):
            if name == start_name:
                start = (i, j)
            if name == end_name:
                end = (i, j)
    
    # Check if start or end points were not found
    if start is None or end is None:
        print ("start or end not found")
        return []

    paths = []
    start_x, start_y = start
    end_x, end_y = end

    # Total horizontal and vertical distances
    x_dist = end_x - start_x
    y_dist = end_y - start_y

    # Generate a list of directions taken in the shortest path
    # We know that the shortest route is x_dist EAST or WESTs, and y_dist NORTH or SOUTHs
    hor_moves = ['EAST' if x_dist > 0 else 'WEST'] * abs(x_dist)
    ver_moves = ['SOUTH' if y_dist > 0 else 'NORTH'] * abs(y_dist)
    all_moves = hor_moves + ver_moves

    # We have a list, e.g. [NORTH, NORTH, EAST, EAST] and we want to find all possible orderings
    # Each ordering (i.e. permutation) is a possible shortest path from start_name to end_name
    for path in set(permutations(all_moves, len(all_moves))):
        sequence = [f'FROM: {start_name}, TO: {end_name}, PATH: {start_name}']
        x, y = start
        for direction in path:
            if direction == 'EAST' and x < 2:
                x += 1
            elif direction == 'WEST' and x > 0:
                x -= 1
            elif direction == 'SOUTH' and y < 2:
                y += 1
            elif direction == 'NORTH' and y > 0:
                y -= 1
            else:
                # Invalid move, skip this path
                break
            sequence.append(f"{direction} {grid[x][y]}")

            # add the path when it successfully reaches the end point
            if (x, y) == end:
                paths.append(' '.join(sequence))

    return paths
  
# example usage
grid = create_unique_random_grid(nouns)
paths = find_shortest_paths(grid, grid[0][0], grid[2][2])

# print the grid and the paths to see the output
print("Grid:", grid)
print("Shortest Paths:", paths)


In [None]:
def shuffle_stimuli(stimuli):
    random.shuffle(stimuli)
    return stimuli

def get_all_paths_for_grid(grid):
    all_paths = []
    items = [item for sublist in grid for item in sublist]
    for start in items:
        for end in items:
            if start != end:
                all_paths.extend(find_shortest_paths(grid, start, end))
    return shuffle_stimuli(all_paths)

grid = create_unique_random_grid(nouns)
len(get_all_paths_for_grid(grid))

We now generate training and test data for the pre-training.

In [None]:
training_strs = []
for i in range(2000):
    nouns = [r.word(include_parts_of_speech=["nouns"]).replace(" ", "_") for _ in range(9)]
    grid = create_unique_random_grid(nouns)
    training_strs.extend(get_all_paths_for_grid(grid))

testing_strs = []
for i in range(10):
    nouns = [r.word(include_parts_of_speech=["nouns"]).replace(" ", "_") for _ in range(9)]
    grid = create_unique_random_grid(nouns)
    testing_strs.extend(get_all_paths_for_grid(grid))

print(f"{len(training_strs)} shortest paths on arbitrary grids generated for pre-training.")

The function below runs a script to fine-tune a gpt-2 model on the arbitrary stimuli.

The name_or_path argument is which model to fine-tune from. In the pre-training stage, this will be set to 'gpt2'.

In [None]:
def train_model_script(name_or_path='spatial_model', 
                       num_epochs=3,
                       output_dir='./clm_script',
                       save_steps=100,
                       lr=5e-05 ):
    torch.cuda.empty_cache()
    gc.collect()
    ! python ./run_clm.py \
        --model_name_or_path {name_or_path} \
        --train_file {os.path.join(output_dir, 'train.txt')} \
        --validation_file {os.path.join(output_dir, 'train.txt')} \
        --per_device_train_batch_size 1 \
        --per_device_eval_batch_size 1 \
        --do_train \
        --do_eval \
        --output_dir {output_dir} \
        --overwrite_output_dir \
        --num_train_epochs {num_epochs} \
        --save_strategy 'steps' \
        --save_steps {save_steps} \
        --learning_rate {lr}       


Shuffle the data, write it to train.txt and test.txt files, and train gpt2:

In [None]:
!rm -rf spatial_model
!mkdir spatial_model

text_file = open("spatial_model/train.txt", "w")
n = text_file.write('\n'.join(training_strs))
text_file.close()

text_file = open("spatial_model/test.txt", "w")
n = text_file.write('\n'.join(testing_strs))
text_file.close()

train_model_script(name_or_path='gpt2', output_dir='spatial_model', num_epochs=5, save_steps=2000)

In [None]:
model = GPT(base_model='spatial_model')

Can the model generalise the rules to new stimuli?

In [None]:
out = model.continue_input("FROM: table, TO: box, PATH: table SOUTH box\nFROM: box, TO: chair, PATH: box EAST chair\nFROM: table, TO: chair, PATH:", 
                           do_sample=False)
print(out)

#### Simulate the task

The simulate_task() function runs one trial training on a subset of shortest paths from a new environment.

In [None]:
def simulate_task(seed=0, num_new=1000, num_orig=1000, num_epochs=3):
    random.seed(seed)
    training_strs = []
    
    grid = create_unique_random_grid(nouns)
    print(grid)
    training_strs =  get_all_paths_for_grid(grid)

    train_set = training_strs[0:120]
    test_set = training_strs[120:]
    print(test_set)
    
    # oversampling trick to avoid overfitting to sequence order
    train_set = np.random.choice(train_set, num_new).tolist()

    output_dir = f'clm_script_{seed}'
    ! rm -rf {output_dir}
    ! mkdir {output_dir}

    text_file = open(os.path.join(output_dir, 'train.txt'), "w")
    n = text_file.write('\n'.join(train_set))
    text_file.close()

    text_file = open(os.path.join(output_dir, 'test.txt'), "w")
    n = text_file.write('\n'.join(test_set))
    text_file.close()

    train_model_script(name_or_path='spatial_model', 
                       num_epochs=num_epochs, 
                       output_dir=output_dir,
                       save_steps=100)

In [None]:
simulate_task(seed=0, num_new=2000, num_epochs=10)

In [None]:
model = GPT(base_model='clm_script_0')

In [None]:
out = model.continue_input("FROM:", 
                           do_sample=False)
print(out)