In [1]:
# Crop Flood NetCDF to Brazil Borders with 50km Buffer
# This notebook crops the ensemble_return_period.nc file to Brazil ADM0 boundaries + 50km buffer

import os
import geopandas as gpd
import xarray as xr
import rioxarray as rxr
import numpy as np
from shapely.geometry import box
import dask
from dask.distributed import Client


In [2]:
# Configuration
BRAZIL_SHP = "../workspace/Brazil Borders/geoBoundaries-BRA-ADM0-all/geoBoundaries-BRA-ADM0.shp"
INPUT_NC = "../workspace/hazards/Flood/depth(cm)/ensemble_return_period.nc"
OUTPUT_NC = "../workspace/hazards/Flood/depth(cm)/ensemble_return_period_brazil_cropped.nc"
BUFFER_KM = 50  # 50km buffer in kilometers

# Dask configuration for parallel processing
# Set to None to use all available cores, or specify number of workers
N_WORKERS = None  # None = use all cores, or set to specific number like 4
USE_DASK_FOR_CLIP = True  # Set to True to use dask client for clipping (faster for large files)
# Chunk sizes for reading/clipping (larger chunks = better for clipping operations)
CLIP_CHUNK_LAT = 1024
CLIP_CHUNK_LON = 1024
# Chunk sizes for writing (larger chunks = faster writes but more memory)
WRITE_CHUNK_LAT = 1024
WRITE_CHUNK_LON = 1024


In [3]:
# Load Brazil ADM0 boundaries
print(f"[INFO] Loading Brazil boundaries from: {BRAZIL_SHP}")
brazil_gdf = gpd.read_file(BRAZIL_SHP)

# Check CRS
print(f"[INFO] Original CRS: {brazil_gdf.crs}")

# Ensure we're in WGS84 (EPSG:4326) for initial work
if brazil_gdf.crs != "EPSG:4326":
    print(f"[INFO] Transforming to EPSG:4326...")
    brazil_gdf = brazil_gdf.to_crs("EPSG:4326")

# Union all polygons into a single geometry
print(f"[INFO] Unioning {len(brazil_gdf)} polygon(s)...")
brazil_union = brazil_gdf.geometry.unary_union
print(f"[INFO] Brazil union geometry ready")


[INFO] Loading Brazil boundaries from: ../workspace/Brazil Borders/geoBoundaries-BRA-ADM0-all/geoBoundaries-BRA-ADM0.shp
[INFO] Original CRS: EPSG:4326
[INFO] Unioning 1 polygon(s)...


  brazil_union = brazil_gdf.geometry.unary_union


[INFO] Brazil union geometry ready


In [4]:
# Create 50km buffer
# Use EPSG:3857 (Web Mercator) for buffering since it uses meters as units
print(f"[INFO] Creating {BUFFER_KM}km buffer...")

# Transform to metric CRS for buffering
brazil_metric = gpd.GeoSeries([brazil_union], crs="EPSG:4326").to_crs("EPSG:3857")

# Buffer in meters (50km = 50000m)
buffer_meters = BUFFER_KM * 1000
brazil_buffered = brazil_metric.buffer(buffer_meters)

# Transform back to WGS84
brazil_buffered_wgs84 = brazil_buffered.to_crs("EPSG:4326")

print(f"[INFO] Buffer created successfully")
print(f"[INFO] Buffered geometry bounds: {brazil_buffered_wgs84.iloc[0].bounds}")


[INFO] Creating 50km buffer...
[INFO] Buffer created successfully
[INFO] Buffered geometry bounds: (-74.4396058736872, -34.12331173480674, -28.400412708627645, 5.717703083187006)


In [5]:
# Open the NetCDF file with optimized chunking for clipping operations
print(f"[INFO] Opening NetCDF file: {INPUT_NC}")
# Use larger chunks for better performance during clipping
ds = xr.open_dataset(INPUT_NC, chunks={"lat": CLIP_CHUNK_LAT, "lon": CLIP_CHUNK_LON})

print(f"[INFO] NetCDF dimensions: {dict(ds.sizes)}")
print(f"[INFO] NetCDF variables: {list(ds.data_vars)}")
print(f"[INFO] NetCDF coordinates: {list(ds.coords)}")

# Check if the dataset has spatial reference information
# If it has 'lat' and 'lon' as coordinates, we need to set CRS for rioxarray
if 'lat' in ds.coords and 'lon' in ds.coords:
    # Set CRS if not already set
    if 'spatial_ref' not in ds.coords:
        ds = ds.rio.write_crs("EPSG:4326")
    
    # Set spatial dimensions
    ds = ds.rio.set_spatial_dims(x_dim="lon", y_dim="lat")
    
