# Custom Agent with Termination Conditions

In [18]:
import asyncio
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from IPython.display import display, Markdown

from typing import AsyncIterable, Any, Optional, Callable
from semantic_kernel.agents import ChatCompletionAgent, AgentResponseItem, ChatHistoryAgentThread
from semantic_kernel.contents import ChatMessageContent
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.agents.strategies.termination.termination_strategy import TerminationStrategy
from semantic_kernel.functions import KernelArguments
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.contents import ChatMessageContent, FunctionCallContent, FunctionResultContent
# Reducers for managing chat history size and tokens
from semantic_kernel.contents import ChatHistorySummarizationReducer, ChatHistoryTruncationReducer

from semantic_kernel.functions import KernelArguments
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    AzureChatPromptExecutionSettings
)
from semantic_kernel.functions import KernelArguments
from semantic_kernel.agents import (
    GroupChatOrchestration, 
    RoundRobinGroupChatManager,
    ConcurrentOrchestration,
    SequentialOrchestration,
    HandoffOrchestration,
    OrchestrationHandoffs
)
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.agents.orchestration.tools import structured_outputs_transform
from semantic_kernel.agents.orchestration.group_chat import BooleanResult, GroupChatManager, MessageResult, StringResult
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.prompt_template import KernelPromptTemplate, PromptTemplateConfig
from typing_extensions import override


import sys
sys.path.append("..")

# Import FileSystemPlugin
from plugins.file_system import FileSystemPlugin

# Load environment variables
load_dotenv()

print("✅ All imports loaded successfully!")

✅ All imports loaded successfully!


## Standard Code like other notebooks

In [4]:
# Configure reasoning model - try Azure OpenAI first, then OpenAI
reasoning_completion = None
provider_name = None

if os.getenv("AZURE_REASONING_ENDPOINT"):
    print("🔵 Configuring Azure OpenAI o4-mini...")
    reasoning_completion = AzureChatCompletion(
        api_key=os.getenv("AZURE_REASONING_API_KEY"),
        endpoint=os.getenv("AZURE_REASONING_ENDPOINT"),
        deployment_name="o4-mini",  # o4-mini deployment
        instruction_role="developer",  # Required for o4 models
        service_id="reasoning"
    )
    
    chat_completion = AzureChatCompletion(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        deployment_name=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
    )

    print("✅ Chat completion services configured!")
    
    
    provider_name = "Azure OpenAI"
        
else:
    raise ValueError("❌ No reasoning model configured. Please set either AZURE_REASONING_* or OPENAI_API_KEY environment variables.")

print(f"✅ {provider_name} o4-mini reasoning model configured!")

🔵 Configuring Azure OpenAI o4-mini...
✅ Chat completion services configured!
✅ Azure OpenAI o4-mini reasoning model configured!


In [5]:
# Initialize FileSystemPlugin with consult/ as base directory
consult_path = Path("../consult").resolve()
print(f"📁 Setting FileSystemPlugin base path to: {consult_path}")

file_system_plugin = FileSystemPlugin(base_path=str(consult_path))

# Verify the directory exists
if not consult_path.exists():
    raise ValueError(f"❌ Directory {consult_path} does not exist!")
    
print(f"✅ FileSystemPlugin initialized with base path: {consult_path}")

📁 Setting FileSystemPlugin base path to: /home/agangwal/lseg-migration-agent/migration-agent/consult
✅ FileSystemPlugin initialized with base path: /home/agangwal/lseg-migration-agent/migration-agent/consult


