### Home Network Assistant
---

In this notebook, we will create our first crew agent. This crew agent is the Home Network Agent. This agent will have access to a knowledge base for retrieval at runtime which will contain API specs on the home networking system. In this notebook, we will create a Knowledge base for our Home Network Agent. This agent will have access to a knowledge base for retrieval at runtime which will contain API specs on the home networking system. 

CrewAI enables you to create AI teams where each agent has specific roles, tools, and goals, working together to accomplish complex tasks.

Think of it as assembling your dream team - each member (agent) brings unique skills and expertise, collaborating seamlessly to achieve your objectives. For more information on CrewAI, view [here](https://docs.crewai.com/introduction)

**Agent code generation and execution workflow**:

1. The workflow starts with information retrieval. We create a knowledge base, and store the information from the `Home Network openAPI spec` into the knowledge base. The OpenAPI spec is provided by the user in the `data` folder.

1. This knowledge base will be wrapped within a lambda function that will be invoked based on the user query. It will `retrieve` the top `k` results from the knowledge base and send it as an input to the next step.

1. Next, a `Home Networking` assistant agent will have access to the required API specs to use to generate code. It will use the information from the Knowledge base and generate code for the given API spec, save the code and execute the code based on the parameters provided by the user.

### Creating Agent

On this section we declare global variables that will be act as helpers during entire notebook and you will start to create your first agent.

In [None]:
# Install crew ai. For installation steps, follow the instructions here: https://docs.crewai.com/installation
!pip install 'crewai[tools]'

In [2]:
import IPython

# IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
# Check your boto3 version
!pip freeze | grep boto3

In [None]:
# import the required packages and libraries
import os
import sys
import boto3
import logging
from typing import Optional
from dotenv import load_dotenv
# Get the current file's directory
current_dir = os.path.dirname(os.path.abspath('__file__'))
# Get the parent directory
parent_dir = os.path.dirname(current_dir)
print(parent_dir)
# Add the parent directory to sys.path
sys.path.append(parent_dir)
from globals import *

In [None]:
# Load the environment variables that are defined in the ".env" file.
load_dotenv

In [6]:
# set a logger
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# configure the sts client, the boto3 session and other variables
sts_client = boto3.client('sts')
session = boto3.session.Session()

account_id = sts_client.get_caller_identity()["Account"]
region = "us-east-1" if session.region_name is None else session.region_name
account_id_suffix = account_id[:3]
agent_suffix = f"{region}-{account_id_suffix}"

s3_client = boto3.client('s3', region)
bedrock_client = boto3.client('bedrock-runtime', region)

In [8]:
import sys
sys.path.insert(0, ".")
sys.path.insert(1, "..")
# Import utility functions and helper functions for agents
from utils.utils import *

### Load the config file
--- 

Load the config file that contains information on the models, data directories, etc.

In [None]:
# Get the absolute path to the config file
# This config file contains data about the directory paths, the API specs that
# are used to generate the code, and the agent foundation models that are used to generate the code.
BASE_DIR = os.path.abspath(sys.path[1])
CONFIG_FPATH = os.path.join(BASE_DIR, CONFIG_FNAME)
config_data = load_config(CONFIG_FPATH)
logger.info(f"Loaded config from local file system: {json.dumps(config_data, indent=2)}")

## Code Generation Prompt: Bedrock prompt management
---

Let's create our sample code generation prompt by leveraging on Prompt Management for Amazon Bedrock. This is the prompt that is used by the sub agent to generate the code when it calls the generate_code tool.

In [None]:
def read_prompt_from_file(file_path: str) -> str:
    with open(file_path, 'r') as file:
        return file.read().strip()

prompt_file_path = os.path.join(
    config_data['dir_paths']['code_gen_prompts_prefix'],
    config_data['dir_paths']['code_gen_prompts'].get('doorbell_code_generation_prompt')
)

absolute_prompt_fpath = os.path.join(
    parent_dir,
    prompt_file_path
)

prompt_template_code_gen: str = read_prompt_from_file(absolute_prompt_fpath)
print(f"Code generation prompt that will be saved in prompt management within Bedrock: {prompt_template_code_gen}")

In [None]:
bedrock_agent = boto3.client(service_name = "bedrock-agent", region_name = region)
response = bedrock_agent.create_prompt(
    name = f"prompt-for-doorbell-configuration-code-gen",
    description = "Code generation prompt template that is used by the doorbell configuration agent to generate code",
    variants = [
        {
            "name": "variantOne",
            "templateConfiguration": {
                "text": {
                    "inputVariables": [
                        {
                            "name": "input"
                        },
                        {
                            "name": "output"
                        }
                    ],
                    "text": prompt_template_code_gen
                }
            },
            "templateType": "TEXT"
        }
    ],
    defaultVariant = "variantOne"
)

print(json.dumps(response, indent=2, default=str))
promptId = response["id"]
promptArn = response["arn"]
promptName = response["name"]
print(f"Prompt ID: {promptId}\nPrompt ARN: {promptArn}\nPrompt Name: {promptName}")

In [None]:
# Now that we have a draft prompt, we can create a version from it.
response = bedrock_agent.create_prompt_version(
    promptIdentifier = promptId
)
print(json.dumps(response, indent=2, default=str))

## JSON Knowledge source
---

In this section of the solution we will be creating our Crew Agent, initialize the instruction for the home networking system, and then provide it with a `knowledge source`. Knowledge in CrewAI is a powerful system that allows AI agents to access and utilize external information sources during their tasks. Think of it as giving your agents a reference library they can consult while working. 

We will be creating a `JSON Knowledge Source` that will store embeddings from the `JSON` API specs that we will provide.

In [None]:
api_specs: Dict = config_data['dir_paths'].get('api_specs')
logger.info(f"API specs that are provided are as follows: {api_specs}")

In [None]:
!pip install --upgrade pip
!pip install -U langchain_aws

In [15]:
from langchain.schema import Document
from langchain.embeddings import BedrockEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import CharacterTextSplitter

In [None]:
source_path = os.path.join('..', config_data['dir_paths']['data_prefix'], api_specs.get('doorbell_api_spec'))

with open(source_path, "r", encoding="utf-8") as f:
    file_text = f.read()
    
bedrock_client = boto3.client(service_name='bedrock-runtime', 
                              region_name='us-east-1')
bedrock_embeddings = BedrockEmbeddings(model_id=config_data['model_information']['embedding_model'],
                                       client=bedrock_client)

In [17]:
documents = [Document(page_content=file_text, metadata={})]

In [None]:
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)

