In [1]:
import ast
import json
from operator import itemgetter
from urllib.parse import quote

import pandas as pd
import psycopg2
from decouple import config
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker

from langchain.chains import create_sql_query_chain, LLMChain
from langchain.evaluation import load_evaluator, EmbeddingDistance
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool
from langchain_community.utilities import SQLDatabase
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate, FewShotPromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
from langchain_experimental.sql import SQLDatabaseChain
from langchain_openai import ChatOpenAI, OpenAI

In [2]:
# Secrets are read through python-decouple so they never appear in the notebook.
OPENAI_API_KEY = config("OPENAI_API_KEY")
DB_USER = config('DB_USER')
DB_PASSWORD = config('DB_PASSWORD')
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'food_security'
# Percent-encode the credentials: a raw '@', ':', '/' or '%' in the user name or
# password would otherwise be parsed as URL structure and break the connection.
# `quote(..., safe="")` encodes spaces as %20 rather than '+'.
DATABASE_URL = (
    f'postgresql://{quote(DB_USER, safe="")}:{quote(DB_PASSWORD, safe="")}'
    f'@{DB_HOST}:{DB_PORT}/{DB_NAME}'
)
# JSON file holding the few-shot question/SQL examples.
FILE_SQL_EXAMPLES_EN = "sql_examples_en.json"
# When True, run_sql_chain builds the SQL prompt from the LLM-selected columns only.
USE_BEST_MATCHING_COLUMNS = False

## Prompt Templates

### Create Prompt to Select best Table

In [3]:
def connect_to_database(database_url=DATABASE_URL):
    """Read the table and column metadata stored in the PostgreSQL database.

    Opens a psycopg2 connection, fetches every row of ``table_metadata`` and
    ``column_metadata`` and closes the cursor and connection before returning,
    including when one of the queries raises.

    Parameters
    ----------
    database_url : str
        PostgreSQL database connection URL, by default DATABASE_URL

    Returns
    -------
    tables : list of tuple
        ``(table_name, description)`` rows from ``table_metadata``.
    columns : list of tuple
        ``(table_name, column_name, description)`` rows from ``column_metadata``.
    """
    conn = psycopg2.connect(database_url)
    try:
        # The cursor context manager closes the cursor on exit; the connection
        # itself is closed in the outer ``finally`` so it cannot leak on errors.
        with conn.cursor() as cur:
            cur.execute("SELECT table_name, description FROM table_metadata")
            tables = cur.fetchall()

            cur.execute("SELECT table_name, column_name, description FROM column_metadata")
            columns = cur.fetchall()
    finally:
        conn.close()

    return tables, columns

In [4]:
def find_best_table_prompt(user_query, tables, columns,
                           return_chain=True, llm=None):
    """Build the prompt (or chain) that asks the LLM to pick the best table.

    Parameters
    ----------
    user_query : str
        The user's question.
    tables : iterable of tuple
        ``(table_name, description)`` pairs.
    columns : iterable of tuple
        ``(table_name, column_name, description)`` triples.
    return_chain : bool, optional
        If True (default) return an ``LLMChain`` plus its input dict; if False
        return the fully formatted prompt string.
    llm : Any, optional
        Language model used to build the chain. Required when ``return_chain``
        is True.

    Returns
    -------
    tuple or str
        ``(chain, inputs)`` when ``return_chain`` is True, where ``inputs`` has
        the keys ``table_info`` and ``user_query``; otherwise the formatted
        prompt text.

    Raises
    ------
    ValueError
        If ``return_chain`` is True and no ``llm`` is supplied.
    """
    if return_chain and llm is None:
        raise ValueError("find_best_table_prompt needs an `llm` when return_chain=True.")

    # Define the template for selecting the best table
    template = """
    You are a database assistant. Given the following tables and columns with their descriptions, select the best table that matches the user's query.

    Tables and Columns:
    {table_info}

    User Query:
    {user_query}

    Provide the output in the following JSON format:
    {{
        "best_matching_table": {{
            "table_name": "<best_table_name>",
            "description": "<best_table_description>"
        }}
    }}
    """
    # Group the column lines by table in a single pass instead of rescanning
    # the full column list once per table.
    column_lines_by_table = {}
    for owner, column_name, column_description in columns:
        column_lines_by_table.setdefault(owner, []).append(
            f"    Column: {column_name} - {column_description}\n"
        )

    # One section per table: header line, its column lines, then a blank line.
    table_info = "".join(
        f"Table: {table_name} - {table_description}\n"
        + "".join(column_lines_by_table.get(table_name, []))
        + "\n"
        for table_name, table_description in tables
    )

    prompt_template = PromptTemplate(
        template=template,
        input_variables=["table_info", "user_query"]
    )
    prompt_inputs = {"table_info": table_info, "user_query": user_query}

    if return_chain:
        # Create the chain using the supplied model and the PromptTemplate
        chain = LLMChain(llm=llm, prompt=prompt_template)
        return chain, prompt_inputs

    # Only format the prompt when the caller actually asked for the text.
    return prompt_template.format(**prompt_inputs)