In [6]:
MESSAGES = []
async def agent_response_callback(message: ChatMessageContent) -> None:
    """Display agent responses with function call details."""
    print(f"\n{'='*60}")
    print(f"📝 {message.name}: {message.role}")
    print(f"{'='*60}")
    
    MESSAGES.append(message.model_dump())
    
    # Display message content
    if message.content:
        print(f"\n💭 AGENT REASONING:")
        print(message.content)
    
    # Display function calls and results
    for item in message.items or []:
        if isinstance(item, FunctionCallContent):
            print(f"\n🔧 FUNCTION CALL: {item.name}")
            print(f"📥 Arguments: {json.dumps(item.arguments, indent=2)}")
            
        elif isinstance(item, FunctionResultContent):
            print(f"\n📤 FUNCTION RESULT:")
            try:
                # Try to parse and prettify JSON result
                result_data = json.loads(item.result) if isinstance(item.result, str) else item.result
                print(json.dumps(result_data, indent=2))
            except (json.JSONDecodeError, TypeError):
                # If not JSON, display as string
                print(str(item.result))


## Looping Agent 

In [7]:
class KeywordTermination(TerminationStrategy):
    """Simple termination: stop if last assistant message contains keyword."""
    keyword: str = "TERMINATE"
    async def should_agent_terminate(self, agent, history: list[ChatMessageContent]) -> bool:  # type: ignore[override]
        for msg in reversed(history):
            if msg.role == AuthorRole.ASSISTANT and msg.content and self.keyword.lower() in msg.content.lower():
                return True
        return False


In [16]:
class LoopingChatCompletionAgent(ChatCompletionAgent):
    """ChatCompletionAgent that self-loops until termination or max rounds.

    Key points:
      - Uses super().invoke / super().invoke_stream each round (keeps tools & function choice).
      - Reuses the SAME thread object between rounds so tool call context persists.
      - Optionally persists the thread across SEPARATE external invocations of this agent
        (set persist_across_invocations=True) so you can call invoke() multiple times and
        maintain the conversation state without manually passing thread.
      - Can optionally reduce chat history between rounds when the thread uses a ChatHistoryReducer
        (e.g., ChatHistorySummarizationReducer or ChatHistoryTruncationReducer).
    """
    def __init__(
        self,
        *,
        termination_strategy: TerminationStrategy | None = None,
        max_rounds: int = 12,
        verbose_round_logs: bool = True,
        persist_across_invocations: bool = True,
        reduce_between_rounds: bool = True,
        **base_kwargs: Any,
    ) -> None:
        super().__init__(**base_kwargs)
        self._termination_strategy = termination_strategy or KeywordTermination(maximum_iterations=max_rounds)
        self._max_rounds = max_rounds
        self._verbose = verbose_round_logs
        self._persist_across_invocations = persist_across_invocations
        self._reduce_between_rounds = reduce_between_rounds
        self._persistent_thread: ChatHistoryAgentThread | None = None

    def reset_thread(self) -> None:
        """Forget persisted conversation (start fresh next call)."""
        self._persistent_thread = None

    async def invoke(
        self,
        messages: str | ChatMessageContent | list[str | ChatMessageContent] | None = None,
        *,
        thread: ChatHistoryAgentThread | None = None,
        on_intermediate_message: Optional[Callable[[ChatMessageContent], Any]] = None,
        arguments: KernelArguments | None = None,
        kernel: "Kernel | None" = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseItem[ChatMessageContent]]:
        # Resolve thread: explicit > persisted > None (let base create)
        active_thread = thread or (self._persistent_thread if self._persist_across_invocations else None)
        seeded = False
        for round_idx in range(self._max_rounds):
            last_assistant: ChatMessageContent | None = None
            base_iter = super().invoke(
                messages=messages if not seeded else None,
                thread=active_thread,
                on_intermediate_message=on_intermediate_message,
                arguments=arguments,
                kernel=kernel,
                **kwargs,
            )
            async for item in base_iter:
                active_thread = item.thread  # capture created thread from first round
                if item.message.role == AuthorRole.ASSISTANT:
                    last_assistant = item.message
                yield item
            seeded = True

            if self._verbose and last_assistant is not None:
                print(f"[Loop Round {round_idx}])")

            # Collect full history for termination check
            if active_thread is not None:
                full_history = [m async for m in active_thread.get_messages()]
            else:
                full_history = []

            if await self._termination_strategy.should_terminate(self, full_history):
                if self._verbose:
                    print(f"🔚 Termination condition met at round {round_idx}.")
                break
            
            # Optional: reduce chat history to manage tokens (summarize/truncate)
            if self._reduce_between_rounds and active_thread is not None:
                try:
                    await active_thread.reduce()
                except Exception as e:
                    if self._verbose:
                        print(f"⚠️ History reduction skipped due to error: {e}")

        if self._persist_across_invocations:
            self._persistent_thread = active_thread

    # Invoke stream not tested yet - but *should* work
    async def invoke_stream(
        self,
        messages: str | ChatMessageContent | list[str | ChatMessageContent] | None = None,
        *,
        thread: ChatHistoryAgentThread | None = None,
        on_intermediate_message: Optional[Callable[[ChatMessageContent], Any]] = None,
        arguments: KernelArguments | None = None,
        kernel: "Kernel | None" = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseItem[StreamingChatMessageContent]]:
        active_thread = thread or (self._persistent_thread if self._persist_across_invocations else None)
        seeded = False
        for round_idx in range(1, self._max_rounds + 1):
            assistant_accum: list[str] = []
            base_iter = super().invoke_stream(
                messages=messages if not seeded else None,
                thread=active_thread,
                on_intermediate_message=on_intermediate_message,
                arguments=arguments,
                kernel=kernel,
                **kwargs,
            )
            async for item in base_iter:
                active_thread = item.thread
                if item.message.role == AuthorRole.ASSISTANT and item.message.content:
                    assistant_accum.append(item.message.content)
                yield item
            seeded = True

            if active_thread is not None:
                full_history = [m async for m in active_thread.get_messages()]
            else:
                full_history = []

            if await self._termination_strategy.should_terminate(self, full_history):
                if self._verbose:
                    snippet = "".join(assistant_accum)[:160]
                    print(f"🔚 (Streaming) Termination at round {round_idx}: {snippet}{'...' if len(''.join(assistant_accum)) > 160 else ''}")
                break

            # Optional: reduce chat history to manage tokens
            if self._reduce_between_rounds and active_thread is not None:
                try:
                    await active_thread.reduce()
                except Exception as e:
                    if self._verbose:
                        print(f"⚠️ History reduction skipped due to error: {e}")

        if self._persist_across_invocations:
            self._persistent_thread = active_thread


