In [1]:
import os
import numpy as np
import chromadb
import json
from chromadb.utils import embedding_functions
from chromadb.config import Settings
from openai import OpenAI
from tenacity import retry, wait_random_exponential, stop_after_attempt
from dotenv import load_dotenv
from datetime import datetime
# Load variables from a local .env file into the process environment, then
# build the module-level OpenAI client that Retriever.chat_completion_request
# uses for its chat completion calls.
load_dotenv()
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

In [11]:
class VectorStore:
    """Wrapper around a Chroma collection that stores transcript chunks.

    Every stored document is a block of transcript text. Its metadata holds a
    ``timestamp`` entry (the integer start time parsed from the first line of
    the block) so that queries can optionally be restricted to one timestamp.
    """

    def __init__(self, persist_directory="./") -> None:
        """Open (or create) the ``feature_store`` collection.

        Args:
            persist_directory: Directory handed to ``chromadb.PersistentClient``.
                Defaults to the current directory.

        The collection is created from the same client that is kept on
        ``self.client``. Previously the collection was created on one client
        and ``self.client`` was then replaced by a second, unrelated one.

        Note: because the store is persistent, ingesting the same file twice
        appends the chunks twice (ids are derived from the entry count).
        """
        openai_ef = embedding_functions.OpenAIEmbeddingFunction(
            api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-large"
        )
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection = self.client.get_or_create_collection(
            "feature_store", embedding_function=openai_ef
        )

    def get_entry_numbers(self):
        """Return the number of entries currently in the collection."""
        return self.collection.count()

    def add(self, text, time):
        """Add one document with its timestamp metadata.

        Args:
            text: Document content; converted with ``str()`` before storing.
            time: Value stored under the ``timestamp`` metadata key.

        The id is the current entry count rendered as a string, so ids are
        only unique as long as entries are never deleted and there is a
        single writer.
        """
        entry_id = str(self.get_entry_numbers())
        self.collection.add(
            documents=[str(text)],
            metadatas=[{"timestamp": time}],
            ids=[entry_id],
        )

    def retrieve(self, query, n=2, time_field=None):
        """Query the collection and return the raw Chroma result.

        Args:
            query: Passed through as ``query_texts``.
            n: Number of results requested (``n_results``).
            time_field: When not ``None``, only entries whose ``timestamp``
                metadata equals this value are considered.

        Returns:
            Whatever ``collection.query`` returns.
        """
        query_kwargs = {"query_texts": query, "n_results": n}
        if time_field is not None:
            query_kwargs["where"] = {"timestamp": time_field}
        return self.collection.query(**query_kwargs)

    def injest_chunks(self, chunks, timestamp):
        """Add every ``(chunk, timestamp)`` pair to the collection, in order.

        The two iterables are paired with ``zip``, so iteration stops at the
        shorter one.
        """
        for chunk, time in zip(chunks, timestamp):
            self.add(chunk, time)

    # Correctly spelled alias; the original name is kept for existing callers.
    ingest_chunks = injest_chunks

    def get_start_time(self, text):
        """Parse the start time from one transcript line.

        Takes the part of ``text`` before the first ``|``, then the piece
        after the first ``:`` in that part, parses it as a float and
        truncates it to ``int``.
        Presumably lines look like ``start:52.3|end:...|text: ...`` — confirm
        against the transcript producer.

        Raises:
            IndexError: If the first segment contains no ``:``.
            ValueError: If the extracted piece is not a number.
        """
        first_segment = text.split("|")[0]
        return int(float(first_segment.split(":")[1]))

    def extract_text_blocks(self, txt_file_path, n):
        """Group a transcript file into chunks of ``n`` lines.

        Args:
            txt_file_path: Path of the transcript file; every non-blank line
                must contain a ``text:`` marker.
            n: Number of lines per chunk (must be >= 1).

        Returns:
            ``(text_blocks, timestamp)``: two equally long lists. Each text
            block is the texts of its lines, each followed by a single space;
            each timestamp is ``get_start_time`` of the block's first line.

        Raises:
            ValueError: If ``n`` is smaller than 1.
            IndexError: If a non-blank line has no ``text:`` marker.

        Fixes over the previous implementation: chunks are closed after
        exactly ``n`` lines for any ``n`` (it used to be hard-wired to 3), a
        trailing partial chunk is kept instead of dropped, and blank lines
        are skipped instead of crashing.
        """
        if n < 1:
            raise ValueError("n must be a positive integer")

        text_blocks = []
        timestamp = []
        parts = []  # texts of the chunk currently being assembled
        start_time = None
        with open(txt_file_path, "r", encoding="utf-8") as file:
            for line in file:
                if not line.strip():
                    continue
                if not parts:
                    # First line of a new chunk provides the chunk timestamp.
                    start_time = self.get_start_time(line)
                parts.append(line.split("text:", 1)[1].strip())
                if len(parts) == n:
                    text_blocks.append(" ".join(parts) + " ")
                    timestamp.append(start_time)
                    parts = []

        if parts:
            # Flush the final chunk when the line count is not a multiple of n.
            text_blocks.append(" ".join(parts) + " ")
            timestamp.append(start_time)
        return text_blocks, timestamp

    def extract_text(self, line):
        """Return the stripped text after the last ``text:`` marker in the
        final ``|``-separated segment of ``line``."""
        last_segment = line.split("|")[-1]
        return last_segment.split("text:")[-1].strip()


