# Acoustic Shield - Training Data Pipeline

This notebook orchestrates the complete data pipeline:
1. Extract crash hotspots from GeoJSON
2. Enrich with weather data (Open-Meteo API)
3. Synthesize risk events
4. Build audio generation recipes
5. Save risk events and recipes to S3
6. Run SageMaker Processing job to generate WAV files
7. Validate outputs

## Setup and Configuration

In [1]:
import sys
import json
import logging
from pathlib import Path

# Add parent directory to path
sys.path.insert(0, str(Path.cwd().parent))

import boto3
from sagemaker import Session
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

from data_pipeline import (
    S3Client,
    HotspotExtractor,
    WeatherEnricher,
    RiskEventSynthesizer,
    RecipeBuilder
)

# Logging setup: INFO and above, each record stamped with time, logger name
# and level. The notebook's own messages go through `logger`.
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

print("‚úì Imports complete")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
‚úì Imports complete


In [2]:
# Configuration. The AWS region is not hard-coded in this cell: it is looked
# up from the raw-data bucket below and reused for every AWS client.
RAW_BUCKET = 'acousticshield-raw'
ML_BUCKET = 'acousticshield-ml'
CRASH_FILE_KEY = 'crash_hotspots/sanjose_crashes.geojson'
SAGEMAKER_ROLE = 'role-sagemaker-processing'

# Processing parameters
TOP_N_HOTSPOTS = 100000   # passed as top_n to extract_top_hotspots
EVENTS_PER_HOTSPOT = 4    # passed as events_per_hotspot to synthesize_events

# Get region from bucket
s3_client = S3Client()
REGION = s3_client.get_bucket_region(RAW_BUCKET)

# Initialize SageMaker session and get role ARN
sagemaker_session = Session(boto_session=boto3.Session(region_name=REGION))
try:
    # Try to get role ARN from SageMaker
    role = sagemaker_session.get_execution_role()
except Exception as exc:
    # Catch Exception (not a bare `except:`) so KeyboardInterrupt/SystemExit
    # still propagate, and report why the lookup failed instead of hiding it.
    # NOTE(review): the recorded run of this cell printed the constructed ARN
    # even on a SageMaker host, which suggests the call above always raises.
    # The SageMaker SDK documents `get_execution_role` as a module-level
    # function - confirm which role is actually intended here.
    logger.warning(
        "Could not resolve execution role from the SageMaker session (%s: %s); "
        "falling back to role name %r",
        type(exc).__name__, exc, SAGEMAKER_ROLE,
    )
    # Construct the role ARN from the caller's account id and the role name.
    account_id = boto3.client('sts', region_name=REGION).get_caller_identity()['Account']
    role = f'arn:aws:iam::{account_id}:role/{SAGEMAKER_ROLE}'

print("Configuration:")
print(f"  Raw bucket: {RAW_BUCKET}")
print(f"  ML bucket: {ML_BUCKET}")
print(f"  Region: {REGION}")
print(f"  SageMaker role: {role}")
print(f"  Crash file: s3://{RAW_BUCKET}/{CRASH_FILE_KEY}")
print(f"  Top hotspots: {TOP_N_HOTSPOTS}")
print(f"  Events per hotspot: {EVENTS_PER_HOTSPOT}")

Configuration:
  Raw bucket: acousticshield-raw
  ML bucket: acousticshield-ml
  Region: us-east-1
  SageMaker role: arn:aws:iam::764040442724:role/role-sagemaker-processing
  Crash file: s3://acousticshield-raw/crash_hotspots/sanjose_crashes.geojson
  Top hotspots: 100000
  Events per hotspot: 4


## Step 1: Extract Crash Hotspots

In [3]:
# Read the crash GeoJSON object from the raw bucket via the pipeline's S3 helper.
logger.info("Loading crash data from s3://%s/%s", RAW_BUCKET, CRASH_FILE_KEY)
crash_data = s3_client.read_json(RAW_BUCKET, CRASH_FILE_KEY)

2025-10-26 02:27:14,357 - __main__ - INFO - Loading crash data from s3://acousticshield-raw/crash_hotspots/sanjose_crashes.geojson


