Goals:
This notebook attempts to set up a POC for "Infinite Memory"
1. Store
2. Retrieve

In [None]:
import time
from IPython.display import display
from loguru import logger
from openai import OpenAI
from getpass import getpass
from pydantic import BaseModel, Field


True

# Helper Functions

In [None]:
# Prompt for the OpenAI API key interactively so it is not hard-coded in the notebook.
openai_api_key = getpass("enter_openai_api_key")

In [None]:
from tenacity import retry, stop_after_attempt, wait_fixed
from typing import Type, Union, Any
from llama_index.core.output_parsers.utils import parse_json_markdown
import json

# Shared OpenAI client; used below for both chat completions and embeddings.
openai_client = OpenAI(
            api_key=openai_api_key,  
        )

def make_request(model: str, messages: list[dict[str, str]]) -> str:
    """Send one chat-completion request and return the first choice's text.

    Args:
        model: Name of the OpenAI chat model to call.
        messages: Chat messages in OpenAI format (dicts with "role" and "content").

    Returns:
        The ``content`` of the first choice in the response.
    """
    # temperature=0 is passed so that extraction output is as repeatable as possible.
    response = openai_client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,
    )
    return response.choices[0].message.content


@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(2),
)
def chat_completion_request(
    messages: list[dict[str, str]],
    model: str = "gpt-4o",
    response_model: Union[Type[BaseModel], None] = None,
) -> Union[str, BaseModel]:
    """Call the chat model and optionally parse the reply into ``response_model``.

    The call is attempted up to 3 times with a fixed 2 second wait between
    attempts (see the ``@retry`` decorator). When the reply cannot be parsed
    or validated, a corrective user message containing the expected JSON
    schema is appended to ``messages`` *in place* before re-raising, so the
    next attempt sends that feedback to the model.

    Args:
        messages: Chat messages in OpenAI format. Mutated on parse failures.
        model: Name of the chat model to use.
        response_model: Pydantic model the reply must conform to. When
            ``None`` the raw reply text is returned.

    Returns:
        The raw reply text, or an instance of ``response_model``.

    Raises:
        Exception: Whatever the final attempt raised (wrapped by tenacity
            once the attempts are exhausted).
    """
    # Pre-bind so the JSON-error handler can log it even if the request itself failed.
    content = None
    try:
        content = make_request(model, messages)
        if response_model is None:
            return content

        parsed_content = parse_json_markdown(content)
        try:
            return response_model(**parsed_content)
        except (TypeError, ValueError) as e:
            # TypeError: the payload is not a mapping that can be **-unpacked.
            # ValueError: pydantic's ValidationError derives from ValueError, so
            # missing or mistyped fields land here and also produce schema feedback.
            error_message = {
                "role": "user",
                "content": f"JSON decoding error: {e}. Please adhere to the json response format that obeys the following schema: {response_model.model_json_schema()}",
            }
            messages.append(error_message)
            logger.error(
                f"{type(e).__name__} in response_model parsing: {e}. Content: {parsed_content}"
            )
            raise
    except json.JSONDecodeError as e:
        # Only give schema feedback when there is a schema to describe.
        if response_model is not None:
            error_message = {
                "role": "user",
                "content": f"JSON decoding error: {e}. Please adhere to the json response format that obeys the following schema: {response_model.model_json_schema()}",
            }
            messages.append(error_message)
        logger.error(f"JSON decoding error: {e}. Content: {content}")
        raise
    except Exception as e:
        logger.error(f"Error while making chat completion request: {e}")
        raise


# Initialize DB

In [16]:
import chromadb
# Ephemeral Chroma client for this POC.
# NOTE(review): presumably in-memory only, so the index is lost when the kernel restarts — confirm.
chroma_client = chromadb.EphemeralClient()

In [None]:
from chromadb import Documents, EmbeddingFunction, Embeddings

class CustomOpenAIEmbeddingFunction(EmbeddingFunction):
    """Chroma embedding function that delegates to OpenAI's embeddings API."""

    def __call__(self, input: list[str]) -> Embeddings:
        """Embed every text in ``input`` with ``text-embedding-3-small``.

        Newlines are replaced with spaces before the texts are sent.
        """
        single_line_texts = []
        for text in input:
            single_line_texts.append(text.replace("\n", " "))

        response = openai_client.embeddings.create(
            input=single_line_texts,
            model='text-embedding-3-small',
        )
        return [item.embedding for item in response.data]


