## RAG and prompt automation in dspy - batch invocation    

In this notebook, we run batch invocation on a RAG pipeline defined with DSPy and generate the datasets used for benchmarking in the notebook dspy_rag_ft.ipynb. 

### Environment Setup

In [1]:
%%capture
!pip install dspy pypdf chromadb

In [2]:
import json, os, sys
import io, base64
import logging
import pandas as pd
import pathlib
from PIL import Image as PILImage

import dspy
from dspy.teleprompt import BootstrapFewShot

import boto3

In [3]:
# A single boto3 session supplies both the region and the Bedrock clients;
# creating a second, identical session just to read the region is unnecessary.
session = boto3.Session()
boto_session = session  # alias kept so existing references to the old name keep working
region_name = session.region_name
print(region_name)

# Control-plane client ("bedrock") and inference client ("bedrock-runtime").
bedrock = session.client("bedrock", region_name=region_name)
br = session.client("bedrock-runtime", region_name=region_name)

us-east-1


### DSPy Language Model and Retriever Model configuration

In [4]:
# Bedrock model identifiers used throughout the notebook.
claude_sonnet_model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
claude_haiku_model_id = "anthropic.claude-3-haiku-20240307-v1:0"
# Misspelled original name, kept as an alias because other cells still reference it.
clade_haiku_model_id = claude_haiku_model_id
llama_model_id = "meta.llama3-8b-instruct-v1:0"
titan_embed_model_id = "amazon.titan-embed-text-v2:0"

In [5]:
# Shared Bedrock provider used by every DSPy language-model client below.
dsp_bedrock = dspy.Bedrock(region_name=region_name)

# Both Claude clients use the same provider and the same 4096-token limits.
anthropic_kwargs = dict(
    aws_provider=dsp_bedrock,
    max_new_tokens=4096,
    max_tokens=4096,
)

bedrock_sonnet = dspy.AWSAnthropic(model=claude_sonnet_model_id, **anthropic_kwargs)

bedrock_haiku = dspy.AWSAnthropic(model=clade_haiku_model_id, **anthropic_kwargs)

# Llama client: the max_new_tokens / max_tokens overrides (2048) are left disabled here.
bedrock_llama = dspy.AWSMeta(
    aws_provider=dsp_bedrock,
    model=llama_model_id,
)

### Splitters and chunking configuration    

We use the RecursiveCharacterTextSplitter to split the contract into logically coherent, readable chunks. The chunk size and overlap (both measured in characters here) can be determined empirically for the dataset. For more flexibility, the dataset file can instead be split into multiple files, with each file treated as one chunk.

In [6]:
from langchain.document_loaders import PyPDFLoader, PyPDFDirectoryLoader
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter

# Contract PDF from the CUAD dataset that this notebook indexes.
CONTRACT_PDF = "../cuad_data/CUAD_v1/full_contract_pdf/Part_I/Strategic Alliance//ENERGOUSCORP_03_16_2017-EX-10.24-STRATEGIC ALLIANCE AGREEMENT.PDF"

loader = PyPDFLoader(CONTRACT_PDF)
documents = loader.load()

# Candidate separators handed to the recursive splitter.
SPLIT_SEPARATORS = [
    "\n\n",
    "\n",
    " ",
    ".",
    ",",
    "\u200b",  # Zero-width space
    "\uff0c",  # Fullwidth comma
    "\u3001",  # Ideographic comma
    "\uff0e",  # Fullwidth full stop
    "\u3002",  # Ideographic full stop
    "",
]

# Chunk size and overlap are measured with len(), i.e. in characters.
text_splitter = RecursiveCharacterTextSplitter(
    separators=SPLIT_SEPARATORS,
    chunk_size=5000,
    chunk_overlap=30,
    length_function=len,
)

docs = text_splitter.split_documents(documents)

### Clean and remove any empty pages in the PDF

In [7]:
# Report the chunk count before and after dropping chunks with empty page_content.
print(len(docs))
clean_docs = [chunk for chunk in docs if len(chunk.page_content)]

docs = clean_docs
print(len(docs))

32
32


### Setup Retriever on local disk using ChromaDB    

We use ChromaDB, persisted on local disk, to demonstrate the use of a vector database. 

In [8]:
import chromadb
from chromadb.utils import embedding_functions
from chromadb.utils.embedding_functions import AmazonBedrockEmbeddingFunction

# Embedding function backed by the Titan embedding model on Bedrock.
bedrock_ef = AmazonBedrockEmbeddingFunction(
    session=session,
    model_name=titan_embed_model_id,
)

# Embed the text of every chunk in one call.
chunk_texts = [doc.page_content for doc in docs]
bedrock_embeddings = bedrock_ef(chunk_texts)