In [5]:
def get_columns_info(table_name, columns):
    """Render the column descriptions of one table as an indented text block.

    Parameters
    ----------
    table_name : str
        Name of the table whose columns should be listed.
    columns : iterable of tuple
        ``(table_name, column_name, description)`` triples for any number of tables.

    Returns
    -------
    str
        One ``"    Column: <name> - <description>"`` line per matching column,
        each terminated by a newline; an empty string when nothing matches.
    """
    return "".join(
        f"    Column: {name} - {description}\n"
        for owner, name, description in columns
        if owner == table_name
    )


In [6]:
def find_best_columns_prompt(user_query, best_matching_table, columns,
                       return_chain=True, llm=None):
    """Build the prompt (or chain) that asks the LLM for the relevant columns.

    Parameters
    ----------
    user_query : str
        The user's question.
    best_matching_table : dict
        Mapping with the keys ``table_name`` and ``description``.
    columns : iterable of tuple
        ``(table_name, column_name, description)`` triples; only those of
        ``best_matching_table`` are listed in the prompt.
    return_chain : bool, optional
        If True (default) return an ``LLMChain`` plus its input dict; if False
        return the fully formatted prompt string.
    llm : Any, optional
        Language model used to build the chain. Required when ``return_chain``
        is True.

    Returns
    -------
    tuple or str
        ``(chain, context)`` when ``return_chain`` is True, otherwise the
        formatted prompt text.

    Raises
    ------
    ValueError
        If ``return_chain`` is True and no ``llm`` is supplied.
    """
    if return_chain and llm is None:
        raise ValueError("find_best_columns_prompt needs an `llm` when return_chain=True.")

    # Define the template for selecting the relevant columns
    column_template = """
    You are a database assistant. Given the following columns for the table '{table_name}', select the columns that are most relevant to the user's query.

    Table Description: {table_description}

    Columns:
    {columns_info}

    User Query:
    {user_query}

    Relevant Columns:
    """

    # Single source of truth for the template inputs; used both as the chain
    # context and to format the prompt text.
    context = {
        "table_name": best_matching_table["table_name"],
        "table_description": best_matching_table["description"],
        "columns_info": get_columns_info(best_matching_table["table_name"], columns),
        "user_query": user_query,
    }

    column_prompt_template = PromptTemplate(
        template=column_template,
        input_variables=["table_name", "table_description", "columns_info", "user_query"]
    )

    if return_chain:
        chain = LLMChain(llm=llm, prompt=column_prompt_template)
        return chain, context

    # Only format the prompt when the caller actually asked for the text.
    return column_prompt_template.format(**context)

