In [1]:
# Enable IPython's autoreload extension in mode 2, so imported modules
# (e.g. the local `helpers` module used below) are reloaded before each cell runs.
%reload_ext autoreload
%autoreload 2


# Week 5: Systematically Improving Your RAG Application

> **Note:** Please walk through the previous two notebooks, `1. Generate Dataset.ipynb` and `2. Metadata Filtering.ipynb`, before continuing with this notebook. If you haven't generated the dataset, please run `1. Generate Dataset.ipynb` first.

# Why Text-2-SQL?

As we aim to support more complex use cases, text-2-sql becomes an essential tool for our model. It allows the language model to interact with a database, enabling it to write SQL queries and extract valuable insights. 

In this notebook, we'll implement basic text-2-sql capabilities by allowing our model to write SQL queries and retrieve data from a local SQLite database.

## Initialising the Database

We'll start by initialising a database and populating it with some fake data. The database will have the following tables:

- `stock`: Contains information on specific items, including their color, size, and available quantity.
- `orders`: Stores order details, including user email, order date, and total amount.
- `sales`: Records sales transactions, linking each sale to an order and detailing the items sold.
- `users`: Represents users with accounts in our store, storing their email, name, gender, and other personal details.

We'll be using the `faker` library here to generate realistic looking data and the `taxonomy.yml` file to ensure that the data conforms to our taxonomy. 

In [2]:
from helpers import process_taxonomy_file

# Load the taxonomy once. The colour and size vocabularies read from it drive the
# randomly generated user preferences below. (The variable was previously
# misspelled `taxanomy_data`; the rest of the notebook uses `taxonomy_data`.)
taxonomy_data = process_taxonomy_file("./taxonomy.yml")
colors = taxonomy_data["common_attributes"]["Color"]
sizes = taxonomy_data["common_attributes"]["Size"]

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import random


def generate_size_preference(sizes):
    """Pick a size preference: one size, or two neighbouring sizes.

    A random anchor position is drawn from ``sizes`` together with a coin flip
    deciding whether the user wears one or two sizes. When two sizes are wanted,
    the anchor is paired with its predecessor, except at the two ends of the
    list where the first two / last two entries are used.

    Args:
        sizes: Ordered list of size labels.

    Returns:
        A list holding one or two adjacent entries of ``sizes``.
    """
    anchor = random.choice(range(len(sizes)))
    wants_pair = random.choice([1, 2]) == 2

    if not wants_pair:
        return [sizes[anchor]]

    last = len(sizes) - 1
    if anchor == 0:
        window = slice(None, 2)
    elif anchor == last:
        window = slice(-2, None)
    else:
        window = slice(anchor - 1, anchor + 1)
    return sizes[window]


def generate_color_preference(colors):
    """Return a random, duplicate-free selection of between 1 and all of ``colors``."""
    how_many = random.randint(1, len(colors))
    return random.sample(colors, k=how_many)

In [4]:
import faker
from pydantic import BaseModel
from typing import Literal
from datetime import datetime
from rich import print
import random

# Seed both random sources so that Restart & Run All regenerates the same
# synthetic users, orders and stock levels every time.
SEED = 42
random.seed(SEED)
faker.Faker.seed(SEED)

fake = faker.Faker()
# Forget previously issued "unique" values so this cell can be re-run safely.
fake.unique.clear()


def generate_email_from_name(name: str) -> str:
    """Build a plausible e-mail address from a person's name.

    The first and last whitespace-separated words of ``name`` (lower-cased) are
    combined using a randomly chosen pattern and a randomly chosen domain.

    Args:
        name: Full name, e.g. ``"Lisa Burns"``.

    Returns:
        An address such as ``"lisa_burns@gmail.com"``. For a single-word name
        the local part is just that word, so no dangling ``_`` or ``.`` appears.

    Raises:
        ValueError: If ``name`` is empty or contains only whitespace.
    """
    # Convert name to lowercase and split into parts
    name_parts = name.lower().split()
    if not name_parts:
        raise ValueError("Cannot generate an email address from an empty name")

    first_name = name_parts[0]
    last_name = name_parts[-1] if len(name_parts) > 1 else ""

    if last_name:
        # Different email patterns
        patterns = [
            f"{first_name}_{last_name}",  # lisa_burns
            f"{first_name[0]}_{last_name}",  # l_burns
            f"{first_name}.{last_name}",  # lisa.burns
            f"{first_name}{last_name}",  # lisaburns
            f"{first_name[0]}{last_name}",  # lburns
        ]
    else:
        # Single-word name: the patterns above would produce "cher_" or "c."
        patterns = [first_name]

    domains = [
        "gmail.com",
        "yahoo.com",
        "hotmail.com",
        "outlook.com",
        "icloud.com",
    ]

    return f"{random.choice(patterns)}@{random.choice(domains)}"


