In [12]:
import os
import toml
import asyncio
import operator
import pandas as pd
import streamlit as st
from io import StringIO
import sys
import streamlit as st

from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.chains.openai_functions import create_structured_output_runnable
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END

import phoenix as px
from phoenix.trace.langchain import LangChainInstrumentor
from typing import TypedDict, List, Tuple, Annotated

In [26]:
# TOML file that holds OPENAI_API_KEY. The absolute path is tried first; when it
# does not exist, fall back to `github_secrets`, which is resolved against the
# current working directory.
secrets = "C:/Users/Administrator/Documents/github/reporter/secrets.toml"
github_secrets = "secrets.toml"
if not os.path.exists(secrets):
    secrets = github_secrets
os.environ["OPENAI_API_KEY"] = toml.load(secrets)["OPENAI_API_KEY"]

# Shared chat model used by the code-generation and analyst chains.
llm = ChatOpenAI(temperature=0, model="gpt-4o")

# Phoenix init.

session = px.launch_app()
LangChainInstrumentor().instrument()


Existing running Phoenix instance detected! Shutting it down and starting a new instance...
Attempting to instrument while already instrumented


In [14]:

class GraphState(TypedDict):
    """State dictionary passed between the nodes of the workflow graph.

    Fields marked "caller-supplied" are the ones provided in the initial input
    built by ``main()``; the remaining fields are written by the node functions.
    """
    request: str # caller-supplied: the raw user question
    source_path: str # caller-supplied: path of the Excel file being explored
    past_execs: List  # caller-supplied: earlier records; plan_steps reads their "question" and "answer" keys

    tasks: str  # plan text produced by plan_steps
    past_tasks: List[str]  # reset to an empty list by plan_steps
    current_task: str  # human-readable status string set by each node
    
    error: str  # "yes" / "no", written by check_code
    messages: List  # running list of (role, text) tuples fed to the prompts
    generation: str  # NOTE(review): generate_code stores the structured `code` object here; write_answer replaces it with a string
    iterations: int # caller-supplied as 0; incremented by generate_code
    max_iterations: int # caller-supplied retry cap, compared against iterations in should_end
    reflect: str  # "yes" once check_code succeeds with iterations > 2; cleared by plan_steps


    response: str  # captured stdout from check_code, later the analyst's final answer

class Plan(BaseModel):
    # Structured-output schema for the planner: a single string holding the task list.
    tasks: str = Field(description="Tasks to complete to fulfill the user request in string format")

class code(BaseModel):
    # Structured-output schema for the code-generation chain: a short explanation,
    # the import statements, and the code body are returned as separate strings.
    prefix: str = Field(
        description="Description of the problem and approach",
    )
    imports: str = Field(
        description="Code block import statements",
    )
    code: str = Field(
        description="Code block not including import statements",
    )
    # Ordinary string field with a default value (not a Field description).
    description: str = "Schema for code solutions to execute openpyxl related user requests"

class answer(BaseModel):
    # Structured-output schema holding a single free-text answer.
    # The text is passed as `description=` (as in Plan and code); passed
    # positionally it would become the field's default value instead.
    ans: str = Field(description="Answer to the provided question")



# Templates section

In [15]:
import os
import toml
import json

from openpyxl import load_workbook
from openai import OpenAI
from langchain.vectorstores.faiss import FAISS
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate

from utils import (
    query_llm_gpt4,
    extract_code_from_llm,
    load_excel_to_df,
    load_sheets_to_dfs
)



# Repeats the API-key setup done in the earlier setup cell: read OPENAI_API_KEY
# from the TOML secrets file and export it through the process environment.
# NOTE(review): the path is absolute and machine-specific.
secrets = "C:/Users/Administrator/Documents/github/reporter/secrets.toml"
os.environ["OPENAI_API_KEY"] = toml.load(secrets)["OPENAI_API_KEY"]