db = FAISS.from_documents(docs, bedrock_embeddings)

In [None]:
retriever_topk = db.as_retriever(search_kwargs={"k": 1})
docs_topk = retriever_topk.invoke('What is the signal strength of my porch camera?')
print("\nTop k (k=1) retrieval results:")
for d in docs_topk:
    print(d.page_content)

### Define tools that the CrewAI agent will have access to
---

In this portion of the notebook, we create different tools that the CrewAI agent will have access to. This includes a tool to query from the knowledge base, generate code based on the user query and the API to call, save the code and finally execute the code to provide the final result to the user.

In [46]:
import os
import sys
import ast
import json
import time
import boto3
import random
import logging
import tempfile
import subprocess
from crewai.tools import tool
from typing import List, Dict, Any
from crewai import LLM, Agent, Task, Crew

# Information on the code generation model
code_gen_data = config_data['code_generation_model_information']

def get_prompt_template(prompt_id: str) -> str:
    """
    Retrieve a prompt template from Bedrock prompt management.
    """
    try:
        client = boto3.client("bedrock-agent", region_name=os.environ.get("REGION", "us-east-1"))
        result = client.get_prompt(promptIdentifier=prompt_id)
        template = result["variants"][0]["templateConfiguration"]["text"]["text"]
        return template
    except Exception as e:
        logger.error(f"Error getting prompt template: {e}")
        raise

