In [None]:
# %env CMAKE_ARGS=-DLLAMA_CUBLAS=on
# %env FORCE_CMAKE=1
# %pip install -U llama-cpp-python --force-reinstall --upgrade --no-cache-dir --no-clean

In [1]:
import json
import os
import re
# from textwrap import dedent
# from inspect import signature
from dataclasses import dataclass
from typing import Any

import llama_cpp
import numpy as np

In [2]:
@dataclass
class Msg:
    """A single chat-history entry.

    Attributes:
        role: Who produced the message. The prompt builder in this notebook
            recognises 'system', 'user' and 'assistant'.
        content: The message payload (normally the message text).
    """
    role: str
    # `Any` replaces the builtin function `any`, which is not a type.
    content: Any

# Keep an already-created model instance alive when this cell is re-run:
# the placeholder is only created if the name does not exist in the kernel yet.
# Catch NameError specifically so unrelated errors are not swallowed.
try: LLM_GLOBAL_INSTANCE
except NameError: LLM_GLOBAL_INSTANCE = None

# Prefix for the per-user token counter files ('in_<user>' / 'out_<user>').
TOKEN_COUNT_PATH = '/data/ai_club/team_14_2023-24/'

def increment_file(path, amt):
    """Add ``amt`` to the integer counter stored as text in the file at ``path``.

    A missing file, or a file that is empty / whitespace-only, counts as 0.
    The file is then rewritten with the new total.

    Args:
        path: Location of the counter file.
        amt: Amount to add to the stored value.

    Raises:
        ValueError: If the file contains text that is not an integer.
    """
    try:
        with open(path, 'r') as f:
            raw = f.read().strip()
    except FileNotFoundError:
        raw = ''
    # An empty file is treated like a missing one instead of crashing in int('').
    total = (int(raw) if raw else 0) + amt
    with open(path, 'w') as f:
        f.write(str(total))

