### Prototyping first - RAG + Agent SDK

#### Utility functions

In [2]:
from typing import List, Dict, Any
import PyPDF2
from docx import Document
import markdown
import os
import pandas as pd
import pytesseract
from PIL import Image
from langchain.text_splitter import RecursiveCharacterTextSplitter

class DocumentProcessor:
    """Turn files on disk into text chunks tagged with source metadata.

    Text is extracted per file type (PDF, DOCX, Markdown, CSV, images via
    OCR, anything else as UTF-8 text), split with a
    ``RecursiveCharacterTextSplitter`` and returned as a flat list of
    ``{"text": ..., "metadata": {...}}`` dictionaries.
    """

    # os.path.splitext() keeps the leading dot, so every entry needs one too.
    _IMAGE_EXTENSIONS = ('.png', '.jpeg', '.jpg')

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 50):
        """Create the text splitter.

        Args:
            chunk_size: Maximum chunk length, measured with ``len``.
            chunk_overlap: Number of characters shared by adjacent chunks.
        """
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )

    def process_document(self, root_path: str) -> List[Dict[str, Any]]:
        """Process every file under ``root_path`` and return its chunks.

        Args:
            root_path: A directory (walked recursively) or a single file.

        Returns:
            One dict per chunk with the keys ``"text"`` and ``"metadata"``
            (``source``, ``file_name``, ``chunk_index``).

        Raises:
            FileNotFoundError: If ``root_path`` does not exist.
        """
        if not os.path.exists(root_path):
            raise FileNotFoundError(f"File not found: {root_path}")

        chunks_collection = []
        for file_path in self._iter_files(root_path):
            text = self._extract_text(file_path)
            chunks_collection.append(
                {
                    "chunks": self._chunk_text(text),
                    "file_path": file_path,
                }
            )

        # Add metadata to chunks
        return self._add_metadata(chunks_collection)

    def _iter_files(self, root_path: str) -> List[str]:
        """List the files to process, in a deterministic order."""
        if os.path.isfile(root_path):
            return [root_path]

        found = []
        for root, dirs, files in os.walk(root_path):
            # Sorting keeps chunk_index stable between runs; it is a running
            # counter over all files, so it depends on traversal order.
            dirs.sort()
            found.extend(os.path.join(root, name) for name in sorted(files))
        return found

    def _extract_text(self, file_path: str) -> str:
        """Dispatch to the extractor matching the file extension."""
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext == '.pdf':
            return self._extract_pdf_text(file_path)
        if file_ext == '.docx':
            return self._extract_docx_text(file_path)
        if file_ext == '.md':
            return self._extract_markdown_text(file_path)
        if file_ext == '.csv':
            return self._extract_csv(file_path)
        if file_ext in self._IMAGE_EXTENSIONS:
            return self._extract_image(file_path)
        # Everything else is read as plain UTF-8 text (e.g. .txt).
        return self._extract_text_file(file_path)

    def _extract_csv(self, file_path: str) -> str:
        """Render each CSV row as ``col: value | col: value`` on its own line.

        Every row is prefixed with ``'\\n '``, so the result starts with a
        newline whenever the file has at least one row.
        """
        df = pd.read_csv(file_path)
        rows = (
            " | ".join([f"{col}: {row[col]}" for col in df.columns])
            for _, row in df.iterrows()
        )
        return "".join('\n ' + row_text for row_text in rows)

    def _extract_image(self, file_path: str) -> str:
        """Run OCR on an image file and return the recognised text."""
        # The context manager closes the file handle that Image.open keeps.
        with Image.open(file_path) as img:
            return pytesseract.image_to_string(img)

    def _extract_pdf_text(self, file_path: str) -> str:
        """Extract text from PDF file, one newline-terminated block per page."""
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # extract_text() can yield None for pages without a text layer.
            pages = [(page.extract_text() or "") + "\n" for page in pdf_reader.pages]
        return "".join(pages)

    def _extract_docx_text(self, file_path: str) -> str:
        """Extract text from DOCX file (paragraphs joined by newlines)."""
        doc = Document(file_path)
        return "\n".join([paragraph.text for paragraph in doc.paragraphs])

    def _extract_markdown_text(self, file_path: str) -> str:
        """Read a Markdown file and return the result of ``markdown.markdown``.

        Note that this is the rendered markup, not the raw Markdown source.
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            md_text = file.read()
        return markdown.markdown(md_text)

    def _extract_text_file(self, file_path: str) -> str:
        """Extract text from plain text file (decoded as UTF-8)."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def _chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks using the configured splitter."""
        return self.text_splitter.split_text(text)

    def _add_metadata(self, chunks_collection: List[dict]) -> List[Dict[str, Any]]:
        """Flatten per-file chunk lists and attach metadata to each chunk.

        Args:
            chunks_collection: Dicts with ``"chunks"`` (list of str) and
                ``"file_path"``.

        Returns:
            One dict per chunk. ``chunk_index`` counts up across ALL files
            (it is not reset per file), so it is unique within the result.
        """
        results = []
        for data in chunks_collection:
            file_path = data["file_path"]
            file_name = os.path.basename(file_path)
            for chunk in data["chunks"]:
                results.append(
                    {
                        "text": chunk,
                        "metadata": {
                            "source": file_path,
                            "file_name": file_name,
                            # len(results) is the global running index.
                            "chunk_index": len(results),
                        },
                    }
                )
        return results