In [4]:
# Build the extractor over the loaded crash data and request up to
# TOP_N_HOTSPOTS hotspots.
extractor = HotspotExtractor(crash_data)
hotspots = extractor.extract_top_hotspots(top_n=TOP_N_HOTSPOTS)

# Summary statistics reported by the extractor, printed as one block.
stats = extractor.get_summary_stats()
summary_lines = [
    f"\nüìä Crash Data Summary:",
    f"  Total crashes: {stats['total_crashes']}",
    f"  Total injuries: {stats['total_injuries']}",
    f"  Total fatalities: {stats['total_fatalities']}",
    f"  Speeding involved: {stats['speeding_involved_pct']:.1f}%",
]
print("\n".join(summary_lines))

# Preview the first five entries of the returned hotspot list.
print(f"\nüéØ Top 5 Hotspots:")
for entry in hotspots[:5]:
    print(f"  {entry['rank']}. {entry['location_name']}: {entry['crash_count']} crashes")

# Number of hotspots actually returned.
print(len(hotspots))


2025-10-26 02:27:48,841 - data_pipeline.hotspot_extractor - INFO - Loaded 601960 crash records
2025-10-26 02:27:50,756 - data_pipeline.hotspot_extractor - INFO - Extracted 11855 hotspots with enhanced metadata



üìä Crash Data Summary:
  Total crashes: 601960
  Total injuries: 243924
  Total fatalities: 2342
  Speeding involved: 3.9%

üéØ Top 5 Hotspots:
  1. KING RD & STORY RD: 2861 crashes
  2. CAPITOL EX & SENTER RD: 2807 crashes
  3. CAPITOL EX & STORY RD: 2781 crashes
  4. CAPITOL AV & MCKEE RD: 2493 crashes
  5. BLOSSOM HILL RD & SNELL AV: 2410 crashes
11855


## Step 2: Enrich with Weather Data

In [None]:
# Attach weather data (Open-Meteo API) to every hotspot.
logger.info("Fetching weather data for hotspots...")
enricher = WeatherEnricher()
enriched_hotspots = enricher.enrich_hotspots(
    hotspots,
    rate_limit_delay=0.5,
)

# Preview the first enriched hotspot and its weather fields.
first_hotspot = enriched_hotspots[0]
print(f"\nüå§Ô∏è  Sample Enriched Hotspot:")
print(f"  Location: {first_hotspot['location_name']}")
print(f"  Crashes: {first_hotspot['crash_count']}")
print(f"  Weather:")
conditions = first_hotspot['weather']
print(f"    Temperature: {conditions['temperature_c']:.1f}¬∞C")
print(f"    Rain: {conditions['rain_mm']:.1f}mm")
print(f"    Wind: {conditions['wind_speed_kmh']:.1f} km/h")
print(f"    Risk: {enricher.categorize_weather_risk(conditions)}")

2025-10-26 02:27:51,175 - __main__ - INFO - Fetching weather data for hotspots...
2025-10-26 02:27:51,190 - data_pipeline.weather_enricher - INFO - Fetching weather for 10162 unique coordinates (from 11855 hotspots) with 16 workers...


## Step 3: Synthesize Risk Events

In [None]:
# Create risk events from the enriched hotspots; the synthesizer is seeded (42).
logger.info("Synthesizing risk events...")
synthesizer = RiskEventSynthesizer(seed=42)
risk_events = synthesizer.synthesize_events(
    enriched_hotspots,
    events_per_hotspot=EVENTS_PER_HOTSPOT,
)

# Report how the events break down by risk type and by weather risk.
distribution = synthesizer.get_event_distribution(risk_events)
print(f"\n‚ö†Ô∏è  Risk Event Distribution:")
print(f"  Total events: {distribution['total_events']}")
print(f"  By risk type:")
for kind, n_events in distribution['risk_type_distribution'].items():
    print(f"    {kind}: {n_events}")
print(f"  By weather risk:")
for level, n_events in distribution['weather_risk_distribution'].items():
    print(f"    {level}: {n_events}")

# Dump the first event in full as an example of the record layout.
print(f"\nüìã Sample Risk Event:")
sample_event = risk_events[0]
print(json.dumps(sample_event, indent=2))