In [20]:
function_choice = FunctionChoiceBehavior.Auto(
    filters={"excluded_functions": ["AnalysisPlugin-search_in_files"]} # This examples show exclusion
)

# Configure a summarization reducer with custom prompt and options
custom_summary_prompt = (
    "Summarize earlier dialog. Include ALL relevant information that is needed to understand the work that has been done till now. The focus in code exploration, include technical details "
    "Do not invent details. Be thorough and carefully create a comprehensive and complete summary."
)

summary_execution_settings = AzureChatPromptExecutionSettings(
    max_completion_tokens=100_1000,
    # temperature=0.2,
)

history_reducer = ChatHistorySummarizationReducer(
    service=reasoning_completion if 'reasoning_completion' in globals() and reasoning_completion else chat_completion,
    target_count=10,              # aim to keep ~10 recent messages
    threshold_count=2,            # small buffer to avoid cutting pairs
    auto_reduce=False,            # we'll trigger reduction between rounds
    summarization_instructions=custom_summary_prompt,
    use_single_summary=True,      # keep a single rolling summary message
    fail_on_error=False,          # don't break loop if summarization fails
    include_function_content_in_summary=True,  # skip raw tool call payloads
    execution_settings=summary_execution_settings,
)

looping_agent = LoopingChatCompletionAgent(
    service=reasoning_completion if 'reasoning_completion' in globals() and reasoning_completion else chat_completion,
    name="LoopingAnalysisAgent",
    description="Self-looping analysis agent that keeps invoking itself until TERMINATE appears.",
    instructions=(
        "You will analyze the repository using available filesystem tools. "
        "Perform iterative exploration: list directories, inspect files, summarize. "
        "When you have produced a final structured markdown report, append the word TERMINATE."),
    # Test pluging
    plugins=[file_system_plugin],
    # Added as might be helpful. # Can set True and include logging right in our agent, but seems like a bad idea?
    verbose_round_logs=True, 
    # Termination strategies work. With Max Rounds
    termination_strategy=KeywordTermination(maximum_iterations=20, keyword="TERMINATE"),
    max_rounds=5,
    # Weather or not to persist thread internally.
    persist_across_invocations=True,
    reduce_between_rounds=True,
    # Function choice and arguments also work
    function_choice_behavior=function_choice,
    arguments=KernelArguments(
        settings=AzureChatPromptExecutionSettings(
            max_completion_tokens=100_000,
            reasoning_effort="high",
        )
    )
)

