## In this notebook, we create flood maps which will serve as target for training a flood mapping simulator model.

In [1]:
import natsort
from pathlib import Path
from huggingface_hub import hf_hub_download
import time
import rasterio
from rasterio.merge import merge
from rasterio.mask import mask
from rasterio.enums import Resampling
from rasterio.features import rasterize
import os
from shapely.geometry import Polygon, Point
from shapely.ops import unary_union
import numpy as np
from matplotlib.colors import ListedColormap, BoundaryNorm
import matplotlib.pyplot as plt
import glob
from dask import delayed, compute
from tqdm import tqdm
from natsort import natsorted
import gc
from PIL import Image, ImageDraw, ImageFont # Import specific modules from PIL
import requests
import json
import geopandas as gpd
import geojson
from scipy.ndimage import binary_dilation
import pandas as pd
import polars as pl

  from .autonotebook import tqdm as notebook_tqdm


### Define a bunch of helper functions

In [2]:
# --------------------- Helper function to get tile name from lat/lon
def tile_name_from_latlon(lat, lon):

    """
    Example usage:
    Nairobi coordinates: 0.0236° S, 37.9062° E
    print(f"Nairobi tile: {tile_name_from_latlon(lat = 1.29, lon = 36.82)}")
    """

    # Data is in 3 x 3 degree tiles so
     
    lat_tile = (lat // 3) * 3
    lon_tile = (lon // 3) * 3
    lat_tile, lon_tile = int(lat_tile), int(lon_tile)

    lat_prefix = 'N' if lat_tile >= 0 else 'S'
    lon_prefix = 'E' if lon_tile >= 0 else 'W' 

    return f"{lat_prefix}{abs(lat_tile):02d}{lon_prefix}{abs(lon_tile):03d}"


#--------------------------- Function to download tiles from Hugging Face
def download_tile(tile,
                  repo_id = "ai-for-good-lab/ai4g-flood-dataset",
                  download_240m_buffer_tif = True,
                  download_80m_buffer_tif = True,
                  download_post_processing_parquet = True,
                  download_recurrence_80m_buffer_tif = True,
                  local_dir = None,
                  overwrite = True
                  ):
    
    print(f"\nDownloading data for tile: {tile}")
    
    if local_dir is None:
        local_dir = Path(f"flood_data")

    if not local_dir.exists():
        local_dir.mkdir(parents=True, exist_ok=True)

    root_dir = tile[:3]
    files_to_download = []
    if download_240m_buffer_tif:
        files_to_download.append(f"{root_dir}/{tile}/{tile}-240m-buffer.tif")

    if download_80m_buffer_tif:
        files_to_download.append(f"{root_dir}/{tile}/{tile}-80m-buffer.tif")

    if download_post_processing_parquet:
        files_to_download.append(f"{root_dir}/{tile}/{tile}-post-processing.parquet")

    if download_recurrence_80m_buffer_tif:
        files_to_download.append(f"{root_dir}/{tile}/{tile}-recurrence-80m-buffer.tif")

    for file_path in files_to_download:
        local_file_path = local_dir / Path(file_path).name
        if not local_file_path.exists() or overwrite:
            print(f"Downloading {file_path} to {local_file_path}")
            hf_hub_download(repo_id = repo_id,
                            filename = file_path,
                            local_dir = str(local_dir),
                            repo_type = "dataset",
            )
        else:
            print(f"File {local_file_path} already exists. Skipping download.")


    return

# ----------------- Helper function to produce normalized rgb values
def rgb(r, g, b):
    return (r / 255, g / 255, b / 255)



# --------------------- Helper function to get administrative boundary from GeoBoundaries
def get_admin_boundary(iso = 'KEN', adm = 'ADM0', save_path = None):
    
    # Query the GeoBoundaries API for the specified boundary
    url = f"https://www.geoboundaries.org/api/current/gbOpen/{iso}/{adm}"
    response = requests.get(url)
    if response.status_code != 200:
        raise ValueError(f"Error fetching data from GeoBoundaries API: {response.status_code}")
    
    # Parse the JSON response
    data = response.json()
    boundary_url = data['gjDownloadURL']

    # Download the GeoJSON boundary file
    geoboundary = gpd.read_file(boundary_url)

    if save_path is not None:
        save_path = Path(save_path)/f"{iso}_{adm}_boundary.geojson"
        geoboundary.to_file(save_path, driver='GeoJSON')
        print(f"Saved boundary to {save_path}")

    return geoboundary

# --------------------- Function to create thumbnail images from tif files
# cmap = ListedColormap(['lightgrey', 'orange', 'blue']) # 0, 1, 2
# Color maps inspired by Amit's work: https://arxiv.org/pdf/2411.01411
cmap = ListedColormap(
    [
        rgb(241, 241, 241), # 0 - light grey - no data
        rgb(208, 207, 212), # 1 - Exclusion mask
        rgb(0, 0, 255)    # 2 - Flooded - blue

    ]
)
norm = BoundaryNorm([-0.5,0.5,1.5,2.5], cmap.N)
def make_thumbnail(
    tif_paths,
    idx,
    png_dir,
    max_size = 1024,
    geoboundary_gdf = None,
    boundary_color = [0, 0, 0], # Black boundary
    boundary_width = 1,
    cmap = cmap,
    norm = norm,

    title = None,
    title_color = 'black',
    title_font_size = 24,
    font_path='arial.ttf', # Path to a .ttf font file

    update_bar = None,
):
    tif_path = tif_paths[idx]
    print(f"\nProcessing {tif_path}...")

    # Read and downsample
    with rasterio.open(tif_path) as src:
        raster_crs = src.crs
        h, w = src.height, src.width
        scale = max(h, w) / max_size

        out_h = max(1, int(h / scale))
        out_w = max(1, int(w / scale))

        arr = src.read(
            1,
            out_shape = (out_h, out_w),
            resampling = Resampling.nearest
        )
        new_transform = src.transform * src.transform.scale(
            (src.width / out_w),
            (src.height / out_h)
        )

    print(f"Downsampling by scale factor: {scale:.2f}")
    print(f"Original resolution in km: {src.transform[0]*111.32:.3f}, New resolution in km: {new_transform[0]*111.32:.3f}")

    # Apply colormap
    rgba = cmap(norm(arr))
    # Drop alpha channel and convert to uint8
    rgba = (rgba[..., :3] * 255).astype(np.uint8)

    # Overlay Geoboundary if provided
    if geoboundary_gdf is not None:
        print("Overlaying geoboundary...")
        try:
            boundary_reprojected = geoboundary_gdf.to_crs(raster_crs)

            boundary_mask = rasterize(
                shapes = boundary_reprojected.geometry.boundary,
                out_shape = (out_h, out_w),
                transform = new_transform,
                fill = 0,
                default_value = 1,
                dtype = np.uint8
            ).astype(bool)

            # Dilate the boundary to make it thicker
            if boundary_width > 1:
                boundary_mask = binary_dilation(boundary_mask, iterations = boundary_width - 1)

            # Overlay boundary on the RGBA image
            rgba[boundary_mask] = boundary_color

        except Exception as e:
            print(f"Error overlaying geoboundary: {e}")

    # Adding title to the image
    img = Image.fromarray(rgba)

    # If a title is provided, draw it on the image
    if title:
        draw = ImageDraw.Draw(img)

        # Load font
        try:
            font = ImageFont.truetype(font_path, title_font_size)
        except IOError:
            print(f"Could not load font at {font_path}. Using default font.")
            font = ImageFont.load_default()

        # Calculate text size and position
        text_bbox = draw.textbbox((0, 0), title, font = font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]

        text_x = (img.width - text_width) // 2
        text_y = 10  # 10 pixels from the top

        # Draw text
        draw.text((text_x, text_y), title, font = font, fill = title_color)






       

    # --------------------------
    out_png = png_dir / f"{Path(tif_path).stem}_{out_h}_{out_w}.png"
    img.save(out_png)

    if update_bar is not None:
        update_bar()

    print(f"Saved thumbnail: {out_png.stem}")

    return


# --------------------- Function to generate monthly flood maps from parquet files
def generate_monthly_flood_maps(parquet_path, template_path, year, buffer_meters = 240, output_dir = None, update_bar = None):
    print(f"\n\nProcessing {parquet_path.name} for year {year}...")

    # Load template metadata from corresponding 240m buffer tif
    template_geotiff_path = template_path
    assert template_geotiff_path.exists(), f"Template GeoTIFF {template_geotiff_path} does not exist. Only 240m or 80m buffer tif files are available."
    assert template_geotiff_path.parent.stem == parquet_path.parent.stem, f"Template GeoTIFF {template_geotiff_path} does not match parquet file {parquet_path}."
    print(f"Using template GeoTIFF: {template_geotiff_path.name}")

    with rasterio.open(template_geotiff_path) as src:
        transform = src.transform
        height = src.height
        width = src.width
        bounds = src.bounds
        template_crs = src.crs
        exclusion_mask = src.read(1) == 1  # Exclusion mask pixels


    # Load parquet data using polars for efficiency
    df = pl.scan_parquet(parquet_path)
        
    for month in range(1, 13):
        print(f"\nGenerating flood map for {year}-{month:02d}...")
        # Filter data for the specific year and month
        expr = (
            (pl.col('year') == year) &
            (pl.col('month') == month) &
            (pl.col('dem_metric_2') < 10) &
            (pl.col('soil_moisture_sca') > 1) &
            (pl.col('soil_moisture_zscore') > 1) &
            (pl.col('soil_moisture') > 20) &
            (pl.col('temp') > 0) &
            (pl.col('land_cover') != 60) &
            (pl.col('edge_false_positives') == 0)     
        )
        monthly_df = df.filter(expr).collect()
        if monthly_df.shape[0] == 0:
            print(f"No flood points for {year}-{month:02d}. ")
            monthly_flood_map = np.zeros((height, width), dtype = 'uint8')

        else:
            # Convert to GeoDataFrame
            print("Creating GeoDataFrame from filtered rows")
            monthly_df = monthly_df.to_pandas()
            geometry = [Point(xy) for xy in zip(monthly_df['lon'], monthly_df['lat'])]
            gdf = gpd.GeoDataFrame(monthly_df, geometry = geometry, crs = "EPSG:4326")
            print(f"Number of flood points: {len(gdf)}")
            monthly_df = None
            geometry = None

            # Buffer points if required
            if buffer_meters and buffer_meters > 0:
                print(f"Buffering flood points by {buffer_meters} meters...")
                # Choose a CRS for buffering
                if template_crs.is_geographic:
                    print("Template CRS is geographic. Reprojecting gdf to EPSG:3857 for buffering...")
                    buf_crs = "EPSG:3857"
                else:
                    print("Template CRS is projected. Using template CRS for buffering...")
                    buf_crs = template_crs
                
                # Reproject gdf to buffer CRS
                print(f"Reprojecting gdf to {buf_crs} for buffering...")
                gdf = gdf.to_crs(buf_crs) # Units in meters

                # Buffer in meters
                print(f"Buffering points by {buffer_meters} meters...")
                gdf['geom_buf'] = gdf.geometry.buffer(buffer_meters)

                # Merge buffered geometries
                union_geom = unary_union(gdf['geom_buf'])

                # Reproject back to template CRS
                print(f"Reprojecting back to {template_crs}...")
                union_geom = gpd.GeoSeries([union_geom], crs = buf_crs).to_crs(template_crs).iloc[0]
                gdf = None

            else:
                print("No buffering: rasterizing single-point blobs")
                gdf = gdf.to_crs(template_crs)
                px = abs(transform.a)
                half_px = px / 2.0
                union_geom = [(pt.buffer(half_px)) for pt in gdf.geometry]
                gdf = None

                # print("No buffering applied.")
                # gdf = gdf.to_crs(template_crs)
                # union_geom = unary_union(gdf.geometry)


            # Rasterize the unioned geometry
            print("Rasterizing buffered geometries...")
            shapes = union_geom if isinstance(union_geom, list) else [union_geom]
            monthly_flood_map = rasterize(
                shapes,
                out_shape = (height, width),
                transform = transform,
                fill = 0, # Value 0 for non-flooded areas
                default_value = 2, # Value 2 for flooded areas
                all_touched = True,
                dtype = 'uint8'
            )
        
        # Apply exclusion mask
        if exclusion_mask is not None:
            print("Applying exclusion mask...")
            monthly_flood_map = np.where(exclusion_mask, 1, monthly_flood_map)

        # Save to GeoTIFF
        output_path = output_dir / f"{year}" / f"{year}-{month:02d}_{parquet_path.parent.stem}_flood_map.tif"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"Saving flood map to {output_path}...")
        meta = {
            'driver': 'GTiff',
            'height': height,
            'width': width,
            'count': 1,
            'dtype': 'uint8',
            'crs': template_crs,
            'transform': transform,
            'compress': 'lzw'
        }
        with rasterio.open(output_path, 'w', **meta) as dst:
            dst.write(monthly_flood_map, 1)

        print(f"Flood pixels percent: {np.sum(monthly_flood_map == 2) / monthly_flood_map.size * 100:.4f}%")
        print(f"Completed {year}-{month:02d}.\n")
        
    print("--"*40)

    if update_bar is not None:
        update_bar()

    return


# --------------------- Function to create mosaic from multiple GeoTIFF files
def create_mosaic(geotiff_paths, output_path, update_bar = None):
    print(f"\nCreating mosaic for {len(geotiff_paths)} GeoTIFF files...")
    # Read all GeoTIFF files
    src_files_to_mosaic = []
    for fp in geotiff_paths:
        src = rasterio.open(fp)
        src_files_to_mosaic.append(src)

    # Quick check for consistent CRS
    crs_set = set([src.crs for src in src_files_to_mosaic])
    if len(crs_set) != 1:
        raise ValueError("Input GeoTIFF files have different CRS. Please reproject them to a common CRS first.")
    
    # Merge the GeoTIFF files
    mosaic, out_trans = merge(src_files_to_mosaic, method = 'max')
    out_meta = src_files_to_mosaic[0].meta.copy()
    out_meta.update({
        "driver": "GTiff",
        "height": mosaic.shape[1],
        "width": mosaic.shape[2],
        "transform": out_trans,
        "compress": "lzw"
    })

    # Write the mosaic to disk
    with rasterio.open(output_path, "w", **out_meta) as dest:
        dest.write(mosaic)

    print(f"Mosaic created and saved to {output_path}")

    # Close all opened datasets
    for src in src_files_to_mosaic:
        src.close()

    if update_bar is not None:
        update_bar()

    return

#### Determining tile names from Kenya lat/lon coordinates

In [3]:

# Bounding box for Kenya
min_lon, max_lon = 33.501, 41.974
min_lat, max_lat = -5.002, 5.506

# Calculate the starting latitude and longitude for the tiles
start_lat = (min_lat // 3) * 3
end_lat = (max_lat // 3) * 3
start_lon = (min_lon // 3) * 3
end_lon = (max_lon // 3) * 3



# Generate the list of tile names covering Kenya
tile_names = []
for lat in range(int(start_lat), int(end_lat) + 3, 3):
    for lon in range(int(start_lon), int(end_lon) + 3, 3):
        # print(f"\nlat: {lat}, lon: {lon}")
        tile_name = tile_name_from_latlon(lat, lon)
        # print(f"tile_name: {tile_name}")
        tile_names.append(tile_name)
        

tile_names = natsorted(list(set(tile_names)))  # Remove duplicates
print("Tiles covering Kenya:", tile_names)

Tiles covering Kenya: ['N00E033', 'N00E036', 'N00E039', 'N03E033', 'N03E036', 'N03E039', 'S03E033', 'S03E036', 'S03E039', 'S06E033', 'S06E036', 'S06E039']


#### Download flood maps from AI4Good Hugging Face repository

In [None]:
for tile in tile_names:
    download_tile(
        tile = tile,
        download_240m_buffer_tif = True,
        download_80m_buffer_tif = False,
        download_post_processing_parquet = True,
        download_recurrence_80m_buffer_tif = False,
        local_dir = None,
        overwrite = True
    )
    time.sleep(3)  # Sleep for 3 seconds between downloads to avoid rate limiting



    


Downloading data for tile: N03E039


  from .autonotebook import tqdm as notebook_tqdm


Downloading N03/N03E039/N03E039-240m-buffer.tif to flood_data\N03E039-240m-buffer.tif
Downloading N03/N03E039/N03E039-post-processing.parquet to flood_data\N03E039-post-processing.parquet
File flood_data\N03E039-post-processing.parquet already exists. Skipping download.

Downloading data for tile: S03E036
Downloading S03/S03E036/S03E036-240m-buffer.tif to flood_data\S03E036-240m-buffer.tif
Downloading S03/S03E036/S03E036-post-processing.parquet to flood_data\S03E036-post-processing.parquet
File flood_data\S03E036-post-processing.parquet already exists. Skipping download.

Downloading data for tile: S06E039
Downloading S06/S06E039/S06E039-240m-buffer.tif to flood_data\S06E039-240m-buffer.tif
Downloading S06/S06E039/S06E039-post-processing.parquet to flood_data\S06E039-post-processing.parquet
File flood_data\S06E039-post-processing.parquet already exists. Skipping download.

Downloading data for tile: S06E033
Downloading S06/S06E033/S06E033-240m-buffer.tif to flood_data\S06E033-240m-buff

#### Merge flood maps into a single tiff for visualization

In [6]:

# Directory where the downloaded flood map tiles are stored
data_dir = Path("flood_data")
assert data_dir.exists(), f"Data directory {data_dir} does not exist."

# List all downloaded 240m buffer TIFF files
geotiff_files = natsorted(list(data_dir.rglob("*-240m-buffer.tif")))
print(f"Found {len(geotiff_files)} GeoTIFF files.")

create_mosaic(
    geotiff_paths = geotiff_files,
    output_path = data_dir / "kenya_240m_buffer_flood_mosaic.tif"
)

Found 12 GeoTIFF files.

Creating mosaic for 12 GeoTIFF files...
Mosaic created and saved to flood_data\kenya_240m_buffer_flood_mosaic.tif


In [None]:
## Optional:Clip the mosaic to Kenya boundary
output_mosaic_path = data_dir / "kenya_240m_buffer_flood_mosaic.tif"
assert output_mosaic_path.exists(), f"Mosaic file {output_mosaic_path} does not exist."


kenya_coords = [[[33.501, -5.002], [41.974, -5.002], [41.974, 5.506], [33.501, 5.506], [33.501, -5.002]]]
geoms = [{'type': 'Polygon', 'coordinates': kenya_coords}]

output_clipped_path = data_dir / "kenya_240m_buffer_flood_clipped.tif"
with rasterio.open(output_mosaic_path) as src:
    out_image, out_transform = mask(src, geoms, crop = True)
    clip_meta = src.meta.copy()

clip_meta.update({
    "driver": "GTiff",
    "height": out_image.shape[1],
    "width": out_image.shape[2],
    "transform": out_transform,
    "compress": "lzw"

})

with rasterio.open(output_clipped_path, "w", **clip_meta) as dest:
    dest.write(out_image)

print(f"Clipped mosaic saved to {output_clipped_path}")

Clipped mosaic saved to flood_data\kenya_240m_buffer_flood_clipped.tif


#### Create thumbnail of overall flood map mosaic

In [7]:
png_dir = data_dir / "thumbnails"
png_dir.mkdir(parents = True, exist_ok = True)
max_size = 1024 # Max width or height of thumbnail


# Generate thumbnail for cropped and mosaicked flood map
kenya_boundary = get_admin_boundary(iso = 'KEN', adm = 'ADM0', save_path = None)
mosaic_clipped_tif = data_dir / "kenya_240m_buffer_flood_clipped.tif"
mosaic_tif = data_dir / "kenya_240m_buffer_flood_mosaic.tif"
tif_paths = [mosaic_clipped_tif, mosaic_tif]

for idx in range(len(tif_paths)):
    make_thumbnail(
        tif_paths = tif_paths,
        idx = idx,
        png_dir = png_dir,
        max_size = 1024,
        geoboundary_gdf = kenya_boundary,
        boundary_color = [0, 0, 0], # Black boundary
        boundary_width = 1,
       
    )




Processing flood_data\kenya_240m_buffer_flood_clipped.tif...
Downsampling by scale factor: 61.57
Original resolution in km: 0.019, New resolution in km: 1.143
Overlaying geoboundary...
Saved thumbnail: kenya_240m_buffer_flood_clipped_1024_825

Processing flood_data\kenya_240m_buffer_flood_mosaic.tif...
Downsampling by scale factor: 70.31
Original resolution in km: 0.019, New resolution in km: 1.305
Overlaying geoboundary...
Saved thumbnail: kenya_240m_buffer_flood_mosaic_1024_768


## Extract monthly flood maps from parquet files

In [8]:
## Find parquet files
data_dir = Path("flood_data")
assert data_dir.exists(), f"Data directory {data_dir} does not exist."

parquet_files = natsorted(list(data_dir.rglob("*-post-processing.parquet")))
print(f"Found {len(parquet_files)} parquet files.")


## ---- Configuration ----
output_dir = data_dir / "monthly_flood_maps"
output_dir.mkdir(parents = True, exist_ok = True)
years = natsorted(list(range(2015, 2025)))
buffer_meters = 240  # Use 240m to buffer flood maps

## Generate monthly flood maps in parallel

with tqdm(total = len(parquet_files) * len(years), desc = "Generating monthly flood maps") as bar:
    delayed_tasks = []
    for parquet_path in parquet_files:
        template_path = parquet_path.parent / f"{parquet_path.parent.stem}-240m-buffer.tif"
        for year in years:
            task = delayed(generate_monthly_flood_maps)(
                parquet_path = parquet_path,
                template_path = template_path,
                year = year,
                buffer_meters = buffer_meters,
                output_dir = output_dir,
                update_bar = bar.update
            )
            delayed_tasks.append(task)

    compute(*delayed_tasks, scheduler = 'threads', num_workers = os.cpu_count()//2)
            

Found 12 parquet files.


In [None]:
# Directory where the downloaded flood map tiles are stored
data_dir = Path("flood_data")
assert data_dir.exists(), f"Data directory {data_dir} does not exist."

# Empty dictionary to hold year monthly geotiff paths
years = natsorted(list(range(2015, 2025)))
yearly_monthly_geotiffs = {year: [] for year in years}

out_image_dir = data_dir / "monthly_flood_map_mosaics"
out_image_dir.mkdir(parents = True, exist_ok = True)

# Populate the dictionary
for year in years:
    # print(f"\n\nCreating mosaic for year {year}...")
    for month in range(1, 13):
        # print(f"\nCreating mosaic for {year}-{month:02d}...")
        monthly_geotiffs = (data_dir / "monthly_flood_maps" / f"{year}").rglob(f"*{year}-{month:02d}_*_flood_map.tif")
        monthly_geotiffs = natsorted(list(monthly_geotiffs))
        # print(f"Found {len(monthly_geotiffs)} GeoTIFFs for {year}-{month:02d}.")
        yearly_monthly_geotiffs[year].append(monthly_geotiffs)


In [None]:
from dask import delayed, compute
from tqdm import tqdm

with tqdm(total = len(years) * 12, desc = "Creating monthly flood map mosaics") as bar:
    delayed_tasks = []
    for year in years:
        for month_idx in range(12):
            geotiff_paths = yearly_monthly_geotiffs[year][month_idx]
            output_path = out_image_dir / f"{year}-{month_idx+1:02d}_kenya_flood_mosaic.tif"
            task = delayed(create_mosaic)(
                geotiff_paths = geotiff_paths,
                output_path = output_path,
                update_bar = bar.update
            )
            delayed_tasks.append(task)
    compute(*delayed_tasks, scheduler = 'threads', num_workers = min(10, os.cpu_count()//2))

#### Now we save the flood maps as thumbnails for visualization

In [None]:
# Define dtata dir
data_dir = Path("flood_data")
assert data_dir.exists(), f"Data directory {data_dir} does not exist."

output_dir = data_dir / "monthly_flood_map_mosaics"
png_dir = data_dir / "monthly_flood_map_mosaic_thumbnails"
png_dir.mkdir(parents = True, exist_ok = True)



tif_paths = natsorted(list(output_dir.rglob("*.tif")))
kenya_boundary = get_admin_boundary(iso = 'KEN', adm = 'ADM0', save_path = None)
with tqdm(total = len(tif_paths), desc = "Creating monthly flood map mosaic thumbnails") as bar:
    delayed_tasks = []
    for idx in range(len(tif_paths)):
        title = f"{tif_paths[idx].stem.split('_')[0]}"
        task = delayed(make_thumbnail)(
            tif_paths = tif_paths,
            idx = idx,
            png_dir = png_dir,
            max_size = 2**13,
            geoboundary_gdf = kenya_boundary,
            boundary_color = [0, 0, 0], # Black boundary
            boundary_width = 1,
            title = title,
            title_color = 'black',
            title_font_size = 32,
            font_path='arial.ttf', # Path to a .ttf font file
            update_bar = bar.update
        )
        delayed_tasks.append(task)
    compute(*delayed_tasks, scheduler = 'threads', num_workers = min(10, os.cpu_count()//2))




Creating monthly flood map mosaic thumbnails: 100%|██████████| 120/120 [06:26<00:00,  3.22s/it]


In [None]:
import imageio.v3 as iio
import glob

## Create mp4 or gif
data_dir = Path("flood_data")
png_dir = data_dir / "monthly_flood_map_mosaic_thumbnails"
assert png_dir.exists(), f"PNG directory {png_dir} does not exist."

png_files = natsorted(list(png_dir.rglob("*.png")))
print(f"Found {len(png_files)} PNG files for timelapse creation.")

def create_mp4_or_gif(png_files, output_path = "flood_map_timelapse", fps = 2, duration = 0.5, file_type = "mp4"):
    images = [iio.imread(png) for png in png_files] # R, C, Channel

    if file_type == "mp4":
        print("Creating mp4...")
        iio.imwrite(f"{output_path.as_posix()}.mp4", images, fps = fps)

    elif file_type == "gif":
        print("Creating gif...")
        iio.imwrite(f"{output_path.as_posix()}.gif", images, duration = duration)

    else:
        raise ValueError("file_type must be 'mp4' or 'gif'")
    
    
    return

create_mp4_or_gif(png_files, output_path = data_dir / "2015-2024_flood_map_timelapse", fps = 2, duration = 1, file_type = "gif")

Found 120 PNG files for timelapse creation.
Creating gif...
