# Step 2: Deploy Sales Agent with DynamoDB in Private VPC

This notebook creates a DynamoDB table with sales data and deploys a Strands sales agent using Amazon Bedrock AgentCore Runtime within the secure VPC infrastructure.

## Features
- DynamoDB table with synthetic sales data, reached privately from the VPC through a gateway endpoint
- Sales analysis agent with pattern recognition
- Secure deployment in private VPC
- Production-ready configurations

In [None]:
import json
import boto3
import random
from datetime import datetime, timedelta
from decimal import Decimal
from botocore.exceptions import ClientError

# Read the configuration file produced by the previous step; later cells
# look up the region and resource IDs from this dictionary.
with open('vpc_config.json', 'r') as config_file:
    vpc_config = json.loads(config_file.read())

print("✅ VPC configuration loaded")
for label, key in (("VPC ID", 'vpc_id'), ("Region", 'region')):
    print(f"{label}: {vpc_config[key]}")

## 1. Install Dependencies

In [None]:
!pip install --force-reinstall -U -r requirements.txt --quiet

## 2. Create DynamoDB VPC Endpoint

In [None]:
# Create VPC endpoint for DynamoDB to keep traffic private
ec2 = boto3.client('ec2', region_name=vpc_config['region'])

try:
    # Gateway-type endpoint, associated with every route table recorded in
    # the VPC configuration.
    dynamodb_endpoint = ec2.create_vpc_endpoint(
        VpcId=vpc_config['vpc_id'],
        ServiceName=f"com.amazonaws.{vpc_config['region']}.dynamodb",
        VpcEndpointType='Gateway',
        RouteTableIds=list(vpc_config['route_tables'].values())
    )
except ClientError as e:
    # Branch on the structured error code instead of searching the rendered
    # message text, which is not a stable interface.
    error_code = e.response.get('Error', {}).get('Code', '')
    if error_code == 'RouteAlreadyExists':
        print("✅ DynamoDB VPC Endpoint already exists")
    else:
        # Best effort: report the problem and let the notebook continue.
        print(f"⚠️  Error creating DynamoDB endpoint: {e}")
else:
    # Only reached when the call succeeded, so the response shape is known.
    print(f"✅ DynamoDB VPC Endpoint created: {dynamodb_endpoint['VpcEndpoint']['VpcEndpointId']}")

## 3. Create DynamoDB Table with Sales Data

In [None]:
# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb', region_name=vpc_config['region'])
dynamodb_client = boto3.client('dynamodb', region_name=vpc_config['region'])

table_name = 'SalesData'

# Create DynamoDB table (on-demand billing, keyed by sale_id)
try:
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {
                'AttributeName': 'sale_id',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'sale_id',
                'AttributeType': 'S'
            }
        ],
        BillingMode='PAY_PER_REQUEST',
        Tags=[
            {
                'Key': 'Project',
                'Value': vpc_config['project_name']
            }
        ]
    )
    print(f"✅ DynamoDB table '{table_name}' created")
except ClientError as e:
    # Inspect the structured error code rather than the rendered message.
    if e.response.get('Error', {}).get('Code') == 'ResourceInUseException':
        print(f"✅ Table '{table_name}' already exists")
        table = dynamodb.Table(table_name)
    else:
        print(f"❌ Error creating table: {e}")
        raise

# Wait for table to be active. This runs for a pre-existing table as well,
# because a table left over from an interrupted run may still be creating.
table.wait_until_exists()
print("✅ Table is now active")

## 4. Generate and Insert Synthetic Sales Data

