In [1]:
!pip install python-docx PyPDF2



In [1]:
import os
import docx
import numpy as np
import nest_asyncio
import openai
import asyncio
import re
import pdb
from pprint import pprint
from abc import ABC, abstractmethod
from dotenv import load_dotenv
from collections import defaultdict
from PyPDF2 import PdfReader
from typing import Callable

In [2]:
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()
# Patch asyncio so asyncio.run() can be used from inside the notebook later on.
nest_asyncio.apply()

In [3]:
# Fail fast with a clear message if the key is missing, before any API call is attempted.
assert os.getenv(
    "OPENAI_API_KEY"
), "OPENAI_API_KEY is not set (add it to .env or the environment)"

In [4]:
class Settings:
    """Static configuration shared by the embedding and chat clients."""

    # Read from the environment once, when the class body is executed.
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    # Embedding model: the "large" variant (3,072 dimensions) rather than "small".
    embeddings_model_name = "text-embedding-3-large"
    # Chat model: the full gpt-4o rather than the "mini" variant.
    llm_model = "gpt-4o"


settings = Settings()

## Distance metrics

In [5]:
class DistanceMetric(ABC):
    """Abstract base for pairwise vector metrics.

    Subclasses implement ``count`` as a classmethod. ``name`` is a plain class
    attribute holding the concrete class name, so ``SomeMetric.name`` works
    without creating an instance.
    """

    # Overwritten for every subclass by __init_subclass__. Stacking @classmethod
    # on top of @property was deprecated in Python 3.11 and removed in 3.13,
    # so a regular class attribute is used instead.
    name: str = "DistanceMetric"

    def __init_subclass__(cls, **kwargs) -> None:
        """Record the subclass's own class name in ``cls.name``."""
        super().__init_subclass__(**kwargs)
        cls.name = cls.__name__

    @classmethod
    @abstractmethod
    def count(cls, x: np.ndarray, y: np.ndarray) -> float:
        """Return the metric value computed for vectors ``x`` and ``y``."""


class CosineSimilarity(DistanceMetric):
    """
    https://en.wikipedia.org/wiki/Cosine_similarity

    Pros:
        - Scale-invariant
        - Suitable for high-dimensional data
    Cons:
        - Does not consider magnitude
        - Can be less intuitive
    """

    @classmethod
    def count(cls, x: np.ndarray, y: np.ndarray) -> float:
        """Return dot(x, y) / (|x| * |y|).

        If either vector has zero norm the ratio is undefined; 0.0 is returned
        in that case instead of nan, so callers can compare and sort scores.
        """
        norm_product = np.linalg.norm(x) * np.linalg.norm(y)
        if norm_product == 0:
            # 0/0 would yield nan (plus a RuntimeWarning); nan compares False
            # against any threshold and therefore slips through `<` filters.
            return 0.0
        return np.dot(x, y) / norm_product


class EuclideanDistance(DistanceMetric):
    """
    Straight-line (L2) distance.

    https://en.wikipedia.org/wiki/Euclidean_distance

    Pros:
        - Simple to implement
        - Intuitive and easy to understand
    Cons:
        - Sensitive to scale
        - Not suitable for high-dimensional data
    """

    @classmethod
    def count(cls, x: np.array, y: np.array) -> float:
        """Return the norm of the element-wise difference ``x - y``."""
        difference = x - y
        return np.linalg.norm(difference)


class ManhattanDistance(DistanceMetric):
    """
    City-block (L1) distance.

    https://www.datacamp.com/tutorial/manhattan-distance

    Pros:
        - Simple to implement
        - Less sensitive to outliers
    Cons:
        - Sensitive to scale
        - Not suitable for high-dimensional data
    """

    @classmethod
    def count(cls, x: np.array, y: np.array) -> float:
        """Return the sum of absolute element-wise differences."""
        absolute_differences = np.abs(x - y)
        return np.sum(absolute_differences)


class JaccardSimilarity(DistanceMetric):
    """
    https://en.wikipedia.org/wiki/Jaccard_index

    Implemented in its weighted (min/max) form:
    ``sum(minimum(x, y)) / sum(maximum(x, y))``.

    Pros:
        - Good for binary and categorical data
        - Intuitive interpretation
    Cons:
        - Not suitable for continuous data
        - Can be computationally expensive
        - Only meaningful for non-negative vectors; with negative components
          the ratio can fall outside [0, 1]
    """

    @classmethod
    def count(cls, x: np.ndarray, y: np.ndarray) -> float:
        """Return sum(min(x, y)) / sum(max(x, y)).

        When the denominator is zero the ratio is undefined; identical vectors
        are then scored 1.0 and any other pair 0.0, instead of nan or inf.
        """
        intersection = np.sum(np.minimum(x, y))
        union = np.sum(np.maximum(x, y))
        if union == 0:
            # Avoid dividing by zero (e.g. two all-zero vectors).
            return 1.0 if np.array_equal(x, y) else 0.0
        return intersection / union


class HammingDistance(DistanceMetric):
    """
    Fraction of positions at which two equally shaped arrays differ.

    https://en.wikipedia.org/wiki/Hamming_distance

    Pros:
        - Simple to implement
        - Suitable for categorical data
    Cons:
        - Requires equal length sequences
        - Not suitable for continuous data
    """

    @classmethod
    def count(cls, x: np.array, y: np.array) -> float:
        """Return the mean of the element-wise ``x != y`` mask.

        Raises:
            ValueError: If ``x`` and ``y`` do not have the same shape.
        """
        if x.shape == y.shape:
            mismatches = x != y
            return np.mean(mismatches)
        raise ValueError("Arrays must be of equal length")

## Embedding model class

In [6]:
class EmbeddingModel:
    """Wrapper around the OpenAI embeddings endpoint with sync and async access."""

    def __init__(self, settings: Settings, batch_size: int = 1024) -> None:
        """Create the sync and async OpenAI clients.

        Args:
            settings: Provides ``openai_api_key`` and ``embeddings_model_name``.
            batch_size: Maximum number of texts sent in a single request by
                ``async_get_embeddings``.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        self.openai_api_key = settings.openai_api_key
        # Pass the key explicitly so both clients honour `settings` (as
        # ChatOpenAI does). With a None key the client falls back to the
        # OPENAI_API_KEY environment variable, i.e. the previous behaviour.
        self.async_client = openai.AsyncOpenAI(api_key=self.openai_api_key)
        self.client = openai.OpenAI(api_key=self.openai_api_key)
        # Kept for backward compatibility with code that reads the
        # module-level key.
        openai.api_key = self.openai_api_key
        self.embeddings_model_name = settings.embeddings_model_name
        self.batch_size = batch_size

    async def async_get_embeddings(self, list_of_text: list[str]) -> list[np.ndarray]:
        """Embed many texts concurrently, one request per batch.

        Args:
            list_of_text: Texts to embed.

        Returns:
            One array per input text, in input order. An empty input list
            yields an empty list without issuing any request.
        """
        batches = [
            list_of_text[i : i + self.batch_size]
            for i in range(0, len(list_of_text), self.batch_size)
        ]

        async def process_batch(batch):
            embedding_response = await self.async_client.embeddings.create(
                input=batch, model=self.embeddings_model_name
            )
            return [embeddings.embedding for embeddings in embedding_response.data]

        # asyncio.gather runs all batches concurrently and returns the results
        # in the order the awaitables were passed.
        results = await asyncio.gather(*[process_batch(batch) for batch in batches])

        # Flatten the per-batch lists into a single list of arrays.
        return [
            np.array(embedding)
            for batch_result in results
            for embedding in batch_result
        ]

    def get_embedding(self, text: str) -> np.ndarray:
        """Embed a single text synchronously and return its vector."""
        embedding = self.client.embeddings.create(
            input=text, model=self.embeddings_model_name
        )

        return np.array(embedding.data[0].embedding)

<br>

### Metrics comparisons

In [7]:
# Pair v1: two sentences that describe the same thing in different words.
# NOTE: each get_embedding() call below issues a request to the OpenAI API.
embedding_model = EmbeddingModel(settings=settings)

yellow_sentence_v1 = "This car is yellow"
lemon_sentence_v1 = "This car is the same color as lemon"

yellow_vector_v1 = embedding_model.get_embedding(yellow_sentence_v1)
lemon_vector_v1 = embedding_model.get_embedding(lemon_sentence_v1)

In [8]:
# Pair v2: the same first sentence compared against an unrelated statement.
# NOTE(review): the `lemon_*_v2` names are kept from pair v1 although this
# sentence does not mention lemons.
yellow_sentence_v2 = "This car is yellow"
lemon_sentence_v2 = "This is not a car. It's a tree"

yellow_vector_v2 = embedding_model.get_embedding(yellow_sentence_v2)
lemon_vector_v2 = embedding_model.get_embedding(lemon_sentence_v2)

In [9]:
# Metrics to compare, in the order they are printed.
metric_classes = [
    CosineSimilarity,
    EuclideanDistance,
    ManhattanDistance,
    JaccardSimilarity,
    HammingDistance,
]
# (label, first vector, second vector) for each sentence pair.
vector_pairs = (
    ("v1", yellow_vector_v1, lemon_vector_v1),
    ("v2", yellow_vector_v2, lemon_vector_v2),
)
for metric in metric_classes:
    for label, first_vector, second_vector in vector_pairs:
        print(f"{label}\t{metric.name}: {metric.count(first_vector, second_vector)}")
    print()

v1	CosineSimilarity: 0.6998949235584049
v2	CosineSimilarity: 0.48110602414446435

v1	EuclideanDistance: 0.7747323055567967
v2	EuclideanDistance: 1.0187187983980743

v1	ManhattanDistance: 34.170326354105555
v2	ManhattanDistance: 44.290678236883764

v1	JaccardSimilarity: -0.9437817869598495
v2	JaccardSimilarity: -0.9888799853095968

v1	HammingDistance: 1.0
v2	HammingDistance: 1.0



### Summary

**Cosine Similarity:** Best for high-dimensional data, measures the cosine of the angle between vectors.

**Euclidean Distance:** Measures the straight-line distance between vectors, sensitive to scale and not ideal for high-dimensional data.

**Manhattan Distance:** Measures the sum of absolute differences, also sensitive to scale.

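**Jaccard Similarity:** Good for binary or categorical data, measures the intersection over union. It is not meaningful for real-valued embeddings with negative components, which is why the scores above are negative.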

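**Hamming Distance:** Measures the fraction of differing elements, suitable for categorical data with equal length sequences. For continuous embeddings practically every component differs, so the distance is 1.0 for both pairs above.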

<br>

## Vector class

Basic Vector class extended with new field `metadata`

In [10]:
class Vector:
    """An embedding together with its lookup key and free-form metadata."""

    def __init__(self, data: np.array, key: str, metadata: dict) -> None:
        """Store the embedding array, its key and the metadata dict as given."""
        self.data, self.key, self.metadata = data, key, metadata

    def __repr__(self) -> str:
        """Represent the vector by its key."""
        return self.key

## Custom pythonic vector DB

Added new arg `min_quality` to ***search()*** method.

We can set some minimum acceptable value in addition to `k` values limit.

The reason is simple. We don't need low similarity responses. 

It is better to return nothing than to return `k` low-similarity items that may confuse our LLM.

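The right `min_quality` value depends on the chosen distance metric and should be selected after some analysis. Note that items scoring below `min_quality` are dropped and the rest are sorted in descending order, so by default this filter assumes a similarity-style metric where a higher score means a closer match.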

In [11]:
class DB:
    """In-memory vector store keyed by ``Vector.key`` with brute-force search."""

    def __init__(self, model: EmbeddingModel) -> None:
        """Create an empty store; ``model`` is used to embed text queries."""
        self.vectors: dict[str, Vector] = {}
        self.model = model

    def insert(self, vector: Vector) -> None:
        """Store ``vector``; an existing entry with the same key is replaced."""
        self.vectors[vector.key] = vector

    def search(
        self,
        query_vector: np.ndarray,
        k: int,
        min_quality: float,
        distance_measure: Callable,
        higher_is_better: bool = True,
    ) -> list[tuple[Vector, float]]:
        """Score every stored vector against ``query_vector`` and return the best ``k``.

        Args:
            query_vector: Embedding to compare against the stored vectors.
            k: Maximum number of results.
            min_quality: Score threshold. With ``higher_is_better=True`` scores
                below it are dropped; with ``higher_is_better=False`` scores
                above it are dropped.
            distance_measure: Callable ``(query_vector, stored_vector) -> score``.
            higher_is_better: True for similarity-style scores (e.g. cosine
                similarity), False for distance-style scores (e.g. Euclidean
                distance) where a smaller value means a closer match.

        Returns:
            ``(vector, score)`` pairs, best match first, at most ``k`` items.
        """
        scores = []
        for vector in self.vectors.values():
            score = distance_measure(query_vector, vector.data)
            # The threshold direction depends on whether larger or smaller
            # scores indicate a better match.
            rejected = score < min_quality if higher_is_better else score > min_quality
            if rejected:
                continue
            scores.append((vector, score))
        # Best first: descending for similarities, ascending for distances.
        return sorted(scores, key=lambda pair: pair[1], reverse=higher_is_better)[:k]

    def search_by_text(
        self,
        query_text: str,
        k: int = 5,
        min_quality: float = 0.5,
        metric_class: type[DistanceMetric] = CosineSimilarity,
        text_only: bool = False,
        higher_is_better: bool = True,
    ) -> list:
        """Embed ``query_text`` with the model and search the store.

        Args:
            query_text: Text to embed and search for.
            k: Maximum number of results.
            min_quality: Score threshold, see ``search``.
            metric_class: Metric class whose ``count`` classmethod scores pairs.
            text_only: If True return only the keys of the matches.
            higher_is_better: Forwarded to ``search``; pass False when
                ``metric_class`` is a distance rather than a similarity.

        Returns:
            A list of keys when ``text_only`` is True, otherwise the list of
            ``(vector, score)`` pairs produced by ``search``.
        """
        query_vector = self.model.get_embedding(query_text)
        response = self.search(
            query_vector, k, min_quality, metric_class.count, higher_is_better
        )
        return [item[0].key for item in response] if text_only else response

## File parsers

In [12]:
class FileParser(ABC):
    """Interface for parsers that turn a file on disk into plain text."""

    @abstractmethod
    def parse_file(self, path: str) -> str:
        """Return the text content of the file at ``path``."""


class DocxFileParser(FileParser):
    """Parser for .docx documents."""

    def parse_file(self, path: str) -> str:
        """Return the text of all paragraphs, joined with newlines."""
        document = docx.Document(path)
        paragraph_texts = [paragraph.text for paragraph in document.paragraphs]
        return "\n".join(paragraph_texts)


class TxtFileParser(FileParser):
    """Parser for plain-text files."""

    def parse_file(self, path: str) -> str:
        """Return the whole file decoded as UTF-8; undecodable bytes are ignored."""
        with open(path, "r", encoding="utf-8", errors="ignore") as handle:
            contents = handle.read()
        return contents


class PdfFileParser(FileParser):
    """Parser for PDF documents."""

    def parse_file(self, path: str) -> str:
        """Return the extracted text of every page, joined with newlines."""
        page_texts = [page.extract_text() for page in PdfReader(path).pages]
        return "\n".join(page_texts)

In [13]:
# Smoke test: every parser must return non-empty text for its sample file.
for parser_cls, sample_path in (
    (DocxFileParser, "data/Chinese.docx"),
    (TxtFileParser, "data/Chinese.txt"),
    (PdfFileParser, "data/Chinese.pdf"),
):
    assert parser_cls().parse_file(sample_path)

## Splitter

After analyzing the input files, I saw that the recipes are separated by markers made of a number inside square brackets

Examples: [1] `<recipe>` [123] `<recipe>` [323]

That's why I decided to write a custom splitter that will bring us closer to that golden batch

In the future, if we want to load files that are formatted differently, 
we will need to have another splitter, or modify the current one to meet the new requirements.

In [14]:
class CookbookTextSplitter:
    """Split cookbook text into recipes at numeric markers such as ``[12]``."""

    # A run of digits enclosed in square brackets.
    pattern = r"\[\d+\]"

    def split(self, text: str) -> list[str]:
        """Return the pieces of ``text`` found between markers.

        Pieces that are empty or contain only whitespace (for example the
        empty string before a leading marker, or the gap between two adjacent
        markers) are dropped so they are never embedded or indexed. The
        remaining pieces are returned unmodified, in document order.
        """
        return [chunk for chunk in re.split(self.pattern, text) if chunk.strip()]

In [15]:
# Shared splitter instance used to cut the parsed document into recipes.
splitter = CookbookTextSplitter()

<br>

### Select any one of the following

In [16]:
# Input document to index; its extension decides which parser is used below.
path_to_file = "data/Chinese.docx"
# path_to_file = "data/Chinese.pdf"
# path_to_file = "data/Chinese.txt"

<br>

In [17]:
_, file_ext = os.path.splitext(path_to_file)

# Extension -> parser class.
parsers_map = {".docx": DocxFileParser, ".txt": TxtFileParser, ".pdf": PdfFileParser}

# Match case-insensitively so e.g. "Chinese.PDF" is handled like "Chinese.pdf",
# and report unsupported types with a readable message instead of a bare KeyError.
parser_class = parsers_map.get(file_ext.lower())
if parser_class is None:
    raise ValueError(
        f"Unsupported file type {file_ext!r}; expected one of {sorted(parsers_map)}"
    )

text = parser_class().parse_file(path_to_file)

In [18]:
# Cut the parsed document into chunks at the bracketed-number markers.
splitted_text = splitter.split(text)

In [19]:
# Number of chunks produced by the splitter.
print(len(splitted_text))

102


<br>

In [20]:
# NOTE: this cell calls the paid OpenAI API - avoid re-running it unnecessarily.
embedding_model = EmbeddingModel(settings=settings)
# Embeds every chunk; asyncio.run() is used from inside the notebook here
# (nest_asyncio.apply() was called near the top of the notebook).
embeddings = asyncio.run(embedding_model.async_get_embeddings(splitted_text))

### Fill DB with vectors

In [21]:
db = DB(model=embedding_model)

# One Vector per chunk: the chunk text is the key, and each vector gets its own
# metadata dict recording the source file and the embedding model.
for chunk, chunk_embedding in zip(splitted_text, embeddings):
    db.insert(
        Vector(
            data=chunk_embedding,
            key=chunk,
            metadata={
                "path_to_file": path_to_file,
                "model": embedding_model.embeddings_model_name,
            },
        )
    )

In [22]:
# Number of vectors stored; chunks with identical text share one key.
len(db.vectors)

102

## LLM part

Now let's test LLM responses with and without RAG

We will ask some questions about a Chinese food recipe (provided below) that is stored in the source file and compare the results

We already have a vector database filled with data, so we will use it

Food recipe reference will be provided below

In [23]:
# LLM model class


class ChatOpenAI:
    """Minimal wrapper around the OpenAI chat completions endpoint."""

    def __init__(self, settings: Settings) -> None:
        """Create the client from ``settings`` and remember the model name."""
        self.client = openai.OpenAI(api_key=settings.openai_api_key)
        self.model_name = settings.llm_model

    def run(self, messages, text_only: bool = True):
        """Send ``messages`` to the chat model.

        Returns the content of the first choice when ``text_only`` is True,
        otherwise the whole completion object.

        Raises:
            ValueError: If ``messages`` is not a list.
        """
        if not isinstance(messages, list):
            raise ValueError("messages must be a list")

        response = self.client.chat.completions.create(
            messages=messages, model=self.model_name
        )
        return response.choices[0].message.content if text_only else response


chat = ChatOpenAI(settings=settings)

In [24]:
# Prompt class


class Prompt:
    """Builds chat messages as ``{"role": ..., "content": ...}`` dicts."""

    @staticmethod
    def _message(role: str, text: str) -> dict[str, str]:
        """Build a single message dict for the given role."""
        return {"role": role, "content": text}

    def as_user(self, text: str) -> dict[str, str]:
        """Return ``text`` as a user message."""
        return self._message("user", text)

    def as_assistant(self, text: str) -> dict[str, str]:
        """Return ``text`` as an assistant message."""
        return self._message("assistant", text)

    def as_system(self, text: str) -> dict[str, str]:
        """Return ``text`` as a system message."""
        return self._message("system", text)


prompt = Prompt()

## Mu Shu Pork

1/2 lb pork tenderloin, cut into 1/4-inch strips

Marinade:
2 tsps soy sauce
1/2 tsp cornstarch,
1/8 tsp salt
1/2 tsp dry sherry
1/4 tsp sugar

Cooking Sauce:
2 tsps soy sauce
2 tsps dry sherry
1 tsp sesame oil
1/2 tsp sugar
10 6-inch flour tortillas
2 1/2 tsps cooking oil
2 eggs, beaten with 1/4 tsp salt
1 1/2 oz vermicelli, soaked and cut into 2-inch pieces
1 large tree ears, soaked and thinly sliced
4 medium Chinese dried mushrooms, soaked and thinly sliced
3 whole green onions, shredded
1/2 cup red bell pepper, shredded
1/2 cup bamboo shoots, cut into matchstick pieces
hoisin sauce for serving

Combine the marinade ingredients and the meat in a bowl, marinate and
set aside.

Wrap tortillas in foil and warm for 10 minutes in a 350F oven.
Heat skillet with 1 1/2 tsp of oil, over high heat. Add the beaten eggs
and tilting the skillet to distribute the eggs into thin pancake and cook
just until set. Remove to cutting surface and cut into 1/4-inch wide
strips.

Prepare cooking sauce and set aside.

Heat a wok and add 2 tsps of the oil, stir-fry the pork mixture for 2
minutes or until meat is browned. Add tree ears, mushrooms, green onions,
red pepper, and bamboo shoots, toss for a few seconds, then add the
vermicelli and egg strips. Stir in the sauce mixture and toss until
combined. Tranfer to a platter and serve with warm tortillas.

To eat, spread a little of the hoisin sauce over tortilla, place some
pork mixture on top, fold it over and eat as sandwich-style.

Makes 4 servings as a main course or 8 with other dishes.

### No RAG

In [25]:
# Baseline conversation: system persona plus the question, no retrieved context.
messages_no_rag = [
    prompt.as_system("You are an expert in Chinese cuisine and always respond kindly."),
    prompt.as_user(
        "What is Mu Shu Pork? Write me a detailed recipe for this dish, starting with the ingredients and ending with the cooking method."
    ),
]

In [26]:
# NOTE: this cell calls the paid OpenAI API - avoid re-running it unnecessarily.
print(chat.run(messages_no_rag))

Mu Shu Pork is a delicious and flavorful Chinese dish that's popular both in China and around the world. It features stir-fried pork with vegetables, typically served with Mandarin pancakes and hoisin sauce. Here's a detailed recipe for making Mu Shu Pork at home:

### Ingredients:

#### For the Pork Marinade:
- 1/2 pound (about 225 grams) pork tenderloin or shoulder, thinly sliced
- 1 tablespoon soy sauce
- 1 teaspoon rice wine or dry sherry
- 1/2 teaspoon cornstarch
- 1/4 teaspoon sesame oil

#### For the Stir-Fry:
- 2 tablespoons vegetable oil, divided
- 2 large eggs, beaten
- 3 dried wood ear mushrooms, soaked in warm water until soft and thinly sliced
- 1/2 cup bamboo shoots, julienned
- 1/2 cup napa cabbage, shredded
- 1/2 cup carrots, julienned
- 3 green onions, sliced
- 1 tablespoon ginger, minced
- 1 tablespoon garlic, minced

#### For the Sauce:
- 2 tablespoons hoisin sauce
- 1 tablespoon soy sauce
- 1 tablespoon oyster sauce
- 1 teaspoon sugar
- 2 teaspoons cornstarch mixed 

<br>

### Let's add RAG now

In [27]:
# System prompt for the RAG run: tells the model to answer only from the
# supplied recipes context and to reply "I don't know" otherwise.
RAG_PROMPT = """
You are an expert in Chinese cuisine and always respond kindly.

Use the provided recipes context to answer the user's query.

Recipes context will contain data from database.

At the beginning of each recipe there will be a <recipe_start> tag, and at the end <recipe_end>.

The recipes will be separated from each other by two new lines.

If you come across a recipe in context that is not mentioned in the user's query, you can ignore it.

You may not answer the user's query unless there is specific recipe context in the following text.

If you do not know the answer, or cannot answer, please respond with "I don't know".
"""

In [28]:
# User message template; filled via str.format with `context` (retrieved
# recipes) and `user_query` (the original question).
USER_PROMPT_TEMPLATE = """
Recipes context:
{context}

User Query:
{user_query}
"""

In [29]:
# Same question as in the no-RAG run, so the two answers can be compared.
user_query = "What is Mu Shu Pork? Write me a detailed recipe for this dish, starting with the ingredients and ending with the cooking method."

## Some thoughts on the chosen values

A few words about the chosen minimum quality value and the chosen metric class.

After some tests and playing around with the metric classes, I realized that the best possible class is `Cosine Similarity`

It provides the most relevant values for our case with high-dimensional vectors.

I also tested some values for the `min_quality` argument.

The tests show that the really good matches have scores between **0.6** and **1**

Elements below 0.6 are not so good, so we will ignore them.

In [30]:
# NOTE: this cell calls the paid OpenAI API (the query is embedded) - avoid re-running it unnecessarily.
# Keep at most 5 recipes whose cosine similarity to the query is at least 0.6.
search_result = db.search_by_text(
    user_query, k=5, min_quality=0.6, metric_class=CosineSimilarity
)

In [31]:
# Wrap every retrieved recipe in <recipe_start>/<recipe_end> tags and separate
# the recipes with a blank line, as described in RAG_PROMPT.
context = "\n\n".join(
    [f"<recipe_start>\n{item[0].key.strip()}\n<recipe_end>" for item in search_result]
)
# NOTE(review): this overwrites `user_query` with the formatted prompt, so
# re-running this cell without first re-running the cell that defines
# `user_query` nests the prompt inside itself.
user_query = USER_PROMPT_TEMPLATE.format(context=context, user_query=user_query)

In [32]:
# RAG conversation: system instructions followed by the context-enriched query.
messages_with_rag = [
    prompt.as_system(RAG_PROMPT),
    prompt.as_user(user_query),
]

In [33]:
# NOTE: this cell calls the paid OpenAI API - avoid re-running it unnecessarily.
print(chat.run(messages_with_rag))

Mu Shu Pork is a traditional Chinese dish. Here is a detailed recipe for preparing Mu Shu Pork:

**Ingredients:**

- **For the Pork and Marinade:**
  - 1/2 lb pork tenderloin, cut into 1/4-inch strips
  - 2 tsps soy sauce
  - 1/2 tsp cornstarch
  - 1/8 tsp salt
  - 1/2 tsp dry sherry
  - 1/4 tsp sugar

- **For the Cooking Sauce:**
  - 2 tsps soy sauce
  - 2 tsps dry sherry
  - 1 tsp sesame oil
  - 1/2 tsp sugar

- **Additional Ingredients:**
  - 10 6-inch flour tortillas
  - 2 1/2 tsps cooking oil
  - 2 eggs, beaten with 1/4 tsp salt
  - 1 1/2 oz vermicelli, soaked and cut into 2-inch pieces
  - 1 large tree ear, soaked and thinly sliced
  - 4 medium Chinese dried mushrooms, soaked and thinly sliced
  - 3 whole green onions, shredded
  - 1/2 cup red bell pepper, shredded
  - 1/2 cup bamboo shoots, cut into matchstick pieces
  - Hoisin sauce, for serving

**Cooking Method:**

1. **Marinate the Pork:**
   - Combine the marinade ingredients (soy sauce, cornstarch, salt, dry sherry, and su

In [34]:
# Now let's try to mention some unreal dish and check the result

In [35]:
# Same question template, but about something that is not a dish in the cookbook.
user_strange_query = "What is Death Star? Write me a detailed recipe for this dish, starting with the ingredients and ending with the cooking method."

In [36]:
# NOTE: this cell calls the paid OpenAI API (the query is embedded) - avoid re-running it unnecessarily.
# Uses the default metric of search_by_text (CosineSimilarity).
search_result = db.search_by_text(user_strange_query, k=5, min_quality=0.6)

In [37]:
# How many recipes passed the min_quality threshold for the unrelated query.
len(search_result)

0

In [38]:
# As expected no results for such cringe

In [39]:
# Same context layout as before; with an empty search result the context is an
# empty string.
context = "\n\n".join(
    [f"<recipe_start>\n{item[0].key.strip()}\n<recipe_end>" for item in search_result]
)
# NOTE(review): this overwrites `user_strange_query` with the formatted prompt,
# so re-running this cell alone nests the prompt inside itself.
user_strange_query = USER_PROMPT_TEMPLATE.format(
    context=context, user_query=user_strange_query
)

In [40]:
# Show the final user message that will be sent to the model.
print(user_strange_query)


Recipes context:


User Query:
What is Death Star? Write me a detailed recipe for this dish, starting with the ingredients and ending with the cooking method.



In [41]:
# RAG conversation for the unrelated query (replaces the previous message list).
messages_with_rag = [
    prompt.as_system(RAG_PROMPT),
    prompt.as_user(user_strange_query),
]

In [42]:
# NOTE: this cell calls the paid OpenAI API - avoid re-running it unnecessarily.
print(chat.run(messages_with_rag))

I don't know.


In [43]:
# Cool, because no one knows :)

# Summary

### Goals:
1. Add one of the following "extras" to the RAG pipeline:
    1. Allow it to work with PDF files [Done]
    2. Implement a new distance metric [Done]
    3. Add metadata support to the vector database [Done]
2. Make a simple diagram of the RAG process [Done]
3. Run the notebook [Done]
4. Record a Loom walking through the notebook, the questions in the notebook, and your addition!