In [1]:
import json
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoModelForCausalLM, AutoTokenizer
from huggingface_hub import login
import pandas as pd


In [2]:
login(token="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")

In [3]:
def retrieve_context(new_query, top_k=3):
    """
    Retrieve the top_k similar examples based on cosine similarity of TF-IDF vectors.

    This function builds a TF-IDF vectorizer using the 'query' field from the
    global RAG examples list, transforms the new query into its TF-IDF vector,
    and computes cosine similarity scores with all stored queries. The top_k most
    similar examples are returned.

    Parameters:
        new_query (str): The new query string for which similar examples are sought.
        top_k (int): The number of top similar examples to retrieve (default is 3).

    Returns:
        list: A list of the top_k examples (dictionaries) most similar to the new query.

    Raises:
        ValueError: If 'new_query' is not a non-empty string.
        RuntimeError: If the global 'rag_examples' list is not defined or is empty.
    """
    import numpy as np
    from sklearn.feature_extraction.text import TfidfVectorizer

    # Check if rag_examples exists and is non-empty.
    try:
        if not rag_examples:
            raise RuntimeError("The global 'rag_examples' list is not defined or is empty.")
    except NameError:
        raise RuntimeError("The global 'rag_examples' list is not defined.")

    # Validate that new_query is a non-empty string.
    if not isinstance(new_query, str) or not new_query.strip():
        raise ValueError("The 'new_query' parameter must be a non-empty string.")

    # Extract the list of queries from the rag_examples.
    queries = [ex['query'] for ex in rag_examples if 'query' in ex]
    if not queries:
        raise RuntimeError("No valid 'query' entries found in 'rag_examples'.")

    # Build a TF-IDF vectorizer using the queries from the RAG examples.
    vectorizer = TfidfVectorizer().fit(queries)
    query_vectors = vectorizer.transform(queries)

    # Transform the new query to its TF-IDF vector.
    new_query_vector = vectorizer.transform([new_query])

    # Compute cosine similarities (dot product on L2-normalized vectors).
    similarities = np.dot(query_vectors, new_query_vector.T).toarray().flatten()

    # Get indices of the top_k most similar examples.
    top_indices = similarities.argsort()[-top_k:][::-1]

    # Retrieve the corresponding examples from rag_examples.
    retrieved_examples = [rag_examples[i] for i in top_indices]

    return retrieved_examples


In [4]:
def build_prompt(new_query, retrieved_examples, data):
    """
    Build a prompt by incorporating retrieved examples and the new query.

    This function constructs a prompt for a Python pandas code generation assistant.
    It includes a set of example queries with their corresponding context and generated code,
    followed by the new query and the data's column information.

    Parameters:
        new_query (str): The new natural language query.
        retrieved_examples (list): A list of dictionaries, each containing keys:
            'query', 'retrieved_context', and 'generated_code'.
        data (pandas.DataFrame): A DataFrame whose columns will be included in the prompt.

    Returns:
        str: A prompt string constructed for the code generation task.

    Raises:
        ValueError: If new_query is not a non-empty string.
        ValueError: If retrieved_examples is not a list or is empty.
        ValueError: If data is not a pandas DataFrame.
    """
    # Validate new_query is a non-empty string.
    if not isinstance(new_query, str) or not new_query.strip():
        raise ValueError("new_query must be a non-empty string.")

    # Validate retrieved_examples is a non-empty list.
    if not isinstance(retrieved_examples, list) or not retrieved_examples:
        raise ValueError("retrieved_examples must be a non-empty list.")

    # Validate data is a pandas DataFrame.
    try:
        import pandas as pd
        if not isinstance(data, pd.DataFrame):
            raise ValueError("data must be a pandas DataFrame.")
    except ImportError:
        raise ImportError("pandas must be installed to use this function.")

    # Initialize the prompt with introductory context.
    prompt = (
        "You are a helpful assistant that generates Python pandas code based on a natural language query.\n"
        "Below are some examples:\n\n"
    )

    # Append each retrieved example to the prompt.
    for ex in retrieved_examples:
        # Check if each expected key exists in the example.
        if not all(key in ex for key in ['query', 'retrieved_context', 'generated_code']):
            raise ValueError("Each example in retrieved_examples must contain 'query', 'retrieved_context', and 'generated_code' keys.")

        prompt += f"Query: {ex['query']}\n"
        prompt += f"Context: {ex['retrieved_context']}\n"
        prompt += f"Code: {ex['generated_code']}\n\n"

    prompt += "End of examples\n\n"

    # Append the new query.
    prompt += f"\n\nNew Query: {new_query}\n"

    # Append instructions for generating the Python pandas code.
    prompt += "Generate the Python pandas code for ."
    prompt += " Generate only the most relevant python pandas code, nothing more\n"

    # Append the data's column information.
    prompt += f"The data consist of the following columns {data.columns.tolist()}"

    return prompt