In [None]:
# Generate synthetic sales data
def generate_sales_data(num_records=100, seed=None):
    """Generate synthetic sales data.

    Args:
        num_records: Number of sale records to produce.
        seed: Optional seed. When given, a dedicated ``random.Random(seed)``
            drives every random draw so the products, quantities, prices,
            regions, reps and customer types are reproducible. When omitted,
            the shared ``random`` module state is used.

    Returns:
        A list of dicts with the keys ``sale_id``, ``date`` (``YYYY-MM-DD``),
        ``product``, ``quantity``, ``unit_price``, ``total_amount``,
        ``region``, ``sales_rep`` and ``customer_type``. Prices and totals
        are ``Decimal`` values. Dates fall within the 365 days before the
        moment of the call.
    """
    rng = random if seed is None else random.Random(seed)

    # Inclusive (min, max) unit price per product. Built once instead of on
    # every loop iteration; it is also the single source of the product list.
    price_ranges = {
        'Laptop': (800, 2500),
        'Smartphone': (300, 1200),
        'Tablet': (200, 800),
        'Headphones': (50, 300),
        'Monitor': (150, 600),
        'Keyboard': (30, 150),
        'Mouse': (20, 100),
        'Webcam': (40, 200)
    }
    products = list(price_ranges)
    regions = ['North', 'South', 'East', 'West', 'Central']
    sales_reps = ['Alice Johnson', 'Bob Smith', 'Carol Davis', 'David Wilson', 'Eva Brown']
    customer_types = ['Enterprise', 'SMB', 'Individual']

    sales_data = []
    base_date = datetime.now() - timedelta(days=365)

    for i in range(num_records):
        # The order of the random draws below is kept stable so that a given
        # seed (or a pre-seeded global generator) always yields the same data.
        sale_date = base_date + timedelta(days=rng.randint(0, 365))
        product = rng.choice(products)

        min_price, max_price = price_ranges[product]
        quantity = rng.randint(1, 10)
        unit_price = rng.randint(min_price, max_price)
        total_amount = quantity * unit_price

        sales_data.append({
            'sale_id': f'SALE-{i+1:04d}',
            'date': sale_date.strftime('%Y-%m-%d'),
            'product': product,
            'quantity': quantity,
            # Decimal built from str, as the original did for these fields
            'unit_price': Decimal(str(unit_price)),
            'total_amount': Decimal(str(total_amount)),
            'region': rng.choice(regions),
            'sales_rep': rng.choice(sales_reps),
            'customer_type': rng.choice(customer_types)
        })

    return sales_data

# Build the synthetic data set in memory first
print("🔄 Generating synthetic sales data...")
sales_records = generate_sales_data(150)

# Write every record through the table's batch writer
print("🔄 Inserting data into DynamoDB...")
with table.batch_writer() as writer:
    for sale in sales_records:
        writer.put_item(Item=sale)

inserted_count = len(sales_records)
print(f"✅ Inserted {inserted_count} sales records into DynamoDB")

# Preview the first three generated records
print("\n📊 Sample sales records:")
for position, sale in enumerate(sales_records[:3], start=1):
    summary = f"{sale['date']} - {sale['product']} x{sale['quantity']} = ${sale['total_amount']} ({sale['region']})"
    print(f"{position}. {summary}")

## 5. Create Sales Analysis Agent

In [None]:
from strands import Agent, tool
from strands.models import BedrockModel
import boto3
from boto3.dynamodb.conditions import Key, Attr
from datetime import datetime, timedelta
from collections import defaultdict

# Initialize DynamoDB for agent tools
dynamodb_resource = boto3.resource('dynamodb', region_name=vpc_config['region'])
sales_table = dynamodb_resource.Table('SalesData')


def _scan_all_sales():
    """Return every item in the sales table, following scan pagination.

    A single ``scan`` call returns one page of results and reports a
    ``LastEvaluatedKey`` when more remain, so the call is repeated with
    ``ExclusiveStartKey`` until no continuation key comes back.
    """
    items = []
    scan_kwargs = {}
    while True:
        page = sales_table.scan(**scan_kwargs)
        items.extend(page.get('Items', []))
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            return items
        scan_kwargs['ExclusiveStartKey'] = last_key


# Create sales analysis tools
# NOTE(review): the tool docstrings are left untouched on purpose — the tool
# framework may surface them to the model as tool descriptions; confirm
# before rewording them.
@tool
def get_sales_history(days: int = 30) -> str:
    """Get sales history for the last N days"""
    try:
        # Date window: from `days` before now up to now
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        # Full scan with client-side date filtering
        # (in production, use GSI with date)
        filtered_sales = [
            item for item in _scan_all_sales()
            if start_date <= datetime.strptime(item['date'], '%Y-%m-%d') <= end_date
        ]

        # Calculate summary
        total_sales = sum(float(item['total_amount']) for item in filtered_sales)
        total_quantity = sum(int(item['quantity']) for item in filtered_sales)

        return f"Sales History ({days} days): {len(filtered_sales)} transactions, Total: ${total_sales:,.2f}, Quantity: {total_quantity} units"

    except Exception as e:
        # Failures are returned as text so the calling agent receives a
        # message instead of an exception.
        return f"Error retrieving sales history: {str(e)}"


