In [8]:
# imports
import logging
import os
from logging import Logger
from pathlib import Path # useful QoL path conversion
from pprint import pprint

import colorlog
import fitz # converts pdfs to text
from dotenv import load_dotenv
from haystack import Pipeline, Document
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from tiktoken import encoding_for_model, Encoding

load_dotenv()

True

In [2]:
def setup_logging() -> Logger:
    """Return this module's logger, configured for colored console output.

    The logger is looked up by ``__name__``, so every call returns the same
    object. The console handler is attached only the first time: calling this
    function again (re-running the cell, or creating several ``RAG``
    instances) reuses the existing handler instead of stacking another one,
    which would print every message multiple times.

    Returns:
        (Logger): Logger set to DEBUG level with a console handler attached.
    """
    # Create (or fetch) the custom logger
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # Already configured by an earlier call -> do not add a duplicate handler
    if logger.handlers:
        return logger

    # Create a console handler with colored output
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)

    # Create a custom colored formatter
    formatter = colorlog.ColoredFormatter(
        "%(asctime)s - %(log_color)s%(levelname)s%(reset)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        log_colors={
            "DEBUG": "cyan",
            "INFO": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red,bg_white",
        },
    )
    console_handler.setFormatter(formatter)

    # Add the console handler to the logger
    logger.addHandler(console_handler)

    return logger

In [3]:
# Build a RAG pipeline

