This notebook is meant to be an initial setup of an AI agent in AWS Bedrock. This allows for code interpretation and passing in files, which makes it ideal for an autonomous data analyst. We use Claude 3.5 Sonnet as the foundation model. This agent will be used in the user interface of the mapping application.

Based on AWS example: https://github.com/build-on-aws/agents-for-amazon-bedrock-sample-feature-notebooks/blob/main/notebooks/preview-agent-code-interpreter.ipynb

In [None]:
import boto3
import json
import time
import csv
import io
import os

## 1) Setup

This first code block is to be run once to initialize the agent on AWS Bedrock. This includes the appropriate IAM role and policy and instruction set for agent. We choose us-west-2 because that is the region that claude 3.5 is available to use code interpreter.

In [None]:
sts = boto3.client('sts')
caller_identity = sts.get_caller_identity()
ACCOUNT_ID = str(caller_identity['Account'])

REGION_NAME = 'us-west-2'
FOUNDATIONAL_MODEL = 'amazon.nova-pro-v1:0'
INFERENCE_PROFILE = f"us.{FOUNDATIONAL_MODEL}"
AGENT_NAME = f'climate-risk-map-ai-agent-{FOUNDATIONAL_MODEL.replace(":", "_").replace(".", "_")}'
IAM_ROLE_NAME = f'{AGENT_NAME}-role'[0:63]
MODEL_POLICY_NAME = f'{AGENT_NAME}-model-policy'
INFERENCE_POLICY_NAME = f'{AGENT_NAME}-inference-policy'
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": ACCOUNT_ID
                },
                "ArnLike": {
                    "aws:SourceArn": f"arn:aws:bedrock:us-west-2:{ACCOUNT_ID}:agent/*"
                }
            }
        }
    ]
}
MODEL_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                f"arn:aws:bedrock:us-west-2::foundation-model/{FOUNDATIONAL_MODEL}"
            ]
        }
    ]
}
INFERENCE_PROFILE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream",
                "bedrock:GetInferenceProfile",
                "bedrock:GetFoundationModel"
            ],
            "Resource": [
                f"arn:aws:bedrock:us-west-2:{ACCOUNT_ID}:inference-profile/{INFERENCE_PROFILE}",
                f"arn:aws:bedrock:*::foundation-model/{FOUNDATIONAL_MODEL}"
            ]
        }
    ]
}