# Prime a thread that uses our reducer so the agent will carry it forward
initial_thread = ChatHistoryAgentThread(chat_history=history_reducer)


In [None]:
async for response in looping_agent.invoke(
    messages="Begin a concise analysis of the consult project. Use tools.",
    thread=initial_thread,
    on_intermediate_message=agent_response_callback
):
    print("===" * 10)
    print(f"✅ Final Response from {response.name}: {response.content}")
    print("===" * 10)


In [11]:
# Since we are using persist_across_invocations=True, we can call invoke() again
async for response in looping_agent.invoke(
    messages="Can you share this info in a tabular format?",
    thread=initial_thread,
    # on_intermediate_message=agent_response_callback,
):
    print(f"✅ Final Response from {response.name}: {response.content}")

✅ Final Response from LoopingAnalysisAgent: Here’s the same information organized into a Markdown table.  

| Category             | Summary                                                                                                                                                                                                                                                                                                                                                                                              |
|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Purpose              | An LLM-powered tool to automate pro

In [12]:
# Custom Termination Strategy
# This is where we will check memory tool output and based on that decide to stop or not!

class TwoLoopTerminationStrategy(TerminationStrategy):
    """Simple termination: Stop only after two calls."""
    call_count: int = 0
    async def should_agent_terminate(self, agent, history: list[ChatMessageContent]) -> bool:  # type: ignore[override]
        self.call_count += 1
        if self.call_count >= 2:
            return True
        return False


In [13]:
looping_agent = LoopingChatCompletionAgent(
    service=reasoning_completion if 'reasoning_completion' in globals() and reasoning_completion else chat_completion,
    name="LoopingAnalysisAgent",
    description="Self-looping analysis agent that keeps invoking itself until TERMINATE appears.",
    instructions=(
        "You will analyze the repository using available filesystem tools. "
        "Perform iterative exploration: list directories, inspect files, summarize. "
        "When you have produced a final structured markdown report, append the word TERMINATE."),
    plugins=[file_system_plugin],
    verbose_round_logs=True, # Can set True and include logging right in our agent, but seems like a bad idea?
    termination_strategy=TwoLoopTerminationStrategy(),
    max_rounds=5,
    persist_across_invocations=True,
    reduce_between_rounds=True,
    function_choice_behavior=function_choice, # Confirm function choice works
    arguments=KernelArguments(  # confirm arguments work.
        settings=AzureChatPromptExecutionSettings(
            max_completion_tokens=100_000,
            reasoning_effort="high",
        )
    )
)

# Reuse the same reducer-backed thread or create a fresh one
initial_thread = ChatHistoryAgentThread(chat_history=history_reducer)

async for response in looping_agent.invoke(
    messages="Begin a concise analysis of the consult project. Use tools.",
    thread=initial_thread,
    on_intermediate_message=agent_response_callback
):
    print("===" * 10)
    print(f"✅ Final Response from {response.name}: {response.content}")
    print(f"✅ Final ITEMS {response.items}\n METADATA: {response.metadata}")
    print()
    print("===" * 10)



📝 LoopingAnalysisAgent: AuthorRole.ASSISTANT

