# Download S3 Logs to DataFrame

This notebook downloads all JSON log files from the `staging-eks-otel-logging` S3 bucket and loads them into a pandas DataFrame.

The files are organized in a partitioned structure:
```
logs/
  year=2025/
    month=12/
      day=04/
        hour=15/
          minute=21/
            logs_*.json
```
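
Because every object key carries its own partition values, a key can be mapped back to a timestamp without opening the file. The following is a minimal sketch, assuming keys always follow the layout above. `partition_timestamp` is a hypothetical helper, not part of this notebook, and since the timezone of the partition values is not stated here, the result is a naive `datetime`.

```python
from __future__ import annotations

import re
from datetime import datetime

# Matches the partition layout shown above; assumes all five components are present.
_PARTITION_RE = re.compile(
    r"year=(\d{4})/month=(\d{1,2})/day=(\d{1,2})/hour=(\d{1,2})/minute=(\d{1,2})/"
)


def partition_timestamp(key: str) -> datetime | None:
    """Return the minute-resolution timestamp encoded in an S3 key, or None."""
    match = _PARTITION_RE.search(key)
    if match is None:
        return None
    year, month, day, hour, minute = (int(group) for group in match.groups())
    return datetime(year, month, day, hour, minute)


print(partition_timestamp("logs/year=2025/month=12/day=04/hour=15/minute=21/logs_1.json"))
# 2025-12-04 15:21:00
```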

## Setup

Before running this notebook, ensure you have the required dependencies installed:

```bash
uv sync --group dev
```

Also ensure your AWS credentials are configured (via environment variables, AWS credentials file, or IAM role).

In [1]:
import json
from typing import Any

import boto3
import pandas as pd

# Configure AWS client
# Credentials can be set via:
# - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables
# - AWS credentials file (~/.aws/credentials)
# - IAM role (if running on EC2/ECS)
s3_client = boto3.client('s3')

BUCKET_NAME = 'staging-eks-otel-logging'
PREFIX = 'logs/'

## List all JSON files in the bucket

In [2]:
def list_all_json_files(bucket: str, prefix: str) -> list[str]:
    """List all JSON files in the S3 bucket with the given prefix."""
    json_files = []
    paginator = s3_client.get_paginator('list_objects_v2')

    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        if 'Contents' in page:
            for obj in page['Contents']:
                key = obj['Key']
                if key.endswith('.json'):
                    json_files.append(key)

    return json_files

# List all JSON files
print("Listing all JSON files in bucket...")
all_files = list_all_json_files(BUCKET_NAME, PREFIX)
print(f"Found {len(all_files)} JSON files")
print("Sample files (first 5):")
for f in all_files[:5]:
    print(f"  - {f}")

Listing all JSON files in bucket...
Found 34 JSON files
Sample files (first 5):
  - logs/year=2025/month=12/day=03/hour=18/minute=15/logs_217695606.json
  - logs/year=2025/month=12/day=03/hour=18/minute=15/logs_364109350.json
  - logs/year=2025/month=12/day=03/hour=18/minute=15/logs_480615163.json
  - logs/year=2025/month=12/day=03/hour=18/minute=15/logs_481319329.json
  - logs/year=2025/month=12/day=03/hour=18/minute=19/logs_107713162.json
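
Listing from `logs/` walks every partition. Since `Prefix` is a plain string match on the start of the key, a longer prefix restricts the listing to a single day or hour. The sketch below builds such a prefix. `partition_prefix` is a hypothetical helper, and it assumes every component is zero-padded the way `day=03` is in the sample keys above.

```python
from __future__ import annotations

from datetime import datetime


def partition_prefix(ts: datetime, granularity: str = "day", root: str = "logs/") -> str:
    """Build an S3 key prefix for the partition that contains ``ts``."""
    parts = [
        ("year", f"{ts.year:04d}"),
        ("month", f"{ts.month:02d}"),
        ("day", f"{ts.day:02d}"),
        ("hour", f"{ts.hour:02d}"),
        ("minute", f"{ts.minute:02d}"),
    ]
    names = [name for name, _ in parts]
    if granularity not in names:
        raise ValueError(f"granularity must be one of {names}")
    # Keep every component down to and including the requested granularity.
    depth = names.index(granularity) + 1
    return root + "".join(f"{name}={value}/" for name, value in parts[:depth])


prefix = partition_prefix(datetime(2025, 12, 3, 18, 15), granularity="hour")
print(prefix)  # logs/year=2025/month=12/day=03/hour=18/
```

The result can be passed as the `prefix` argument of `list_all_json_files`.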


## Download and parse JSON files

In [10]:
def download_and_parse_json(bucket: str, key: str) -> list[dict[str, Any]]:  # noqa: C901
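    """Download a JSON file from S3 and extract its individual log records.

    Returns a list of log record dictionaries. The parsed document is checked for,
    in order:
    - A top-level array (each element is treated as a log record)
    - 'resourceLogs' array (OpenTelemetry format)
    - 'logRecords' array (AWS CloudWatch format)
    - 'logs' array (alternative format)
    - Otherwise, the whole object is treated as a single log record

    Non-dict entries are skipped. Each returned record has metadata added
    (S3 key, bucket, partition info). On any error, a message is printed and
    an empty list is returned.
    """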
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')
        parsed = json.loads(content)

        # Extract partition information from the key path
        # Format: logs/year=2025/month=12/day=04/hour=15/minute=21/logs_*.json
        partition_info = {}
        parts = key.split('/')
        for part in parts:
            if '=' in part:
                key_val = part.split('=')
                if len(key_val) == 2:
                    partition_key = key_val[0]
                    partition_value = key_val[1]
                    partition_info[f'_partition_{partition_key}'] = partition_value

        # Extract log records from various possible structures
        log_records = []

        if isinstance(parsed, list):
            # If the root is an array, treat each element as a log record
            log_records = parsed
        elif isinstance(parsed, dict):
            # Check for common log record array fields
            if 'resourceLogs' in parsed:
                # OpenTelemetry format: extract from resourceLogs array
                for resource_log in parsed.get('resourceLogs', []):
                    # Extract scopeLogs which contain the actual log records
                    for scope_log in resource_log.get('scopeLogs', []):
                        log_records.extend(scope_log.get('logRecords', []))
            elif 'logRecords' in parsed:
                # AWS CloudWatch format
                log_records = parsed['logRecords']
            elif 'logs' in parsed:
                # Alternative format
                log_records = parsed['logs']
            else:
                # If no recognized structure, treat the whole object as a single log record
                log_records = [parsed]

        # Add metadata to each log record
        result = []
        for record in log_records:
            if isinstance(record, dict):
                # Create a copy to avoid modifying the original
                enriched_record = record.copy()

                # Add S3 metadata
                enriched_record['_s3_key'] = key
                enriched_record['_s3_bucket'] = bucket

                # Add partition information
                enriched_record.update(partition_info)

                result.append(enriched_record)

        return result
    except Exception as e:
        print(f"Error downloading or parsing {key}: {e}")
        return []

# Download and parse all files
print(f"Downloading and parsing {len(all_files)} files...")
all_data = []

for i, file_key in enumerate(all_files, 1):
    if i % 100 == 0:
        print(f"Processing file {i}/{len(all_files)}...")

    entries = download_and_parse_json(BUCKET_NAME, file_key)
    all_data.extend(entries)

# Errors are caught inside download_and_parse_json, so this count includes any files that failed
print(f"Successfully downloaded {len(all_files)} files")
print(f"Total log entries: {len(all_data)}")

Downloading and parsing 34 files...
Successfully downloaded 34 files
Total log entries: 47
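
In the OpenTelemetry branch, records sit two levels down, at `resourceLogs[*].scopeLogs[*].logRecords[*]`. The function above keeps only the records themselves, so anything stored on the enclosing resource or scope is dropped. The sketch below walks the same nesting and copies the scope name and the raw resource attributes onto each record. `iter_otel_records` and the `_scope_name` / `_resource_attributes` field names are illustrative choices, and the sample document is made up.

```python
from __future__ import annotations

from typing import Any, Iterator


def iter_otel_records(doc: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield each logRecord together with its scope name and resource attributes."""
    for resource_log in doc.get("resourceLogs", []):
        resource_attrs = resource_log.get("resource", {}).get("attributes", [])
        for scope_log in resource_log.get("scopeLogs", []):
            scope_name = scope_log.get("scope", {}).get("name")
            for record in scope_log.get("logRecords", []):
                enriched = dict(record)  # shallow copy; the input is left untouched
                enriched["_scope_name"] = scope_name
                enriched["_resource_attributes"] = resource_attrs
                yield enriched


# Made-up document with the same nesting as an OTLP/JSON logs payload.
sample = {
    "resourceLogs": [
        {
            "resource": {
                "attributes": [
                    {"key": "service.name", "value": {"stringValue": "example-service"}}
                ]
            },
            "scopeLogs": [
                {
                    "scope": {"name": "example.scope"},
                    "logRecords": [
                        {"severityNumber": 9, "body": {"stringValue": "first"}},
                        {"severityNumber": 9, "body": {"stringValue": "second"}},
                    ],
                }
            ],
        }
    ]
}

records = list(iter_otel_records(sample))
print(len(records), records[0]["_scope_name"])
```

The `_resource_attributes` value uses the same key/value list format as a record's own `attributes`, so it could be flattened with the same approach.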


## Create DataFrame

In [11]:
# Convert to DataFrame
# Nested values (such as `body` and `attributes`) are kept as Python objects in
# object-dtype columns; they are flattened in a later cell
df = pd.DataFrame(all_data)

print(f"DataFrame shape: {df.shape}")
print(f"\nColumns: {list(df.columns)}")
print("\nFirst few rows:")
df.head()

DataFrame shape: (47, 13)

Columns: ['timeUnixNano', 'observedTimeUnixNano', 'severityNumber', 'severityText', 'body', '_s3_key', '_s3_bucket', '_partition_year', '_partition_month', '_partition_day', '_partition_hour', '_partition_minute', 'attributes']

First few rows:


| | timeUnixNano | observedTimeUnixNano | severityNumber | severityText | body | _s3_key | _s3_bucket | _partition_year | _partition_month | _partition_day | _partition_hour | _partition_minute | attributes |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1717016223073301200 | 1717016223073302800 | 9 | INFO | {'stringValue': '[NashonServer nodeId=1] Nasho... | logs/year=2025/month=12/day=03/hour=18/minute=... | staging-eks-otel-logging | 2025 | 12 | 3 | 18 | 15 | |
| 1 | 1717016223073301200 | 1717016223073302800 | 9 | INFO | {'stringValue': '[NashonServer nodeId=1] Nasho... | logs/year=2025/month=12/day=03/hour=18/minute=... | staging-eks-otel-logging | 2025 | 12 | 3 | 18 | 15 | |
| 2 | 1717016223073301200 | 1717016223073302800 | 9 | INFO | {'stringValue': '[NashonServer nodeId=1] Nasho... | logs/year=2025/month=12/day=03/hour=18/minute=... | staging-eks-otel-logging | 2025 | 12 | 3 | 18 | 15 | |
| 3 | 1717016223073301200 | 1717016223073302800 | 9 | INFO | {'stringValue': '[NashonServer nodeId=1] Nasho... | logs/year=2025/month=12/day=03/hour=18/minute=... | staging-eks-otel-logging | 2025 | 12 | 3 | 18 | 15 | |
| 4 | 1717016223073301200 | 1717016223073302800 | 9 | INFO | {'stringValue': '[NashonServer nodeId=1] Nasho... | logs/year=2025/month=12/day=03/hour=18/minute=... | staging-eks-otel-logging | 2025 | 12 | 3 | 18 | 15 | |
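
`timeUnixNano` and `observedTimeUnixNano` hold nanoseconds since the Unix epoch. OTLP/JSON normally encodes 64-bit integers as strings, so these columns tend to load as text rather than numbers. The sketch below converts such a column to timezone-aware timestamps, shown on a small stand-in frame. The `event_time` column name is an assumption, not something the notebook defines.

```python
import pandas as pd

# Stand-in frame; in the notebook the same steps would apply to df["timeUnixNano"].
sample = pd.DataFrame({"timeUnixNano": ["1717016223073301200", "1717016223073302800"]})

# Cast to int64 first so the nanosecond values are not rounded through floats.
nanos = sample["timeUnixNano"].astype("int64")
sample["event_time"] = pd.to_datetime(nanos, unit="ns", utc=True)

print(sample["event_time"].iloc[0])
```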


## Extract body and attributes from log records

In [12]:
def extract_body(record: dict[str, Any]) -> str | None:
    """Extract the body/content from a log record."""
    if 'body' in record:
        body = record['body']
        # Handle different body formats
        if isinstance(body, dict):
            # OpenTelemetry format: body can be {'stringValue': '...'} or {'bytesValue': '...'}
            if 'stringValue' in body:
                return body['stringValue']
            elif 'bytesValue' in body:
                return body['bytesValue']
            else:
                return str(body)
        else:
            return str(body)
    return None

def extract_attributes(record: dict[str, Any]) -> dict[str, Any]:  # noqa: C901
    """Extract attributes from a log record and convert to flat dictionary."""
    attrs = {}

    if 'attributes' in record:
        attributes = record['attributes']

        # Handle array format: [{'key': '...', 'value': {...}}]
        if isinstance(attributes, list):
            for attr in attributes:
                if isinstance(attr, dict) and 'key' in attr:
                    key = attr['key']
                    value = attr.get('value', {})

                    # Extract value from OpenTelemetry format
                    if isinstance(value, dict):
                        # Check for common value types
                        if 'stringValue' in value:
                            attrs[key] = value['stringValue']
                        elif 'intValue' in value:
                            attrs[key] = value['intValue']
                        elif 'doubleValue' in value:
                            attrs[key] = value['doubleValue']
                        elif 'boolValue' in value:
                            attrs[key] = value['boolValue']
                        elif 'bytesValue' in value:
                            attrs[key] = value['bytesValue']
                        else:
                            attrs[key] = str(value)
                    else:
                        attrs[key] = value
        # Handle dictionary format
        elif isinstance(attributes, dict):
            attrs = attributes.copy()

    return attrs

# Extract body and attributes for each log record
print("Extracting body and attributes from log records...")

df['log_body'] = df.apply(lambda row: extract_body(row.to_dict()), axis=1)

# Extract attributes and create separate columns
attributes_list = df.apply(lambda row: extract_attributes(row.to_dict()), axis=1)

# Convert attributes dictionaries to DataFrame and merge
if len(attributes_list) > 0:
    attrs_df = pd.DataFrame(list(attributes_list))
    # Prefix attribute columns to avoid conflicts
    attrs_df.columns = [f'attr_{col}' for col in attrs_df.columns]

    # Merge attributes back to main dataframe
    df = pd.concat([df, attrs_df], axis=1)

print(f"DataFrame shape after extraction: {df.shape}")
new_cols = [col for col in df.columns if col.startswith('attr_') or col == 'log_body']
print(f"\nNew columns added: {new_cols}")
print("\nSample log bodies:")
df[['log_body'] + [col for col in df.columns if col.startswith('attr_')]][:5]

Extracting body and attributes from log records...
DataFrame shape after extraction: (47, 29)

New columns added: ['log_body', 'attr_mcp.tool.name', 'attr_timestamp', 'attr_query.param.limit', 'attr_query.param.query_type', 'attr_response.success', 'attr_response.size_bytes', 'attr_response.result_count', 'attr_response.top_score', 'attr_response.query_time_ms', 'attr_response.total_results', 'attr_query.full_text', 'attr_response.results_json', 'attr_response.chunk_retrieved', 'attr_response.content_length', 'attr_response.chunk_json']

Sample log bodies:


| | log_body | attr_mcp.tool.name | attr_timestamp | attr_query.param.limit | attr_query.param.query_type | attr_response.success | attr_response.size_bytes | attr_response.result_count | attr_response.top_score | attr_response.query_time_ms | attr_response.total_results | attr_query.full_text | attr_response.results_json | attr_response.chunk_retrieved | attr_response.content_length | attr_response.chunk_json |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | [NashonServer nodeId=1] Nashon Server started | | | | | | | | | | | | | | | |
| 1 | [NashonServer nodeId=1] Nashon Server started | | | | | | | | | | | | | | | |
| 2 | [NashonServer nodeId=1] Nashon Server started | | | | | | | | | | | | | | | |
| 3 | [NashonServer nodeId=1] Nashon Server started | | | | | | | | | | | | | | | |
| 4 | [NashonServer nodeId=1] Nashon Server started | | | | | | | | | | | | | | | |
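
`extract_attributes` copies `intValue` through unchanged. Because OTLP/JSON typically serializes 64-bit integers as strings, count-like attributes can end up as text. Values wrapped in `arrayValue` or `kvlistValue` fall through to `str(value)`. The sketch below shows one way to unwrap all of these recursively. `unwrap_any_value` is a hypothetical helper and is not used elsewhere in this notebook.

```python
from __future__ import annotations

from typing import Any


def unwrap_any_value(value: Any) -> Any:
    """Convert an OTLP/JSON AnyValue wrapper into a plain Python value."""
    if not isinstance(value, dict):
        return value
    if "intValue" in value:
        # int() accepts both the string form ("42") and a plain JSON number (42)
        return int(value["intValue"])
    for scalar_key in ("stringValue", "boolValue", "doubleValue", "bytesValue"):
        if scalar_key in value:
            return value[scalar_key]
    if "arrayValue" in value:
        return [unwrap_any_value(v) for v in value["arrayValue"].get("values", [])]
    if "kvlistValue" in value:
        return {
            kv["key"]: unwrap_any_value(kv.get("value"))
            for kv in value["kvlistValue"].get("values", [])
        }
    # Unrecognized wrapper: return it unchanged rather than guessing
    return value


print(unwrap_any_value({"intValue": "42"}))
print(unwrap_any_value({"arrayValue": {"values": [{"stringValue": "a"}, {"intValue": "2"}]}}))
```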


## Explore the data

In [13]:
# Basic info about the dataframe
print("DataFrame info:")
df.info()

print("\n\nDataFrame description:")
df.describe()

DataFrame info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 47 entries, 0 to 46
Data columns (total 29 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   timeUnixNano                   47 non-null     object 
 1   observedTimeUnixNano           47 non-null     object 
 2   severityNumber                 47 non-null     int64  
 3   severityText                   8 non-null      object 
 4   body                           47 non-null     object 
 5   _s3_key                        47 non-null     object 
 6   _s3_bucket                     47 non-null     object 
 7   _partition_year                47 non-null     object 
 8   _partition_month               47 non-null     object 
 9   _partition_day                 47 non-null     object 
 10  _partition_hour                47 non-null     object 
 11  _partition_minute              47 non-null     object 
 12  attributes                     39 no

| | severityNumber | attr_response.top_score | attr_response.query_time_ms |
| --- | --- | --- | --- |
| count | 47.0 | 32.0 | 32.0 |
| mean | 9.0 | 0.636363 | 234.098785 |
| std | 0.0 | 0.115774 | 367.593745 |
| min | 9.0 | 0.032266 | 6.13451 |
| 25% | 9.0 | 0.62668 | 8.340955 |
| 50% | 9.0 | 0.657086 | 96.402764 |
| 75% | 9.0 | 0.685447 | 258.672476 |
| max | 9.0 | 0.711202 | 1732.42569 |
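
`describe()` summarizes each numeric column across all rows. A common next step is to split one statistic by a categorical attribute. The sketch below does this with two of the column names from the extraction output above. The values and tool names are made up for illustration.

```python
import pandas as pd

# Made-up rows; only the column names come from the notebook output.
sample = pd.DataFrame(
    {
        "attr_mcp.tool.name": ["search", "search", "get_chunk", None],
        "attr_response.query_time_ms": [100.0, 300.0, 8.0, None],
    }
)

summary = (
    sample.dropna(subset=["attr_mcp.tool.name"])  # rows without a tool name are excluded
    .groupby("attr_mcp.tool.name")["attr_response.query_time_ms"]
    .agg(["count", "median", "max"])
)
print(summary)
```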


## Optional: Save to local file

Uncomment a line in the cell below to save the DataFrame to a Parquet or CSV file. Writing Parquet requires `pyarrow` or `fastparquet` to be installed.

In [14]:
# Uncomment to save the dataframe
# df.to_parquet('s3_logs.parquet', index=False)
# df.to_csv('s3_logs.csv', index=False)
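
The `body` and `attributes` columns still hold Python dicts and lists. Written to CSV, those cells become Python `repr` strings, which are awkward to parse back. One option is to serialize them as JSON before saving. The sketch below shows this on a stand-in frame. `stringify_nested` is a hypothetical helper.

```python
import json

import pandas as pd


def stringify_nested(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a copy in which dict and list cells are replaced by JSON strings."""
    out = frame.copy()
    for col in out.columns:
        is_nested = out[col].map(lambda v: isinstance(v, (dict, list)))
        if is_nested.any():
            out[col] = out[col].map(
                lambda v: json.dumps(v) if isinstance(v, (dict, list)) else v
            )
    return out


# Stand-in frame with the same kind of nested cells as the notebook's df.
sample = pd.DataFrame(
    {
        "body": [{"stringValue": "hello"}],
        "attributes": [[{"key": "k", "value": {"intValue": "1"}}]],
        "severityNumber": [9],
    }
)

flat = stringify_nested(sample)
print(flat.loc[0, "body"])
```

After reading the CSV back, `json.loads` restores the original structure of each such cell.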