# ACME Corp Data Lakehouse - AI Agent Integration Demo

This notebook demonstrates how to query the ACME Corp data lakehouse using:
- **Amazon Bedrock** with Claude 3.5 Sonnet for natural language to SQL conversion
- **AWS Data Processing MCP Server** for standardized AI agent communication
- **Amazon Athena** for SQL query execution
- **AWS Glue** for data catalog management

## Architecture Overview

```
Natural Language Query → Bedrock (Claude 3.5) → SQL Generation → Athena → Results → AI Interpretation
```
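
The flow above can be read as plain function composition. The following is a minimal sketch, not the notebook's implementation: `run_pipeline` and its three callable parameters are hypothetical names, the `users` table is made up, and the stubs stand in for the Bedrock and Athena calls defined in later sections.

```python
from typing import Any, Callable, Dict


def run_pipeline(
    question: str,
    generate_sql: Callable[[str], str],
    run_query: Callable[[str], Any],
    interpret: Callable[[str, Any], str],
) -> Dict[str, Any]:
    """Chain the stages: question -> SQL -> rows -> narrative answer."""
    sql = generate_sql(question)        # Bedrock (Claude 3.5) in the notebook
    rows = run_query(sql)               # Athena in the notebook
    answer = interpret(question, rows)  # Bedrock again, for interpretation
    return {"question": question, "sql": sql, "rows": rows, "answer": answer}


# Stub stages so the sketch runs without AWS credentials.
demo = run_pipeline(
    "How many users are there?",
    generate_sql=lambda q: "SELECT COUNT(*) AS n FROM users",
    run_query=lambda sql: [{"n": 42}],
    interpret=lambda q, rows: f"There are {rows[0]['n']} users.",
)
print(demo["answer"])
```

Passing the stages in as callables keeps each one replaceable, which is also how the later sections can be tested in isolation.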

## 1. Setup and Configuration

In [None]:
# Install required packages
!pip install boto3 pandas pyarrow ipywidgets

In [None]:
import json
import boto3
import pandas as pd
import time
from datetime import datetime
from IPython.display import display, Markdown, HTML

# Initialize AWS clients
bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-west-2')
athena = boto3.client('athena', region_name='us-west-2')
glue = boto3.client('glue', region_name='us-west-2')

# Configuration
DATABASE_NAME = 'acme_corp_lakehouse'
OUTPUT_LOCATION = 's3://acme-corp-lakehouse-878687028155/athena-results/'
MODEL_ID = 'anthropic.claude-3-5-sonnet-20241022-v2:0'

print("✅ Configuration complete!")
print(f"Database: {DATABASE_NAME}")
print(f"Model: Claude 3.5 Sonnet ({MODEL_ID})")

## 2. Explore Available Tables

In [None]:
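# Get all tables in the database (get_tables is paginated, so collect every page)
paginator = glue.get_paginator('get_tables')
tables = [
    table
    for page in paginator.paginate(DatabaseName=DATABASE_NAME)
    for table in page['TableList']
]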

print(f"📊 Found {len(tables)} tables in {DATABASE_NAME}:\n")

table_info = []
for table in tables:
    table_name = table['Name']
    columns = table['StorageDescriptor']['Columns']
    
    table_info.append({
        'Table': table_name,
        'Columns': len(columns),
        # Append an ellipsis only when the table has more columns than shown
        'Sample Columns': ', '.join(f"{col['Name']} ({col['Type']})" for col in columns[:3]) + ('...' if len(columns) > 3 else '')
    })
    
df_tables = pd.DataFrame(table_info)
display(df_tables)
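
One detail worth checking when reading Glue metadata: for partitioned tables, the partition columns are listed under the table's `PartitionKeys` rather than under `StorageDescriptor.Columns`, so a listing built only from the latter omits them. A minimal sketch that merges both follows; `all_columns` and the sample table dict are illustrative and not part of the notebook.

```python
from typing import Any, Dict, List


def all_columns(table: Dict[str, Any]) -> List[str]:
    """Return 'name (type)' strings for regular and partition columns."""
    regular = table.get("StorageDescriptor", {}).get("Columns", [])
    partitions = table.get("PartitionKeys", [])
    described = [f"{col['Name']} ({col['Type']})" for col in regular]
    described += [f"{col['Name']} ({col['Type']}, partition)" for col in partitions]
    return described


# Hand-written dict in the shape of one entry of glue.get_tables()['TableList'].
sample_table = {
    "Name": "viewing_events",  # hypothetical table name
    "StorageDescriptor": {"Columns": [{"Name": "user_id", "Type": "string"}]},
    "PartitionKeys": [{"Name": "event_date", "Type": "date"}],
}
print(all_columns(sample_table))
```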

## 3. Natural Language to SQL Conversion

In [None]:
def get_table_schemas():
    """Get detailed schemas for all tables"""
    # get_tables is paginated, so walk every page rather than only the first
    paginator = glue.get_paginator('get_tables')
    
    schemas = {}
    for page in paginator.paginate(DatabaseName=DATABASE_NAME):
        for table in page['TableList']:
            columns = [
                f"{col['Name']} ({col['Type']})"
                for col in table['StorageDescriptor']['Columns']
            ]
            schemas[table['Name']] = columns
    
    return schemas

def generate_sql_with_bedrock(natural_language_query, table_schemas):
    """Convert natural language to SQL using Claude 3.5"""
    
    schema_text = "\n".join([
        f"Table: {table}\nColumns: {', '.join(columns)}\n"
        for table, columns in table_schemas.items()
    ])
    
    prompt = f"""You are a SQL expert. Convert the following natural language query to SQL.

Database: {DATABASE_NAME}

Available tables and schemas:
{schema_text}

Natural language query: {natural_language_query}

Important notes:
- Use proper JOIN conditions between tables
- For date comparisons, use appropriate date functions
- Optimize for query performance

Provide only the SQL query without any explanation or markdown formatting."""
    
    messages = [{"role": "user", "content": prompt}]
    
    response = bedrock_runtime.invoke_model(
        modelId=MODEL_ID,
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": messages,
            "max_tokens": 1000,
            "temperature": 0.1
        })
    )
    
    response_body = json.loads(response['body'].read())
    sql_query = response_body['content'][0]['text'].strip()
    
    # Clean up any markdown formatting if present
    sql_query = sql_query.replace('```sql', '').replace('```', '').strip()
    
    return sql_query

# Test the function
schemas = get_table_schemas()
test_query = "What is the average lifetime value of Premium subscribers?"
sql = generate_sql_with_bedrock(test_query, schemas)

print(f"🎯 Natural Language: {test_query}")
print(f"\n📝 Generated SQL:\n{sql}")
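
Despite the prompt's instruction, a model may wrap the SQL in a fenced block or add a sentence around it, and stripping the fence markers alone would leave that sentence in the query. The sketch below shows one way to handle this, assuming the SQL is either the first fenced block or the whole response. `extract_sql` and `is_read_only` are hypothetical helpers, and the read-only check is deliberately conservative (it can reject valid queries) and is not a security boundary.

```python
import re

# Matches the first fenced block, with or without a "sql" language tag.
_FENCE = re.compile(r"```(?:sql)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)


def extract_sql(model_text: str) -> str:
    """Return the first fenced block's contents, or the whole text if unfenced."""
    match = _FENCE.search(model_text)
    sql = match.group(1) if match else model_text
    # Drop surrounding whitespace and a trailing semicolon.
    return sql.strip().rstrip(";").strip()


def is_read_only(sql: str) -> bool:
    """Conservative check: one statement whose first word is SELECT or WITH."""
    if ";" in sql:  # also rejects semicolons inside string literals
        return False
    words = sql.lstrip("( \n\t").split(None, 1)
    return bool(words) and words[0].upper() in {"SELECT", "WITH"}


reply = "Here is the query:\n```sql\nSELECT 1;\n```"
candidate = extract_sql(reply)
print(candidate, is_read_only(candidate))
```

A check like `is_read_only` could run before the query is handed to Athena, so that a generated `DROP` or `INSERT` statement is rejected instead of executed.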

## 4. Execute Queries with Athena

In [None]:
def execute_athena_query(sql_query):
    """Execute SQL query using Amazon Athena"""
    
    # Start query execution
    response = athena.start_query_execution(
        QueryString=sql_query,
        QueryExecutionContext={'Database': DATABASE_NAME},
        ResultConfiguration={'OutputLocation': OUTPUT_LOCATION}
    )
    
    query_execution_id = response['QueryExecutionId']
    
    # Wait for query to complete
    while True:
        response = athena.get_query_execution(QueryExecutionId=query_execution_id)
        status = response['QueryExecution']['Status']['State']
        
        if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(1)
    
    if status != 'SUCCEEDED':
        # Report the actual terminal state (FAILED or CANCELLED) with Athena's reason
        error = response['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
        raise RuntimeError(f"Query {status.lower()}: {error}")
    
    # Get query statistics
    stats = response['QueryExecution']['Statistics']
    execution_time = stats.get('EngineExecutionTimeInMillis', 0)
    data_scanned = stats.get('DataScannedInBytes', 0)
    
    # Get results
    results = athena.get_query_results(QueryExecutionId=query_execution_id)
    
    # Convert to DataFrame
    columns = [col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]
    rows = []
    
    for row in results['ResultSet']['Rows'][1:]:  # Skip header
        rows.append([cell.get('VarCharValue', '') for cell in row['Data']])
    
    df = pd.DataFrame(rows, columns=columns)
    
    return df, execution_time, data_scanned

# Execute the test query
df_result, exec_time, data_scanned = execute_athena_query(sql)

print(f"⚡ Query executed in {exec_time}ms")
print(f"📊 Data scanned: {data_scanned:,} bytes")
print(f"\n🎯 Results:")
display(df_result)
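
The polling loop in `execute_athena_query` waits indefinitely if a query never reaches a terminal state. A minimal sketch of polling with an overall timeout and capped exponential backoff follows. `wait_for_terminal_state` and its parameters are hypothetical; the state source is injected so the sketch runs without AWS, and in the notebook it would wrap `athena.get_query_execution`.

```python
import time
from typing import Callable

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_terminal_state(
    get_state: Callable[[], str],
    timeout_s: float = 300.0,
    initial_delay_s: float = 0.5,
    max_delay_s: float = 5.0,
) -> str:
    """Poll get_state() until it returns a terminal state or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        # Give up if the next sleep would run past the deadline.
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"Query still {state} after {timeout_s}s")
        time.sleep(delay)
        delay = min(delay * 2, max_delay_s)


# Fake state source standing in for athena.get_query_execution(...).
_states = iter(["QUEUED", "RUNNING", "SUCCEEDED"])
final_state = wait_for_terminal_state(lambda: next(_states), initial_delay_s=0.01)
print(final_state)
```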

## 5. AI-Powered Query Interface

In [None]:
def interpret_results_with_ai(query, results_df, execution_stats):
    """Use AI to interpret query results and provide insights"""
    
    # Convert results to string for AI interpretation (truncate large result sets)
    if len(results_df) < 20:
        results_str = results_df.to_string()
    else:
        results_str = results_df.head(10).to_string() + f"\n... ({len(results_df)} total rows)"
    
    prompt = f"""Analyze these query results and provide business insights.

Original question: {query}

Query results:
{results_str}

Execution stats:
- Execution time: {execution_stats['time']}ms
- Data scanned: {execution_stats['data_scanned']:,} bytes

Please provide:
1. A clear answer to the original question
2. 2-3 key business insights based on the data
3. Any recommendations or action items"""
    
    messages = [{"role": "user", "content": prompt}]
    
    response = bedrock_runtime.invoke_model(
        modelId=MODEL_ID,
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": messages,
            "max_tokens": 1000,
            "temperature": 0.3
        })
    )
    
    response_body = json.loads(response['body'].read())
    insights = response_body['content'][0]['text']
    
    return insights

def execute_natural_language_query(nl_query):
    """Complete pipeline: NL → SQL → Results → Insights"""
    
    print(f"🎯 Question: {nl_query}\n")
    
    # Step 1: Generate SQL
    print("🧠 Generating SQL with Claude 3.5...")
    sql = generate_sql_with_bedrock(nl_query, schemas)
    print(f"📝 SQL: {sql}\n")
    
    # Step 2: Execute query
    print("🔍 Executing query...")
    try:
        df_result, exec_time, data_scanned = execute_athena_query(sql)
        
        stats = {
            'time': exec_time,
            'data_scanned': data_scanned
        }
        
        print(f"✅ Success! Query executed in {exec_time}ms\n")
        
        # Step 3: Display results
        print("📊 Results:")
        display(df_result)
        
        # Step 4: Get AI insights
        print("\n💡 AI Analysis:")
        insights = interpret_results_with_ai(nl_query, df_result, stats)
        display(Markdown(insights))
        
    except Exception as e:
        print(f"❌ Error: {str(e)}")

# Test the complete pipeline
execute_natural_language_query("What is the average lifetime value by subscription plan?")
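
Both Bedrock calls in this notebook build the same request body and read text from the response in the same way. The sketch below factors that into two helpers. `build_claude_body` and `response_text` are hypothetical names, the field values are the ones used in the cells above, and joining all text blocks (rather than reading only the first) is an assumption about what is wanted.

```python
import json
from typing import Any, Dict


def build_claude_body(prompt: str, max_tokens: int = 1000, temperature: float = 0.1) -> str:
    """Serialize a single-turn request body in the shape used by the cells above."""
    return json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": temperature,
    })


def response_text(response_body: Dict[str, Any]) -> str:
    """Join the text blocks of an already-parsed response body."""
    blocks = response_body.get("content", [])
    return "".join(b.get("text", "") for b in blocks if b.get("type", "text") == "text")


# Hand-written response in the shape the notebook reads from; no AWS call is made.
fake_response = {"content": [{"type": "text", "text": "SELECT 1"}]}
print(response_text(fake_response))
```

With these, each call would reduce to `bedrock_runtime.invoke_model(modelId=MODEL_ID, body=build_claude_body(prompt))` followed by `response_text(json.loads(response['body'].read()))`.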

## 6. Interactive Query Examples

In [None]:
# Example queries from ai_agent_query_examples.py
example_queries = [
    "What is the average lifetime value of Premium subscribers who watched sci-fi content?",
    "Which ad campaigns drove the most conversions for users aged 25-34?",
    "What is the correlation between viewing completion rates and subscription plan?",
    "Which content titles are most popular among users acquired through social media campaigns?",
    "What is the ROI of video campaigns?"
]

print("📋 Example queries you can try:")
for i, query in enumerate(example_queries, 1):
    print(f"{i}. {query}")

In [None]:
# Execute a specific example query
query_number = 2  # Change this to try different queries
selected_query = example_queries[query_number - 1]

print(f"\n{'='*80}\n")
execute_natural_language_query(selected_query)

## 7. Custom Query Interface

In [None]:
# Create an interactive widget for custom queries
# display is already imported in the setup cell
import ipywidgets as widgets

# Create text area for query input
query_input = widgets.Textarea(
    value='',  # start empty so the placeholder hint shows and nothing has to be deleted first
    placeholder='e.g., Show me the top 5 campaigns by ROI',
    description='Query:',
    disabled=False,
    layout=widgets.Layout(width='100%', height='80px')
)

# Create button to execute query
execute_button = widgets.Button(
    description='Execute Query',
    disabled=False,
    button_style='primary',
    tooltip='Click to execute your query',
    icon='search'
)

# Create output area
output = widgets.Output()

def on_button_click(b):
    with output:
        output.clear_output()
        if query_input.value and query_input.value != 'Enter your natural language query here...':
            execute_natural_language_query(query_input.value)
        else:
            print("Please enter a valid query.")

execute_button.on_click(on_button_click)

# Display the interface
display(widgets.VBox([query_input, execute_button, output]))

## 8. Performance Analysis Dashboard

In [None]:
# Create a performance analysis for multiple queries
performance_queries = [
    "Count total users by subscription plan",
    "Average lifetime value by gender",
    "Top 10 campaigns by spend",
    "Monthly revenue trends"
]

performance_results = []

print("🚀 Running performance analysis...\n")

for query in performance_queries:
    print(f"Executing: {query}")
    
    start_time = time.time()
    
    try:
        # Generate SQL
        sql = generate_sql_with_bedrock(query, schemas)
        
        # Execute query
        df_result, exec_time, data_scanned = execute_athena_query(sql)
        
        end_time = time.time()
        total_time = (end_time - start_time) * 1000  # Convert to ms
        
        performance_results.append({
            'Query': query,
            'Rows Returned': len(df_result),
            'Athena Time (ms)': exec_time,
            'Total Time (ms)': round(total_time),
            'Data Scanned (KB)': round(data_scanned / 1024, 2),
            'Status': 'Success'
        })
        
    except Exception as e:
        performance_results.append({
            'Query': query,
            'Rows Returned': 0,
            'Athena Time (ms)': 0,
            'Total Time (ms)': 0,
            'Data Scanned (KB)': 0,
            'Status': f'Failed: {str(e)[:50]}...'
        })

# Display performance results
df_performance = pd.DataFrame(performance_results)
print("\n📊 Performance Analysis Results:")
display(df_performance)

# Calculate averages
successful_queries = df_performance[df_performance['Status'] == 'Success']
if len(successful_queries) > 0:
    print(f"\n📈 Performance Summary:")
    print(f"- Average Athena execution time: {successful_queries['Athena Time (ms)'].mean():.0f}ms")
    print(f"- Average total time (including AI): {successful_queries['Total Time (ms)'].mean():.0f}ms")
    print(f"- Average data scanned: {successful_queries['Data Scanned (KB)'].mean():.2f}KB")
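
In the table above, "Total Time" covers SQL generation, Athena polling, and result retrieval, while "Athena Time" is engine execution time only, so their difference gives a rough per-query overhead. A minimal sketch of that calculation on hand-written records follows; `overhead_summary` is a hypothetical helper and the sample figures are illustrative, not measurements from this notebook.

```python
from statistics import mean
from typing import Any, Dict, List


def overhead_summary(records: List[Dict[str, Any]]) -> Dict[str, float]:
    """Mean Athena time, mean total time, and their difference for successful runs."""
    ok = [r for r in records if r["Status"] == "Success"]
    if not ok:
        return {}
    athena = mean([r["Athena Time (ms)"] for r in ok])
    total = mean([r["Total Time (ms)"] for r in ok])
    return {"athena_ms": athena, "total_ms": total, "overhead_ms": total - athena}


# Illustrative values only.
sample = [
    {"Status": "Success", "Athena Time (ms)": 500, "Total Time (ms)": 2500},
    {"Status": "Success", "Athena Time (ms)": 700, "Total Time (ms)": 2700},
    {"Status": "Failed: timeout...", "Athena Time (ms)": 0, "Total Time (ms)": 0},
]
print(overhead_summary(sample))
```

The same function could be applied to the notebook's data with `overhead_summary(performance_results)`.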

## 9. MCP Server Integration Example

In [None]:
# Demonstrate MCP server usage pattern
print("🔌 MCP Server Integration Pattern\n")

mcp_config = {
    "mcpServers": {
        "aws-dataprocessing": {
            "command": "uvx",
            "args": [
                "awslabs.aws-dataprocessing-mcp-server@latest",
                "--allow-write"
            ],
            "env": {
                "AWS_REGION": "us-west-2"
            }
        }
    },
    "capabilities": {
        "athena": {
            "enabled": True,
            "workgroup": "primary",
            "database": DATABASE_NAME
        },
        "glue": {
            "enabled": True
        }
    }
}

print("📋 MCP Server Configuration:")
print(json.dumps(mcp_config, indent=2))

print("\n🛠️ Available MCP Tools:")
mcp_tools = [
    {
        "Tool": "glue_data_catalog_handler",
        "Actions": "list_tables, get_table, get_database",
        "Description": "Access Glue catalog metadata"
    },
    {
        "Tool": "athena_query_handler",
        "Actions": "execute_query, get_query_results",
        "Description": "Execute SQL queries via Athena"
    },
    {
        "Tool": "s3_handler",
        "Actions": "read_object, write_object, list_objects",
        "Description": "Interact with S3 data (requires --allow-write)"
    }
]

df_tools = pd.DataFrame(mcp_tools)
display(df_tools)
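
An `mcpServers` entry like the one above describes a process for an MCP client to launch: `command` and `args` form the command line, and `env` is added to the process environment. The sketch below shows that mapping only. `launch_spec` is a hypothetical helper, and real MCP clients perform this step internally.

```python
import os
from typing import Any, Dict, List, Tuple


def launch_spec(server_cfg: Dict[str, Any]) -> Tuple[List[str], Dict[str, str]]:
    """Return (argv, environment) for one entry under "mcpServers"."""
    argv = [server_cfg["command"], *server_cfg.get("args", [])]
    # Entries in "env" override variables inherited from the current process.
    env = {**os.environ, **server_cfg.get("env", {})}
    return argv, env


server_cfg = {
    "command": "uvx",
    "args": ["awslabs.aws-dataprocessing-mcp-server@latest", "--allow-write"],
    "env": {"AWS_REGION": "us-west-2"},
}
argv, env = launch_spec(server_cfg)
print(" ".join(argv))
```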

## 10. Summary and Next Steps

### What We've Demonstrated:

1. **Natural Language to SQL**: Using Claude 3.5 Sonnet via Amazon Bedrock
2. **Query Execution**: Running SQL queries on the data lakehouse with Athena
3. **AI Interpretation**: Getting business insights from query results
4. **Performance Analysis**: Measuring query execution times and data scanned
5. **MCP Integration**: Understanding the MCP server pattern for AI agents

### Key Insights:

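- **Query Performance**: Athena engine time for the demo queries is typically 400-1200 ms; the dashboard in Section 8 reports the values for each run
- **AI Processing**: SQL generation and result interpretation with Bedrock add roughly 1-2 seconds
- **Data Efficiency**: Columnar Parquet storage keeps the data scanned low (typically < 1 MB per query)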

### Next Steps:

1. **Optimize Queries**: Add partitioning for larger datasets
2. **Enhance AI**: Fine-tune prompts for better SQL generation
3. **Add Caching**: Implement query result caching for repeated queries
4. **Build Applications**: Create Streamlit or web apps using this infrastructure
5. **Scale Up**: Test with larger datasets and concurrent users
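
For the caching step, one simple starting point is an in-memory cache keyed by a hash of the SQL text, with a time-to-live. The sketch below works under that assumption: `QueryCache` is a hypothetical class, and `run_query` stands in for a function such as `execute_athena_query`.

```python
import hashlib
import time
from typing import Any, Callable, Dict, Tuple


class QueryCache:
    """In-memory result cache keyed by SQL text, with a time-to-live."""

    def __init__(self, run_query: Callable[[str], Any], ttl_s: float = 300.0):
        self._run_query = run_query
        self._ttl_s = ttl_s
        self._entries: Dict[str, Tuple[float, Any]] = {}

    @staticmethod
    def _key(sql: str) -> str:
        # Only leading/trailing whitespace is ignored, so string literals are never altered.
        return hashlib.sha256(sql.strip().encode("utf-8")).hexdigest()

    def get(self, sql: str) -> Any:
        """Return a fresh cached result, or run the query and cache its result."""
        key = self._key(sql)
        cached = self._entries.get(key)
        if cached is not None and time.monotonic() - cached[0] < self._ttl_s:
            return cached[1]
        result = self._run_query(sql)
        self._entries[key] = (time.monotonic(), result)
        return result


calls = []


def fake_run(sql: str):
    """Stand-in for an Athena call; records each execution."""
    calls.append(sql)
    return [("Premium", 3)]


cache = QueryCache(fake_run)
cache.get("SELECT 1")
cache.get("  SELECT 1\n")  # same query after stripping, served from the cache
print(len(calls))
```

A cache like this lives only as long as the notebook kernel, and the time-to-live should be chosen according to how often the underlying tables change.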

### Resources:

- [Amazon Bedrock Documentation](https://docs.aws.amazon.com/bedrock/)
- [AWS Data Processing MCP Server](https://github.com/awslabs/aws-dataprocessing-mcp-server)
- [Model Context Protocol](https://modelcontextprotocol.io/)
- [Repository](https://github.com/amitkalawat/data-agents-mcp-aws)