In [7]:
import pandas as pd
import dspy
from dspy.evaluate import SemanticF1
import json
import requests
import boto3

In [8]:
# Helper functions to setup datset and evaluation
def create_dataset(path: str) -> list[dspy.Example]:
	dataset = pd.read_excel(path)
	dataset_dict = dataset.to_dict(orient='records')
	dspy_dataset = []

	for row in dataset_dict:
		dspy_dataset.append(dspy.Example(question=row['input'], response=row['expected_output']).with_inputs("question"))

	print(f"Dataset length: {len(dspy_dataset)}")
	return dspy_dataset

def create_sets(dataset: list[dspy.Example], metric: dspy.Module = SemanticF1()):
	trainset, valset, devset, testset = dataset[:15], dataset[15:30], dataset[30:40], dataset[40:]
	evaluate = dspy.Evaluate(devset=devset, metric=metric, num_threads=24, display_progress=True, display_table=3)

	for name, set in zip([trainset, valset, devset, testset], ["trainset", "valset", "devset", "testset"]):
		print(f"{set} length: {len(name)}")

	return trainset, valset, devset, testset, evaluate

In [9]:
dspy_dataset = create_dataset("synthetics/synthetic_dataset_revised.xlsx")
trainset, valset, devset, testset, evaluate = create_sets(dspy_dataset)

Dataset length: 52
trainset length: 15
valset length: 15
devset length: 10
testset length: 12


In [None]:
def search(query: str, top_k: int) -> list[str]:
    url = "http://localhost:8000/api/llm/retrieval"
    headers = {
        "accept": "application/json",
        "Content-Type": "application/json"
    }
    data = {
        "query": query,
        "top_k": top_k
    }

    documents = requests.post(url, headers=headers, json=data).json()["documents"]
    return [f"[{i}]" + doc["doc_title"] + doc["url"] + "\n\n" + doc["content"] for i, doc in enumerate(documents)]

class TitanLM(dspy.LM):
    def __init__(self, model: str, client, max_tokens: int = 1024, temperature: float = 0.7, top_p: float = 0.9, **kwargs):
        self.client = client
        self.history = []
        self.temperature = temperature
        self.max_tokens = 1024
        self.top_p = top_p

        super().__init__(model, **kwargs)
        self.model = model
    
    def _format_message(self, prompt: str):
        body = json.dumps(
            {
                "inputText": prompt,
                "textGenerationConfig": {
                    "maxTokenCount": self.max_tokens,
                    "stopSequences": [],
                    "temperature": self.temperature,
                    "topP": self.top_p,
                },
            }
        )
        return body

    def generate_content(self, prompt: str) -> str:
        body = self._format_message(prompt)
        response = self.client.invoke_model(
            body=body,
            modelId=self.model,
            accept="application/json",
            contentType="application/json",
        )
        response_body = json.loads(response.get("body").read())
        return response_body.get("results")

    def __call__(self, prompt=None, messages=None, **kwargs):
        # Custom chat model working for text completion model
        prompt = '\n\n'.join([x['content'] for x in messages] + ['BEGIN RESPONSE:'])

        completions = self.generate_content(prompt)
        self.history.append({"prompt": prompt, "completions": completions})

        # Must return a list of strings
        return [completions[0].get("outputText")]

    def inspect_history(self):
        for interaction in self.history:
            print(f"Prompt: {interaction['prompt']} -> Completions: {interaction['completions']}")

In [11]:
class GenerateCitedParagraph(dspy.Signature):
    """Generate a paragraph with citations."""
    context = dspy.InputField(desc="may contain relevant facts")
    question = dspy.InputField()
    paragraph = dspy.OutputField(desc="includes citations with links")

class CitedRAG(dspy.Module):
    def __init__(self, num_docs=20):
        self.num_docs = num_docs
        self.respond = dspy.ChainOfThought(GenerateCitedParagraph)

    def forward(self, question):
        context = search(question, top_k=self.num_docs)
        return self.respond(context=context, question=question)

In [12]:
lm = TitanLM("amazon.titan-text-premier-v1:0", client=boto3.client("bedrock-runtime"))
dspy.configure(lm=lm)

In [13]:
rag = CitedRAG()
rag(question="How do I increase my data center efficiency?")

Prediction(
    reasoning='To increase data center efficiency, consider implementing the following strategies:\n1. Utilize energy-efficient servers and equipment, such as EPEAT-registered servers, which meet stricter performance requirements and criteria than ENERGY STAR.\n2. Implement energy-efficient cooling solutions, such as computer room air conditioners (CRAC) or computer room air handlers (CRAH) with high supply air temperature capabilities.\n3. Optimize power usage effectiveness (PUE) by minimizing energy consumption in non-IT infrastructure components, such as cooling and lighting systems.\n4. Monitor and meter energy usage to identify areas for improvement and track progress towards energy efficiency goals.',
    paragraph='To increase data center efficiency, consider the following strategies:\n1. Utilize energy-efficient servers and equipment, such as EPEAT-registered servers, which meet stricter performance requirements and criteria than ENERGY STAR ([19]).\n2. Implement en