class UserConfig(BaseModel):
    """A synthetic shop user plus the preferences used when generating their orders."""

    name: str
    email: str
    gender: Literal["M", "F"]
    date_of_birth: str  # generate_user formats this as YYYY-MM-DD
    color_pref: list[str]  # colours generate_random_order picks from for this user
    size_pref: list[str]  # sizes generate_random_order picks from for this user
    created_at: str  # generate_user formats this as YYYY-MM-DD


def generate_user() -> UserConfig:
    """Create one random female user with colour and size preferences.

    Reads the module-level ``fake``, ``colors`` and ``sizes``. The name is drawn
    through ``fake.unique`` and the e-mail address is derived from it.

    Returns:
        A populated ``UserConfig`` with dates formatted as ``YYYY-MM-DD``.
    """
    date_format = "%Y-%m-%d"

    full_name = fake.unique.name_female()
    email = generate_email_from_name(full_name)
    birth_date = fake.date_of_birth(minimum_age=20, maximum_age=40)
    preferred_colors = generate_color_preference(colors)
    preferred_sizes = generate_size_preference(sizes)
    # NOTE(review): drawing the sign-up date through `fake.unique` means no two
    # users share a date, and the 2023-2024 window only holds 731 distinct days,
    # which presumably caps the number of users that can be generated. Confirm
    # whether uniqueness is actually wanted here.
    signup_date = fake.unique.date_between(
        start_date=datetime(2023, 1, 1), end_date=datetime(2024, 12, 31)
    )

    return UserConfig(
        name=full_name,
        email=email,
        gender="F",
        date_of_birth=birth_date.strftime(date_format),
        color_pref=preferred_colors,
        size_pref=preferred_sizes,
        created_at=signup_date.strftime(date_format),
    )


print(generate_user())

In [5]:
from pydantic import Field


class ItemData(BaseModel):
    """Minimal view of a catalogue product: its id and unit price."""

    product_id: int
    price: float


class SoldItem(BaseModel):
    """One line of an order: a product in a given size and colour."""

    product_id: int
    size: str
    color: str
    quantity: int
    price: float  # unit price; inserted into sales.price_per_unit later on


class UserOrder(BaseModel):
    """An order placed by one user, made up of one or more sold items."""

    user_email: str
    items: list[SoldItem]
    # When no date is supplied, a random day in 2023 is generated, formatted as YYYY-MM-DD.
    date: str = Field(
        default_factory=lambda: fake.date_between(
            start_date=datetime(2023, 1, 1), end_date=datetime(2023, 12, 31)
        ).strftime("%Y-%m-%d")
    )
    total_amount: float

In [6]:
def generate_random_order(
    items: list[ItemData],
    users: list[UserConfig],
):
    """Create one random order for a randomly chosen user.

    Between 1 and 5 distinct products are drawn from ``items`` (fewer when the
    catalogue is smaller than 5). Each product's size and colour come from the
    chosen user's preferences, and its quantity is 1 or 2.

    Args:
        items: Catalogue products to choose from.
        users: Users to choose the buyer from.

    Returns:
        A ``UserOrder`` whose ``total_amount`` is the sum of price * quantity
        over the sold items, rounded to 2 decimal places.
    """
    # Cap the basket size at the catalogue size so random.sample cannot fail
    # when fewer than 5 products are available.
    order_item_count = random.randint(1, min(5, len(items)))
    user = random.choice(users)
    order_items = random.sample(items, order_item_count)

    sold_items = []
    total = 0.0
    for item in order_items:
        sold = SoldItem(
            product_id=item.product_id,
            size=random.choice(user.size_pref),
            color=random.choice(user.color_pref),
            price=item.price,
            quantity=random.choice([1, 2]),  # User buys at most 2 of the same item
        )
        sold_items.append(sold)
        total += sold.price * sold.quantity

    # NOTE(review): UserOrder.date defaults to a random day in 2023 while users'
    # created_at spans 2023-2024, so an order can predate its buyer's account.
    # Round away float accumulation noise (e.g. 362.08000000000004).
    return UserOrder(
        user_email=user.email, items=sold_items, total_amount=round(total, 2)
    )

