# Disaster AWS COG Conversion Template

This template provides a comprehensive workflow for converting satellite imagery to Cloud Optimized GeoTIFFs (COGs) with:
- **Modular architecture** with single-responsibility functions
- **Automatic error handling** and recovery
- **Memory-efficient processing** for large files
- **S3 streaming and caching** capabilities

## Key Features
- ✅ Handles files from <1GB to >10GB
- ✅ Prevents striping issues with fixed chunk processing
- ✅ Automatic S3 existence checking
- ✅ ZSTD compression with optimal predictors
- ✅ Comprehensive error tracking

---

## 📋 CONFIGURATION CELL - MODIFY PARAMETERS HERE

**This is the only cell you need to modify for different events**

In [None]:
# ========================================
# MAIN CONFIGURATION - MODIFY THESE VALUES
# ========================================

# --- Event identity ---------------------------------------------------------
EVENT_NAME = '202504_SevereWx_US'   # Event identifier (part of the source path and output dirs)
PRODUCT_NAME = 'sentinel2'          # Product folder under the event (sentinel1, sentinel2, landsat, ...)

# --- S3 locations -----------------------------------------------------------
BUCKET = 'nasa-disasters'              # Bucket read from AND written to
DIR_OLD_BASE = 'drcs_activations'      # Prefix holding the raw input GeoTIFFs
DIR_NEW_BASE = 'drcs_activations_new'  # Prefix that receives the converted COGs
# Prefix scanned for .tif inputs: <old base>/<event>/<product>
PATH_OLD = '/'.join((DIR_OLD_BASE, EVENT_NAME, PRODUCT_NAME))

# --- Product families to convert (False => that family's file list is empty) -
PROCESS_NDVI = True    # keys containing "NDVI"
PROCESS_MNDWI = True   # keys containing "MNDWI"
PROCESS_RGB = True     # keys containing "trueColor" or "RGB"

# --- Size classes (GB) ------------------------------------------------------
# In this notebook the thresholds only choose the per-file log message; the
# chunk settings themselves come from get_chunk_config(file_size_gb, memory_limit_mb).
LARGE_FILE_THRESHOLD = 3   # > 3 GB is logged as "large"
ULTRA_LARGE_THRESHOLD = 7  # > 7 GB is logged as "ultra-large"

# --- Memory -----------------------------------------------------------------
MEMORY_LIMIT_MB = 500      # Passed to get_chunk_config as memory_limit_mb
# NOTE(review): not referenced by any cell in this notebook — confirm it is consumed.
FORCE_FIXED_CHUNKS = True  # Intended to force fixed chunks for large files (anti-striping)

# --- Outputs ----------------------------------------------------------------
SAVE_LOCAL = True          # Also write COGs under output/<EVENT_NAME>
SAVE_METADATA = True       # Write per-batch metadata/CSV plus the combined report files
VERBOSE = True             # Chatty client/batch logging

# --- Advanced (rarely changed) ----------------------------------------------
USE_STREAMING = False      # Copied into chunk_config['use_streaming']; keep False if streaming fails
# NOTE(review): the two settings below are not referenced by any cell in this notebook.
CACHE_DOWNLOADS = True     # Intended to cache downloaded inputs
MAX_RETRIES = 3            # Intended maximum retry attempts

print("\n".join((
    "✅ Configuration loaded successfully!",
    f"Event: {EVENT_NAME}",
    f"Source: s3://{BUCKET}/{PATH_OLD}",
    f"Destination: s3://{BUCKET}/{DIR_NEW_BASE}/[product_type]",
)))

## 📦 Import Required Modules

In [None]:
# Standard library imports
import os
import sys
import re
import gc
import tempfile
from datetime import datetime
from pathlib import Path

# Data processing
import pandas as pd
import numpy as np

# Geospatial libraries
import rasterio
from rasterio.warp import calculate_default_transform, reproject, Resampling
from rasterio.windows import Window

# AWS libraries
import boto3
from botocore.exceptions import ClientError, NoCredentialsError

# Progress tracking
from tqdm import tqdm

print("✅ Standard libraries imported")

