In [1]:
import os
import logging
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from typing import List, TypedDict, Annotated
import operator
import gradio as gr
import atexit
import sqlite3

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

load_dotenv()

api_key = os.getenv("OPENAI_API_KEY")

if api_key:
    print("API key found successfully.")
else:
    print("API key not found.")

class AgentState(TypedDict):
    task: str
    plan: str
    itinerary: str
    critique: str
    content: List[str]
    revision_number: int
    max_revision: int
    count: Annotated[int, operator.add]

    
class TravelPlanner():
    def __init__(self):
        self.model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        self.prompt_setup()
        self.build_graph()

    def prompt_setup(self):
        self.PLAN_PROMPT = "Create a high-level outline for a travel itinerary based on the provided task. Include key destinations, travel dates, and considerations like budget or preferences."

        self.ITINERARY_PROMPT = "Generate a detailed travel itinerary based on the outline and user input. Revise as per critique if provided. Format it as:\n------\nPlace: \nDates: \nEstimated spending: xx USD\nMode of commute from <Origin>: yy\nEstimated time to reach <Destination>: zzz Hrs vv Minutes\nItinerary:\n    Day 1: <DATE>\n    - Activity\n    - Activity\n    Day 2: <DATE>\n    - Activity\n    - Activity\n------\nContent: {content}"

        self.CRITIQUE_PROMPT = "Critique the travel itinerary with detailed recommendations for improvement. Suggest adjustments to destinations, activities, or logistics if needed."

    def build_graph(self):
        builder = StateGraph(AgentState)
        builder.add_node("planner", self.plan_node)
        builder.add_node("itinerary_generator", self.itinerary_node)
        builder.add_node("critique", self.critique_node)

        builder.set_entry_point("planner")

        builder.add_edge("planner", "itinerary_generator")
        builder.add_edge("itinerary_generator", "critique")

        builder.add_conditional_edges(
            "critique",
            self.should_continue,
            {"END": END, "itinerary_generator": "itinerary_generator"}
        )

        self.builder = builder

    def plan_node(self, state: AgentState):
        logging.info("Entered plan_node with state: %s", state)
        messages = [
            {"role": "system", "content": self.PLAN_PROMPT},
            {"role": "user", "content": state["task"]}
        ]
        response = self.model.invoke(messages)
        logging.info("Generated plan: %s", response.content)
        return {
            "plan": response.content,
            "count": state.get("count", 0) + 1
        }

    def itinerary_node(self, state: AgentState):
        logging.info("Entered itinerary_node with state: %s", state)
        content = state.get("plan", "No plan availabel")
        messages = [
            {"role": "system", "content": self.ITINERARY_PROMPT.format(content=content)},
            {"role": "user", "content": state["plan"]}
        ]
        
        if state.get("critique"):
            messages.append({"role": "user", "content": f"Previous Critique: {state['critique']}"})
        
        response = self.model.invoke(messages)
        logging.info("Generated itinerary: %s", response.content)
        return {
            "itinerary": response.content,
            "revision_number": state.get("revision_number", 0) + 1
        }

    def critique_node(self, state: AgentState):
        logging.info("Entered critique_node with state: %s", state)
        messages = [
            {"role": "system", "content": self.CRITIQUE_PROMPT},
            {"role": "user", "content": state["itinerary"]}
        ]
        response = self.model.invoke(messages)
        logging.info("Critique generated: %s", response.content)
        return {"critique": response.content}
    
    def should_continue(self, state):
        logging.info("Checking should_continue with state: %s", state)
        return "END" if state["revision_number"] >= state["max_revision"] else "itinerary_generator"