In [7]:
from datasets import load_dataset

# Size of the synthetic dataset.
NUMBER_USERS = 100
NUMBER_ORDERS = 200

# Generate the synthetic user base.
users = [generate_user() for _ in range(NUMBER_USERS)]

In [8]:
# Keep only the id and price of every product in the labelled dataset's train split;
# these are the products that orders are generated from.
items = [
    ItemData(product_id=item["id"], price=item["price"])
    for item in load_dataset("ivanleomk/labelled-ecommerce-taxonomy")["train"]
]

In [9]:
# Count distinct e-mail addresses; this should equal NUMBER_USERS if none collide.
user_emails = {user.email for user in users}
len(user_emails)

100

In [10]:
# Generate NUMBER_ORDERS random orders, each for a randomly chosen user.
orders = [generate_random_order(items, users) for _ in range(NUMBER_ORDERS)]

In [11]:
from itertools import product
from helpers import process_taxonomy_file
import pandas as pd


def generate_combinations(taxonomy_data: dict):
    """Generate a random subset of combinations of colors and sizes from the taxonomy

    Between 1 and 3 colours and between 1 and 3 sizes are sampled (fewer when the
    taxonomy lists fewer than 3), and every colour/size pair receives a random
    stock count between 1 and 30.

    Args:
        taxonomy_data: Mapping with ``["common_attributes"]["Color"]`` and
            ``["common_attributes"]["Size"]`` lists.

    Returns:
        Dict mapping ``(color, size)`` tuples to an integer stock count.
    """
    # Get colors and sizes from taxonomy data
    common_attrs = taxonomy_data["common_attributes"]
    colors = common_attrs["Color"]
    sizes = common_attrs["Size"]

    # Select random number of colors and sizes, never more than are available
    # (random.sample raises ValueError when asked for too many).
    num_colors = random.randint(1, min(3, len(colors)))
    num_sizes = random.randint(1, min(3, len(sizes)))

    # Randomly sample colors and sizes
    selected_colors = random.sample(colors, num_colors)
    selected_sizes = random.sample(sizes, num_sizes)

    # Generate random stock count for each color-size combination
    return {
        (color, size): random.randint(1, 30)
        for color, size in product(selected_colors, selected_sizes)
    }


taxonomy_data = process_taxonomy_file("./taxonomy.yml")

# One record per (product, colour, size) combination, each with its stock level.
stock_records = [
    {"product_id": item["id"], "color": color, "size": size, "stock": stock}
    for item in load_dataset("ivanleomk/labelled-ecommerce-taxonomy")["train"]
    for (color, size), stock in generate_combinations(taxonomy_data).items()
]

combinations = pd.DataFrame(stock_records)
combinations

Unnamed: 0,product_id,color,size,stock
0,1,Green,XXS,4
1,1,Green,M,1
2,1,Green,S,9
3,2,Brown,M,24
4,2,Gray,M,6
...,...,...,...,...
727,189,Brown,XXXL,22
728,190,Brown,L,16
729,190,Brown,XXS,23
730,191,Gray,XXXL,23


Now that we've generated our data, we'll insert it into our database. 

In [18]:
import sqlite3

# Connect to SQLite database
conn = sqlite3.connect("./db.sqlite")
try:
    cursor = conn.cursor()

    # Create tables from init.sql
    with open("init.sql", "r") as sql_file:
        init_sql = sql_file.read()
    cursor.executescript(init_sql)
    conn.commit()
finally:
    # Always release the connection; the cells below open their own.
    conn.close()

In [12]:
import sqlite3

# SQL statements used to load the generated data.
INSERT_USER_SQL = """
        INSERT INTO users (email, name, gender, date_of_birth, created_at)
        VALUES (?, ?, ?, ?, ?)
    """
INSERT_ORDER_SQL = """
        INSERT INTO orders (user_email, date, total_amount)
        VALUES (?, ?, ?)
    """
INSERT_SALE_SQL = """
            INSERT INTO sales (order_id, product_id, size, color, quantity, price_per_unit)
            VALUES (?, ?, ?, ?, ?, ?)
        """