# Make the parent of the current working directory importable, so the
# project packages (core/, utils/, processors/, configs/, main_processor)
# can be imported by the next cell. Inserting at index 0 gives it precedence
# over other sys.path entries.
module_path = Path('..').resolve()
_module_path_entry = str(module_path)
if _module_path_entry not in sys.path:
    sys.path.insert(0, _module_path_entry)

print(f"Module path: {module_path}")

In [None]:
# Import disaster-aws-conversion modules
# These resolve via the directory inserted into sys.path by the previous cell.
# Later cells use these names directly. Some of them (e.g. validate_cog,
# select_profile_by_size, convert_date) appear unused in this notebook.
try:
    # Core modules
    # S3 helpers: client setup, existence check, key listing, object size (GB)
    from core.s3_operations import (
        initialize_s3_client,
        check_s3_file_exists,
        list_s3_files,
        get_file_size_from_s3
    )
    from core.validation import validate_cog, check_cog_with_warnings
    from core.compression import get_predictor_for_dtype, export_cog_profile
    
    # Utils
    from utils.memory_management import get_memory_usage, monitor_memory
    from utils.file_naming import create_cog_filename, convert_date
    from utils.error_handling import cleanup_temp_files
    from utils.logging import print_status, print_summary
    
    # Processors
    from processors.batch_processor import process_file_batch, monitor_batch_progress
    
    # Configs
    from configs.profiles import select_profile_by_size
    from configs.chunk_configs import get_chunk_config
    
    # Main processor
    from main_processor import convert_to_cog
    
    print("✅ All disaster-aws-conversion modules imported successfully!")
    
except ImportError as e:
    # NOTE(review): the failure is only printed, so the notebook keeps going and
    # later cells will fail with NameError on the missing names; consider re-raising.
    print(f"⚠️ Import error: {e}")
    print("Make sure you're running from the disaster-aws-conversion directory")

## 🔌 Initialize AWS S3 Connection

In [None]:
# Initialize S3 client
# initialize_s3_client returns a pair; a falsy client means setup failed and
# the following cells fall back to "no client" paths. fs_read (presumably a
# read-side filesystem handle) is not used by later cells in this notebook.
s3_client, fs_read = initialize_s3_client(bucket_name=BUCKET, verbose=VERBOSE)

if not s3_client:
    print("❌ Failed to initialize S3 client")
    print("Please check your AWS credentials")
else:
    print("✅ S3 client ready for operations")

## 🔍 Discover Files in S3

In [None]:
# List all TIF files in the source path
# `keys` is always defined after this cell (empty without a client) because the
# filtering cell below iterates over it.
if not s3_client:
    keys = []
    print("❌ No S3 client available")
else:
    keys = list_s3_files(s3_client, BUCKET, PATH_OLD, suffix='.tif')
    print(f"✅ Found {len(keys)} .tif files in s3://{BUCKET}/{PATH_OLD}")

    # Preview a handful of inputs with their sizes (one size lookup per preview)
    preview_limit = 5
    if keys:
        print("\nExample files:")
        for preview_key in keys[:preview_limit]:
            size_gb = get_file_size_from_s3(s3_client, BUCKET, preview_key)
            print(f"  - {os.path.basename(preview_key)} ({size_gb:.1f} GB)")
        remaining = len(keys) - preview_limit
        if remaining > 0:
            print(f"  ... and {remaining} more files")

In [None]:
# Filter files by type
def _select_keys(enabled, *markers):
    """Return the S3 keys containing any of ``markers`` (case-sensitive substring
    match on the full key), or an empty list when the family is disabled."""
    if not enabled:
        return []
    return [k for k in keys if any(marker in k for marker in markers)]


ndvi_files = _select_keys(PROCESS_NDVI, "NDVI")
mndwi_files = _select_keys(PROCESS_MNDWI, "MNDWI")
rgb_files = _select_keys(PROCESS_RGB, "trueColor", "RGB")

family_counts = {"NDVI": len(ndvi_files), "MNDWI": len(mndwi_files), "RGB": len(rgb_files)}
print("Files to process:")
for family_label, family_count in family_counts.items():
    print(f"  {family_label}: {family_count} files")