def planner_template():
    """Build the chat prompt used by the planning step.

    Returns:
        A ``ChatPromptTemplate`` with one fixed system message (the planner's
        instructions) followed by a ``{messages}`` placeholder for the
        conversation so far.
    """
    # Nothing is interpolated into this text, so a plain string literal is enough.
    planner_instructions = """
You are an intelligent data analyst specializing in answering Excel file exploration related questions by making a plan with clear, actionable tasks that can be executed by coding in python's Pandas and seaborn modules (if graph's are involved) to answer the user question about the excel file.
Analyze the user's request and create a concise list of steps a programmer should follow to fulfill the exploration question:

Follow these guidelines:
- Focus on essential tasks that directly achieve the user's goal. Avoid trivial steps like "Open the Excel file." Ensure tasks are descriptive enough to be executed without further clarification and keeps the list brief.
- The last step should ALWAYS be returning your findings by either printing them, if its text, or displaying chart if the output is a graph.
- If the user questions is instead a request to change any source data from the excel file, Abandon all operations and ONLY print that you can only answer questions in the "explore" mode and can't manipulate the data. \n
- The last section of the program should always be a print statement that provides a meaningful word response to the user question.
- Try not to print the dataframe or its head in the answers
"""

    return ChatPromptTemplate.from_messages(
        [
            ('system', planner_instructions),
            ("placeholder", "{messages}"),
        ]
    )


def format_request(request, source):
    """Wrap the user's question with a preview of every sheet in the workbook.

    Args:
        request: The user's question, inserted verbatim at the end of the text.
        source: Workbook path; passed to ``load_sheets_to_dfs`` and echoed in
            the text.

    Returns:
        A string naming the file, stating how many sheets were loaded, showing
        the first three rows of each sheet, and ending with the user request.
    """
    dfs, sheet_names = load_sheets_to_dfs(source)

    # One preview section per sheet, in the order the loader returned them.
    head_view = "".join(
        f"\nSheet {idx}: {sheet_names[idx]}\nSheet head:\n{frame.head(3)}\n\n"
        for idx, frame in enumerate(dfs)
    )

    return f"""
The user wants to find an answer to their question regarding this excel file called: {source}.\n

------------
There are {len(dfs)} sheets in the excel file. Here is how the first few rows of those sheets look like:
{head_view}
------------

User request: {request}"""


def retrieve_context(request, retriever):
    """Fetch documents related to ``request`` and merge their text into one string.

    Args:
        request: Text appended to the fixed "Documentation related to : " query.
        retriever: Object with an ``invoke(query)`` method returning an iterable
            of items that expose ``page_content``.

    Returns:
        The ``page_content`` of every returned item, joined by a dashed
        separator line surrounded by blank lines.
    """
    matches = retriever.invoke(f"Documentation related to : {request}")
    return "\n\n\n-----\n\n\n".join(doc.page_content for doc in matches)

def code_chain_template():
    """Build the chat prompt used by the code-generation step.

    Returns:
        A ``ChatPromptTemplate`` with one fixed system message (the coding
        assistant's instructions) followed by a ``{messages}`` placeholder for
        the conversation so far.
    """
    coder_instructions = """You are a coding assistant with expertise in Python's pandas module and seaborn module.\n
Answer the user question about the excel file by writing code for executing each task from the generated plan. Ensure that any code you provie can be executed \n
with all required imports and variables defined. Structure your response with a description of the code solution. \n
Then list the imports. And finally list the functioning code block. NEVER make any changes to the source excel file. \n

If the user questions is instead a request to change any source data from the excel file, ONLY write the python code to print that you can only answer questions in "explore" mode and can't manipulate the data. \n


Remember that you are in a streamlit envirnonment, so follow the guidelines when returning the results:
- ALWAYS do return the python code to import pandas, even if code is not required. \n
- ALWAYS dp print the answer that you find at the end of the program. This will provide the user with the answer to their question. \n
- ALWAYS do save any graphs you make with seaborn as images in the folder "C:/Users/Administrator/Documents/github/reporter/ph_images" at the end of the program. This will provide the user with the answer to their question. \n
- ALWAYS do write code to create some sort of a graph whenever the user asks for wrtiting a graph. \n

Here is the user's original question, progress on executing previous tasks, and the current task you need to write code to execute. Write code to execute the last retrieved task from the plan: """

    message_specs = [
        ('system', coder_instructions),
        ('placeholder', '{messages}'),
    ]
    return ChatPromptTemplate.from_messages(message_specs)

