### Installing necessary libraries

In [66]:
%pip install langgraph google-cloud-aiplatform pinecone pydantic --quiet

I0000 00:00:1759649482.139006    3056 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [67]:
!python -m pip install --upgrade google-genai --quiet

I0000 00:00:1759649488.759479    3056 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [68]:
!python -m pip install dotenv --quiet

I0000 00:00:1759649496.559965    3056 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
# gcloud auth application-default login

### Loading env variables

In [69]:
from dotenv import load_dotenv
load_dotenv()

True

In [70]:
from google import genai

### setting up vertex ai 

In [71]:
import os

PROJECT_ID = "bdc-trainings"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "global")

client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)

In [72]:
if not client.vertexai:
    print("Using Gemini Developer API.")
elif client._api_client.project:
    print(
        f"Using Vertex AI with project: {client._api_client.project} in location: {client._api_client.location}"
    )
elif client._api_client.api_key:
    print(
        f"Using Vertex AI in express mode with API key: {client._api_client.api_key[:5]}...{client._api_client.api_key[-5:]}"
    )

Using Vertex AI with project: bdc-trainings in location: global


### configurations

In [None]:

import os


EMBEDDING_MODEL = "models/gemini-embedding-001"
LLM_MODEL = "gemini-2.5-flash"




In [74]:
import os
import time
from dataclasses import dataclass
from typing import List, Dict, Any

# Pinecone (serverless)
from pinecone import Pinecone, ServerlessSpec

### Initialising Pinecone

In [75]:
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
if not PINECONE_API_KEY:
    raise RuntimeError("PINECONE_API_KEY not found. Please set it in your environment.")
else:
  print("PINECONE API KEY found")

pc = Pinecone(api_key=PINECONE_API_KEY)

PINECONE API KEY found


### Initialising Logger

In [107]:

import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,  # Change to DEBUG for more detailed logs
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

logger = logging.getLogger(__name__)


In [109]:

import logging

# Configure logging to both console and file
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("rag_workflow.log"),  # Logs to file
        logging.StreamHandler()                   # Logs to console
    ]
)

logger = logging.getLogger(__name__)


In [None]:

import json
import pinecone

from langchain_google_genai import GoogleGenerativeAIEmbeddings
embeddings = GoogleGenerativeAIEmbeddings(model=EMBEDDING_MODEL)

test_dim = len(embeddings.embed_query("dimension probe"))

In [77]:
print(test_dim)

3072


### creating vector db and index through Pinecone

In [None]:

INDEX_NAME = "raggcp1" 
METRIC = "cosine"

# Create the index if it doesn't exist
existing = [idx["name"] for idx in pc.list_indexes()]
if INDEX_NAME not in existing:
    print(f"Creating index '{INDEX_NAME}' ...")
    pc.create_index(
        name=INDEX_NAME,
        dimension=test_dim, #3072
        metric=METRIC,
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),)
    # optional: wait a moment for the index to be ready
    time.sleep(5)
else:
    print(f"Index '{INDEX_NAME}' already exists, reusing it.")

index = pc.Index(INDEX_NAME)
print(index.describe_index_stats())

Index 'raggcp1' already exists, reusing it.
{'dimension': 3072,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 30}},
 'total_vector_count': 30,
 'vector_type': 'dense'}


### Load your dataset

In [79]:
# Load your dataset
with open("datasets/self_critique_loop_dataset.json") as f:
    kb_data = json.load(f)

In [80]:
print(kb_data)