In [12]:
class Retriever:
    """Use OpenAI tool calling to classify a user question.

    The model is offered three tools (single timestamp, start/end timestamp
    range, topic) and ``run`` reports which one it selected and with which
    arguments. The methods are static and are meant to be called on the class.
    """

    # Tool (function) schemas sent with every chat completion request.
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_single_timestamp",
                "description": "Get the description of a conversation at a specific timestamp",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "time": {
                            "type": "string",
                            "description": "The timestamp to get the description for in dd/mm/yyyy hh:mm:ss format",
                        },
                    },
                    "required": ["time"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "get_start_end_timestamp",
                "description": "Get the start and end timestamp of a conversation",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start_time": {
                            "type": "string",
                            "description": "the start time mentioned provided in dd/mm/yyyy hh:mm:ss format",
                        },
                        "end_time": {
                            "type": "string",
                            "description": "End time mentioned provided in dd/mm/yyyy hh:mm:ss format",
                        },
                    },
                    "required": ["start_time", "end_time"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "get_topic",
                "description": "Get the topic of the conversation",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "topic": {
                            "type": "string",
                            "description": "The topic of conversation",
                        },
                    },
                    "required": ["topic"],
                },
            },
        },
    ]
    # NOTE(review): this is a dated model snapshot — confirm it is still served.
    GPT_MODEL = "gpt-3.5-turbo-0613"

    @staticmethod
    @retry(
        wait=wait_random_exponential(multiplier=1, max=40),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def chat_completion_request(messages, tools=None, tool_choice=None, model=GPT_MODEL):
        """Send a chat completion request, retrying on failure.

        Args:
            messages: Chat messages to send.
            tools: Optional tool schemas; omitted from the request when None.
            tool_choice: Optional tool choice; omitted from the request when None.
            model: Model name, defaults to ``GPT_MODEL``.

        Returns:
            The response object returned by ``client.chat.completions.create``.

        Raises:
            Exception: Whatever the client raised on the last of the three
                attempts. The error is logged and re-raised so that the
                ``retry`` decorator actually retries; previously the
                exception object was returned as if it were a response.
        """
        request_kwargs = {"model": model, "messages": messages, "seed": 123}
        if tools is not None:
            request_kwargs["tools"] = tools
        if tool_choice is not None:
            request_kwargs["tool_choice"] = tool_choice
        try:
            return client.chat.completions.create(**request_kwargs)
        except Exception as e:
            print("Unable to generate ChatCompletion response")
            print(f"Exception: {e}")
            raise

    @staticmethod
    def run(prompt):
        """Ask the model which tool fits ``prompt`` and report the choice.

        Prints today's date, then either the parsed tool arguments and tool
        name, or the model's plain-text reply when it did not call a tool
        (for example when it asks for clarification).

        Args:
            prompt: The user's question.

        Returns:
            ``(fn_name, args)`` for the first tool call, where ``args`` is the
            decoded JSON argument dict, or ``(None, {})`` when the model
            answered without calling a tool.
        """
        formatted_date = datetime.now().strftime("%d/%m/%Y")
        print(formatted_date)
        # NOTE(review): the date hint is sent as an assistant turn after the
        # user message; a system message may be the more conventional place.
        messages = [
            {
                "role": "system",
                "content": "Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous.",
            },
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": f"For reference today is {formatted_date}"},
        ]
        chat_response = Retriever.chat_completion_request(messages, tools=Retriever.tools)
        assistant_message = chat_response.choices[0].message

        tool_calls = assistant_message.tool_calls
        if not tool_calls:
            # The model replied in plain text instead of selecting a tool.
            print(assistant_message.content)
            return None, {}

        selected = tool_calls[0].function
        fn_name = selected.name
        args = json.loads(selected.arguments)
        print(args)
        print(fn_name)
        return fn_name, args

In [13]:
# Split the transcript into chunks of 3 lines and index them.
text_file = "./text/sharktank.txt"
store = VectorStore()
chunks, timestamp = store.extract_text_blocks(txt_file_path=text_file, n=3)
print(len(chunks), len(timestamp))
# Ingest only into an empty collection so that re-running this cell does not
# append the same chunks again under new ids.
if store.get_entry_numbers() == 0:
    store.injest_chunks(chunks, timestamp)

195 195


In [16]:
# Classify the question with the tool-calling model, then show the two
# closest transcript chunks for the same question.
prompt = "when did we speak about sandcastles?"
Retriever.run(prompt)
store.retrieve(query=prompt, n=2)

19/02/2024
{'topic': 'sandcastles'}
get_topic


{'ids': [['3', '7']],
 'distances': [[1.0464648008346558, 1.096422791481018]],
 'metadatas': [[{'timestamp': 52}, {'timestamp': 121}]],
 'embeddings': None,
 'documents': [["but it's impossible to build a real sand castle using the traditional fill and flip buckets. The wet sand sticks in the bucket, even at its best, it just looks boring. That's why we invented Create-A-Castle, revolutionary split mold sand castle kits that allow you to build elaborate sand structures in no time at all. ",
   "Nothing like this. We're unique in the fact that we split, so the molds split in half, but they also are stackable in the right kind of sand. You built that entire castle. "]],
 'uris': None,
 'data': None}