print(f"  Total: {sum(family_counts.values())} files")

## 🔧 Define Processing Functions

In [None]:
def process_files_by_type(file_list, product_type, event_name, s3_client):
    """
    Convert every S3 object in ``file_list`` to a COG for one product family.

    The batch itself is run by ``process_file_batch``. Each file goes through an
    inner wrapper that looks up the object's size, logs its size class, builds
    a chunk configuration with ``get_chunk_config`` (forcing the notebook's
    ``USE_STREAMING`` value) and calls ``convert_to_cog``.

    Args:
        file_list: S3 keys (under ``PATH_OLD``) to convert.
        product_type: Family label such as 'NDVI', 'MNDWI' or 'RGB'; it also
            selects the destination sub-directory ``Sentinel-2/<product_type>``.
        event_name: Event identifier forwarded to the batch processor and used
            for the local output directory.
        s3_client: S3 client from ``initialize_s3_client``.

    Returns:
        The value returned by ``process_file_batch`` (the report cells treat it
        as a DataFrame), or an empty DataFrame when ``file_list`` is empty.
    """
    if not file_list:
        return pd.DataFrame()

    banner = '=' * 60
    print(f"\n{banner}")
    print(f"🚀 Processing {product_type} Files")
    print(banner)

    # Every family (known or not) is written under Sentinel-2/<product_type>.
    target_dir = f'Sentinel-2/{product_type}'

    config = dict(
        raw_data_bucket=BUCKET,
        raw_data_prefix=PATH_OLD,
        cog_data_bucket=BUCKET,
        cog_data_prefix=f'{DIR_NEW_BASE}/{target_dir}',
        local_output_dir=(f'output/{event_name}' if SAVE_LOCAL else None),
    )

    print_status(f"{product_type} Processing Configuration", config)

    # Parameter names (including BUCKET) are kept as-is because
    # process_file_batch may pass them as keywords.
    def process_wrapper(name, BUCKET, cog_filename, cog_data_bucket,
                        cog_data_prefix, s3_client, local_output_dir=None):
        """Size-aware adapter between process_file_batch and convert_to_cog."""
        size_gb = get_file_size_from_s3(s3_client, BUCKET, name)

        # The thresholds only pick the log message; the chunk settings actually
        # used come from get_chunk_config below.
        if size_gb > ULTRA_LARGE_THRESHOLD:
            size_label = f"Ultra-large file ({size_gb:.1f} GB), using fixed 128x128 chunks"
        elif size_gb > LARGE_FILE_THRESHOLD:
            size_label = f"Large file ({size_gb:.1f} GB), using fixed 256x256 chunks"
        else:
            size_label = f"Standard file ({size_gb:.1f} GB), using adaptive chunks"
        print(f"   📦 {size_label}")

        chunk_config = get_chunk_config(file_size_gb=size_gb, memory_limit_mb=MEMORY_LIMIT_MB)
        chunk_config['use_streaming'] = USE_STREAMING  # notebook setting wins

        return convert_to_cog(
            name=name,
            bucket=BUCKET,
            cog_filename=cog_filename,
            cog_data_bucket=cog_data_bucket,
            cog_data_prefix=cog_data_prefix,
            s3_client=s3_client,
            local_output_dir=local_output_dir,
            chunk_config=chunk_config,
        )

    batch_results = process_file_batch(
        file_list=file_list,
        s3_client=s3_client,
        config=config,
        filename_creator_func=create_cog_filename,
        processing_func=process_wrapper,
        event_name=event_name,
        save_metadata=SAVE_METADATA,
        save_csv=SAVE_METADATA,
        verbose=VERBOSE,
        BUCKET=BUCKET,
    )
    monitor_batch_progress(batch_results)
    return batch_results


print("✅ Processing functions defined")

## 🚀 Execute Processing

In [None]:
# Initialize results storage
# Each product cell below appends a (product_type, results) pair; the report
# cell measures total wall-clock time from processing_start.
all_results = []
processing_start = datetime.now()

print(f"Starting processing at {processing_start:%Y-%m-%d %H:%M:%S}")
print(f"Memory usage at start: {get_memory_usage():.1f} MB")