INSERT_STOCK_SQL = """
        INSERT INTO stock (product_id, color, size, quantity)
        VALUES (?, ?, ?, ?)
    """

# Connect to SQLite database
conn = sqlite3.connect("./db.sqlite")
try:
    cursor = conn.cursor()

    # Create tables from init.sql
    with open("init.sql", "r") as sql_file:
        init_sql = sql_file.read()
    cursor.executescript(init_sql)

    # Insert users data
    cursor.executemany(
        INSERT_USER_SQL,
        [
            (user.email, user.name, user.gender, user.date_of_birth, user.created_at)
            for user in users
        ],
    )

    # Insert orders and sales data
    for order in orders:
        # Orders are inserted one at a time because each sale row needs the
        # auto-generated id of its parent order.
        cursor.execute(
            INSERT_ORDER_SQL,
            (order.user_email, order.date, order.total_amount),
        )
        order_id = cursor.lastrowid  # Get the ID of the last inserted order

        # Insert sales for each item in the order
        cursor.executemany(
            INSERT_SALE_SQL,
            [
                (
                    order_id,
                    sold.product_id,
                    sold.size,
                    sold.color,
                    sold.quantity,
                    sold.price,
                )
                for sold in order.items
            ],
        )

    # Insert stock data from combinations DataFrame
    cursor.executemany(
        INSERT_STOCK_SQL,
        [
            (row["product_id"], row["color"], row["size"], row["stock"])
            for _, row in combinations.iterrows()
        ],
    )

    # Commit the changes
    conn.commit()
except Exception:
    # Leave no half-inserted data behind if any insert fails.
    conn.rollback()
    raise
finally:
    # Close the connection on success and on failure alike.
    conn.close()

# Text-2-SQL as a Tool Call

## Allowing a Language Model to Write SQL

Now that we have a database, we can start to look at how we can augment our model with text-2-sql capabilities. 

In [17]:
# Materialise the train split as a list so products can be indexed by position.
ds = list(load_dataset("ivanleomk/labelled-ecommerce-taxonomy")["train"])
ds[0]

{'image': <PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=768x1024>,
 'title': 'Lace Detail Sleeveless Top',
 'brand': 'H&M',
 'description': "Elevate your casual wardrobe with this elegant sleeveless top featuring intricate lace detailing at the neckline. Perfect for both day and night, it's crafted from a soft, breathable fabric for all-day comfort.",
 'category': 'Women',
 'subcategory': 'Tops',
 'product_type': 'Tank Tops',
 'attributes': '[{"name": "Sleeve Length", "value": "Sleeveless"}, {"name": "Neckline", "value": "Crew Neck"}]',
 'material': 'Cotton',
 'pattern': 'Solid',
 'id': 1,
 'price': 181.04,
 'occasions': '["Everyday Wear", "Casual Outings", "Smart Casual", "Dinner Dates", "Partywear"]'}

In [19]:
# Index the catalogue by product id so ids returned by SQL queries can be mapped
# back to their full product records.
id_to_product = {item["id"]: item for item in ds}

In [20]:
import sqlite3

SAMPLE_USERS_SQL = """
    SELECT *
    FROM users
    LIMIT 4
"""

# Connect to the database
conn = sqlite3.connect("./db.sqlite")
try:
    cursor = conn.cursor()

    # Fetch a sample of 4 users
    cursor.execute(SAMPLE_USERS_SQL)

    # Use a dedicated name so the generated `users` list (UserConfig objects)
    # is not overwritten by raw database tuples.
    user_rows = cursor.fetchall()
    for user_row in user_rows:
        print(user_row)
finally:
    # Close the connection even if the query fails
    conn.close()


In [38]:
from pydantic import BaseModel
import instructor
import openai


# Structured output of the text-2-sql call below (passed as `response_model`).
# Documented with comments rather than a docstring, since a class docstring
# would presumably become part of the schema sent to the model.
class SQLQuery(BaseModel):
    chain_of_thought: str  # free-text field intended for the model's reasoning
    query: str  # SQL text; executed as-is against db.sqlite in the next cell


client = instructor.from_openai(openai.AsyncOpenAI())
schema = open("./init.sql", "r").read()