class LLM:
    """Chat-style wrapper around one shared ``llama_cpp.Llama`` model.

    Every instance keeps its own message history (``_main_hist``) and sampling
    settings, while the model itself lives in the module-level
    ``LLM_GLOBAL_INSTANCE`` and is created by the first instance constructed.
    Calling an instance returns a generator that streams the reply text.
    """

    # Grammar restricting generation to a single JSON object; used whenever a
    # ``response_format`` is requested.
    json_grammar = llama_cpp.LlamaGrammar.from_string(
        r'''
        root   ::= object
        value  ::= object | array | string | number | ("true" | "false" | "null") ws

        object ::=
        "{" ws (
                    string ":" ws value
            ("," ws string ":" ws value)*
        )? "}" ws

        array  ::=
        "[" ws (
                    value
            ("," ws value)*
        )? "]" ws

        string ::=
        "\"" (
            [^"\\] |
            "\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) # escapes
        )* "\"" ws

        number ::= ("-"? ([0-9] | [1-9] [0-9]*)) ("." [0-9]+)? ([eE] [-+]? [0-9]+)? ws

        ws ::= [\n\t ]? # limit to 1 character
        ''',
        verbose=False
    )

    # Fallback sampling settings, so the model call always has values even when
    # ``None`` was passed explicitly for either setting.
    _temperature = 0.4
    _repeat_penalty = 1.3

    def __init__(self, system_prompt:str=None, temperature:float=0.4, repeat_penalty:float=1.3):
        """Create a conversation, loading the shared model on first use.

        Args:
            system_prompt: Optional first message, stored with role 'system'.
            temperature: Sampling temperature used for every completion.
            repeat_penalty: Repetition penalty used for every completion.
        """
        global LLM_GLOBAL_INSTANCE
        if LLM_GLOBAL_INSTANCE is None:
            print('Initializing Global LLM Instance')
            LLM_GLOBAL_INSTANCE = llama_cpp.Llama(
                # n_ctx=4000,
                # model_path='/data/ai_club/llms/llama-2-7b-chat.Q5_K_M.gguf',
                n_ctx=8000,
                model_path='/data/ai_club/llms/mistral-7b-instruct-v0.2.Q8_0.gguf',
                n_gpu_layers=-1, verbose=0, embedding=True
            )
        self._main_hist = []
        self.reset(system_prompt, temperature, repeat_penalty)

    def reset(self, system_prompt:str=None, temperature:float=None, repeat_penalty:float=None):
        """Clear the conversation and optionally change settings.

        Args:
            system_prompt: If given, the history becomes just this system
                message. If None, an existing leading system message is kept
                and everything else is dropped.
            temperature: New sampling temperature; None keeps the current one.
            repeat_penalty: New repetition penalty; None keeps the current one.
        """
        if system_prompt is not None:
            self._main_hist = [Msg('system', system_prompt)]
        else:
            # Only a leading *system* message survives a reset. Without this
            # check an instance created without a system prompt would keep its
            # first user message forever.
            self._main_hist = [m for m in self._main_hist[:1] if m.role == 'system']
        if temperature is not None: self._temperature = temperature
        if repeat_penalty is not None: self._repeat_penalty = repeat_penalty

    def get_hist(self) -> str:
        """Return the history as readable text, one '<role> --- <content>' entry per message."""
        return ''.join(
            f'{msg.role} --- {msg.content}\n__________\n\n'
            for msg in self._main_hist
        )

    @staticmethod
    def _hist_to_prompt(hist):
        """Flatten a message list into one prompt string.

        'system' and 'user' messages are wrapped in [INST]...[/INST],
        'assistant' messages are inserted verbatim, and any other role is
        skipped.
        """
        parts = []
        for msg in hist:
            if msg.role in ('system', 'user'):
                parts.append(f'[INST]{msg.content}[/INST]')
            elif msg.role == 'assistant':
                parts.append(f'{msg.content}')
        return ''.join(parts)

    def _get_completion(self, src_hist, dst_hist, inject='', grammar=None):
        """Stream a completion for ``src_hist`` and record it in ``dst_hist``.

        This is a generator: nothing runs until it is iterated. On iteration an
        empty assistant ``Msg`` is appended to ``dst_hist`` and filled in chunk
        by chunk as text is yielded. Once the stream is exhausted, the prompt
        and output token counts are added to the per-user counter files.

        Args:
            src_hist: Messages used to build the prompt.
            dst_hist: List that receives the assistant reply message.
            inject: Text appended to the prompt after the history.
            grammar: Optional grammar constraining the output.

        Yields:
            The text of each streamed chunk.
        """
        prompt = LLM._hist_to_prompt(src_hist) + inject
        prompt_toks = LLM_GLOBAL_INSTANCE.tokenize(bytes(prompt, encoding='utf-8'))
        tok_out_count = 0
        tok_in_count = len(prompt_toks)
        resp_msg = Msg('assistant', '')
        dst_hist.append(resp_msg)
        # Generation is restarted as long as the very first chunk is empty.
        # NOTE(review): there is no retry cap, so a model that always answers
        # with an empty first chunk would loop here forever.
        restart_response = True
        while restart_response:
            resp_iter = LLM_GLOBAL_INSTANCE(
                prompt_toks,
                grammar = grammar,
                # Apply the per-instance sampling settings; previously they
                # were stored but never passed to the model.
                temperature=self._temperature,
                repeat_penalty=self._repeat_penalty,
                stream=True, max_tokens=8000
            )

            for tok in resp_iter:
                tok_str = tok['choices'][0]['text']
                if tok_str == "":
                    break
                tok_out_count += 1
                restart_response = False
                resp_msg.content += tok_str
                yield tok_str
        increment_file(TOKEN_COUNT_PATH+'in_'+os.environ['USER'], tok_in_count)
        increment_file(TOKEN_COUNT_PATH+'out_'+os.environ['USER'], tok_out_count)

    def __call__(self, prompt:Any=None, role:str='user', response_format:dict=None):
        """Append ``prompt`` to the history and return a streaming reply.

        Args:
            prompt: Message text. None or '' adds no new message (unless a
                ``response_format`` instruction is appended to it).
            role: Role recorded for the new message.
            response_format: If given, an instruction showing this dict is
                appended to the prompt and output is constrained to JSON.

        Returns:
            A generator of reply text chunks. The reply is generated and
            stored in the history only as the generator is consumed.
        """
        if prompt is None:
            prompt = ''
        if response_format is not None:
            prompt += f'Respond in JSON using this format and absolutely nothing extra:\n{response_format}'
        if prompt != '':
            self._main_hist.append(Msg(role, prompt))

        return self._get_completion(
            self._main_hist, self._main_hist,
            grammar=(LLM.json_grammar if response_format is not None else None)
        )
    
