In [1]:
import pandas as pd
import networkx as nx
import logging
from random import shuffle
import pandas as pd
from matplotlib import pyplot as plt
import csrgraph as cg
import numpy as np
import random
import string
from itertools import combinations
import pickle
import gc
import os
import json
from transformers import GPT2LMHeadModel, GPT2Tokenizer

#os.environ['WANDB_MODE'] = 'disabled'

import matplotlib.pyplot as plt
import networkx as nx
import csrgraph as cg
import numpy as np
import random
import string
from wonderwords import RandomWord

  backends.update(_get_backends("networkx.backends"))
  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# model paths
MODEL_OUTPUT_DIR = 'all_models'

# Maps each checkpoint name to <current working directory>/MODEL_OUTPUT_DIR/<name>.
model_paths = {
    checkpoint: os.path.join(os.getcwd(), MODEL_OUTPUT_DIR, checkpoint)
    for checkpoint in (
        "generalist_v2",  # trained on 2-letter nodes, EAST WEST NORTH SOUTH, shortest/foraging (l50)
        "generalist_v1",  # trained on random nouns, includes directions, no repeats
        "foraging_v1",    # trained on 2-letter nodes, foraging (l50, no MODE), U D L R
        "ellie",          # 2-letter nodes, foraging (l50, no MODE), EAST WEST NORTH SOUTH
    )
}

In [3]:
class GPT:
    """Small wrapper around a GPT-2 tokenizer/model pair used to continue text prompts.

    When `base_model` is given, the tokenizer and LM-head model are loaded from it with
    `from_pretrained`. When it is None, nothing is loaded: `tokenizer` and `model` stay
    None and `continue_input` raises until they are assigned.

    `base_model_name` and `vocab_size` are only stored on the instance; nothing in this
    class reads them.
    """

    def __init__(self, base_model=None, base_model_name='gpt2', vocab_size=100):
        """Store the configuration and, if `base_model` is not None, load tokenizer and model.

        Args:
            base_model: value handed to `GPT2Tokenizer.from_pretrained` and
                `GPT2LMHeadModel.from_pretrained` (the notebook passes a directory path).
            base_model_name: stored as-is; unused here.
            vocab_size: stored as-is; unused here.
        """
        self.base_model = base_model
        self.base_model_name = base_model_name
        self.vocab_size = vocab_size

        # Always define these attributes so an unloaded wrapper fails with a clear
        # message in continue_input instead of an AttributeError.
        self.tokenizer = None
        self.model = None

        if self.base_model is not None:
            self.tokenizer = GPT2Tokenizer.from_pretrained(base_model)
            self.model = GPT2LMHeadModel.from_pretrained(base_model)
            # This is important for open-ended generation: reuse EOS as the padding token.
            self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model.config.pad_token_id = self.model.config.eos_token_id

    def continue_input(self, input_sequence, max_new_tokens=5, num_return_sequences=1, no_repeat_ngram_size=0,
                       do_sample=False, temperature=0.7, num_beams=1):
        """Generate a continuation of `input_sequence` and return it decoded as text.

        Args:
            input_sequence: prompt string to tokenize and pass to `model.generate`.
            max_new_tokens, num_return_sequences, no_repeat_ngram_size, num_beams:
                forwarded unchanged to `model.generate`.
            do_sample: forwarded to `model.generate`; also controls whether
                `temperature` is forwarded.
            temperature: only passed to `model.generate` when `do_sample` is true.

        Returns:
            The decoded text of the FIRST sequence returned by `model.generate`; any
            additional sequences (num_return_sequences > 1) are discarded.
            NOTE(review): for GPT-2 this is expected to be the prompt followed by the
            newly generated tokens - confirm against the transformers documentation.

        Raises:
            RuntimeError: if no tokenizer/model has been loaded or assigned.
        """
        if self.tokenizer is None or self.model is None:
            raise RuntimeError(
                "GPT has no tokenizer/model loaded; construct it with base_model=... "
                "or assign .tokenizer and .model before calling continue_input()."
            )

        # Tokenize to get both the token ids and the matching attention mask.
        inputs = self.tokenizer(input_sequence, return_tensors='pt')
        input_ids = inputs.input_ids
        attention_mask = inputs.attention_mask

        generation_kwargs = {
            "max_new_tokens": max_new_tokens,
            "num_return_sequences": num_return_sequences,
            "num_beams": num_beams,
            "no_repeat_ngram_size": no_repeat_ngram_size,
            "do_sample": do_sample,
            "attention_mask": attention_mask,
            "pad_token_id": self.tokenizer.eos_token_id
        }

        # Temperature is only forwarded when sampling is requested.
        if do_sample:
            generation_kwargs['temperature'] = temperature

        output = self.model.generate(input_ids, **generation_kwargs)

        # Decode only the first returned sequence.
        sequence = output[0].tolist()
        return self.tokenizer.decode(sequence)

**LOOP COMPLETION TASK**

In [4]:
def load_pkl(pth):
    """Open the file at `pth` in binary mode and return the object unpickled from it."""
    with open(pth, 'rb') as handle:
        return pickle.load(handle)

def is_valid_path(sequence, graphs):
    """Return True if the nodes of `sequence` form a connected walk in at least one graph.

    `sequence` is split on whitespace and read as alternating tokens
    "node edge node edge node ...": tokens at even positions are the nodes.

    Only node-to-node adjacency is checked, via `graph.has_edge(u, v)` for each pair of
    consecutive nodes. The edge-label tokens at odd positions are NOT compared against
    the graphs (the previous version lower-cased them but never used the result).

    Args:
        sequence: whitespace-separated path string.
        graphs: iterable of objects exposing `has_edge(u, v)`.

    Returns:
        True as soon as one graph contains every consecutive node pair, else False.
        A sequence with fewer than two nodes has no pairs to check, so it is True
        whenever `graphs` is non-empty; an empty `graphs` always gives False.
    """
    nodes = sequence.split()[::2]
    # Consecutive (source, target) node pairs that must all be edges of one graph.
    steps = list(zip(nodes, nodes[1:]))

    return any(
        all(graph.has_edge(src, dst) for src, dst in steps)
        for graph in graphs
    )

