Metadata Filtering
Metadata filtering:

- extracting keywords from every chunk
- at search time, filter appropriate chunks based on the metadata


In [1]:
from llama_index.core import Settings
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.embeddings.google_genai import GoogleGenAIEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from typing import Any, List, Optional, Sequence, Union

import asyncio
import google.genai.errors
import json
import logging as log
import os
import pprint
import qdrant_client
import time

In [2]:
import nest_asyncio
nest_asyncio.apply()

In [3]:
# Log record layout: "<module>.<function>():<line> <timestamp>" on the first line
# and "[LEVEL] <message>" on the second; the format itself ends with a newline.
FORMAT_STRING = (
    "%(module)s.%(funcName)s():%(lineno)d %(asctime)s\n"
    "[%(levelname)-5s] %(message)s\n"
)
# The root logger is configured at ERROR level, so the INFO probe below is not emitted.
log.basicConfig(format=FORMAT_STRING, level=log.ERROR)
log.info('Info text')

In [4]:
# Names of the environment variables this notebook relies on.
GOOGLE_API_KEY_NAME = 'GOOGLE_API_KEY'
OPEN_AI_KEY_NAME = 'OPENAI_API_KEY'
TAI_DATASET_ROOT_NAME = 'TAI_DATASET_ROOT'

# Fail fast if any of them is missing from the environment.
assert GOOGLE_API_KEY_NAME in os.environ
assert OPEN_AI_KEY_NAME in os.environ
assert TAI_DATASET_ROOT_NAME in os.environ

# Tunables: the Gemini model used for generation and the number of
# knowledge-base records read from the dataset file.
model_name = "gemini-2.0-flash-lite"
num_data_samples = 10

In [5]:
# Smoke test: build a plain GoogleGenAI client for the configured model and
# send one completion request to confirm the API key and model name work.
llm = GoogleGenAI(model=model_name)
resp = llm.complete("Who is Paul Graham?")
print(resp)

Paul Graham is a prominent figure in the tech world, known for his work as a computer programmer, essayist, investor, and co-founder of the influential startup accelerator, **Y Combinator**.

Here's a breakdown of his key contributions:

*   **Computer Programmer:** He's a skilled programmer, particularly in Lisp. He wrote the first web application, Viaweb, which was later acquired by Yahoo! and became Yahoo! Store.
*   **Essayist:** Graham is well-known for his insightful and thought-provoking essays on a wide range of topics, including:
    *   **Startups and Entrepreneurship:** His essays are highly regarded in the startup community, offering advice on building companies, raising funding, and avoiding common pitfalls.
    *   **Programming and Technology:** He writes about programming languages, software development, and the evolution of technology.
    *   **Philosophy and Culture:** He explores broader themes related to society, education, and the pursuit of knowledge.
    *   **H

In [6]:
#pprint.pp(resp)

In [7]:
from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.base.llms.types import ChatMessage
from llama_index.core.constants import DEFAULT_TEMPERATURE, DEFAULT_NUM_OUTPUTS