In [7]:
def load_sql_examples(file_path):
    """Load the few-shot SQL examples stored as UTF-8 JSON at ``file_path``.

    Returns whatever the JSON document decodes to.
    """
    with open(file_path, mode='r', encoding='utf-8') as examples_file:
        raw_text = examples_file.read()
    return json.loads(raw_text)

In [8]:
def create_sql_prompt(examples, best_matching_table, columns_metadata,
                      use_best_matching_columns=False, llm=None):
    """
    Creates a FewShotPromptTemplate for generating SQL queries based on table and column metadata.

    The prompt instructs a language model (LLM) to create a syntactically correct PostgreSQL query
    from the user input. The descriptions of the selected columns are embedded in the prompt, and if
    one of those columns has "date" in its name the prompt also tells the LLM to return the most
    recent data when the user gives no date.

    Parameters
    ----------
    examples : list of dict
        A list of example inputs and corresponding SQL queries. Each example should be a dictionary with 'input' and 'query' keys.
    best_matching_table : dict
        A dictionary containing the best matching table information with 'table_name' and 'description' keys.
    columns_metadata : list of tuples
        A list of tuples containing columns metadata. Each tuple should include 'table_name', 'column_name', and 'description'.
    use_best_matching_columns : bool, optional
        If True, `columns_metadata` is assumed to be pre-filtered and is used as is; if False, only the
        entries belonging to `best_matching_table` are used. Default is False.
    llm : Any, optional
        Unused; kept so existing callers that pass it keep working.

    Returns
    -------
    sql_prompt : FewShotPromptTemplate
        A FewShotPromptTemplate with the input variables 'input', 'table_info' and 'top_k'.
    """
    # Determine which columns to use: best-matching or all columns of the table
    if use_best_matching_columns:
        columns_to_use = columns_metadata  # Assuming columns_metadata is already filtered
    else:
        columns_to_use = [col for col in columns_metadata if col[0] == best_matching_table['table_name']]

    column_lines = []
    has_date_column = False
    for _table_name, column_name, column_description in columns_to_use:
        column_lines.append(f"    Column: {column_name} - {column_description}")
        if 'date' in column_name.lower():
            has_date_column = True

    example_prompt = PromptTemplate.from_template("User input: {input}\nSQL query: {query}")

    prefix_parts = [
        "You are a PostgreSQL expert. Given an input question, create a syntactically correct PostgreSQL query to run. "
        "Unless otherwise specified, do not return more than {top_k} rows.",
        "Here is the relevant table information:\n{table_info}",
    ]
    if column_lines:
        # The column descriptions were previously computed but never reached
        # the prompt. Braces are doubled because the prefix is itself a format
        # template and metadata text must not be read as a placeholder.
        columns_info = "Columns:\n" + "\n".join(column_lines)
        prefix_parts.append(columns_info.replace("{", "{{").replace("}", "}}"))
    if has_date_column:
        prefix_parts.append(
            "If the user does not specify a date, retrieve the most recent data available by ordering the results "
            "by the date column in descending order."
        )
    prefix_parts.append("Below are a number of examples of questions and their corresponding SQL queries.")

    sql_prompt = FewShotPromptTemplate(
        examples=examples,
        example_prompt=example_prompt,
        # Joining only the parts that apply avoids runs of empty lines.
        prefix="\n\n".join(prefix_parts),
        suffix="User input: {input}\nSQL query: ",
        input_variables=["input", "table_info", "top_k"],
    )

    return sql_prompt