In [9]:
from dspy.retrieve.chromadb_rm import ChromadbRM

collection_name = "contexts"
persist_dir = "cuad_db/"

# Open (or create) the on-disk Chroma database and its collection.
chroma_client = chromadb.PersistentClient(persist_dir)
collection = chroma_client.get_or_create_collection(name=collection_name)

# Populate the collection only when it is empty, so re-running the cell does not re-add chunks.
if not collection.count():
    chunk_ids = [str(position) for position, _ in enumerate(docs)]
    chunk_documents = [doc.page_content for doc in docs]
    collection.add(
        embeddings=bedrock_embeddings,
        documents=chunk_documents,
        ids=chunk_ids,
    )

In [10]:
# Retriever over the persisted collection, configured with k=3.
retriever_config = dict(
    collection_name=collection_name,
    persist_directory=persist_dir,
    embedding_function=bedrock_ef,
    k=3,
)
rm = ChromadbRM(**retriever_config)

# Register it as the global DSPy retrieval model.
dspy.settings.configure(rm=rm)

### Dataset setup in DSPy

Contract Understanding Atticus Dataset (CUAD) is a dataset for legal contract review. CUAD was created with dozens of legal experts from The Atticus Project and consists of over 13,000 annotations.


In [11]:
# Question/answer pairs for the ENERGOUSCORP contract.
TRN_FILE = '../cuad_data/CUAD_v1/ENERGOUSCORP_qa.csv'

df_cuad_data = pd.read_csv(filepath_or_buffer=TRN_FILE)
df_cuad_data.head(5)

Unnamed: 0,index,question,input,answer,qa_id
0,0,What is The name of the contract?,Highlight the parts (if any) of this contract ...,STRATEGIC ALLIANCE AGREEMENT,ENERGOUSCORP_03_16_2017-EX-10.24-STRATEGIC ALL...
1,1,What is The two or more parties who signed the...,Highlight the parts (if any) of this contract ...,"Dialog Semiconductor (UK) Ltd., DIALOG, Energo...",ENERGOUSCORP_03_16_2017-EX-10.24-STRATEGIC ALL...
2,2,What is The date of the contract?,Highlight the parts (if any) of this contract ...,"November 6, 2016",ENERGOUSCORP_03_16_2017-EX-10.24-STRATEGIC ALL...
3,3,What is The date when the contract is effective?,Highlight the parts (if any) of this contract ...,"November 6, 2016",ENERGOUSCORP_03_16_2017-EX-10.24-STRATEGIC ALL...
4,4,On what date will the contract's initial term ...,Highlight the parts (if any) of this contract ...,"Unless earlier terminated as provided herein, ...",ENERGOUSCORP_03_16_2017-EX-10.24-STRATEGIC ALL...


In [12]:
# Build one DSPy example per QA row, with `question` as the only input field.
# Columns are selected by name rather than by unpacking `df.values` positionally:
# this survives column reordering and no longer rebinds the builtin `input`.
dataset_cuad = [
    dspy.Example(question=question, answer=answer).with_inputs("question")
    for question, answer in zip(df_cuad_data["question"], df_cuad_data["answer"])
]

# Fixed split: first 28 examples for training, the next 4 for development.
# Every example already declares its inputs above, so no second with_inputs() call is needed.
trainset = dataset_cuad[0:28]
devset = dataset_cuad[28:32]

### Setup RAG pipeline

In [13]:
# Signature for the answer-generation step of the RAG pipeline:
# inputs are the retrieved context and the question, output is the answer.
# NOTE(review): DSPy presumably uses the docstring and the `desc` strings as prompt
# text, so treat them as runtime behaviour rather than documentation — confirm before editing.
class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""

    # Passages supplied by the retriever.
    context = dspy.InputField(desc="may contain relevant facts")
    # The question to answer.
    question = dspy.InputField()
    # The generated answer.
    answer = dspy.OutputField(desc="often between 1 and 5 words")
    
    
class RAG(dspy.Module):
    """Retrieve passages for a question, then generate an answer from them.

    Args:
        num_passages: number of passages requested from the retriever (default 3).
    """

    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)

    def forward(self, question):
        """Return a Prediction carrying the retrieved context and the generated answer."""
        retrieved = self.retrieve(question)
        context = list(retrieved.passages)
        generated = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer=generated.answer)

### Batch Invocation on RAG

In [14]:
# CSV that receives the uncompiled RAG answers produced with the Haiku client.
TEST_OUTPUT_FILE = 'rag_haiku_results_2.csv'

# Make the Haiku client the global DSPy language model, then build a fresh pipeline.
dspy.settings.configure(lm=bedrock_haiku)
rag_pipeline = RAG()