INSTRUCTION = """
You are an advanced AI instructor specializing in physical climate risk analysis for infrastructure assets, platform, operating within an application that visualizes asset-level vulnerability.
Your analysis must be educational, contextual, insightful, and framed to highlight potential financial implications of the identified risks.

**Core Analysis Directives & Metrics:**
* **Synthesize Three Key Metrics:** Your primary task is to analyze risk using the interplay of these three metrics:
    1.  `ensemble_mean`: **Projected FWI (Future Fire Weather Intensity)**. Interpret using the FWI scale provided below. This reflects *how intense* fire weather could be.
    2.  `ensemble_mean_historic_baseline`: **Historical FWI (Past Fire Weather Intensity)**. Use this primarily to calculate and emphasize the **change** or **increase** in future FWI potential compared to the past (`ensemble_mean` - `ensemble_mean_historic_baseline`). A large increase signifies significantly worsening fire weather conditions.
    3.  `burn_probability`: **Annual Burn Probability (Likelihood of Burning)**. Interpret this as the statistical likelihood (e.g., a value of 0.02 means a 2% chance per year) of that specific location burning based on landscape/fuel conditions circa 2021. This reflects *how likely* a fire is at that location.
* **Financial Risk Framing:** Explicitly connect the physical risk analysis to potential financial implications. You don't have dollar figures, so use qualitative framing. Examples:
    * "Assets facing a significant *increase* in FWI combined with a non-negligible `burn_probability` represent heightened potential for operational disruptions, costly repairs, and associated financial losses."
    * "The higher `burn_probability` in Area Y suggests a greater likelihood of wildfire occurrence, potentially leading to increased insurance premiums or mitigation costs for assets located there, especially those critical assets like high-voltage lines."
    * "Understanding this combined risk profile (intensity increase + likelihood) is crucial for financial planning, risk mitigation investment decisions, and asset management strategies."
* **Context is Crucial:**
    * Always interpret `ensemble_mean` (Projected FWI) using the FWI scale.
    * Always highlight the *change* from `ensemble_mean_historic_baseline`.
    * Explain `burn_probability` clearly (e.g., "a 1.5% annual likelihood of burning").
    * Mention metric limitations: FWI is weather-only (no fuel/topo); Burn Probability is based on ~2021 landscape conditions and simulations.
* **Specificity Wins:** Drill down into asset specifics using `osm_subtype`, `county`, `city`, and details like `line;voltage`, `line;name`, `plant;name`, `plant:source`. High-voltage lines or critical plants deserve special mention.
* **Comparative Insights:** Compare risks across asset subtypes, counties, or highlight assets with the most concerning *combined* risk profile (e.g., high FWI increase AND high burn probability).

**"Wow" Factors & Engagement:**
* **Pinpoint Combined High Risk:** Proactively identify the asset(s) with the most concerning *combination* of risk factors (e.g., highest FWI *increase* AND highest `burn_probability`). Describe why this combination is particularly noteworthy from a risk management perspective.
* **Highlight Change:** Emphasize the *magnitude of change* from the historical baseline FWI. Statements like "This location is projected to see fire weather intensity potential *increase by X points* compared to the historical average..." are impactful.
* **Narrative Flow:** Weave the three metrics into a compelling narrative. Start with the future intensity (FWI), compare it to the past (baseline change), and then layer in the likelihood (burn probability).
* **Actionable Framing:** Conclude analyses with statements that guide user thinking towards action or further investigation, linking back to financial/operational stability.
* **Suggest Follow-ups:** ALWAYS suggest 2-3 distinct, relevant follow-up questions based on the new metrics. Examples:
    * "Would you like to see which assets have the largest *increase* in FWI compared to the baseline?"
    * "Shall I list assets with a Burn Probability higher than [threshold, e.g., 1%]?"
    * "Can we compare the combined risk profile of [Substations] vs. [Power Lines] in [County X]?"

**Climate & Wildfire Knowledge:**
* **FWI (Fire Weather Index):** (Keep existing description)
* **FWI Scale:**
    * 0-5: Very low
    * 5-10: Low
    * 10-20: Moderate
    * 20-30: High
    * 30+: Extreme
* **FWI Limitations:** Does not include fuel loads or topography.
* **Burn Probability (BP):** Represents the *annual likelihood* of a 500m location burning based on simulations calibrated to 2006-2020 fire history and landscape conditions circa 2021 (fuels, vegetation). It accounts for how fire might spread, including 'oozing' into adjacent developed areas up to ~1 mile. Does not account for fuel changes post-2021. Higher values indicate greater likelihood of fire occurrence.
* **SSP Scenarios:** (Keep existing description, noting SSP5-8.5 is in use).

**Code Execution:**
* **Mandatory Use:** Always use the Python environment for calculations (averages, max/min, differences, counts, filtering, sorting) and specific data lookups from the provided 'data.csv'.
* **Integration:** Seamlessly integrate code results into your narrative. Present tables clearly if generated. Calculate the FWI change (`ensemble_mean` - `ensemble_mean_historic_baseline`) when relevant.

**Output Format:**
* Use MARKDOWN.
* `##` for main titles, `###`/`####` for sub-sections. NO `#` headings.
* Use bullet points and **bold text** for key metrics, asset names/IDs, location names, and significant findings (especially the FWI *change* and *combined* risk insights).
* Prioritize the most impactful combined risk insights for the demo. Be concise yet thorough.
"""


bedrock_agent = boto3.client(service_name = 'bedrock-agent', region_name = REGION_NAME)
iam = boto3.client('iam')

