In [8]:
import os

import dspy
import weaviate
from dspy.retrieve.weaviate_rm import WeaviateRM
from weaviate.classes.init import Auth


In [None]:
# Credentials are read from the environment so real keys never get saved
# into the notebook; each falls back to "" when its variable is unset.
google_api_key = os.environ.get("GOOGLE_API_KEY", "")
huggingface_api_key = os.environ.get("HUGGINGFACE_API_KEY", "")

In [None]:
# Weaviate Cloud credentials, taken from the environment rather than typed
# into the notebook; each falls back to "" when its variable is unset.
WEAVIATE_API_KEY = os.environ.get("WEAVIATE_API_KEY", "")
WEAVIATE_URL = os.environ.get("WEAVIATE_URL", "")

In [5]:
# Gemini 1.5 Flash client; registered as the default DSPy LM further down.
gemini_flash = dspy.Google(
    model="gemini-1.5-flash-latest",
    api_key=google_api_key,
)

In [10]:
# Open a client session against the Weaviate Cloud cluster.
# NOTE: the session stays open for the rest of the notebook; call
# `weaviate_client.close()` once you are finished with it.
weaviate_credentials = Auth.api_key(WEAVIATE_API_KEY)
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=WEAVIATE_URL,
    auth_credentials=weaviate_credentials,
)

# Readiness check before anything else depends on the connection.
print(weaviate_client.is_ready())

            Please make sure to close the connection using `client.close()`.


True


In [11]:

# Retriever backed by the "WeaviateBlogChunk" collection, configured with k=10.
retriever_model = WeaviateRM(
    "WeaviateBlogChunk",
    weaviate_client=weaviate_client,
    k=10,
)

# Register the LM and the retriever in the global DSPy settings.
dspy.settings.configure(lm=gemini_flash, rm=retriever_model)

# Smoke test: a single round trip to the LM.
gemini_flash("say hello")

['Hello!\n']

# Load Dataset (Questions derived from Weaviate's Blog Posts)

In [12]:
import json

file_path = './WeaviateBlogRAG-0-0-0.json'
# Read as UTF-8 explicitly: the platform default encoding is not UTF-8
# everywhere, and JSON files with non-ASCII text would then fail to decode.
with open(file_path, 'r', encoding='utf-8') as file:
    dataset = json.load(file)

# Parallel lists of reference answers and questions, one entry per dataset row.
gold_answers = [row["gold_answer"] for row in dataset]
queries = [row["query"] for row in dataset]

# One DSPy example per row; only `question` is marked as an input field, so
# `gold_answer` stays available as the label for the metric.
data = [
    dspy.Example(gold_answer=gold_answer, question=query).with_inputs("question")
    for gold_answer, query in zip(gold_answers, queries)
]

# Fixed positional split: first 25 train, next 10 dev, remainder test.
trainset, devset, testset = data[:25], data[25:35], data[35:]

# Metric to Assess Response Quality

In [13]:
# DSPy signature for an LLM-as-judge: given a criterion, the question, a
# reference answer and the system's answer, the LM outputs a float `rating`.
# NOTE(review): DSPy presumably feeds the class docstring and the `desc`
# strings into the prompt, so treat them as runtime text -- TODO confirm
# before rewording any of them.
class TypedEvaluator(dspy.Signature):
    """Evaluate the quality of a system's answer to a question according to a given criterion."""
    
    # Inputs supplied by the metric function.
    criterion: str = dspy.InputField(desc="The evaluation criterion.")
    question: str = dspy.InputField(desc="The question asked to the system.")
    ground_truth_answer: str = dspy.InputField(desc="An expert written Ground Truth Answer to the question.")
    predicted_answer: str = dspy.InputField(desc="The system's answer to the question.")
    # Single output; the field is typed as float and described as a 1-5 rating.
    rating: float = dspy.OutputField(desc="A float rating between 1 and 5. IMPORTANT!! ONLY OUTPUT THE RATING!!")


def MetricWrapper(gold, pred, trace=None, min_bootstrap_rating=4.0):
    """Rate how well a predicted answer matches the gold answer using an LM judge.

    Args:
        gold: Example exposing `question` and `gold_answer`.
        pred: Prediction exposing `answer`.
        trace: ``None`` during plain evaluation; DSPy optimizers pass a
            non-``None`` trace while bootstrapping demonstrations.
        min_bootstrap_rating: Minimum rating (on the judge's 1-5 scale) a
            prediction needs in order to be kept as a demonstration while
            bootstrapping.

    Returns:
        The judge's float rating when `trace` is ``None``; otherwise a bool
        telling the optimizer whether the prediction is good enough to keep.
    """
    alignment_criterion = "How aligned is the predicted_answer with the ground_truth?"
    judge = dspy.TypedPredictor(TypedEvaluator)
    rating = judge(
        criterion=alignment_criterion,
        question=gold.question,
        ground_truth_answer=gold.gold_answer,
        predicted_answer=pred.answer,
    ).rating

    if trace is not None:
        # A raw 1-5 rating is always truthy, so returning it here would make
        # the optimizer accept every bootstrapped trace regardless of quality.
        return rating >= min_bootstrap_rating
    return rating

# LCEL RAG Program

In [5]:
!pip install langchain-google-genai > /dev/null


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [14]:
from langchain_google_genai import ChatGoogleGenerativeAI

# Gemini 1.5 Pro via LangChain; this is the generator used inside the LCEL chain.
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-pro-latest",
    google_api_key=google_api_key,
)

# Smoke test: a single round trip through the LangChain client.
llm.invoke("say hello")



