In [None]:
import os
import json
import re
import requests
import ast
import traceback

# typing PyDantic
from typing import Any, Optional, Dict, Callable

# AgentState
from typing import Annotated, List, TypedDict, Optional
import operator

# get .env 
from dotenv import load_dotenv
load_dotenv()

# Watsonx imports
from ibm_watsonx_ai import APIClient, Credentials
from ibm_watsonx_ai.foundation_models import ModelInference
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams

# LangGraph + messages
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage

REACT_PROMPT = """You are a specialized agent with access to multiple tools. Your goal is to answer the user's request using the available tools when needed.

Available Tools:
{tools_list}

You must respond in one of two formats:

1. Final Answer
Think: [Your reasoning about why you have enough information]
Action: Final Answer
Action Input: [Your final response to the user]

2. Tool Call
Think: [Your reasoning about why you need to use a specific tool]
Action: [tool_name]
Action Input: [appropriate input parameters for the tool]

Begin.
"""

def get_llm(model_id: str = "meta-llama/llama-3-3-70b-instruct",
    max_new_tokens: int = 512,
    temperature: float = 0.2,
) -> ModelInference:
    
    creds = Credentials(api_key=os.getenv("WATSONX_API_KEY"), url=os.getenv("WATSONX_URL"))
    client = APIClient(credentials=creds)

    return ModelInference(
        model_id=model_id,
        params={GenParams.MAX_NEW_TOKENS: max_new_tokens, GenParams.TEMPERATURE: temperature},
        project_id=os.getenv("WATSONX_PROJECT_ID"), 
        api_client=client
    )

class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    final_output: Optional[str]
    action_name: Optional[str]
    action_input: Optional[str]

def generate_transcript(topic: str, llm: Optional[ModelInference] = None) -> str:
    """Generate a YouTube transcript for kids on the given topic."""
    if llm is None:
        llm = get_llm()

    prompt = f"""
Generate a clear, spoken-style transcript for a YouTube trending video for kids about:
"{topic}"

Requirements:
- 400-700 words
- Conversational tone
- Clear structure: hook ‚Üí points ‚Üí ending
- No directions or metadata, only spoken words
- Age-appropriate content for children
"""

    response = llm.generate_text(prompt)

    if isinstance(response, dict) and "results" in response:
        return response["results"][0].get("generated_text", "")
    elif isinstance(response, str):
        return response
    return str(response)

def tavily_search(query: str, top_k: int = 5) -> dict:
    """
    Real Tavily search call.
    """
    api_key = os.getenv("TAVILY_API_KEY")
    endpoint = os.getenv("TAVILY_ENDPOINT")

    if not api_key or not endpoint:
        raise RuntimeError("Missing Tavily environment variables.")

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    payload = {"query": query, "top_k": top_k}
    resp = requests.post(endpoint, headers=headers, json=payload, timeout=25)
    resp.raise_for_status()

    return resp.json()

# Enhanced tools dictionary with descriptions
TOOLS: Dict[str, Dict[str, Any]] = {
    "tavily_search": {
        "function": tavily_search,
        "description": "Searches the internet for current information on any topic. Use when you need up-to-date facts or recent information.",
        "input_schema": {
            "query": "string - the search query",
            "top_k": "integer - number of results (default: 5)"
        }
    },
    "generate_transcript": {
        "function": generate_transcript,
        "description": "Generates a YouTube-style transcript for kids on a given topic. Use when the user asks for content creation, scripts, or educational material for children.",
        "input_schema": {
            "topic": "string - the topic for the transcript"
        }
    }
}