print("Creating the IAM policy and role...")

# Create IAM role and attach policy

try:
    role = iam.create_role(
        RoleName=IAM_ROLE_NAME,
        AssumeRolePolicyDocument = json.dumps(TRUST_POLICY)
    )

    iam.put_role_policy(
        RoleName=IAM_ROLE_NAME,
        PolicyName = MODEL_POLICY_NAME,
        PolicyDocument = json.dumps(MODEL_POLICY)
    )

    iam.put_role_policy(
        RoleName=IAM_ROLE_NAME,
        PolicyName = INFERENCE_POLICY_NAME,
        PolicyDocument = json.dumps(INFERENCE_PROFILE_POLICY)
    )
except Exception as e:
    print(f"{str(e)}")
    role = iam.get_role(RoleName=IAM_ROLE_NAME)

roleArn = role['Role']['Arn']

# --- ADD DELAY ---
print("Waiting for IAM propagation...")
time.sleep(2) # Wait for 15 seconds
# --- END DELAY ---


print(f"IAM Role: {roleArn[:13]}{'*' * 12}{roleArn[25:]}")

print("Creating the agent...")

# Create the Bedrock Agent
response = bedrock_agent.create_agent(
    agentName=AGENT_NAME,
    foundationModel=INFERENCE_PROFILE,
    instruction=INSTRUCTION,
    agentResourceRoleArn=roleArn,
)

agentId = response['agent']['agentId']

print("Waiting for agent status of 'NOT_PREPARED'...")
# Wait for agent to reach 'NOT_PREPARED' status
agentStatus = ''
while agentStatus != 'NOT_PREPARED':
    response = bedrock_agent.get_agent(
        agentId=agentId
    )
    agentStatus = response['agent']['agentStatus']
    print(f"Agent status: {agentStatus}")
    time.sleep(2)

######################################### Configure code interpreter for the agent
response = bedrock_agent.create_agent_action_group(
    
    actionGroupName='CodeInterpreterAction',
    actionGroupState='ENABLED',
    agentId=agentId,
    agentVersion='DRAFT',
    parentActionGroupSignature='AMAZON.CodeInterpreter' # <-  To allow your agent to generate, 
                                                        #     run, and troubleshoot code when trying 
                                                        #     to complete a task, set this field to 
                                                        #     AMAZON.CodeInterpreter. 
                                                        #     You must leave the `description`, `apiSchema`, 
                                                        #     and `actionGroupExecutor` fields blank for 
                                                        #     this action group.
)

actionGroupId = response['agentActionGroup']['actionGroupId']

print("Waiting for action group status of 'ENABLED'...")

# Wait for action group to reach 'ENABLED' status
actionGroupStatus = ''
while actionGroupStatus != 'ENABLED':
    response = bedrock_agent.get_agent_action_group(
        agentId=agentId,
        actionGroupId=actionGroupId,
        agentVersion='DRAFT'
    )
    actionGroupStatus = response['agentActionGroup']['actionGroupState']
    print(f"Action Group status: {actionGroupStatus}")
    time.sleep(2)

print("Preparing the agent...")

# Prepare the agent for use
response = bedrock_agent.prepare_agent(
    agentId=agentId
)

print("Waiting for agent status of 'PREPARED'...")

# Wait for agent to reach 'PREPARED' status
agentStatus = ''
while agentStatus != 'PREPARED':
    response = bedrock_agent.get_agent(
        agentId=agentId
    )
    agentStatus = response['agent']['agentStatus']
    print(f"Agent status: {agentStatus}")
    time.sleep(2)

# Create an alias for the agent
response = bedrock_agent.create_agent_alias(
    agentAliasName='test',
    agentId=agentId
)

agentAliasId = response['agentAlias']['agentAliasId']

