### Import Dependencies

In [1]:
import openai
import instructor
from qdrant_client import QdrantClient

from pydantic import BaseModel, Field

### RAG Pipeline

In [2]:
client = instructor.from_openai(openai.OpenAI())

In [3]:
class RAGGenerationResponse(BaseModel):
    answer: str = Field(description="The answer to the question")

In [4]:
def get_embedding(text, model="text-embedding-3-small"):
    response = openai.embeddings.create(
        input=text,
        model=model,
    )

    return response.data[0].embedding


def retrieve_data(query, qdrant_client, k=5):

    query_embedding = get_embedding(query)

    results = qdrant_client.query_points(
        collection_name="Amazon-items-collection-00",
        query=query_embedding,
        limit=k,
    )

    retrieved_context_ids = []
    retrieved_context = []
    similarity_scores = []
    retrieved_context_ratings = []

    for result in results.points:
        retrieved_context_ids.append(result.payload["parent_asin"])
        retrieved_context.append(result.payload["description"])
        retrieved_context_ratings.append(result.payload["average_rating"])
        similarity_scores.append(result.score)

    return {
        "retrieved_context_ids": retrieved_context_ids,
        "retrieved_context": retrieved_context,
        "retrieved_context_ratings": retrieved_context_ratings,
        "similarity_scores": similarity_scores,
    }


def process_context(context):

    formatted_context = ""

    for id, chunk, rating in zip(context["retrieved_context_ids"], context["retrieved_context"], context["retrieved_context_ratings"]):
        formatted_context += f"- ID: {id}, rating: {rating}, description: {chunk}\n"

    return formatted_context


def build_prompt(preprocessed_context, question):

    prompt = f"""
You are a shopping assistant that can answer questions about the products in stock.

You will be given a question and a list of context.

Instructtions:
- You need to answer the question based on the provided context only.
- Never use word context and refer to it as the available products.

Context:
{preprocessed_context}

Question:
{question}
"""

    return prompt


def generate_answer(prompt):

    response, raw_response = client.chat.completions.create_with_completion(
        model="gpt-4.1-mini",
        messages=[{"role": "system", "content": prompt}],
        temperature=0,
        response_model=RAGGenerationResponse
    )

    return response


def rag_pipeline(question, qdrant_client, top_k=5):

    retrieved_context = retrieve_data(question, qdrant_client, top_k)
    preprocessed_context = process_context(retrieved_context)
    prompt = build_prompt(preprocessed_context, question)
    answer = generate_answer(prompt)

    final_result = {
        "datamodel": answer,
        "answer": answer.answer,
        "question": question,
        "retrieved_context_ids": retrieved_context["retrieved_context_ids"],
        "retrieved_context": retrieved_context["retrieved_context"],
        "similarity_scores": retrieved_context["similarity_scores"]
    }

    return final_result

In [5]:
qdrant_client = QdrantClient(url="http://localhost:6333")

In [6]:
output = rag_pipeline("Can I get a charging cable? Please suggest me a good one.", qdrant_client)

In [7]:
output