def react_llm_node(state: AgentState):
    """Main reasoning node that decides which tool to use or provides final answer."""
    
    # Build conversation history
    conversation = ""
    for msg in state["messages"]:
        if isinstance(msg, AIMessage):
            conversation += f"Assistant: {msg.content}\n"
        elif isinstance(msg, HumanMessage):
            conversation += f"User: {msg.content}\n"
        elif isinstance(msg, ToolMessage):
            conversation += f"Tool Result: {msg.content}\n"

    # Build tools list for prompt
    tools_list = ""
    for tool_name, tool_info in TOOLS.items():
        tools_list += f"- {tool_name}: {tool_info['description']}\n"
        tools_list += f"  Input: {tool_info['input_schema']}\n\n"

    prompt = REACT_PROMPT.format(tools_list=tools_list) + "\n" + conversation

    llm = get_llm()
    llm_result = llm.generate_text(prompt)
    
    if isinstance(llm_result, dict) and "results" in llm_result:
        react_llm_result = llm_result["results"][0]["generated_text"]
    elif isinstance(llm_result, str):
        react_llm_result = llm_result
    else:
        react_llm_result = str(llm_result)

    return {"messages": [AIMessage(content=react_llm_result)]}

def react_llm_parser_and_decide_node(state: AgentState):
    """Parse the LLM response and decide next action."""
    
    last = state["messages"][-1]
    text = last.content or ""
    
    # Look for action pattern
    m = re.search(
        r"Action:\s*(.+?)\s*\nAction Input:\s*(.+)",
        text,
        re.DOTALL | re.MULTILINE
    )

    if not m:
        # Check if it's a final answer without explicit action
        if "Final Answer" in text or any(indicator in text.lower() for indicator in ["here is", "the answer is", "i can tell you"]):
            # Extract the actual answer content
            lines = text.split('\n')
            answer_lines = [line for line in lines if not line.startswith('Think:') and not line.startswith('Action:')]
            final_answer = ' '.join(answer_lines).strip()
            return {"action_name": None, "action_input": final_answer}
        return {"action_name": None, "action_input": None}

    action_name = m.group(1).strip()
    action_input_raw = m.group(2).strip()

    # Parse action input
    try:
        action_input = json.loads(action_input_raw)
    except:
        try:
            action_input = ast.literal_eval(action_input_raw)
        except:
            # For simple string inputs, create appropriate structure
            if action_name == "generate_transcript":
                action_input = {"topic": action_input_raw}
            elif action_name == "tavily_search":
                action_input = {"query": action_input_raw}
            else:
                action_input = {"raw": action_input_raw}
            
    if action_name.lower() == "final answer":
        return {"action_name": None, "action_input": action_input}
    
    return {"action_name": action_name, "action_input": action_input}

def tool_executor_node(state: AgentState):
    """Execute the selected tool."""
    name = state.get("action_name")
    data = state.get("action_input") or {}

    if not name:
        return {}

    tool_info = TOOLS.get(name)
    if not tool_info:
        result_text = f"Unknown tool: {name}"
    else:
        try:
            tool_fn = tool_info["function"]
            # Handle different tool input formats
            if name == "generate_transcript":
                if isinstance(data, dict) and "topic" in data:
                    result = tool_fn(topic=data["topic"])
                elif isinstance(data, str):
                    result = tool_fn(topic=data)
                else:
                    result = tool_fn(topic=str(data))
            elif name == "tavily_search":
                if isinstance(data, dict):
                    result = tool_fn(**data)
                else:
                    result = tool_fn(query=str(data))
            else:
                result = tool_fn(**data) if isinstance(data, dict) else tool_fn(data)
                
            result_text = json.dumps(result, ensure_ascii=False, indent=2)
        except Exception as e:
            result_text = json.dumps({"error": str(e), "trace": traceback.format_exc()})

    # Create tool message
    tool_msg = ToolMessage(
        content=result_text,
        tool_call_id=name
    )

    return {"messages": [tool_msg], "action_name": None, "action_input": None}