def data_analyst_template():
    """Build the chat prompt used to phrase the final answer.

    Returns:
        A ``ChatPromptTemplate`` with one fixed system message (the data
        analyst's instructions) followed by a ``{messages}`` placeholder for
        the conversation so far.
    """
    analyst_instructions = """You are a data analyst who is an expert in understanding data and coding, and provides logical answers to user questions.\n
Answer the user question about the excel file by referring to the conversation between the user and the system, \n
providing a logical and concise answer to the user's question. \n
Do not explain the code or return any code in the final answer. \n
Do provide a status update on whether or not the system was able to execute the request and if any errors, explain why the system got into an error. \n

Never include any file path names or information about saving images in the final response: """

    message_specs = [
        ('system', analyst_instructions),
        ('placeholder', '{messages}'),
    ]
    return ChatPromptTemplate.from_messages(message_specs)

## Code chains

In [16]:
# Prompt templates for the two LLM-backed steps.
code_prompt = code_chain_template()
analyst_prompt = data_analyst_template()

# The code chain asks the model for output shaped like the `code` schema;
# the analyst chain pipes its prompt straight into the chat model.
code_chain = code_prompt | llm.with_structured_output(code)
analyst_chain = analyst_prompt | llm

## Defining nodes

In [17]:

def _format_past_execs(past_execs):
    """Render past question/answer records as prompt text, one section per record."""
    return "".join(
        f"\nPast Question: {record['question']}\nAnswer: {record['answer']}\n"
        for record in past_execs
    )


def plan_steps(state: GraphState):
    """Ask the planner model for the task list that answers the user's request.

    On the first iteration of a request, earlier question/answer records from
    ``state["past_execs"]`` (at most the last four) are added to the
    conversation. When the previous attempt ended in an error, a note asking
    for a revised plan is added first.

    Args:
        state: Graph state. Reads ``messages``, ``past_execs``, ``iterations``
            and, when present, ``error``.

    Returns:
        Partial state update with the plan text (``tasks``), the extended
        ``messages`` list, ``past_tasks`` and ``reflect`` reset to empty
        values, and a status string in ``current_task``.
    """
    messages = state["messages"]
    past_execs = state["past_execs"]
    # `.get`: no node has written `error` before the first planning pass.
    error = state.get("error")
    iteration = state["iterations"]
    # Every planning round starts with no completed tasks and no pending reflection.
    past_tasks = []
    reflect = ''

    if error == "yes":
        messages += [("system", "Now I should try again to recreate a plan that doesn't without producing any errors, if the error was due to the plan. Let's rewrite a new plan if necessary.")]

    # History is only injected once per request, and only when there is some.
    if iteration == 0 and past_execs:
        if len(past_execs) > 4:
            # Keep just the four most recent records.
            exec_string = _format_past_execs(past_execs[-4:])
            messages += [("user", f"These are the past 4 questions from me, the user, along with the answers you provided: \n{exec_string}")]
        else:
            exec_string = _format_past_execs(past_execs)
            messages += [("system", f"These are the past few questions from me, the user, along with the answers you provided: \n{exec_string}")]

    planner_prompt = planner_template()
    planner = create_structured_output_runnable(Plan, ChatOpenAI(model="gpt-4o", temperature=0, streaming=True), planner_prompt)

    print("\nPlanning step begins...\n")

    plan = planner.invoke({"messages": messages})

    messages += [('system', f"For the user questions, this is the list of tasks I need to complete to find the answer: \n{plan.tasks}")]

    return {"tasks": plan.tasks, "messages": messages, "past_tasks": past_tasks, "reflect": reflect, "current_task": "Plan completed..."}

# Function: Write code