print(f"[INFO] NetCDF CRS: {ds.rio.crs}")
print(f"[INFO] NetCDF bounds: {ds.rio.bounds()}")


[INFO] Opening NetCDF file: ../workspace/hazards/Flood/depth(cm)/ensemble_return_period.nc
[INFO] NetCDF dimensions: {'ensemble': 1, 'GWL': 3, 'return_period': 7, 'lat': 54000, 'lon': 54000}
[INFO] NetCDF variables: ['flood_depth']
[INFO] NetCDF coordinates: ['GWL', 'return_period', 'ensemble', 'lon', 'lat', 'spatial_ref']
[INFO] NetCDF CRS: EPSG:4326
[INFO] NetCDF bounds: (-75.0, -35.000000000000014, -30.0, 10.0)


In [None]:
# Crop the dataset to Brazil boundaries with buffer (optimized with dask)
print(f"[INFO] Cropping NetCDF to Brazil boundaries + {BUFFER_KM}km buffer...")

# Optionally start Dask client early for clipping operations
if USE_DASK_FOR_CLIP:
    print(f"[INFO] Starting Dask client for parallel clipping operations...")
    from dask.distributed import Client
    clip_client = Client(n_workers=N_WORKERS, threads_per_worker=2, memory_limit='4GB')
    print(f"[INFO] Dask dashboard for clipping: {clip_client.dashboard_link}")
    try:
        from IPython.display import display, HTML
        display(HTML(f'<a href="{clip_client.dashboard_link}" target="_blank">Open Dask Dashboard (Clipping)</a>'))
    except:
        pass
else:
    clip_client = None

# Get the buffered geometry as a GeoDataFrame for clipping
clip_geometry = gpd.GeoDataFrame(geometry=brazil_buffered_wgs84, crs="EPSG:4326")

# OPTIMIZATION: First crop to bounding box (fast, lazy operation)
# This reduces data size before the expensive geometry clip
bbox = clip_geometry.total_bounds  # minx, miny, maxx, maxy
lon_min, lat_min, lon_max, lat_max = bbox
print(f"[INFO] Step 1/2: Fast bounding box crop (lazy operation)...")
print(f"[INFO] Bounding box: lon=[{lon_min:.2f}, {lon_max:.2f}], lat=[{lat_min:.2f}, {lat_max:.2f}]")

# Crop to bounding box - this is fast and lazy with xarray
# Note: lat is usually descending in NetCDF files, so we slice accordingly
ds_bbox = ds.sel(
    lon=slice(lon_min, lon_max),
    lat=slice(lat_max, lat_min)  # lat_max first because it's usually descending
)

print(f"[INFO] Bounding box crop complete (still lazy, no computation yet)")
print(f"[INFO] Reduced dimensions: {dict(ds_bbox.sizes)}")

# Re-set spatial dimensions for rioxarray after the sel() operation
# rioxarray needs to know which dimensions are spatial
ds_bbox = ds_bbox.rio.set_spatial_dims(x_dim="lon", y_dim="lat")

# OPTIMIZATION: Now do the precise geometry clip on the smaller dataset
print(f"[INFO] Step 2/2: Precise geometry clip (this may take time with dask)...")
print(f"[INFO] This operation is now much faster because we're clipping a smaller dataset")

# Use rioxarray clip - this will use dask for parallel processing
# The clip operation works with dask arrays, but may need to load some data
ds_cropped = ds_bbox.rio.clip(
    clip_geometry.geometry.values, 
    crs=clip_geometry.crs, 
    all_touched=True
)

print(f"[INFO] Cropping complete")
print(f"[INFO] Original dimensions: {dict(ds.sizes)}")
print(f"[INFO] Cropped dimensions: {dict(ds_cropped.sizes)}")
print(f"[INFO] Cropped bounds: {ds_cropped.rio.bounds()}")

# Close dask client if we started one for clipping
if clip_client is not None:
    print(f"[INFO] Closing Dask client used for clipping...")
    clip_client.close()


[INFO] Cropping NetCDF to Brazil boundaries + 50km buffer...
[INFO] Starting Dask client for parallel clipping operations...


[INFO] Dask dashboard for clipping: http://127.0.0.1:8787/status