In [18]:
# Collection holding the conversation snippets; documents are embedded with the custom OpenAI function.
collection = chroma_client.get_or_create_collection(name="user_history", embedding_function=CustomOpenAIEmbeddingFunction()) 

# Create functions to add, delete and query DB

In [54]:
# deleting
def delete_from_index(collection, ids):
    """Delete the entries identified by ``ids`` from ``collection``.

    Args:
        collection: Object exposing a ``delete(ids=...)`` method.
        ids: Identifiers of the entries to remove.
    """
    removal_request = {"ids": ids}
    collection.delete(**removal_request)
# Smoke test with hard-coded ids. The recorded output shows Chroma only reports
# "Delete of nonexisting embedding ID" when an id is not in the collection.
delete_from_index(collection=collection,ids=['ce617a99-cc06-478c-9ddb-7f041572a139',
  'b5c1f87b-3e66-4dd0-8bcf-02b6d523a74c'])

Delete of nonexisting embedding ID: ce617a99-cc06-478c-9ddb-7f041572a139
Delete of nonexisting embedding ID: b5c1f87b-3e66-4dd0-8bcf-02b6d523a74c
Delete of nonexisting embedding ID: ce617a99-cc06-478c-9ddb-7f041572a139
Delete of nonexisting embedding ID: b5c1f87b-3e66-4dd0-8bcf-02b6d523a74c


In [32]:
import uuid
# inserting
def insert_to_index(collection, documents, metadatas=None):
    """Add ``documents`` to ``collection`` under freshly generated UUID4 ids.

    Args:
        collection: Object exposing an ``add(documents=, metadatas=, ids=)`` method.
        documents: Texts to store.
        metadatas: Optional metadata passed through unchanged.
    """
    generated_ids = []
    for _ in documents:
        generated_ids.append(str(uuid.uuid4()))

    collection.add(documents=documents, metadatas=metadatas, ids=generated_ids)
    logger.info(f"Successfully inserted {len(documents)} documents")

# Smoke test: seed the index with two toy documents (no metadata).
insert_to_index(collection=collection, documents=['Fu Nan is a boy', 'Jane is a girl'])

