In [1]:
%%capture --no-stderr
%pip install --upgrade --quiet -r requirements.txt

In [1]:
import json
from llama_index.core import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage,
)
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent

from agents import load_model
from agents import prompts








## Create database replica

In [2]:
# SQLDatabase is imported in this cell rather than in the notebook's top import cell.
from langchain_community.utilities import SQLDatabase

# Open the SQLite file Chinook.db (relative path) through LangChain's SQLDatabase wrapper.
# `db` is used by later cells to list table names and to run SQL statements.
db = SQLDatabase.from_uri("sqlite:///Chinook.db")

## Create metadata

### Samples and Info Schemas

In [3]:
# Number of example rows captured per table for the metadata prompt.
SAMPLE_ROWS = 5

# List tables
l_tables = db.get_usable_table_names()

# Create metadata: for every table keep a few sample rows and its CREATE statement.
d_metadata = {}
for table in l_tables:
    # Sample the first SAMPLE_ROWS rows. The identifier is double-quoted so table
    # names containing spaces or reserved words still form valid SQL.
    sample = db.run(f'SELECT * FROM "{table}" LIMIT {SAMPLE_ROWS};', fetch="cursor")
    # Get table schema (the CREATE TABLE text SQLite stores in sqlite_master)
    infoschema = db.run(
        f"SELECT sql FROM sqlite_master WHERE type='table' AND name='{table}';",
        fetch="cursor",
    )
    # Store metadata; the sample rows stay as row mappings here and are turned
    # into plain dicts later, right before the JSON export.
    d_metadata[table] = {
        "sample": list(sample.mappings()),
        "infoschema": list(infoschema.mappings())[0]['sql'],
    }

### GenAI Metadata

In [4]:
# Load the 'Llama-3_2-3B-Instruct' model through the project helper `agents.load_model`.
# The helper returns two objects: `pipe` is called later with a list of chat messages,
# while `model` is not referenced again in the cells shown here.
# NOTE(review): presumably `pipe` is a transformers text-generation pipeline — confirm in agents.load_model.
model, pipe = load_model('Llama-3_2-3B-Instruct')

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

In [6]:
# The system prompt for metadata generation is maintained in the project's
# `agents.prompts` module. The generation cell parses the model reply as JSON and
# reads the keys "description" and "use case", so the prompt must request that format.
system_prompt = prompts.metadata

In [7]:
# Maximum number of generation attempts per table before giving up.
max_retries = 3

for table in l_tables:
    print(f"Processing table: {table}")
    # Build the chat: instructions plus the table's sample rows and schema.
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"{d_metadata[table]}"},
    ]

    # Generate and parse the response. The model does not always return
    # well-formed JSON, so each table gets up to `max_retries` attempts.
    # The model is only called inside the loop: one generation per attempt.
    for attempt in range(max_retries):
        try:
            response = pipe(messages)
            # The pipeline returns the whole conversation; the reply is the last message.
            raw_reply = response[0]['generated_text'][-1]['content']
            try:
                # Prefer the reply as-is: valid JSON may contain apostrophes
                # that a blanket quote replacement would corrupt.
                data_dict = json.loads(raw_reply)
            except json.JSONDecodeError:
                # Fall back to normalising single quotes for Python-dict style replies.
                data_dict = json.loads(raw_reply.replace("'", '"'))
            # Read the required keys inside the retry so a reply that parses
            # but lacks a key triggers another attempt instead of crashing later.
            description = data_dict['description']
            use_case = data_dict['use case']
            break  # Exit loop if successful
        except Exception as e:
            print(f"Error processing table {table}: {e}")
            if attempt == max_retries - 1:
                raise  # Re-raise the exception if max retries reached
            print(f"Retrying... ({attempt + 1}/{max_retries})")

    # Store metadata
    d_metadata[table]['description'] = description
    d_metadata[table]['use case'] = use_case

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Album


Starting from v4.46, the `logits` model output will have the same type as the model (except at train time, where it will always be FP32)
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Artist


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Customer


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Employee


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Genre


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Invoice


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: InvoiceLine


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: MediaType


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Playlist


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Error processing table Playlist: Expecting ',' delimiter: line 3 column 207 (char 513)
Retrying... (1/3)


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: PlaylistTrack


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Processing table: Track


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


In [None]:
# Swap each sampled RowMapping for a plain dict before exporting
for table in l_tables:
    d_metadata[table]['sample'] = list(map(dict, d_metadata[table]['sample']))

# Write the metadata dictionary to disk as indented JSON
with open('data/metadata.json', 'w') as out_file:
    json.dump(d_metadata, out_file, indent=4)

In [26]:
# Read the metadata dictionary back from the JSON file on disk
with open('data/metadata.json') as metadata_file:
    d_metadata = json.load(metadata_file)