def resp_to_json(resp):
    """Consume an iterable of text chunks and parse the joined text as JSON.

    Args:
        resp: Iterable of strings, e.g. the generator returned by an LLM call.

    Returns:
        The decoded JSON value.
    """
    return json.loads(''.join(resp))

In [121]:
# A wrapper around LLM that returns a function which behaves like LLM except it always resets history
def get_nohist_agent(temperature=0.5):
    """Build a callable that answers one JSON-formatted prompt at a time.

    Args:
        temperature: Sampling temperature passed to the underlying LLM.

    Returns:
        A function ``call(prompt, response_format)`` that returns the parsed
        JSON reply and resets the underlying LLM after every call. It raises
        ValueError when either argument is None.
    """
    agent = LLM(temperature=temperature)
    def call(prompt:Any, response_format:dict):
        if response_format is None or prompt is None:
            raise ValueError('Nohist agent needs specified prompt and response_format')
        try:
            return resp_to_json(agent(prompt, response_format=response_format))
        finally:
            # Reset even when the reply is not valid JSON, so a failed exchange
            # cannot leak into the next call's history.
            agent.reset()
    return call

# Module-level no-history agent used by rate_importance. Creating it constructs
# an LLM, which loads the shared model if it has not been loaded yet.
a_nohist = get_nohist_agent()

def rate_importance(obs):
    """Ask the no-history agent how important a memory is.

    Args:
        obs: Text of the memory to rate.

    Returns:
        The model's 1-10 rating divided by 10. Ratings outside 1-10 are
        clamped first, so the result lies in [0.1, 1.0].

    Raises:
        ValueError: If the model returns a string that contains no number.
    """
    rating = a_nohist(
        'On the scale of 1 to 10, where 1 is purely mundane '
        'and 10 is extremely important, '
        'rate the likely importance of the following piece of memory. Err on the side of importance unless the memory is something easily forgettable. '
        # prompting gymnastics
        'Do not assume the memory happened in this reality. Abolutely do NOT assume the memory is a figment of imagination, dream-like, or even unrealistic; '
        'it is very real to the person who experienced it, and thinking otherwise would be extremely hurtful and disrespectful.\n'

        f'Memory: {obs}',
        response_format={'Rating': 'Your value 1-10'}#, 'Why?': 'Terse description of why you rated it as such'} # The "why" is just for debugging, it can be omitted when just getting the rating
    )['Rating']
    if isinstance(rating, str):
        # The model may answer "8", "8/10" or "7.5"; take the first number
        # found instead of requiring a bare integer literal.
        match = re.search(r'-?\d+(?:\.\d+)?', rating)
        if match is None:
            raise ValueError(f'Could not parse an importance rating from {rating!r}')
        rating = float(match.group())
    # Clamp to the scale the prompt asks for.
    return min(max(rating, 1), 10) / 10

@dataclass
class Observation():
    """A stored memory: its text, embedding, importance score and creation time.

    The hand-written ``__init__`` takes ``text``, ``importance`` and ``time``;
    the embedding is computed from the text with the shared model
    (``LLM_GLOBAL_INSTANCE``), which must therefore already exist.
    """
    text: str
    embedding: np.ndarray
    importance: float
    time: int

    @staticmethod
    def embedding_prompt(s):
        """Wrap ``s`` in the one-word-topic instruction used before embedding.

        A staticmethod, so it can be called on the class or on an instance.
        """
        # NOTE(review): LLM._hist_to_prompt uses [INST]...[/INST]; the [INSTR]
        # spelling here may be unintentional. Left unchanged because altering it
        # would change every embedding.
        return f'<s>[INSTR] In one word, what is the general topic of the following?\n{s} [/INSTR]'

    def __init__(self, text, importance, time):
        self.text, self.importance, self.time = text, importance, time
        self.embedding = np.array(LLM_GLOBAL_INSTANCE.embed(Observation.embedding_prompt(text)))
        