In [9]:
def create_answer_chain(llm):
    """Compose the runnable that phrases the final answer.

    The chain formats a prompt from the ``question`` and ``result`` inputs,
    sends it to ``llm`` and parses the model output into a plain string.
    The prompt stresses units, time-specific details and use of the latest
    data when no time is given.
    """
    answer_template = """
        You are a knowledgeable assistant. Given the following user question and SQL result, answer the question accurately.
        
        Always ensure to:
        1. Include appropriate units in your answer (e.g., Kwacha per kg, liters, etc.).
        2. Specify the time period or date if the question implies or explicitly asks for it.
        3. If the user does not specify a time, provide the most recent information available in the database and clearly state that this is the latest data.
        4. if the SQL result has number with decimals, please round it 

        For example, if the user asks "What's the price of Maize?", your answer should include the price with the correct unit and mention that this is the most recent price, e.g., "The most recent price of Maize is 60 Kwacha per kg."
        If the user asks about a specific time period, such as "What's the price of Maize for May 2024?", include the time in your answer, e.g., "The price of Maize in May 2024 is 60 Kwacha per kg."

        Question: {question}
        SQL Result: {result}
        Answer: """
    answer_prompt = PromptTemplate.from_template(answer_template)

    # prompt -> model -> plain-string output
    prompt_to_model = answer_prompt | llm
    return prompt_to_model | StrOutputParser()

In [10]:
def run_sql_chain(user_query, best_table_info, columns_info, best_columns=None, llm=None):
    """
    Generates an SQL query with the LLM, executes it and phrases the answer.

    The function loads the few-shot examples, builds the SQL prompt for the best matching
    table, asks the LLM for a query, runs that query against the database and passes the
    question and SQL result to the answer chain. The SQLAlchemy engine created for the
    call is disposed before returning, including when the chain raises.

    Parameters
    ----------
    user_query : str
        The user's query for which an SQL query needs to be generated.
    best_table_info : dict
        A dictionary containing the best matching table information with 'table_name' and 'description' keys.
    columns_info : list of tuples
        A list of tuples containing columns metadata for the table. Each tuple includes 'table_name',
        'column_name', and 'description'.
    best_columns : list of tuples, optional
        Best matching columns metadata. Only used when the module-level flag
        USE_BEST_MATCHING_COLUMNS is True and this value is non-empty. Default is None.
    llm : Any, optional
        The language model (e.g., ChatOpenAI) to be used for generating the SQL query. Default is None.

    Returns
    -------
    response : str
        The output of the answer chain (the LLM output parsed to a string).

    Example
    -------
    response = run_sql_chain(
        user_query="What is the price of maize?",
        best_table_info={"table_name": "maize_prices", "description": "Contains maize price data"},
        columns_info=[("maize_prices", "price", "Price of maize"), ("maize_prices", "date", "Date of the price entry")],
        llm=ChatOpenAI()
    )
    print(response)
    """
    # Load examples and create prompts
    examples = load_sql_examples(file_path=FILE_SQL_EXAMPLES_EN)

    # One call instead of two near-identical branches: pick the column source
    # and pass the matching flag.
    use_best_columns = bool(USE_BEST_MATCHING_COLUMNS and best_columns)
    sql_prompt = create_sql_prompt(
        examples=examples,
        best_matching_table=best_table_info,
        columns_metadata=best_columns if use_best_columns else columns_info,
        use_best_matching_columns=use_best_columns,
    )

    best_table = best_table_info['table_name']
    engine = create_engine(DATABASE_URL)
    try:
        db = SQLDatabase(engine=engine, ignore_tables=['table_metadata', 'column_metadata'])

        # NOTE(review): the generated SQL is executed as-is with the privileges
        # of DB_USER; confirm that role is read-only.
        execute_query = QuerySQLDataBaseTool(db=db)
        write_query = create_sql_query_chain(llm, db, sql_prompt)

        # Create the answer chain
        answer_chain = create_answer_chain(llm)

        # question -> generated SQL -> SQL result -> natural-language answer
        master_chain = (
            RunnablePassthrough.assign(query=write_query).assign(
                result=itemgetter("query") | execute_query
            )
            | answer_chain
        )

        # NOTE(review): create_sql_query_chain appears to compute `table_info`
        # from `db` itself, so the value passed here may be overridden and all
        # non-ignored tables described to the LLM - TODO confirm.
        return master_chain.invoke({
            "question": user_query,
            "top_k": 3,
            "table_info": best_table
        })
    finally:
        # A fresh engine (and connection pool) is created on every call;
        # release it so repeated calls do not accumulate open connections.
        engine.dispose()