user_query = "What was the latest item that I bought"
query = await client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {
            "role": "system",
            "content": f"You are a helpful assistant that can answer questions about the database. Here is the schema of the database: {schema}. The user's email is dorothy_barker@icloud.com",
        },
        {"role": "user", "content": user_query},
    ],
    response_model=SQLQuery,
)

print(query)

In [26]:
# The SQL below was written by a language model, so open the database read-only:
# an accidental UPDATE/DELETE/DROP then fails instead of damaging the data.
conn = sqlite3.connect("file:./db.sqlite?mode=ro", uri=True)
try:
    cursor = conn.cursor()
    cursor.execute(query.query)
    results = cursor.fetchall()
    for result in results:
        print(result)
        # The model chooses the column order, so the first column is only
        # treated as a product id when it matches a known product.
        product = id_to_product.get(result[0])
        if product is not None:
            print(product)
finally:
    # Close the connection even if the generated SQL is invalid
    conn.close()

## Integrating it into our current flow

Now that we've seen how we can use a language model to generate valid SQL queries, let's integrate it into our current flow.

We'll start by writing a function that will do the following:

1. Given a user query, we'll extract out the relevant items
2. Next we'll get the model to generate the relevant SQL to check for stock availability of the items
3. We'll then return the recommended items and the stock availability of the items

Most of this will be re-using our code from the previous notebook.

In [56]:
from typing import Optional
from pydantic import BaseModel, model_validator
import openai
import json
import pandas as pd
import instructor


# One attribute filter: an attribute name plus the accepted values for it.
# (Comments rather than a docstring: this model is nested inside QueryFilters,
# which is used as an LLM response_model.)
class Attribute(BaseModel):
    name: str  # must be an attribute of the chosen subcategory (checked in QueryFilters)
    values: list[str]  # an item matches if its value for `name` is in this list


class QueryFilters(BaseModel):
    # Filters extracted from a user query. Used as the `response_model` of
    # extract_query_filters and consumed by retrieve_and_filter.
    attributes: list[Attribute]
    material: Optional[list[str]]
    min_price: Optional[float] = None
    max_price: Optional[float] = None
    subcategory: str
    category: str
    product_type: list[str]
    occasions: list[str]

    @model_validator(mode="after")
    def validate_attributes(self):
        """Check every extracted value against the module-level ``taxonomy_data``.

        Raises:
            ValueError: On the first category, subcategory, product type,
                attribute name/value, occasion or material that is not in
                the taxonomy.
        """
        taxonomy_map = taxonomy_data["taxonomy_map"]

        # Category must exist in the taxonomy
        if self.category not in taxonomy_map:
            raise ValueError(
                f"Invalid category: {self.category}. Valid categories are {taxonomy_map.keys()}"
            )

        # Subcategory must exist under that category
        subcategories = taxonomy_map[self.category]
        if self.subcategory not in subcategories:
            raise ValueError(
                f"Invalid subcategory {self.subcategory} for category {self.category}. Valid subcategories are {subcategories}"
            )

        subcategory_spec = subcategories[self.subcategory]

        # Product types must belong to the chosen subcategory
        valid_types = subcategory_spec["product_type"]
        for candidate_type in self.product_type:
            if candidate_type not in valid_types:
                raise ValueError(
                    f"Invalid product type: {candidate_type}. Valid product types are {valid_types}"
                )

        # Attribute names and values must belong to the chosen subcategory
        valid_attrs = subcategory_spec["attributes"]
        for attribute in self.attributes:
            if attribute.name not in valid_attrs:
                raise ValueError(f"Invalid attribute name: {attribute.name}")
            allowed_values = valid_attrs[attribute.name]
            for candidate_value in attribute.values:
                if candidate_value not in allowed_values:
                    raise ValueError(
                        f"Invalid value {candidate_value} for attribute {attribute.name}. Valid values are {allowed_values}"
                    )

        # Occasions must come from the taxonomy's occasion list
        for candidate_occasion in self.occasions:
            if candidate_occasion not in taxonomy_data["occasions"]:
                raise ValueError(
                    f"Invalid occasion: {candidate_occasion}. Valid Occasions are {taxonomy_data['occasions']}"
                )

        # Materials are optional; validate only those that were provided
        for candidate_material in self.material or []:
            if candidate_material not in taxonomy_data["materials"]:
                raise ValueError(
                    f"Invalid material: {candidate_material}. Valid Materials are {taxonomy_data['materials']}"
                )

        return self