## Step 4: Build Audio Recipes

In [None]:
# Turn each risk event into an audio generation recipe.
logger.info("Building audio recipes...")
builder = RecipeBuilder()
recipes = builder.build_recipes(risk_events)

# Summarize recipe totals and the per-risk-type breakdown.
summary = builder.get_recipe_summary(recipes)
total_minutes = summary['total_audio_duration_minutes']
print(f"\nüéµ Audio Recipe Summary:")
print(f"  Total recipes: {summary['total_recipes']}")
print(f"  Total audio duration: {total_minutes:.2f} minutes")
print(f"  By risk type:")
for kind, n_recipes in summary['risk_type_distribution'].items():
    print(f"    {kind}: {n_recipes} recipes")

# Dump the first recipe in full as an example.
print(f"\nüéº Sample Recipe:")
sample_recipe = recipes[0]
print(json.dumps(sample_recipe, indent=2))

## Step 5: Save Intermediate Data to S3

In [None]:
# Write all risk events to the raw bucket under a single key.
logger.info("Saving risk events to S3...")
risk_events_key = 'risk_events/risk_events.json'
s3_path = s3_client.write_json(risk_events, RAW_BUCKET, risk_events_key)
print(f"‚úì Risk events saved to: {s3_path}")

# Upload one JSON object per recipe under recipes/train/ in the ML bucket
# (the processing job takes that prefix as its input).
logger.info("Saving individual recipe files to S3...")
s3 = boto3.client('s3', region_name=REGION)
recipe_count = 0
for position, recipe in enumerate(recipes):
    # A recipe without an id falls back to a zero-padded positional name.
    recipe_id = recipe.get('recipe_id', f'recipe_{position:05d}')
    s3.put_object(
        Bucket=ML_BUCKET,
        Key=f'recipes/train/{recipe_id}.json',
        Body=json.dumps(recipe, indent=2),
        ContentType='application/json',
    )
    # Running count of recipes uploaded so far.
    recipe_count = position + 1

print(f"‚úì {recipe_count} recipe files saved to: s3://{ML_BUCKET}/recipes/train/")

## Step 6: Generate AI-Enhanced Audio WAV Files

In [None]:
# Generate AI-Enhanced Audio WAV Files
#
# Launches a SageMaker Processing job that reads the recipe JSON files from
# s3://{ML_BUCKET}/recipes/train/ and uploads whatever the script writes to
# /opt/ml/processing/output to s3://{ML_BUCKET}/train/.
print("\n" + "="*70)
print("üéµ STEP 6: GENERATE AI-ENHANCED AUDIO WAV FILES")
print("="*70)
print("Using Bedrock AI to optimize audio parameters for realistic sound")

# NOTE(review): the image URI pins an account id and the us-east-1 registry,
# while REGION is discovered from the bucket at runtime - confirm the image is
# available wherever the job runs.
PROCESSING_IMAGE_URI = (
    '764040442724.dkr.ecr.us-east-1.amazonaws.com/'
    'cpu-pytorch-boto-update:custom-reqs-20251026-0044'
)

try:
    # Create processor (ScriptProcessor / ProcessingInput / ProcessingOutput
    # are imported in the notebook's first cell).
    processor = ScriptProcessor(
        role=role,
        image_uri=PROCESSING_IMAGE_URI,
        command=['python3'],
        instance_count=25,
        instance_type="ml.c5.2xlarge",
        volume_size_in_gb=200,
        # The comma after this argument was missing, which made the whole
        # cell a SyntaxError.
        max_runtime_in_seconds=36000,  # 10 hours
        base_job_name='acousticshield-audio-gen',
        sagemaker_session=sagemaker_session,
    )

    # Run processing job
    # NOTE(review): no s3_data_distribution_type is set on the input. Check the
    # SDK default - if every one of the 25 instances receives the full recipe
    # set, the script must shard the work itself or the WAVs are generated
    # 25 times. Verify against bedrock_audio_generator.py.
    processor.run(
        inputs=[
            ProcessingInput(
                source=f's3://{ML_BUCKET}/recipes/train/',
                destination='/opt/ml/processing/input',
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name='audio_wav',
                source='/opt/ml/processing/output',
                destination=f's3://{ML_BUCKET}/train/',
            )
        ],
        code='../processing/bedrock_audio_generator.py',
        arguments=['--region', REGION],  # AI enabled by default
        wait=True,   # block until the job finishes
        logs=True,   # stream job logs into the cell output
    )

    print("‚úÖ AI-enhanced WAV file generation complete!")
    print("üí° Audio parameters optimized by Bedrock Claude AI")