class MyLLM(FunctionCallingLLM):
    """Logging wrapper around a ``GoogleGenAI`` client.

    Every LLM entry point delegates to the wrapped client. ``achat`` additionally
    counts successful requests and, when the API answers with HTTP 429, waits
    and retries the request once.
    """

    def __init__(self, 
                 model_name: str,
                 temperature: float = DEFAULT_TEMPERATURE,
                 max_tokens: Optional[int] = None,):
        """Create the wrapper and the underlying ``GoogleGenAI`` client.

        Args:
            model_name: Gemini model identifier passed to ``GoogleGenAI``.
            temperature: Sampling temperature, forwarded to the wrapped client.
            max_tokens: Optional output token limit, forwarded to the wrapped client.
        """
        # Initialize the base class and the GoogleGenAI client
        super().__init__(model= model_name, temperature=temperature, max_tokens=max_tokens)
        # Forward the generation settings; previously they were accepted but
        # never reached the client that actually performs the requests.
        self._llm = GoogleGenAI(model=model_name, temperature=temperature, max_tokens=max_tokens)
        # Successful achat calls since construction or since the last 429 back-off.
        self._num_achat_requests = 0

    @property
    def metadata(self):
        """Metadata of the wrapped ``GoogleGenAI`` client."""
        return self._llm.metadata

    def complete( self, prompt: str, formatted: bool = False, **kwargs: Any):
        """Delegate a synchronous completion; log and re-raise any failure."""
        log.info('complete executed.')
        try:
            return self._llm.complete( prompt, formatted, **kwargs)
        except Exception as exc:
            # Not every exception has a `.message` attribute; fall back to str(exc)
            # so the handler itself cannot fail and mask the original error.
            log.error(f'complete: exception:{type(exc).__name__}: {getattr(exc, "message", exc)}')
            raise

    async def acomplete(self, prompt: str, formatted: bool = False, **kwargs: Any):        
        """Delegate an asynchronous completion to the wrapped client."""
        log.info('acomplete executed.')
        result = await self._llm.acomplete( prompt, formatted, **kwargs)
        return result

    def stream_complete( self, prompt: str, formatted: bool = False, **kwargs: Any):
        """Delegate a synchronous streaming completion to the wrapped client."""
        log.info('stream_complete executed.')
        return self._llm.stream_complete( prompt, formatted, **kwargs)

    async def astream_complete( self, prompt: str, formatted: bool = False, **kwargs: Any):
        """Await the wrapped client's ``astream_complete`` and return its result."""
        log.info('astream_complete executed.')
        result = await self._llm.astream_complete( prompt, formatted, **kwargs)    
        return result

    def chat(self, messages: Sequence[ChatMessage], **kwargs: Any):
        """Delegate a synchronous chat request to the wrapped client."""
        log.info('chat executed.')
        return self._llm.chat( messages, **kwargs)    

    async def achat(self, messages: Sequence[ChatMessage], **kwargs: Any):
        """Delegate an asynchronous chat request, retrying once after a 429.

        On a ``ClientError`` with code 429 the call sleeps for 60 seconds,
        resets the request counter and tries exactly one more time.

        Raises:
            google.genai.errors.ClientError: for any client error other than 429
                on the first attempt.
            Exception: any other failure of the first attempt, or any failure of
                the retry, is logged and re-raised unchanged.
        """
        retry_delay_seconds = 60

        try:
            result = await self._llm.achat( messages, **kwargs)    
            self._num_achat_requests += 1
            return result
        except google.genai.errors.ClientError as exc:
            log.error(f'achat: ClientError:{type(exc).__name__}: {exc.message}')
            if exc.code != 429:
                log.error('achat: NOT RESOURCE EXAUSTED...................................................')    
                raise
            # 429: fall through to the single delayed retry below.
            log.error(f'achat: RESOURCE EXAUSTED....................................................... num reqs:{self._num_achat_requests}')    
        except Exception as exc:
            log.error(f'achat: exception:{type(exc).__name__}: {getattr(exc, "message", exc)}')
            raise

        await asyncio.sleep(retry_delay_seconds)
        # The counter is diagnostic only: it restarts after each back-off.
        self._num_achat_requests = 0
        log.error('Second attempt...........................................................................')    
        try:
            result = await self._llm.achat( messages, **kwargs)    
            self._num_achat_requests += 1
            log.error('Second attempt executed ..................................................................')    
            return result
        except Exception as exc:
            log.error(f'achat: exception:{type(exc).__name__}: {getattr(exc, "message", exc)}')
            raise
        
    def stream_chat(self, messages: Sequence[ChatMessage], **kwargs: Any):
        """Delegate a synchronous streaming chat request to the wrapped client."""
        log.info('stream_chat executed.')
        return self._llm.stream_chat( messages, **kwargs)    

    async def astream_chat(self, messages: Sequence[ChatMessage], **kwargs: Any):
        """Await the wrapped client's ``astream_chat`` and return its result."""
        log.info('astream_chat executed.')
        response= await self._llm.astream_chat( messages, **kwargs)            
        return response

    def _prepare_chat_with_tools(
        self,
        tools: Sequence["BaseTool"],
        user_msg: Optional[Union[str, ChatMessage]] = None,
        chat_history: Optional[List[ChatMessage]] = None,
        verbose: bool = False,
        allow_parallel_tool_calls: bool = False,
        tool_choice: Union[str, dict] = "auto",
        strict: Optional[bool] = None,
        **kwargs: Any):
        """Delegate tool-call preparation to the wrapped client.

        Arguments are forwarded by keyword so the call does not depend on the
        positional order of the wrapped client's signature.
        """
        return self._llm._prepare_chat_with_tools(
            tools,
            user_msg=user_msg,
            chat_history=chat_history,
            verbose=verbose,
            # Fixed: the original referenced the undefined name
            # `allow_parallel_tools_calls`, which raised NameError.
            allow_parallel_tool_calls=allow_parallel_tool_calls,
            tool_choice=tool_choice,
            strict=strict,
            **kwargs)