[{'doc_id': 'KB001', 'question': 'What are best practices for debugging?', 'answer_snippet': "When addressing debugging, it's important to follow well-defined patterns...", 'source': 'debugging_guide.md', 'confidence_indicator': 'moderate', 'last_updated': '2024-01-10'}, {'doc_id': 'KB002', 'question': 'What are best practices for performance tuning?', 'answer_snippet': "When addressing performance tuning, it's important to follow well-defined patterns...", 'source': 'performance tuning_guide.md', 'confidence_indicator': 'moderate', 'last_updated': '2024-02-10'}, {'doc_id': 'KB003', 'question': 'What are best practices for caching?', 'answer_snippet': "When addressing caching, it's important to follow well-defined patterns...", 'source': 'caching_guide.md', 'confidence_indicator': 'moderate', 'last_updated': '2024-03-10'}, {'doc_id': 'KB004', 'question': 'What are best practices for asynchronous programming?', 'answer_snippet': "When addressing asynchronous programming, it's important 

### Embedding the data

In [81]:

# Extract texts
texts = [entry["answer_snippet"] for entry in kb_data]



# Get embeddings
embeddings_1 = embeddings.embed_documents(texts)


#### Upserting data into vector db

In [66]:

  

# Upsert
vectors = [{
    "id": item["doc_id"],
    "values": vector,
    "metadata": {
        "question": item["question"],
        "snippet": item["answer_snippet"],
        "source": item["source"]
    }
} for item, vector in zip(kb_data, embeddings_1)]

index.upsert(vectors)

print("KB Indexed into Pinecone")


KB Indexed into Pinecone


In [None]:
# ###testing
# query = "What are best practices for performance tuning?"
# k = 5

# # Get the embedding vector for the query
# query_vector = embeddings.embed_query(query)

# # Query Pinecone index using the vector
# results = index.query(
#     vector=query_vector,
#     top_k=k,
#     include_metadata=True
# )


In [106]:
# results

### Defining RAG NODES

In [None]:
### Defining RAG NODES

# Retrieval Node
def retrieve_kb(question, top_k=5):
    query_vector = embeddings.embed_query(question)
    results = index.query(
    vector=query_vector,
    top_k=5,
    include_metadata=True
)
    return results["matches"]

In [83]:
###testing the retrieval node
# query="What are best practices for asynchronous programming?"
# retrieve_kb(query="What are best practices for asynchronous programming?")

In [None]:
# snippets=retrieve_kb(query="What are best practices for asynchronous programming?")

### Generation Node

In [92]:
# Generation Node
def generate_answer(question, snippets):
    context = "\n".join([f"[{s['id']}] {s['metadata']['snippet']}" for s in snippets])
    prompt = f"""Use the context below to answer the question:
Context:
{context}
Question: {question}
Answer with citations in the format [KBxxx]."""

    return client.models.generate_content(contents=prompt, model=LLM_MODEL).text.strip()

In [85]:
# generate_answer(query="What are best practices for asynchronous programming?",snippets=snippets)

In [86]:
# answer=generate_answer(query="What are best practices for asynchronous programming?",snippets=snippets)

### Critique Node

In [93]:
# Critique Node
def critique_answer(question, snippets, answer):
    context = "\n".join([f"- {s['metadata']['snippet']}" for s in snippets])
    prompt = f"""Given the context and an answer, determine if the answer is COMPLETE or needs REFINEMENT.
Context:
{context}

Question: {question}

Answer:
{answer}

Respond in one of the following formats:
- COMPLETE
- REFINE: <list missing keywords or ideas>
"""
    return client.models.generate_content(contents=prompt, model=LLM_MODEL).text.strip()

In [88]:
# critique_answer(query, snippets, answer)

### Refinement Node

In [94]:




# Refinement Node
def refine_answer(question, prev_snippets, missing_keywords):
    new_query = f"{question} {missing_keywords}"
    extra_snippet = retrieve_kb(new_query, top_k=1)
    combined = prev_snippets + extra_snippet
    return generate_answer(question, combined)


In [90]:
# refine_answer(query, prev_snippets=snippets, missing_keywords='REFINE: performance tuning')

### Build LangGraph Workflow 

In [None]:
# Build LangGraph Workflow 


from typing import TypedDict
from langgraph.graph import StateGraph


# Define the schema for the graph state
class RAGState(TypedDict):
    question: str
    snippets: list
    answer: str
    critique: str
    final_answer: str


def build_rag_graph():
    graph = StateGraph(RAGState)



    
    def retrieve(state):
        question = state["question"]
        logger.info(f"Retrieving snippets for question: {question}")
        snippets = retrieve_kb(question)
        logger.info(f"[Retriever] Retrieved {len(snippets)} snippets")

        return {"question": question, "snippets": snippets}
    

    
    def generate(state):
        logger.info(f"[GenerateAnswer] Generating answer for: {state['question']}")
        answer = generate_answer(state["question"], state["snippets"])
        logger.info(f"[GenerateAnswer] Generated answer: {answer[:100]}...")
        return {**state, "answer": answer}

    
    def critique(state):
        logger.info(f"[Critique] Critiquing answer...")
        result = critique_answer(state["question"], state["snippets"], state["answer"])
        logger.info(f"[Critique] Critique result: {result}")
        return {**state, "critique": result}

    
    # def needs_refine(state):
    #     return "REFINE" in state["critique"]

        
    
    def needs_refine(state):
        decision = "REFINE" in state["critique"]
        logger.info(f"[CheckCritique] Needs refinement? {decision}")
        return decision

    
    def refine(state):
        logger.info(f"[Refine] Refining answer...")
        missing = state["critique"].split("REFINE:")[-1].strip()
        final_answer = refine_answer(state["question"], state["snippets"], missing)
        logger.info(f"[Refine] Final refined answer: {final_answer[:100]}...")
        return {**state, "final_answer": final_answer}
    
    def end_node(state):
        logger.info(f"[END] Final state reached.")
        return state  # Just passes through

    
 
    graph.add_node("Retriever", retrieve)
    graph.add_node("GenerateAnswer", generate)
    graph.add_node("Critique", critique)
    graph.add_node("Refine", refine)
    graph.add_node("END", end_node)

    # Add condition
    graph.add_conditional_edges("Critique", needs_refine, {
        True: "Refine",
        False: "END"
    })

    
   
# Set entry and edges
    graph.set_entry_point("Retriever")
    graph.add_edge("Retriever", "GenerateAnswer")
    graph.add_edge("GenerateAnswer", "Critique")
    graph.add_edge("Refine", "END")

    return graph.compile()





    


In [111]:
graph= build_rag_graph()

question = input("Enter your question: ")
logger.info(f"[Main] Starting RAG workflow for question: {question}")

result = graph.invoke({"question": question})

logger.info(f"[Main] Workflow completed. Final result: {result.get('final_answer', result.get('answer'))}")
print(result.get("final_answer", result.get("answer")))


2025-10-05 13:41:36,611 - __main__ - INFO - [Main] Starting RAG workflow for question: What are best practices for unit testing
2025-10-05 13:41:36,613 - __main__ - INFO - Retrieving snippets for question: What are best practices for unit testing
2025-10-05 13:41:38,342 - __main__ - INFO - [Retriever] Retrieved 5 snippets
2025-10-05 13:41:38,344 - __main__ - INFO - [GenerateAnswer] Generating answer for: What are best practices for unit testing
2025-10-05 13:41:38,345 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:41:40,326 - httpx - INFO - HTTP Request: POST https://aiplatform.googleapis.com/v1beta1/projects/bdc-trainings/locations/global/publishers/google/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2025-10-05 13:41:40,328 - __main__ - INFO - [GenerateAnswer] Generated answer: When addressing unit testing, it's important to follow well-defined patterns [KB006], [KB026], [KB01...
2025-10-05 13:41:40,329 - __main__ - INFO - [Critique

When addressing unit testing, it's important to follow well-defined patterns [KB006], [KB016], [KB026].


In [112]:
print(result.get("final_answer", result.get("answer")))

When addressing unit testing, it's important to follow well-defined patterns [KB006], [KB016], [KB026].


In [105]:
graph= build_rag_graph()

question=input("Enter your question: ")

result= graph.invoke({"question": question})


print(result.get("final_answer", result["answer"]))

When addressing API versioning, it's important to follow well-defined patterns. [KB005], [KB025], [KB015]


### Testing Queries
Try the pipeline with:
1. “What are best practices for caching?”
2
2. “How should I set up CI/CD pipelines?”
3. “What are performance tuning tips?”
4. “How do I version my APIs?”
5. “What should I consider for error handling?

#### What are best practices for caching?

In [113]:
graph= build_rag_graph()

question=input("Enter your question: ")

result= graph.invoke({"question": question})


print(result.get("final_answer", result["answer"]))

2025-10-05 13:46:39,366 - __main__ - INFO - Retrieving snippets for question: What are best practices for caching?
2025-10-05 13:46:41,061 - __main__ - INFO - [Retriever] Retrieved 5 snippets
2025-10-05 13:46:41,062 - __main__ - INFO - [GenerateAnswer] Generating answer for: What are best practices for caching?
2025-10-05 13:46:41,063 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:46:43,073 - httpx - INFO - HTTP Request: POST https://aiplatform.googleapis.com/v1beta1/projects/bdc-trainings/locations/global/publishers/google/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2025-10-05 13:46:43,075 - __main__ - INFO - [GenerateAnswer] Generated answer: When addressing caching, it's important to follow well-defined patterns [KB003, KB023, KB013]....
2025-10-05 13:46:43,076 - __main__ - INFO - [Critique] Critiquing answer...
2025-10-05 13:46:43,077 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:46:44,77

When addressing caching, it's important to follow well-defined patterns [KB003, KB023, KB013].


####  “How should I set up CI/CD pipelines?”

In [114]:
graph= build_rag_graph()

question=input("Enter your question: ")

result= graph.invoke({"question": question})


print(result.get("final_answer", result["answer"]))

2025-10-05 13:48:18,659 - __main__ - INFO - Retrieving snippets for question: How should I set up CI/CD pipelines?
2025-10-05 13:48:20,308 - __main__ - INFO - [Retriever] Retrieved 5 snippets
2025-10-05 13:48:20,310 - __main__ - INFO - [GenerateAnswer] Generating answer for: How should I set up CI/CD pipelines?
2025-10-05 13:48:20,311 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:48:21,991 - httpx - INFO - HTTP Request: POST https://aiplatform.googleapis.com/v1beta1/projects/bdc-trainings/locations/global/publishers/google/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2025-10-05 13:48:21,994 - __main__ - INFO - [GenerateAnswer] Generated answer: When addressing CI/CD, it's important to follow well-defined patterns [KB007, KB027, KB017]....
2025-10-05 13:48:21,995 - __main__ - INFO - [Critique] Critiquing answer...
2025-10-05 13:48:21,996 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:48:24,149 

When setting up CI/CD pipelines, it's important to follow well-defined patterns [KB007, KB027, KB017].


#### 3. “What are performance tuning tips?”

In [115]:
graph= build_rag_graph()

question=input("Enter your question: ")

result= graph.invoke({"question": question})


print(result.get("final_answer", result["answer"]))

2025-10-05 13:49:54,860 - __main__ - INFO - Retrieving snippets for question: What are performance tuning tips?
2025-10-05 13:49:56,523 - __main__ - INFO - [Retriever] Retrieved 5 snippets
2025-10-05 13:49:56,524 - __main__ - INFO - [GenerateAnswer] Generating answer for: What are performance tuning tips?
2025-10-05 13:49:56,525 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:49:58,354 - httpx - INFO - HTTP Request: POST https://aiplatform.googleapis.com/v1beta1/projects/bdc-trainings/locations/global/publishers/google/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2025-10-05 13:49:58,356 - __main__ - INFO - [GenerateAnswer] Generated answer: When addressing performance tuning, it's important to follow well-defined patterns [KB002, KB022, KB...
2025-10-05 13:49:58,357 - __main__ - INFO - [Critique] Critiquing answer...
2025-10-05 13:49:58,357 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:49:59,92

When addressing performance tuning, it's important to follow well-defined patterns [KB002, KB022, KB012].


#### 4. “How do I version my APIs?”

In [116]:
graph= build_rag_graph()

question=input("Enter your question: ")

result= graph.invoke({"question": question})


print(result.get("final_answer", result["answer"]))

2025-10-05 13:52:23,793 - __main__ - INFO - Retrieving snippets for question: How do I version my APIs?
2025-10-05 13:52:25,432 - __main__ - INFO - [Retriever] Retrieved 5 snippets
2025-10-05 13:52:25,433 - __main__ - INFO - [GenerateAnswer] Generating answer for: How do I version my APIs?
2025-10-05 13:52:25,434 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:52:27,645 - httpx - INFO - HTTP Request: POST https://aiplatform.googleapis.com/v1beta1/projects/bdc-trainings/locations/global/publishers/google/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2025-10-05 13:52:27,647 - __main__ - INFO - [GenerateAnswer] Generated answer: To version your APIs, it's important to follow well-defined patterns [KB005, KB025, KB015]....
2025-10-05 13:52:27,648 - __main__ - INFO - [Critique] Critiquing answer...
2025-10-05 13:52:27,649 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:52:30,124 - httpx - INFO - HTTP R

When addressing API versioning, it's important to follow well-defined patterns [KB005, KB025, KB015].


#### 5. “What should I consider for error handling?

In [117]:
graph= build_rag_graph()

question=input("Enter your question: ")

result= graph.invoke({"question": question})


print(result.get("final_answer", result["answer"]))

2025-10-05 13:55:12,687 - __main__ - INFO - Retrieving snippets for question: What should i consider for error handling?
2025-10-05 13:55:14,353 - __main__ - INFO - [Retriever] Retrieved 5 snippets
2025-10-05 13:55:14,355 - __main__ - INFO - [GenerateAnswer] Generating answer for: What should i consider for error handling?
2025-10-05 13:55:14,357 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025-10-05 13:55:16,020 - httpx - INFO - HTTP Request: POST https://aiplatform.googleapis.com/v1beta1/projects/bdc-trainings/locations/global/publishers/google/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
2025-10-05 13:55:16,022 - __main__ - INFO - [GenerateAnswer] Generated answer: When addressing error handling, it's important to follow well-defined patterns [KB009, KB029, KB019]...
2025-10-05 13:55:16,023 - __main__ - INFO - [Critique] Critiquing answer...
2025-10-05 13:55:16,023 - google_genai.models - INFO - AFC is enabled with max remote calls: 10.
2025

When addressing error handling, it's important to follow well-defined patterns [KB009, KB029, KB019].