@tool
def analyze_top_products(limit: int = 5) -> str:
    """Analyze top-selling products"""
    try:
        # Group by product
        product_stats = defaultdict(lambda: {'quantity': 0, 'revenue': 0, 'transactions': 0})

        for item in _scan_all_sales():
            stats = product_stats[item['product']]
            stats['quantity'] += int(item['quantity'])
            stats['revenue'] += float(item['total_amount'])
            stats['transactions'] += 1

        # Sort by revenue, highest first
        sorted_products = sorted(product_stats.items(), key=lambda x: x[1]['revenue'], reverse=True)

        lines = [f"Top {limit} Products by Revenue:"]
        for i, (product, stats) in enumerate(sorted_products[:limit], 1):
            lines.append(f"{i}. {product}: ${stats['revenue']:,.2f} ({stats['quantity']} units, {stats['transactions']} transactions)")

        # Every line, including the last one, ends with a newline
        return "\n".join(lines) + "\n"

    except Exception as e:
        return f"Error analyzing products: {str(e)}"


@tool
def analyze_sales_by_region() -> str:
    """Analyze sales performance by region"""
    try:
        # Group by region
        region_stats = defaultdict(lambda: {'revenue': 0, 'transactions': 0})

        for item in _scan_all_sales():
            stats = region_stats[item['region']]
            stats['revenue'] += float(item['total_amount'])
            stats['transactions'] += 1

        # Sort by revenue, highest first
        sorted_regions = sorted(region_stats.items(), key=lambda x: x[1]['revenue'], reverse=True)

        lines = ["Sales by Region:"]
        for region, stats in sorted_regions:
            avg_transaction = stats['revenue'] / stats['transactions'] if stats['transactions'] > 0 else 0
            lines.append(f"• {region}: ${stats['revenue']:,.2f} ({stats['transactions']} transactions, avg: ${avg_transaction:.2f})")

        # Every line, including the last one, ends with a newline
        return "\n".join(lines) + "\n"

    except Exception as e:
        return f"Error analyzing regions: {str(e)}"


print("✅ Sales analysis tools created")

## 6. Initialize Sales Agent

In [None]:
# Initialize Bedrock model
# NOTE(review): the "us." prefix looks like a US cross-region inference
# profile ID — confirm it is enabled for the account and region in use.
model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
model = BedrockModel(model_id=model_id)

# Create sales agent
# The agent is given exactly the three sales tools listed below. The prompt
# literal is whitespace-sensitive: its indentation is part of the string.
sales_agent = Agent(
    model=model,
    tools=[get_sales_history, analyze_top_products, analyze_sales_by_region],
    system_prompt="""You are a professional sales analyst AI assistant with access to sales data.
    
    Your capabilities:
    - Analyze sales history and trends
    - Identify top-performing products
    - Compare regional performance
    - Provide actionable insights for sales teams
    
    Always provide clear, data-driven insights and recommendations. When presenting numbers, 
    format them clearly and highlight key patterns or trends you observe."""
)

print("✅ Sales agent initialized with Bedrock model")

## 7. Test Sales Agent Locally

In [None]:
# Smoke-test the in-notebook agent with a few representative questions
test_queries = [
    "What are our sales for the last 30 days?",
    "Show me the top 3 products by revenue",
    "How are different regions performing?",
    "What patterns do you see in our sales data?"
]
separator = "-" * 50

print("🧪 Testing sales agent locally...\n")

for number, question in enumerate(test_queries, start=1):
    print(f"Test {number}: {question}")
    try:
        answer = sales_agent(question)
        # Only the first 200 characters of the first content block are shown
        first_block = answer.message['content'][0]
        preview = first_block['text'][:200]
        print(f"Response: {preview}...")
    except Exception as error:
        # A failing query is reported and the loop moves on to the next one
        print(f"Error: {error}")
    print(separator)

## 8. Create Production Sales Agent File

In [None]:
%%writefile sales_agent.py
from strands import Agent, tool
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands.models import BedrockModel
import boto3
from datetime import datetime, timedelta
from collections import defaultdict
import os

app = BedrockAgentCoreApp()

# Initialize DynamoDB
dynamodb = boto3.resource('dynamodb')
sales_table = dynamodb.Table('SalesData')


def _scan_all_sales():
    """Return every item in the sales table, following scan pagination.

    A single ``scan`` call returns one page of results and reports a
    ``LastEvaluatedKey`` when more remain, so the call is repeated with
    ``ExclusiveStartKey`` until no continuation key comes back.
    """
    items = []
    scan_kwargs = {}
    while True:
        page = sales_table.scan(**scan_kwargs)
        items.extend(page.get('Items', []))
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            return items
        scan_kwargs['ExclusiveStartKey'] = last_key


