# 03 Global Processing with GeoAnomalyMapper (GAM)

This advanced tutorial covers large-scale analysis, such as global anomaly mapping. We'll set up global processing, optimize for parallelism, handle large datasets, and manage memory.

**Objectives**:
- Configure for global tiling.
- Use Dask for parallel execution.
- Optimize performance and memory.
- Process and merge global results.

**Prerequisites**: Complete tutorials 01 and 02. A high-RAM machine (32 GB or more) or a cloud instance is recommended. A global run may take hours to days.

In [None]:
# Imports
import yaml
import dask
from dask.distributed import Client
from gam.core.pipeline import run_pipeline
from gam.core.global_processor import GlobalProcessor  # For tiling
import gam
print(f"GAM version: {gam.__version__}")
print(f"Dask version: {dask.__version__}")

## Step 1: Global Configuration Setup

Define global bbox (entire Earth) and tiling parameters. Use coarse resolution for feasibility.

In [None]:
# Global bbox: full Earth
global_bbox = (-90, 90, -180, 180)
modalities = ["gravity"]  # Start with one for demo; add others for full

# Global config: Coarse grid, large tiles
config = {
    "data": {
        "bbox": global_bbox,
        "modalities": modalities,
        "cache_dir": "../data/global_cache"  # Large cache needed
    },
    "preprocessing": {
        "grid_res": 1.0  # Coarse (1° of latitude is about 111 km) for global
    },
    "modeling": {
        "threshold": 3.0,  # Strict for global noise
        "max_iterations": 30  # Faster convergence
    },
    "visualization": {
        "map_type": "2d",
        "export_formats": ["geotiff", "csv"]
    },
    "core": {
        "output_dir": "../results/global",
        "parallel_workers": -1,  # All cores
        "tile_size": 30,  # 30° tiles (6 lat x 12 lon = 72 total)
        "rate_limit_delay": 2.0  # Slower for global API calls
    }
}

with open("global_config.yaml", "w") as f:
    yaml.dump(config, f)
print("Global config saved. Expected tiles: ~72")
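
The tile count follows directly from the bounding box and the tile size. The sketch below is a standalone check, not part of GAM: `count_tiles` is a hypothetical helper, it assumes the bbox order is `(min_lat, max_lat, min_lon, max_lon)` as the global bbox above suggests, and it assumes non-overlapping tiles (whether `GlobalProcessor` adds overlap is not stated here).

```python
import math

def count_tiles(bbox, tile_size):
    """Count non-overlapping square tiles needed to cover a bbox.

    bbox is assumed to be (min_lat, max_lat, min_lon, max_lon) in degrees.
    """
    min_lat, max_lat, min_lon, max_lon = bbox
    n_lat = math.ceil((max_lat - min_lat) / tile_size)
    n_lon = math.ceil((max_lon - min_lon) / tile_size)
    return n_lat, n_lon, n_lat * n_lon

# Compare a few tile sizes for the full globe
for size in (10, 30, 45):
    n_lat, n_lon, total = count_tiles((-90, 90, -180, 180), size)
    print(f"{size}° tiles: {n_lat} lat x {n_lon} lon = {total}")
```

Smaller tiles mean more, lighter tasks, which usually balances better across workers at the cost of more scheduling and API calls.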

## Step 2: Set Up Parallel Processing with Dask

Launch Dask client for distributed computing. Monitor via dashboard.

In [None]:
# Start Dask client (local cluster)
client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')  # memory_limit is per worker (16GB total here); adjust to your machine
print("Dask dashboard:", client.dashboard_link)

# For cloud/HPC: Use dask-jobqueue or Kubernetes
# from dask_jobqueue import SLURMCluster
# cluster = SLURMCluster(...)
# client = Client(cluster)
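
Because `memory_limit` applies to each worker, the worker count and the per-worker limit should be chosen together. The following is a rough sizing sketch rather than a Dask feature: `plan_workers` and its `reserve_fraction` parameter are hypothetical, and the 25% reserve for the OS and the notebook kernel is only an assumed starting point.

```python
def plan_workers(total_ram_gb, cores, threads_per_worker=2, reserve_fraction=0.25):
    """Suggest (n_workers, memory_limit_gb_per_worker) for a local cluster.

    A fraction of RAM is held back for the OS and the notebook itself;
    the remainder is split evenly across workers.
    """
    n_workers = max(1, cores // threads_per_worker)
    usable_gb = total_ram_gb * (1 - reserve_fraction)
    return n_workers, round(usable_gb / n_workers, 1)

n_workers, per_worker_gb = plan_workers(total_ram_gb=32, cores=8)
print(f"n_workers={n_workers}, memory_limit='{per_worker_gb}GB'")
```

The two values can then be passed to `Client(n_workers=..., memory_limit=...)` in place of the hard-coded numbers.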

## Step 3: Global Processing Setup and Execution

Use GlobalProcessor to tile and parallelize. This decomposes Earth into tiles.

In [None]:
# Create global processor
processor = GlobalProcessor(config)

# Generate tiles (30°)
tiles = processor.generate_tiles(global_bbox, tile_size=config['core']['tile_size'])
print(f"Generated {len(tiles)} tiles")

# Parallel run on tiles (delayed for Dask)
from dask import delayed
delayed_runs = [delayed(run_pipeline)(
    bbox=tile,
    modalities=modalities,
    config=config,
    output_dir=f"{config['core']['output_dir']}/tile_{i}"
) for i, tile in enumerate(tiles)]

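# Compute (parallel execution)
print("Starting global computation... Monitor dashboard.")
futures = client.compute(delayed_runs)  # a list in gives a list of futures back
tile_results = client.gather(futures)  # block until every tile has finished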

print("Global processing complete!")
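
On a long global run, one failing tile (for example, a region with no data) should not discard the work done on the others. One possible approach is to wrap the pipeline call so each task returns a status record instead of raising. In this sketch `run_tile_safely` is a hypothetical wrapper and `fake_pipeline` is a stand-in for `run_pipeline`, whose real failure modes are not documented here.

```python
import traceback

def run_tile_safely(pipeline_fn, tile_id, **kwargs):
    """Call pipeline_fn(**kwargs) and capture any exception as data."""
    try:
        result = pipeline_fn(**kwargs)
        return {"tile_id": tile_id, "ok": True, "result": result, "error": None}
    except Exception:
        return {"tile_id": tile_id, "ok": False, "result": None,
                "error": traceback.format_exc()}

def fake_pipeline(bbox):
    """Stand-in for run_pipeline: fails for tiles starting at 60° or above."""
    if bbox[0] >= 60:
        raise RuntimeError("no data for this tile")
    return {"anomalies": [bbox]}

demo_tiles = [(0, 30, 0, 30), (60, 90, 0, 30)]
outcomes = [run_tile_safely(fake_pipeline, i, bbox=b) for i, b in enumerate(demo_tiles)]
failed = [o["tile_id"] for o in outcomes if not o["ok"]]
print("Failed tiles:", failed)
```

The same wrapper can be handed to `delayed` in place of the bare pipeline function, so the merge step only consumes records where `ok` is true and the failed tile IDs can be retried later.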

## Step 4: Merge Global Results

Combine tile anomalies, handle overlaps, generate global map.

In [None]:
# Merge anomalies from tiles
import pandas as pd

# Each tile result is expected to carry its anomalies as a DataFrame
all_anomalies = [res['anomalies'] for res in tile_results]
global_anomalies = pd.concat(all_anomalies, ignore_index=True)

# Handle overlaps (simple: average confidence)
if len(global_anomalies) > 0:
    global_anomalies = global_anomalies.groupby(['lat', 'lon', 'depth']).agg({
        'confidence': 'mean',
        'score': 'max',
        'anomaly_type': 'first'
    }).reset_index()

print(f"Global anomalies: {len(global_anomalies)}")
global_anomalies.to_csv(config['core']['output_dir'] + "/global_anomalies.csv", index=False)

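# Global visualization (coarse map)
# NOTE: generate_visualization must be imported from GAM first; the import is not shown in this tutorial.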
global_viz = generate_visualization(
    global_anomalies,
    type="2d",
    output_dir=config['core']['output_dir'],
    config=config['visualization']
)
print("Global map saved:", global_viz)

## Step 5: Performance Optimization and Memory Management

Tips for scaling: Chunking, spilling, monitoring.

In [None]:
# Memory monitoring example (using psutil)
import psutil
print(f"Current RAM usage: {psutil.virtual_memory().percent:.1f}%")

# Optimization tips in code:
# 1. Lazy loading: Use dask.array for large grids
# Example: processed = xr.open_dataset('large_grid.nc', chunks={'lat': 1000})

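# 2. Reduce resolution for scouting
import copy
scout_config = copy.deepcopy(config)  # deep copy: dict.copy() would share the nested dicts with config
scout_config['preprocessing']['grid_res'] = 5.0  # Very coarse

# 3. Spilling: Dask workers spill to disk as they approach memory_limit (4GB per worker above)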

# Re-run a single tile with optimization
sample_tile = tiles[0]
optimized_results = run_pipeline(
    bbox=sample_tile,
    modalities=modalities,
    config=scout_config
)
print("Optimized tile complete (coarse res).")
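
To judge how much a resolution change helps, it is useful to estimate the size of one dense global grid. This is back-of-the-envelope arithmetic only: `grid_memory_mb` is a hypothetical helper that assumes one float64 value per cell and ignores intermediate arrays, extra modalities, and whatever else GAM holds in memory.

```python
def grid_memory_mb(grid_res_deg, bytes_per_cell=8, layers=1):
    """Approximate size in MB of a dense global lat/lon grid."""
    n_lat = round(180 / grid_res_deg)
    n_lon = round(360 / grid_res_deg)
    return n_lat * n_lon * bytes_per_cell * layers / 1e6

for res in (5.0, 1.0, 0.1, 0.01):
    print(f"{res}° grid: {grid_memory_mb(res):,.2f} MB per layer")
```

Halving the grid spacing quadruples the cell count, so a 0.01° global grid is several gigabytes per layer, whereas the 1° grid used here is well under a megabyte.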

## Handling Large Datasets

- **Storage**: Use external SSD/cloud for cache (100GB+ for global).
- **Batching**: Process modalities separately if memory is tight.
- **Cloud Scaling**: Deploy Dask on AWS EMR or Google Dataproc.
- **Monitoring**: Watch dashboard for task failures; resume from checkpoints.
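
Resuming from checkpoints can be as simple as skipping tiles whose output is already complete. The sketch below assumes the `tile_{i}` output directory layout used earlier and a marker file written after each tile finishes. The marker name `DONE.json` and both helpers are hypothetical; GAM's own checkpointing, if any, may work differently.

```python
import json
import tempfile
from pathlib import Path

def mark_done(output_dir, tile_id, summary, marker="DONE.json"):
    """Write a marker file once a tile has finished successfully."""
    tile_dir = Path(output_dir) / f"tile_{tile_id}"
    tile_dir.mkdir(parents=True, exist_ok=True)
    (tile_dir / marker).write_text(json.dumps(summary))

def pending_tiles(output_dir, n_tiles, marker="DONE.json"):
    """Return the indices of tiles that have no marker file yet."""
    out = Path(output_dir)
    return [i for i in range(n_tiles) if not (out / f"tile_{i}" / marker).exists()]

# Simulate an interrupted run in which only tiles 0 and 2 finished
with tempfile.TemporaryDirectory() as tmp:
    mark_done(tmp, 0, {"n_anomalies": 3})
    mark_done(tmp, 2, {"n_anomalies": 0})
    remaining = pending_tiles(tmp, n_tiles=4)

print("Tiles still to run:", remaining)
```

On restart, only the indices returned by `pending_tiles` need to be submitted again.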

**Example Output**: Global CSV with worldwide anomalies; GeoTIFF for world map.

For production, integrate with HPC schedulers. See [Deployment Guide](../configuration/deployment.md).

## Conclusion

Global processing enables worldwide anomaly mapping. Optimize based on hardware; start with subsets.

Next: Explore use cases in [Examples](../examples/).