[INFO] Using dask-friendly clip_box() method...
[INFO] Bounding box: lon=[-74.44, -28.40], lat=[-34.12, 5.72]
[INFO] Variable 'flood_depth' is dask array
[INFO]   Chunks: ((1,), (1, 1, 1), (1, 1, 1, 1, 1, 1, 1), (1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 752), (1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 1024, 752))
[INFO] Cropping to bounding box with rio.clip_box() (dask-friendly)...
[INFO] clip_box() completed in 0.2 seconds (should be fast/instant if lazy)
[IN

In [None]:
# Save the cropped NetCDF using Dask for parallel processing
print(f"[INFO] Saving cropped NetCDF to: {OUTPUT_NC}")

# Ensure output directory exists
os.makedirs(os.path.dirname(OUTPUT_NC), exist_ok=True)

# Start Dask client for parallel processing
print(f"[INFO] Starting Dask client with {N_WORKERS or 'all available'} workers...")
client = Client(n_workers=N_WORKERS, threads_per_worker=2, memory_limit='4GB')
dashboard_url = client.dashboard_link
print(f"[INFO] Dask dashboard available at: {dashboard_url}")
print(f"[INFO] Open this URL in your browser to monitor progress")
print(f"[INFO] Dashboard shows: task progress, memory usage, worker status, etc.")

# Display clickable link in Jupyter
try:
    from IPython.display import display, HTML
    display(HTML(f'<a href="{dashboard_url}" target="_blank">Open Dask Dashboard in new tab</a>'))
except:
    pass

# Optimize chunking for writing - larger chunks are faster for I/O
print(f"[INFO] Rechunking data for optimal writing (chunks: lat={WRITE_CHUNK_LAT}, lon={WRITE_CHUNK_LON})...")
ds_cropped_write = ds_cropped.chunk({
    'lat': WRITE_CHUNK_LAT,
    'lon': WRITE_CHUNK_LON,
    'ensemble': 1,
    'GWL': 1,
    'return_period': 1
})

# Prepare encoding for all data variables
# Use faster compression settings: lower complevel and use blosc if available
encoding = {}
for var in ds_cropped_write.data_vars:
    encoding[var] = {
        "zlib": True,
        "complevel": 1,  # Lower compression = faster (1 is fastest, 9 is slowest)
        "dtype": "float32",
        "_FillValue": np.float32(np.nan),
        "chunksizes": (1, 1, 1, WRITE_CHUNK_LAT, WRITE_CHUNK_LON),  # Match chunk sizes
    }

# Copy all attributes and add a note about cropping
ds_cropped_write.attrs.update({
    **ds.attrs,
    "note": f"Cropped to Brazil ADM0 boundaries with {BUFFER_KM}km buffer. Original file: {os.path.basename(INPUT_NC)}"
})

# Write to NetCDF - xarray will use dask for parallel writing
print(f"[INFO] Preparing NetCDF write operation...")
print(f"[INFO] Dataset size: {dict(ds_cropped_write.sizes)}")
print(f"[INFO] Total data points: {ds_cropped_write.sizes['lat'] * ds_cropped_write.sizes['lon'] * ds_cropped_write.sizes['GWL'] * ds_cropped_write.sizes['return_period']:,}")

# Check chunk info
for var in ds_cropped_write.data_vars:
    print(f"[INFO] Variable '{var}' has {len(ds_cropped_write[var].chunks[3])} lat chunks, {len(ds_cropped_write[var].chunks[4])} lon chunks")
    total_chunks = 1
    for dim_chunks in ds_cropped_write[var].chunks:
        total_chunks *= len(dim_chunks)
    print(f"[INFO]   Total chunks to write: {total_chunks}")

print(f"[INFO] Starting parallel write operation...")
print(f"[INFO] Monitor progress at: {client.dashboard_link}")
print(f"[INFO] This will take several minutes - watch the dashboard for task progress...")

import time
from dask.diagnostics import ProgressBar

write_start = time.time()

# Direct write - xarray handles dask arrays automatically
# The distributed client will parallelize chunk writes
print(f"[INFO] Writing NetCDF file with parallel dask computation...")
print(f"[INFO] You should see tasks appearing in the dashboard now: {client.dashboard_link}")

# Use ProgressBar for terminal progress (may not work with distributed)
try:
    with ProgressBar():
        ds_cropped_write.to_netcdf(
            OUTPUT_NC,
            encoding=encoding,
            engine='netcdf4',
            compute=True  # Compute immediately with dask client active
        )
except:
    # Fallback without progress bar
    ds_cropped_write.to_netcdf(
        OUTPUT_NC,
        encoding=encoding,
        engine='netcdf4',
        compute=True
    )

write_elapsed = time.time() - write_start
print(f"[INFO] Write completed in {write_elapsed/60:.1f} minutes ({write_elapsed:.1f} seconds)")

print(f"[OK] Cropped NetCDF saved successfully!")

# Close dask client
client.close()

# Calculate file sizes for comparison
original_size = os.path.getsize(INPUT_NC) / (1024**3)  # GB
cropped_size = os.path.getsize(OUTPUT_NC) / (1024**3)  # GB
print(f"[INFO] Original file size: {original_size:.2f} GB")
print(f"[INFO] Cropped file size: {cropped_size:.2f} GB")
print(f"[INFO] Size reduction: {(1 - cropped_size/original_size)*100:.1f}%")


[INFO] Saving cropped NetCDF to: ../workspace/hazards/Flood/depth(cm)/ensemble_return_period_brazil_cropped.nc
[INFO] Starting Dask client with all available workers...


Perhaps you already have a cluster running?
Hosting the HTTP server on port 52644 instead


[INFO] Dask dashboard available at: http://127.0.0.1:52644/status
[INFO] Open this URL in your browser to monitor progress
[INFO] Dashboard shows: task progress, memory usage, worker status, etc.


[INFO] Rechunking data for optimal writing (chunks: lat=1024, lon=1024)...
[INFO] Preparing NetCDF write operation...
[INFO] Dataset size: {'GWL': 3, 'return_period': 7, 'ensemble': 1, 'lon': 53328, 'lat': 47810}
[INFO] Total data points: 53,541,845,280
[INFO] Variable 'flood_depth' has 47 lat chunks, 53 lon chunks
[INFO]   Total chunks to write: 52311
[INFO] Starting parallel write operation...
[INFO] Monitor progress at: http://127.0.0.1:52644/status
[INFO] This will take several minutes - watch the dashboard for task progress...
[INFO] Writing NetCDF file with parallel dask computation...
[INFO] You should see tasks appearing in the dashboard now: http://127.0.0.1:52644/status


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
2025-11-03 22:30:19,738 - bokeh.server.protocol_handler - ERROR - error handling message
 message: Message 'PATCH-DOC' content: {'events': [{'kind': 'ModelChanged', 'model': {'id': 'p3251'}, 'attr': 'start', 'new': -0.8450000000000006}, {'kind': 'ModelChanged', 'model': {'id': 'p3251'}, 'attr': 'end', 'new': 7.845000000000001}]} 
 error: AssertionError()
Traceback (most recent call last):
  File "/Users/bertrandgallice/code/Theia-Finance-Labs/climate.risk.tool/.venv/lib/python3.12/site-packages/bokeh/server/protocol_handler.py", line 94, in handle
    work = await handler(message, connection)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bertrandgallice/code/Theia-Finance-Labs/climate.risk.tool/.venv/lib/pyt

In [None]:
# Optional: Quick visualization to verify the crop
try:
    import matplotlib.pyplot as plt
    
    # Get a sample variable (first data variable)
    var_name = list(ds_cropped.data_vars)[0]
    
    # Take first time slice if multi-dimensional
    sample_data = ds_cropped[var_name]
    if len(sample_data.dims) > 2:
        # Get first slice of non-spatial dimensions
        dims_to_slice = {dim: 0 for dim in sample_data.dims if dim not in ['lat', 'lon']}
        sample_data = sample_data.isel(**dims_to_slice)
    
    print(f"[INFO] Creating quick visualization of variable: {var_name}")
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
    
    # Plot original extent
    ds[var_name].isel(**dims_to_slice).plot(ax=ax1, cmap='Blues', add_colorbar=True)
    clip_geometry.boundary.plot(ax=ax1, color='red', linewidth=2, label='Brazil + 50km buffer')
    ax1.set_title('Original NetCDF (full extent)')
    ax1.legend()
    ax1.set_aspect('equal')
    
    # Plot cropped
    sample_data.plot(ax=ax2, cmap='Blues', add_colorbar=True)
    clip_geometry.boundary.plot(ax=ax2, color='red', linewidth=2)
    ax2.set_title('Cropped NetCDF (Brazil + 50km buffer)')
    ax2.set_aspect('equal')
    
    plt.tight_layout()
    plt.show()
    
except ImportError:
    print("[INFO] Matplotlib not available, skipping visualization")
except Exception as e:
    print(f"[WARN] Could not create visualization: {e}")