In [11]:
def process_sql_query(user_question, use_huggingface=False):
    """
    Answers a user's question by selecting a table, generating SQL with an LLM and executing it.

    The function initialises OpenAI's GPT-3.5-turbo, reads the table/column metadata, asks the
    LLM for the best matching table, optionally asks it for the relevant columns (only when
    USE_BEST_MATCHING_COLUMNS is True) and finally runs the SQL chain.

    Parameters
    ----------
    user_question : str
        The user's question for which an SQL query needs to be generated and executed.
    use_huggingface : bool, optional
        Reserved for a Hugging Face model; not implemented yet. Default is False.

    Returns
    -------
    output : str
        The answer produced by `run_sql_chain`.

    Raises
    ------
    NotImplementedError
        If `use_huggingface` is True.
    ValueError
        If the LLM's table selection is not valid JSON or lacks the
        'best_matching_table' entry with 'table_name' and 'description'.

    Example
    -------
    output = process_sql_query(
        user_question="What is the price of maize?",
        use_huggingface=False
    )
    print(output)
    """
    # Initialize LLM
    # To Do: add Hugging Face LLM
    if use_huggingface:
        # Previously this path fell through and failed later with a NameError.
        raise NotImplementedError(
            "Hugging Face LLM support is not implemented yet; call with use_huggingface=False."
        )
    openai_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, openai_api_key=OPENAI_API_KEY)

    # Retrieve the metadata info (tables and columns)
    tables, columns = connect_to_database()

    # Chain 1: Find the Best Table
    best_table_chain, context = find_best_table_prompt(user_question, tables, columns, llm=openai_llm)
    best_table_output_str = best_table_chain.run(**context)

    # Convert the string output to a dictionary and validate its shape here,
    # so a malformed reply fails with a clear message instead of a TypeError
    # further down the pipeline.
    try:
        best_table_output = json.loads(best_table_output_str)['best_matching_table']
        best_table_output['table_name']
        best_table_output['description']
    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        raise ValueError(
            f"Could not parse the best-table selection returned by the LLM: {best_table_output_str!r}"
        ) from exc

    # Chain 2: Find Relevant Columns - skipped when the flag is off, because
    # run_sql_chain ignores `best_columns` in that case.
    best_columns_output = None
    if USE_BEST_MATCHING_COLUMNS:
        # NOTE(review): the chain returns free text, while run_sql_chain
        # documents `best_columns` as a list of tuples; the output needs
        # parsing before the flag can be enabled safely.
        best_columns_chain, context = find_best_columns_prompt(
            user_question, best_table_output, columns, llm=openai_llm
        )
        best_columns_output = best_columns_chain.run(**context)

    # Retrieve result
    output = run_sql_chain(
        user_query=user_question,
        best_table_info=best_table_output,
        columns_info=columns,
        best_columns=best_columns_output,
        llm=openai_llm
    )

    return output


## Evaluate results 