In [None]:
# Process NDVI files
# (ndvi_files is already empty when PROCESS_NDVI is off; the flag is re-checked for clarity.)
if PROCESS_NDVI and ndvi_files:
    ndvi_results = process_files_by_type(ndvi_files, 'NDVI', EVENT_NAME, s3_client)
    all_results.append(('NDVI', ndvi_results))

    # Run a GC pass and check memory against a 1000 MB threshold before the next family
    gc.collect()
    monitor_memory(threshold_mb=1000)

In [None]:
# Process MNDWI files
# (mndwi_files is already empty when PROCESS_MNDWI is off; the flag is re-checked for clarity.)
if PROCESS_MNDWI and mndwi_files:
    mndwi_results = process_files_by_type(mndwi_files, 'MNDWI', EVENT_NAME, s3_client)
    all_results.append(('MNDWI', mndwi_results))

    # Run a GC pass and check memory against a 1000 MB threshold before the next family
    gc.collect()
    monitor_memory(threshold_mb=1000)

In [None]:
# Process RGB/True Color files
# (rgb_files is already empty when PROCESS_RGB is off; the flag is re-checked for clarity.)
if PROCESS_RGB and rgb_files:
    rgb_results = process_files_by_type(rgb_files, 'RGB', EVENT_NAME, s3_client)
    all_results.append(('RGB', rgb_results))

    # Run a GC pass and check memory against a 1000 MB threshold
    gc.collect()
    monitor_memory(threshold_mb=1000)

## 📊 Generate Final Report

In [None]:
# Combine all results
def _count_status(df, status):
    """Return how many rows of ``df`` have the given ``status`` (0 if the column is missing)."""
    if 'status' not in df.columns:
        return 0
    return int((df['status'] == status).sum())


if all_results:
    # Combine DataFrames
    combined_results = pd.concat([df for _, df in all_results], ignore_index=True)

    print("\n" + "="*60)
    print("📊 FINAL PROCESSING REPORT")
    print("="*60)

    # Overall statistics
    print(f"\nTotal files processed: {len(combined_results)}")

    # By product type
    print("\nFiles by Product Type:")
    for product, df in all_results:
        if df.empty:
            continue
        print(f"  {product}:")
        print(f"    - Total: {len(df)}")
        print(f"    - Success: {_count_status(df, 'success')}")
        print(f"    - Failed: {_count_status(df, 'failed')}")
        print(f"    - Skipped: {_count_status(df, 'skipped')}")

    # Time statistics (wall clock since processing_start was recorded)
    total_time = (datetime.now() - processing_start).total_seconds()
    print(f"\nTotal processing time: {total_time/60:.1f} minutes")

    if 'processing_time_s' in combined_results.columns:
        avg_time = combined_results['processing_time_s'].mean()
        max_time = combined_results['processing_time_s'].max()
        print(f"Average time per file: {avg_time:.1f} seconds")
        print(f"Maximum time for a file: {max_time:.1f} seconds")

    # Memory statistics
    final_memory = get_memory_usage()
    print(f"\nFinal memory usage: {final_memory:.1f} MB")

    # Save combined results
    if SAVE_METADATA:
        output_dir = f"output/{EVENT_NAME}"
        os.makedirs(output_dir, exist_ok=True)

        # One timestamp for both files so the CSV and summary of a run pair up
        # (two datetime.now() calls can straddle a second boundary).
        run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Save CSV
        csv_path = f"{output_dir}/combined_results_{run_stamp}.csv"
        combined_results.to_csv(csv_path, index=False)
        print(f"\n📁 Results saved to: {csv_path}")

        # Save summary. A success rate needs a 'status' column and at least one
        # row; otherwise report n/a rather than raising KeyError/ZeroDivisionError.
        total_files = len(combined_results)
        if total_files and 'status' in combined_results.columns:
            success_rate = _count_status(combined_results, 'success') / total_files * 100
            rate_text = f"{success_rate:.1f}%"
        else:
            rate_text = "n/a"

        summary_path = f"{output_dir}/processing_summary_{run_stamp}.txt"
        with open(summary_path, 'w', encoding='utf-8') as f:
            f.write(f"Processing Summary for {EVENT_NAME}\n")
            f.write("=" * 60 + "\n")
            f.write(f"Total files: {total_files}\n")
            f.write(f"Total time: {total_time/60:.1f} minutes\n")
            f.write(f"Success rate: {rate_text}\n")
        print(f"📁 Summary saved to: {summary_path}")

    print("\n" + "="*60)
    print("✅ PROCESSING COMPLETE!")
    print("="*60)

