<a href="https://colab.research.google.com/github/Troyanovsky/tiny_chain/blob/main/tiny_chain.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tiny Chain
LangChain's documentation is hard to navigate, and its many layers of abstraction make customization difficult, so I wrote my own utility functions and classes for working with LLMs.

## Install packages


In [28]:
!pip install openai
!pip install demjson3
!pip install tiktoken
!pip install chromadb
!pip install PyPDF2
!pip install python-dotenv

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [29]:
import os
import openai
from dotenv import load_dotenv
from google.colab import files

# Upload the environment file from your local machine
uploaded = files.upload()

# Rename the uploaded file to env_file.txt
for filename in uploaded.keys():
  os.rename(filename, "env_file.txt")

load_dotenv("env_file.txt")

openai.api_key = os.environ.get("OPENAI_API_KEY")

Saving env_file_Max.txt to env_file_Max.txt


## Get Response

In [30]:
import time

def get_API_Response(prompt,system_prompt="You are a helpful assistant",stop=None):
    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            temperature = 0,
            messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}
                ],
            stop=stop
            )
        response = str(response['choices'][0]['message']['content']).strip()
        return response
    except openai.error.RateLimitError:
        # Rate limited: wait briefly, then retry with the same arguments
        time.sleep(5)
        return get_API_Response(prompt, system_prompt, stop)
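
The retry above recurses with a fixed 5-second wait and no upper bound. A minimal sketch of a bounded retry with exponential backoff is shown below. The names `call_with_retry` and `flaky` are hypothetical and not part of the notebook, and the injected `sleep` argument exists only so the example runs instantly. With the function above, one would presumably pass `retry_on=(openai.error.RateLimitError,)`.

```python
import time


def call_with_retry(fn, retry_on=(Exception,), max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on a listed exception, wait and retry up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: surface the original error
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            sleep(base_delay * 2 ** attempt)


# Stand-in for an API call that fails twice, then succeeds (hypothetical).
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated rate limit")
    return "ok"


delays = []  # record the waits instead of actually sleeping
result = call_with_retry(flaky, retry_on=(RuntimeError,), sleep=delays.append)
print(result, delays)
```

Capping the number of retries means a persistent outage ends in an exception instead of ever-deeper recursion.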

## Memory
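Memory class: SummaryBufferMemory keeps the raw conversation together with a running summary.

Attributes:

- raw_messages: all previous messages in a list, each stored as a tuple (role_string, message_string). Role is a string such as "User" or "AI"; the colon is added when messages are formatted.
- summary: a summary of all previous messages in one string
- word_count: the number of words added since the last summary
- word_limit: the word budget used for both the recent-message window and the summarization trigger

Methods:

- last_messages: returns the most recent messages that fit within the word limit
- summarize_messages: summarizes all messages and stores the result in summary
- add_message: adds one message string together with its role, summarizing first if the word limit would be reached
- delete_last_message: deletes the last message from the list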
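
The word-limited window can be tried on its own, without any API call. The sketch below is a standalone approximation of that behavior; `window_by_words` and the sample `history` are hypothetical names used only for illustration.

```python
def window_by_words(messages, word_limit):
    """Return the most recent (role, text) pairs whose texts fit within word_limit words."""
    kept = []
    used = 0
    for role, text in reversed(messages):
        n = len(text.split())
        if used + n > word_limit:
            break  # the next-older message would overflow the budget
        kept.append((role, text))
        used += n
    kept.reverse()  # restore chronological order
    return kept


history = [
    ("User", "Hi there"),                       # 2 words
    ("AI", "Hello, how can I help you today"),  # 7 words
    ("User", "Summarize this file"),            # 3 words
]
recent = window_by_words(history, word_limit=10)
print("\n".join(f"{role}: {text}" for role, text in recent))
```

With a limit of 10 words, the two newest messages fit exactly and the oldest one is dropped.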



In [31]:
def count_words(text):
    return len(text.split())

class SummaryBufferMemory:
    def __init__(self, word_limit=1000):
        self.raw_messages = []
        self.summary = ""
        self.word_count = 0
        self.word_limit = word_limit

    def last_messages(self):
        message_list = []
        word_count = 0

        for message_tuple in reversed(self.raw_messages):
            message_word_count = count_words(message_tuple[1])
            if word_count + message_word_count > self.word_limit:
                return "\n".join([f"{role}: {message}" for role, message in message_list])
            else:
                message_list.insert(0,message_tuple)
                word_count += message_word_count

        return "\n".join([f"{role}: {message}" for role, message in message_list])

    def summarize_messages(self):
        if len(self.raw_messages) >= 1:
            messages = "\n".join([f"{role}: {message}" for role, message in self.raw_messages])
        else:
            messages = ""
        prev_summary = self.summary
        summarize_template = f'Summarize the following conversation between an AI and a human in 150 words. The summary must retain all important information. Conversation history:```{messages}``` Previous summary: ```{prev_summary}```'
        summary = get_API_Response(summarize_template)
        self.summary = summary
        self.word_count = 0
        return summary

    def add_message(self, role, message):
        new_message_word_count = count_words(f"{role}: {message}")
        if new_message_word_count + self.word_count >= self.word_limit:
            self.summarize_messages()
        self.raw_messages.append((role, message))
        self.word_count += new_message_word_count

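    def delete_last_message(self):
        if len(self.raw_messages) > 0:
            role, message = self.raw_messages.pop()
            # Keep the running word count in step with the stored messages
            removed = count_words(f"{role}: {message}")
            self.word_count = max(0, self.word_count - removed)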

## File loader

Load .txt or .pdf files with:
```
content = file_loader("file_name")
```

In [32]:
import PyPDF2

def file_loader(path):
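    if path.endswith('.pdf'):
        # Use a context manager so the PDF file handle is always closed
        with open(path, 'rb') as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            text = ''
            for page in pdf_reader.pages:
                # Guard against pages where no text can be extracted
                text += page.extract_text() or ''
        return text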
    elif path.endswith('.txt'):
        with open(path, 'r') as txt_file:
            return txt_file.read()
    else:
        raise ValueError('Unsupported file format')

In [45]:
# Example usage

pdf_content = file_loader("shareholder_letters.pdf")
txt_content = file_loader("sample.txt")

print(pdf_content[:150])
print(txt_content[:150])

 
  
 
 
To our shareholders:    Amazon.com passed many milestones in 1997: by year-end, we had served more than 1.5 million customers, yielding 838% 
To make a Python package, you can follow these high-level steps:

Create a new directory for your package
Inside the directory, create a file named __


## Text Splitter
Helper function to split text strings with given word length and overlap.
```
text_splitter(string, n_words=500, overlap=50)
```
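
To see how the window moves, it can help to look at word indices only: each chunk starts `n_words - overlap` words after the previous one. The sketch below computes those spans; `chunk_spans` is a hypothetical helper, not part of the notebook.

```python
def chunk_spans(total_words, n_words=500, overlap=50):
    """Return (start, end) word-index pairs for overlapping chunks."""
    if not 0 <= overlap < n_words:
        raise ValueError("overlap must be non-negative and smaller than n_words")
    if n_words >= total_words:
        return [(0, total_words)]
    step = n_words - overlap  # how far each new chunk advances
    return [(start, min(start + n_words, total_words))
            for start in range(0, total_words - overlap, step)]


# A 34-word text split into 20-word chunks with a 5-word overlap
print(chunk_spans(34, n_words=20, overlap=5))
# A 1000-word text with the default settings
print(chunk_spans(1000))
```

The first call gives two spans, (0, 20) and (15, 34), so words 15 to 19 appear in both chunks.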

In [34]:
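def text_splitter(string, n_words=500, overlap=50):
    # A non-positive step would make the loop below fail or return nothing
    if not 0 <= overlap < n_words:
        raise ValueError("overlap must be non-negative and smaller than n_words")

    words = string.split()
    sections = []

    if n_words >= len(words):
        return [string]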

    for i in range(0, len(words) - overlap, n_words - overlap):
        section = words[i:i + n_words]
        sections.append(' '.join(section))

    return sections

In [35]:
# Example usage

text = "It may take some time to adjust to life without caffeine, but it's definitely possible. With a little patience and effort, you can wean yourself off coffee and caffeine without any unpleasant side effects."

splitted_text = text_splitter(text, 20, 5)

print(splitted_text)

["It may take some time to adjust to life without caffeine, but it's definitely possible. With a little patience and", 'With a little patience and effort, you can wean yourself off coffee and caffeine without any unpleasant side effects.']


## Vector Database
The vectorIndex class wraps a ChromaDB collection to provide vector indexing and similarity search over a list of text documents.

The class has the following methods:

- `__init__(self, documents)`: Initializes a ChromaDB client and collection from a list of strings (documents).
- `add_documents(self, documents)`: Adds a new list of documents to the collection and generates ids for them.
- `query_documents(self, query_string, n_results=3)`: Queries the collection with the given query_string and returns the n_results most relevant documents with their ids and distances.
- `delete_documents(self, ids)`: Deletes documents from the collection based on their ids.
- `persist(self)`: Persists the current state of the ChromaDB client to disk.
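
For intuition about what a query returns, here is a toy, dependency-free sketch that ranks documents by distance to the query and returns the same result shape (ids, documents, distances). It is only an illustration: the word-count "embedding" and cosine distance are assumptions standing in for the real embedding model and distance metric of the collection, and `embed`, `cosine_distance`, and `query` are hypothetical names.

```python
import math
from collections import Counter


def embed(text):
    """Toy embedding: lowercase word counts (a stand-in for a real embedding model)."""
    return Counter(text.lower().split())


def cosine_distance(a, b):
    """1 - cosine similarity; 1.0 when either vector is empty."""
    dot = sum(a[w] * b[w] for w in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return 1.0 if norm == 0 else 1.0 - dot / norm


def query(documents, query_string, n_results=3):
    """Rank documents by distance to the query, closest first."""
    q = embed(query_string)
    scored = sorted((cosine_distance(q, embed(doc)), str(i), doc)
                    for i, doc in enumerate(documents))
    top = scored[:n_results]
    return {"ids": [i for _, i, _ in top],
            "documents": [d for _, _, d in top],
            "distances": [dist for dist, _, _ in top]}


docs = ["Hello how are you", "I'm feeling great", "Here is chromadb documentation"]
print(query(docs, "chromadb is a vector database", n_results=2))
```

Smaller distances mean closer matches, which is also how the distances returned by query_documents should be read.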

In [36]:
import chromadb

class vectorIndex:
    def __init__(self, documents):
        self.client = chromadb.Client(chromadb.config.Settings(chroma_db_impl="duckdb+parquet",persist_directory="database"))
        self.collection = self.client.get_or_create_collection(name="mydb")
        self.last_id = 0
        self.add_documents(documents)

    def add_documents(self, documents):
        if len(documents) > 0:
            ids = [str(self.last_id + i) for i in range(len(documents))]
            self.collection.add(documents=documents, ids=ids)
            self.last_id += len(documents)

    def query_documents(self, query_string, n_results=3):
        results = self.collection.query(query_texts=[query_string], n_results=n_results)
        documents = results['documents'][0]
        ids = results['ids'][0]
        distances = results['distances'][0]
        result_dict = {'ids': ids, 'documents': documents, 'distances': distances}
        return result_dict

    def delete_documents(self, ids):
        self.collection.delete(ids=ids)

    def persist(self):
        self.client.persist()

In [37]:
# Example usage
texts = ["Hello how are you", "I'm feeling great", "What's the weather like today.",
         "Here is chromadb documentation","from chromdb.config import Settings",
         "collection = client.get_or_create_collection(name=\"mydb\")"]

index = vectorIndex(texts)

result = index.query_documents("chromadb is a vector database", 2)
print(result)

/root/.cache/chroma/onnx_models/all-MiniLM-L6-v2/onnx.tar.gz: 100%|██████████| 79.3M/79.3M [00:06<00:00, 12.7MiB/s]


{'ids': ['3', '4'], 'documents': ['Here is chromadb documentation', 'from chromdb.config import Settings'], 'distances': [0.588258683681488, 1.1970349550247192]}


## ReAct Agent
A ReAct agent that alternates reasoning and acting: at each step the model states a thought, picks a tool, and reads the resulting observation.

Initialize with
```
agent = ReActAgent(verbose=True)
```
Add/remove tools with
```
agent.add_tool(("tool name","tool description",function_name))
agent.remove_tool("tool_name")
```
Complete task with
```
agent.run("Your query")
```
To see intermediate steps
```
print(agent.action_history)
```
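
A tool is just a callable that takes the Action Input string and returns an observation string. Below is a minimal sketch of a calculator tool that evaluates basic arithmetic without calling `eval`. The names `safe_calculator` and `calculator_tool` are hypothetical, and the set of supported operators is an arbitrary choice for the example.

```python
import ast
import operator

# Operators the sketch allows; anything else is rejected.
_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.USub: operator.neg,
}


def _eval(node):
    """Recursively evaluate a parsed arithmetic expression."""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval(node.operand))
    raise ValueError("unsupported expression")


def safe_calculator(expression):
    """Evaluate basic arithmetic and return the result as a string."""
    try:
        return str(_eval(ast.parse(expression.strip(), mode="eval").body))
    except (ValueError, SyntaxError, ZeroDivisionError, TypeError) as e:
        # Return the error as text so the agent can read it as an observation
        return f"ERROR: {e}"


# Tool tuple in the (name, description, function) format expected by add_tool
calculator_tool = ("Calculator",
                   "Use this tool to calculate math. Input is a math expression.",
                   safe_calculator)
print(safe_calculator("2+2"))
```

It would be registered with `agent.add_tool(calculator_tool)`. Returning errors as strings rather than raising lets the model see the failure and try another tool.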

In [38]:
import demjson3 as json
import re

class ReActAgent:
    def __init__(self, verbose=False, max_steps=10):
        self.available_tools = []
        self.action_history = []
        self.verbose = verbose
        self.max_steps = max_steps
        self.add_tool(("GPT tool","Ask GPT for an answer. Use it for logical reasoning or as a fall-back when other tools do not work. Input is a question string.", get_API_Response))

    def add_tool(self, tool):
        if (len(tool) == 3 and
            isinstance(tool[0], str) and
            isinstance(tool[1], str) and
            callable(tool[2])):
            self.available_tools.append(tool)
        else:
            raise ValueError("Each tool must have three fields: ('tool name string', 'tool description string', function_name)")

    def remove_tool(self, tool_name):
        # Rebuild the list rather than removing items while iterating over it
        self.available_tools = [tool for tool in self.available_tools if tool[0] != tool_name]

    def get_action(self, task_string):
        if len(self.available_tools) > 0:
            tools_string = ""
            tool_names = []
            for tool in self.available_tools:
                tools_string = tools_string + "\n" + tool[0] + " : " + tool[1]
                tool_names.append(tool[0])
        else:
            tools_string = "No available tools"
            tool_names = "[]"
        action_history_string = ""
        if len(self.action_history) > 0:
            for step in self.action_history:
                action_history_string += step[0]
                action_history_string += step[1]
                action_history_string += step[2]
                action_history_string += step[3]
        react_template = f'''
Answer the following questions as best you can. You have access to the following tools:

{tools_string}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of {tool_names}
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {task_string}

{action_history_string}
        '''
        response = get_API_Response(react_template, stop="Observation:")
        return response

    def parse_and_execute(self, action_string):
        thought = None
        action = None
        param = None
        result = None

        # extract thought
        match = re.search(r"Thought:\s*(.*)", action_string)
        if match:
            thought = match.group(1).strip()

        # extract action_name
        match = re.search(r"Action:\s*(.*)", action_string)
        if match:
            action = match.group(1).strip()

        # extract param
        match = re.search(r"Action Input:\s*(.*)", action_string)
        if match:
            param = match.group(1).strip()

        # find and execute tool function
        for tool in self.available_tools:
            if tool[0] == action:
                try:
                    result = tool[2](param)
                except Exception as e:
                    print("Error message:", str(e))
                    result = "There was an error in tool execution. Please try another tool."
                break

        # handle no matching tool: report it as the observation so run() can still unpack four values
        if result is None:
            result = f"The tool {action} doesn't exist. Try another tool"

        return thought, action, param, result

    def run(self, task_string):
        self.action_history = []
        step_count = 0
        while True:
            step_count += 1
            if step_count > self.max_steps:
                print("\nExceeded maximum step count.")
                return "Exceeded maximum step count."
            action_string = self.get_action(task_string)
            if "Final Answer:" in action_string:
                print("\n", action_string)
                return action_string
            thought, action, param, result = self.parse_and_execute(action_string)
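            # Fields may be missing or non-string, so convert before concatenating
            thought_str = "\nThought: " + str(thought or "")
            action_str = "\nAction: " + str(action or "")
            param_str = "\nAction Input: " + str(param or "")
            result_str = "\nObservation: " + str(result)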
            self.action_history.append((thought_str, action_str, param_str, result_str))
            if self.verbose:
                print(thought_str)
                print(action_str)
                print(param_str)
                print(result_str)
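
The parsing step can be tried in isolation on a sample response. The sketch below follows the same idea as parse_and_execute, with one deliberate difference: each label is anchored to the start of a line. `parse_step` and `sample` are hypothetical names used only for illustration.

```python
import re


def parse_step(text):
    """Extract the Thought, Action, and Action Input fields from one model response."""
    fields = {}
    for key, label in (("thought", "Thought"), ("action", "Action"), ("param", "Action Input")):
        # MULTILINE lets ^ and $ match at the start and end of each line
        match = re.search(rf"^{label}:\s*(.*)$", text, flags=re.MULTILINE)
        fields[key] = match.group(1).strip() if match else None
    return fields


sample = "Thought: This is a simple math question.\nAction: Calculator\nAction Input: 2+2\n"
print(parse_step(sample))
```

A missing field comes back as None, which is why the caller has to handle None before building the history strings.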

In [39]:
# Example usage
agent = ReActAgent(verbose=True)

def calculator(input):
    return "ERROR"

def backup_calculator(input):
    return "4"

agent.add_tool(("Calculator", "Use this tool to calculate math. Input is a math expression.", calculator))

agent.add_tool(("Backup Calculator", "Use this tool to calculate math. Use as a backup for Calculator. Input is a math expression.", backup_calculator))

answer = agent.run("What is 2+2")


Thought: This is a simple math question, I can use the calculator tool.

Action: Calculator

Action Input: 2+2

Observation: ERROR

Thought: Oh no, the calculator tool is not working. I will try the backup calculator tool.

Action: Backup Calculator

Action Input: 2+2

Observation: 4

 Thought: I now know the final answer.
Final Answer: 4.