def call_bedrock(model_id: str, messages: list, temp: float, max_tokens: int, top_p: float) -> str:
    """
    Invoke Bedrock's converse API with the provided parameters.
    """
    try:
        client = boto3.client("bedrock-runtime")
        inference_config = {
            "temperature": temp,
            "maxTokens": max_tokens,
            "topP": top_p
        }
        start = time.time()
        response = client.converse(
            modelId=model_id,
            messages=messages,
            system=[{"text": "You are an expert in generating Python code for home networking APIs."}],
            inferenceConfig=inference_config
        )
        elapsed = time.time() - start
        logger.info(f"Bedrock response in {elapsed:.2f} sec")
        # Extract the generated code.
        code = response["output"]["message"]["content"][0]["text"]
        return code
    except Exception as e:
        logger.error(f"Error calling Bedrock converse: {e}")
        raise

# ─── TOOL 0: QUERY CHUNKS FROM THE IN MEMORY JSON KNOWLEDGE ───────────────────
@tool("knowledge_query_tool")
def knowledge_query_tool(query: str) -> List[Dict[str, str]]:
    """
    Searches the vector store knowledge base for relevant information based on the user's query.
    It uses a top-k retriever (with k=1) and returns a list of chunks, where each chunk is
    a dictionary with keys 'text' (the document content) and 'metadata' (optional metadata).
    """
    try:
        # Create a top-k retriever from the FAISS vector store (assumed to be defined as `db`)
        retriever_topk = db.as_retriever(search_kwargs={"k": 1})
        # Invoke the retriever with the query
        docs_topk = retriever_topk.invoke(query)
        results = [
            {"text": doc.page_content, "metadata": json.dumps(getattr(doc, "metadata", {}))}
            for doc in docs_topk
        ]
        return results
    except Exception as e:
        logger.error(f"Error querying knowledge source: {e}")
        return []


# ─── TOOL 1: CODE GENERATION VIA BEDROCK ──────────────────────────────────────
@tool("bedrock_code_generation_tool")
def bedrock_code_generation_tool(query: str, knowledge_chunks: List[Dict[str, Any]]) -> Dict[str, str]:
    """
    Generates Python code using Bedrock, based on the user's query and knowledge chunks.
    Returns a dictionary with code content that is used by the next save_code_tool tool
    """
    try:
        prompt_id = promptId
        template = get_prompt_template(prompt_id)
        # Combine knowledge chunks into a single string
        knowledge_content = "\n".join(chunk['text'] for chunk in knowledge_chunks)
        formatted_prompt = template.format(
            user_query=query,
            knowledge_content=knowledge_content,
            auth_token=os.getenv("HOME_NETWORK_AUTH_TOKEN")
        )
        messages = [{"role": "user", "content": [{"text": formatted_prompt}]}]
        # Get Bedrock model and inference parameters from environment variables.
        model = code_gen_data.get('code_generation_model')
        temperature = code_gen_data.get('temperature', 0.1)
        top_p = code_gen_data.get('top_p', 0.9)
        max_tokens = code_gen_data.get('max_tokens', 4096)
        code_output = call_bedrock(model, messages, temperature, max_tokens, top_p)
        # Wrap the generated code in a dictionary with the expected key.
        return {"code_content": str(code_output)}
    except Exception as err:
        logger.error(f"bedrock_code_generation_tool error: {err}")
        return {"code_content": f"Error generating code: {err}"}



# ─── TOOL 2: SAVE GENERATED CODE ─────────────────────────────────────────────
@tool("save_code_tool")
def save_code_tool(code_content: Dict) -> str:
    """
    Saves the provided code content to a temporary file and returns the file path.
    """
    try:
        base_dir = "generated_code"
        code_content: str = code_content["code_content"]
        os.makedirs(base_dir, exist_ok=True)
        file_path = os.path.join(base_dir, f"generated_code_{int(time.time())}.py")
        with open(file_path, "w") as fp:
            fp.write(code_content)
        logger.info(f"Code saved to {file_path}")
        return file_path
    except Exception as e:
        logger.error(f"Error in save_code_tool: {e}")
        return f"Error saving code: {e}"