else:
    print("No files were processed")

In [None]:
# Display detailed results
# `dir()` with no argument lists only the current namespace, while IPython/Jupyter
# provide `display` as a builtin, so the old `'display' in dir()` check was false
# and always fell back to print(). Try display() and fall back outside a notebook.
if 'combined_results' in locals() and not combined_results.empty:
    print("\nDetailed Results DataFrame:")
    try:
        display(combined_results)
    except NameError:
        print(combined_results)

## 🔍 Troubleshooting & Validation

In [None]:
# Check for failed files and diagnose issues
# (substrings, hint) pairs, tried in order against the lower-cased error text.
# S3 permission failures usually read "AccessDenied"/"Access Denied"/"Forbidden",
# and network timeouts may read "timed out", so those spellings are matched too.
_ERROR_HINTS = (
    (('chunk and warp',),
     "  💡 Solution: This is a GDAL streaming issue. Set USE_STREAMING = False"),
    (('memory',),
     "  💡 Solution: Reduce MEMORY_LIMIT_MB or use smaller chunks"),
    (('permission', 'access denied', 'accessdenied', 'forbidden'),
     "  💡 Solution: Check AWS credentials and S3 permissions"),
    (('timeout', 'timed out'),
     "  💡 Solution: Network issue. Try again or download locally first"),
)

if 'combined_results' in locals() and not combined_results.empty:
    failed = combined_results[combined_results['status'] == 'failed'] if 'status' in combined_results.columns else pd.DataFrame()

    if not failed.empty:
        print("\n⚠️ Failed Files Analysis:")
        print("="*60)

        for _, row in failed.iterrows():
            # .get() so a results frame without these columns doesn't raise KeyError
            print(f"\nFile: {row.get('original_file', 'Unknown file')}")
            print(f"Error: {row.get('error', 'Unknown error')}")

            # Suggest a solution based on the first matching error pattern
            error_str = str(row.get('error', '')).lower()
            hint = next(
                (text for needles, text in _ERROR_HINTS
                 if any(needle in error_str for needle in needles)),
                None,
            )
            if hint is not None:
                print(hint)
    else:
        print("\n✅ No failed files!")