In [3]:
from typing import List, Dict, Any
from google.cloud import aiplatform
from vertexai.language_models import TextEmbeddingModel
#from .config import settings

class EmbeddingGenerator:
    """Thin wrapper around a Vertex AI ``TextEmbeddingModel``.

    The model name is fixed to ``text-embedding-005``. ``aiplatform.init`` is
    deliberately not called here (it was disabled in this prototype), so the
    project/location must already be configured by the environment.
    """

    def __init__(self):
        #self.project = settings.GOOGLE_CLOUD_PROJECT
        #self.location = settings.VERTEX_AI_LOCATION
        self.model = "text-embedding-005"

        # Initialize the embedding model
        self.embedding_model = TextEmbeddingModel.from_pretrained(self.model)

    def generate_embeddings(
        self, chunks: List[Dict[str, Any]], batch_size: int = 250
    ) -> List[Dict[str, Any]]:
        """Generate embeddings for text chunks.

        The texts are sent in batches instead of one single request, because
        the embedding endpoint limits how many inputs one call may carry.
        Lower ``batch_size`` if requests are rejected for size.

        Args:
            chunks: Dicts that each carry a ``"text"`` entry. They are
                modified in place: an ``"embedding"`` entry is added.
            batch_size: Maximum number of texts per API request.

        Returns:
            The same ``chunks`` list, for convenience. An empty list is
            returned unchanged without calling the API.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        for start in range(0, len(chunks), batch_size):
            batch = chunks[start:start + batch_size]
            embeddings = self.embedding_model.get_embeddings(
                [chunk["text"] for chunk in batch]
            )
            # Combine embeddings with original chunk data
            for chunk, embedding in zip(batch, embeddings):
                chunk["embedding"] = embedding.values

        return chunks

    def generate_single_embedding(self, text: str) -> List[float]:
        """Generate embedding for a single text and return its vector."""
        embedding = self.embedding_model.get_embeddings([text])[0]
        return embedding.values

#### 1. Embedding management:

Code that can:
- Create index using multiple types of files
- Create endpoint
- Deploy index to endpoint 
- Query from endpoint 

In [None]:
# A __future__ import has to be the first statement of the cell/module;
# placing it after `import faiss` is a SyntaxError.
from __future__ import annotations

from typing import List, Dict, Any, Optional

import faiss
from google.cloud import aiplatform
from google.cloud.aiplatform.matching_engine import (
    MatchingEngineIndex,
    MatchingEngineIndexEndpoint,
)
import numpy as np
#from src.common.config import settings

In [None]:
# Load tools
# processor: chunks files; embedder: turns text into vectors;
# index: in-memory FAISS L2 index that will hold those vectors.
processor = DocumentProcessor()
embedder = EmbeddingGenerator()
# 768 has to equal the length of the vectors added later —
# TODO confirm it matches the embedding model's output size.
index = faiss.IndexFlatL2(768)

In [None]:
# main process
# Root path handed to DocumentProcessor.process_document below.
file_root = '/home/jupyter/code_test/adk_rag/prototype/test_data'

# chunking
# Result: one {"text", "metadata"} dict per chunk.
processed_data = processor.process_document(file_root)

In [None]:
# convert to embeddings
with_embeddings = embedder.generate_embeddings(processed_data)
# extract vectors
# FAISS works on float32 data, while np.array() over Python floats defaults
# to float64 — so the dtype is pinned explicitly.
vectors = np.array([e['embedding'] for e in with_embeddings], dtype=np.float32)
# build up index (row i of the index corresponds to with_embeddings[i])
index.add(vectors)

In [None]:
# query process
query = "media effectiveness"
# Shape (1, dim); float32 to match what the FAISS index stores.
query_embeddings = np.array(
    [embedder.generate_single_embedding(query)], dtype=np.float32
)
# I holds, per query row, the positions of the 3 nearest stored vectors.
_, I = index.search(query_embeddings, k=3)


In [None]:
# Text of the first search hit: I[0][0] is used as a position in with_embeddings.
with_embeddings[I[0][0]]['text']

#### 2. Agent ADK 

Code that can:
- Define an Agent
- Deploy agent to Agent Engine

In [None]:
from google import adk
from google.adk.agents import LlmAgent
from vertexai.preview.reasoning_engines import AdkApp
from typing import List, Dict, Any
from google.adk.tools import ToolContext
from google.adk.sessions import InMemorySessionService

In [None]:
def test_func(tool_context: ToolContext) -> str:
    """
    just a test function that says yeah!

    Returns:
        yeah!.
    """
    return "yeah" 

In [None]:
# Define the RAG agent using ADK
rag_agent = LlmAgent(
    name="test_agent",
    model="gemini-2.0-flash",
    description="testing agent",
    instruction=(
        "Always use 'test_func()' tool when answering user questions"
    ),
    # NOTE(review): the instruction above refers to test_func, but the tool
    # registration below is commented out, so this agent is built with no tools.
    #tools=[test_func],
    
)

# Wrap the agent in an AdkApp for deployment
app = AdkApp(agent=rag_agent)

In [None]:
### try deploy to remote and interact with it

In [None]:
from vertexai import agent_engines
import vertexai
vertexai.init(staging_bucket="gs://yuan_evernote_rag_cs")

# Deploy the AdkApp wrapper (`app`, built from rag_agent) rather than the bare
# LlmAgent: the wrapper is the object that provides the session and query
# operations used on the deployed engine, and it is what the other deployment
# cell in this notebook passes to agent_engines.create.
remote_app = agent_engines.create(
    agent_engine=app,
    requirements=[
        "google-cloud-aiplatform[adk,agent_engines]==1.90.0"
    ]
)

In [None]:
# Handle to an already-deployed reasoning engine, looked up by its full resource name.
test_agent = vertexai.agent_engines.get('projects/163097687798/locations/us-central1/reasoningEngines/2032750709153202176')

In [None]:
# Open a session on the deployed engine for user "u_123".
session1 = test_agent.create_session(user_id="u_123")

In [None]:
# Store the vector database in the session's state
# NOTE(review): as far as this cell shows, these assignments only change the
# local object returned by create_session(); index/embedder are in-process
# Python objects — confirm the deployed engine can actually see them.
session1['state']["index"] = index
session1['state']["embedder"] = embedder
session1['state']["metadata_store"] = with_embeddings

In [None]:
# Create the ADK runner with VertexAiSessionService
from google.adk.sessions import VertexAiSessionService
# Constructed with no arguments — presumably relies on ambient
# project/location configuration; TODO confirm.
session_service = VertexAiSessionService()


In [None]:
# NOTE(review): test_agent is the handle returned by vertexai.agent_engines.get(),
# not the locally defined LlmAgent — confirm that Runner accepts it as `agent`.
runner = adk.Runner(
    agent=test_agent,
    app_name="test_app",
    session_service=session_service)

In [None]:
# Helper method to send query to the runner
session = session_service.create_session(app_name="test_app", user_id="user123")


def call_agent(query, session_id, user_id):
    """Send one user message through ``runner`` and print the final reply.

    Args:
        query: Text of the user message.
        session_id: Id of an existing session for ``user_id``.
        user_id: User the session belongs to.

    Returns:
        Text of the last final-response event, or None when no final event
        carried any content.
    """
    # Imported locally: the notebook only imports `types` in a later cell, so
    # running the cells top to bottom would otherwise hit a NameError here.
    from google.genai import types

    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(
        user_id=user_id, session_id=session_id, new_message=content)

    final_response = None
    for event in events:
        if not event.is_final_response():
            continue
        # Guard against a final event that carries no content/parts.
        if event.content and event.content.parts:
            final_response = event.content.parts[0].text
        print("Agent Response: ", final_response)
    return final_response

In [None]:
# Use the session created for this runner (and the user it was created for)
# instead of an id/user pair that no session was opened with.
call_agent("hi", session.id, "user123")

In [None]:
# Stream from the deployed engine (test_agent) that session1 was created on.
# session1 is indexed like a dict elsewhere in this notebook, so its id is
# read with ["id"] rather than as an attribute.
for event in test_agent.stream_query(
    user_id="u_123",
    session_id=session1["id"],
    message="whatsup",
):
    print(event)

In [None]:
# Remove the deployed engine; force=True is passed so the delete is not
# blocked — presumably by existing child resources such as sessions; TODO confirm.
test_agent.delete(force=True)

In [None]:

def retrieve_documents(query, tool_context: ToolContext) -> List[str]:
    """
    Vector-search tool: returns up to 5 text snippets relevant to the query.

    Args:
        query: User query text.
        tool_context: Context of the tool; its state must provide "index",
            "embedder" and "metadata_store".
    Returns:
        List of document text snippets, nearest first.
    """
    index = tool_context.state.get("index")
    embedder = tool_context.state.get("embedder")
    metadata_store = tool_context.state.get("metadata_store")
    # Shape (1, dim); float32 because that is the dtype FAISS searches with
    # (np.array over Python floats would otherwise produce float64).
    query_embeddings = np.array(
        [embedder.generate_single_embedding(query)], dtype=np.float32
    )
    _, I = index.search(query_embeddings, k=5)
    # Negative ids mark "no neighbour found" slots and are skipped.
    return [metadata_store[int(i)]['text'] for i in I[0] if i >= 0]

In [None]:

# Create a session service and a new session
# Rebinds session_service/session to in-memory versions for the local run.
session_service = InMemorySessionService()
session = session_service.create_session(app_name="my_app", user_id="user123")

In [None]:
# Store the vector database in the session's state
# NOTE(review): this mutates the object returned by create_session(); confirm
# the session service hands this same state to tools at run time (otherwise
# the state would have to be supplied when the session is created).
session.state["index"] = index
session.state["embedder"] = embedder
session.state["metadata_store"] = with_embeddings

In [None]:
from google.adk.runners import Runner
from google.genai import types # For creating response content

In [None]:
# Local runner: app_name matches the one used when the in-memory session was created.
runner = Runner(app_name = "my_app", agent=rag_agent, session_service=session_service)

In [None]:
# Create a user message
# A single text part with role 'user'.
content = types.Content(role='user', parts=[types.Part(text="What is media effectiveness")])

In [None]:
# Run the agent
# `events` is iterated in the next cell to find the final response.
events = runner.run(user_id="user123", session_id=session.id, new_message=content)

In [None]:
# Process the events
# Only events flagged as final responses are printed; the text is taken from
# the first content part (assumes such events always carry content — TODO confirm).
for event in events:
    if event.is_final_response():
        final_response = event.content.parts[0].text
        print("Agent Response:", final_response)

#### 3. Evaluation framework
- Code that we can use to evaluate the Agent 


In [None]:
# Attach to an existing index endpoint by its numeric id.
endpoint = aiplatform.MatchingEngineIndexEndpoint(
        index_endpoint_name="7730008746939645952"
    )

In [None]:
endpoint

691974057.us-central1-163097687798.vdb.vertexai.goog

In [None]:
# NOTE(review): no visible cell imports `settings` (the imports that would
# provide it are commented out) — confirm it is defined before running this.
settings.GOOGLE_CLOUD_PROJECT, settings.VERTEX_AI_LOCATION

In [None]:
# Point the SDK at the project/location taken from `settings`.
aiplatform.init(project=settings.GOOGLE_CLOUD_PROJECT, location=settings.VERTEX_AI_LOCATION)

In [None]:
# Probe query against the deployed index.
# NOTE(review): the query is the single-element vector [123] — presumably a
# connectivity test only; a real query needs the index's dimensionality.
result = endpoint.find_neighbors(
    deployed_index_id="deployed_index_5428000834283634688_v1",
    queries=[[123]],
    num_neighbors=3
)

In [None]:
result[0]

In [None]:
endpoint.public_endpoint_domain_name

In [None]:
# NOTE(review): overwrites a private attribute with a plain domain string —
# looks like a debugging experiment; confirm before relying on it.
endpoint._public_match_client = '691974057.us-central1-163097687798.vdb.vertexai.goog'

In [None]:
endpoint._public_match_client

In [None]:
from google.cloud.aiplatform import agent_engines

### test agent engine

In [None]:
import vertexai
from vertexai import agent_engines

# Configure the Vertex AI SDK; values come from `settings`, which must be
# defined elsewhere (no import of it is visible in this notebook).
vertexai.init(
    project=settings.GOOGLE_CLOUD_PROJECT,               # Your project ID.
    location=settings.VERTEX_AI_LOCATION,                # Your cloud region.
    staging_bucket = "gs://yuan_evernote_rag_cs"
)

In [None]:
from google.adk.agents import Agent
from vertexai.preview.reasoning_engines import AdkApp

In [None]:
from src.common import VectorStore, EmbeddingGenerator

In [None]:
# Initialize shared utilities
# Module-level singletons used by retrieve_documents below.
_vs = VectorStore()
_embedder = EmbeddingGenerator()

#@function_tool
def retrieve_documents(question: str, top_k: int = 5) -> List[str]:
    """
    Look up the stored chunks closest to a question and return their text.

    Args:
        question: User query text to embed and search with.
        top_k: How many nearest chunks to request from the vector store.
    Returns:
        One string per hit, in the order the vector store returned them;
        a hit whose metadata has no 'text' entry contributes ''.
    """
    snippets = []
    embedded_question = _embedder.generate_single_embedding(question)
    for match in _vs.search_vectors(embedded_question, top_k=top_k):
        snippets.append(match['metadata'].get('text', ''))
    return snippets

In [None]:
# Rebinds rag_agent to an agent whose only tool is retrieve_documents.
# Description and instruction are currently disabled (commented out).
rag_agent = Agent(
    name="cymbal_knowledge_agent",
    model="gemini-2.0-flash-001",
    #description="Company knowledge assistant that can ingest new files on demand and answer based on our internal documents.",
    #instruction=(
    #    "You are a corporate knowledge assistant. "
    #    "For any question, always call `retrieve_documents(question)` first to fetch relevant context before answering."
    #),
    tools=[retrieve_documents],
)

In [None]:
import inspect

# Names of all plain functions found on the Agent class (introspection aid).
methods = [name for name, member in inspect.getmembers(Agent, predicate=inspect.isfunction)]
print("Instance/Static Methods:", methods)

In [None]:
# NOTE(review): calls .query() on the Agent object itself — confirm this
# method exists on Agent (the cell above lists its methods); elsewhere in this
# notebook queries go through a Runner or a deployed engine instead.
response = rag_agent.query(
    input="What is the exchange rate from US dollars to Swedish currency?"
)

print(response)

In [None]:
# Wrap the current rag_agent so it can be deployed in the next cell.
app = AdkApp(agent=rag_agent)

In [None]:
from vertexai import agent_engines

# Deploy the AdkApp wrapper; the requirement string is unpinned here.
remote_agent = agent_engines.create(
    app,
    requirements=["google-cloud-aiplatform[agent_engines,adk]"],
)

In [None]:
def test_function():
    """Return the fixed string "yes"; used as a placeholder agent tool."""
    answer = "yes"
    return answer

In [None]:
from google.adk.agents import Agent
from vertexai.preview.reasoning_engines import AdkApp

# Minimal agent whose only tool is the placeholder test_function.
agent = Agent(
    model="gemini-2.0-flash",
    name='currency_exchange_agent',
    tools=[test_function],
)

# Rebinds `app` to a wrapper around this minimal agent.
app = AdkApp(agent=agent)

In [None]:
# Deploy the AdkApp wrapper built for this agent (as the other deployment
# cell does) instead of the bare Agent, which left `app` unused.
remote_agent = agent_engines.create(
    app,
)

### Testing index endpoint

In [22]:
from __future__ import annotations
import numpy as np
from typing import List, Dict, Any, Optional
from google.cloud import aiplatform
from google.cloud.aiplatform.matching_engine import (
    MatchingEngineIndex,
    MatchingEngineIndexEndpoint,
)
from google.cloud.aiplatform_v1.types import IndexDatapoint
from google.cloud import firestore

In [4]:
# Fresh helper instances plus a Firestore client; the client is created with
# no arguments — presumably using ambient credentials/project; TODO confirm.
processor = DocumentProcessor()
embedder = EmbeddingGenerator()
db = firestore.Client()

In [None]:
# main process
# Root path handed to DocumentProcessor.process_document below.
file_root = '/home/jupyter/code_test/adk_rag/prototype/test_data'

# chunking
# Result: one {"text", "metadata"} dict per chunk.
processed_data = processor.process_document(file_root)

In [None]:
# convert to embeddings - upsert
# Each chunk dict gains an "embedding" entry; the list is reused below.
with_embeddings = embedder.generate_embeddings(processed_data)

In [5]:
display_name = "test_index"
# Indexes whose display_name equals the value above.
matches = MatchingEngineIndex.list(
    filter=f'display_name="{display_name}"'
)

# Take the first match; this raises IndexError when nothing matched.
index = matches[0]

[<google.cloud.aiplatform.matching_engine.matching_engine_index.MatchingEngineIndex object at 0x7f43911c7f40> 
 resource name: projects/163097687798/locations/us-central1/indexes/7922713552870178816]

In [None]:
# The two string literals below are disabled code kept for reference; as bare
# strings they have no effect when the cell runs.
# First: upsert the embeddings into the index, using each item's position in
# with_embeddings as the datapoint id.
"""
# upsert vectors
        # `upsert_datapoints` (Vertex AI 2.15+)