AIMessage(content='Hello!\n', additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'safety_ratings': []}, id='run-308dc6ba-ee9f-4ee2-bf3e-47f4ec2ae1f5-0', usage_metadata={'input_tokens': 3, 'output_tokens': 3, 'total_tokens': 6, 'input_token_details': {'cache_read': 0}})

In [15]:
# From LangChain, import standard modules for prompting.
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

prompt = PromptTemplate.from_template("Given the context: {context}. What is the answer to the question: `{question}`? IMPORTANT! ONLY OUTPUT THE ANSWER!")


def retrieve(x):
    """Return the passages retrieved for ``x["question"]``.

    Args:
        x: Mapping with a "question" key (the chain's input dict).

    Returns:
        The `passages` of a `dspy.Retrieve(k=5)` call on the question.
    """
    return dspy.Retrieve(k=5)(x["question"]).passages


# This is how you'd normally build a chain with LCEL. This chain does retrieval then generation (RAG).
vanilla_chain = RunnablePassthrough.assign(context=retrieve) | prompt | llm | StrOutputParser()

In [16]:
# Inspect what kind of object the LCEL pipe expression produced.
type(vanilla_chain)

langchain_core.runnables.base.RunnableSequence

In [17]:
# From DSPy, import the modules that know how to interact with LangChain LCEL.
from dspy.predict.langchain import LangChainPredict, LangChainModule

# To make the chain behave like a DSPy program, swap every `prompt | llm`
# pair for `LangChainPredict(prompt, llm)` ...
zeroshot_lcel_chain = (
    RunnablePassthrough.assign(context=retrieve)
    | LangChainPredict(prompt, llm)
    | StrOutputParser()
)
# ... then wrap the whole chain in a DSPy module.
zeroshot_chain = LangChainModule(zeroshot_lcel_chain)


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  from dspy.predict.langchain import LangChainPredict, LangChainModule
 		You are using the client GPT3, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb


OpenAIError: The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable

In [17]:
# Inspect the type of the wrapped chain.
type(zeroshot_chain)

dspy.predict.langchain.LangChainModule

In [10]:
# Show the most recent entry (n=1) in gemini_flash's call history.
gemini_flash.inspect_history(n=1)




You are a processor for prompts. I will give you a prompt template (Python f-string) for an arbitrary task for other LMs.
Your job is to prepare three modular pieces: (i) any essential task instructions or guidelines, (ii) a list of variable names for inputs, (iv) the variable name for output.

---

Follow the following format.

Template:
```

${template}

```

Let's now prepare three modular pieces.

Essential Instructions: ${essential_instructions}

Input Keys: comma-separated list of valid variable names

Output Key: a valid variable name

---

Template:
```

Given the context: {context}. What is the answer to the question: `{question}`? IMPORTANT! ONLY OUTPUT THE ANSWER!

```

Let's now prepare three modular pieces.

Essential Instructions:[32mEssential Instructions: The context should be a string containing relevant information. The question should be a string asking a question about the context. The output should be a string containing only the answer to the question.

Input 

"\n\n\nYou are a processor for prompts. I will give you a prompt template (Python f-string) for an arbitrary task for other LMs.\nYour job is to prepare three modular pieces: (i) any essential task instructions or guidelines, (ii) a list of variable names for inputs, (iv) the variable name for output.\n\n---\n\nFollow the following format.\n\nTemplate:\n```\n\n${template}\n\n```\n\nLet's now prepare three modular pieces.\n\nEssential Instructions: ${essential_instructions}\n\nInput Keys: comma-separated list of valid variable names\n\nOutput Key: a valid variable name\n\n---\n\nTemplate:\n```\n\nGiven the context: {context}. What is the answer to the question: `{question}`? IMPORTANT! ONLY OUTPUT THE ANSWER!\n\n```\n\nLet's now prepare three modular pieces.\n\nEssential Instructions:\x1b[32mEssential Instructions: The context should be a string containing relevant information. The question should be a string asking a question about the context. The output should be a string containing 

In [11]:
# Sample question; reused later to query the optimized chain as well.
question = "What is Lock Striping?"

# Answer from the chain before any optimization.
zeroshot_chain.invoke({"question": question})

'Answer: Lock Striping is a pattern that solves race conditions, that can occur during parallel data imports, by using a fixed number of locks. It assigns objects to a specific lock based on their UUID, ensuring that objects with the same UUID are not processed concurrently while allowing objects with different UUIDs to be processed in parallel. \n'

In [12]:
from dspy.teleprompt import BootstrapFewShot

# Few-shot bootstrapping with MetricWrapper as the metric and at most 3
# bootstrapped demonstrations.
optimizer = BootstrapFewShot(metric=MetricWrapper, max_bootstrapped_demos=3)

# The zero-shot chain is passed both as the program to compile and as teacher.
optimized_chain = optimizer.compile(
    zeroshot_chain,
    teacher=zeroshot_chain,
    trainset=trainset,
)

 12%|█████▎                                      | 3/25 [00:09<01:12,  3.29s/it]


In [13]:
# Ask the compiled chain the same sample question.
optimized_chain.invoke({"question": question})

'Answer: Lock Striping is a solution for race conditions in database design, particularly when importing data in parallel streams. It uses a fixed number of locks to ensure objects with the same UUID are never processed concurrently, preventing data duplication without sacrificing import performance. \n'

In [15]:
# Inspect the type of the compiled program.
type(optimized_chain)

dspy.predict.langchain.LangChainModule

In [19]:
# Inspect the type of the object held in the module's `chain` attribute.
type(optimized_chain.chain)

langchain_core.runnables.base.RunnableSequence