🔧 FUNCTION CALL: FileSystemPlugin-list_directory
📥 Arguments: "{\"path\":\".\",\"max_depth\":\"2\"}"

📝 LoopingAnalysisAgent: AuthorRole.TOOL

📤 FUNCTION RESULT:
{
  "success": true,
  "data": {
    "tree": "./ (17 files, 10 dirs)\n\u251c\u2500\u2500 consultation_analyser/ (12 files, 8 dirs)\n\u2502   \u251c\u2500\u2500 authentication/ (3 files, 1 dirs)\n\u2502   \u251c\u2500\u2500 consultations/ (7 files, 7 dirs)\n\u2502   \u251c\u2500\u2500 email/ (3 files, 1 dirs)\n\u2502   \u251c\u2500\u2500 error_pages/ (2 files, 1 dirs)\n\u2502   \u251c\u2500\u2500 lit/ (3 files, 2 dirs)\n\u2502   \u251c\u2500\u2500 settings/ (5 files)\n\u2502   \u251c\u2500\u2500 support_console/ (5 files, 3 dirs)\n\u2502   \u251c\u2500\u2500 templates/ (1 files)\n\u251c\u2500\u2500 docs/ (2 files, 1 dirs)\n\u2502   \u251c\u2500\u2500 architecture/ (0 files, 1 dirs)\n\u251c\u2500\u2500 frontend/ (5 files)\n\u251c\u2500\u2500 infrastructure/ (17 files, 2 dirs)\n\u250

## Using in Orchestration Patterns

In [14]:
def get_agents(focus_list):
    agent_list = []
    for focus in focus_list:
        agent_list.append(
            LoopingChatCompletionAgent(
                service=reasoning_completion if 'reasoning_completion' in globals() and reasoning_completion else chat_completion,
                name=f"LoopingAnalysisAgent-{focus.replace(' ', '_')}", # Unique names are important.
                description="Self-looping analysis agent that keeps invoking itself until TERMINATE appears.",
                instructions=(
                    "You will analyze the repository using available filesystem tools. "
                    "Perform iterative exploration: list directories, inspect files, summarize. "
                    "When you have produced a final structured markdown report, append the word TERMINATE."
                    f"Your MAIN FOCUS IS {focus}."),
                plugins=[file_system_plugin],
                verbose_round_logs=True, # Can set True and include logging right in our agent, but seems like a bad idea?
                termination_strategy=KeywordTermination(maximum_iterations=20, keyword="TERMINATE"),
                max_rounds=5,
                persist_across_invocations=True,
                function_choice_behavior=function_choice, # Confirm function choice works
                arguments=KernelArguments(  # confirm arguments work.
                    settings=AzureChatPromptExecutionSettings(
                        max_completion_tokens=100_000,
                        reasoning_effort="high",
                    )
                )
            )
        )
    return agent_list

focus_list = ["Infrastructure as code", "Microservices architecture", "Serverless computing"]

agents = get_agents(focus_list)


In [15]:
concurrent_orchestration = ConcurrentOrchestration(
    members=agents,
    # Uncomment to see that it works.
    # Just to show its possible. Ofc currently doesn't differentiate b/w the multiple agents
    # However, its should be possible to differentiate based on agent name!
    # agent_response_callback=agent_response_callback
)

runtime = InProcessRuntime()
runtime.start()

orchestration_result = await concurrent_orchestration.invoke(
        task="Carry out the analysis based on your assigned focus.",
        runtime=runtime,
    )
    
results = await orchestration_result.get(timeout=600)

🔚 (Streaming) Termination at round 4: # Consultation Analyser: Serverless Architecture Analysis

## 1. Project Overview  
- A consultation analysis platform combining a Python/Django-style monolith ...


TimeoutError: 

In [None]:
results[0].name, results[1].name, results[2].name

('LoopingAnalysisAgent-Serverless_computing',
 'LoopingAnalysisAgent-Infrastructure_as_code',
 'LoopingAnalysisAgent-Microservices_architecture')