In [None]:
# Validate COGs in S3
def validate_uploaded_cogs(results_df, s3_client, sample_size=3,
                           bucket=None, key_prefixes=None):
    """
    Check that a random sample of successfully converted COGs exists in S3.

    Only existence is checked. COG structure and overviews are NOT inspected,
    and the output says so; the previous version printed unconditional ✅ marks
    without querying anything.

    Args:
        results_df: Combined results DataFrame. It needs 'status' and
            'output_file' columns; otherwise nothing is checked.
        s3_client: S3 client; only ``list_objects_v2`` is called.
        sample_size: Maximum number of successful files to check.
        bucket: Bucket to search; defaults to the notebook's ``BUCKET``.
        key_prefixes: Prefixes to try for each output file name; defaults to the
            destinations used by ``process_files_by_type``
            (``{DIR_NEW_BASE}/Sentinel-2/{NDVI,MNDWI,RGB}``).

    Returns:
        dict mapping each sampled output file name to the S3 key where it was
        found, or None if it was not found (or every lookup failed). Empty
        dict when there is nothing to check.
    """
    import random

    if results_df.empty or not {'status', 'output_file'}.issubset(results_df.columns):
        return {}

    success_files = results_df.loc[results_df['status'] == 'success', 'output_file'].tolist()
    if not success_files:
        return {}

    if bucket is None:
        bucket = BUCKET
    if key_prefixes is None:
        key_prefixes = [f'{DIR_NEW_BASE}/Sentinel-2/{product}'
                        for product in ('NDVI', 'MNDWI', 'RGB')]

    # Sample files to validate
    sample = random.sample(success_files, min(sample_size, len(success_files)))

    print(f"\n🔍 Validating {len(sample)} COG files in S3...")
    print("="*60)

    found = {}
    for filename in sample:
        print(f"\nValidating: {filename}")
        # output_file may be a bare name or a path; join only its base name
        # onto each candidate prefix.
        base_name = str(filename).rsplit('/', 1)[-1]
        found[filename] = None
        for prefix in key_prefixes:
            key = f"{prefix.rstrip('/')}/{base_name}" if prefix else base_name
            try:
                response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key)
            except Exception as exc:  # report and continue: one failed lookup shouldn't abort the sample
                print(f"  ⚠️ Could not query s3://{bucket}/{key}: {exc}")
                continue
            # Prefix listing can also return e.g. "<key>.aux.xml"; require an exact match.
            if any(obj.get('Key') == key for obj in response.get('Contents', [])):
                found[filename] = key
                break

        if found[filename]:
            print(f"  ✅ File exists in S3: s3://{bucket}/{found[filename]}")
        else:
            print(f"  ❌ Not found under any of: {', '.join(key_prefixes)}")
        print("  ⚠️ COG structure and overviews are not checked by this cell")

    return found

# Run validation
if 'combined_results' in locals() and s3_client:
    validate_uploaded_cogs(combined_results, s3_client)

## 🧹 Cleanup

In [None]:
# Optional: Clean up cache and temporary files
def cleanup_processing_artifacts():
    """
    Remove leftover working directories/files and report memory afterwards.

    The targets are the 'reproj' and 'temp_cog' directories plus the
    '/tmp/tmp*.tif' pattern, all handed to cleanup_temp_files. Its return value
    is reported as the number of items removed.
    NOTE(review): whether cleanup_temp_files expands the glob is not visible
    here. The pattern is also not specific to this notebook's files, so confirm
    it cannot remove other processes' temporaries.
    """
    removed_count = cleanup_temp_files('reproj', 'temp_cog', '/tmp/tmp*.tif')
    print(f"✅ Cleaned up {removed_count} temporary files/directories")

    # Collect garbage before measuring so the reported figure reflects the cleanup
    gc.collect()
    memory_after_mb = get_memory_usage()
    print(f"✅ Memory usage after cleanup: {memory_after_mb:.1f} MB")

# Uncomment to run cleanup
# cleanup_processing_artifacts()

## 📚 Reference & Help

### Common Issues and Solutions

1. **"Chunk and warp failed" error**
   - Make sure `USE_STREAMING = False` in the configuration cell (this is the default)
   - File will be downloaded locally before processing

2. **Memory errors**
   - Reduce `MEMORY_LIMIT_MB` (e.g., to 250)
   - Lower `ULTRA_LARGE_THRESHOLD` (and `LARGE_FILE_THRESHOLD`) so the smaller-chunk settings apply to smaller files

3. **Striping in output files**
   - Ensure `FORCE_FIXED_CHUNKS = True`
   - This maintains consistent chunk alignment

4. **S3 permission errors**
   - Check AWS credentials: `aws configure list`
   - Verify bucket access: `aws s3 ls s3://bucket-name/`

5. **Files being skipped**
   - A file is skipped when its output already exists in the destination
   - Delete the existing outputs if you want to reprocess them

### Module Structure

- **core/** - Core functionality (S3, validation, reprojection, compression)
- **utils/** - Utilities (memory, naming, error handling, logging)
- **processors/** - Processing logic (chunks, COG creation, batches)
- **configs/** - Configuration profiles
- **main_processor.py** - Main processing orchestrator

### Links

- [VEDA File Naming Conventions](https://docs.openveda.cloud/user-guide/content-curation/dataset-ingestion/file-preparation.html)
- [Cloud Optimized GeoTIFF Info](https://www.cogeo.org/)
- [NASA Disasters Portal](https://data.disasters.openveda.cloud/)