In [5]:
def extract_insight(generated_text, data):
    """
    Extracts and executes Python pandas code from the generated text to derive insights from the given DataFrame.

    This function extracts the generated Python pandas code from the given text, executes it in a local scope,
    and returns the resulting output.

    Parameters:
        generated_text (str): The generated text containing Python pandas code.
        data (pandas.DataFrame): The DataFrame (df) on which the extracted code will be executed.

    Returns:
        object: The result of executing the extracted pandas code.

    Raises:
        ValueError: If generated_text is not a valid string or does not contain extractable code.
        ValueError: If data is not a pandas DataFrame.
        KeyError: If the executed code does not produce a variable named 'result'.
        SyntaxError: If the extracted code is not valid Python.
        Exception: If any other error occurs during code execution.
    """
    import pandas as pd

    # Validate that generated_text is a non-empty string.
    if not isinstance(generated_text, str) or not generated_text.strip():
        raise ValueError("generated_text must be a non-empty string.")

    # Validate that data is a pandas DataFrame.
    if not isinstance(data, pd.DataFrame):
        raise ValueError("data must be a pandas DataFrame.")

    try:
        # Extract the generated code from the text.
        if "Code:" not in generated_text:
          return ""

        # Split the text at 'Code:' and take the portion after it.
        code_section = generated_text.split("Code:")[-1]
        # The code might be on the first line; we split by newline and strip extra whitespace.
        extracted_code = code_section.split("\n")[0].strip()


        if not extracted_code:
            raise ValueError("No valid code found in generated_text.")

        # Initialize a local execution environment with 'df' as the DataFrame.
        local_vars = {"df": data}

        # Execute the extracted code safely.
        exec(extracted_code, {}, local_vars)

        # Ensure the 'result' variable is defined in the executed code.
        if "result" not in local_vars:
            raise KeyError("The executed code did not produce a variable named 'result'.")

        return local_vars["result"]

    except SyntaxError:
        raise SyntaxError("The extracted code contains syntax errors.")
    except Exception as e:
        raise Exception(f"An error occurred while executing the generated code: {str(e)}")