async def extract_query_filters(
    query: str, client: openai.AsyncOpenAI, taxonomy_data: dict
) -> QueryFilters:
    """Ask the model to turn a free-text query into structured ``QueryFilters``.

    Args:
        query: The user's natural language query.
        client: Chat client; the notebook passes an instructor-wrapped
            ``openai.AsyncOpenAI``.
        taxonomy_data: Taxonomy dict providing the ``taxonomy_map``,
            ``taxonomy``, ``occasions`` and ``materials`` entries.

    Returns:
        The ``QueryFilters`` instance produced by the client.
    """
    system_message = {
        "role": "system",
        "content": """
                    You are a helpful assistant that extracts user requirements from a query.
                    
                    Use these references:
                    - Taxonomy: {{ taxonomy }}
                    - Valid occasions: {{ occasions }} 
                    - Valid materials: {{ materials }}
                    
                    Guidelines:
                    - If a filter isn't needed, return an empty list
                    - Only use values from the provided taxonomy, occasions and materials lists. If the attribute exists on multiple types, make sure that you only look at the specific types listed under the subcategory you have chosen
                    - if the user gives a range (Eg. around 50), just give a buffer of 20 on each side (Eg. 30-70)
                    - if the user gives a vague price (Eg. I have a high budget), just set max price to 1000
                    - Only add attributes and filters that a user has mentioned explicitly
                    - Cap sleeves are the same as sleeveless
                    - Only include a regular or relaxed fit if the user has explicitly mentioned it. If you do include either of them, include both in the filters
                    - only classify an item as unisex if the user has explicitly mentioned it. 

                    Extract the requirements and format them according to the QueryFilters model.
                """,
    }
    user_message = {"role": "user", "content": query}

    # Passed as `context`; presumably the client renders the {{ ... }}
    # placeholders of the system prompt from these values (confirm).
    template_context = {
        "taxonomy_map": taxonomy_data["taxonomy_map"],
        "taxonomy": taxonomy_data["taxonomy"],
        "occasions": taxonomy_data["occasions"],
        "materials": taxonomy_data["materials"],
    }

    return await client.chat.completions.create(
        model="gpt-4o",
        messages=[system_message, user_message],
        context=template_context,
        response_model=QueryFilters,
    )


def retrieve_and_filter(query: str, table, filters: "QueryFilters", max_k=100):
    """Search ``table`` for ``query`` and narrow the hits down with ``filters``.

    Category, subcategory and the price bounds are applied as a prefilter inside
    the search; product type, material, occasions and attributes are applied in
    Python on the returned rows.

    Args:
        query: The user's natural language query, used for the search.
        table: Table object exposing ``search(query=...).where(...).limit(...).to_list()``.
        filters: Extracted filters (see ``QueryFilters``).
        max_k: Maximum number of rows requested from the search.

    Returns:
        List of item dicts whose ``attributes`` and ``occasions`` JSON strings
        have been decoded, restricted to the items matching every filter.
    """

    def _sql_literal(value) -> str:
        # Double single quotes so values such as "Kid's Tops" cannot break
        # (or inject into) the filter expression.
        return "'" + str(value).replace("'", "''") + "'"

    # Prefilter on category, subcategory and price inside the search itself
    query_parts = [
        f"category={_sql_literal(filters.category)}",
        f"subcategory={_sql_literal(filters.subcategory)}",
    ]

    # Compare against None so that an explicit bound of 0 is still applied
    if filters.min_price is not None:
        query_parts.append(f"price >= {filters.min_price}")
    if filters.max_price is not None:
        query_parts.append(f"price <= {filters.max_price}")

    query_string = " AND ".join(query_parts)
    items = (
        table.search(query=query)
        .where(query_string, prefilter=True)
        .limit(max_k)
        .to_list()
    )

    # attributes and occasions are stored as JSON strings; decode them once
    items = [
        {
            **item,
            "attributes": json.loads(item["attributes"]),
            "occasions": json.loads(item["occasions"]),
        }
        for item in items
    ]

    if filters.product_type:
        items = [item for item in items if item["product_type"] in filters.product_type]

    if filters.material:
        items = [item for item in items if item["material"] in filters.material]

    if filters.occasions:
        items = [
            item
            for item in items
            if any(occasion in item["occasions"] for occasion in filters.occasions)
        ]

    # Every attribute filter that lists values must be satisfied by the item
    for attr in filters.attributes or []:
        if not attr.values:
            continue
        items = [
            item
            for item in items
            if any(
                item_attr["name"] == attr.name and item_attr["value"] in attr.values
                for item_attr in item["attributes"]
            )
        ]

    return items


