In [1]:
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.connectors.ai.open_ai import AzureChatPromptExecutionSettings
import os
from langsmith import traceable
import json
import asyncio
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
AZURE_OPENAI_API_KEY=os.environ["AZURE_OPENAI_API_KEY"]
AZURE_OPENAI_ENDPOINT=os.environ["AZURE_OPENAI_ENDPOINT"]
AZURE_OPENAI_API_VERSION=os.environ["AZURE_OPENAI_API_VERSION"]

In [3]:
kernel = Kernel()

service = AzureChatCompletion(
    deployment_name="gpt-4o-mini",
    endpoint=AZURE_OPENAI_ENDPOINT,
    api_key=AZURE_OPENAI_API_KEY,
    api_version=AZURE_OPENAI_API_VERSION
)

kernel.add_service(service)

In [4]:
from pydantic import BaseModel, Field
from typing import List

class Plan(BaseModel):
    goal: str = Field(description="User goal")
    steps: List[str] = Field(description="Ordered steps to solve goal")


In [15]:
planner_prompt = """
You are a planning agent.

Break the user's goal into clear ordered steps. Steps should not be more than 3.

Goal: {{$input}}

Return JSON:
- goal
- steps
"""

request_settings_planner = AzureChatPromptExecutionSettings(temperature=0,max_tokens=1000,
                                                           response_format=Plan)
planner_function = kernel.add_function(
    plugin_name="planner",
    function_name="create_plan",
    prompt=planner_prompt,
    prompt_execution_settings=request_settings_planner
)

@traceable(name="Planning Step")
async def run_planner(user_goal: str):
    result = await kernel.invoke(planner_function, input=user_goal)
    return Plan.model_validate(json.loads(str(result.value[0].content)))

In [16]:
executor_prompt = """
Execute the following step in context of the overall goal.

Goal: {{$goal}}
Current Step: {{$step}}

Provide result for this step within 400 words
"""
request_settings_executor = AzureChatPromptExecutionSettings(temperature=0,max_tokens=2000,
                                                           response_format=Plan)
executor_function = kernel.add_function(
    plugin_name="executor", function_name="execute_step", 
    prompt=executor_prompt, prompt_execution_settings=request_settings_executor

)

@traceable(name="Execution Step")
async def run_executor(goal, step):
    return await kernel.invoke(
        executor_function,
        goal=goal,
        step=step
    )

In [17]:
@traceable(name="Planning Agent")
async def planning_agent(user_goal):

    # Step 1: Create Plan
    plan = await run_planner(user_goal)
    print("\n[Plan Created]")
    for i, s in enumerate(plan.steps, 1):
        print(f"{i}. {s}")

    results = []

    # Step 2: Execute Steps Sequentially
    for step in plan.steps:
        step_result = await run_executor(plan.goal, step)
        results.append(str(step_result))

    # Step 3: Combine Results
    final_answer = "\n".join(results)
    return final_answer


In [18]:
answer = await planning_agent(
    "Explain how to design a production-grade RAG system"
)


[Plan Created]
1. Define the requirements and objectives of the RAG system, including data sources, user needs, and performance metrics.
2. Select appropriate technologies and frameworks for building the RAG system, considering scalability, reliability, and integration capabilities.
3. Implement the system architecture, focusing on data processing, retrieval mechanisms, and user interface, followed by thorough testing and optimization.


In [19]:
print(answer)

{"goal":"Explain how to design a production-grade RAG system","steps":["Identify the primary objectives of the RAG system, such as improving information retrieval, enhancing user experience, or automating responses.","Determine the key data sources that will feed into the RAG system, including structured databases, unstructured documents, APIs, and external knowledge bases.","Understand user needs by conducting surveys or interviews to gather insights on what users expect from the RAG system, including types of queries and preferred response formats.","Establish performance metrics to evaluate the effectiveness of the RAG system, such as response accuracy, retrieval speed, user satisfaction, and system reliability."]}
{"goal":"Explain how to design a production-grade RAG system","steps":["Identify the core components of a RAG system (Retrieval-Augmented Generation)","Research and evaluate various technologies and frameworks suitable for each component","Consider scalability options for