class RAG:
    """Retrieval-augmented generation over a directory of PDF files.

    Constructing an instance extracts the text of the files in ``dir_path``,
    writes the resulting chunks into an in-memory Haystack document store and
    wires a retriever -> prompt builder -> OpenAI generator pipeline, which is
    then available as ``self.rag_pipeline``.
    """

    def __init__(self, dir_path: str, model_type: str = 'gpt-4'):
        """
        Args:
            dir_path (str): Directory containing data.
            model_type (str): Name of the OpenAI model passed to the generator.
        """

        self.logger: Logger = setup_logging()

        self.logger.info("Setting up RAG!")
        self.model_type: str = model_type
        self.dir_path: str = dir_path
        self.logger.info("Extracting text from PDFs...")
        self.text_mapping: dict[str, str] = self.extract_text_from_files()
        self.logger.info("All text extracted!")

        self.logger.info("Initializing pipeline...")
        self.start()
        self.logger.info("Pipeline ready!")

    def extract_text_from_files(self, filetype: str = "pdf", max_doc_len: int = 4000) -> dict[str, str]:
        """
        This function extracts text from files and returns their path and contents in a dictionary.

        Only regular files whose extension matches ``filetype`` are read;
        sub-directories and unrelated files in ``self.dir_path`` are skipped.
        Files are processed in sorted path order so the result is reproducible.

        Args:
            filetype (str): File type of documents.
            max_doc_len (int): Maximum length (in characters) of each text chunk.
        Returns (dict[str, str]): A dictionary mapping ``"<posix path>_<chunk number>"``
            to the text of that chunk. Chunk numbers start at 1 for every file.
        Raises:
            ValueError: If ``max_doc_len`` is smaller than 1.
        """
        # A non-positive chunk size can never consume any text.
        if max_doc_len < 1:
            raise ValueError(f"max_doc_len must be a positive integer, got {max_doc_len!r}")

        wanted_suffix = "." + filetype.lower().lstrip(".")
        text_map: dict[str, str] = {}

        candidates = sorted(
            entry
            for entry in Path(self.dir_path).iterdir()
            if entry.is_file() and entry.suffix.lower() == wanted_suffix
        )

        for idx, file in enumerate(candidates):
            self.logger.info("Extracting from document %d ...", idx + 1)
            file_bytes = file.read_bytes()  # file as bytes object

            with fitz.open(stream=file_bytes, filetype=filetype) as doc:
                text = ""       # chunk currently being filled
                chunk_idx = 1   # 1-based chunk number within this file

                for page in doc:
                    page_text: str = page.get_text()

                    # Cut the page into pieces of at most max_doc_len characters
                    for start in range(0, len(page_text), max_doc_len):
                        piece = page_text[start:start + max_doc_len]

                        if len(text) + len(piece) <= max_doc_len:
                            text += piece
                        else:
                            # Current chunk is full: store it and start a new one
                            text_map[f"{file.as_posix()}_{chunk_idx}"] = text
                            text = piece
                            chunk_idx += 1

                # Store whatever is left over for this file
                if text:
                    text_map[f"{file.as_posix()}_{chunk_idx}"] = text

        return text_map

    def text_to_memory(self,
                       text_dict: dict[str, str],
                       similarity_func: str = 'dot_product') -> "InMemoryDocumentStore":
        """Takes the paths and text values from our PDFs and converts them into
        an in-memory document store with Haystack.

        The store is also kept on the instance as ``self.document_store``.

        Args:
            text_dict (dict[str, str]): A dictionary of paths as keys and text as values.
            similarity_func (str): Type should be either 'dot_product' or 'cosine'.

        Returns:
            (InMemoryDocumentStore): Document store for our RAG, contains our information.
        """
        self.document_store = InMemoryDocumentStore(embedding_similarity_function=similarity_func)

        # Each chunk becomes one Document; its dictionary key is kept as metadata.
        docs: list[Document] = [
            Document(content=value, meta={"book": key}) for key, value in text_dict.items()
        ]

        self.document_store.write_documents(docs)

        return self.document_store

    def pipeline(self):
        """Build the query pipeline and store it as ``self.rag_pipeline``.

        The pipeline connects a BM25 retriever over ``self.document_store`` to
        a prompt builder, whose rendered prompt is sent to an OpenAI generator
        using ``self.model_type``. ``text_to_memory`` must have been called
        first so that ``self.document_store`` exists.
        """
        prompt_template = """
        Given these documents, answer the question.
        Documents:
        {% for doc in documents %}
            {{ doc.content }}
        {% endfor %}
        Question: {{question}}
        Answer:
        """

        retriever = InMemoryBM25Retriever(document_store=self.document_store)
        prompt_builder = PromptBuilder(template=prompt_template)
        llm = OpenAIGenerator(model=self.model_type)

        # NOTE: the log messages below mention embedding, but the components
        # wired here are a BM25 retriever, a prompt builder and a generator.
        self.logger.info("Embedding documents...")
        self.rag_pipeline = Pipeline()
        self.rag_pipeline.add_component("retriever", retriever)
        self.rag_pipeline.add_component("prompt_builder", prompt_builder)
        self.rag_pipeline.add_component("llm", llm)
        self.rag_pipeline.connect("retriever", "prompt_builder.documents")
        self.rag_pipeline.connect("prompt_builder", "llm")
        self.logger.info("Documents embedded!")

    def start(self):
        """Populate the document store from ``self.text_mapping`` and build the pipeline."""
        self.logger.info("Commiting documents to memory...")
        self.text_to_memory(self.text_mapping, similarity_func='cosine')
        self.logger.info("Documents stored!")

        self.logger.info("Setting up document pipeline...")
        self.pipeline()
        self.logger.info("Document pipeline built!")


In [4]:
# Data location and model can be overridden through the environment (for
# example via the .env file read by load_dotenv() in the imports cell) using
# RAG_DATA_DIR and RAG_OPENAI_MODEL; the defaults are the original values.
directory: str = os.getenv("RAG_DATA_DIR", "/home/bjl/code/projects/twitch-rag/data/textbooks")
openai_model: str = os.getenv("RAG_OPENAI_MODEL", 'gpt-4')
agent: RAG = RAG(dir_path=directory, model_type=openai_model)