def extract_final_answer(state: AgentState) -> Dict[str, Any]:
    """Extract and return the final answer from the conversation."""
    # Look for the last AI message that contains a final answer
    for msg in reversed(state["messages"]):
        if isinstance(msg, AIMessage):
            content = msg.content or ""
            # If it explicitly says "Final Answer"
            if "Final Answer" in content:
                m = re.search(r"Action Input:\s*(.+)$", content, re.S)
                if m:
                    return {"final_output": m.group(1).strip()}
                # Try to extract the answer after "Final Answer"
                lines = content.split('\n')
                for i, line in enumerate(lines):
                    if "Final Answer" in line and i + 1 < len(lines):
                        return {"final_output": lines[i + 1].strip()}
                return {"final_output": content}
            
            # If this is the last message and no tool calls were made, it might be the answer
            if msg == state["messages"][-1] and not any(isinstance(m, ToolMessage) for m in state["messages"][state["messages"].index(msg):]):
                return {"final_output": content}
    
    # Check if we have a final answer from the router
    if state.get("action_name") is None and state.get("action_input"):
        return {"final_output": state["action_input"]}
        
    return {"final_output": None}

def build_graph():
    """Build the LangGraph workflow."""
    graph = StateGraph(AgentState)

    # Add nodes
    graph.add_node("react_llm", react_llm_node)
    graph.add_node("router", react_llm_parser_and_decide_node)
    graph.add_node("tool_executor", tool_executor_node)
    graph.add_node("extract_final_answer", extract_final_answer)

    # Build workflow
    graph.add_edge(START, "react_llm")
    graph.add_edge("react_llm", "router")

    def route_fn(state: AgentState):
        """Route to final answer or tool execution."""
        return "extract_final_answer" if state.get("action_name") is None else "tool_executor"

    graph.add_conditional_edges(
        "router",
        route_fn,
        {
            "extract_final_answer": "extract_final_answer", 
            "tool_executor": "tool_executor"
        }
    )

    graph.add_edge("extract_final_answer", END)
    graph.add_edge("tool_executor", "react_llm")  # Loop back for multi-step reasoning
    
    return graph

def main():
    """Main execution function."""
    graph = build_graph()
    app = graph.compile()

    user_query = input("Enter your query/topic: ").strip()

    state: AgentState = {
        "messages": [HumanMessage(content=user_query)],
        "action_name": None,
        "action_input": None,
        "final_output": None,
    }

    print("\n==== Processing Your Request ====\n")
    
    # Run the graph
    result = app.invoke(state)
    
    final_output = result.get("final_output")
    
    if final_output:
        print("\n==== Final Answer ====\n")
        print(final_output)
    else:
        print("\n==== No Clear Final Answer Found ====\n")
        print("Conversation history:")
        for i, m in enumerate(result["messages"]):
            print(f"\n--- Step {i} ({type(m).__name__}) ---")
            print(m.content)

if __name__ == "__main__":
    main()


==== Processing Your Request ====


==== Final Answer ====

The tool call to find the best trending video on YouTube for children failed due to a connection error. Please try again later or check the API endpoint.


In [7]:
import os
from typing import Annotated, Literal, TypedDict
from dotenv import load_dotenv

# 1. The Modern Imports
from langchain_ibm import ChatWatsonx
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode # <--- The professional way to run tools

load_dotenv()

# --- SECTION 1: DEFINE TOOLS (The Right Way) ---
# Using @tool decorators automatically generates the JSON schema. 
# No manual dicts required.

@tool
def tavily_search(query: str):
    """
    Perform a web search for up-to-date information.
    Use this when you need current events or facts.
    """
    # Mocking the actual API call for the demo (replace with your requests logic)
    return f"Search results for {query}: 1. WatsonX is an enterprise AI platform..."

@tool
def generate_transcript(topic: str):
    """
    Generates a YouTube-style transcript for kids on a given topic.
    """
    return f"Hey kids! Welcome back! Today we are talking about {topic}..."

# List of tools
tools = [tavily_search, generate_transcript]

# --- SECTION 2: THE BRAIN (Model Binding) ---

# We use ChatWatsonx because it handles the "Chat" structure natively.
# Replace "model_id" with your specific Llama 3 version available in WatsonX
llm = ChatWatsonx(
    model_id="meta-llama/llama-3-3-70b-instruct",
    url=os.getenv("WATSONX_URL"),
    project_id=os.getenv("WATSONX_PROJECT_ID"),
    params={
        "decoding_method": "greedy",
        "max_new_tokens": 500
    }
)