# Notebook-wide LLM instance: the MyLLM wrapper built for the configured model.
my_llm = MyLLM(model_name=model_name)

In [8]:
print(f'my_llm:{type(my_llm).__name__}')

my_llm:MyLLM


In [9]:
"""
resp = my_llm.complete("Who is Paul Graham?")
print(resp)
"""

'\nresp = my_llm.complete("Who is Paul Graham?")\nprint(resp)\n'

In [10]:
# Make the MyLLM wrapper the default LLM for llama_index components.
Settings.llm = my_llm
# "text-embedding-3-small" is an OpenAI model id and was passed through `model=`,
# which is not the keyword GoogleGenAIEmbedding uses to select its model
# (`model_name`). Pin a Google embedding model explicitly instead.
# NOTE(review): "text-embedding-004" is assumed to be what the original call
# effectively used (the library default at the time) - confirm, and re-ingest
# the collection if it was built with a different embedding model.
Settings.embed_model = GoogleGenAIEmbedding(model_name="text-embedding-004")

## Create the Qdrant Vector DB

In [11]:
# Directory of the on-disk Qdrant database, located under the dataset root
# given by the TAI_DATASET_ROOT environment variable.
vector_db_path= os.path.join( os.environ[TAI_DATASET_ROOT_NAME],'qdrant_vect_db_ai_tutor')
vector_db_path

'/home/minguzzi/repo/towards_ai_course/dataset/qdrant_vect_db_ai_tutor'

In [12]:
# Path of the JSON Lines knowledge file, also resolved relative to the dataset root.
ai_tutor_knowledge_file_path= os.path.join( os.environ[TAI_DATASET_ROOT_NAME],'ai_tutor_knowledge','ai_tutor_knowledge.jsonl')
ai_tutor_knowledge_file_path

'/home/minguzzi/repo/towards_ai_course/dataset/ai_tutor_knowledge/ai_tutor_knowledge.jsonl'

In [13]:
# Open the local (path-based) Qdrant database.
# NOTE(review): this assignment rebinds the name `qdrant_client`, which the
# import cell also binds to the `qdrant_client` module; after this cell the
# name refers to the client instance, not the module.
qdrant_client = QdrantClient(path=vector_db_path)

# Vector store wrapper over the "ai_tutor_knowledge" collection of that client.
vector_store = QdrantVectorStore( client=qdrant_client,  collection_name="ai_tutor_knowledge")

In [14]:
# Load at most `num_data_samples` records from the JSON Lines knowledge file.
# Reading stops as soon as enough records are collected instead of parsing the
# whole file and slicing afterwards; `None` keeps every record, as slicing did.
# Blank lines are skipped and the file is decoded as UTF-8 regardless of locale.
ai_tutor_knowledge = []
with open(ai_tutor_knowledge_file_path, "r", encoding="utf-8") as file:
    for line in file:
        if num_data_samples is not None and len(ai_tutor_knowledge) >= num_data_samples:
            break
        if line.strip():
            ai_tutor_knowledge.append(json.loads(line))

len(ai_tutor_knowledge)

10

In [15]:
# Alias for the raw records (a list of dicts, not llama_index Document objects yet).
documents = ai_tutor_knowledge

In [16]:
from typing import List
from llama_index.core import Document