In [15]:
# Batch invocation: run the pipeline on every example and collect the
# question, the reference answer and the generated answer.
question_list, ref_answer_list, rag_answer_list = [], [], []

for i, example in enumerate(dataset_cuad):
    print(i, end='|')  # inline progress indicator
    question_list.append(example.question)
    ref_answer_list.append(example.answer)
    pred = rag_pipeline(example.question)
    rag_answer_list.append(pred.answer)

# store responses in csv
df_response = pd.DataFrame({
    "question": question_list,
    "ref_answer": ref_answer_list,
    "response": rag_answer_list,
})

df_response.to_csv(TEST_OUTPUT_FILE, index=False)

 		You are using the client AWSAnthropic, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb


0|1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23|24|25|26|27|28|29|30|31|

In [16]:
# CSV that receives the uncompiled RAG answers produced with the Sonnet client.
TEST_OUTPUT_FILE = 'rag_sonnet_results_2.csv'

# Make the Sonnet client the global DSPy language model, then build a fresh pipeline.
dspy.settings.configure(lm=bedrock_sonnet)
rag_pipeline = RAG()

In [17]:
# Batch invocation: run the pipeline on every example and collect the
# question, the reference answer and the generated answer.
question_list, ref_answer_list, rag_answer_list = [], [], []

for i, example in enumerate(dataset_cuad):
    print(i, end='|')  # inline progress indicator
    question_list.append(example.question)
    ref_answer_list.append(example.answer)
    pred = rag_pipeline(example.question)
    rag_answer_list.append(pred.answer)

# store responses in csv
df_response = pd.DataFrame({
    "question": question_list,
    "ref_answer": ref_answer_list,
    "response": rag_answer_list,
})

df_response.to_csv(TEST_OUTPUT_FILE, index=False)

0|1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|20|21|22|23|24|25|26|27|28|29|30|31|

### RAG Prompt Optimization by DSPy

Create a factuality judge that decides whether the predicted answer is factually correct with respect to the ground-truth answer and means the same thing. The judge is part of the metric used when compiling the RAG program in DSPy to optimize its responses. 


In [18]:
# Signature for the LLM judge: given a question, the ground-truth answer and a
# predicted answer, it emits a factuality verdict.
# NOTE(review): DSPy presumably uses the docstring, `desc` and `prefix` strings as
# prompt text, so treat them as runtime behaviour — confirm before rewording.
class FactJudge(dspy.Signature):
    """Judge if the predicted answer factually correct to the groundtruth answer and same as groundtruth answer. Answer either Factual[True] or Factual[False]"""

    question = dspy.InputField(desc="Question to be answered")
    groundtruth_answer = dspy.InputField(desc="groundtruth answer for the question")
    predicted_answer = dspy.InputField(desc="predicted answer for the question")
    # Verdict text; factuality_metric parses this string.
    factually_correct = dspy.OutputField(desc="Is the predicted answer factually correct to the groundtruth answer and same as groundtruth answer ?", prefix="Factual[True/False]:")


# Chain-of-thought predictor over FactJudge; it runs on whichever LM is configured globally.
judge = dspy.ChainOfThought(FactJudge)


def factuality_metric(example, pred):
    """Ask the LLM judge whether ``pred.answer`` matches ``example.answer``.

    Args:
        example: object exposing ``question`` and ``answer`` (the ground truth).
        pred: object exposing ``answer`` (the predicted answer).

    Returns:
        bool: True when the judge's verdict text is recognised as positive.

    The verdict is compared case-insensitively after trimming whitespace, so
    variants such as " True", "true." or an echoed "Factual[True/False]: True"
    are accepted. An explicit "Factual[False]" always yields False.
    """
    factual = judge(question=example.question,
                    groundtruth_answer=example.answer,
                    predicted_answer=pred.answer)

    # Tolerate a missing verdict instead of raising on `in`.
    verdict = str(factual.factually_correct or "").strip().lower()

    # In case the completion echoes the output-field prefix, drop it before matching.
    echoed_prefix = "factual[true/false]:"
    if verdict.startswith(echoed_prefix):
        verdict = verdict[len(echoed_prefix):].strip()

    if "factual[false]" in verdict:
        # An explicit negative verdict wins over any positive-looking phrase.
        llm_judge_ans = False
    else:
        llm_judge_ans = bool(
            "factual[true]" in verdict
            or "100% true" in verdict
            or "100% factually correct" in verdict
            or verdict.rstrip(".") == "true"
        )

    logging.info(f"llm_judge_ans = {llm_judge_ans}")
    return llm_judge_ans