# BINDING: This is the key. We attach tools to the LLM signature.
# The LLM now "knows" these functions exist and will output specific structures to call them.
llm_with_tools = llm.bind_tools(tools)

# --- SECTION 3: STATE & NODES ---

class AgentState(TypedDict):
    # add_messages handles the history automatically. No manual string concatenation.
    messages: Annotated[list[BaseMessage], add_messages]

def reasoner_node(state: AgentState):
    """
    The Agent Node. It simply looks at history and decides what to do.
    """
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

# We do not need a "parser" node. The ToolNode handles execution automatically.
tool_node = ToolNode(tools)

# --- SECTION 4: CONTROL FLOW (The Router) ---

def should_continue(state: AgentState) -> Literal["tools", "__end__"]:
    """
    Decide if we loop or stop.
    """
    messages = state["messages"]
    last_message = messages[-1]
    
    # If the LLM returned a tool_call, we MUST go to the tools node
    if last_message.tool_calls:
        return "tools"
    
    # Otherwise, we are done
    return "__end__"

# --- SECTION 5: THE GRAPH ARCHITECTURE ---

workflow = StateGraph(AgentState)

# Add Nodes
workflow.add_node("agent", reasoner_node)
workflow.add_node("tools", tool_node)

# Add Edges
workflow.add_edge(START, "agent")

# Conditional Edge: This replaces your complex "router node"
workflow.add_conditional_edges(
    "agent",
    should_continue,
)

# The Cycle: After tools run, feedback into the agent
workflow.add_edge("tools", "agent")

# Compile
app = workflow.compile()


# --- SECTION 6: EXECUTION ---

import json
import uuid

def reasoner_node(state: AgentState):
    """
    The Agent Node with Self-Correction Logic.
    """
    response = llm_with_tools.invoke(state["messages"])
    
    # --- RESILIENCE BLOCK ---
    # Check if the model tried to call a tool but the parser missed it
    if not response.tool_calls and response.content:
        content = response.content
        if isinstance(content, str) and content.strip().startswith("[") and "type" in content:
            try:
                # 1. Attempt to parse the content as a list
                # The model output came as a string representing a list of strings
                parsed_list = json.loads(content)
                
                tool_calls = []
                for item in parsed_list:
                    # 2. If the item is a string (double-encoded JSON), parse it again
                    if isinstance(item, str):
                        item = json.loads(item)
                    
                    # 3. Standardize to LangChain ToolCall format
                    if item.get("type") == "function" or "name" in item:
                        # Extract name and args depending on format
                        func_name = item.get("name") or item.get("function", {}).get("name")
                        func_args = item.get("parameters") or item.get("function", {}).get("arguments") or {}
                        
                        if func_name:
                            tool_calls.append({
                                "name": func_name,
                                "args": func_args,
                                "id": f"call_{uuid.uuid4()}" # Required by LangGraph
                            })
                
                if tool_calls:
                    print(f"‚úÖ SYSTEM: Auto-corrected {len(tool_calls)} raw tool calls.")
                    response.tool_calls = tool_calls
                    # Clear content so we don't re-process it or confuse the next step
                    response.content = "" 
                    
            except json.JSONDecodeError:
                pass # It was just normal text, ignore
    # ------------------------

    return {"messages": [response]}



# 1. Add ToolMessage and AIMessage to your imports
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage, AIMessage

# 2. Replace the main() function with this type-safe version
def main():
    user_query = "Search for what is trending in AI today and then write a kids transcript about it."
    print(f"User: {user_query}\n")
    
    inputs = {"messages": [HumanMessage(content=user_query)]}
    
    try:
        for event in app.stream(inputs, config={"recursion_limit": 10}):
            for key, value in event.items():
                print(f"\n--- Node: {key} ---")
                
                # Get the last message generated by this node
                last_msg = value["messages"][-1]
                
                # --- TYPE SAFETY CHECK ---
                
                # CASE 1: The LLM (Agent) is speaking
                if isinstance(last_msg, AIMessage):
                    if last_msg.tool_calls:
                        print(f"üõ†Ô∏è  Thinking: I need to use tools: {[t['name'] for t in last_msg.tool_calls]}")
                    else:
                        print(f"ü§ñ Agent: {last_msg.content}")
                
                # CASE 2: The Tool is responding
                elif isinstance(last_msg, ToolMessage):
                    print(f"‚ö° Tool Output: {last_msg.content[:100]}...") # Truncate long outputs
                    
                # CASE 3: Fallback
                else:
                    print(f"Unknown Message: {last_msg}")
                    
    except Exception as e:
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()