# Wait for agent alias to be prepared
agentAliasStatus = ''
while agentAliasStatus != 'PREPARED':
    response = bedrock_agent.get_agent_alias(
        agentId=agentId,
        agentAliasId=agentAliasId
    )
    agentAliasStatus = response['agentAlias']['agentAliasStatus']
    print(f"Agent alias status: {agentAliasStatus}")
    time.sleep(2)

print('Done.\n')

print(f"agentId: {agentId}, agentAliasId: {agentAliasId}")

In [None]:
AGENT_NAME

## 2) Utility Functions

These functions are useful to use when interacting with the agent.

In [None]:
BEDROCK_AGENT_RUNTIME = boto3.client(service_name = 'bedrock-agent-runtime', region_name = REGION_NAME)

In [None]:
def read_csv_bytes(file_path):
    """Reads a CSV file and returns its content as bytes.

    Args:
        file_path (str): The path to the CSV file.

    Returns:
        bytes: The content of the CSV file as bytes, or None if an error occurs.
    """
    try:
        with open(file_path, 'rb') as file:
            csv_bytes = file.read()
            return csv_bytes
    except FileNotFoundError:
        print(f"Error: File not found: {file_path}")
        return None
    except Exception as e:
         print(f"An error occurred: {e}")
         return None

def invoke(inputText, sessionId, file_path, showTrace=False, endSession=False):

    try:

        # Invoke the Agent - Sends a prompt for the agent to process and respond to.
        response = BEDROCK_AGENT_RUNTIME.invoke_agent(
            agentAliasId=agentAliasId,   # (string) – [REQUIRED] The alias of the agent to use.
            agentId=agentId,             # (string) – [REQUIRED] The unique identifier of the agent to use.
            sessionId=sessionId,         # (string) – [REQUIRED] The unique identifier of the session. Use the same value across requests to continue the same conversation.
            inputText=inputText,         # (string) - The prompt text to send the agent.
            endSession=endSession,       # (boolean) – Specifies whether to end the session with the agent or not.
            enableTrace=True,            # (boolean) – Specifies whether to turn on the trace or not to track the agent's reasoning process.
            sessionState = {
                "files": [
                    {
                        "name": "test_data.json",
                        "source": {
                            "byteContent": {
                                "data": read_csv_bytes(file_path=file_path),
                                "mediaType": "text/csv"
                            },
                            "sourceType": "BYTE_CONTENT"
                       },
                        "useCase": "CODE_INTERPRETER"
                    }
                ]
             }
        )

        # The response of this operation contains an EventStream member. 
        event_stream = response["completion"]

        # When iterated the EventStream will yield events.
        for event in event_stream:

            # chunk contains a part of an agent response
            if 'chunk' in event:
                chunk = event['chunk']
                if 'bytes' in chunk:
                    text = chunk['bytes'].decode('utf-8')
                    print(f"Chunk: {text}")
                else:
                    print("Chunk doesn't contain 'bytes'")

            # files contains intermediate response for code interpreter if any files have been generated.
            if 'files' in event:
                files = event['files']['files']
                for file in files:
                    name = file['name']
                    type = file['type']
                    bytes_data = file['bytes']
                    
                    # It the file is a PNG image then we can display it...
                    if type == 'image/png':
                        # Display PNG image using Matplotlib
                        img = plt.imread(io.BytesIO(bytes_data))
                        plt.figure(figsize=(10, 10))
                        plt.imshow(img)
                        plt.axis('off')
                        plt.title(name)
                        plt.show()
                        plt.close()
                        
                    # If the file is NOT a PNG then we save it to disk...
                    else:
                        # Save other file types to local disk
                        with open(name, 'wb') as f:
                            f.write(bytes_data)
                        print(f"File '{name}' saved to disk.")

    except Exception as e:
        print(f"Error: {e}")

In [None]:
BEDROCK_AGENT_RUNTIME = boto3.client(service_name = 'bedrock-agent-runtime', region_name = "us-west-2")

In [None]:
BEDROCK_AGENT_RUNTIME.list_sessions(maxResults=25)