[32m2024-11-20 23:09:21.559[0m | [1mINFO    [0m | [36m__main__[0m:[36minsert_to_index[0m:[36m9[0m - [1mSuccessfully inserted 2 documents[0m


In [55]:
# Typed view over the fields of a collection.query() result that this notebook uses.
# Presumably each outer list holds one inner list per query text — verify against Chroma's docs.
class QueryResult(BaseModel):
    ids: list[list[str]]
    documents: list[list[str]]
    distances: list[list[float]]

# querying https://docs.trychroma.com/guides#filtering-by-metadata
def query_index(query_texts, n_results=1, where=None, where_document=None):
    """Query the module-level ``collection`` and wrap the result in ``QueryResult``.

    Args:
        query_texts: Texts to search for.
        n_results: Number of results requested per query text.
        where: Optional metadata filter, forwarded unchanged.
        where_document: Optional document-content filter, forwarded unchanged.

    Returns:
        A ``QueryResult`` built from the raw query response.
    """
    query_kwargs = {
        "query_texts": query_texts,
        "n_results": n_results,
        "where": where,
        "where_document": where_document,
    }
    raw_result = collection.query(**query_kwargs)
    return QueryResult(**raw_result)
    
# Smoke test: two query texts, each with the default n_results=1.
query_result = query_index(
    query_texts=["female", "male"]
)

# /Store

## extract_snippets_from_conversation

In [49]:
from typing import Optional
from datetime import datetime


# One extracted note from a conversation: the note text plus an optional event date.
# The extraction prompt asks for dates formatted as "YYYY-MM-DD".
class Snippet(BaseModel):
    text: str
    date_of_event: Optional[str] = Field(description="to be filled in if the snippet is an event", default=None) # TODO not sure what to do with this info for now

# Top-level shape of the model's JSON reply: {"snippets": [Snippet, ...]}.
class ConversationSnippets(BaseModel):
    snippets: list[Snippet]

# Prompt template for snippet extraction. It is filled with str.format using the
# {date} and {conversation} placeholders; the doubled braces ({{ }}) are literal
# JSON braces escaped for str.format.
# NOTE(review): the prompt text contains typos ("take node of", "shouuld", "happend").
extract_snippets_from_conversation_prompt = """\
You are to extract snippets of a given conversation between a career confidante and a user, which the confidante should take node of. Think of it as the confidante jotting key points down during the conversation in their journal.
Each snippet has to contain sufficient information to stand alone and be understood without the context of the entire conversation.

**
IMPORTANT: Only return the output in JSON format. The JSON structure should be a list of snippet objects, each with the fields:
	•	"text" (str): The extracted text snippet from the conversation.
	•	"date_of_event" (string): The date of the event mentioned in the snippet. If the snippet is not about an event, this field should be null. Date shouuld be formatted as "YYYY-MM-DD".

Example conversation that happend on 2024-02-01:
User: I am a software engineer and I am considering a career change.
Confidante: What are you considering?
User: I am considering becoming a data scientist.
Confidante: What is motivating you to make this change?
User: I am interested in working with data and I want to leverage my programming skills. I am also going to start taking a course in data science.
Confidante: That's awesome, when do you plan to start the course?
User: I plan to start next month.
Confidante: Great!

Example JSON:
{{
    "snippets": [
        {{
            "text": "User is considering becoming a data scientist.",
            "date_of_event": null
        }},
        {{
            "text": "User is interested in working with data and wants to leverage programming skills. User is also going to start taking a course in data science.",
            "date_of_event": null
        }},
        {{
            "text": "User plans to start data science course next month.",
            "date_of_event": "2024-03-01"
        }}

    ]
}}
===== END OF EXAMPLE ======

The 'snippets' key must be a list of snippets.
The result must be a list of objects with 'text' and 'date_of_event' keys.
Ensure each snippet contains sufficient information to stand alone and be understood without the context of the entire conversation.
**

Conversation that happened on {date}:
{conversation}

JSON:
"""

def _get_date_today():
    return datetime.now().strftime("%Y-%m-%d")


def _construct_conversation(user_messages:list[str], assistant_messages:list[str])->str:
    conversation = []
    for user_message, assistant_message in zip(user_messages, assistant_messages):
        conversation.append(f"User: {user_message}")
        conversation.append(f"Confidante: {assistant_message}")
    return "\n".join(conversation)

def extract_snippets_from_conversation(user_messages:list[str], assistant_messages:list[str]):
    """Ask the chat model to distill a conversation into stand-alone snippets.

    Args:
        user_messages: The user's messages, in order.
        assistant_messages: The confidante's replies, in order.

    Returns:
        The ``ConversationSnippets`` instance parsed from the model's reply.
    """
    transcript = _construct_conversation(user_messages, assistant_messages)
    filled_prompt = extract_snippets_from_conversation_prompt.format(
        date=_get_date_today(),
        conversation=transcript,
    )
    request_messages = [{"role": "user", "content": filled_prompt}]
    extracted = chat_completion_request(
        messages=request_messages,
        response_model=ConversationSnippets,
    )
    snippet_count = len(extracted.snippets)
    logger.info(f"Successfully extracted {snippet_count} snippets from conversation")
    return extracted

In [37]:
# test case
# Manual check: run extraction on a short four-turn conversation and display the result.
test_snippets = extract_snippets_from_conversation(
    user_messages=["I just got laid off from my job.", "I am considering a career change.", "First, I am thinking of starting a course in data science.", "tomorrow"],
    assistant_messages=["What are you considering?", "What are your interests?", "When do you plan to start the course?", "That's great!"]
)
test_snippets

ConversationSnippets(snippets=[Snippet(text='User just got laid off from their job.', date_of_event=None), Snippet(text='User is considering a career change.', date_of_event=None), Snippet(text='User is thinking of starting a course in data science.', date_of_event=None), Snippet(text='User plans to start the data science course tomorrow.', date_of_event='2024-11-21')])

## Insert snippets to index

In [40]:
# insert to index
def insert_snippets_to_index(collection, conversation_snippets: "ConversationSnippets"):
    """Store every extracted snippet in ``collection``.

    Each snippet's ``text`` becomes a document. Every document gets a
    ``date_of_event`` metadata entry: the snippet's date when it has one,
    otherwise an empty string, so the key is the same for all documents.

    Args:
        collection: Target collection, forwarded to ``insert_to_index``.
        conversation_snippets: Snippets produced by the extraction step.
    """
    snippets = conversation_snippets.snippets
    insert_to_index(
        collection=collection,
        documents=[snippet.text for snippet in snippets],
        # "" stands in for a missing date.
        # NOTE(review): presumably because Chroma rejects None metadata values — confirm.
        metadatas=[{"date_of_event": snippet.date_of_event or ""} for snippet in snippets],
    )
        
# Store the snippets extracted by the test case above.
insert_snippets_to_index(collection=collection, conversation_snippets=test_snippets)

[32m2024-11-20 23:15:56.733[0m | [1mINFO    [0m | [36m__main__[0m:[36minsert_to_index[0m:[36m9[0m - [1mSuccessfully inserted 4 documents[0m


In [44]:
# collection.peek()['documents']
# collection.count()

## [to be done] Deduplicating/Updating snippets against content in index

In [None]:
# An id paired with its document text.
# Presumably mirrors an entry already stored in the index (work in progress) — TODO confirm.
class DocumentNode(BaseModel):
    id: str
    document: str    

# A snippet together with a list of DocumentNode entries as its context.
# Presumably input for the planned deduplication step — TODO confirm.
class SnippetsWithContext(BaseModel):
    snippet: str
    context: list[DocumentNode]


In [None]:
from typing import Optional
from datetime import datetime

# Work-in-progress variant of Snippet whose text field is named "snippet".
# This definition replaces the earlier Snippet once the cell runs, while
# insert_snippets_to_index still reads `.text`; the read-only `text` property
# keeps that code working on a fresh Restart & Run All.
class Snippet(BaseModel):
    snippet: str
    date_of_event: Optional[str] = Field(description="to be filled in if the snippet is an event", default=None)

    @property
    def text(self) -> str:
        """Return the snippet text under the attribute name used by the earlier definition."""
        return self.snippet

# Redefinition of ConversationSnippets bound to the Snippet class defined in this cell.
class ConversationSnippets(BaseModel):
    snippets: list[Snippet]

# Redefinition of the extraction prompt: same template as before, but the JSON
# field holding the note is called "snippet" instead of "text". Placeholders are
# {date} and {conversation}; doubled braces are literal JSON braces for str.format.
# NOTE(review): running this cell changes which prompt extract_snippets_from_conversation uses.
extract_snippets_from_conversation_prompt = """\
You are to extract snippets of a given conversation between a career confidante and a user, which the confidante should take node of. Think of it as the confidante jotting key points down during the conversation in their journal.
Each snippet has to contain sufficient information to stand alone and be understood without the context of the entire conversation.

**
IMPORTANT: Only return the output in JSON format. The JSON structure should be a list of snippet objects, each with the fields:
	•	"snippet" (str): The extracted snippet from the conversation.
	•	"date_of_event" (string): The date of the event mentioned in the snippet. If the snippet is not about an event, this field should be null. Date shouuld be formatted as "YYYY-MM-DD".

Example conversation that happend on 2024-02-01:
User: I am a software engineer and I am considering a career change.
Confidante: What are you considering?
User: I am considering becoming a data scientist.
Confidante: What is motivating you to make this change?
User: I am interested in working with data and I want to leverage my programming skills. I am also going to start taking a course in data science.
Confidante: That's awesome, when do you plan to start the course?
User: I plan to start next month.
Confidante: Great!

Example JSON:
{{
    "snippets": [
        {{
            "snippet": "User is considering becoming a data scientist.",
            "date_of_event": null
        }},
        {{
            "snippet": "User is interested in working with data and wants to leverage programming skills. User is also going to start taking a course in data science.",
            "date_of_event": null
        }},
        {{
            "snippet": "User plans to start data science course next month.",
            "date_of_event": "2024-03-01"
        }}

    ]
}}
===== END OF EXAMPLE ======

The 'snippets' key must be a list of snippets.
The result must be a list of objects with 'snippet' and 'date_of_event' keys.
Ensure each snippet contains sufficient information to stand alone and be understood without the context of the entire conversation.
**

Conversation that happened on {date}:
{conversation}

JSON:
"""

def determine_snippets_to_add_or_delete():
    """Placeholder for the deduplication step; not implemented, returns ``None``."""
    return None

In [None]:
def delete_documents_from_index():
    """Placeholder for removing superseded documents; not implemented, returns ``None``."""
    return None

## Putting it all together

In [51]:
def store(user_messages:list[str], assistant_messages:list[str]):
    """Extract snippets from a conversation and add them to the index.

    Args:
        user_messages: The user's messages, in order.
        assistant_messages: The confidante's replies, in order.
    """
    extracted = extract_snippets_from_conversation(
        user_messages=user_messages,
        assistant_messages=assistant_messages,
    )
    # Deduplication is not wired in yet:
    # determine_snippets_to_add_or_delete()
    # delete_documents_from_index()
    insert_snippets_to_index(conversation_snippets=extracted, collection=collection)
    total_documents = collection.count()
    logger.info(f"There are now {total_documents} documents in the index")

In [52]:
# Test
# End-to-end check of store() on the same four-turn conversation used in the extraction test.
store(
    user_messages=["I just got laid off from my job.", "I am considering a career change.", "First, I am thinking of starting a course in data science.", "tomorrow"],
    assistant_messages=["What are you considering?", "What are your interests?", "When do you plan to start the course?", "That's great!"]
)

[32m2024-11-20 23:22:24.764[0m | [1mINFO    [0m | [36m__main__[0m:[36mextract_snippets_from_conversation[0m:[36m82[0m - [1mSuccessfully extracted 4 snippets from conversation[0m
[32m2024-11-20 23:22:25.760[0m | [1mINFO    [0m | [36m__main__[0m:[36minsert_to_index[0m:[36m9[0m - [1mSuccessfully inserted 4 documents[0m
[32m2024-11-20 23:22:25.762[0m | [1mINFO    [0m | [36m__main__[0m:[36mstore[0m:[36m9[0m - [1mThere are now 20 documents in the index[0m


# /Retrieve

In [None]:
# Defaults for retrieve(). Only documents whose distance is strictly below
# MIN_DISTANCE are kept, so despite its name it acts as an upper bound.
# K is the number of results requested from the index.
MIN_DISTANCE=1.3
K=10

def _build_context(query_result: QueryResult, min_distance:float)->str:
    """Format the results of the first query as a numbered list of notes.

    Only the first entries of ``query_result.documents`` and
    ``query_result.distances`` are read. A document is listed when its
    distance is strictly below ``min_distance`` and its exact text has not
    been listed already.

    Args:
        query_result: Result of a query against the index.
        min_distance: Exclusive upper bound on the distance of listed documents.

    Returns:
        A fixed header line followed by one ``"<n>: <document>"`` line per
        listed document, joined by newlines.
    """
    lines = ["Here are some notes from previous conversations between you and the user that might be relevant to you. Note that this snippets are from conversations that happened in the past."]
    already_listed = set()  # guards against exact duplicates stored in the index
    for document, distance in zip(query_result.documents[0], query_result.distances[0]):
        if not (distance < min_distance):
            continue
        if document in already_listed:
            continue
        # The header occupies index 0, so len(lines) is the next 1-based note number.
        lines.append(f"{len(lines)}: {document}")
        already_listed.add(document)
    return "\n".join(lines)


def retrieve(content_to_retrieve:str, min_distance:float=MIN_DISTANCE, k:int=K):
    """Look up stored notes related to ``content_to_retrieve``.

    Args:
        content_to_retrieve: Text used as the single query against the index.
        min_distance: Distance threshold forwarded to ``_build_context``.
        k: Number of results requested from the index.

    Returns:
        The context string produced by ``_build_context``.
    """
    nearest = query_index(n_results=k, query_texts=[content_to_retrieve])
    return _build_context(nearest, min_distance)

    

In [64]:
# test
# Manual check: retrieve notes for a question about the course start and print the context.
retrieval_result = retrieve("when is the user going to start a course?")
print(retrieval_result)

Here are some notes from previous conversations between you and the user that might be relevant to you. Note that this snippets are from conversations that happened in the past.
1: User is thinking of starting a course in data science.
2: User plans to start the data science course tomorrow.
3: User is considering a career change.
