# Context

After releasing your RAG-based product, you will want to characterize the types of questions you receive and how well you perform on each cluster of questions. This notebook generates synthetic data that stands in for your production data. You won't reuse code from this notebook; instead, it creates the assets consumed by analyze_clusters, a notebook you can reuse in your own work.

In [1]:
import asyncio
from typing import List
import instructor
import json
import lancedb
from openai import AsyncOpenAI
from numpy.random import poisson, uniform
from question_types import QuestionTypes, TypedQuestion, Question, Product, question_type_details
# AsyncOpenAI client wrapped with instructor; generate_questions calls it with
# response_model=Question and iterates the Question objects it yields.
client = instructor.from_openai(AsyncOpenAI())

# Generation settings per question type, keyed by QuestionTypes member:
#   "avg_questions_per_item" - how many questions are requested for each product
#   "frac_thumbs_up"         - probability that a generated question is marked thumbs-up
# Each row below is (question type, avg_questions_per_item, frac_thumbs_up).
question_type_stats = {
    qtype: {"avg_questions_per_item": avg_per_item, "frac_thumbs_up": thumbs_up_rate}
    for qtype, avg_per_item, thumbs_up_rate in (
        (QuestionTypes.COMPARISON, 7, 0.1),
        (QuestionTypes.GENERAL, 5, 0.7),
        (QuestionTypes.TYPICAL_PRICE, 2, 0.3),
        (QuestionTypes.CUSTOMER_SERVICE, 3, 0.7),
        (QuestionTypes.VISUAL, 2, 0.1),
        (QuestionTypes.ACCESSORIES, 2, 0.3),
        (QuestionTypes.COMPATIBILITY, 1, 0.8),
        (QuestionTypes.COUNTRY_OF_ORIGIN, 1, 0.6),
        (QuestionTypes.ENVIRONMENTAL, 1, 0.1),
        (QuestionTypes.AUTHENTIC, 1, 0.1),
        (QuestionTypes.MATERIALS, 1, 0.2),
        (QuestionTypes.TIME_SENSITIVE, 6, 0.1),
    )
}

# Read the "products" table from the LanceDB store and keep only the two
# columns the prompts need, turning every row into a Product.
db = lancedb.connect("../bootstrap_evals/lancedb")
products = [
    Product(title=title, description=description)
    for title, description in (
        db.open_table("products")
        .to_pandas()[["title", "description"]]
        .itertuples(index=False, name=None)  # plain (title, description) tuples
    )
]


async def generate_questions(
    n_questions: int, product: Product, question_type: QuestionTypes, semaphore: asyncio.Semaphore
) -> List[TypedQuestion]:
    async with semaphore:
        question_type_info = question_type_details[question_type]
        question_type_title = question_type_info.title
        question_type_description = question_type_info.description
        question_type_example = question_type_info.example
        prompt = f"Create {n_questions} questions about a {product.title} that someone might ask before purchasing it.\n"
        prompt += f"The description of the item is: {product.description}\n"
        prompt += f"We want to focus on a specific type of question. That is: `{question_type_title}`.\n"
        prompt += f"This category of questions is described as: `{question_type_description}`.\n"
        prompt += f"Here is an example of a question in that category: `{question_type_example}`.\n"
        prompt += "The questions should be varied. Do not have duplicates.\n"
        prompt += "Use creative license to make the questions specific and concrete (e.g. you can make up other product names or details to make concrete questions)"
        prompt += "Respond only with the list of questions."

        frac_thumbs_up = question_type_stats[question_type]["frac_thumbs_up"]
        try:
            questions = client.chat.completions.create_iterable(
                model="gpt-4o-mini",
                response_model=Question,
                messages=[{"role": "user", "content": prompt}],
            )
            return [
                TypedQuestion(
                    question=Question(text=q.text),
                    question_type=question_type,
                    product=product,
                    thumbs_up=uniform() < frac_thumbs_up,
                )
                async for q in questions
            ]
        except Exception as e:
            print(f"Error generating evals: {str(e)}")
            return []


async def all_questions_for_qtype(question_type: QuestionTypes, semaphore: asyncio.Semaphore) -> List[TypedQuestion]:
    """Generate questions of one type for every product in ``products``.

    Args:
        question_type: Category of question to generate.
        semaphore: Limiter passed through to each ``generate_questions`` call.

    Returns:
        The questions from all products, flattened into a single list.
    """
    # question_type_stats is keyed by the enum member itself (the same lookup
    # generate_questions performs), not by the member's .value.
    avg_questions_per_item = question_type_stats[question_type]["avg_questions_per_item"]
    # The count is the same for every product, so compute it once.
    # NOTE(review): `poisson` is imported at the top of the file but never
    # used; if per-product counts were meant to vary around the average, this
    # is where the draw would go - confirm intent before changing.
    n_questions = round(avg_questions_per_item)

    tasks = [
        generate_questions(n_questions, product, question_type, semaphore)
        for product in products
    ]
    results = await asyncio.gather(*tasks)
    return [question for sublist in results for question in sublist]

async def generate_all_questions():
    """Generate questions for every member of QuestionTypes.

    A single semaphore is created here and handed to every
    ``all_questions_for_qtype`` call, so all question types share it.

    Returns:
        One flat list holding the questions of all question types.
    """
    limiter = asyncio.Semaphore(20)  # Limit concurrency to 20
    per_type_batches = await asyncio.gather(
        *(all_questions_for_qtype(qtype, limiter) for qtype in QuestionTypes)
    )

    collected = []
    for batch in per_type_batches:
        collected.extend(batch)
    return collected


In [2]:
# Top-level await: this cell relies on the notebook's already-running event loop.
questions = await generate_all_questions()

In [3]:
def serialize_question(q: TypedQuestion):
    """Flatten a TypedQuestion into a plain dict for the JSON export.

    Only the question text, the product (via its ``dict()`` method) and the
    thumbs-up flag are kept; the question type is not part of the output.
    """
    return dict(
        question=q.question.text,
        product=q.product.dict(),
        thumbs_up=q.thumbs_up,
    )

# Write every generated question to prod_questions.json in the working directory.
with open("prod_questions.json", "w") as f:
    serialized = list(map(serialize_question, questions))
    # default=str: any value json cannot encode natively is written as str(value).
    json.dump(serialized, f, default=str)