# 🛠️ Agent with Human in the Loop

This notebook demonstrates how a Haystack Agent can interactively ask for user input when it's uncertain about the next step. We'll build a Human-in-the-Loop tool that the Agent can call dynamically. When the Agent encounters ambiguity or incomplete information, its system prompt instructs it to call this tool and wait for human feedback before continuing the task.

For this purpose, we will create a DevOps Support Agent.

CI/CD pipelines occasionally fail for reasons that are hard to diagnose, even manually: broken tests, misconfigured environment variables, flaky integrations, and so on.

The support Agent is built with Haystack Tools and the Haystack `Agent` component, and performs the following tasks:

- Check for any failed CI workflows
- Fetch build logs and status
- Look up and analyze git commits
- Suggest next troubleshooting steps
- Escalate to a human via the Human-in-the-Loop tool

# Install the required dependencies

In [None]:
# Install the required dependencies
!pip install haystack-ai==2.13

Next, we configure the credentials: an OpenAI API key for the chat model and a GitHub token for accessing repository information through the GitHub API.

In [1]:
from getpass import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key:")

if not os.environ.get("GITHUB_TOKEN"):
    os.environ["GITHUB_TOKEN"] = getpass("Enter your GitHub token:")

## Define Tools
We start by defining the tools which will be used by our Agent.


`git_list_commits_tool`: Fetches and formats the most recent commits on a given GitHub repository and branch.  
  Useful for quickly surfacing the latest changes when diagnosing CI failures or code regressions.


In [3]:
import os, requests, zipfile, io
from collections import defaultdict
from typing import Annotated, List, Dict, Tuple
from haystack.tools import create_tool_from_function

def list_commits(
    repo: Annotated[str, "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"],
    branch: Annotated[str, "The branch name to list commits from"] = "main",
) -> str:
    """
    Fetches the latest commits for a given GitHub repo and branch.
    Returns a formatted string of the 10 most recent commit SHAs and messages.
    """
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN not set in environment."

    url = f"https://api.github.com/repos/{repo}/commits"
    headers = {"Authorization": f"token {token}"}
    params = {"sha": branch, "per_page": 10}
    resp = requests.get(url, headers=headers, params=params)
    resp.raise_for_status()
    commits = resp.json()

    lines = []
    for c in commits:
        sha = c["sha"][:7]
        msg = c["commit"]["message"].split("\n")[0]
        lines.append(f"- {sha}: {msg}")
    return "\n".join(lines)


git_list_commits_tool = create_tool_from_function(
    list_commits,
    name="git_list_commits",
    description="List the most recent commits for a GitHub repository and branch."
)
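The tool's formatting step can be exercised without a network call. The sketch below applies the same logic as `list_commits` (a 7-character short SHA plus the first line of the commit message) to a hand-written list shaped like the response of GitHub's `GET /repos/{owner}/{repo}/commits` endpoint. `format_commits` and the sample data are hypothetical and only include the fields the tool actually reads.

```python
from typing import Any, Dict, List

def format_commits(commits: List[Dict[str, Any]]) -> str:
    """Hypothetical helper: same formatting as list_commits, minus the HTTP request."""
    lines = []
    for c in commits:
        sha = c["sha"][:7]                                # short SHA, like `git log --oneline`
        msg = c["commit"]["message"].split("\n")[0]       # subject line only
        lines.append(f"- {sha}: {msg}")
    return "\n".join(lines)

# Hand-written sample shaped like the commits API response (only the fields used above).
sample = [
    {"sha": "a1b2c3d4e5f60718293a4b5c6d7e8f9012345678",
     "commit": {"message": "Fix flaky retry test\n\nIncrease timeout to 5s."}},
    {"sha": "0123456789abcdef0123456789abcdef01234567",
     "commit": {"message": "Bump dependency versions"}},
]

print(format_commits(sample))
# - a1b2c3d: Fix flaky retry test
# - 0123456: Bump dependency versions
```

Keeping only the subject line keeps the tool result short, which matters because every tool result is added to the LLM's context.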


`git_get_diff_tool`: Retrieves the patch (diff) between two commits or branches in a GitHub repository.  
  Lets the Agent inspect the code changes that may have triggered test failures.


In [None]:
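def get_diff(
    repo: Annotated[str, "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"],
    base: Annotated[str, "The base commit SHA or branch name"],
    head: Annotated[str, "The head commit SHA or branch name"],
) -> str:
    """
    Fetches the diff between two commits (or branches) in a GitHub repo.
    Returns the patch text for each changed file (up to the GitHub API's size limit).
    """
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN not set in environment."

    url = f"https://api.github.com/repos/{repo}/compare/{base}...{head}"
    headers = {"Authorization": f"token {token}"}
    resp = requests.get(url, headers=headers)
    resp.raise_for_status()
    data = resp.json()

    # "files" is a list of dicts; render it as text so the return value matches the `str` annotation
    chunks = []
    for f in data.get("files", []):
        header = f"--- {f['filename']} ({f.get('status', 'modified')}, +{f.get('additions', 0)}/-{f.get('deletions', 0)})"
        chunks.append(f"{header}\n{f.get('patch', '(no textual patch available)')}")
    return "\n\n".join(chunks) if chunks else "No file changes between the given refs."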

git_diff_tool = create_tool_from_function(
    get_diff,
    name="git_get_diff",
    description="Get the diff (patch) between two commits or branches in a GitHub repository."
)
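The GitHub compare endpoint (`GET /repos/{owner}/{repo}/compare/{base}...{head}`) returns the changed files as a list of objects with fields such as `filename`, `status`, `additions`, `deletions` and, for text files, `patch`. The sketch below renders such a list into plain text offline. `format_compare_files`, its `max_chars` truncation limit, and the sample payload are hypothetical; the truncation is our own addition to keep large diffs from crowding the LLM's context.

```python
from typing import Any, Dict, List

def format_compare_files(files: List[Dict[str, Any]], max_chars: int = 2000) -> str:
    """Hypothetical helper: turn the compare API's "files" list into plain patch text."""
    chunks = []
    for f in files:
        header = (f"--- {f['filename']} ({f.get('status', 'modified')}, "
                  f"+{f.get('additions', 0)}/-{f.get('deletions', 0)})")
        # Entries without a "patch" key (e.g. binary files) get a placeholder instead.
        patch = f.get("patch", "(no textual patch available)")
        chunks.append(f"{header}\n{patch}")
    text = "\n\n".join(chunks) if chunks else "No file changes between the given refs."
    # Cap the size of the tool result (assumed limit, tune for your model's context window).
    return text if len(text) <= max_chars else text[:max_chars] + "\n...[truncated]"

# Made-up payload shaped like the "files" field of a compare response.
sample_files = [
    {"filename": "src/app.py", "status": "modified", "additions": 1, "deletions": 1,
     "patch": "@@ -1 +1 @@\n-TIMEOUT = 1\n+TIMEOUT = 5"},
    {"filename": "assets/logo.png", "status": "added", "additions": 0, "deletions": 0},
]

print(format_compare_files(sample_files))
```

The first file prints with its header and unified-diff hunk; the image prints with the placeholder, so the Agent still learns that the file changed.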

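`ci_status_tool`: Checks the most recent GitHub Actions workflow runs for failures, downloads the logs of each failed run, and extracts every log line that matches a failure marker (`FAIL`, `ERROR`, `Exception`, `error`), grouped by suite (inferred from the log file path). This surfaces candidate failing tests and error messages so the Agent can narrow down the root cause; because the matching is a simple keyword heuristic, some extracted lines may be false positives.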


In [19]:


def get_failed_ci_runs_categorized(
    repo: Annotated[
      str,
      "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"
    ],
    per_page: Annotated[
      int,
      "How many of the most recent workflow runs to check (default 50)"
    ] = 50
) -> str:
    """
    Lists recent GitHub Actions workflow runs for a repo, finds failures,
    downloads their logs, and extracts all failures, grouped by suite (inferred
    from the log file path).
    """
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN environment variable not set."
    
    headers = {"Authorization": f"token {token}"}
    params = {"per_page": per_page}
    runs_url = f"https://api.github.com/repos/{repo}/actions/runs"
    
    resp = requests.get(runs_url, headers=headers, params=params)
    resp.raise_for_status()
    runs = resp.json().get("workflow_runs", [])
    
    failed_runs = [r for r in runs if r.get("conclusion") == "failure"]
    if not failed_runs:
        return f"No failed runs in the last {per_page} workflow runs for `{repo}`."

    def extract_all_failures(logs_url: str) -> List[Tuple[str, str, str]]:
        """
        Download and scan logs ZIP for all failure markers.
        Returns a list of tuples: (suite, test_name, error_line).
        """
        r = requests.get(logs_url, headers=headers)
        r.raise_for_status()
        z = zipfile.ZipFile(io.BytesIO(r.content))
        failures = []
        for filepath in z.namelist():
            if not filepath.lower().endswith(('.txt', '.log')):
                continue
            suite = filepath.split('/', 1)[0]  # infer suite as top-level folder or file
            with z.open(filepath) as f:
                for raw in f:
                    # errors='ignore' never raises, so no try/except (and no bare except) is needed
                    line = raw.decode('utf-8', errors='ignore').strip()
                    if any(marker in line for marker in ("FAIL", "ERROR", "Exception", "error")):
                        parts = line.split()
                        test_name = next(
                            (p for p in parts if '.' in p or p.endswith("()")), 
                            parts[0] if parts else ""
                        )
                        failures.append((suite, test_name, line))
        return failures

    output = [f"Found {len(failed_runs)} failed run(s):"]
    for run in failed_runs:
        run_id   = run["id"]
        branch   = run.get("head_branch")
        event    = run.get("event")
        created  = run.get("created_at")
        logs_url = run.get("logs_url")
        html_url = run.get("html_url")

        failures = extract_all_failures(logs_url)
        if not failures:
            detail = "No individual failures parsed from logs."
        else:
            # Group by suite
            by_suite: Dict[str, List[Tuple[str,str]]] = defaultdict(list)
            for suite, test, err in failures:
                by_suite[suite].append((test, err))
            lines = []
            for suite, items in by_suite.items():
                lines.append(f"  ▶ **Suite**: `{suite}`")
                for test, err in items:
                    lines.append(f"    - **{test}**: {err}")
            detail = "\n".join(lines)

        output.append(
            f"- **Run ID**: {run_id}\n"
            f"  **Branch**: {branch}\n"
            f"  **Event**: {event}\n"
            f"  **Created At**: {created}\n"
            f"  **Failures by Suite**:\n{detail}\n"
            f"  **Logs ZIP**: {logs_url}\n"
            f"  **Run URL**: {html_url}\n"
        )

    return "\n\n".join(output)


# Wrap it as a Haystack Tool
ci_status_tool = create_tool_from_function(
    get_failed_ci_runs_categorized,
    name="ci_status_tool",
    description=(
        "Check the most recent GitHub Actions workflow runs for a given repository, "
        "list any that failed, download their logs, and extract all failures, "
        "grouped by test suite (inferred from log file paths)."
    )
)
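The failure detection inside `extract_all_failures` is a plain keyword match. To see what it does and does not catch, the sketch below applies the same heuristic to a small ZIP archive built in memory. `scan_zip_for_failures` and the archive contents are made up for illustration, and no GitHub API call is involved.

```python
import io
import zipfile
from collections import defaultdict
from typing import Dict, List, Tuple

MARKERS = ("FAIL", "ERROR", "Exception", "error")  # same markers as extract_all_failures

def scan_zip_for_failures(zip_bytes: bytes) -> Dict[str, List[Tuple[str, str]]]:
    """Apply the tool's keyword heuristic to ZIP bytes; group (test, line) pairs by suite."""
    by_suite: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        for path in z.namelist():
            if not path.lower().endswith((".txt", ".log")):
                continue
            suite = path.split("/", 1)[0]  # top-level folder, or the file name itself
            with z.open(path) as f:
                for raw in f:
                    line = raw.decode("utf-8", errors="ignore").strip()
                    if any(m in line for m in MARKERS):
                        parts = line.split()
                        # First token that looks like a test id, else the first token
                        test = next((p for p in parts if "." in p or p.endswith("()")),
                                    parts[0] if parts else "")
                        by_suite[suite].append((test, line))
    return dict(by_suite)

# Build a tiny fake log archive in memory (file names and contents are made up).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as z:
    z.writestr("unit-tests/1_pytest.txt",
               "collected 3 items\nFAILED tests/test_api.py::test_timeout - AssertionError\n3 passed\n")
    z.writestr("0_linting.txt", "Linting: 0 errors\n")

print(scan_zip_for_failures(buf.getvalue()))
```

The pytest line is picked up correctly, with `tests/test_api.py::test_timeout` as the test name. The linting line is a false positive: the substring `error` also occurs in the benign `Linting: 0 errors`, and `Linting:` becomes the "test name" because no token contains a dot or ends with `()`. Treat the tool's output as leads for the Agent to investigate, not as a definitive diagnosis.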


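`shell_tool`: Executes a shell command with a configurable timeout and returns its output (stderr is merged into stdout) or a detailed error message. It is handy for grepping, filtering, or tailing CI log files so that only the relevant snippets are passed to the LLM. Note that the command runs directly on the local machine with `shell=True`; it is not sandboxed, so only enable this tool where the Agent's commands cannot cause harm.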


In [6]:
import subprocess
def run_shell_command(
    command: Annotated[
        str,
        "The shell command to execute, e.g. 'grep -E \"ERROR|Exception\" build.log'"
    ],
    timeout: Annotated[
        int,
        "Maximum time in seconds to allow the command to run"
    ] = 30
) -> str:
    """
    Executes a shell command with a timeout and returns stdout or an error message.
    """
    try:
        output = subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,
            timeout=timeout,
            universal_newlines=True
        )
        return output
    except subprocess.CalledProcessError as e:
        return f"❌ Command failed (exit code {e.returncode}):\n{e.output}"
    except subprocess.TimeoutExpired:
        return f"❌ Command timed out after {timeout} seconds."
    except Exception as e:
        return f"❌ Unexpected error: {str(e)}"

shell_tool = create_tool_from_function(
    run_shell_command,
    name="shell_tool",
    description=(
        "Execute a shell command (e.g., grep, awk) on the local machine with a timeout; "
        "returns the command’s stdout or a detailed error message."
    )
)
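If the Agent only needs to run single programs such as `grep` or `tail` on log files, a more conservative option is to skip the shell altogether. The hypothetical `run_command_no_shell` below tokenizes the command with `shlex.split` and runs it with `subprocess.run` without `shell=True`, so pipes, redirects, `;` chaining, and globbing are not interpreted. This narrows what a generated command can do, but it is still not a sandbox: the program runs with the notebook's own permissions.

```python
import shlex
import subprocess
import sys

def run_command_no_shell(command: str, timeout: int = 30) -> str:
    """Hypothetical variant of run_shell_command that never invokes a shell.

    Returns stdout on success; on a non-zero exit, returns stdout and stderr with the exit code.
    """
    try:
        argv = shlex.split(command)          # POSIX-style tokenization, no shell expansion
    except ValueError as e:                  # e.g. unbalanced quotes
        return f"❌ Could not parse command: {e}"
    if not argv:
        return "❌ Empty command."
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return f"❌ Command timed out after {timeout} seconds."
    except FileNotFoundError:
        return f"❌ Program not found: {argv[0]}"
    if proc.returncode != 0:
        return f"❌ Command failed (exit code {proc.returncode}):\n{proc.stdout}{proc.stderr}"
    return proc.stdout

# Usage: run the current Python interpreter so the example does not depend on grep being installed.
print(run_command_no_shell(f"{shlex.quote(sys.executable)} -c \"print('hello from a subprocess')\""))
```

The trade-off is expressiveness: a command like `grep ERROR build.log | tail -n 20` no longer works as one call, so the Agent would need to run the steps separately or use a program-specific flag instead of a pipe.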


`human_in_loop_tool`: Prompts the user with a clarifying question via `input()` and returns the answer to the Agent. The tool itself does not decide when to interrupt; the system prompt tells the Agent when it should ask for human feedback.

In [7]:
def human_in_loop(
    question: Annotated[
        str,
        "A clarifying question to prompt the user for more information"
    ]
) -> str:
    """
    Prompts the user with the given question using Python's input() and returns their response.
    """
    user_message = input(f"[Agent needs your input] {question}\n> ")
    return user_message

# Wrap it as a Haystack Tool
human_in_loop_tool = create_tool_from_function(
    human_in_loop,
    name="human_in_loop",
    description="Ask the user a clarifying question and return their response via input()."
)
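Because `human_in_loop` calls `input()` directly, it blocks waiting for a keyboard, which makes the Agent hard to run in automated tests or other non-interactive environments. One option, sketched below as an assumption rather than part of Haystack, is to inject the function that collects the answer. `make_human_in_loop` and the scripted responder are hypothetical.

```python
from typing import Annotated, Callable

def make_human_in_loop(ask: Callable[[str], str] = input) -> Callable[[str], str]:
    """Hypothetical factory: build a human_in_loop-style function around any responder."""
    def human_in_loop(
        question: Annotated[str, "A clarifying question to prompt the user for more information"]
    ) -> str:
        """Prompts the user with the given question and returns their response."""
        answer = ask(f"[Agent needs your input] {question}\n> ").strip()
        # A blank reply gives the LLM nothing to act on, so say so explicitly.
        return answer or "The user did not provide an answer."
    return human_in_loop

# Scripted responder for tests or demos: returns canned answers in order.
answers = iter(["deepset-ai/haystack-experimental", ""])
scripted = make_human_in_loop(lambda prompt: next(answers))

print(scripted("Which repository should I check?"))  # deepset-ai/haystack-experimental
print(scripted("Anything else?"))                    # The user did not provide an answer.
```

With the default `ask=input` the behavior matches the original tool (apart from the blank-answer placeholder), and the returned function could be wrapped with `create_tool_from_function` the same way as in the cell above.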


# Create and Configure the Agent
Create a Haystack Agent instance and configure its behavior with a system prompt and tools.

In [9]:
from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

agent = Agent(
    chat_generator=OpenAIChatGenerator(),
    tools=[git_list_commits_tool, git_diff_tool, ci_status_tool, shell_tool, human_in_loop_tool],
    system_prompt=(
        "You are a DevOps support assistant with the following capabilities:\n"
        "1. Fetch commits and diffs from GitHub\n"
        "2. Check CI failures\n"
        "3. Analyze logs\n"
        "4. Ask users for clarification\n\n"
        "IMPORTANT: Whenever you are unsure about any details or need clarification, "
        "you MUST use the human_in_loop tool to ask the user questions. "
        "For example, if the user's request is vague or missing crucial information, "
        "use human_in_loop to get the necessary details."
    ),
)

# Warm up the Agent before running it
agent.warm_up()
#response = agent.run(messages=[ChatMessage.from_user("Did any CI fail deepset-ai/haystack repo?")])
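Conceptually, an agent alternates between the LLM and its tools: the model either answers in text or requests a tool call, the tool's result is appended to the conversation, and the loop repeats. The framework-free sketch below illustrates that control flow with a scripted stand-in for the LLM. `FakeReply`, `run_agent_loop`, and `scripted_llm` are hypothetical names, and the sketch does not reproduce Haystack's actual `Agent` implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

@dataclass
class FakeReply:
    """Stand-in for an LLM reply: either text, or a tool name plus arguments."""
    text: Optional[str] = None
    tool_name: Optional[str] = None
    arguments: Dict[str, Any] = field(default_factory=dict)

def run_agent_loop(llm: Callable[[List[str]], FakeReply],
                   tools: Dict[str, Callable[..., str]],
                   user_msg: str, max_steps: int = 10) -> List[str]:
    """Conceptual loop: call the model, run any requested tool, feed the result back."""
    transcript = [f"user: {user_msg}"]
    for _ in range(max_steps):
        reply = llm(transcript)
        if reply.tool_name is None:                 # plain text reply ends the run
            transcript.append(f"assistant: {reply.text}")
            return transcript
        result = tools[reply.tool_name](**reply.arguments)
        transcript.append(f"tool[{reply.tool_name}]: {result}")
    transcript.append("assistant: stopped after max_steps")
    return transcript

# Scripted "model": ask the human for the repo first, then answer with what it learned.
def scripted_llm(transcript: List[str]) -> FakeReply:
    if not any(t.startswith("tool[") for t in transcript):
        return FakeReply(tool_name="human_in_loop", arguments={"question": "Which repo?"})
    return FakeReply(text=f"Checking {transcript[-1].split(': ', 1)[1]}")

tools = {"human_in_loop": lambda question: "my-org/my-repo"}  # canned human answer
for line in run_agent_loop(scripted_llm, tools, "Did any CI fail?"):
    print(line)
```

The `max_steps` cap is the design choice that stops a model which keeps requesting tools (for example, calling `human_in_loop` over and over) from looping forever.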


In [25]:
message=[ChatMessage.from_user("For the given repo, check for any failed CI workflows. Then debug and analyze the workflow by finding the root cause of the failure. Find the commit that caused the failure and list the changes in the commit. Ask the user for more information if needed. Call human in loop at the end")]
result = agent.run(messages=message)

In [21]:
print(result["messages"][-1].text)

I've identified the failed CI workflows for the repository `deepset-ai/haystack-experimental`. Here are the details:

### Failed Workflow Runs
1. **Run ID**: **14824297966**
   - **Branch**: `fix-init-params-in-state`
   - **Created At**: 2025-05-04
   - **Failures**:
     - `Integration windows-latest`: Multiple failed tests due to `PipelineRuntimeError` with components.
     - `Integration macos-latest`: Similar to the previous suite.
     - `0_linting.txt`: No errors reported.
   
   - **Log Summary**:
     - [Logs Zip](https://api.github.com/repos/deepset-ai/haystack-experimental/actions/runs/14824297966/logs)
     - [Workflow URL](https://github.com/deepset-ai/haystack-experimental/actions/runs/14824297966)

2. **Run ID**: **14824014070**
   - **Branch**: `fix-init-params-in-state`
   - **Created At**: 2025-05-04
   - **Failures**:
     - `Integration windows-latest`: Same issues as above with different tests failing due to serialization problems.
     - The tests seem to fail due

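## Trying an Alternate Prompt

The first prompt only says *when* to ask the user for help. The prompt below is more prescriptive: it describes each tool, limits `human_in_loop` to missing or ambiguous parameters and unexpected tool errors, asks for a structured Markdown summary, and defines an explicit exit condition.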

In [None]:

agent = Agent(
    chat_generator=OpenAIChatGenerator(),
    tools=[git_list_commits_tool, git_diff_tool, ci_status_tool, shell_tool, human_in_loop_tool],
    system_prompt=(
        "You are DevOpsGPT, a specialized support assistant for diagnosing and fixing CI/CD failures.\n\n"
        "Capabilities & Tools:\n"
        "- git_list_commits: list recent commits for a repo/branch\n"
        "- git_get_diff: show code changes between two commits/branches\n"
        "- ci_status_tool: detect failed GitHub Actions runs and extract failure lines from their logs, grouped by suite\n"
        "- shell_tool: run shell commands on the local machine (e.g., grep logs) with a timeout\n"
        "- human_in_loop: prompt the user for clarification when you lack critical details\n\n"
        "Behavior Guidelines:\n"
        "- Tool-First: Always attempt to resolve the user's request by chaining tools.\n"
        "- Concise Reasoning: Before calling each tool, think (briefly) about why it's needed—then call it.\n"
        "- Minimal Interruptions: Only use human_in_loop if:\n"
        "  * Required parameters are missing or ambiguous (e.g. repo name, run ID, branch)\n"
        "  * A tool returns an unexpected error that needs human insight\n"
        "- Structured Outputs: After your final tool call, summarize findings in Markdown:\n"
        "  * Run ID, Branch, Test, Error\n"
        "  * Next Steps (e.g., \"Add null-check to line 45\", \"Rerun tests after env fix\")\n"
        "- Error Handling: If a tool fails (e.g. 404, timeout), surface the error and decide whether to retry, choose another tool, or ask for clarification.\n"
        "- Respect Rate Limits: Don't over-fetch—use per_page judiciously (default ≤ 20).\n\n"
        "Exit Condition:\n"
        "Once you've provided a complete, actionable summary and next steps, stop—do not call any more tools."
    ),
)

# Warm up the Agent before running it
agent.warm_up()
#response = agent.run(messages=[ChatMessage.from_user("Did any CI fail deepset-ai/haystack repo?")])
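The alternate prompt lists each tool by hand, and such a list can drift out of sync with the tools' own `description` strings as the code evolves. One way to avoid that, sketched here as a suggestion rather than the notebook's approach, is to generate the tool section from `(name, description)` pairs. `tools_section` is a hypothetical helper; Haystack `Tool` objects expose `name` and `description` attributes, so the pairs could be built from the tools defined above.

```python
from typing import Iterable, Tuple

def tools_section(tools: Iterable[Tuple[str, str]]) -> str:
    """Hypothetical helper: render 'Capabilities & Tools' lines from (name, description) pairs."""
    lines = ["Capabilities & Tools:"]
    for name, description in tools:
        lines.append(f"- {name}: {description}")
    return "\n".join(lines) + "\n\n"

# With the Haystack Tool objects above, the pairs could come from
# [(t.name, t.description) for t in [git_list_commits_tool, git_diff_tool, ...]].
pairs = [
    ("git_list_commits", "List the most recent commits for a GitHub repository and branch."),
    ("human_in_loop", "Ask the user a clarifying question and return their response via input()."),
]
print(tools_section(pairs))
```

The generated text could then be concatenated with the hand-written behavior guidelines and exit condition when building `system_prompt`.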


In [None]:
message=[ChatMessage.from_user("For the given repo, check for any failed CI workflows. Then debug and analyze the workflow by finding the root cause of the failure. Find the commit that caused the failure and list the changes in the commit. Ask the user for more information if needed. Call human in loop at the end")]
result = agent.run(messages=message)

In [None]:
print(result["messages"][-1].text)