In [12]:
def evaluate_with_embedding_distance(df, lan='English',
                                     distance='cosine', ref_response_col='response-1'):
    """Score pipeline answers against reference answers by embedding distance.

    Parameters
    ----------
    df : pandas.DataFrame
        Evaluation set with at least the columns ``language``, ``question``
        and the column named by ``ref_response_col``.
    lan : str, optional
        Only rows whose ``language`` equals this value are evaluated.
    distance : {'cosine', 'euclidean'}, optional
        Distance metric used by the evaluator.
    ref_response_col : str, optional
        Column holding the reference answer.

    Returns
    -------
    pandas.DataFrame
        A copy of the filtered rows with two added columns: ``llm_response``
        and ``score_<distance>``. The score stays ``None`` for blank responses.

    Raises
    ------
    ValueError
        If ``distance`` is not one of the supported metrics.
    """
    # Validate the metric up front; previously an unknown value only failed
    # later with an UnboundLocalError.
    distance_metrics = {
        "cosine": EmbeddingDistance.COSINE,
        "euclidean": EmbeddingDistance.EUCLIDEAN,
    }
    if distance not in distance_metrics:
        raise ValueError(
            f"Unsupported distance {distance!r}; expected one of {sorted(distance_metrics)}."
        )
    dist_metric = distance_metrics[distance]

    # Filter the dataframe to keep only instances for that language; copy so
    # the new columns are written to an independent frame, not a slice.
    df = df.query('language == @lan').copy()

    # Add columns to keep LLM-response and eval score
    df['llm_response'] = None
    score_name = f"score_{distance}"
    df[score_name] = None

    # Setup evaluator
    embedding_model = HuggingFaceEmbeddings()
    hf_evaluator = load_evaluator("embedding_distance", distance_metric=dist_metric,
                              embeddings=embedding_model)
    for idx, row in df.iterrows():
        question = row['question']
        reference_response = row[ref_response_col]
        llm_response = process_sql_query(question)
        print('LLM-Response', "===>", llm_response)
        df.loc[idx, 'llm_response'] = llm_response
        if not llm_response:
            # Nothing to embed: record the blank answer and leave the score unset.
            print('Blank response')
            continue
        score = hf_evaluator.evaluate_strings(prediction=llm_response,
                                              reference=reference_response)
        df.loc[idx, score_name] = score['score']

    return df

In [13]:
# table_info = db.table_info
# len(table_info)

# df_eval = pd.read_csv("data/raw/eval-set.csv")
# df_res = evaluate_with_embedding_distance(df=df_eval, lan='English', distance='cosine')

# question = "Whats the price of Maize?"
# response = process_sql_query(user_question=question, llm=openai_chat_model)


# create_sql_prompt(examples, best_matching_table, columns_metadata)

# Test Full Pipeline 

In [14]:
# Notebook-wide noise reduction: the warning filters and logger levels below
# are set before the pipeline modules are imported further down in this cell.
import warnings
warnings.filterwarnings("ignore")

import logging
import json

# Set the logging level for the `httpx` logger to WARNING to suppress INFO logs
logging.getLogger("httpx").setLevel(logging.WARNING)

# You can also suppress other loggers if necessary
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("langchain").setLevel(logging.WARNING)

# Set the logging level for langsmith.client to ERROR to suppress warnings
logging.getLogger("langsmith.client").setLevel(logging.ERROR)

# Suppress a specific warning
warnings.filterwarnings("ignore", ".*USER_AGENT environment variable not set.*")

# NOTE(review): this import rebinds `process_sql_query`, which an earlier cell of
# this notebook also defines, and later cells define `get_latest_date` and
# `classify_query_llm` again - which implementation runs depends on cell order.
from sql_chain import process_sql_query, get_latest_date

from utils import translate_text_openai, classify_query_llm

from langchain_community.embeddings import HuggingFaceEmbeddings, OpenAIEmbeddings
from sklearn.metrics.pairwise import cosine_similarity

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [None]:
def _extract_latest_date(raw_result):
    """Return the single value from a ``db.run`` result, or None if there is none."""
    if not raw_result:
        return None
    # `db.run` hands back the rows as the string form of a Python list
    # (e.g. "[('2024-05-31',)]"), so it has to be parsed before indexing.
    rows = ast.literal_eval(raw_result) if isinstance(raw_result, str) else raw_result
    if not rows:
        return None
    first_row = rows[0]
    if isinstance(first_row, dict):
        return first_row.get("latest_date")
    if isinstance(first_row, (tuple, list)):
        return first_row[0] if first_row else None
    return first_row