{'datamodel': RAGGenerationResponse(answer='Yes, you can get a charging cable. A good option available is the 20FT/6M Link Cable (ID: B09QY98TXK). It is a long cable that should provide ample length for charging and use.'),
 'answer': 'Yes, you can get a charging cable. A good option available is the 20FT/6M Link Cable (ID: B09QY98TXK). It is a long cable that should provide ample length for charging and use.',
 'question': 'Can I get a charging cable? Please suggest me a good one.',
 'retrieved_context_ids': ['B0BWQYJY87',
  'B09PVCVQDP',
  'B09QY98TXK',
  'B09Y8TFH8D',
  'B0B1PM6D24'],
 'retrieved_context': ['YOXXZUS Switch Charger,Switch AC Adapter Support TV Dock Mode for Switch OLED/Switch Lite 39W (15V 2.6A) „ÄêSafe and Stable Charging„ÄëIt takes about 2.5 hours to fully charge the SWITCH,short circuit protection, over current protection, surge protection, overload protection, etc. „ÄêSupport Switch TV Dock„ÄëThis charger is compatible with the SWITCH TV dock,It works for docking

In [8]:
print(output["answer"])

Yes, you can get a charging cable. A good option available is the 20FT/6M Link Cable (ID: B09QY98TXK). It is a long cable that should provide ample length for charging and use.


### RAG Pipeline with Grounding Context

In [9]:
class RAGUsedContext(BaseModel):
    id: str = Field(description="The ID of the item used to answer the question")
    description: str = Field(description="Short description of the item used to answer the question")

class RAGGenerationResponse(BaseModel):
    answer: str = Field(description="The answer to the question")
    references: list[RAGUsedContext] = Field(description="List of items used to answer the question")

In [10]:
def get_embedding(text, model="text-embedding-3-small"):
    response = openai.embeddings.create(
        input=text,
        model=model,
    )

    return response.data[0].embedding


def retrieve_data(query, qdrant_client, k=5):

    query_embedding = get_embedding(query)

    results = qdrant_client.query_points(
        collection_name="Amazon-items-collection-00",
        query=query_embedding,
        limit=k,
    )

    retrieved_context_ids = []
    retrieved_context = []
    similarity_scores = []
    retrieved_context_ratings = []

    for result in results.points:
        retrieved_context_ids.append(result.payload["parent_asin"])
        retrieved_context.append(result.payload["description"])
        retrieved_context_ratings.append(result.payload["average_rating"])
        similarity_scores.append(result.score)

    return {
        "retrieved_context_ids": retrieved_context_ids,
        "retrieved_context": retrieved_context,
        "retrieved_context_ratings": retrieved_context_ratings,
        "similarity_scores": similarity_scores,
    }


def process_context(context):

    formatted_context = ""

    for id, chunk, rating in zip(context["retrieved_context_ids"], context["retrieved_context"], context["retrieved_context_ratings"]):
        formatted_context += f"- ID: {id}, rating: {rating}, description: {chunk}\n"

    return formatted_context


def build_prompt(preprocessed_context, question):

    prompt = f"""
You are a shopping assistant that can answer questions about the products in stock.

You will be given a question and a list of context.

Instructtions:
- You need to answer the question based on the provided context only.
- Never use word context and refer to it as the available products.
- As an output you need to provide:

* The answer to the question based on the provided context.
* The list of the IDs of the chunks that were used to answer the question. Only return the ones that are used in the answer.
* Short description (1-2 sentences) of the item based on the description provided in the context.

- The short description should have the name of the item.
- The answer to the question should contain detailed information about the product and returned with detailed specification in bullet points.

Context:
{preprocessed_context}

Question:
{question}
"""

    return prompt


def generate_answer(prompt):

    response, raw_response = client.chat.completions.create_with_completion(
        model="gpt-4.1-mini",
        messages=[{"role": "system", "content": prompt}],
        temperature=0,
        response_model=RAGGenerationResponse
    )

    return response


def rag_pipeline(question, qdrant_client, top_k=5):

    retrieved_context = retrieve_data(question, qdrant_client, top_k)
    preprocessed_context = process_context(retrieved_context)
    prompt = build_prompt(preprocessed_context, question)
    answer = generate_answer(prompt)

    final_result = {
        "original_ouput": answer,
        "answer": answer.answer,
        "references": answer.references,
        "question": question,
        "retrieved_context_ids": retrieved_context["retrieved_context_ids"],
        "retrieved_context": retrieved_context["retrieved_context"],
        "similarity_scores": retrieved_context["similarity_scores"]
    }

    return final_result

In [11]:
result = rag_pipeline("Can I get some earphones?", qdrant_client, top_k=10)

In [12]:
result

{'original_ouput': RAGGenerationResponse(answer='Yes, you can get earphones in the form of wireless gaming earbuds. The MORMOQUE T2 Wireless Gaming Earbuds offer the following features:\n\n- 45ms Low Latency Combat Mode for gaming\n- Wireless earbuds with microphone\n- IPX5 Waterproof rating\n- 30 hours total playtime with the charging case\n- Hi-Fidelity Stereo Sound quality with punchy bass\n- Touch control ergonomic earbuds\n- Fast charging with Type-C, providing 100 minutes of playtime in just 10 minutes of charging\n- ENC microphone with noise reduction for clear voice transmission\n- Breath LED light decoration\n\nThese earbuds are suitable for gaming and general use with a strong battery life and good sound quality.', references=[RAGUsedContext(id='B0BRV29L6C', description='MORMOQUE T2 Wireless Gaming Earbuds, 45ms Low Latency Combat Mode, Wireless Earbuds with Microphone, IPX5 WaterproofÔºåwith 30H Playtime Gaming Mode-Gaming Wireless Headphones with Ultra-Low Latency; Hi-Fidel

In [13]:
print(result["answer"])

Yes, you can get earphones in the form of wireless gaming earbuds. The MORMOQUE T2 Wireless Gaming Earbuds offer the following features:

- 45ms Low Latency Combat Mode for gaming
- Wireless earbuds with microphone
- IPX5 Waterproof rating
- 30 hours total playtime with the charging case
- Hi-Fidelity Stereo Sound quality with punchy bass
- Touch control ergonomic earbuds
- Fast charging with Type-C, providing 100 minutes of playtime in just 10 minutes of charging
- ENC microphone with noise reduction for clear voice transmission
- Breath LED light decoration

These earbuds are suitable for gaming and general use with a strong battery life and good sound quality.
