pip install textgrad dspy faiss-cpu mlflow

In [74]:
import mlflow

mlflow.litellm.autolog()

In [75]:
import textgrad as tg

import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Verify the API key is loaded
if os.getenv("OPENAI_API_KEY") is None:
    raise ValueError("OPENAI_API_KEY not found in environment variables")


tg.set_backward_engine("gpt-4o", override=True)

# Step 1: Get an initial response from an LLM
model = tg.BlackboxLLM("gpt-4o")
question_string = ("If it takes 1 hour to dry 25 shirts under the sun, "
                    "how long will it take to dry 30 shirts under the sun? "
                    "Reason step by step.")

question = tg.Variable(question_string, role_description="question to the LLM", requires_grad=False)

# Step 2: Get the LLM's response
answer = model(question)
print(answer)

To determine how long it will take to dry 30 shirts under the sun, we need to consider the drying process and whether it is affected by the number of shirts.

1. **Understand the Drying Process**: Drying shirts under the sun is typically a parallel process. Each shirt dries independently of the others, assuming there is enough space and sunlight for all shirts to be exposed equally.

2. **Initial Information**: We know that 25 shirts take 1 hour to dry. This implies that each shirt, when exposed to the sun, takes 1 hour to dry.

3. **Drying 30 Shirts**: Since drying is a parallel process and each shirt dries independently, adding more shirts does not increase the drying time for each shirt. Therefore, drying 30 shirts will also take 1 hour, provided that all shirts have equal exposure to sunlight and there is no limitation in space or sunlight.

4. **Conclusion**: The time it takes to dry 30 shirts is the same as the time it takes to dry 25 shirts, which is 1 hour, assuming all conditi

In [76]:
answer.set_role_description("concise and accurate answer to the question")

optimizer = tg.TGD(parameters=[answer], verbose=1)

evaluation_instruction = (f"Here's a question: {question_string}. "
                           "Evaluate any given answer to this question, "
                           "be smart, logical, and very critical. "
                           "Just provide concise feedback.")

loss_fn = tg.TextLoss(evaluation_instruction)


In [77]:
loss = loss_fn(answer)
loss.backward()
optimizer.step()
answer

-----------------------TextualGradientDescent------------------------
To determine how long it will take to dry 30 shirts under the sun, we need to consider the drying process. Drying shirts under the sun is a parallel process, meaning each shirt dries independently, assuming there is sufficient space and sunlight for all shirts. Given that 25 shirts take 1 hour to dry, each shirt takes 1 hour to dry. Therefore, drying 30 shirts will also take 1 hour, assuming equal exposure and no space limitations. The drying time is independent of the number of shirts as long as conditions remain constant.


Variable(value=To determine how long it will take to dry 30 shirts under the sun, we need to consider the drying process. Drying shirts under the sun is a parallel process, meaning each shirt dries independently, assuming there is sufficient space and sunlight for all shirts. Given that 25 shirts take 1 hour to dry, each shirt takes 1 hour to dry. Therefore, drying 30 shirts will also take 1 hour, assuming equal exposure and no space limitations. The drying time is independent of the number of shirts as long as conditions remain constant., role=concise and accurate answer to the question, grads={Variable(value=To improve the concise and accurate answer to the question, consider the following feedback:

1. **Clarify Assumptions**: While the answer correctly identifies that the drying process is parallel, it could benefit from explicitly stating the assumption that there is sufficient space and sunlight for all shirts. This would preemptively address any potential concerns about limitati

In [None]:
question_string = ("what are high memory and low memory in linux?")

question = tg.Variable(question_string, role_description="question to the LLM", requires_grad=False)

# Step 2: Get the LLM's response
answer = model(question)
answer

In [None]:
import json

with open("ragqa_arena_tech_examples.jsonl") as f:
    data = [json.loads(line) for line in f]

data[2]

{'question': 'why are my text messages coming up as maybe?',
 'response': 'This is part of the Proactivity features new with iOS 9: It looks at info in emails to see if anyone with this number sent you an email and if it finds the phone number associated with a contact from your email, it will show you "Maybe". \n\nHowever, it has been suggested there is a bug in iOS 11.2 that can result in "Maybe" being displayed even when "Find Contacts in Other Apps" is disabled.',
 'gold_doc_ids': [3956, 3957, 8034]}

In [58]:
import random

random.Random(0).shuffle(data)
trainset, devset, testset = data[:200], data[200:500], data[500:1000]

len(trainset), len(devset), len(testset)

(200, 300, 500)
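
One caveat with the cell above: `shuffle` mutates `data` in place, so re-running the cell without reloading the file shuffles an already-shuffled list and yields a different split. A minimal sketch of a re-run-safe variant follows; the `records` list and the `split` helper are hypothetical stand-ins, not part of the notebook.

```python
import random

# Hypothetical stand-in for the loaded dataset: 1000 dummy records.
records = [{"id": i} for i in range(1000)]

def split(items, seed=0):
    """Shuffle a copy with a fixed seed, then slice into train/dev/test."""
    shuffled = list(items)  # copy, so the caller's list is left untouched
    random.Random(seed).shuffle(shuffled)
    return shuffled[:200], shuffled[200:500], shuffled[500:1000]

train_a, dev_a, test_a = split(records)
train_b, dev_b, test_b = split(records)  # simulates re-running the cell

# Same seed and same input order give identical splits on every run.
assert (train_a, dev_a, test_a) == (train_b, dev_b, test_b)
```

Because the helper shuffles a copy, the original order of `records` is preserved and the split depends only on the seed.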

In [73]:
# dspy is used here only for its SemanticF1 metric.
from dspy.evaluate import SemanticF1
import dspy
from textgrad.engine import get_engine
import litellm

# Silence litellm's verbose logging.
litellm.set_verbose = False

engine = get_engine("experimental:gpt-4o", cache=False)

# Instantiate the metric.
metric = SemanticF1(decompositional=True)
model = tg.BlackboxLLM(engine=engine)

# Produce a prediction from the TextGrad model, using one dataset example as input.
example = data[2]
question = tg.Variable(example["question"], role_description="question to the LLM", requires_grad=False)
pred = model(question)

# Configure the LM that the SemanticF1 metric uses as its judge.
lm = dspy.LM('openai/gpt-4o-mini')
dspy.configure(lm=lm)

def evaluate_single(the_model, the_example):
    """Score one example: run the model on the question and compare with the reference."""
    gold = dspy.Example(
        question=the_example["question"],
        response=the_example["response"]
    )
    question = tg.Variable(gold.question, role_description="question to the LLM", requires_grad=False)
    # Pass the plain string (Variable.value) so the metric sees text, not a tg.Variable.
    pred = dspy.Prediction(
        response=the_model(question).value
    )
    score = metric(gold, pred)
    # print("Question:\n", gold.question)
    # print("\n\nGround truth:\n", gold.response)
    # print("\n\nPrediction:\n", pred.response)
    # print("\n\nSemantic F1 score:", score)
    return score
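
For intuition about the number `evaluate_single` returns: `SemanticF1` has the judge LM estimate how much of the prediction is supported by the reference (precision) and how much of the reference is covered by the prediction (recall), then combines the two. A minimal sketch of that last step, assuming the standard harmonic-mean F1; the precision and recall values below are invented for illustration and do not come from any model.

```python
def f1_score(precision: float, recall: float) -> float:
    """Harmonic mean of precision and recall; 0.0 when both are zero."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# Invented judge outputs: every reference idea is covered (recall 1.0),
# but only half of the prediction is supported by the reference (precision 0.5).
score = f1_score(precision=0.5, recall=1.0)
print(f"{score:.3f}")
```

The harmonic mean penalizes imbalance, so a long answer that covers the reference but adds unsupported material scores well below the arithmetic mean of the two values.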

In [None]:
from tqdm import tqdm
# Clear instances
tqdm._instances.clear()

# Reset monitor thread
if hasattr(tqdm, 'monitor'):
    tqdm.monitor.exit()
    tqdm.monitor = None

In [60]:
from tqdm import tqdm

# def evaluate(the_model):
#     total_score = 0
#     top_score = 0
#     pbar = tqdm(devset)
#     for example in pbar:
#         score = evaluate_single(the_model, example)
#         total_score += score
#         top_score += 1
#         pbar.set_description(f"Evaluating (score: {total_score:.1f}/{top_score}, {total_score/max(1, top_score):.2%})")
#     return total_score / top_score

from concurrent.futures import ThreadPoolExecutor, as_completed

def evaluate(the_model):
    total_score = 0
    pbar = tqdm(total=len(devset), position=0, leave=True)

    # Use ThreadPoolExecutor since the work is I/O bound (API calls)
    with ThreadPoolExecutor(max_workers=24) as executor:
        # Submit all tasks
        future_to_example = {
            executor.submit(evaluate_single, the_model, example): example 
            for example in devset
        }
        
        # Process completed tasks as they finish
        for future in as_completed(future_to_example):
            score = future.result()
            total_score += score
            pbar.update(1)
            pbar.set_description(f"Evaluating (score: {total_score:.1f}/{pbar.n}, {total_score/max(1, pbar.n):.2%})")
    
    pbar.close()
    return total_score / len(devset)


In [61]:
evaluate(model)

Evaluating (score: 39.6/82, 48.25%):  27%|██▋       | 82/301 [01:50<04:54,  1.35s/it] 
Evaluating (score: 134.0/300, 44.66%): 100%|██████████| 300/300 [02:03<00:00,  2.43it/s]


0.446588789809674
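
`as_completed` yields results in completion order, which is fine for a running average but loses the link between a score and its example. If per-example scores are needed in input order, `executor.map` is one option. A minimal offline sketch, where `fake_score` is a hypothetical stand-in for `evaluate_single` that makes no API call:

```python
from concurrent.futures import ThreadPoolExecutor

def fake_score(example):
    """Hypothetical stand-in for evaluate_single: deterministic, no API call."""
    return (len(example["question"]) % 5) / 4

# Ten dummy examples with question lengths 1..10.
examples = [{"question": "q" * n} for n in range(1, 11)]

with ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map returns results in input order, unlike as_completed.
    scores = list(executor.map(fake_score, examples))

mean_score = sum(scores) / len(scores)
```

Here `scores[i]` always belongs to `examples[i]`, so low-scoring questions can be inspected directly after the run.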

In [62]:
import json

max_characters = 6000  # for truncating >99th percentile of documents
topk_docs_to_retrieve = 5  # number of documents to retrieve per search query

with open("ragqa_arena_tech_corpus.jsonl") as f:
    corpus = [json.loads(line)['text'][:max_characters] for line in f]

embedder = dspy.Embedder('openai/text-embedding-3-small', dimensions=512)
search = dspy.retrievers.Embeddings(embedder=embedder, corpus=corpus, k=topk_docs_to_retrieve)

Training a 32-byte FAISS index with 337 partitions, based on 28436 x 512-dim embeddings
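
Conceptually, the retriever embeds the query and returns the `k` corpus passages whose embeddings are most similar to it. The log line above indicates that an approximate FAISS index is used for this corpus; the brute-force sketch below only illustrates the idea, using cosine similarity as an assumed metric and hand-made 3-dimensional vectors in place of real embeddings. The names `cosine`, `top_k`, `toy_corpus`, and `toy_query` are hypothetical.

```python
import math

def cosine(u, v):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0

def top_k(query_vec, corpus_vecs, k):
    """Return indices of the k corpus vectors most similar to the query."""
    ranked = sorted(
        range(len(corpus_vecs)),
        key=lambda i: cosine(query_vec, corpus_vecs[i]),
        reverse=True,
    )
    return ranked[:k]

# Toy "embeddings": hand-made vectors, not output of a real embedding model.
toy_corpus = [
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
]
toy_query = [1.0, 0.2, 0.0]

nearest = top_k(toy_query, toy_corpus, k=2)
```

Brute-force search like this is linear in corpus size, which is why an index is trained for the roughly 28,000 passages loaded above.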


In [72]:
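class RAG:
    """Minimal retrieval-augmented wrapper: retrieve passages, then prepend them to the question."""

    def __init__(self, model, search):
        self.model = model
        self.search = search

    def __call__(self, question):
        # Retrieve the top-k passages for the plain-string question.
        docs = self.search(question)
        context = "\n".join(docs.passages)
        # Prepend the retrieved context and wrap the prompt as a TextGrad variable.
        prompt = tg.Variable(context + "\n" + question, role_description="question to the LLM", requires_grad=False)
        return self.model(prompt)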


rag = RAG(model, search)

rag("what are high memory and low memory in linux?")

Variable(value=In Linux, the concepts of "high memory" and "low memory" relate specifically to how a 32-bit Linux kernel manages memory. This is particularly relevant when the physical memory installed in a system exceeds the addressable range of a 32-bit address space.

### Low Memory
- **Definition**: Low memory refers to the portion of memory that the kernel can directly address without any special mapping.
- **Limits**: In a 32-bit architecture, low memory roughly covers up to 896 MB of the physical memory space (due to address space required for other kernel functionalities), although the exact cutoff can vary based on kernel configuration.
- **Access**: Kernel operations on low memory are straightforward because this memory is permanently mapped into the kernel's address space.
- **Use**: Low memory is used for kernel operations and data structures because it doesn't require additional handling for access, making operations more efficient.

### High Memory
- **Definition**: High 