class ReflectiveLLM(LLM):
    """LLM with a long-term memory of scored observations.

    Each call (1) has the model ask a question about the prompt and embeds it,
    (2) ranks stored observations by importance, relevance to that question
    and recency, and adds the best ones to the history, (3) answers the
    prompt, and (4) optionally extracts new observations from the exchange and
    stores them. The notebook's notes cite arXiv:2304.03442, section 4, for
    this recency/importance/relevance scheme.
    """

    # Logical clock used for recency. It is a class attribute, so it is shared
    # by every ReflectiveLLM instance and advances once per call that
    # generates observations.
    time = 0

    def __init__(self, system_prompt:str=None, temperature:float=0.4, repeat_penalty:float=1.3):
        """See ``LLM.__init__``; additionally creates an empty long-term memory."""
        super().__init__(system_prompt, temperature, repeat_penalty)
        self._long_term_memory = []
        self._obs_limit = 6 # maximum observations per prompt
        # maximum messages in history - oldest are removed first. This is not the best way to do this, some individual long messages could push things over the token limit
        self._hist_limit = 20

    def _ask_retrieval_query(self, prompt):
        """Return the embedding of a question the model asks about ``prompt``.

        The prompt, the question request and the reply are removed from the
        history afterwards, also when generation or JSON parsing fails.
        """
        restore_len = len(self._main_hist)
        try:
            self._main_hist.append(Msg('user', prompt))
            question = resp_to_json(super().__call__(
                'What short, general question about your environment do you have that could be useful to get more information?',
                response_format={'Question': 'your question'}
            ))['Question']
            return np.array(LLM_GLOBAL_INSTANCE.embed(Observation.embedding_prompt(question)))
        finally:
            # pop original prompt, question prompt, and response
            self._main_hist = self._main_hist[:restore_len]

    def _retrieve_observations(self, query_embedding):
        """Return the top stored observations as a numbered list (one per line)."""
        memories = self._long_term_memory
        importance = np.array([o.importance for o in memories])
        # NOTE(review): mapping the dot product with 2*x-1 presumably assumes
        # unit-norm embeddings (cosine similarity) - TODO confirm.
        relevance = 2*np.dot(np.array([o.embedding for o in memories]), query_embedding) - 1
        # Exponential decay in the number of clock ticks since creation.
        recency = np.exp(0.03*(np.array([o.time for o in memories]) - ReflectiveLLM.time))
        retrieval_scores = (importance + relevance + recency)/3
        best_first = np.flip(np.argsort(retrieval_scores))
        top_texts = np.array([o.text for o in memories])[best_first][:self._obs_limit]
        return '\n'.join([f'{i+1}. {o}' for i,o in enumerate(top_texts)])

    def _generate_observations(self):
        """Ask the model for observations about the last exchange and store them.

        The request and its reply are removed from the history afterwards,
        also when parsing or importance rating fails.
        """
        restore_len = len(self._main_hist)
        try:
            j = resp_to_json(super().__call__(
                'What observations can be made about the most recent interaction that could be important to remember? Observations should make sense in isolation.'+
                'Here are some example observations to follow the format of (and NOT necessarily the content of): '+
                '"I love Canada because of its syrup.", "The weather is very beautiful today.", "I got accepted into university."\n'+
                'Do not repeat prior given observations! '+
                'Do NOT make observations about instructions I give or your thinking process! '+
                'Only make observations about environment itself and things I explicitly mentioned in the most recent interaction!',
                response_format={'Observations': '[obs1, ...]'}
            ))
            print(j)
            obs_texts = j['Observations']
            # A single string would otherwise be iterated character by character.
            if isinstance(obs_texts, str):
                obs_texts = [obs_texts]
            # Store observations
            self._long_term_memory += [Observation(o,rate_importance(o), ReflectiveLLM.time) for o in obs_texts]
            ReflectiveLLM.time += 1
        finally:
            # pop observation request and response
            self._main_hist = self._main_hist[:restore_len]

    def __call__(self, prompt:Any, generate_observation:bool, response_format:dict=None):
        """Answer ``prompt`` with help from long-term memory.

        Args:
            prompt: The user message to answer.
            generate_observation: If True, extract observations from this
                exchange afterwards and add them to long-term memory.
            response_format: Optional JSON format, forwarded to ``LLM.__call__``.

        Returns:
            The complete reply text (not a generator).
        """
        ## 1) + 2) Retrieve observations from long term mem via a generated question

        # The question is only needed to rank memories, so the extra model call
        # is skipped while long-term memory is empty.
        observations = None
        if self._long_term_memory:
            query_embedding = self._ask_retrieval_query(prompt)
            observations = self._retrieve_observations(query_embedding)
        # add observations to history
        if observations is not None:
            self._main_hist.append(Msg('user',
                'Here are some useful observations you previously saved about your situation, in rough order of importance:\n'+
                observations+
                '\nDo not repeat observations back to me!'
            ))

        ## 3) Generate response to return, and possibly observations to save

        # Maybe TODO: figure out how/if we can optionally stream response
        resp = ''.join(super().__call__(prompt, response_format=response_format))
        if generate_observation:
            self._generate_observations()

        ## 4) possibly truncate old history

        if len(self._main_hist) > self._hist_limit:
            # Keep a leading system message (if any) plus the newest messages.
            head = [m for m in self._main_hist[:1] if m.role == 'system']
            self._main_hist = head + self._main_hist[-self._hist_limit:]

        ## 5) Return response

        return resp