# Sales analysis tools
# NOTE(review): the tool docstrings are left untouched on purpose — the tool
# framework may surface them to the model as tool descriptions; confirm
# before rewording them.
@tool
def get_sales_history(days: int = 30) -> str:
    """Get sales history for the last N days"""
    try:
        # Date window: from `days` before now up to now
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        # Full scan with client-side date filtering
        filtered_sales = [
            item for item in _scan_all_sales()
            if start_date <= datetime.strptime(item['date'], '%Y-%m-%d') <= end_date
        ]

        total_sales = sum(float(item['total_amount']) for item in filtered_sales)
        total_quantity = sum(int(item['quantity']) for item in filtered_sales)

        return f"Sales History ({days} days): {len(filtered_sales)} transactions, Total: ${total_sales:,.2f}, Quantity: {total_quantity} units"

    except Exception as e:
        # Failures are returned as text so the calling agent receives a
        # message instead of an exception.
        return f"Error retrieving sales history: {str(e)}"


@tool
def analyze_top_products(limit: int = 5) -> str:
    """Analyze top-selling products"""
    try:
        # Aggregate quantity, revenue and transaction count per product
        product_stats = defaultdict(lambda: {'quantity': 0, 'revenue': 0, 'transactions': 0})

        for item in _scan_all_sales():
            stats = product_stats[item['product']]
            stats['quantity'] += int(item['quantity'])
            stats['revenue'] += float(item['total_amount'])
            stats['transactions'] += 1

        # Highest revenue first
        sorted_products = sorted(product_stats.items(), key=lambda x: x[1]['revenue'], reverse=True)

        lines = [f"Top {limit} Products by Revenue:"]
        for i, (product, stats) in enumerate(sorted_products[:limit], 1):
            lines.append(f"{i}. {product}: ${stats['revenue']:,.2f} ({stats['quantity']} units, {stats['transactions']} transactions)")

        # Every line, including the last one, ends with a newline
        return "\n".join(lines) + "\n"

    except Exception as e:
        return f"Error analyzing products: {str(e)}"


@tool
def analyze_sales_by_region() -> str:
    """Analyze sales performance by region"""
    try:
        # Aggregate revenue and transaction count per region
        region_stats = defaultdict(lambda: {'revenue': 0, 'transactions': 0})

        for item in _scan_all_sales():
            stats = region_stats[item['region']]
            stats['revenue'] += float(item['total_amount'])
            stats['transactions'] += 1

        # Highest revenue first
        sorted_regions = sorted(region_stats.items(), key=lambda x: x[1]['revenue'], reverse=True)

        lines = ["Sales by Region:"]
        for region, stats in sorted_regions:
            avg_transaction = stats['revenue'] / stats['transactions'] if stats['transactions'] > 0 else 0
            lines.append(f"• {region}: ${stats['revenue']:,.2f} ({stats['transactions']} transactions, avg: ${avg_transaction:.2f})")

        # Every line, including the last one, ends with a newline
        return "\n".join(lines) + "\n"

    except Exception as e:
        return f"Error analyzing regions: {str(e)}"

# Initialize Bedrock model
# NOTE(review): the "us." prefix looks like a US cross-region inference
# profile ID — confirm it is enabled for the account and region in use.
model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
model = BedrockModel(model_id=model_id)

# Create sales agent
# Module-level instance that the invoke() entrypoint calls for every request.
# The prompt literal is whitespace-sensitive: its indentation is part of the
# string.
agent = Agent(
    model=model,
    tools=[get_sales_history, analyze_top_products, analyze_sales_by_region],
    system_prompt="""You are a professional sales analyst AI assistant with access to sales data.
    
    Your capabilities:
    - Analyze sales history and trends
    - Identify top-performing products
    - Compare regional performance
    - Provide actionable insights for sales teams
    
    Always provide clear, data-driven insights and recommendations. When presenting numbers, 
    format them clearly and highlight key patterns or trends you observe."""
)

@app.entrypoint
def invoke(payload):
    """Sales agent entrypoint.

    Args:
        payload: Request mapping. ``prompt`` holds the user's question and
            ``user_id`` identifies the caller; both are optional and fall
            back to defaults.

    Returns:
        The text of the first text block in the agent's final message, or an
        empty string when that message contains no text block.
    """
    user_input = payload.get("prompt", "Hello! How can I help you analyze sales data?")
    user_id = payload.get("user_id", "anonymous")

    print(f"📊 Processing sales query from user: {user_id}")
    print(f"Query: {user_input}")

    response = agent(user_input)

    # Do not assume the first content block is text: pick the first block
    # that carries a 'text' entry instead of indexing [0]['text'] blindly,
    # which raises on an empty or non-text leading block.
    content_blocks = response.message['content']
    return next((block['text'] for block in content_blocks if 'text' in block), "")