In [28]:
# Create one text file per table (description, use case, schema, sample rows).
# These files are the documents loaded into the vector index further down.
for table in l_tables:
    table_meta = d_metadata[table]
    # Explicit UTF-8: sample rows may contain non-ASCII text, and the platform
    # default encoding (e.g. cp1252 on Windows) is not guaranteed to handle it.
    with open(f'data/metadata/{table}.txt', 'w', encoding='utf-8') as txt_file:
        txt_file.write(f"\n\nTable: {table}\n\n")
        txt_file.write(f"Description: \n{table_meta['description']}\n\n")
        txt_file.write(f"Use Case: \n{table_meta['use case']}\n\n")
        txt_file.write(f"Schema: \n{table_meta['infoschema']}\n\n")
        txt_file.write("Sample:\n")
        # One sample row per line, written as its dict representation.
        for row in table_meta['sample']:
            txt_file.write(f"{row}\n")

## Create RAG

In [29]:
# Try to reuse a previously persisted index instead of re-embedding the metadata.
try:
    storage_context = StorageContext.from_defaults(
        persist_dir="data/storage"
    )
    # Bind the loaded index to `chinook_index`, the name the build cell assigns and
    # the retriever cell reads; otherwise a fresh kernel with an existing
    # data/storage directory would hit a NameError further down.
    # The embedding model is the same one used when the index is built, so queries
    # are embedded in the same vector space as the stored documents.
    chinook_index = load_index_from_storage(
        storage_context,
        embed_model=HuggingFaceEmbedding(
            model_name="BAAI/bge-small-en-v1.5"
        ),
    )
    # Keep the original variable name available as an alias.
    db_index = chinook_index

    index_loaded = True
except Exception as e:
    # Best effort: any failure (typically a missing data/storage directory) means
    # the index is rebuilt by the next cell. Report the reason instead of hiding it.
    print(f"Could not load index from data/storage ({e}); it will be rebuilt.")
    index_loaded = False

In [30]:
if not index_loaded:
    # Read every per-table metadata text file as a document
    metadata_files = [f'data/metadata/{table}.txt' for table in l_tables]
    chinook_metadata = SimpleDirectoryReader(input_files=metadata_files).load_data()

    # Embed the documents and assemble the vector index
    chinook_index = VectorStoreIndex.from_documents(
        chinook_metadata,
        embed_model=HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    )

    # Write the index to disk under data/storage
    chinook_index.storage_context.persist(persist_dir="data/storage")

In [31]:
# Despite its name, `db_engine` is a retriever over the metadata index (created via
# as_retriever); it is used below to fetch table metadata for a question.
db_engine = chinook_index.as_retriever()

In [35]:
# Retrieve the metadata chunks the index returns for this question; the results are
# read through their `.text` attribute in the following cells.
tables = db_engine.retrieve("Which album has the most tracks?")

In [39]:
# Show the retrieved metadata as a single newline-separated string
'\n'.join(node.text for node in tables)

"Table: Album\n\nDescription: \nThe Album table contains information about albums, including the album ID, title, and the ID of the artist who created it. The album ID uniquely identifies each album, and the artist ID establishes a foreign key relationship with the Artist table. The title is limited to 160 characters.\n\nUse Case: \nThe main use case for this table is to store and manage information about albums, including their titles, artist IDs, and album IDs, allowing for efficient querying and retrieval of album data in conjunction with artist data.\n\nSchema: \nCREATE TABLE [Album]\n(\n    [AlbumId] INTEGER  NOT NULL,\n    [Title] NVARCHAR(160)  NOT NULL,\n    [ArtistId] INTEGER  NOT NULL,\n    CONSTRAINT [PK_Album] PRIMARY KEY  ([AlbumId]),\n    FOREIGN KEY ([ArtistId]) REFERENCES [Artist] ([ArtistId]) \n\t\tON DELETE NO ACTION ON UPDATE NO ACTION\n)\n\nSample:\n{'AlbumId': 1, 'Title': 'For Those About To Rock We Salute You', 'ArtistId': 1}\n{'AlbumId': 2, 'Title': 'Balls to the

In [43]:
# System instruction for the SQL-generation request
sql_prompt = (
    "Write a SQL query using the information provided. "
    "Only return the query, do not explain it."
)
# Natural-language question to be answered with SQL
prompt = "Which album has the most tracks?"

In [44]:
# Conversation sent to the model: the instruction, the retrieved table metadata
# as context (under the custom "sqlagent" role), and finally the user's question.
messages = [
    dict(role="system", content=sql_prompt),
    dict(role="sqlagent", content='\n'.join(chunk.text for chunk in tables)),
    dict(role="user", content=prompt),
]
response = pipe(messages)

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


In [57]:
# The reply is the message at index 3 of the returned conversation (after the three
# input messages); remove the Markdown code fences and surrounding whitespace.
raw_answer = response[0]['generated_text'][3]['content']
query = raw_answer.replace('```sql', '').replace('```', '').strip()

In [58]:
# NOTE: `query` is model-generated SQL and is executed as-is; review it before
# running this against a database that matters.
db.run(query)

"[('Greatest Hits', 57)]"