In [76]:
from helpers import process_taxonomy_file
import lancedb

# Set up the taxonomy, the LLM client and the LanceDB table of labelled items.
taxonomy_data = process_taxonomy_file("./taxonomy.yml")
client = instructor.from_openai(openai.AsyncOpenAI())
db = lancedb.connect("./lancedb")
labelled_table = db.open_table("labelled_items")

# This query asks for sizes as well as products.
query = "What are some good tank tops under 40, what sizes do you have it in?"
filters = await extract_query_filters(query, client, taxonomy_data)
items = retrieve_and_filter(query, labelled_table, filters)
pd.DataFrame(items)


Unnamed: 0,id,title,description,brand,category,subcategory,product_type,attributes,occasions,material,pattern,price,vector,_distance
0,87,Lace Trim Tank Top,Elevate your wardrobe with this elegant sleeve...,Zara,Women,Tops,Tank Tops,"[{'name': 'Sleeve Length', 'value': 'Sleeveles...","[Everyday Wear, Casual Outings, Smart Casual, ...",Cotton,Solid,28.53,"[0.07034333050251007, 0.014594318345189095, -0...",1.227293
1,183,Tie-Dye Sleeveless Top,Add a splash of color to your wardrobe with th...,Urban Outfitters,Women,Tops,Tank Tops,"[{'name': 'Sleeve Length', 'value': 'Sleeveles...","[Casual Outings, Everyday Wear, Smart Casual, ...",Cotton,Geometric,36.26,"[0.09776991605758667, 0.06634704023599625, -0....",1.431715


We can't fully answer this query yet: the retrieved items carry no size information. Text-to-SQL is a great fit here, since the available sizes live in the `stock` table of our database.

In [119]:
from pydantic import Field


# Structured output of the final answer-writing call in Response.execute.
# (Comments rather than a docstring: this class is used as an LLM response_model.)
class ModelResponse(BaseModel):
    chain_of_thought: str  # free-text field intended for the model's reasoning
    response: str  # the answer text; printed at the end of the notebook