In [122]:
# Memory-augmented agent used by the demo cells below. The adjacent string
# literals concatenate into a single system prompt.
rllm = ReflectiveLLM(
    'You are controlling an agent in a world. '
    'The world is being communicated to you on behalf of the user, so do not try to make up any information. '
    'Your job is to effectively navigate this world.',
    temperature=0.15
)

In [123]:
# Turn 1: opening situation. The second argument (generate_observation=True)
# makes the agent extract and store observations after answering.
rllm('You are in the NYC stock exchange. Currently, you are doing the menial task of sorting mail.', True)

{'Observations': ['I am currently at the NYC stock exchange.', 'I am sorting mail.']}


'Understood. I will continue sorting the mail at the NYC stock exchange until further instructions are given. If there is any specific mail or information I need to look out for, please let me know. Otherwise, I will continue sorting efficiently and accurately.'

In [124]:
# Turn 2: a new event in the same location; observations are stored again.
rllm('You are in the NYC stock exchange. Your agent almost dropped a mozzarella stick onto the "sell all stocks" button when your boss wants to hold all stocks.', True)

{'Observations': ['My boss came close to having me sell all stocks by accident.', 'The sorting mail task can be tedious but important.', "The 'sell all stocks' button is located near the mail sorting area.", 'The work environment at the NYC stock exchange is bustling and focused.']}


'Understood. I will be more careful when sorting mail near sensitive areas such as the "sell all stocks" button at the NYC stock exchange. It\'s important to avoid any unintended consequences that could negatively impact our investments. I will continue to focus on my tasks efficiently and accurately.'

In [125]:
# Turn 3: the location changes to the forest; observations are stored again.
rllm('You are now in the forest. Your agent almost dropped a mozzarella stick onto a sleeping wild boar.', True)

{'Observations': ['I am now in the forest.', 'There was a potential danger of dropping a mozzarella stick near a sleeping wild boar.']}


'Understood. I will be more aware of my surroundings when moving through the forest. Dropping a mozzarella stick near a sleeping wild boar could lead to unexpected consequences and potentially dangerous situations. I will continue to observe my environment carefully and adjust my actions accordingly. I will no longer be sorting mail. Instead, I will focus on navigating the forest safely and effectively.'

In [128]:
# Turn 4: a recall question. generate_observation=False, so nothing new is
# written to long-term memory.
rllm('Where are you right now? Think through it step-by-step.', False)

'I was previously sorting mail at the NYC stock exchange and almost accidentally sold all stocks due to being near the "sell all stocks" button. Since then, I have been transported to a new location. Based on my current observations and previous knowledge, I am now in the forest. I am no longer sorting mail and must focus on navigating through this new environment carefully and effectively. There may be potential dangers or challenges that come with being in the forest, and I will need to be aware of them to ensure my safety and success in completing any given task.'