# extract vectors 
datapoints = [IndexDatapoint(datapoint_id=str(i), feature_vector=e['embedding']) for i, e in enumerate(with_embeddings)]
index.upsert_datapoints(
    datapoints = datapoints
)
"""
# Second: store each chunk's text and file info in the Firestore collection
# "rag", keyed by the chunk_index from its metadata.
"""
# Save to Firestore under 'files' collection
for item in with_embeddings:
    text = item['text']
    metadata = item['metadata']
    idx = metadata['chunk_index']
    file_name = metadata['file_name']
    source = metadata['source']
    doc_ref = db.collection("rag").document(str(idx))
    doc_ref.set({
        "file_path": source,
        "file_name": file_name,
        "text": item["text"]
    })
"""

In [None]:
# Disabled code kept for reference (a bare string, so it does nothing):
# creating a public index endpoint named "test_index_endpoint".
"""
endpoint_display_name = "test_index_endpoint"
endpoint = MatchingEngineIndexEndpoint.create(
        display_name=endpoint_display_name,
        public_endpoint_enabled=True
    )
"""

In [9]:
# Attach to an existing index endpoint by its numeric id.
endpoint = aiplatform.MatchingEngineIndexEndpoint(
    index_endpoint_name="7694472531129925632"
)

In [8]:
# Display the indexes matching display_name (same filter as used to pick `index`).
MatchingEngineIndex.list(
            filter=f'display_name="{display_name}"'
        )

[<google.cloud.aiplatform.matching_engine.matching_engine_index.MatchingEngineIndex object at 0x7f43910d7730> 
 resource name: projects/163097687798/locations/us-central1/indexes/7922713552870178816]

In [33]:
# Embed the query text and ask the deployed index for its 3 nearest neighbours.
query = "media effectiveness"
query_embedding = embedder.generate_single_embedding(query)
# One query vector is sent, so the matches for it are in response[0].
response = endpoint.find_neighbors(
    deployed_index_id="deployed_index_1747401318896",
    queries=[query_embedding],
    num_neighbors=3,
)

In [42]:
# Resolve each neighbour id to the chunk stored under that id in the
# Firestore collection "rag".
retrieved_results = []
for response_ in response[0]:
    snapshot = db.collection("rag").document(response_.id).get()
    # A neighbour without a stored document would make to_dict() return None
    # and crash on the key lookups below, so such ids are skipped.
    if not snapshot.exists:
        continue
    r = snapshot.to_dict()
    retrieved_results.append(
        {
            'text': r['text'],
            'file_name': r['file_name'],
            'file_path': r['file_path']
        }
    )

In [43]:
retrieved_results

[{'text': 'ul\nMasterChef\n\nMEDIA MEASUREMENT',
  'file_name': 'ChatGPT Image Apr 30, 2025, 12_07_00 PM.png',
  'file_path': '/home/jupyter/code_test/adk_rag/prototype/test_data/ChatGPT Image Apr 30, 2025, 12_07_00 PM.png'},
 {'text': 'name: LY | email: 123@gmail.com\n name: YY | email: 234@yahoo.com\n name: TJ | email: 345@ntu.com.sg',
  'file_name': 'email.csv',
  'file_path': '/home/jupyter/code_test/adk_rag/prototype/test_data/email.csv'},
 {'text': 'Large language models are powerful tools with extensive capabilities; nonetheless, they grapple with a distinct limitation known as the context window. This context window defines the boundaries within which these models can proficiently process text. Take, for example, gpt-3.5-turbo, which operates within a context length of 4,096 tokens, approximately corresponding to 3,500 words.\n\nBut what occurs when you present these models with a document that exceeds their context window? This is where a clever strategy known as "chunking" co