def create_docs_from_list(data_list: List[dict]) -> List[Document]:
    """Build one ``Document`` per raw knowledge record.

    Each record must provide the keys ``doc_id``, ``content``, ``url``,
    ``name``, ``tokens`` and ``source``. ``content`` becomes the document text;
    ``url``, ``name`` (stored as ``title``), ``tokens`` and ``source`` become
    metadata. ``title``/``tokens``/``source`` are listed in
    ``excluded_llm_metadata_keys`` and ``url``/``tokens``/``source`` in
    ``excluded_embed_metadata_keys``.

    Args:
        data_list: Raw records, one dict per document.

    Returns:
        The documents, in the same order as ``data_list``.
    """

    def _to_document(record: dict) -> Document:
        # Translate a single raw record into a Document.
        return Document(
            doc_id=record["doc_id"],
            text=record["content"],
            metadata={  # type: ignore
                "url": record["url"],
                "title": record["name"],
                "tokens": record["tokens"],
                "source": record["source"],
            },
            excluded_llm_metadata_keys=["title", "tokens", "source"],
            excluded_embed_metadata_keys=["url", "tokens", "source"],
        )

    return [_to_document(record) for record in data_list]

# Convert the raw records into Document objects and inspect the LLM-excluded
# metadata keys of the second document as a sanity check.
doc = create_docs_from_list(documents)
doc[1].excluded_llm_metadata_keys

['title', 'tokens', 'source']

<b> Split the text into Chunks</b>

In [17]:
from llama_index.core.node_parser import TokenTextSplitter

# Define the splitter that cuts the text into chunks of 512 tokens,
# with an overlap of 128 tokens between consecutive chunks.
text_splitter = TokenTextSplitter(separator=" ", chunk_size=512, chunk_overlap=128)

<B>Uses a predefined pipeline to generate the tags for each chunk</B>

In [18]:
from llama_index.core.extractors import (
    KeywordExtractor,
)
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.ingestion import IngestionPipeline

# Create the pipeline that applies the transformations to each chunk
# and stores the result in the Qdrant vector store:
#   1. split every document into token-based chunks,
#   2. ask the LLM for 10 keywords per chunk,
#   3. embed each chunk.
# The embedding step reuses Settings.embed_model so that ingestion and query
# time always use the same embedding model.
pipeline = IngestionPipeline(
    transformations=[
        text_splitter,
        KeywordExtractor(keywords=10, llm=my_llm),
        Settings.embed_model,
    ],
    vector_store=vector_store,
)

# Run the transformation pipeline.
nodes = pipeline.run(documents=doc, show_progress=True)

Parsing nodes:   0%|          | 0/10 [00:00<?, ?it/s]

3375812736.achat():51 2025-04-05 18:35:30,498█                                                                                                               | 17/63 [00:03<00:08,  5.68it/s]
[ERROR] achat: ClientError:ClientError: Resource has been exhausted (e.g. check quota).

3375812736.achat():53 2025-04-05 18:35:30,500
[ERROR] achat: RESOURCE EXAUSTED....................................................... num reqs:17

3375812736.achat():51 2025-04-05 18:35:30,596
[ERROR] achat: ClientError:ClientError: Resource has been exhausted (e.g. check quota).

3375812736.achat():53 2025-04-05 18:35:30,598
[ERROR] achat: RESOURCE EXAUSTED....................................................... num reqs:18

3375812736.achat():51 2025-04-05 18:35:31,044█████▊                                                                                                          | 19/63 [00:04<00:08,  5.23it/s]
[ERROR] achat: ClientError:ClientError: Resource has been exhausted (e.g. check quota).

3375812736.ac