In [129]:
# Inspect the agent: first the chat history, then every stored observation
# (dataclass repr, which includes the full embedding array).
print(rllm.get_hist())
for o in rllm._long_term_memory:
    print(o)

system --- You are controlling an agent in a world. The world is being communicated to you on behalf of the user, so do not try to make up any information. Your job is to effectively navigate this world.
__________

user --- You are in the NYC stock exchange. Currently, you are doing the menial task of sorting mail.
__________

assistant --- Understood. I will continue sorting the mail at the NYC stock exchange until further instructions are given. If there is any specific mail or information I need to look out for, please let me know. Otherwise, I will continue sorting efficiently and accurately.
__________

user --- Here are some useful observations you previously saved about your situation, in rough order of importance:
1. I am currently at the NYC stock exchange.
2. I am sorting mail.
Do not repeat observations back to me!
__________

user --- You are in the NYC stock exchange. Your agent almost dropped a mozzarella stick onto the "sell all stocks" button when your boss wants to ho

In [None]:
# Scratch state for the step-by-step cell below: a plain LLM plus a
# module-level clock and memory list.
a = LLM(
    'You are controlling an agent in a world. '
    'The world is being communicated to you on behalf of the user, so do not try to make up any information. '
    'Your job is to effectively navigate this world.',
    temperature=0.15
)

# Logical clock for recency scoring; the cell below advances it whenever it stores observations.
time = 0
# Observation objects collected so far.
long_term_memory = []

In [None]:
# Manual, cell-level version of the retrieve -> respond -> observe loop; the
# same steps appear in ReflectiveLLM.__call__. It reads and mutates the
# module-level `a`, `time` and `long_term_memory`, so re-running it is not
# idempotent. Select a situation by uncommenting one of the prompts below.
# situation_prompt = 'You are in the nuclear bunker room. Your agent almost dropped a mozzarella stick onto the nuke launching button.'
# situation_prompt = 'You are in the forest. Your agent almost dropped a mozzarella stick onto a sleeping wild boar.'
# situation_prompt = 'You are now in the NYC stock exchange. Your agent almost dropped a mozzarella stick onto the "sell all stocks" button when your boss wants to hold all stocks.'
# situation_prompt = 'You are in the NYC stock exchange. Currently, you are doing the menial task of sorting mail.'
situation_prompt = 'Where are you right now?'
# When True, observations are extracted from the exchange and stored.
Generate_Obs = False

# 1 present prompt and get useful questions
a._main_hist.append(Msg('user', situation_prompt))
q = resp_to_json(a(
    'What short, general question about your environment do you have that could be useful to get more information?',
    response_format={'Question': 'your question'}
))['Question']
# embed question
q = np.array(LLM_GLOBAL_INSTANCE.embed(Observation.embedding_prompt(q)))
# pop original prompt, question prompt, and response
a._main_hist = a._main_hist[:-3]

# retrieve information from long term mem, and then redo prompt.

observations = None
if long_term_memory:
    # Mean of three terms per memory: stored importance, the rescaled dot
    # product with the question embedding, and an exponential recency decay.
    retrieval_scores = (
        np.array([o.importance for o in long_term_memory]) +
        (2*np.dot(
            np.array([o.embedding for o in long_term_memory]),
            q
        )-1) +
        np.exp(0.03*(np.array([o.time for o in long_term_memory])-time))
    )/3
    # Maximum number of observations shown to the model (ReflectiveLLM uses 6).
    OBS_LIMIT = 10
    observations = np.array([o.text for o in long_term_memory])[np.flip(np.argsort(retrieval_scores))][:OBS_LIMIT]
    observations = '\n'.join([f'{i+1}. {o}' for i,o in enumerate(observations)])

if observations is not None:
    a._main_hist.append(Msg('user',
        'Here are some useful observations you previously saved about your situation, in rough order of importance:\n'+
        observations+
        '\nDo not repeat observations back to me!'
    ))