class Response(BaseModel):
    # The model's decision on whether SQL is needed, plus the query to run.
    # (Comments rather than a class docstring: this class is used as an LLM
    # response_model.)
    use_sql: bool
    summary: str = Field(
        description="A summary of the user's request and the information we've fetched so far. Make sure to provide all of the important context such as product ids ,title, brand etc of the product"
    )
    sql_query: Optional[str] = None

    async def execute(self, conn, client: openai.AsyncOpenAI):
        """Run the generated SQL (if any) and ask the model for a final answer.

        Args:
            conn: Open SQLite connection. When SQL is executed the connection
                is closed afterwards, even if the query raises.
            client: Chat client used to write the final answer.

        Returns:
            The ``ModelResponse`` produced by the client.

        Raises:
            sqlite3.Error: Presumably, if the generated SQL is invalid; the
                error from ``cursor.execute`` is not caught here.
        """
        items = []
        if self.use_sql and self.sql_query:
            try:
                cursor = conn.cursor()
                cursor.execute(self.sql_query)
                items = list(cursor.fetchall())
            finally:
                # Release the connection even when the generated SQL fails
                conn.close()

        print(f"Fetched {items}")

        # NOTE(review): when use_sql is False, `items` is empty too, and the last
        # rule of the prompt below tells the model to report "out of stock".
        return await client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": f"""
                You're a helpful assistant that aims to help answer a user query in our database.
                
                Here is a summary of the user's request and the information we've fetched so far:
                {self.summary}
                
                We've identified the following items as being relevant to the user id and provided the id, brand and titles here.
                {items}

                Rules
                - Make sure not to expose the product id in your final response where possible, always use the title of the item and mention its brand.
                - only respond and answer the user's question. 
                - if items is empty, then that means we don't have any stock for the item in the size or color that the user wants. But mention that we found those items but they're currently out of stock.
                """,
                }
            ],
            response_model=ModelResponse,
        )

Let's extract this into a function and see how it works end to end.

In [120]:
async def check_database(
    query: str,
    client: "openai.AsyncOpenAI",
    taxonomy_data: dict,
    items: list,
    schema: str,
    db_path: str
):
    """
    Check database for additional information needed to answer the query.

    The model first decides whether a SQL query is needed (and writes it); the
    resulting plan is then executed against the SQLite database at ``db_path``.

    Args:
        query: The user's natural language query
        client: OpenAI client for making API calls
        taxonomy_data: Product taxonomy information (currently unused here)
        items: List of relevant items already retrieved
        schema: Database schema
        db_path: Path to SQLite database

    Returns:
        Whatever the plan's ``execute`` method returns (a ``ModelResponse`` in
        this notebook).

    Raises:
        sqlite3.OperationalError: If the database at ``db_path`` cannot be opened.
    """
    from pathlib import Path

    sql_query = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": f"""
            You are a helpful assistant that can help write SQL queries to retrieve items from the database.
            
            Here is the schema of the database: 
            ```
            {schema}
            ```

            The user's query is: {query}. Determine if you need to write a SQL query to get the information needed to answer the query or not

            We've fetched the following relevant items from the database. 
            
            - If you need to write SQL, use the `id` field here as the product_id. 
            - Make sure to preserve the product_id in your query so you can use it to associate the returned items with the original query
            - If you don't need to write SQL, just return use_sql as False and the sql string as None. If you determine that more details are required, then make sure to provide a SQL query that can be executed based off the schema provided.
            
            ```
            {items}
            ```
            """,
            },
            {"role": "user", "content": query},
        ],
        response_model=Response,
    )

    # The SQL is model-generated, so open the database read-only: a stray
    # UPDATE/DELETE/DROP fails instead of modifying the data, and a wrong path
    # raises instead of silently creating an empty database file.
    conn = sqlite3.connect(f"{Path(db_path).resolve().as_uri()}?mode=ro", uri=True)
    try:
        print(sql_query)
        return await sql_query.execute(conn, client)
    finally:
        # Closing an already-closed connection is harmless, so this also covers
        # the paths where execute() does not close it (no SQL needed, or an error).
        conn.close()


async def respond_to_query(
    query: str,
    client: openai.AsyncOpenAI,
    taxonomy_data: dict,
    table,
    schema: str,
    db_path: str
):
    """
    Answer a natural language product query end to end.

    Filters are extracted from the query, matching items are retrieved from
    ``table``, and a trimmed-down view of those items is handed to
    ``check_database`` together with the schema.

    Args:
        query: The user's natural language query
        client: OpenAI client for making API calls
        taxonomy_data: Product taxonomy information
        table: LanceDB table containing product data
        schema: Database schema
        db_path: Path to SQLite database

    Returns:
        The result of ``check_database`` for this query.
    """
    # Only these fields of each retrieved item are passed on to the SQL step
    summary_fields = ("id", "brand", "title", "material", "attributes")

    filters = await extract_query_filters(query, client, taxonomy_data)
    retrieved = retrieve_and_filter(query, table, filters)
    items = [{field: item[field] for field in summary_fields} for item in retrieved]
    print(f"Identified {len(items)} items after applying {filters}")

    return await check_database(
        query=query,
        client=client,
        taxonomy_data=taxonomy_data,
        items=items,
        schema=schema,
        db_path=db_path,
    )


# Example usage:

# Locations of the artefacts this example reads.
DB_PATH = "./db.sqlite"
TAXONOMY_PATH = "./taxonomy.yml"
LANCEDB_PATH = "./lancedb"

# Initialize dependencies
taxonomy_data = process_taxonomy_file(TAXONOMY_PATH)
client = instructor.from_openai(openai.AsyncOpenAI())
db = lancedb.connect(LANCEDB_PATH)
labelled_table = db.open_table("labelled_items")

# Load schema
with open("init.sql") as f:
    schema = f.read()

# Example query
# It mentions a colour and sizes, which the retrieved item records do not carry,
# so answering it relies on the SQL step.
query = "I'm looking for a black tank top . I wear a size M-L. What are some good options?"
result = await respond_to_query(
    query=query,
    client=client,
    taxonomy_data=taxonomy_data,
    table=labelled_table,
    schema=schema,
    db_path=DB_PATH
)
# `result` is the structured answer; print only its response text.
print(result.response)