def generate_code(state: GraphState):
    """Ask the code chain for a program that carries out the current plan.

    Args:
        state: Graph state. Reads ``messages``, ``iterations``, ``tasks`` and,
            when present, ``error``.

    Returns:
        Partial state update with the structured result from ``code_chain``
        (``generation``), the extended ``messages`` list, ``iterations``
        incremented by one, and a status string in ``current_task``.
    """
    messages = state["messages"]
    iterations = state["iterations"]
    # `.get`: `error` has not been written by any node on the first pass.
    error = state.get("error")
    tasks = state["tasks"]

    print("Generating code....")

    if error == "yes":
        messages += [("system", f"Now I should try again to generate code with the newly written plan that wouldn't produce any errors during execution, in case the error was due to the written code. If the error produced was not because of the code, I will use the same code. This is the plan to follow: \n{tasks}")]

    messages += [("system", "I need to follow the plan to write a readily executable program, which I will execute later, to print the answer to the user's question.")]
    generation = code_chain.invoke({"messages": messages})

    messages += [('system', f"This is the code solution written based ont the plan to answer the user question:\n{generation.imports}\n{generation.code}")]

    iterations += 1
    print("Reached end of generation...")

    return {"generation": generation, "messages": messages, "iterations": iterations, "current_task": "Executing code..."}

def check_code(state: GraphState):
    """Execute the generated imports and code, capturing what the code prints.

    Args:
        state: Graph state. Reads ``messages``, ``generation`` (an object with
            ``imports`` and ``code`` string attributes), ``iterations`` and,
            when present, ``reflect``.

    Returns:
        On failure: ``error`` set to "yes", the extended ``messages`` list and a
        retry status in ``current_task``.
        On success: ``error`` set to "no", the captured stdout in ``response``,
        the extended ``messages`` list, ``reflect`` (set to "yes" when
        ``iterations`` is above 2) and a status string in ``current_task``.
    """
    print("Code is being executed....\n--------\n")
    messages = state["messages"]
    generation = state["generation"]
    iterations = state["iterations"]
    reflect = state.get("reflect", "")

    # NOTE(security): this runs model-generated code with exec(). Nothing here
    # sandboxes it; only use with inputs and an environment you trust.
    # Imports and code share one explicit namespace (seeded with a copy of the
    # module globals) so that functions defined by the generated code can see
    # the names its import block brought in.
    exec_scope = dict(globals())

    try:
        exec(generation.imports, exec_scope)

    except Exception as e:
        messages += [("system", f"Encountered an error with import statement: {e}")]

        print(f"Error with import statement: {e}")
        return {"error": "yes", "messages": messages, "current_task": "An error has occured. Retrying Solution..."}

    captured_output = StringIO()
    previous_stdout = sys.stdout
    run_error = None
    sys.stdout = captured_output

    try:
        # The code text itself is echoed into the captured output before it runs.
        print(generation.code)
        exec(generation.code, exec_scope)

    except Exception as e:
        run_error = e

    finally:
        # Always hand stdout back to whatever stream was active before the
        # capture (inside a notebook this is not sys.__stdout__).
        sys.stdout = previous_stdout

    if run_error is not None:
        messages += [("system", f"Encountered an error with code block statement: {run_error}")]

        print(f"Error with code block: {run_error}")
        return {"error": "yes", "messages": messages, "current_task": "An error has occured. Retrying Solution..."}

    response = captured_output.getvalue()
    messages += [("system", f"Printed from execution: {response}")]

    print("\n--------\n")

    # NOTE(review): reflect is only raised on this success path, where
    # should_end already routes to "end" because error is "no".
    if iterations > 2:
        reflect = "yes"

    print("-----NO CODE TEST FAILURES-----")
    return {"messages": messages, "error": "no", "reflect": reflect, "response": response, "current_task": "Typing Answer..."}

def reflect_code(state: GraphState):
    """Ask the code chain to reflect on the last failure and record the result.

    Args:
        state: Graph state. Reads ``messages``.

    Returns:
        Partial state update containing the extended ``messages`` list.
    """
    messages = state["messages"]

    messages += [
        (
            'system',
            """
            I tried to solve the problem and failed a unit test. I need to reflect on this failure based on the generated error.\n
            Write a few key suggestions to avoid making this mistake again.
            """,
        )
    ]

    # Synchronous `invoke`: this node is a plain function, so the coroutine
    # returned by `ainvoke` would never be awaited and its repr would end up in
    # the message instead of the model's output.
    reflections = code_chain.invoke({"messages": messages})
    messages += [("assistant", f"Here are reflections on the error: {reflections}")]

    return {"messages": messages}