def get_latest_date(db, commodity=None):
    """
    Retrieves the most recent date from the commodity prices table in the database.

    If a commodity is specified, it retrieves the latest date for that commodity.
    If no commodity is specified or no date is found for it, it retrieves the latest
    date across all commodities.

    Parameters
    ----------
    db : SQLDatabase
        The database connection object; only its ``run`` method is used.
    commodity : str, optional
        The name of the commodity to filter by. Default is None.

    Returns
    -------
    str or None
        The latest date as text (the database's text rendering of
        ``collection_date``), or None when no date could be retrieved. Errors
        are printed rather than raised.
    """
    # The date is cast to text in SQL so the stringified result contains a
    # plain literal that `ast.literal_eval` can parse.
    # NOTE(review): presumably `collection_date` is a DATE column, giving 'YYYY-MM-DD' - confirm.
    filtered_query = """
    SELECT CAST(MAX(collection_date) AS TEXT) AS latest_date
    FROM commodity_prices
    WHERE commodity = :commodity;
    """
    overall_query = """
    SELECT CAST(MAX(collection_date) AS TEXT) AS latest_date
    FROM commodity_prices;
    """
    try:
        if commodity:
            # Bound parameter instead of string interpolation: the commodity
            # value can no longer alter the SQL statement.
            latest_date = _extract_latest_date(
                db.run(filtered_query, parameters={"commodity": commodity})
            )
            if latest_date:
                return latest_date

        # No commodity given, or nothing found for it: fall back to all commodities.
        latest_date = _extract_latest_date(db.run(overall_query))
        if latest_date:
            return latest_date
        raise ValueError("Failed to retrieve the latest date from the database.")

    except Exception as e:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"Error retrieving latest date: {e}")
        return None

In [15]:
# Notebook-level database handle used by the exploratory cells below.
# The two metadata tables are excluded via `ignore_tables`.
engine = create_engine(DATABASE_URL)
db = SQLDatabase(engine=engine, ignore_tables=['table_metadata', 'column_metadata'])

In [16]:
get_latest_date(db, commodity="Maize")

Error retrieving latest date: string indices must be integers, not 'str'


In [None]:
# Collect the natural-language questions ('input') of the few-shot examples.
# The file is read as UTF-8, matching `load_sql_examples`, so the result does
# not depend on the platform's default encoding.
with open(FILE_SQL_EXAMPLES_EN, 'r', encoding='utf-8') as file:
    examples = json.load(file)
valid_queries_en = [item['input'] for item in examples]

In [None]:
user_query = "Hie"
classify_query_llm(user_query)

In [None]:
def prepare_valid_embeddings(valid_queries, embeddings):
    """
    Embed every query in ``valid_queries`` with the given embedding model.

    Parameters
    ----------
    valid_queries : list of str
        Queries that can be converted to SQL.
    embeddings : Any
        Embedding model exposing an ``embed_query(text)`` method.

    Returns
    -------
    list
        One ``embed_query`` result per query, in input order.
    """
    vectors = []
    for text in valid_queries:
        vectors.append(embeddings.embed_query(text))
    return vectors

In [None]:
def classify_query_llm(user_query, llm=None):
    """
    Classify the user query using an LLM based on examples of valid and invalid SQL-generating queries.

    Parameters
    ----------
    user_query : str
        The user's query.
    llm : Any, optional
        The language model to be used for classification. When omitted, a
        ``ChatOpenAI`` "gpt-4" model with temperature 0 is created.

    Returns
    -------
    bool
        True if the query is classified as valid for SQL generation, False otherwise.
    """

    # Example queries
    examples = [
        {"query": "What is the price of maize?", "classification": "Valid"},
        {"query": "How much maize was produced last year?", "classification": "Valid"},
        {"query": "Where can I find food?", "classification": "Valid"},
        {"query": "Tell me a joke.", "classification": "Invalid"},
        {"query": "Hello", "classification": "Invalid"},
        {"query": "Bot", "classification": "Invalid"}
    ]

    # Construct the prompt: the examples are baked in through the f-string,
    # while `{{user_query}}` survives as the template's only placeholder.
    example_prompts = "\n".join([f'Query: "{ex["query"]}"\nClassification: {ex["classification"]}' for ex in examples])
    prompt_template = PromptTemplate.from_template(
        f"""
        You are a knowledgeable assistant who can determine whether a query is valid for generating an SQL query or not. 
        Given the following examples, classify the new query accordingly.

        Examples:
        {example_prompts}

        Now classify the following query:

        Query: "{{user_query}}"
        Classification (Valid/Invalid):
        """
    )

    # Initialize the LLM if not provided
    if llm is None:
        llm = ChatOpenAI(model="gpt-4", temperature=0, openai_api_key=OPENAI_API_KEY)

    # Create the LLMChain for classification
    classification_chain = LLMChain(llm=llm, prompt=prompt_template)

    # Perform the classification
    result = classification_chain.run({"user_query": user_query})

    # Compare case-insensitively: the previous check only recognised the exact
    # capitalisation "Valid". "invalid" contains "valid", so it is ruled out
    # explicitly.
    verdict = result.strip().lower()
    return "valid" in verdict and "invalid" not in verdict