2024-03-30 13:22:15 - [32mINFO[0m - Setting up RAG![0m
2024-03-30 13:22:15 - [32mINFO[0m - Extracting text from PDFs...[0m
2024-03-30 13:22:15 - [32mINFO[0m - Extracting from document 1 ...[0m
2024-03-30 13:22:15 - [32mINFO[0m - Extracting from document 2 ...[0m
2024-03-30 13:22:15 - [32mINFO[0m - Extracting from document 3 ...[0m
2024-03-30 13:22:15 - [32mINFO[0m - All text extracted![0m
2024-03-30 13:22:15 - [32mINFO[0m - Initializing pipeline...[0m
2024-03-30 13:22:15 - [32mINFO[0m - Commiting documents to memory...[0m
2024-03-30 13:22:15 - [32mINFO[0m - Documents stored![0m
2024-03-30 13:22:15 - [32mINFO[0m - Setting up document pipeline...[0m
2024-03-30 13:22:15 - [32mINFO[0m - Embedding documents...[0m
2024-03-30 13:22:15 - [32mINFO[0m - Documents embedded![0m
2024-03-30 13:22:15 - [32mINFO[0m - Document pipeline built![0m
2024-03-30 13:22:15 - [32mINFO[0m - Pipeline ready![0m


In [9]:
def question_func(question: str) -> dict:
    """Ask our pipeline a question about our documentation.

    The question is used both as the retriever query and as the ``question``
    variable of the prompt template. The generator's replies are
    pretty-printed as a side effect.

    Args:
        question (str): A question to ask of the agent.

    Returns:
        (dict): The full result of the pipeline run; the generated answers
            are found under ``results["llm"]["replies"]``.
    """
    # Uses the module-level `agent` built in the cell above.
    results = agent.rag_pipeline.run(
        {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }
    )

    pprint(results["llm"]["replies"])

    return results

In [11]:
results = question_func("Tell me about coding up classes in Python.")

Ranking by BM25...:   0%|          | 0/384 [00:00<?, ? docs/s]

['In Python, classes are a blueprint for creating objects (a particular data '
 'structure), providing initial values for state, and implementations of '
 "behavior. Classes are created using the keyword 'class' and an indented "
 'block, which contains class methods. Below is an example of the creation of '
 'a class:\n'
 '\n'
 '```Python\n'
 'class MyClass:\n'
 '    x = 5\n'
 '```\n'
 '\n'
 'In this example, a class named "MyClass" is created with a property named '
 '"x" set to 5. In order to create an object from the class, you would do the '
 'following:\n'
 '\n'
 '```Python\n'
 'p1 = MyClass()\n'
 'print(p1.x)\n'
 '```\n'
 '\n'
 'Here, `p1` is an object of `MyClass`.\n'
 '\n'
 'Classes also includes `__init__()` function, which is used to assign values '
 'to object properties or other operations that are necessary to do when the '
 'object is being created.\n'
 '```Python\n'
 'class Person:\n'
 '    def __init__(self, name, age):\n'
 '        self.name = name\n'
 '        self.a

In [12]:
results

{'llm': {'replies': ['In Python, classes are a blueprint for creating objects (a particular data structure), providing initial values for state, and implementations of behavior. Classes are created using the keyword \'class\' and an indented block, which contains class methods. Below is an example of the creation of a class:\n\n```Python\nclass MyClass:\n    x = 5\n```\n\nIn this example, a class named "MyClass" is created with a property named "x" set to 5. In order to create an object from the class, you would do the following:\n\n```Python\np1 = MyClass()\nprint(p1.x)\n```\n\nHere, `p1` is an object of `MyClass`.\n\nClasses also includes `__init__()` function, which is used to assign values to object properties or other operations that are necessary to do when the object is being created.\n```Python\nclass Person:\n    def __init__(self, name, age):\n        self.name = name\n        self.age = age\n```\nThis `__init__()` function is called automatically every time the class is bein

In [None]:
# Dust Bin
def get_tokens(self, contents: str | list[str]) -> list[int] | list[list[str]]:
        tik_encoder: Encoding = encoding_for_model(model_name=self.model_type)
        if isinstance(contents, list[str]):
            return tik_encoder.encode_batch(contents)
        return tik_encoder.encode(contents)