class TravelPlannerGUI():
    def __init__(self, graph, conn):
        self.conn = conn
        self.graph = graph
        self.demo = self.create_interface()
        self.logs = ""

    def create_interface(self):
        with gr.Blocks() as demo:
            gr.Markdown("# AI Travel Planner")
            
            with gr.Tab("Sumbit Task"):
                with gr.Row():
                    task_input = gr.Textbox(label="Enter your task", placeholder="Give me itinerary for NYC", lines=2)
                    submit_button = gr.Button("Submit")

                live_output = gr.Textbox(label="Result", placeholder="Result will appear here...", lines=10, interactive=False)
                submit_button.click(self.run_agent, inputs=[task_input], outputs=[live_output])

            with gr.Tab("Intermediate Stage"):
                task = gr.Textbox(label="Task", interactive=False, lines=3)
                plan = gr.Textbox(label="Plan", interactive=False, lines=10)
                itinerary = gr.Textbox(label="Itinerary", interactive=False, lines=10)
                critique = gr.Textbox(label="Critique", interactive=False, lines=10)

                refresh_button = gr.Button("Refresh")
                refresh_button.click(self.refresh_all, inputs=[], outputs=[task, plan, itinerary, critique])
            
            with gr.Tab("Live logs"):
                logs = gr.Textbox(label="Logs", interactive=False, lines=10)

                refresh_logs_button = gr.Button("Refresh Logs")
                refresh_logs_button.click(self.get_logs, inputs=[], outputs=[logs])

        return demo


    def run_agent(self, task):
        config = {
            "task": task,
            "content": [],
            "revision_number": 0,
            "max_revision": 2,
            "count": 0
        }

        thread = {"configurable": {"thread_id": "1"}}
        output = ""

        for event in self.graph.stream(config, thread):
            logging.info("Stream event: %s", event)

            try:
                for node_name, state_update in event.items():
                    if node_name == "itinerary_generator":
                        revision = state_update.get("revision_number", 0)
                        draft = state_update.get("itinerary", "Processing...")
                        output += f"Itinerary #{revision}:\n{draft}\n\n"

                    elif node_name == "planner":
                        output += f"Planning Stage:\n" + state_update.get("plan", "No plan available") + "\n\n"
                    self.logs += f"Processed logs: {event}"

            except Exception as e:
                logging.error("Error during graph execution: %s", str(e))
                output += f"Error: {str(e)}"

        return output

    def refresh_all(self):
        try:
            current_state = self.graph.get_state({"configurable": {"thread_id": "1"}}).values
            logging.info("Current state from refresh: %s", current_state)

            return (
                current_state.get("task", "No task available"),
                current_state.get("plan", "No plan available"),
                current_state.get("itinerary", "No itinerary available"),
                current_state.get("critique", "No critique available")
            )
        except AttributeError:
            return ("No state yet", "No state yet", "No state yet", "No state yet")
        except Exception as e:
            logging.error("Error refreshing state: %s", str(e))
            return("Error", "Error", "Error", "Error")

    def get_logs(self):
        return self.logs or "No logs available"

    def launch(self):
        self.demo.launch(share=False)

def cleanup_connection(conn):
    if conn:
        conn.close()
        logging.info("SQLite connection closed")

if __name__ == "__main__":
    logging.info("Starting application...")

    try:
        multi_agent = TravelPlanner()
        builder = multi_agent.builder

        conn = sqlite3.connect(":memory:", check_same_thread=False)
        atexit.register(cleanup_connection, conn)
        memory = SqliteSaver(conn)

        graph = builder.compile(checkpointer=memory)

        app = TravelPlannerGUI(graph, conn)
        app.launch()

    except Exception as e:
        logging.error("Failed to start application: %s", str(e))
        if "conn" in locals():
            conn.close()
        raise


2025-12-01 18:44:57,758 [INFO] Starting application...


API key found successfully.
* Running on local URL:  http://127.0.0.1:7860


2025-12-01 18:45:00,610 [INFO] HTTP Request: GET http://127.0.0.1:7860/gradio_api/startup-events "HTTP/1.1 200 OK"
2025-12-01 18:45:00,655 [INFO] HTTP Request: HEAD http://127.0.0.1:7860/ "HTTP/1.1 200 OK"


* To create a public link, set `share=True` in `launch()`.


2025-12-01 18:45:00,980 [INFO] HTTP Request: GET https://api.gradio.app/pkg-version "HTTP/1.1 200 OK"
