# TechXchange 2025 Lab
### IBM SDE Troubleshooting AI Agent
An **AI agent** is a system or program that completes a request by first planning a sequence of workflow steps and then performing the tasks in those steps with the tools available to it. AI agents use large language models (LLMs) to understand the context of the request and to plan and perform the tasks.

This Python notebook provides code to create a simple AI agent designed to debug operational issues in a file processing scenario within the IBM Sterling File Gateway environment. It uses a LangGraph graph for state management and runtime processing.

Once the agent is created, you assume the role of an operations specialist at an organization and interact with the IBM SDE Troubleshooting AI Agent using natural language questions. The agent helps identify root causes, resolve issues, and reprocess failed files.

When a question is asked, the AI agent uses a large language model (LLM) to understand the context, determine the sequence of required steps, and execute them using available tools. These tools include fetching the arrived key, retrieving error descriptions, updating code lists, and reprocessing failed files. The full sequence—including step execution, tool usage, and final response—is displayed in the output for review.
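
The loop described above (decide on a step, call a tool, look at the result, decide again) can be sketched without an LLM. The block below is only an illustration: `scripted_planner` is a hypothetical stand-in for the model, and the two `fake_*` functions are hypothetical stand-ins that return canned data instead of calling the real services.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical stand-ins for the real tools; they return canned data.
def fake_get_arrived_key(producer_name: str) -> str:
    return "KEY-001"

def fake_get_error_description(arrived_key: str) -> str:
    return f"No consumer identified for {arrived_key}"

TOOLS: Dict[str, Callable[[str], str]] = {
    "get_arrived_key": fake_get_arrived_key,
    "get_error_description": fake_get_error_description,
}

def scripted_planner(history: List[Tuple[str, str]]) -> Optional[Tuple[str, str]]:
    """Stand-in for the LLM: return the next (tool name, argument), or None when done."""
    if not history:
        return ("get_arrived_key", "test_prod2")
    last_tool, last_result = history[-1]
    if last_tool == "get_arrived_key":
        # Feed the previous tool's result into the next tool call
        return ("get_error_description", last_result)
    return None

def run_agent() -> List[Tuple[str, str]]:
    """Alternate between planning and tool execution until the planner stops."""
    history: List[Tuple[str, str]] = []
    while True:
        step = scripted_planner(history)
        if step is None:
            break
        name, arg = step
        history.append((name, TOOLS[name](arg)))
    return history

trace = run_agent()
for name, result in trace:
    print(f"{name} -> {result}")
```

In the notebook itself the planner role is played by the LLM, and the alternation between the two roles is handled by the LangGraph graph rather than a hand-written `while` loop.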

#### To use this Notebook:

Run each cell below one by one and make sure it completes successfully.

When prompted for the API key, enter it in the box, press Enter, and then continue running the cells to initialize the code.

Finally, when prompted, enter your query and run the subsequent cells. Some example questions are provided.

You may run/repeat the query cells by entering and trying different queries.

#### Lab Exercise
Below is an outline of the code. The code is set up in Python Notebook cells.

1. Set up required libraries
2. Define functions to get and check credentials
3. Define tools that the AI agent can use
4. Configure the LLM
5. Define the LangGraph graph and functions for state and runtime processing.
6. Show a visual representation of the graph - the AI agent with tools
7. Use the AI agent - ask the AI agent file-related questions
8. Review the responses from the AI agent


#### 1. Set up required libraries

In [None]:
# Set up libraries
%pip install langgraph==0.2.73
%pip install -U langchain-ibm==0.3.6
    
from typing import Annotated, Literal, TypedDict

from langchain_core.messages import HumanMessage

from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, MessagesState
from langgraph.prebuilt import ToolNode

import requests
import json
import time
import random

import urllib3

import xml.etree.ElementTree as ET
from collections import defaultdict

#### 2. Define functions to get and check credentials
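
The cell that follows prompts for each credential only when the corresponding environment variable is not already set. A minimal standalone sketch of that pattern is shown here, under a few assumptions: `set_if_undefined` takes an injectable `prompt` argument so it can be exercised without typing anything, `mask` is a hypothetical helper for printing a secret safely, and `DEMO_API_KEY` is a made-up variable name.

```python
import getpass
import os

def set_if_undefined(var: str, prompt=getpass.getpass) -> str:
    """Return os.environ[var], prompting for it first if it is unset or empty."""
    if not os.environ.get(var):
        os.environ[var] = prompt(f"Please provide your {var}: ")
    return os.environ[var]

def mask(secret: str, visible: int = 4) -> str:
    """Hide all but the last `visible` characters of a secret."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]

# DEMO_API_KEY is a hypothetical variable name used only for this demonstration.
os.environ.pop("DEMO_API_KEY", None)

# A lambda replaces the interactive prompt so the sketch runs unattended.
value = set_if_undefined("DEMO_API_KEY", prompt=lambda _msg: "abcd1234wxyz")
print("Api key:", mask(value))
```

Because the value is stored in `os.environ`, a second call for the same variable returns the stored value without prompting again.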

In [None]:
# Function to get credentials
import getpass
import os

def _set_if_undefined(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please provide your {var}")

# Get credentials
# Set the project ID
# If you are using your own account, or a lab account and project, the current project ID is configured and set automatically
_set_if_undefined("PROJECT_ID")

# When prompted for the API key, enter it in the box and press Enter, then move to the next cell and continue running the cells
_set_if_undefined("WATSONX_API_KEY")


In [None]:
# Print credential configurations for validation (the API key itself is not echoed)
print('Api key set:', bool(os.environ.get("WATSONX_API_KEY")))
print('Project id:', os.environ.get("PROJECT_ID"))

#### 3. Define tools that the AI agent can use
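
Each tool in the cell below sends a hard-coded `Authorization: Basic ...` header. As a hedged alternative, the header can be built from a user name and password at run time. In this sketch `basic_auth_header` is a hypothetical helper, and `demo_user` / `demo_password` are placeholder values, not credentials for any real system.

```python
import base64

def basic_auth_header(username: str, password: str) -> str:
    """Build an HTTP Basic Authorization header value: 'Basic ' + base64('user:password')."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return f"Basic {token}"

# Placeholder credentials; in practice these could come from environment variables.
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Authorization": basic_auth_header("demo_user", "demo_password"),
}
print(headers["Authorization"])
```

Base64 is an encoding, not encryption, so a header built this way is as sensitive as the password itself.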

In [None]:
# Define tools that the AI agent can use

# The arrived key of a file is looked up with the get_arrived_key() tool.
# When a natural language question is asked, the AI agent first determines the context and then the sequence in which to use one or more tools to provide a response.

# This tool lists every arrived file (producer name, arrived key, state, file name, timestamp).
# It does not filter by producer_name itself; the LLM selects the relevant entries from the returned list.
@tool
def get_arrived_key(producer_name: str) -> list:
    """Use this function to get the arrived key of files for the partner. Producer's name can be used instead of partner.
    
    After task execution, this method ensures that the agent requests or suggests a follow-up task, maintaining a continuous interaction or workflow chain.    
    """    

    # Disable insecure request warning
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # Define the URL (GET request to list fgarrivedfiles)
    url = "https://HOSTNAME/B2BAPIs/svc/fgarrivedfiles/"

    # Base64-encoded credentials
    auth_header = "Basic YXBpdXNlcjpQYXNzd29yZEAxMjM="
    
    # Set headers
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": auth_header
    }
    
    try:
        response = requests.get(url, headers=headers, verify=False)
        response.raise_for_status()

        fg_arrived_files = response.json()
        result = []
        for file in fg_arrived_files:
            result.append({
                "producerName": file.get("prodOrgName"),
                "arrivedFileKey": file.get("arrivedFileKey"),
                "status": file.get("arrivedFileState"),
                "fileName": file.get("fileName"),
                "fileTransferTimestamp": file.get("createTs")
            })

        return result

    except requests.RequestException as e:
        print(f"HTTP Request failed: {e}")
        return []

    except ValueError:
        print("Response is not valid JSON.")
        return []


# this tool fetches error description of a file
@tool
def get_error_description(arrived_key: str) -> tuple:
    """Use this function to get the error description of files based on the arrived key.
    
    After task execution, this method ensures that the agent requests or suggests a follow-up task, maintaining a continuous interaction or workflow chain.
    """   

    # Disable insecure request warning
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # Define the URL
    url = "https://HOSTNAME/bpaas/v1/fetcherror"
    
    # XML payload with dynamic arrived_key
    payload_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
    <FetchErrorRequest>
        <ARRIVEDFILE_KEY>{arrived_key}</ARRIVEDFILE_KEY>
    </FetchErrorRequest>
    """
    
    # Authorization header
    auth_header = "Basic YXBpdXNlcjpQYXNzd29yZEAxMjM="
    
    # Set headers
    headers = {
        "Content-Type": "application/xml",
        "Accept": "application/xml",
        "Authorization": auth_header
    }
    
    # Send POST request
    response = requests.post(url, data=payload_xml, headers=headers, verify=False)
    
    # Helper to strip namespaces from tags
    def strip_namespace(tag):
        return tag.split('}', 1)[-1] if '}' in tag else tag
    
    # XML to dict parser that handles multiple children with same tag
    def xml_to_dict(element):
        result = defaultdict(list)
        for child in element:
            key = strip_namespace(child.tag)
            if len(child):
                value = xml_to_dict(child)
            else:
                value = child.text.strip() if child.text and child.text.strip() else ""
            result[key].append(value)
        return {k: v[0] if len(v) == 1 else v for k, v in result.items()}
    
    def extract_error_and_process(data):
        # Top-level key, e.g. "result"
        top_key = next(iter(data))
        rows = data[top_key].get("row", [])
    
        # Normalize to list if single dict
        if isinstance(rows, dict):
            rows = [rows]
    
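        error_message = None
        error_description = None
        producer_name = None
        arrived_key = None  # not present in the response rows, so it stays None
        file_name = None  # initialized so the return below cannot raise UnboundLocalError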
    
        for row in rows:
            name = row.get("NAME")
            value = row.get("VALUE")
            if name == "ErrorMessage" and value and not value.startswith("[No Value"):
                error_message = value
            elif name == "ErrorDescription" and value and not value.startswith("[No Value"):
                error_description = value
            elif name == "ProducerName" and value and not value.startswith("[No Value"):
                producer_name = value
            elif name == "ProducerFilename" and value and not value.startswith("[No Value"):
                file_name = value
        
    
        return error_message, error_description, producer_name, arrived_key, file_name
    
    # Defaults returned when the request or the XML parsing fails
    error_message = error_description = producer_name = file_name = None

    if response.status_code == 200:
        try:
            root = ET.fromstring(response.text)
            response_dict = {strip_namespace(root.tag): xml_to_dict(root)}
            # Extract the error fields. The helper's arrived-key slot is always None,
            # so it is discarded to keep the arrived_key that was passed to this tool.
            error_message, error_description, producer_name, _, file_name = extract_error_and_process(response_dict)

        except ET.ParseError as e:
            print("XML parsing error:", e)
    else:
        print("Failed to retrieve data.")

    return error_message, error_description, producer_name, arrived_key, file_name


# this tool resolves dynamic consumer identification configuration issues by updating the codelist.
@tool
def update_codelist_dynamic_consumer(producer_name: str, consumer_name: str) -> str:
    """Use this function to resolve producer–consumer association or consumer identification issues by updating the consumer name in the codelist for the listed producer. The partner's name can be used in place of the producer's.

    After task execution, this method ensures that the agent requests or suggests a follow-up task to reprocess files, maintaining a continuous interaction or workflow chain.   
    """    

    # Define the URL and payload
    url = "https://HOSTNAME:443/B2BAPIs/svc/codelistcodes/"
    payload = {
        "CodeList": "TX_DetermineConsumer|||1",
        "codeSet": [
            {
                "senderCode": producer_name,
                "receiverCode": consumer_name,
                "description": consumer_name
            }
        ]
    }
    
    # Base64-encoded credentials
    auth_header = "Basic YXBpdXNlcjpQYXNzd29yZEAxMjM="
    
    # Set headers
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": auth_header
    }
    
    # Send POST request
    response = requests.post(url, json=payload, headers=headers, verify=False)
    
    return_message = None
    if response.status_code == 201:
        return_message = "Producer name = " + producer_name + " and Consumer name = " + consumer_name + " are successfully associated"
    else:
        return_message = "Failed to associate " + "Producer name = " + producer_name + " with Consumer name = " + consumer_name   

    return return_message


# this tool reprocesses the files
@tool
def reprocess_files(arrived_key: str) -> str: 
    """
    Use this function to reprocess a failed file for a producer (partner) by triggering the replay of a previously received file, using the arrived key extracted from the error description output. 
    This is typically done when a transmission or transformation has failed and needs to be re-attempted.

    Don't update the consumer name in the codelist for the listed producer unless it is explicitly requested.
    
    ### Parameters:
    - arrived_key (str): A unique identifier representing the specific file to be reprocessed. This key is
      generally associated with the arrival or ingestion of the file in the system.

    ### Returns:
    - str: A message indicating whether the reprocessing was successful or failed.

    ### Agent Behavior:
    After invoking this tool, the agent should:
    - Confirm the outcome to the user (success/failure),
    - Request or suggest a follow-up task to debug other file transfer issues, maintaining a continuous interaction or workflow chain.
    """
    
    # Disable insecure request warning
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # Define the URL and payload
    url = f"https://HOSTNAME/B2BAPIs/svc/fgarrivedfiles/{arrived_key}/actions/replayarrivedfile/"
    
    payload = {
        "replayComment": "reprocess"
    }
    
    # Base64-encoded credentials
    auth_header = "Basic YXBpdXNlcjpQYXNzd29yZEAxMjM="
    
    # Set headers
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": auth_header
    }
    
    # Send POST request
    response = requests.post(url, json=payload, headers=headers, verify=False)
    
    return_message = None
    if response.status_code == 200:
        return_message = "File reprocessed successfully for the listed producer"
    else:
        return_message = "Failed to reprocess the file for the listed producer"

    return return_message

tools = [get_arrived_key, get_error_description, update_codelist_dynamic_consumer, reprocess_files]

tool_node = ToolNode(tools)
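
The XML handling inside `get_error_description` can be tried offline, without the service. The sketch below reuses the same namespace-stripping and XML-to-dict approach on a hand-written sample. The sample payload is an assumption inferred from the parsing code (a top-level element containing `row` elements with `NAME` and `VALUE` children), not a captured response, and `rows_to_fields` is a hypothetical helper.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict

# Assumed response shape, written by hand for illustration only.
SAMPLE_XML = """<result>
  <row><NAME>ErrorMessage</NAME><VALUE>Routing failed</VALUE></row>
  <row><NAME>ErrorDescription</NAME><VALUE>No consumer identified</VALUE></row>
  <row><NAME>ProducerName</NAME><VALUE>test_prod2</VALUE></row>
  <row><NAME>ProducerFilename</NAME><VALUE>[No Value]</VALUE></row>
</result>"""

def strip_namespace(tag: str) -> str:
    """Drop a leading '{namespace}' from an element tag, if present."""
    return tag.split('}', 1)[-1] if '}' in tag else tag

def xml_to_dict(element) -> dict:
    """Convert child elements to a dict; repeated tags become lists."""
    result = defaultdict(list)
    for child in element:
        key = strip_namespace(child.tag)
        if len(child):
            value = xml_to_dict(child)
        else:
            value = child.text.strip() if child.text and child.text.strip() else ""
        result[key].append(value)
    return {k: v[0] if len(v) == 1 else v for k, v in result.items()}

def rows_to_fields(rows: list) -> dict:
    """Collect NAME/VALUE pairs, skipping placeholder values such as '[No Value]'."""
    fields = {}
    for row in rows:
        value = row.get("VALUE")
        if value and not value.startswith("[No Value"):
            fields[row.get("NAME")] = value
    return fields

root = ET.fromstring(SAMPLE_XML)
parsed = {strip_namespace(root.tag): xml_to_dict(root)}
rows = parsed["result"].get("row", [])
if isinstance(rows, dict):  # a single row is returned as a dict, not a list
    rows = [rows]
fields = rows_to_fields(rows)
print(fields)
```

Collecting the pairs into a dictionary means a missing field shows up as an absent key rather than an unassigned variable, which is one way to avoid errors when a row such as `ProducerFilename` has no usable value.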



#### 4. Configure the LLM

In [None]:
#Configure the LLM

from ibm_watsonx_ai.metanames import GenTextParamsMetaNames
parameters = {
    GenTextParamsMetaNames.DECODING_METHOD: "sample", #"greedy", #"sample"
    GenTextParamsMetaNames.MIN_NEW_TOKENS: 150,
    GenTextParamsMetaNames.MAX_NEW_TOKENS: 250,
    GenTextParamsMetaNames.TEMPERATURE: 0,
    #GenTextParamsMetaNames.TOP_K: 50,
    #GenTextParamsMetaNames.TOP_P: 1,
}

from langchain_ibm import ChatWatsonx
model = ChatWatsonx(
    model_id="mistralai/mistral-large", 
    url="https://us-south.ml.cloud.ibm.com", 
    apikey=os.environ.get("WATSONX_API_KEY"),
    project_id=os.environ.get("PROJECT_ID"),
    params=parameters,
).bind_tools(tools)

#### 5. Define the LangGraph graph and functions for state and runtime processing.

In [None]:
# Define the function that determines whether to continue or not
def should_continue(state: MessagesState) -> Literal["tools", END]:
    messages = state['messages']
    last_message = messages[-1]
    # If the LLM makes a tool call, then we route to the "tools" node
    if last_message.tool_calls:
        return "tools"
    # Otherwise, we stop (reply to the user)
    return END


# Define the function that calls the model
def call_model(state: MessagesState):
    messages = state['messages']
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}



In [None]:
# Define a new graph
workflow = StateGraph(MessagesState)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.add_edge(START, "agent")

# add a conditional edge
workflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "agent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)

# add a normal edge from `tools` to `agent`.
# after `tools` is called, `agent` node is called next.
workflow.add_edge("tools", 'agent')


# Compile graph
app = workflow.compile()


#### 6. Show a visual representation of the graph - the AI agent with tools

In [None]:
# Show graph
from IPython.display import display, Image
display(Image(app.get_graph().draw_mermaid_png()))

#### 7. Use the AI agent - ask the AI agent file-related questions



Ask the AI agent these questions. Review the response.

- what can you do for me?
  
- list all the files for all producers

- list all the files for test_prod2 producer

- list all the failed files for test_prod2 producer

- get the error description of failed files for test_prod2 producer

- update the test_cons2 consumer name in the codelist for the test_prod2 producer and reprocess failed files for test_prod2

#### Write the query in the box and hit Enter and then continue running the next cells

In [None]:
human_query = input("Type a question and hit enter: ")

In [None]:
# Use the runtime
final_state = app.invoke(
    {"messages": [HumanMessage(content=human_query)]},
    config={"configurable": {"thread_id": random.randint(1, 100)}}
)

# Extract messages
messages = final_state["messages"]

for m in messages:
    m.pretty_print()
        
# Find the first human message
# for m in messages:
#     if m.type == "human":
#         m.pretty_print()
#         break  # Only print the first human message

# # # Find the last AI message
# for m in reversed(messages):
#     if m.type == "ai":
#         m.pretty_print()
#         break  # Only print the last AI message

#### 8. Review the response above from the AI agent
Rerun the cells above with different queries: enter a new question in the human_query cell, then rerun the app.invoke() cell.