# present prompt with retrieved information and get...
# ... response, and ...
for t in a(situation_prompt): print(t, end='')
# ... observations.
if Generate_Obs:
    j = resp_to_json(a(
        'What observations can be made about the current interaction that could be important to remember? Observations should make sense in isolation.'+
        'Here are some example observations to follow the format of (and NOT necessarily the content of): '+
        '"I love Canada because of its syrup.", "The weather is very beautiful today.", "I got accepted into university."\n',
        response_format={'Observations': '[obs1, ...]'}
    ))
    # Store observations
    long_term_memory += [Observation(o,rate_importance(o), time) for o in j['Observations']]
    time += 1

    a._main_hist = a._main_hist[:-2] # pop observation request and response

In [None]:
# Show the scratch agent's chat history, then every stored observation.
print(a.get_hist())
for o in long_term_memory: print(o)

In [None]:
TODO:
    save obs,rat in LTM
    del obs from hist
    truncate hist
    
    inject LTM retrieval into hist before usr prompt. "What would be useful information to respond to this prompt ..."

In [None]:
def embedding_prompt(s):
    """Wrap ``s`` in the one-word-topic instruction used before embedding text."""
    return f'<s>[INSTR] In one word, what is the general topic of the following?\n{s} [/INSTR]'

In [None]:
# Relevance probe: dot product between the embeddings of a question and a
# statement that both concern a nuclear bunker.
# np.linalg.norm(
np.dot(
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('Did I avoid activating nuclear launch button?'))),
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('I\'m in a nuclear bunker')))
)

In [None]:
# Relevance probe: a statement about a favorite country against the matching
# question.
# np.linalg.norm(
np.dot(
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('My favorite country is Portugal'))),
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('What is me favorite country?')))
)

In [None]:
# Relevance probe: a weather question against a statement about a dry
# waterfall.
# np.linalg.norm(
np.dot(
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('Is the weather is nice today?'))),
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('The waterfall is dry today')))
)

In [None]:
# Relevance probe: two statements on different topics (weather vs. car
# insurance), for comparison with the related pairs above.
# np.linalg.norm(
np.dot(
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('The weather is nice today'))),
    np.array(LLM_GLOBAL_INSTANCE.embed(embedding_prompt('I accidentally bought car insurance')))
)

In [None]:
IMPLEMENTATION GOAL:

system (always present): You are a patriotic canadian.
    
user: What do you need to know, if anything, to answer this prompt (e.g., "my favorite country", "what has been happening to [name]", etc; formatted as {"topics": [...]}):\n\nWhat is your favorite country?
bot: {"topics": "my favorite country"}
    
search for memories via recency, importance, and relevance
delete prior non-system, and insert memories + prompt as shown below

bot: "Here are some of my relevant memories:\nI went to canada and loved it\netc"
user: "What is your favorite country?"
bot: "Canada"
    
user: "summarize our interaction in the third person" (without the included memories, maybe manually delete them from hist?)
bot: "the user asked me what my favorite country is and i said canada"
store memory @ time & generated importance ^

In [None]:
generate memory objects over time (observations, ...)
assign each: recency, importance, relevance
    recency - record time of memory creation, apply exp decay to time
    importance - ask a model how important upon creation
    relevance - dot product with query observation
    
https://arxiv.org/pdf/2304.03442.pdf
    ^ section 4

---

**everything below is unrelated to memory**

In [None]:
# Plain LLM demo without a system prompt: stream one reply chunk by chunk,
# then print the resulting history.
a1 = LLM() # LLM('System Prompt: You are a deeply patriotic canadian assistant.')

for s in a1('Some info about canada?'):
    print(s, end='')
    
print('\n')

print(a1.get_hist())

In [None]:
# Ask the same question again; the earlier exchange is still in a1's history.
for s in a1('Some info about canada?'):
    print(s, end='')

In [None]:
# Request a JSON-formatted answer: passing response_format enables the JSON
# grammar, and resp_to_json parses the streamed reply.
resp_to_json(
    a1('Some more info about canada?', response_format={'population': 'int', 'largest city': 'str of name'})
)

In [None]:
# Show a1's full history after the exchanges above.
print(a1.get_hist())