# Start the app only when this file is executed as a script
if __name__ == "__main__":
    app.run()

## 9. Configure AgentCore Runtime Deployment

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime
from boto3.session import Session

boto_session = Session()
region = vpc_config['region']

agentcore_runtime = Runtime()
agent_name = "sales-analyst-agent"

# Network placement taken from the loaded VPC configuration: the two private
# subnets (private_1, private_2) and the AgentCore security group.
private_subnet_ids = [vpc_config['subnets'][f'private_{n}'] for n in (1, 2)]
security_group_ids = [vpc_config['security_groups']['agentcore_sg_id']]
runtime_vpc_settings = {
    "vpc_id": vpc_config['vpc_id'],
    "subnet_ids": private_subnet_ids,
    "security_group_ids": security_group_ids
}

# Configure with VPC settings
response = agentcore_runtime.configure(
    entrypoint="sales_agent.py",
    auto_create_execution_role=True,
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=region,
    agent_name=agent_name,
    vpc_config=runtime_vpc_settings
)

print("✅ Sales Agent configured for VPC deployment")
response

## 10. Launch Sales Agent to AgentCore Runtime

In [None]:
# Launch the runtime object configured in the previous cell. The result is
# kept because the deployment-info cell reads agent_arn, agent_id and ecr_uri
# from it; the trailing bare expression displays it as the cell output.
print("🚀 Launching sales agent to AgentCore Runtime in private VPC...")
launch_result = agentcore_runtime.launch()
print("✅ Sales agent launched successfully")
launch_result

## 11. Check Deployment Status

In [None]:
import time

# Statuses at which polling stops
end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']
poll_interval_seconds = 10
# Upper bound on the wait so a stuck deployment cannot hang the notebook
max_wait_seconds = 30 * 60

print("⏳ Checking deployment status...")
deadline = time.monotonic() + max_wait_seconds
status_response = agentcore_runtime.status()
status = status_response.endpoint['status']

while status not in end_status:
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"Deployment still in status '{status}' after {max_wait_seconds} seconds"
        )
    print(f"Status: {status}")
    time.sleep(poll_interval_seconds)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']

# Only READY is a success; the other terminal states are failures and must
# not be reported with a success marker.
if status == 'READY':
    print(f"✅ Final status: {status}")
else:
    print(f"❌ Final status: {status}")
status

## 12. Test Production Sales Agent

In [None]:
# Send one request per sample user to the deployed agent
test_queries = [
    {"prompt": "What are our sales for the last 30 days?", "user_id": "sales_manager"},
    {"prompt": "Show me the top 5 products by revenue", "user_id": "product_manager"},
    {"prompt": "How are different regions performing?", "user_id": "regional_director"},
    {"prompt": "What insights can you provide about our sales patterns?", "user_id": "ceo"}
]
divider = "-" * 50

print("🧪 Testing production sales agent...\n")

for test_number, payload in enumerate(test_queries, start=1):
    prompt_text, user = payload['prompt'], payload['user_id']
    print(f"Test {test_number}: {prompt_text} (User: {user})")
    try:
        invoke_response = agentcore_runtime.invoke(payload)
        # Show the first 300 characters of the first response element
        response_text = invoke_response['response'][0]
        print(f"Response: {response_text[:300]}...")
    except Exception as error:
        # A failing request is reported and the loop moves on to the next one
        print(f"Error: {error}")
    print(divider)

## 13. Save Deployment Information

In [None]:
# Collect the identifiers of this deployment and keep them for future reference
deployment_info = dict(
    agent_arn=launch_result.agent_arn,
    agent_id=launch_result.agent_id,
    ecr_uri=launch_result.ecr_uri,
    region=region,
    vpc_id=vpc_config['vpc_id'],
    dynamodb_table=table_name,
    agent_type="sales_analyst",
    status=status,
)

with open('deployment_info.json', 'w') as info_file:
    info_file.write(json.dumps(deployment_info, indent=2))

print("✅ Deployment information saved to deployment_info.json")
print("\n🎉 Sales Agent deployment complete!")
for label, value in (
    ("Agent ARN", launch_result.agent_arn),
    ("Status", status),
    ("DynamoDB Table", table_name),
    ("Deployed in VPC", vpc_config['vpc_id']),
):
    print(f"{label}: {value}")