except Exception as e:
    # Best-effort: report the failure with its traceback and let the notebook
    # continue. The validation cells that follow will then list whatever is
    # already under train/, which may come from an earlier run.
    print(f"‚ùå Error: {e}")
    import traceback
    traceback.print_exc()

## Step 7: Validate Outputs

In [None]:
# List generated WAV files
s3 = boto3.client('s3', region_name=REGION)

# A single list_objects_v2 call returns at most 1,000 keys, so walk every
# page of the train/ prefix; otherwise the count silently stops at the first
# page once more than 1,000 objects exist.
paginator = s3.get_paginator('list_objects_v2')
wav_files = [
    obj['Key']
    for page in paginator.paginate(Bucket=ML_BUCKET, Prefix='train/')
    # 'Contents' is absent from a page when nothing matches the prefix.
    for obj in page.get('Contents', [])
    if obj['Key'].endswith('.wav')
]

print(f"\nüéµ Generated WAV Files: {len(wav_files)}")
if wav_files:
    print(f"First 10 files:")
    for f in wav_files[:10]:
        print(f"  {f}")

if len(wav_files) > 10:
    print(f"    ... and {len(wav_files) - 10} more")

In [None]:
# Tally WAV keys per risk type. A key is attributed to the first risk-type
# name (in the order listed here) found in its lower-cased form; keys that
# match none of the names are not counted.
risk_type_counts = dict.fromkeys(
    ('normal', 'tireskid', 'emergencybraking', 'collisionimminent'), 0
)

for wav_key in wav_files:
    lowered = wav_key.lower()
    matched = next((name for name in risk_type_counts if name in lowered), None)
    if matched is not None:
        risk_type_counts[matched] += 1

print(f"\nüìä WAV Files by Risk Type:")
for name, total in risk_type_counts.items():
    print(f"  {name.title()}: {total} files")

# Show the object size of the first listed WAV. The duration and format lines
# below are fixed text, not values read from the file.
if wav_files:
    print(f"\nüìã Sample WAV File:")
    sample_key = wav_files[0]
    head = s3.head_object(Bucket=ML_BUCKET, Key=sample_key)
    print(f"  File: {sample_key}")
    print(f"  Size: {head['ContentLength']:,} bytes")
    print(f"  Duration: ~5 seconds @ 22.05kHz")
    print(f"  Format: 16-bit PCM WAV")
    print(f"\n‚úÖ Files are ready for ML training!")

## Summary

In [None]:
# Final recap: counts of the artifacts held in this kernel session and the S3
# locations they were written to.
print("\n" + "="*70)
print("üéâ ACOUSTIC SHIELD DATA PIPELINE COMPLETE")
print("="*70)
print(f"\nüìç Crash Hotspots Analyzed: {len(hotspots)}")
print(f"‚ö†Ô∏è  Risk Events Generated: {len(risk_events)}")
print(f"üéµ Audio Recipes Created: {len(recipes)}")
print(f"ü§ñ AI-Enhanced WAV Files: {len(wav_files)}")
print(f"\nüíæ Data Locations:")
print(f"  Risk Events: s3://{RAW_BUCKET}/{risk_events_key}")
# Recipes are uploaded one file per recipe under this prefix in the ML bucket.
# This replaces a commented-out line that referenced an undefined `recipes_key`.
print(f"  Recipes: s3://{ML_BUCKET}/recipes/train/")
print(f"  Audio Files: s3://{ML_BUCKET}/train/")
print(f"\n‚úÖ Ready for ML model training!")
print(f"üß† All audio parameters optimized by Bedrock AI")
print(f"üéØ Each risk type has distinct audio characteristics")
print("="*70)