# Using LLMs to Generate CQL

Since LLMs seem to excel at a lot of things, we wanted to show how they can be used to generate CQL to query your Cassandra tables. This notebook provides a guide derived from the [SQL-PaLM](https://arxiv.org/abs/2306.00739) paper on how to automatically show the LLM your DB schema, and let it inform the LLM on querying your data.

## Setup

#### Requirements

In [None]:
# Install requirements, if not already installed
!pip install openai cassandra-driver

Collecting openai
  Downloading openai-1.13.3-py3-none-any.whl (227 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m227.4/227.4 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting cassandra-driver
  Downloading cassandra_driver-3.29.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.8/18.8 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
Collecting httpx<1,>=0.23.0 (from openai)
  Downloading httpx-0.27.0-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.6/75.6 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Collecting httpcore==1.* (from httpx<1,>=0.23.0->openai)
  Downloading httpcore-1.0.4-py3-none-any.whl (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.8/77.8 kB[0m [31m8.4 MB/s[0m eta [36m0:

#### Connect to Services

In [None]:
# Initialize the OpenAI Client
import os

from getpass import getpass
import openai

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("OpenAI API Key: ")

client = openai.OpenAI()


OpenAI API Key: ··········


In [None]:
# Connect to a Cassandra Cluster and initialize the session
import re

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from getpass import getpass
from google.colab import files

ASTRA_TOKEN = os.environ.get(
    "ASTRA_DB_TOKEN",
    getpass("Astra DB Token: ")
)

ASTRA_BUNDLE_PATH = os.environ.get(
    "ASTRA_DB_BUNDLE_PATH",
    list(files.upload().keys())[0],
)

ASTRA_KEYSPACE = os.environ.get(
    "ASTRA_DB_KEYSPACE",
    input("Astra DB Keyspace: "),
)

cloud_config = {
    'secure_connect_bundle': ASTRA_BUNDLE_PATH
}
auth_provider = PlainTextAuthProvider("token", ASTRA_TOKEN)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect(keyspace=ASTRA_KEYSPACE)


def execute_statement(statement: str):
    # This is a simple wrapper around executing CQL statements in our
    # Cassandra cluster, and either raising an error or returning the results
    try:
        rows = session.execute(statement)
        return rows.all()
    except:
        print(f"Query Failed: {statement}")
        raise


Astra DB Token: ··········


Saving secure-connect-enablement-classic (1).zip to secure-connect-enablement-classic (1).zip
Astra DB Keyspace: enablement


ERROR:cassandra.connection:Closing connection <AsyncoreConnection(135229243594336) 256697d1-b172-4480-b148-5089346d2239-eu-west-1.db.astra.datastax.com:29042:ab854d5c-564b-4934-abe9-bdc826681350> due to protocol error: Error from server: code=000a [Protocol error] message="Beta version of the protocol used (5/v5-beta), but USE_BETA flag is unset"


#### (Optional) Dummy DB Setup

Feel free to skip this section if you are instead adapting the notebook to fit your existing Cassandra Database. Here, we will utilize the python `cassandra-driver` package to connect to a DB and create some fake tables. This schema is pulled from [this DataStax example](https://www.datastax.com/learn/data-modeling-by-example/digital-library-data-model) on creating a data model for a digital music library.

In [None]:

session.execute("DROP TABLE IF EXISTS accounts");
session.execute("DROP TABLE IF EXISTS transactions");
session.execute("DROP TABLE IF EXISTS customers");




In [None]:
create_tables_cql = """
CREATE TABLE customers (
    name TEXT PRIMARY KEY,
    type TEXT,
    balance DECIMAL,
    created_at TIMESTAMP,
    phone TEXT
);

CREATE TABLE accounts (
    name TEXT PRIMARY KEY,
    customer_name TEXT,
    type TEXT,
    balance DECIMAL,
    created_at TIMESTAMP
);

CREATE TABLE transactions (
    name TEXT,
    account_id UUID,
    type TEXT,
    amount DECIMAL,
    balance_after DECIMAL,
    timestamp TIMESTAMP, -- Added missing comma here
    PRIMARY KEY (name, account_id, amount)
);

"""

# Execute the CQL statements for table creation
for statement in create_tables_cql.split(";"):
    if len(statement.strip()):
        session.execute(statement.strip())


In [None]:
from uuid import uuid4
from datetime import datetime

# Assume these are executed after generating UUIDs for customers and accounts
insert_fake_data_cql = """
INSERT INTO customers (name, type, balance, created_at, phone) VALUES ('John Doe', 'Personal', 1200.50, toTimestamp(now()), '555-1234');
INSERT INTO customers (name, type, balance, created_at, phone) VALUES ('Jane Smith', 'Business', 5000.00, toTimestamp(now()), '555-5678');
INSERT INTO customers (name, type, balance, created_at, phone) VALUES ('Alice Johnson', 'Personal', 750.25, toTimestamp(now()), '555-9012');
INSERT INTO customers (name, type, balance, created_at, phone) VALUES ('Bob Brown', 'Business', 2250.00, toTimestamp(now()), '555-3456');
INSERT INTO customers (name, type, balance, created_at, phone) VALUES ('Charlie Davis', 'Personal', 300.00, toTimestamp(now()), '555-7890');



INSERT INTO accounts (name, customer_name, type, balance, created_at) VALUES ('Acc-JohnDoe', 'John Doe', 'Checking', 1200.50, toTimestamp(now()));
INSERT INTO accounts (name, customer_name, type, balance, created_at) VALUES ('Acc-JaneSmith', 'Jane Smith', 'Savings', 5000.00, toTimestamp(now()));
INSERT INTO accounts (name, customer_name, type, balance, created_at) VALUES ('Acc-AliceJohnson', 'Alice Johnson', 'Checking', 750.25, toTimestamp(now()));
INSERT INTO accounts (name, customer_name, type, balance, created_at) VALUES ('Acc-BobBrown', 'Bob Brown', 'Savings', 2250.00, toTimestamp(now()));
INSERT INTO accounts (name, customer_name, type, balance, created_at) VALUES ('Acc-CharlieDavis', 'Charlie Davis', 'Checking', 300.00, toTimestamp(now()));



INSERT INTO transactions (name, account_id, type, amount, balance_after, timestamp) VALUES ('Transaction1', uuid(), 'Deposit', 100.00, 1300.50, toTimestamp(now()));
INSERT INTO transactions (name, account_id, type, amount, balance_after, timestamp) VALUES ('Transaction2', uuid(), 'Withdrawal', 200.00, 4800.00, toTimestamp(now()));
INSERT INTO transactions (name, account_id, type, amount, balance_after, timestamp) VALUES ('Transaction3', uuid(), 'Deposit', 50.00, 800.25, toTimestamp(now()));
INSERT INTO transactions (name, account_id, type, amount, balance_after, timestamp) VALUES ('Transaction4', uuid(), 'Withdrawal', 100.00, 2150.00, toTimestamp(now()));
INSERT INTO transactions (name, account_id, type, amount, balance_after, timestamp) VALUES ('Transaction5', uuid(), 'Deposit', 200.00, 500.00, toTimestamp(now()));


"""



## (Optional) Give the LLM Additional Context with the Built-in 'Comments' Column

LLM response quality greatly depends on the context they've been given - the more concise descriptions they have access to, the better. We can choose to augment the DB schema we pass to the model by utilizing the built-in `comment` property of CQL tables.

NOTE: You can also include these comments at table creation by using the `WITH <table property 1> AND <table property 2> ... AND comment = '<comment>'` syntax

In [None]:
add_comments_cql = f"""
ALTER TABLE customers WITH comment = 'Customer profiles, uniquely identified by customer name.';

ALTER TABLE accounts WITH comment = 'Bank accounts, uniquely identified by account name, associated with customer names.';

ALTER TABLE transactions WITH comment = 'Banking transactions, uniquely identified by a composite key of transaction name, account_id, and amount.';

"""

In [None]:
# This parses the text above into executable strings by the driver
for line in add_comments_cql.split("\n"):
    sc_loc = line.find(";")
    if sc_loc > -1:
        execute_statement(line[:sc_loc])

## Run Queries from User Questions

#### Generating & Executing CQL

Now, we can ask ChatGPT to provide us with some queries that answer our questions! The prompt template we use is taken from [SQL-PaLM](https://arxiv.org/abs/2306.00739), and adapted to fit the CQL use case. In order to use it though, we need to retrieve the schema from our DB.

In [None]:
TEXT2CQL_PROMPT = """Convert the question to CQL (Cassandra Query Language) that can retrieve an appropriate answer, or answer saying that the data model does not support answering such a question in a performant way:

[Schema : values (type)]
{schema}

[Partition Keys]
{partition_keys}

[Clustering Keys]
{clustering_keys}

[Q]
{question}

[CQL]
"""


def generate_schema_partition_clustering_keys(keyspace: str = ASTRA_KEYSPACE) -> (str, str):
    """Generates a TEXT2CQL_PROMPT compatible schema for a keyspace"""
    # Get all table names in our keyspace
    table_names = execute_statement(
        f"SELECT table_name, comment FROM system_schema.tables WHERE keyspace_name = '{keyspace}'"
    )
    tn_str = ", ".join(["'" + tn.table_name + "'" for tn in table_names])

    # Now get all the column names corresponding to those tables
    columns = execute_statement(
        f"SELECT * FROM system_schema.columns WHERE table_name IN ({tn_str}) AND keyspace_name = '{keyspace}' ALLOW FILTERING"
    )

    # Now, we construct our prompt template formatted schema, partition_keys, and clustering keys
    # from the table and column objects returned from the DB
    schema = " | ".join([
        f"{table.table_name} '{table.comment}' : " + " , ".join([
            f"{col.column_name} ({col.type})"
            for col in columns
            if col.table_name == table.table_name
        ])
        for table in table_names
    ])
    partition_keys = " | ".join([
        f"{table.table_name} : " + " , ".join([
            col.column_name for col in columns
            if col.table_name == table.table_name
            and col.kind == "partition_key"
        ])
        for table in table_names
    ])
    clustering_keys = " | ".join([
        f"{table.table_name} : " + " , ".join([
            f"{col.column_name} ({col.clustering_order})" for col in columns
            if col.table_name == table.table_name
            and col.kind == "clustering"
        ])
        for table in table_names
    ])
    return schema, partition_keys, clustering_keys


def execute_query_from_question(question: str, debug_cql: bool = True, debug_prompt: bool = False, return_cql: bool = False):
    """Generates and executes CQL from a user question based on LLM output"""
    # Get all of the variables necessary to fill out the prompt
    schema, partition_keys, clustering_keys = generate_schema_partition_clustering_keys()
    prompt = TEXT2CQL_PROMPT.format(
        schema=schema,
        partition_keys=partition_keys,
        clustering_keys=clustering_keys,
        question=question,
    )

    if debug_prompt:
        print(f"Prompting model with:\n{prompt}")

    # Get generated CQL from the LLM (in this case gpt-3.5-turbo)
    completion = client.chat.completions.create(
        messages=[{
            "role": "user",
            "content": prompt,
        }],
        model="gpt-3.5-turbo",
    ).choices[0].message.content

    if debug_cql:
        print(f"Question: {question}\nGenerated Query: {completion}\n")

    # Need to trim trailing ';' if present to work with cassandra-driver
    if completion.find(";") > -1:
        completion = completion[:completion.find(";")]

    results = execute_statement(completion)

    if return_cql:
        return (results, completion)
    else:
        return results

In [None]:
# Show full prompting trace for a banking-related question
print(execute_query_from_question("What are the customer names with accounts with balance above 1000 , allow filtering ? ", debug_prompt=True))




Prompting model with:
Convert the question to CQL (Cassandra Query Language) that can retrieve an appropriate answer, or answer saying that the data model does not support answering such a question in a performant way:

[Schema : values (type)]
accounts 'Bank accounts, uniquely identified by account name, associated with customer names.' : balance (decimal) , created_at (timestamp) , customer_name (text) , name (text) , type (text) | customers 'Customer profiles, uniquely identified by customer name.' : balance (decimal) , created_at (timestamp) , name (text) , phone (text) , type (text) | transactions 'Banking transactions, uniquely identified by a composite key of transaction name, account_id, and amount.' : account_id (uuid) , amount (decimal) , balance_after (decimal) , name (text) , timestamp (timestamp) , type (text)

[Partition Keys]
accounts : name | customers : name | transactions : name

[Clustering Keys]
accounts :  | customers :  | transactions : account_id (asc) , amount (

Pretty cool that it can find the data to answer our questions! Let's see if we can take this one step further, and actually generate coherent responses using this data:

#### End to End Question Answering

Now, let's wrap up by showing how we can make a subsequent LLM call to answer the user's question with natural language. This completes a full "RAG" style pipeline!

In [None]:
ANSWER_PROMPT = """Query:
```
{cql}
```

Output:
```
{results_repr}
```
===

Given the above results from querying the DB, answer the following user question:

{question}
"""


def answer_question(question: str, debug_cql: bool = False, debug_prompt: bool = False) -> str:
    """Conducts a full RAG pipeline where the LLM retrieves relevant information
    and references it to answer the question in natural language.
    """
    # Get necessary fields to fill out prompt
    query_results, cql = execute_query_from_question(
        question=question,
        debug_cql=debug_cql,
        debug_prompt=debug_prompt,
        return_cql=True,
    )
    prompt = ANSWER_PROMPT.format(
        question=question,
        results_repr=str(query_results),
        cql=cql,
    )

    if debug_prompt:
        print(f"Prompting model with:\n{prompt}")

    # Return the generated answer from the LLM
    return client.chat.completions.create(
        messages=[{
            "role": "user",
            "content": prompt,
        }],
        model="gpt-3.5-turbo",
    ).choices[0].message.content


In [None]:
# Show full prompting trace
print(
    answer_question("What is the amount of Transaction1?", debug_prompt=True)

)

Prompting model with:
Convert the question to CQL (Cassandra Query Language) that can retrieve an appropriate answer, or answer saying that the data model does not support answering such a question in a performant way:

[Schema : values (type)]
accounts 'Bank accounts, uniquely identified by account name, associated with customer names.' : balance (decimal) , created_at (timestamp) , customer_name (text) , name (text) , type (text) | customers 'Customer profiles, uniquely identified by customer name.' : balance (decimal) , created_at (timestamp) , name (text) , phone (text) , type (text) | transactions 'Banking transactions, uniquely identified by a composite key of transaction name, account_id, and amount.' : account_id (uuid) , amount (decimal) , balance_after (decimal) , name (text) , timestamp (timestamp) , type (text)

[Partition Keys]
accounts : name | customers : name | transactions : name

[Clustering Keys]
accounts :  | customers :  | transactions : account_id (asc) , amount (

In [None]:
# Show full prompting trace
print(
    answer_question("What is the type of Transaction5 and the balance_after ?", debug_prompt=True)

)

Prompting model with:
Convert the question to CQL (Cassandra Query Language) that can retrieve an appropriate answer, or answer saying that the data model does not support answering such a question in a performant way:

[Schema : values (type)]
accounts 'Bank accounts, uniquely identified by account name, associated with customer names.' : balance (decimal) , created_at (timestamp) , customer_name (text) , name (text) , type (text) | customers 'Customer profiles, uniquely identified by customer name.' : balance (decimal) , created_at (timestamp) , name (text) , phone (text) , type (text) | transactions 'Banking transactions, uniquely identified by a composite key of transaction name, account_id, and amount.' : account_id (uuid) , amount (decimal) , balance_after (decimal) , name (text) , timestamp (timestamp) , type (text)

[Partition Keys]
accounts : name | customers : name | transactions : name

[Clustering Keys]
accounts :  | customers :  | transactions : account_id (asc) , amount (

In [None]:
# Show full prompting trace
print(
    answer_question("What is the account type of John Doe and its balance allow filtering? ", debug_prompt=True)
)

Prompting model with:
Convert the question to CQL (Cassandra Query Language) that can retrieve an appropriate answer, or answer saying that the data model does not support answering such a question in a performant way:

[Schema : values (type)]
accounts 'Bank accounts, uniquely identified by account name, associated with customer names.' : balance (decimal) , created_at (timestamp) , customer_name (text) , name (text) , type (text) | customers 'Customer profiles, uniquely identified by customer name.' : balance (decimal) , created_at (timestamp) , name (text) , phone (text) , type (text) | transactions 'Banking transactions, uniquely identified by a composite key of transaction name, account_id, and amount.' : account_id (uuid) , amount (decimal) , balance_after (decimal) , name (text) , timestamp (timestamp) , type (text)

[Partition Keys]
accounts : name | customers : name | transactions : name

[Clustering Keys]
accounts :  | customers :  | transactions : account_id (asc) , amount (

Awesome! Our model is answering questions based on just the data in our dummy DB, and is able to construct queries for retrieving that data in a fully automated way.