In [19]:
def validate_context_and_answer(example, pred, trace=None):
    """Metric used when compiling the RAG program.

    Args:
        example: ground-truth example exposing ``question`` and ``answer``.
        pred: pipeline prediction exposing ``answer`` and ``context``.
        trace: accepted for compatibility with the metric call signature; unused.

    Returns:
        bool: True when the predicted answer matches exactly or the LLM judge
        accepts it.

    The passage-match score is still computed and logged as a retrieval
    diagnostic, but it no longer decides the result: it only says that the
    retrieved context contains the reference answer, so OR-ing it in accepted
    wrong predictions whenever retrieval succeeded.
    """
    answer_EM = dspy.evaluate.answer_exact_match(example, pred)
    answer_PM = dspy.evaluate.answer_passage_match(example, pred)
    answer_LLMJudge = factuality_metric(example, pred)

    logging.info(f"\n example question :: {example.question} , example answer :: {example.answer} ")
    logging.info(f"\n pred answer :: {pred.answer}")
    logging.info(f"\n answer_EM :: {answer_EM}, answer_PM ::{answer_PM} answer_LLMJudge :: {answer_LLMJudge}")
    return bool(answer_LLMJudge or answer_EM)


Set up a basic teleprompter, which will compile our RAG program.

In [20]:
# Few-shot bootstrapper scored by the combined validation metric.
teleprompter = BootstrapFewShot(
    metric=validate_context_and_answer,
)

Prepare groundtruth data as the training set for compiling

In [None]:
# Ground-truth QA pairs used as the compile (training) set.
# Alternative source, currently disabled: './cuad_data/rag_compile_data.csv'
COMP_FILE = '../cuad_data/CUAD_v1/ENERGOUSCORP_qa.csv'

df_comp_data = pd.read_csv(filepath_or_buffer=COMP_FILE)
df_comp_data.head(5)

In [None]:
# Build one DSPy example per compile-set row, with `question` as the only input field.
# Columns are selected by name rather than by unpacking `df.values` positionally:
# this survives column reordering and no longer rebinds the builtin `input`.
dataset_comp = [
    dspy.Example(question=question, answer=answer).with_inputs("question")
    for question, answer in zip(df_comp_data["question"], df_comp_data["answer"])
]

### Batch Invocation on Compiled RAG

In [None]:
# CSV that receives the compiled-RAG answers produced with the Haiku client.
TEST_OUTPUT_FILE = 'ragc_haiku_results_2.csv'

# Make the Haiku client the global DSPy language model, then build a fresh pipeline.
dspy.settings.configure(lm=bedrock_haiku)
rag_pipeline = RAG()

# Compile!
# NOTE(review): COMP_FILE is the same path as TRN_FILE, so dataset_comp appears to hold the
# same examples as dataset_cuad that the batch loop evaluates on — confirm this overlap is intended.
compiled_rag = teleprompter.compile(rag_pipeline, trainset=dataset_comp)

In [None]:
# Batch invocation: run the compiled pipeline on every example and collect the
# question, the reference answer and the generated answer.
question_list, ref_answer_list, rag_answer_list = [], [], []

for i, example in enumerate(dataset_cuad):
    print(i, end='|')  # inline progress indicator
    question_list.append(example.question)
    ref_answer_list.append(example.answer)
    pred = compiled_rag(example.question)
    rag_answer_list.append(pred.answer)

# store responses in csv
df_response = pd.DataFrame({
    "question": question_list,
    "ref_answer": ref_answer_list,
    "response": rag_answer_list,
})

df_response.to_csv(TEST_OUTPUT_FILE, index=False)

In [None]:
# CSV that receives the compiled-RAG answers produced with the Sonnet client.
TEST_OUTPUT_FILE = 'ragc_sonnet_results_2.csv'

# Make the Sonnet client the global DSPy language model, then build a fresh pipeline.
dspy.settings.configure(lm=bedrock_sonnet)
rag_pipeline = RAG()

# Compile!
# NOTE(review): COMP_FILE is the same path as TRN_FILE, so dataset_comp appears to hold the
# same examples as dataset_cuad that the batch loop evaluates on — confirm this overlap is intended.
compiled_rag = teleprompter.compile(rag_pipeline, trainset=dataset_comp)

In [None]:
# Batch invocation: run the compiled pipeline on every example and collect the
# question, the reference answer and the generated answer.
question_list, ref_answer_list, rag_answer_list = [], [], []

for i, example in enumerate(dataset_cuad):
    print(i, end='|')  # inline progress indicator
    question_list.append(example.question)
    ref_answer_list.append(example.answer)
    pred = compiled_rag(example.question)
    rag_answer_list.append(pred.answer)

# store responses in csv
df_response = pd.DataFrame({
    "question": question_list,
    "ref_answer": ref_answer_list,
    "response": rag_answer_list,
})

df_response.to_csv(TEST_OUTPUT_FILE, index=False)