Generating embeddings:   0%|          | 0/63 [00:00<?, ?it/s]

  self._client.create_payload_index(


In [25]:
len(nodes)

63

In [28]:
# Print the keyword string stored on each of the first three nodes.
for idx in range(3):
    print(nodes[idx].metadata['excerpt_keywords'])
# Display the Python type of the stored keywords value.
type(nodes[2].metadata['excerpt_keywords'])

BERT, Hugging Face, Kubernetes, Docker, Model Deployment, FastAPI, Uvicorn, Minikube, Scalability, Containerization
BERT, HuggingFace, Kubernetes, Model Deployment, Docker, Minikube, Kubectl, GitHub, Containerization, MLops
NER, Ecommerce, Machine Learning, Custom Model, Github, Data Preparation, Spacy, Mobile Phone, Product Description, Annotations


str

In [30]:
# Collect the distinct keywords across all nodes; each node stores its
# keywords as a single string that is split here on ", ".
keywords = set()
for node in nodes:
    excerpt = node.metadata['excerpt_keywords']
    keywords.update(excerpt.split(", "))
keywords

{'AI',
 'AI Genie',
 'AI accelerator',
 'AI accelerators',
 'ASIC',
 'Accuracy',
 'AdaBoost',
 'Ai-Genie',
 'Algorithms',
 'AlphaGo',
 'Analogies',
 'Annotations',
 'Architecture',
 'Attention',
 'BERT',
 'Bayesian Optimization',
 'Bayesian optimization',
 'BiGS',
 'Bias',
 'Black Box',
 'Black Box Optimization',
 'Byte Pair Encoding',
 'CPU',
 'CPUs',
 'Categorical Crossentropy',
 'ChatGPT',
 'ChatGPT-4o',
 'Classification',
 'Coding',
 'Configuration File',
 'Containerization',
 'Context Length',
 'Convergence Plot',
 'Custom Model',
 'DNN',
 'DNN accelerators',
 'DPO',
 'DRAM',
 'Data',
 'Data Analysis',
 'Data Annotation',
 'Data Augmentation',
 'Data Conversion',
 'Data Preparation',
 'Debiasing',
 'Decision Trees',
 'Decoder',
 'Decoder Blocks',
 'Decoding',
 'Deep Learning',
 'DeepMind',
 'Developments',
 'Diagrams',
 'Discrimination',
 'DocBin',
 'Docker',
 'Dropout',
 'Ecommerce',
 'Efficiency',
 'Embedding',
 'Embeddings',
 'Encoding',
 'Ensemble Methods',
 'Equity',
 'Evalua

In [21]:
from llama_index.core.vector_stores import MetadataFilter, MetadataFilters, FilterOperator

# Single text-match condition on the `excerpt_keywords` metadata field.
hugging_face_filter = MetadataFilter(
    key="excerpt_keywords",
    operator=FilterOperator.TEXT_MATCH,
    value='Hugging Face',
)

# Filter set handed to the query engine.
filters = MetadataFilters(filters=[hugging_face_filter])

In [22]:
from llama_index.core import VectorStoreIndex

# Create the index on top of the existing vector store (the Qdrant collection
# populated by the ingestion pipeline).
index = VectorStoreIndex.from_vector_store(vector_store)

In [23]:
# Define a query engine that is responsible for retrieving related pieces of text,
# and using an LLM to formulate the final answer.
# The metadata filters built earlier are passed to the engine.
query_engine = index.as_query_engine(filters=filters)

# Ask a question and display the generated answer text.
res = query_engine.query("What is BERT?")
res.response

'A basic BERT model from the Hugging Face transformer was used.\n'

In [24]:
# Show the retrieved nodes: id, title, text, similarity score and the
# keywords stored in the node metadata.
for src in res.source_nodes:
    print("Node ID\t", src.node_id)
    print("Title\t", src.metadata["title"])
    print("Text\t", src.text)
    print("Score\t", src.score)
    # Label fixed: this line prints the keywords, not a second score.
    print("Keywords\t", src.metadata["excerpt_keywords"])
    print("-_" * 20)

Node ID	 180cab3a-f1dc-48f4-b60c-43a4bf9f35e1
Title	 BERT HuggingFace Model Deployment using Kubernetes [ Github Repo]  03/07/2024
Text	 Github Repo : https://github.com/vaibhawkhemka/ML-Umbrella/tree/main/MLops/Model_Deployment/Bert_Kubernetes_deployment   Model development is useless if you dont deploy it to production  which comes with a lot of issues of scalability and portability.   I have deployed a basic BERT model from the huggingface transformer on Kubernetes with the help of docker  which will give a feel of how to deploy and manage pods on production.   Model Serving and Deployment:ML Pipeline:Workflow:   Model server (using FastAPI  uvicorn) for BERT uncased model    Containerize model and inference scripts to create a docker image    Kubernetes deployment for these model servers (for scalability)  Testing   Components:Model serverUsed BERT uncased model from hugging face for prediction of next word [MASK]. Inference is done using transformer-cli which uses fastapi and uvic