In [1]:
import os
import cv2
import numpy as np
import re
from typing import List
from dotenv import load_dotenv
from llama_index.multi_modal_llms.openai import OpenAIMultiModal
from llama_index.llms.openai import OpenAI
from pydantic import BaseModel, Field
from engine import StandarOpenAIBuilder
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core import PromptTemplate
from llama_index.core.schema import TextNode
from llama_index.core.schema import ImageDocument
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.vector_stores.pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec
from llama_index.core import Settings, StorageContext, VectorStoreIndex
from fitz import open as fitz_open
from llama_index.core import SimpleDirectoryReader
from llama_index.core import load_index_from_storage

from llama_index.core.vector_stores import MetadataInfo, VectorStoreInfo
from llama_index.core.indices.multi_modal.retriever import (
    MultiModalVectorIndexRetriever,
)
from llama_index.core.response.notebook_utils import display_source_node
from llama_index.core.schema import ImageNode
from IPython.display import Image
from llama_index.core.vector_stores import (
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)
from llama_index.core.schema import MetadataMode
from llama_index.core import QueryBundle
from llama_index.core.schema import NodeWithScore

load_dotenv(override=True)

PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")

store_path = "data"
temp_path = "data/temp"
index_name_text = "text-index"
index_name_image = "images-index"


In [15]:
def process_pdf(pdf_path):
    """
    Processes a PDF file and extracts each page as an image.

    Args:
        pdf_path (str): The path to the PDF file to be processed.

    Returns:
        None

    This function performs the following steps:
    1. Opens the PDF file using the `fitz_open` function.
    2. Extracts the file name from the given PDF path.
    3. Creates a directory to store the extracted images if it does not already exist.
    4. Iterates through each page of the PDF document.
    5. Converts each page to an image using the `get_pixmap` method.
    6. Saves each image as a PNG file in the created directory.
    7. Closes the PDF document.
    """
    pdf_document = fitz_open(pdf_path)
    file_name = pdf_path.split("/")[-1].split(".")[0]
    if not os.path.exists(f"./{store_path}/temp/{file_name}"):
        os.makedirs(f"./{store_path}/temp/{file_name}")

    for page_number in range(pdf_document.page_count):
        page = pdf_document[page_number]
        pix = page.get_pixmap()
        image_array = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
            pix.height, pix.width, 3
        )
        cv2.imwrite(
            f"./{store_path}/temp/{file_name}/{file_name}_{page_number + 1}.png",
            image_array,
        )
    pdf_document.close()

def parse_pdf_folder(pdf_folder):
    """
    Parses a folder containing PDF files and processes each PDF.

    Args:
        pdf_folder (str): The path to the folder containing PDF files.

    Returns:
        None
    """
    pdfs = [f for f in os.listdir(pdf_folder) if f.endswith(".pdf")]
    for pdf in pdfs:
        process_pdf(f"{pdf_folder}/{pdf}")

