In [1]:
import torch
import transformers

from langchain.utils.openai_functions import convert_pydantic_to_openai_function

from pydantic import BaseModel, Field

In [2]:
memory_file_path = '../data/memories.txt'

with open(memory_file_path, 'r') as file:
    memories = file.readlines()

In [3]:
incoming_memory = "(family) User has a daughter named Margot"

In [4]:
def save_memory(memory: str) -> None:
    """
    Save a memory to the list of memories.

    This function appends a given memory (string) to the `memories` list.

    Args:
        memory (str): The memory to be saved. It should be a string containing the memory.

    Returns:
        None: This function does not return any value. It modifies the `memories` list in place.
    """
    memories.append(memory)

In [5]:
def replace_memory(target_memory: str, replacement_memory: str) -> None:
    """
    Replace a memory in the memory list with a new one.

    This function searches for a specific memory in the `memories` list and replaces it with a new memory. 

    Args:
        target_memory (str): The memory to be replaced. It should be a string that
                              matches an existing memory in the list.
        replacement_memory (str): The new memory that will replace the target memory.
                                  This should be a string containing the replacement memory.

    Returns:
        None: This function does not return any value. It modifies the `memories` list in place.
    """
    for i, memory in enumerate(memories):
        if memory == target_memory:
            memories[i] = replacement_memory
            
            print(f"[INFO] Memory has been replaced")
            print(f"[INFO] Old Memory: {target_memory}  |  New Memory: {replacement_memory}")

In [6]:
class save_memory(BaseModel):
    """
    Save a memory to the list of memories.
    """
    memory: str = Field(..., description="The memory to be saved.")

class replace_memory(BaseModel):
    """
    Replace a memory in the memory list with a new one.
    """
    target_memory: str = Field(..., description="The old memory to be replaced.")
    replacement_memory: str = Field(..., description="The new memory that will replace the target memory.")

In [None]:
function_format = """{"name": "function_name", "arguments": {"arg_1": "value_1", "arg_2": value_2, ...}}"""

In [None]:
prompt = f"""
<|im_start|>system
You are a helpful assistant.
You have to give reasoning using the incoming memory.
The solution should include the given format after the reasoning: {function_format}.
The solution can be NONE.
<|im_end|>

<|im_start|>user
You have access to the following functions:

{convert_pydantic_to_openai_function(save_memory)}
{convert_pydantic_to_openai_function(replace_memory)}

Edge cases you must handle:
- If there are no functions that match the user request, you will respond with NONE.<|im_end|>

<|im_start|>User Memories
{memories}
<|im_end|>

<|im_start|>Incoming Memory
{incoming_memory}
<|im_end|>

<|im_start|>question
User Memories show the saved memories in the system.
If the Incoming Memory or a similar memory already exists, your solution should be NONE.
If the Incoming Memory or a similar memory does not exist in the User Memories, save the memory.
If the Incoming Memory or a similar memory contradicts a memory in the User Memories, replace the old memory with a new memory.

Which function should I use for the Incoming Memory?
<|im_end|>

<|im_start|>Solution
"""

In [None]:
model_id = "teknium/OpenHermes-2.5-Mistral-7B"

tokenizer = transformers.AutoTokenizer.from_pretrained(model_id)
model = transformers.AutoModelForCausalLM.from_pretrained(model_id, 
                                                          torch_dtype=torch.bfloat16).eval()

In [None]:
generation_config = model.generation_config
generation_config.update(
    **{
        **{
            "use_cache": True,
            "do_sample": True,
            "temperature": 0.2,
            "top_p": 1.0,
            "top_k": 0,
            "max_new_tokens": 512,
            "eos_token_id": tokenizer.eos_token_id,
            "pad_token_id": tokenizer.eos_token_id,
        }
    }
)

In [None]:
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

generated_tokens = model.generate(**inputs, generation_config=generation_config)
print(tokenizer.decode(generated_tokens.squeeze(), skip_special_tokens=False))