User: Search for what is trending in AI today and then write a kids transcript about it.


--- Node: agent ---
üõ†Ô∏è  Thinking: I need to use tools: ['tavily_search']

--- Node: tools ---
‚ö° Tool Output: Search results for trending in AI today: 1. WatsonX is an enterprise AI platform......

--- Node: agent ---
üõ†Ô∏è  Thinking: I need to use tools: ['generate_transcript']

--- Node: tools ---
‚ö° Tool Output: Hey kids! Welcome back! Today we are talking about WatsonX is an enterprise AI platform......

--- Node: agent ---
ü§ñ Agent: The current trend in AI is WatsonX, an enterprise AI platform. Here's a kids' transcript about it: "Hey kids! Welcome back! Today we are talking about WatsonX is an enterprise AI platform..."


In [None]:
import os
import json
import uuid
import traceback
from typing import Annotated, Literal, TypedDict, List
from dotenv import load_dotenv

# --- LangChain / LangGraph Imports ---
from langchain_ibm import ChatWatsonx
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage, AIMessage
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver  # <--- The Memory Component

# Load environment variables
load_dotenv()

# ==========================================
# SECTION 1: THE TOOLS (The Hands)
# ==========================================

@tool
def tavily_search(query: str):
    """
    Perform a web search for up-to-date information. 
    Use this when you need current events, facts, or trending topics.
    """
    # In a real scenario, you would call the Tavily API here.
    # For this demo, we mock the return to ensure it runs without extra keys.
    print(f"    (System: Searching internet for '{query}'...)")
    return f"Search Results for '{query}': 1. WatsonX is trending for its enterprise safety. 2. Agentic workflows are the new standard in 2025. 3. Python 3.14 was just released."

@tool
def generate_transcript(topic: str):
    """
    Generates a YouTube-style transcript for kids on a given topic.
    """
    print(f"    (System: Generating content for '{topic}'...)")
    return f"Hey there Super Kids! Welcome back to the channel! Today we are learning about {topic}! It's like a robot brain that helps people do work faster!"

tools = [tavily_search, generate_transcript]

# ==========================================
# SECTION 2: THE MODEL (The Brain)
# ==========================================

# We use Llama 3.3 70B Instruct as it is the most capable model available in your env.
llm = ChatWatsonx(
    model_id="meta-llama/llama-3-3-70b-instruct",
    url=os.getenv("WATSONX_URL"),
    project_id=os.getenv("WATSONX_PROJECT_ID"),
    params={
        "decoding_method": "greedy",
        "max_new_tokens": 1000,
        "temperature": 0.1, # Keep temp low for precise tool calling
    }
)

# Bind the tools to the LLM so it knows they exist
llm_with_tools = llm.bind_tools(tools)

# ==========================================
# SECTION 3: STATE & LOGIC (The Nervous System)
# ==========================================

class AgentState(TypedDict):
    # 'add_messages' ensures we append to history rather than overwriting it
    messages: Annotated[List[BaseMessage], add_messages]