# ─── TOOL 3: EXECUTE GENERATED CODE ───────────────────────────────────────────
@tool("execute_code_tool")
def execute_code_tool(file_path: str) -> str:
    """
    Executes the code saved at file_path after the save_code_tool tool is used. This tool needs
    to be used after the save_code_tool tool.
    """
    try:
        # For debugging, log the code that is about to be executed.
        with open(file_path, "r") as fp:
            code_text = fp.read()
        logger.info(f"Executing code:\n{code_text}")

        result = subprocess.run(
            [sys.executable, file_path],
            capture_output=True,
            text=True,
            timeout=int(os.environ.get("CODE_EXECUTION_TIMEOUT", "30")),
            env=os.environ.copy()
        )
        exec_result = f"result.stdout: {result.stdout}, stderr: {result.stderr}"
        logger.info(f"Execution result: {exec_result}")
        return exec_result
    except subprocess.TimeoutExpired:
        logger.error("Execution timed out")
        return "Execution timed out"
    except Exception as err:
        logger.error(f"Error executing code: {err}")
        return str(err)

### Create the doorbell configuration networking agent and Crew
---

In this portion of the solution, we will create an agent for home networking configuration assistance, that will have access to the tools defined above. We will then add this agent to the crew with the home networking knowledge base, which contains information about the API spec.

In [None]:
instructions_path: str = os.path.join(config_data['dir_paths']['agent_instructions_prefix'], 
                                      config_data['dir_paths']['agent_instructions'].get('doorbell_agent_instructions'))
agent_instruction = open(os.path.join(parent_dir, instructions_path), 'r').read()
print(agent_instruction)

In [None]:
from crewai.tools import tool
from crewai import LLM, Agent, Task, Crew, Process

# This is the FM used for the home networking sub agent
llm_instance = LLM(
    model=config_data['model_information']['doorbell_sub_agent_model'],
    temperature=0.1,
    max_tokens=2048,
    top_p=0.9,
)

# Create the home networking sub agent along with the tools assigned to it
doorbell_agent = Agent(
    role="Doorbell Configuration Assistant",
    goal=( "always use the knowledge base tool, then generate the code, then save the code and finally and always, execute the code" + agent_instruction
    ),
    backstory=(
        "I am an expert agent who looks for relevant API specs in the knowledge based on a user query, and then only I run the tools I have access to. I only run the knowledge base tool, generate code tool, save the code. Finally I will execute the code."
    ),
    tools=[knowledge_query_tool, bedrock_code_generation_tool, save_code_tool, execute_code_tool],
    verbose=True,
    memory=True,
    llm=llm_instance,
)

In [51]:
# define the tasks
knowledge_query_task = Task(
    description="Search the knowledge base for relevant information based on the user's query: '{query}'",
    expected_output="A list of knowledge chunks from the API spec relevant to the user's query.",
    agent=doorbell_agent,
    output_format="list"
)

code_generation_task = Task(
    description=(
        "Using the user's query and the retrieved knowledge chunks, generate the appropriate Python code."
    ),
    expected_output="The generated Python code.",
    agent=doorbell_agent,
    output_format="json",
    context=[knowledge_query_task]
)

save_code_task = Task(
    description="Save the generated Python code to a file.",
    expected_output="The file path where the code is saved.",
    agent=doorbell_agent,
    output_format="text",
    context=[code_generation_task]
)

execute_code_task = Task(
    description="Execute the saved Python code using the file path from the save code task. Always run this task.",
    expected_output="Output text after executing the python code.",
    agent=doorbell_agent,
    output_format="text",
    context=[save_code_task]
)

In [52]:
# Initialize the manager agent that will be responsible to check for responses and coordinating the agentic workflow - this would usually be a 
# reasoning model
manager_llm = LLM(
    # model=f"bedrock/{config_data['model_information']['home_network_sub_agent_model']}",
    model='anthropic.claude-3-haiku-20240307-v1:0',
    temperature=0.1,
    timeout=120,
    max_tokens=256,
    top_p=0.9,
)

In [None]:
# Finally, create a crew that holds the agent and its task.
crew = Crew(
    agents=[doorbell_agent],
    tasks=[knowledge_query_task, code_generation_task, save_code_task, execute_code_task],
    verbose=True,
    process=Process.sequential,
    # manager_llm=manager_llm
)

In [None]:
user_query = 'I want to get email notifications for deliveries but push notifications for when someone rings the doorbell.'
result = crew.kickoff(inputs={"query": user_query})