def write_answer(state: GraphState):
    """Have the analyst chain phrase the final answer from the conversation.

    Args:
        state: Graph state. Reads ``messages`` and ``generation`` (an object
            with ``imports`` and ``code`` string attributes).

    Returns:
        Partial state update with the analyst's text in ``response``, the
        extended ``messages`` list, a closing status in ``current_task``, and
        ``generation`` flattened to a single "imports + code" string.
    """
    messages = state["messages"]
    generation = state["generation"]

    result = analyst_chain.invoke({"messages": messages})

    # The previous value of state["response"] is not needed: the final
    # response is always the analyst's text.
    response = result.content
    messages += [("system", f"Here is the answer to the user's question: \n{response}")]

    generation_string = f"{generation.imports}\n{generation.code}"

    print("Response:\n", response, "\n")

    return {"response": response, "messages": messages, "current_task": "Closing...", "generation": generation_string}


def should_end(state: GraphState):
    """Choose the next node after a code-execution attempt.

    Args:
        state: Graph state. Reads ``error``, ``iterations``,
            ``max_iterations`` and, when present, ``reflect``.

    Returns:
        "end" when the last run succeeded or the iteration budget is used up;
        otherwise "reflect_code" when ``reflect`` is "yes", else "planner".
    """
    error = state["error"]
    iterations = state["iterations"]
    max_iters = state["max_iterations"]
    reflect = state.get("reflect")

    # `>=` rather than `==` so the loop still stops if the counter is already
    # past the cap (for example when max_iterations is 0).
    if error == "no" or iterations >= max_iters:
        print("----DECISION: FINISH----")
        return "end"

    print("----DECISION: RE-TRY SOLUTION----")
    if reflect == "yes":
        print("Reflecting on error...")
        return "reflect_code"

    return "planner"


## Graph

In [18]:

wf = StateGraph(GraphState)

# Register every node function under the name the edges refer to.
for node_name, node_fn in (
    ("planner", plan_steps),
    ("generate", generate_code),
    ("check_code", check_code),
    ("reflect_code", reflect_code),
    ("write_answer", write_answer),
):
    wf.add_node(node_name, node_fn)

# Fixed path: plan -> generate -> execute.
wf.set_entry_point("planner")
wf.add_edge("planner", "generate")
wf.add_edge("generate", "check_code")

# After execution, should_end picks one of these labels; each maps to a node.
route_after_check = {
    "end": "write_answer",
    "reflect_code": "reflect_code",
    "planner": "planner",
}
wf.add_conditional_edges("check_code", should_end, route_after_check)

wf.add_edge("reflect_code", "planner")
wf.add_edge("write_answer", END)

explore = wf.compile()


## main() function

In [25]:
# NOTE(review): scratch cell that only prints the literal 10; nothing below uses it — presumably safe to delete.
print(10)

In [None]:
# NOTE(review): absolute, machine-specific path to the workbook being explored.
source = 'C:/Users/Administrator/Documents/github/reporter/sales_data_copy.xlsx'

def main():
    """Interactive loop: read a question, run the graph, remember the answer.

    Prints the first seven rows of the workbook, then prompts with ``>> ``
    until the user types ``exit`` or ``quit`` (or input reaches end-of-file).
    Each answered question is appended to the history that is handed to the
    graph as ``past_execs`` on later questions.
    """
    df = pd.read_excel(source)
    print(df.head(7))
    executions = []
    while True:
        try:
            req = input(">> ").strip()
        except EOFError:
            break

        if not req:
            # Nothing was asked; prompt again instead of calling the model.
            continue
        if req.lower() in {"exit", "quit"}:
            break

        formatted_req = format_request(req, source)

        inputs = {
            "messages": [('user', formatted_req)],
            "request": req,
            "source_path": source,
            "max_iterations": 3,
            "iterations": 0,
            "past_execs": executions,
            "response": "",
            # Seed the keys that nodes read before any node has written them.
            "error": "",
            "reflect": "",
            "past_tasks": [],
            "generation": "",
        }
        msg = explore.invoke(inputs)

        # Records use the "question"/"answer" keys that plan_steps reads.
        executions.append({"question": req, "answer": msg.get("response", "")})



main()