def reasoner_node(state: AgentState):
    """
    The main reasoning node. Includes a resilience pattern to fix 
    malformed JSON if the model outputs a string instead of a structured object.
    """
    response = llm_with_tools.invoke(state["messages"])
    
    # --- RESILIENCE BLOCK (Auto-Correction) ---
    # Sometimes models return a string looking like JSON instead of actual tool calls.
    # We catch that here and fix it manually.
    if not response.tool_calls and response.content:
        content = response.content
        if isinstance(content, str) and content.strip().startswith("[") and "type" in content:
            try:
                parsed_list = json.loads(content)
                tool_calls = []
                for item in parsed_list:
                    if isinstance(item, str): item = json.loads(item)
                    
                    # Extract standard fields
                    func_name = item.get("name") or item.get("function", {}).get("name")
                    func_args = item.get("parameters") or item.get("function", {}).get("arguments") or {}
                    
                    if func_name:
                        tool_calls.append({
                            "name": func_name,
                            "args": func_args,
                            "id": f"call_{uuid.uuid4()}"
                        })
                
                if tool_calls:
                    print(f"    (System: Auto-corrected {len(tool_calls)} raw tool calls)")
                    response.tool_calls = tool_calls
                    response.content = "" # Clear text so we don't confuse the next step
            except Exception:
                pass # If parsing fails, just let it be normal text
    # ------------------------------------------

    return {"messages": [response]}

def should_continue(state: AgentState) -> Literal["tools", "__end__"]:
    """
    Router Logic: If the agent wants to call a tool, go to 'tools'.
    Otherwise, stop.
    """
    messages = state["messages"]
    last_message = messages[-1]
    
    if last_message.tool_calls:
        return "tools"
    return "__end__"

# ==========================================
# SECTION 4: THE GRAPH (The Architecture)
# ==========================================

workflow = StateGraph(AgentState)

# 1. Add Nodes
workflow.add_node("agent", reasoner_node)
workflow.add_node("tools", ToolNode(tools)) # Using LangGraph's prebuilt optimized ToolNode

# 2. Add Edges
workflow.add_edge(START, "agent")

# 3. Conditional Edge
workflow.add_conditional_edges(
    "agent",
    should_continue,
)

# 4. Loop Back (The Cycle)
workflow.add_edge("tools", "agent")

# 5. Compile with Memory
# This is the key "Stateful" component.
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# ==========================================
# SECTION 5: EXECUTION (The Interview Demo)
# ==========================================

def print_stream(stream_event):
    """Helper to pretty-print the events typesafely."""
    for key, value in stream_event.items():
        if "messages" in value:
            last_msg = value["messages"][-1]
            
            # TYPE SAFETY: Check strictly what kind of message this is
            if isinstance(last_msg, AIMessage):
                if last_msg.tool_calls:
                    names = [t['name'] for t in last_msg.tool_calls]
                    print(f"üõ†Ô∏è  Thinking: I need to use tools: {names}")
                else:
                    print(f"ü§ñ Agent: {last_msg.content}")
            
            elif isinstance(last_msg, ToolMessage):
                # Truncate output for cleanliness
                content = str(last_msg.content)
                print(f"‚ö° Tool Output: {content[:80]}...")

def main():
    # A unique thread ID simulates a specific user session.
    # The agent uses this ID to look up previous messages in MemorySaver.
    thread_id = "google_interview_session_v1"
    config = {"configurable": {"thread_id": thread_id}}

    print(f"\n=== SESSION START (Thread ID: {thread_id}) ===\n")

    # --- TURN 1 ---
    print("--- Turn 1: Initial Research Request ---")
    user_input_1 = "Search for what is trending in AI today."
    print(f"User: {user_input_1}")
    
    inputs_1 = {"messages": [HumanMessage(content=user_input_1)]}
    
    try:
        # Recursion limit safeguards against infinite loops
        for event in app.stream(inputs_1, config=config):
            print_stream(event)
    except Exception as e:
        traceback.print_exc()

    # --- TURN 2 (Testing State) ---
    print("\n--- Turn 2: Follow-up (Testing Memory) ---")
    # Note: We do NOT repeat the topic. The agent must recall it from state.
    user_input_2 = "Write a kids transcript about that."
    print(f"User: {user_input_2}")
    
    inputs_2 = {"messages": [HumanMessage(content=user_input_2)]}
    
    try:
        for event in app.stream(inputs_2, config=config):
            print_stream(event)
    except Exception as e:
        traceback.print_exc()
        
    print("\n=== SESSION END ===")

if __name__ == "__main__":
    main()