def init_pinecone():
    """
    Initializes and configures Pinecone indexes for text and image data.

    This function performs the following steps:
    1. Creates a Pinecone client using the provided API key.
    2. Lists existing indexes in Pinecone.
    3. Checks if the specified text and image indexes exist; if not, creates them with the specified dimensions and metrics.
    4. Initializes Pinecone indexes for text and image data.
    5. Creates PineconeVectorStore instances for text and image data.

    Returns:
        tuple: A tuple containing two PineconeVectorStore instances:
            - text_store: The PineconeVectorStore instance for text data.
            - image_store: The PineconeVectorStore instance for image data.
    """
    pc = Pinecone(api_key=PINECONE_API_KEY)
    pc_list = pc.list_indexes()
    if not any(index_name_text == idx["name"] for idx in pc_list.indexes):
        pc.create_index(
            name=index_name_text,
            dimension=1536,
            metric="dotproduct",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
    if not any(index_name_image == idx["name"] for idx in pc_list.indexes):
        pc.create_index(
            name=index_name_image,
            dimension=512,
            metric="dotproduct",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )

    pinecone_index_text = pc.Index(index_name_text)
    pinecone_index_image = pc.Index(index_name_image)
    image_store = PineconeVectorStore(
        pinecone_index=pinecone_index_image, index_name=index_name_text
    )
    text_store = PineconeVectorStore(
        pinecone_index=pinecone_index_text, index_name=index_name_text
    )
    return text_store, image_store



def ingest_data(documents, text_store, image_store):
    """
    Ingests data into a multi-modal vector store index and persists the storage context.

    Args:
        documents (list): A list of documents to be ingested.
        text_store (VectorStore): The vector store for text data.
        image_store (ImageStore): The image store for image data.

    Returns:
        tuple: A tuple containing the created MultiModalVectorStoreIndex and the StorageContext.
    """
    storage_context = StorageContext.from_defaults(
        vector_store=text_store, image_store=image_store
    )

    index = MultiModalVectorStoreIndex.from_documents(
        documents=documents,
        storage_context=storage_context,
        store_nodes_override=True,
    )
    index.storage_context.persist(persist_dir="./storage")
    return index, storage_context


def load_index_from_storage_folder(text_store, image_store):
    """
    Load an index from the specified storage folder.

    This function initializes a storage context using the provided text store
    and a default persistence directory. It then loads an index from the storage
    context and the provided image store.

    Args:
        text_store (str): The path or identifier for the text storage.
        image_store (str): The path or identifier for the image storage.

    Returns:
        Index: The loaded index from the storage.
    """
    storage_context = StorageContext.from_defaults(
        vector_store=text_store, persist_dir="./storage"
    )
    index = load_index_from_storage(storage_context, image_store=image_store)
    return index


# TODO: Extract more metadata as summary, title, etc. So wa can also have more information in the metadata
def get_meta(file_path):
    """
    Extracts metadata from a given file path.

    Args:
        file_path (str): The path to the file.

    Returns:
        dict: A dictionary containing the file path, item part, and page part.

    Raises:
        ValueError: If the file_path is not a string.
        ValueError: If the file name format is incorrect.
    """
    if not isinstance(file_path, str):
        raise ValueError("file_name must be a string")

    file_name = os.path.basename(file_path)
    pattern = re.compile(r"^[a-zA-Z0-9-]+__[a-zA-Z0-9--]+_\d+\.[a-zA-Z0-9]+$")
    if not pattern.match(file_name):
        raise ValueError("file name format is incorrect")

    try:
        item_part = file_name.split("__")[0]
        page_part = file_path.split("_")[-1].split(".")[0]
        return {
            "file_path": file_path,
            "item": item_part,
            "page": page_part,
        }
    except (IndexError, ValueError):
        raise ValueError("file name format is incorrect")

def display_response(nodes: List[TextNode]):
    """
    Display the content of each TextNode in the provided list.

    Args:
        nodes (List[TextNode]): A list of TextNode objects to display.

    Returns:
        None
    """
    """Display response."""
    for node in nodes:
        print(node.get_content(metadata_mode="all"))




In [17]:
vector_store_info = VectorStoreInfo(
    content_info="Manuals",
    metadata_info=[
        MetadataInfo(
            name="item",
            description="Item of the ikea manual.",
            type="string",
        ),
        MetadataInfo(
            name="file_path",
            description="File path of the image of the manual",
            type="string",
        ),
    ],
)


engine = StandarOpenAIBuilder()
text_store, image_store = init_pinecone()
parse_pdf_folder(store_path)
documents = SimpleDirectoryReader(
    input_dir=temp_path, filename_as_id=True, recursive=True, file_metadata=get_meta
).load_data()

index, store_context = ingest_data(documents, text_store, image_store)

  pattern = re.compile(r"^[a-zA-Z0-9-]+__[a-zA-Z0-9--]+_\d+\.[a-zA-Z0-9]+$")


Upserted vectors:   0%|          | 0/100 [00:00<?, ?it/s]

In [18]:
def retriever_custom(
        query: str = Field(
          description="Original query from the user, should be always the original one",
          examples=["Which ones are the tools of the tuffle furniture?"]
        ),
        filter_entinty: str = Field(
          description="The primary entity the user is inquiring about, typically the item for which they need instructions from the manual. Must be in lowercase and just one word",
          examples=["tuffle"]
        ),
        return_images: bool = False
    ):
    """
    Retrieves relevant images and generates a response based on the provided query and filter entity.
    Args:
        query (str): Original query from the user, should be always the original one.
            Example: "Which ones are the tools of the tuffle furniture?"
        filter_entinty (str): The primary entity the user is inquiring about, typically the item for which they need instructions from the manual.
            Must be in lowercase and just one word.
            Example: "tuffle"
    Returns:
        tuple: A tuple containing the response from the language model and the retrieval results.
    """
    retriever_engine = index.as_retriever(
        similarity_top_k=3,
        image_similarity_top_k=10,
        vector_store_info=vector_store_info,
        verbose=True,
        filters=MetadataFilters(
            filters=[
                MetadataFilter(
                    key="item",
                    operator=FilterOperator.EQ,
                    value=filter_entinty,
                ),
            ]
        ),
    )
    
    assert isinstance(retriever_engine, MultiModalVectorIndexRetriever)
    
    retrieval_results = retriever_engine.text_to_image_retrieve(query)
    retrieved_images = []
    for res_node in retrieval_results:
        if isinstance(res_node.node, ImageNode):
            retrieved_images.append(res_node.node.metadata["file_path"])
        else:
            print(display_source_node(res_node, source_length=200))
    
    
    import matplotlib.pyplot as plt
    from PIL import Image
    
    from llama_index.core.schema import ImageDocument
    
    image_documents = [
        ImageDocument(image_path=image_path) for image_path in retrieved_images
    ]
    response = engine.llm.complete(
        prompt=query,
        image_documents=image_documents,
    )
    if return_images: 
        return response, retrieval_results
    return response, []

In [19]:
# Only for evaluators for lack of time
def extract_keyword_from_query(query):
    llm = OpenAI()
    json_prompt_str = """\
        Please extract the entity that que question is referencing: {question}.  Retrieve only the single word of in "minus", the item brand, like 'fredy' or 'tuffing'. 
    """
    llm = OpenAI(model="gpt-3.5-turbo")
    json_prompt_tmpl = PromptTemplate(json_prompt_str)
    p = QueryPipeline(
        chain=[json_prompt_tmpl, llm],
        verbose=True,
    )
    output = p.run(question=query)
    return output

In [20]:
def get_product_list():
    with os.scandir("data") as entries:
        products = [entry.name for entry in entries if entry.is_file()]
    print(products)
    return ", ".join(products)

In [21]:
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import ReActAgent
image_retriever_tool = FunctionTool.from_defaults(fn=retriever_custom, description="Agent that will interpretate ikea manuals, so will be usefull for answer question related to that. You must pass the query and the main entity to the function tool. You must go and call the function always to get the images from the documents. If you not have results, maybe call first get_product_list_tool")
get_product_list_tool = FunctionTool.from_defaults(fn=get_product_list, description="Tool that provide list of products and names in the storage, usefull when user mistake with the name or you can't find the product")
llm = OpenAI(model="gpt-3.5-turbo")
agent = ReActAgent.from_tools([image_retriever_tool, get_product_list_tool], llm=llm, verbose=True,
                              system_prompt =f""" 
                              Agent that will answer general question regarding ikea manuals, 
                              not use any prior knowledge, only answer from documents or other agents. 
                              First call get_product_list_tool tool to check if the user is asking about an product of the list.
                              If you dont find a product item by the query, you can go to check the list of products using your tools and then figure the item, but if you think as a typo error, but if is completely different product that is not in the list, you can ask to the user that you don't have that item. 
                              If you can't answer, ask more information to the user.
                              Always try to figure out if the user wanted to answer about a product on the list, so if he mistake with the name of the product try to figure out wich ones he wants using the get_product_list_tool.
                              Always answer in the language the main user talk to you initially
                              """)

In [22]:
response = agent.chat("What is step 4 of assembling the Tuffing")

> Running step 8283efd9-7e46-4799-b9f7-27ff183235e9. Step input: What is step 4 of assembling the Tuffing
[1;3;38;5;200mThought: The current language of the user is: English. I need to use a tool to help me answer the question.
Action: retriever_custom
Action Input: {'query': 'What is step 4 of assembling the Tuffing', 'filter_entinty': 'tuffing', 'return_images': False}
[0m[1;3;34mObservation: (CompletionResponse(text='In step 4 of assembling the Tuffing, you need to attach two screws to secure the fabric panel between two horizontal poles. Make sure the fabric is stretched evenly and the screws are tightened properly.', additional_kwargs={}, raw=ChatCompletion(id='chatcmpl-ANH9Rl4K2II2WTOJhK7zwTPIf5prQ', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='In step 4 of assembling the Tuffing, you need to attach two screws to secure the fabric panel between two horizontal poles. Make sure the fabric is stretched evenly and the screws 

In [23]:
print(response)

In step 4 of assembling the Tuffing, you need to attach two screws to secure the fabric panel between two horizontal poles. Make sure the fabric is stretched evenly and the screws are tightened properly.


In [24]:
image_documents_chunks = [
    {
        "item_folder": "smagoera-wardrobe-white__AA-2177175-4-100",
        "pages": list(range(6, 32)),
        "question": "Give a step-by-step instruction guide on how to assemble the Smagoera.",
    },
    {
        "item_folder": "tuffing-bunk-bed-frame-dark-grey__AA-1627840-10-2",
        "pages": [1, 35],
        "question": "What does the Tuffing look like?",
    },
    {
        "item_folder": "fredde-gaming-desk-black__AA-2508156-1-100",
        "pages": [3],
        "question": "What parts are included in the Fredde?",
    },
    {
        "item_folder": "tuffing-bunk-bed-frame-dark-grey__AA-1627840-10-2",
        "pages": [8],
        "question": "What is step 4 of assembling the Tuffing?",
    },
]

def generate_correct_answers():
    for tests in image_documents_chunks:
        image_documents = [
            ImageDocument(
                image_path=f"{store_path}/temp/{tests['item_folder']}/{tests['item_folder']}_{page}.png"
            )
            for page in tests["pages"]
        ]
        response = engine.llm.complete(
            prompt=tests["question"],
            image_documents=image_documents,
            temperature=0.0,
        )
        tests["response"] = response
    return image_documents_chunks

In [25]:
generate_correct_answers()

[{'item_folder': 'smagoera-wardrobe-white__AA-2177175-4-100',
  'pages': [6,
   7,
   8,
   9,
   10,
   11,
   12,
   13,
   14,
   15,
   16,
   17,
   18,
   19,
   20,
   21,
   22,
   23,
   24,
   25,
   26,
   27,
   28,
   29,
   30,
   31],
  'question': 'Give a step-by-step instruction guide on how to assemble the Smagoera.',
  'response': CompletionResponse(text='### Assembly Guide for Smagoera\n\n#### Tools Required:\n- Phillips screwdriver\n- Flathead screwdriver\n- Hammer\n- Drill with 8 mm (5/16") and 3 mm (1/8") bits\n\n#### Step-by-Step Instructions:\n\n1. **Preparation:**\n   - Ensure you have all the parts and tools ready.\n   - Lay out all pieces and hardware.\n\n2. **Base Assembly:**\n   - Attach 10 screws to the two base panels using the provided hardware.\n\n3. **Side Panel Assembly:**\n   - Insert the wooden dowels into the side panels.\n   - Secure with screws as shown.\n\n4. **Back Panel Installation:**\n   - Attach the back panel to the base using screws and 

In [26]:
def generate_eval_answers():
    for tests in image_documents_chunks:
        mssg = extract_keyword_from_query(tests["question"])
        item = str(mssg).split(":")[-1].strip().lower()
        response, images = retriever_custom(tests["question"], item, True)

        source_image_nodes = [
            score_img_node.node.metadata["file_path"]
            for score_img_node in images
        ]
        context = [
            score_img_node.node.metadata["item"]
            for score_img_node in images
        ]
        
        tests["predict"] = response
        tests["images"] = source_image_nodes
        tests["context"] = context

In [27]:
generate_eval_answers()


[1;3;38;2;155;135;227m> Running module adf58ca9-51c8-4f63-830a-02fbfe3bce67 with input: 
question: Give a step-by-step instruction guide on how to assemble the Smagoera.

[0m[1;3;38;2;155;135;227m> Running module 2ddf6e18-b2b0-4442-8518-2af2dd4ce881 with input: 
messages:         Please extract the entity that que question is referencing: Give a step-by-step instruction guide on how to assemble the Smagoera..  Retrieve only the single word of in "minus", the item brand...

[0m[1;3;38;2;155;135;227m> Running module 10c01d6c-7686-45dc-a5af-f8fa8dfdf925 with input: 
question: What does the Tuffing look like?

[0m[1;3;38;2;155;135;227m> Running module 5be1bfee-3f80-4ba6-acac-c4c65c9e6545 with input: 
messages:         Please extract the entity that que question is referencing: What does the Tuffing look like?.  Retrieve only the single word of in "minus", the item brand, like 'fredy' or 'tuffing'. 
    

[0m[1;3;38;2;155;135;227m> Running module f516440c-9cda-4d57-a0b9-1c9e1c0ef79

In [28]:
from llama_index.llms.openai import OpenAI
from llama_index.core.evaluation import CorrectnessEvaluator
from llama_index.core.evaluation.multi_modal import (
    MultiModalRelevancyEvaluator,
    MultiModalFaithfulnessEvaluator,
)

import os

judges = {}

judges["correctness"] = CorrectnessEvaluator(
    llm=OpenAI(temperature=0, model="gpt-4"),
)

judges["relevancy"] = MultiModalRelevancyEvaluator(
    multi_modal_llm=OpenAIMultiModal(
        model="gpt-4o",
        max_new_tokens=300,
    )
)

judges["faithfulness"] = MultiModalFaithfulnessEvaluator(
    multi_modal_llm=OpenAIMultiModal(
        model="gpt-4o",
        max_new_tokens=300,
    )
)

In [None]:
evals = {
    "names": list(range(1, 5)),
    "correctness": [],
    "relevancy": [],
    "faithfulness": [],
}

for data_entry in image_documents_chunks:
    batch_names = []
    batch_correctness = []
    batch_relevancy = []
    batch_faithfulness = []
    correctness_result = await judges["correctness"].aevaluate(
        query=data_entry["question"],
        response=data_entry["predict"].text,
        reference=data_entry['response'].text,
    )

    relevancy_result = judges["relevancy"].evaluate(
        query=data_entry["question"],
        response=data_entry["predict"].text,
        contexts=data_entry["context"],
        image_paths=data_entry["images"],
    )

    faithfulness_result = judges["faithfulness"].evaluate(
        query=data_entry["question"],
        response=data_entry["predict"].text,
        contexts=data_entry["context"],
        image_paths=data_entry["images"],
    )
    batch_correctness.append(correctness_result)
    batch_relevancy.append(relevancy_result)
    batch_faithfulness.append(faithfulness_result)

    evals["correctness"] += batch_correctness
    evals["relevancy"] += batch_relevancy
    evals["faithfulness"] += batch_faithfulness

# save evaluations
evaluations_objects = {
    "names": list(range(1, 5)),
    "correctness": [e.dict() for e in evals["correctness"]],
    "faithfulness": [e.dict() for e in evals["faithfulness"]],
    "relevancy": [e.dict() for e in evals["relevancy"]],
}


In [None]:
evaluations_objects

In [None]:
from llama_index.core.evaluation.notebook_utils import get_eval_results_df
import pandas as pd

deep_eval_df, mean_correctness_df = get_eval_results_df(
   list(range(1, 5)), evals["correctness"], metric="correctness"
)
_, mean_relevancy_df = get_eval_results_df(
    list(range(1, 5)), evals["relevancy"], metric="relevancy"
)
_, mean_faithfulness_df = get_eval_results_df(
    list(range(1, 5)), evals["faithfulness"], metric="faithfulness"
)

mean_scores_df = pd.concat(
    [
        mean_correctness_df.reset_index(),
        mean_relevancy_df.reset_index(),
        mean_faithfulness_df.reset_index(),
    ],
    axis=0,
    ignore_index=True,
)
mean_scores_df = mean_scores_df.set_index("index")
mean_scores_df.index = mean_scores_df.index.set_names(["metrics"])
mean_scores_df.index.name = 'sample' 
mean_scores_df = mean_scores_df.rename(index={'rag': 'samples'})


In [None]:
mean_scores_df