### AI Agents and Big Data Analytics - Better Together
Up to 80% of the data that enterprises generate is unstructured or semi-structured. Your AI agents can provide richer insights by querying terabytes of data stored in an Amazon S3 data lake.<br><br>
Do AI developers need to learn data engineering skills, such as distributed Spark processing, metadata catalogs, complex distributed SQL queries, and NoSQL queries, to query terabytes or petabytes of data in data lakes?<br><br>
Fortunately, the answer is no. You just need to know the basics. With Strands Agents, built-in Strands tools such as use_aws, and an MCP server such as the AWS Data Processing MCP Server, you can accomplish what most data engineers can while writing only a few lines of code.<br><br>
This notebook helps bridge the gap between a data engineer who knows AWS Glue and Amazon Athena and an AI engineer who knows the Strands SDK and Amazon Bedrock. You will be able to run sophisticated analytics with natural language questions.<br>


### GOAL - Perform big data analytics using AI Agents without writing big data analytics code. <br>
The AI Agent will use built-in Strands tools and an MCP server to get the job done.

### In this Notebook
You will learn how to use AI agents to discover the metadata of big data stored in Parquet files, using agent tools and an MCP server that in turn use AWS Glue and Amazon Athena.<br><br>
We will use NYC Taxi and Limousine Commission (TLC) data.

### Environment Setup
Install required dependencies for the notebook including Strands SDK, AWS SDK, and MCP client libraries.

In [None]:
# Install all required packages from requirements.txt
!pip install -r requirements.txt --quiet --upgrade

In [None]:
# Lab configurations

# CloudFormation Stack Name - change it as needed
STACK_NAME = "AwsLabBigDataAgentStack"

# Region
import boto3
region = boto3.Session().region_name

# Setup the logging
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%m/%d %H:%M:%S',
    filename='strands_debug.log'
)


## Data Pipeline using Strands Agent
### Overview

This agent downloads data from a URL and uploads it to an S3 bucket using dynamic partitioning.
### Capabilities

- Downloads data from specified URLs
- Validates downloaded content
- Uploads data to S3 with dynamic partitioning
- Handles error scenarios gracefully
- Provides logging and status updates

### Prerequisites

- AWS credentials configured
- Required Python packages:
    - strands
    - boto3

### Use a CloudFormation template to create the S3 bucket

In [None]:
%%time
# This cell will deploy AWS resources including S3 bucket and IAM roles. It requires about 3 mins to complete.
from utils.deploy_cfn import deploy_infrastructure
deploy_infrastructure(STACK_NAME)
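
Once the stack is deployed, the agents later in this notebook are asked to read resource names from its outputs. For reference, the sketch below shows roughly what that lookup looks like with boto3. The function names and the `DataBucketName` output key are hypothetical and used only for illustration; the real output keys depend on the lab's CloudFormation template.

```python
def outputs_to_dict(stack: dict) -> dict:
    """Flatten a CloudFormation stack description's Outputs list into {key: value}."""
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


def get_stack_outputs(stack_name: str, region_name=None) -> dict:
    """Fetch and flatten the outputs of a deployed stack (requires AWS credentials)."""
    import boto3  # imported lazily so the offline example below runs without the AWS SDK

    cfn = boto3.client("cloudformation", region_name=region_name)
    stack = cfn.describe_stacks(StackName=stack_name)["Stacks"][0]
    return outputs_to_dict(stack)


# Offline illustration with a hand-made stack description.
# "DataBucketName" is a hypothetical output key, not taken from the lab template.
sample_stack = {
    "Outputs": [{"OutputKey": "DataBucketName", "OutputValue": "example-bucket"}]
}
print(outputs_to_dict(sample_stack))
```

With credentials in place, `get_stack_outputs(STACK_NAME, region)` would return the same kind of dictionary for the deployed stack.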

### Import Dependencies and AWS Configuration
Import required libraries and configure AWS settings for the data processing workflow.

In [None]:
# Import required libraries
import os, time, boto3, json
from strands import Agent, tool
from strands.models import BedrockModel
from strands_tools import use_aws, file_write, file_read, sleep, python_repl
from datetime import datetime
from pprint import pprint
from pydantic import BaseModel, Field
from typing import Optional, List

# Bypass tool consent for automated execution
os.environ["BYPASS_TOOL_CONSENT"] = "true"
# If the python_repl tool is used, it shouldn't wait for user interaction
os.environ["PYTHON_REPL_INTERACTIVE"] = "false"

model_list = ['deepseek.v3-v1:0', 'qwen.qwen3-coder-30b-a3b-v1:0', 'us.anthropic.claude-3-7-sonnet-20250219-v1:0', 'us.anthropic.claude-sonnet-4-20250514-v1:0', 'openai.gpt-oss-20b-1:0', 'openai.gpt-oss-120b-1:0']

# We will use the following model in Strands Agent
model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"

### Data ingestion using Strands Agents

Build a Strands agent that downloads data from a URL and then uploads it to an S3 bucket with data partitioning enabled.

In [None]:
# Create a data ingestion agent.

data_upload_agent = Agent(
    model=BedrockModel(model_id=model_id, temperature=0.3),
    system_prompt=f"""You are an expert data engineer specializing in AWS data pipeline operations.

TASK OVERVIEW:
Download files from provided URLs and upload them to an S3 bucket with proper partitioning.

SPECIFIC REQUIREMENTS:
1. Download files from the given URLs to local storage in download folder
2. Extract S3 bucket name and other resources from stack outputs (created by CloudFormation stack: {STACK_NAME}) in {region} region
3. Verify the S3 bucket exists 
4. Upload files to the S3 bucket with appropriate partitioning structure. 
5. Use table name: data

EXECUTION GUIDELINES:
- Check S3 bucket availability before attempting uploads
- Implement proper error handling for downloads and uploads
- Use efficient partitioning strategy (e.g., by year/month/day if date fields exist)
- Verify file integrity after download
- Provide clear status updates on progress
- Handle large files appropriately to avoid memory issues
- Execute tasks sequentially without user intervention

TOOLS AVAILABLE:
- use_aws: For AWS operations
- python_repl: For data processing and file operations (ALWAYS use interactive=False)

Complete the task systematically in non-interactive mode and report final results.""",
    tools=[use_aws, python_repl]
)


# File URLs
file_urls = [
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-05.parquet",
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-06.parquet", 
    "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-05.parquet",
    "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-06.parquet"
]

# Direct, conversational request
data_upload_request = f"""
Hi! I need you to help me with a data upload task.

Here's what I need:
1. Get the S3 bucket name from CloudFormation stack "{STACK_NAME}"
2. Download these 4 files (only if they don't already exist locally)

{file_urls}

3. Upload them to S3 with this partition structure:
   - Extract taxi type, year, month from filename
   - Use S3 key: data/taxi_class=[type]/year=[year]/month=[month]/[filename]
"""

print("Starting data download")

response = data_upload_agent(data_upload_request)
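
The agent decides for itself how to carry out the upload. To make the requested layout concrete, here is a minimal sketch of how the partitioned S3 key could be derived from a file URL. The function name `s3_key_for` and the regular expression are illustrative assumptions, not code the agent is guaranteed to produce.

```python
import re

# Matches names such as yellow_tripdata_2025-05.parquet
FILENAME_PATTERN = re.compile(
    r"^(?P<taxi>[a-z]+)_tripdata_(?P<year>\d{4})-(?P<month>\d{2})\.parquet$"
)


def s3_key_for(url: str, table: str = "data") -> str:
    """Build the Hive-style partitioned S3 key for a TLC trip-data URL."""
    filename = url.rsplit("/", 1)[-1]
    match = FILENAME_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"Unexpected filename: {filename}")
    return (
        f"{table}/taxi_class={match['taxi']}/year={match['year']}"
        f"/month={match['month']}/{filename}"
    )


print(s3_key_for("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-05.parquet"))
# data/taxi_class=yellow/year=2025/month=05/yellow_tripdata_2025-05.parquet
```

Keys in the `name=value` form let a Glue crawler register `taxi_class`, `year`, and `month` as partition columns, so queries that filter on them can skip unrelated files.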



The first step is to discover the metadata and catalog it. <br><br>
AWS Glue can crawl the data, extract the metadata, and register it in the AWS Glue Data Catalog.<br><br>
Let's NOT write code to create AWS Glue jobs. Instead, let's write a SYSTEM PROMPT that will instruct an AI Agent to do this job for us.<br>

### Catalog Data in S3 as Glue Catalog Database and Tables
Create a Strands Agent that uses the use_aws Strands tool to create AWS Glue crawlers and catalog the S3 data automatically.

In [None]:
from utils_big_data import load_system_prompt_from_file
# Let's load the system prompt from file
crawl_task = load_system_prompt_from_file("crawl_task_prompt.txt", stack_name = STACK_NAME)
crawl_task

In [None]:
# Initialize Claude 3.7 Sonnet model via Bedrock
model = BedrockModel(model_id=model_id, temperature=0.1)  # Low temperature for consistent structured output

# Create the Strands Agent. 
# The use_aws tool creates a Glue crawler, which in turn creates the Glue catalog database and table schemas.
# The sleep tool lets the agent wait until the Glue crawler run has finished.
# This task might take a few minutes. You can go to the AWS Console and watch the Glue crawlers being created.
crawl_agent = Agent(model=model, tools=[use_aws, sleep])

crawl_response = crawl_agent(crawl_task)
print(crawl_response)
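
For comparison, this is roughly the code the agent saves you from writing. The sketch below creates a crawler, starts it, and polls until it returns to the `READY` state, mirroring the `use_aws` plus `sleep` pattern above. The helper names and default timings are assumptions chosen for illustration; the agent may take different steps in practice.

```python
import time


def wait_for_crawler(glue_client, crawler_name: str,
                     poll_seconds: float = 20, max_polls: int = 60) -> str:
    """Poll get_crawler until the crawler reports the READY state."""
    for _ in range(max_polls):
        state = glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"]
        if state == "READY":
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"Crawler {crawler_name} did not finish after {max_polls} polls")


def run_crawler(crawler_name: str, role_arn: str, database: str, s3_path: str) -> None:
    """Create, start, and wait for a crawler (requires AWS credentials)."""
    import boto3  # imported lazily so wait_for_crawler can be tried offline

    glue = boto3.client("glue")
    glue.create_crawler(
        Name=crawler_name,
        Role=role_arn,
        DatabaseName=database,
        Targets={"S3Targets": [{"Path": s3_path}]},
    )
    glue.start_crawler(Name=crawler_name)
    wait_for_crawler(glue, crawler_name)
```

Passing the client in as an argument keeps the polling loop easy to exercise with a stub before pointing it at a real account.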

In [None]:
from utils_big_data import print_tokens_costs

# Let's print the token costs
print_tokens_costs(crawl_response)
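
`print_tokens_costs` comes from the lab's utility module, and its source is not shown here. As a rough illustration of the arithmetic involved, the sketch below estimates cost from a token-usage dictionary. The function name is hypothetical, the prices are placeholders rather than real Bedrock rates, and reading usage from `crawl_response.metrics.accumulated_usage` is an assumption about the agent result object.

```python
def estimate_cost(usage: dict, input_price_per_1k: float, output_price_per_1k: float) -> float:
    """Estimate cost from a usage dict with inputTokens and outputTokens counts."""
    input_cost = usage.get("inputTokens", 0) / 1000 * input_price_per_1k
    output_cost = usage.get("outputTokens", 0) / 1000 * output_price_per_1k
    return input_cost + output_cost


# Placeholder numbers for illustration only; look up current pricing for your model.
sample_usage = {"inputTokens": 2000, "outputTokens": 1000}
print(estimate_cost(sample_usage, input_price_per_1k=0.5, output_price_per_1k=1.5))

# Assumed usage with a real agent result:
# estimate_cost(crawl_response.metrics.accumulated_usage, <input price>, <output price>)
```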


## Extract Glue Database and Table Names
Parse the agent response to extract the Glue database name and table names for use in subsequent queries.

In [None]:
# Strands Agents lets you extract information as structured output (an instance of a Pydantic model).

# Define the structured template
class GlueDbTableInfo(BaseModel):
    catalog_db_name: str = Field(description="Name of the Glue Catalog Database")
    catalog_table_names: List[str] = Field(description="List of glue table names")

# Use the structured_output method
glue_db_table_info = crawl_agent.structured_output(GlueDbTableInfo, "Extract the Glue Catalog Database name and the list of Glue table names")
print(glue_db_table_info)
catalog_db_name = glue_db_table_info.catalog_db_name
catalog_table_names = glue_db_table_info.catalog_table_names

AWS Labs has created many MCP servers for AI Agent developers to consume. One of them is the AWS Data Processing MCP Server: https://awslabs.github.io/mcp/servers/aws-dataprocessing-mcp-server <br><br>
This MCP server exposes AI Agent tools for big data operations using AWS Glue, Amazon EMR, and Amazon Athena.<br><br>
The best part is that we don't need to know the intricacies of these services beyond the high-level basics.<br><br>
Let's create an MCP client for this MCP server.

## MCP Client Setup
We will use tools exposed by an MCP server to discover partition columns and keys from the data in S3.<br><br>
Initialize the AWS Data Processing MCP Server client to provide AI agents with AWS Glue, Amazon EMR, and Amazon Athena capabilities.

In [None]:
# Import MCP client libraries
from mcp import stdio_client, StdioServerParameters
from strands.tools.mcp import MCPClient
import boto3
session = boto3.Session()
credentials = session.get_credentials()

# Create MCP client for AWS data processing server
# This provides tools for Glue, EMR, and Athena operations
data_mcp_client = MCPClient(lambda: stdio_client(
    StdioServerParameters(
        command="uvx",  # Use uvx to run the MCP server
        args= [
            "awslabs.aws-dataprocessing-mcp-server@latest",
            "--allow-write",  # Enable write operations
        ],
        env= {
            "AWS_ACCESS_KEY_ID": credentials.access_key,
            "AWS_SECRET_ACCESS_KEY": credentials.secret_key,
            "AWS_SESSION_TOKEN": credentials.token,
            "FASTMCP_LOG_LEVEL": "ERROR",  # Minimize logging noise
            "AWS_REGION": session.region_name      # Set AWS region
      }
    )
))
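
One caveat about the `env` mapping above: `credentials.token` is `None` when the session uses long-lived access keys rather than temporary credentials, and a subprocess environment is expected to hold only strings. A minimal sketch of a guard is shown below. The helper name `build_mcp_env` is hypothetical and introduced only for illustration.

```python
def build_mcp_env(access_key, secret_key, token, region, log_level="ERROR"):
    """Build the MCP server environment, skipping values that are not set."""
    env = {
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        "AWS_SESSION_TOKEN": token,      # None for long-lived access keys
        "AWS_REGION": region,
        "FASTMCP_LOG_LEVEL": log_level,  # Minimize logging noise
    }
    # Drop unset entries so that every remaining value is a string
    return {key: value for key, value in env.items() if value is not None}


# Offline illustration with dummy values
print(sorted(build_mcp_env("AKIAEXAMPLE", "secret", None, "us-east-1")))
```

The result could then be passed as `env=build_mcp_env(credentials.access_key, credentials.secret_key, credentials.token, session.region_name)`.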


In [None]:
import boto3

sts_client = boto3.client('sts')
response = sts_client.get_caller_identity()

arn = response['Arn']
# Example: arn:aws:sts::123456789012:assumed-role/MyRole/MySession
if "assumed-role" in arn:
    role_name = arn.split('/')[-2]
    print(f"IAM Role Name: {role_name}")
else:
    print("Not an assumed role session.")

## Print the tools offered by the MCP Server
This will help you understand what the tools do. The descriptions come from each tool's docstring or tool spec.<br><br>
Agents send the docstring or tool spec of each tool to the LLM along with the task.<br>
This helps the LLM reason about which tool to use for which task.

In [None]:
# Connect to the MCP server and list the tools it offers
import json

tools_char_count = 0
tool_count = 0
with data_mcp_client:
    # Get available tools from the MCP server
    data_tools = data_mcp_client.list_tools_sync()
    # first_tool = data_tools[0]
    # print(dir(first_tool))    
    # Iterate through each tool
    for mcp_tool in data_tools:  # named mcp_tool to avoid shadowing the `tool` decorator imported from strands
        print(f"Tool: {mcp_tool.tool_name}")
        if hasattr(mcp_tool, 'tool_spec'):
            print(f"Description: {mcp_tool.tool_spec['description']}")
            # print(json.dumps(mcp_tool.tool_spec, indent=2)) # uncomment this to see function parameters and what they mean
            tools_char_count += len(json.dumps(mcp_tool.tool_spec))
            tool_count += 1
        print("-" * 50)
print(f"The number of characters in the spec of all the {tool_count} tools = {tools_char_count}")
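
Because every tool spec is sent to the LLM with each request, the character count above is a proxy for how much context the tools consume. The sketch below turns such a count into a rough token estimate and compares a full tool list with a curated subset. The four-characters-per-token ratio is a common rule of thumb for English text, not a property of any particular model, and the sample specs are made up for illustration.

```python
import json


def spec_size(tool_specs, chars_per_token: int = 4):
    """Return (characters, rough token estimate) for a list of tool specs."""
    chars = sum(len(json.dumps(spec)) for spec in tool_specs)
    return chars, chars // chars_per_token


# Hypothetical specs standing in for the real MCP tool specs
all_specs = [
    {"name": "manage_aws_glue_tables", "description": "hypothetical description text"},
    {"name": "some_other_tool", "description": "hypothetical description text"},
]
curated = [spec for spec in all_specs if spec["name"] == "manage_aws_glue_tables"]

print("all tools:", spec_size(all_specs))
print("curated:  ", spec_size(curated))
```

Passing an agent only the tools it needs keeps this overhead down.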

### Understand what the tables contain and what the columns mean
Store this in a file and pass it to the agent. This will help the agent construct SQL queries correctly.


In [None]:
# Get the definition of the table and columns and store it in a file.
table_def_system_prompt = f"""
You are an expert AWS data analyst assistant specializing in querying data stored in S3 data lakes using AWS Glue and Amazon Athena.

## IMPORTANT: 
- For any long running jobs, sleep for 20 seconds and check status until the job is finished. Do not give up until the job actually finishes.
"""
query_in = f"""Query the database {catalog_db_name}, identify the tables in it, identify the columns and their types. Gather more business context of what the database, tables, and the columns are storing by sampling a few rows of data. Then store the database name, table names, their purpose; and column name and its type and purpose in a json file named metadata.json."""

column_def_response = ""
# Get the data processing tools from MCP server
with data_mcp_client:
    data_tools = data_mcp_client.list_tools_sync()
    curated_data_tools = ['manage_aws_athena_query_executions', 'manage_aws_glue_tables']

    # Extract just the tools that we need.
    filtered_tools = [tool for tool in data_tools if tool.tool_name in curated_data_tools]
    filtered_tools += [file_write, file_read, sleep]

    # Pass the system prompt, the Bedrock model, and the selected tools to the agent
    column_def = Agent(system_prompt = table_def_system_prompt, model=model, tools=filtered_tools)
    # Invoke the agent with the query
    column_def_response = column_def(query_in)
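
Once the agent has written `metadata.json`, its contents can be embedded in the system prompt of a later query agent. The sketch below is one simple way to do that, under stated assumptions: the function name `build_query_prompt` and the prompt wording are hypothetical, and because the JSON layout is whatever the agent chose, the file is embedded as-is rather than parsed field by field.

```python
import json
from pathlib import Path


def build_query_prompt(metadata_path: str = "metadata.json") -> str:
    """Embed the saved table metadata in a system prompt for a query agent."""
    metadata = json.loads(Path(metadata_path).read_text())
    return (
        "You are an expert AWS data analyst. "
        "Use the table metadata below when writing Athena SQL.\n\n"
        "## TABLE METADATA\n"
        f"{json.dumps(metadata, indent=2)}"
    )


# Only build the prompt if the agent has already produced the file
if Path("metadata.json").exists():
    print(build_query_prompt()[:500])
```

Round-tripping through `json.loads` also acts as a quick check that the agent wrote valid JSON.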