In [5]:
# For models trained on two-letter nodes, generate a random 2-letter name
def generate_name() -> str:
    """Return a 2-character name of random lowercase ASCII letters (repeats allowed)."""
    letters = random.choices(string.ascii_lowercase, k=2)
    return ''.join(letters)

# For models trained on nouns, generate a random noun
def generate_noun():
    """Return one random noun from `RandomWord`, with spaces replaced by underscores.

    Returns None when `RandomWord.word` yields a falsy value.
    """
    noun = RandomWord().word(include_parts_of_speech=["nouns"])
    if not noun:
        return None
    return noun.replace(" ", "_")

def test_loop(model, loop_templates):
    accuracy_scores = []  # Store accuracy scores for each template
    results_dict = {}

    for template in loop_templates:
        template_accuracy = []  # Store accuracy for each iteration of the current template

        for _ in range(100):  # Repeat for 10 versions of each template
            # Fill the template with random nouns or 2-letter names
            names = [generate_name() for _ in range(template.count("{}") - 1)]
            names += [names[0]]
            filled_template = template.format(*names)
            #print(filled_template)

            # The true final item is the last name generated
            true_final_item = names[-1]
            input_len = len(filled_template.split())

            # Use the model to predict/continue the input based on the filled template
            # Adjust the prompt as needed for your specific model and task
            prediction = model.continue_input(filled_template[0:-3],
                                              max_new_tokens=5,
                                              do_sample=False)
            #print(prediction)
            # Assuming the prediction is a string, extract the last word/item
            predicted_items = prediction.strip().split()[0:input_len]
            predicted_final_item = predicted_items[-1] if predicted_items else None
            #print(f"True final:{true_final_item}, predicted final: {predicted_final_item}")

            # Calculate accuracy for this iteration
            is_correct = int(predicted_final_item == true_final_item)
            #print(is_correct)
            template_accuracy.append(is_correct)

        # Calculate average accuracy for this template
        accuracy_scores.extend(template_accuracy)
        results_dict[template] = sum(template_accuracy) / len(template_accuracy)

    # Calculate and return the overall average accuracy
    overall_avg_accuracy = sum(accuracy_scores) / len(accuracy_scores)
    return overall_avg_accuracy, results_dict


In [9]:
# Loop templates using U/D/L/R direction tokens. Previously this list was assigned to
# `loop_templates` and immediately overwritten, so it could never be used.
LOOP_TEMPLATES_UDLR = ["{} R {} L {}",
                       "{} L {} R {}",
                       "{} U {} D {}",
                       "{} D {} U {}",
                       "{} R {} D {} L {} U {}",
                       "{} D {} L {} U {} R {}",
                       "{} L {} U {} R {} D {}",
                       "{} U {} R {} D {} L {}",
                       "{} R {} R {} U {} L {} L {} D {}",
                       "{} U {} U {} L {} D {} D {} R {}"]

# The same loops written with EAST/WEST/NORTH/SOUTH direction tokens.
LOOP_TEMPLATES_CARDINAL = ["{} EAST {} WEST {}",
                           "{} WEST {} EAST {}",
                           "{} NORTH {} SOUTH {}",
                           "{} SOUTH {} NORTH {}",
                           "{} EAST {} SOUTH {} WEST {} NORTH {}",
                           "{} SOUTH {} WEST {} NORTH {} EAST {}",
                           "{} WEST {} NORTH {} EAST {} SOUTH {}",
                           "{} NORTH {} EAST {} SOUTH {} WEST {}",
                           "{} EAST {} EAST {} NORTH {} WEST {} WEST {} SOUTH {}",
                           "{} NORTH {} NORTH {} WEST {} SOUTH {} SOUTH {} EAST {}"]

# Default set used by the cells below (unchanged: the cardinal-direction templates).
loop_templates = LOOP_TEMPLATES_CARDINAL

In [10]:
# Evaluate the "ellie" checkpoint on the templates currently bound to `loop_templates`.
model = GPT(base_model=model_paths["ellie"])
average_accuracy, spatial_results_dict = test_loop(model=model, loop_templates=loop_templates)
print(f"Average Accuracy: {average_accuracy}")

Average Accuracy: 0.803


In [8]:
# foraging_v1 is listed in model_paths as trained with U/D/L/R direction tokens, so it is
# evaluated on U/D/L/R loops defined here rather than on whatever `loop_templates` is
# bound to (in a top-to-bottom run that is the EAST/WEST/NORTH/SOUTH set).
# NOTE(review): this cell was executed out of order; confirm the recorded accuracy below
# was produced with U/D/L/R templates and re-run if not.
foraging_loop_templates = ["{} R {} L {}",
                           "{} L {} R {}",
                           "{} U {} D {}",
                           "{} D {} U {}",
                           "{} R {} D {} L {} U {}",
                           "{} D {} L {} U {} R {}",
                           "{} L {} U {} R {} D {}",
                           "{} U {} R {} D {} L {}",
                           "{} R {} R {} U {} L {} L {} D {}",
                           "{} U {} U {} L {} D {} D {} R {}"]

model = GPT(base_model=model_paths["foraging_v1"])
average_accuracy, spatial_results_dict = test_loop(model, foraging_loop_templates)
print(f"Average Accuracy: {average_accuracy}")

Average Accuracy: 0.757