In [None]:
# Example usage
user_query = "Hey"
is_valid = classify_query_llm(user_query)
if is_valid:
    print("Valid SQL-related query")
else:
    print("Not a valid SQL-related query")


In [None]:
# NOTE(review): no earlier cell in this notebook assigns `response`, so this line
# relies on leftover kernel state and fails after Restart & Run All.
response.content

In [None]:
ny_query = "Thumba la chimanga ndi zingati pano?"
en_query = translate_text_openai(ny_query, "Chichewa", "English")
valid = classify_query_llm(en_query)

In [None]:
en_query

In [None]:
process_sql_query(ny_query)

In [None]:
# Smoke test of the full pipeline on English and Chichewa questions.
questions = [
    "What is the price of Maize in Rumphi",
    "Where can I find the cheapest maize?",
    "Which district harvested the most beans?",
    "How much is Maize in Zomba?",
    "Which district produced more Tobacco, Mchinji or Kasungu?",
    "Where can I get bananas?",
    "Kodi chimanga chotchipa ndingachipeze kuti?",
    "Ndi boma liti komwe anakolola nyemba zambiri?",
    "Ku Zomba chimanga akugulitsa pa bwanji?",
    "Kodi ndi boma liti anakolola chimanga chambiri pakati pa Lilongwe kapena Kasungu?",
    "Ndikuti ndingapeze mpunga wambiri?",
]

for q in questions:
    print()
    print("QUESTION:", q)
    response = process_sql_query(q)
    print("LLM Response:", response)

In [None]:
translate_text_openai("Ndikuti ndingapeze mpunga wambiri?", "Chichewa", "English")

In [None]:
import pandas as pd
import numpy as np

In [None]:
df_prices = pd.read_csv("data/tables/prices.csv")

In [None]:
df_prices.head()

In [None]:
res = df_prices.query('Month_Name == "May" and Commodity =="Maize"')

In [None]:
res.Price.mean()

In [None]:
# Load the raw maize prices and turn the "#DIV/0!" markers into missing values
# (presumably spreadsheet division errors - confirm against the data source)
# so the Price column can be cast to float.
# `np.nan` is used because the `np.NaN` alias was removed in NumPy 2.0.
df_prices = pd.read_csv("data/raw/prices-maize.csv")
df_prices2 = df_prices.replace("#DIV/0!", np.nan)
df_prices2['Price'] = df_prices2['Price'].astype(float)

In [None]:
# Rebind instead of dropping in place, and ignore already-missing columns, so
# re-running this cell does not raise a KeyError.
df_prices2 = df_prices2.drop(columns=['EPA', 'Year'], errors='ignore')

In [None]:
df_prices2.query('Month == "February" and Market == "Phalombe"')

In [None]:
df_prices2.query('Month == "February" and Market == "Chikhwawa"')

In [None]:
df_prices2.query('Month == "May"').Price.mean()