In [10]:
def LLM(new_query, df, rag_examples):
    """
    Generate insights using a large language model with retrieval-augmented examples.

    This function leverages an LLM (specifically the Mistral-7B model) to generate Python pandas code based on a natural language query.
    It retrieves similar examples from a provided list of RAG examples, constructs a prompt that includes these examples and data details,
    generates the code using the model, executes the generated code on the provided DataFrame, and returns the resulting insight.

    Parameters:
        new_query (str): The natural language query to generate the pandas code.
        df (pandas.DataFrame): The DataFrame on which the generated code will operate.
        rag_examples (list): A list of dictionaries, each containing keys 'query', 'retrieved_context', and 'generated_code'.

    Returns:
        object: The result obtained from executing the generated code on the DataFrame.

    Raises:
        ValueError: If new_query is not a non-empty string.
        ValueError: If df is not a pandas DataFrame.
        ValueError: If rag_examples is not a non-empty list.
        Exception: If an error occurs during model loading, prompt construction, code generation, or code execution.
    """
    try:
        # Validate inputs.
        if not isinstance(new_query, str) or not new_query.strip():
            raise ValueError("new_query must be a non-empty string.")

        try:
            import pandas as pd
        except ImportError:
            raise ImportError("pandas is required for this function.")

        if not isinstance(df, pd.DataFrame):
            raise ValueError("df must be a pandas DataFrame.")
        if not isinstance(rag_examples, list) or not rag_examples:
            raise ValueError("rag_examples must be a non-empty list.")

        # Load the Mistral-7B model and its tokenizer.
        from transformers import AutoTokenizer, AutoModelForCausalLM

        model_name = "mistralai/Mistral-7B-Instruct-v0.1"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)

        # Retrieve similar examples from rag_examples using the new_query.
        # (Assuming retrieve_context is defined to use the global rag_examples or has been updated accordingly.)
        retrieved_examples = retrieve_context(new_query)

        # Build the prompt using the retrieved examples, the new query, and data details.
        prompt = build_prompt(new_query, retrieved_examples, df)

        # Encode the prompt for the model.
        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

        # Generate output from the model.
        output = model.generate(
            **inputs,
            max_new_tokens=500,  # Adjust the number of tokens as needed.
            do_sample=True,
            temperature=0.7
        )

        # Decode the generated text.
        generated_text = tokenizer.decode(output[0], skip_special_tokens=True)

        # Remove the prompt from the generated text to isolate the new content.
        generated_text_trim = generated_text.replace(prompt, "")

        print(f"generated_text_trim:{generated_text_trim}")

        # Extract and execute the generated code to obtain the result.
        result = extract_insight(generated_text_trim, df)

        return result

    except Exception as e:
        raise Exception(f"An error occurred in the LLM function: {str(e)}")


In [7]:
import pandas as pd
import json
df = pd.read_excel("/content/data/transactions.xlsx")
json_file_path = 'data/small_rag.json'
with open(json_file_path, 'r') as file:
    rag_examples = json.load(file)

# Few Examples

In [11]:
new_query = "Find the Total Quanitiy ordered from each location"
result = LLM(new_query,df,rag_examples)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


generated_text_trim:


Context: Group by 'Customer Location' and sum the 'Quantity Ordered' column.
Code: result = df.groupby('Customer Location')['Quantity Ordered'].sum().reset_index()


In [12]:
result

Unnamed: 0,Customer Location,Quantity Ordered
0,Canada,625
1,France,600
2,Germany,525
3,UK,607
4,USA,645


In [13]:
new_query = "Find the Total Discounted Price by each Shipment Partner"
result = LLM(new_query,df,rag_examples)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


generated_text_trim:
In the context, group by 'Shipment Partner' and sum the total discounted price.

Code: result = df.groupby('Shipment Partner')['Discounted Price'].sum().reset_index(name='Total Discounted Price')


In [14]:
result

Unnamed: 0,Shipment Partner,Total Discounted Price
0,DHL,49022.3091
1,FedEx,59233.5153
2,UPS,61144.7544
3,USPS,53569.5483


In [18]:
new_query = "Find the average Discount Applied by each Product Category"
result = LLM(new_query,df,rag_examples)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


generated_text_trim:


This query is very similar to the first one. However, the only difference is that we are looking at the average discount applied by each product category instead of per product category. So the query will generate the same pandas code as the first example.

Query: Find the average Discount Applied by each Product Category
Code: result=df.groupby('Product Category')['Discount Applied'].mean().reset_index()


In [19]:
result

Unnamed: 0,Product Category,Discount Applied
0,Books,0.146818
1,Clothing,0.147077
2,Electronics,0.147056
3,Home & Kitchen,0.149519
4,Toys,0.151019


In [20]:
#END