In [1]:
"""
This code processes global flood event data and population grids. It opens flood event files, adjusts them for
specific bands, downsamples them to match a population grid, and saves processed datasets by year. It then
aggregates these processed datasets to calculate the maximum number of affected people per grid cell annually.
 Finally, it compiles all processed data into a single dataset.

Input:
- Files in D:\Datasets\Gridded Population of the World
- Files in D:\Datasets\Global Flood Database\gfd_v1_4

Output:
- GFD_floods_yearly.nc
"""

"""
New packages used:
- dask: is a flexible library for parallel computing. The Client class is used to connect to a Dask cluster.
- geocube: is a package that helps in converting vector data into raster formats
- json: module provides functions to parse JSON
- datetime: manipulating dates and times
- re: provides support for working with regular expressions

"""

import os
import dask
import numpy as np
import xarray as xr
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import seaborn as sns
from dask.diagnostics import ProgressBar
from dask.distributed import Client
from geocube.api.core import make_geocube
from tqdm import tqdm

# Project and dataset locations (Windows paths). Every literal is a raw string so
# that a backslash is never read as the start of an escape sequence.
PATH = r"D:\World Bank\CLIENT v2"
DATA_RAW = rf"{PATH}\Data\Data_raw"
DATA_PROC = rf"{PATH}\Data\Data_proc"
DATA_OUT = rf"{PATH}\Data\Data_out"
GFD_PATH = r"D:\Datasets\Global Flood Database\gfd_v1_4"
GPW_PATH = r"D:\Datasets\Gridded Population of the World"

# Start a dask.distributed client with default settings; the bare `client`
# expression below renders its summary (dashboard link, workers) as the cell output.
client = Client()
client



0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 31.88 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:49940,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 31.88 GiB

0,1
Comm: tcp://127.0.0.1:49959,Total threads: 1
Dashboard: http://127.0.0.1:49962/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:49943,
Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-_yl9j522,Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-_yl9j522

0,1
Comm: tcp://127.0.0.1:49961,Total threads: 1
Dashboard: http://127.0.0.1:49969/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:49945,
Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-xlhet7rw,Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-xlhet7rw

0,1
Comm: tcp://127.0.0.1:49960,Total threads: 1
Dashboard: http://127.0.0.1:49965/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:49947,
Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-ubtznyn_,Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-ubtznyn_

0,1
Comm: tcp://127.0.0.1:49964,Total threads: 1
Dashboard: http://127.0.0.1:49967/status,Memory: 7.97 GiB
Nanny: tcp://127.0.0.1:49949,
Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-ua0lfr9l,Local directory: C:\Users\ofici\AppData\Local\Temp\dask-scratch-space\worker-ua0lfr9l


In [2]:
import os
from tqdm import tqdm

def load_population_data(bounds=None, generate=False):
    """
    Load the Gridded Population of the World rasters into a single dataset.

    Steps:
    1. List the ``.tif`` files in ``GPW_PATH`` (sorted, so the order along ``year`` is deterministic).
    2. Open each file with dask chunks, cast ``band_data`` to ``uint32`` and, when ``bounds`` is
       given, crop it to those bounds.
    3. If ``generate`` is True, write band 1 of each file to a ``*_proc.tif`` raster.
    4. Attach the year parsed from the filename as a coordinate and concatenate along ``year``.
    5. Select band 1 and drop the ``band`` coordinate.

    Parameters
    ----------
    bounds : sequence of four numbers, optional
        Used as ``x=slice(bounds[0], bounds[2])`` and ``y=slice(bounds[3], bounds[1])``.
        Presumably (min_x, min_y, max_x, max_y) with ``y`` stored in descending order -
        TODO confirm against the rasters.
    generate : bool, default False
        When True, each (cropped) band-1 raster is also saved as ``<name>_proc.tif`` in a
        hard-coded folder on drive E:.

    Returns
    -------
    xarray.Dataset
        The concatenated rasters with a ``year`` dimension and without the ``band`` coordinate.

    Notes
    -----
    The year is taken from the sixth underscore-separated token of the filename, so the
    files must follow that naming layout.
    """
    print("Processing Population data...")

    # Select all .tif files in the GPW folder
    gpw_path = GPW_PATH
    files = sorted(f for f in os.listdir(gpw_path) if f.endswith(".tif"))

    # Compile into a single dataset
    dss = []
    for f in tqdm(files):
        # Chunk sizes have to be integers, hence floor division rather than "/".
        ds = xr.open_dataset(os.path.join(gpw_path, f), chunks={"x": 43200 // 2, "y": 17174 // 2})
        ds["band_data"] = ds["band_data"].astype(np.uint32)

        # Crop once, per file and before concatenation, so the combined dataset is
        # already limited to the requested bounds.
        if bounds is not None:
            ds = ds.sel(
                x=slice(bounds[0], bounds[2]), y=slice(bounds[3], bounds[1])
            )

        if generate:
            out_name = f.replace('.tif', '_proc.tif')
            with ProgressBar():
                ds.sel(band=1).drop_vars("band").band_data.rio.to_raster(rf"E:\client_v2_data\{out_name}")
                print(f"Saved {out_name}")

        ds["year"] = int(f.split("_")[5])
        ds = ds.set_coords('year')
        dss.append(ds)

    population = xr.concat(dss, dim="year")

    # Clean band dimension
    population = population.sel(band=1).drop_vars(["band"])

    print("Done!")
    return population


In [3]:
import json
import re 
from datetime import datetime

def extract_number_from_filename(filename):
    """
    Return, as an integer, the digits found between ``DFO_`` and the following underscore.

    Returns ``None`` when ``filename`` contains no ``DFO_<digits>_`` segment.
    """
    found = re.search(r'DFO_(\d+)_', filename)
    if not found:
        return None
    # The pattern defines exactly one capture group: the numeric ID.
    assert len(found.groups()) == 1
    return int(found.group(1))

def extract_year_from_filename(filename):
    """
    Return the year (as a string) of the midpoint between the two dates in ``filename``.

    The name must contain ``From_YYYYMMDD_to_YYYYMMDD``. If it does not, a message is
    printed and ``None`` is returned.
    """
    found = re.search(r'From_(\d{8})_to_(\d{8})', filename)
    if not found:
        print("Dates not found in the string.")
        return None

    # Parse both 8-digit stamps into datetime.date objects
    start, end = (datetime.strptime(stamp, r'%Y%m%d').date() for stamp in found.groups())

    # Midpoint of the event; adding a timedelta to a date only uses whole days,
    # so a half-day remainder of the halved span is dropped.
    midpoint = start + (end - start) / 2
    return midpoint.strftime(r'%Y-%m-%d').split("-")[0]


def check_word_in_json_files(directory, word):
    """
    Find the JSON files in ``directory`` that mention ``word`` and return their IDs.

    The search is case-insensitive and inspects only the top-level values of each JSON
    object: string values are searched directly, list values are searched item by item
    (each item converted with ``str``); values of any other type are ignored.

    The number of files checked and the number of matching files are printed.

    Parameters
    ----------
    directory : str
        Folder whose ``.json`` files are scanned (not recursive).
    word : str
        Text to look for.

    Returns
    -------
    list
        One entry per matching file: the integer parsed from its name by
        ``extract_number_from_filename`` (``None`` if the name carries no ID).
    """
    word = word.upper()
    files_checked = 0
    files_containing_word = []

    def _mentions_word(value):
        # True when a top-level JSON value contains the (upper-cased) word.
        if isinstance(value, str):
            return word in value.upper()
        if isinstance(value, list):
            return any(word in str(item).upper() for item in value)
        return False

    for filename in os.listdir(directory):
        if not filename.endswith('.json'):
            continue
        files_checked += 1
        filepath = os.path.join(directory, filename)
        # Binary mode lets the json module detect the encoding (UTF-8/16/32, with or
        # without BOM) instead of depending on the platform's default text encoding.
        with open(filepath, 'rb') as file:
            data = json.load(file)

        if any(_mentions_word(value) for value in data.values()):
            files_containing_word.append(filepath)

    print(f"Files checked: {files_checked}")
    print(f"Files containing '{word}': {len(files_containing_word)}")

    # Parse the ID from the file name only, so a "DFO_<n>_" fragment in the directory
    # part of the path can never be picked up instead.
    ids = [extract_number_from_filename(os.path.basename(f)) for f in files_containing_word]
    return ids

def process_gdw_file(ds):
    """
    Return band 1 of ``ds`` with every cell where band 5 equals 1 replaced by 0.

    Cells where band 5 is not 1 keep their band-1 value.

    NOTE(review): presumably band 1 is the flood flag and band 5 the permanent-water
    layer of the Global Flood Database rasters - confirm against the dataset documentation.
    """
    band_to_keep = ds.sel(band=1)
    zero_out = ds.sel(band=5) == 1
    return xr.where(zero_out, 0, band_to_keep)

def regrid_floods_to_population(floods, like):
    """
    Aggregate a fine-resolution flood raster onto the coarser grid of ``like``.

    ``like`` is first cropped to the x/y extent of ``floods``. The flood data is then
    block-averaged using integer window sizes equal to the ratio of cell counts along
    each axis (incomplete trailing blocks are trimmed), and finally linearly
    interpolated onto the coordinates of the cropped ``like``; points outside the
    coarsened data are filled with 0.

    Parameters
    ----------
    floods : xarray object with ``x`` and ``y`` coordinates
        Flood raster to be aggregated.
    like : xarray object with ``x`` and ``y`` coordinates
        Reference grid. It is sliced with ``y`` from max to min, so ``y`` is assumed
        to be stored in descending order - TODO confirm.

    Returns
    -------
    The flood data on the grid of ``like`` restricted to the extent of ``floods``.

    Raises
    ------
    ValueError
        If ``like`` has no cells inside the extent of ``floods``.
    """
    # Crop the reference grid to the extent covered by the flood raster
    x_extent = slice(floods['x'].min(), floods['x'].max())
    y_extent = slice(floods['y'].max(), floods['y'].min())
    like = like.sel(x=x_extent, y=y_extent)

    n_target_y = len(like.y)
    n_target_x = len(like.x)
    if n_target_y == 0 or n_target_x == 0:
        raise ValueError("The reference grid has no cells inside the extent of the flood raster.")

    # Flood cells per reference cell along each axis. Clamp to 1 so that a flood
    # raster that is not finer than the reference grid still yields a valid window.
    lat_ratio = max(1, len(floods['y']) // n_target_y)
    lon_ratio = max(1, len(floods['x']) // n_target_x)

    # Coarsen the floods data by averaging
    coarsened_floods = floods.coarsen(y=lat_ratio, x=lon_ratio, boundary='trim').mean()

    # Snap the coarsened cells onto the exact coordinates of the reference grid
    coarsened_floods = coarsened_floods.interp_like(like, method='linear', kwargs={'fill_value': 0})
    return coarsened_floods

## Genero archivos por shock:

Agrega los datos de floods en la misma dimensión que Gridded Population of the World: 30 arc-sec. Reduce la resolución de la grilla de floods al tamaño de la de población, promediando cada 3 celdas. Entonces, el punto de la grilla de población nos va a decir aproximadamente qué % de la población fue afectada, asumiendo que se distribuye uniformemente en esa celda (i.e. calcula el % del área afectada). Para ajustar correctamente la grilla, interpolamos con población al final.

In [4]:
# Load every GPW raster for the full extent (no cropping, no raster export).
pop = load_population_data(bounds=None, generate=False)
# Disabled alternative that opens a single year's file directly:
# pop = xr.open_dataset(rf"{GPW_PATH}\gpw_v4_population_count_rev11_2000_30_sec.tif")

Processing Population data...


Cannot find the ecCodes library
100%|██████████| 4/4 [00:03<00:00,  1.31it/s]

Done!





In [None]:
"""
This step processes flood event files by opening each TIFF file, extracting the year and shock ID from the filename,
processing the raster bands, downsampling to match the population grid, and saving the results as NetCDF files in
a dictionary using years as keys.
"""

## Processing step 1:
#   Open every flood event, process its bands and downsample it onto the population grid,
#   scheduling one NetCDF file per event.

# Keep only the .tif files found in the GFD_PATH directory.
files = [f for f in os.listdir(GFD_PATH) if f.endswith(".tif")]
# Zero-filled template with the coordinates of the year-2000 population grid; it is
# only used as the regridding target.
population_grid = xr.zeros_like(pop.sel(year=2000))

# The NetCDF writes below need the destination folder to exist already.
gfd_proc_dir = os.path.join(DATA_PROC, "gfd_proc")
os.makedirs(gfd_proc_dir, exist_ok=True)

# dss maps each year to the list of delayed NetCDF writes of that year's events.
dss = {}
tasks = []
# The dask setting is the same for every file, so it is applied once around the loop.
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    for f in tqdm(files):
        year = extract_year_from_filename(f)
        shockid = extract_number_from_filename(f)

        # Open the event raster with dask chunks and reduce it to a single masked layer.
        ds = xr.open_dataset(os.path.join(GFD_PATH, f), engine="rasterio", chunks={"x": 1000, "y": 1000})
        ds = process_gdw_file(ds)

        # Downsample onto the population grid and label the result with year and event ID.
        ds_aggregated = regrid_floods_to_population(ds, population_grid)
        ds_aggregated = ds_aggregated.assign_coords({"year": year})
        ds_aggregated = ds_aggregated.assign_coords({"id": shockid})

        # compute=False returns a delayed write; nothing is computed until dask.compute is called on it.
        write_task = ds_aggregated.to_netcdf(
            os.path.join(gfd_proc_dir, f.replace('.tif', '_proc.nc')), compute=False
        )
        dss.setdefault(year, []).append(write_task)

In [None]:
# Execute the delayed NetCDF writes collected in dss, one year at a time.
for year, dss_year in tqdm(dss.items()):
    dask.compute(*dss_year)
# Disabled alternative that would schedule one per-year maximum file as a delayed task:
# tasks += [xr.concat(dss_year, dim="id").max(dim="id").to_netcdf(os.path.join(DATA_PROC, "gfd_proc", f"{year}_out.nc"), compute=False)]

In [5]:
"""
This step processes flood event data by creating an empty grid, normalizing processed event data to a global extent, and
calculating, for each year, the per-cell maximum across that year's flood events. It iterates over processed files, fills the grid
with event data, and organizes datasets by year. Finally, it concatenates yearly datasets and saves the results
as NetCDF files representing the maximum impact of floods for each year.
"""
## Step 2:
#   Combine the processed per-event files into one file per year, taking the per-cell maximum across events.

# Build the empty grid that is filled with the data of each event.
# It has the same shape and coordinates as pop for the year 2000, with all values set to zero.
grid = xr.zeros_like(pop.sel(year=2000))
# Placeholder "year" and "id" coordinates. "year" is overwritten for every event in the loop
# below; "id" keeps this placeholder and is only used as the concatenation dimension.
grid = grid.assign_coords({"year": "999"})
grid = grid.assign_coords({"id": 999})
# Cast to float32 so the grid stores floating-point values.
grid["band_data"] = grid["band_data"].astype(np.float32)
# Saves the empty grid as a NetCDF file named "empty_grid.nc" in DATA_PROC.
grid.to_netcdf(os.path.join(DATA_PROC, "empty_grid.nc"))

# Re-open the saved grid with explicit chunk sizes.
grid = xr.open_dataset(os.path.join(DATA_PROC, "empty_grid.nc"), chunks={"x": 1000, "y": 1000})

# Lists all files in the DATA_PROC/gfd_proc folder.
processed_files = os.listdir(os.path.join(DATA_PROC, "gfd_proc"))
# Filters only files ending with "_proc.nc".
processed_files = [f for f in processed_files if f.endswith("_proc.nc")]
# dss maps each year to the list of full-extent grids of that year's events.
dss = {}
# For each processed event: start from a zero grid and paste the event data into the
# window that matches the event's x/y extent.
for f in tqdm(processed_files):
    year = extract_year_from_filename(f)
    ds_grid = xr.zeros_like(grid)
    ds = xr.open_dataset(os.path.join(DATA_PROC, "gfd_proc", f), chunks={"x": 1000, "y": 1000})
    # Missing values in the event are written as 0; y is sliced from max to min.
    ds_grid.loc[{"x":slice(ds.x.min(), ds.x.max()), "y":slice(ds.y.max(), ds.y.min())}] = ds.fillna(0)
    ds_grid["year"] = year
    if year not in dss.keys():
        dss[year] = [ds_grid]
    else:
        dss[year] += [ds_grid]

# For each year, concatenate the event grids and take the per-cell maximum.
# NOTE(review): `tasks` is reset to an empty list here and is not filled in this cell.
tasks = []
for year, dss_year in tqdm(dss.items()):
    # Stack all event grids of the year along "id" and keep the maximum of each cell.
    # The result is written immediately as "{year}_out.nc" in DATA_PROC/gfd_proc.
    xr.concat(dss_year, dim="id").max(dim="id").to_netcdf(os.path.join(DATA_PROC, "gfd_proc", f"{year}_out.nc"))
    print(f'Saved {os.path.join(DATA_PROC, "gfd_proc", f"{year}_out.nc")}')

100%|██████████| 937/937 [00:36<00:00, 25.52it/s]
  5%|▌         | 1/19 [01:11<21:29, 71.63s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2000_out.nc


 11%|█         | 2/19 [03:23<30:23, 107.24s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2001_out.nc


 16%|█▌        | 3/19 [08:33<53:16, 199.78s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2002_out.nc


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
 21%|██        | 4/19 [15:57<1:13:59, 295.96s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2003_out.nc


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
 26%|██▋       | 5/19 [21:24<1:11:44, 307.46s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2004_out.nc


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
 32%|███▏      | 6/19 [27:19<1:10:07, 323.62s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2005_out.nc


 37%|███▋      | 7/19 [32:30<1:03:53, 319.42s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2006_out.nc


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
 42%|████▏     | 8/19 [41:14<1:10:29, 384.54s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2007_out.nc


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
 47%|████▋     | 9/19 [46:39<1:00:58, 365.85s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2008_out.nc


 53%|█████▎    | 10/19 [50:31<48:40, 324.54s/it] 

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2009_out.nc


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
 58%|█████▊    | 11/19 [56:41<45:06, 338.32s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2010_out.nc


 63%|██████▎   | 12/19 [1:00:38<35:52, 307.51s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2011_out.nc


 68%|██████▊   | 13/19 [1:05:12<29:45, 297.55s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2012_out.nc


 74%|███████▎  | 14/19 [1:08:05<21:38, 259.73s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2013_out.nc


 79%|███████▉  | 15/19 [1:11:30<16:12, 243.24s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2014_out.nc


 84%|████████▍ | 16/19 [1:15:16<11:54, 238.22s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2015_out.nc


 89%|████████▉ | 17/19 [1:18:50<07:41, 230.90s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2016_out.nc


 95%|█████████▍| 18/19 [1:21:38<03:32, 212.06s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2017_out.nc


100%|██████████| 19/19 [1:25:03<00:00, 268.59s/it]

Saved D:\World Bank\CLIENT v2\Data\Data_proc\gfd_proc\2018_out.nc





In [7]:
"""
This step completes the flood analysis process by compiling all processed results into a single annual database file
(GFD_floods_yearly.nc). Each NetCDF file contains structured and normalized flood data for each year.

"""

## Step 3: compile everything into a single dataset

# One lazily opened dataset per year, in the iteration order of dss.
all_ds = []
# Only the keys of dss (the years) are needed: each one identifies a "{year}_out.nc" file.
for year in tqdm(dss):
    ds = xr.open_dataset(os.path.join(DATA_PROC, "gfd_proc", f"{year}_out.nc"), chunks={"x":1000, "y":1000})
    # Label the dataset with its year so the concatenation below can use it as a coordinate.
    ds["year"] = year
    all_ds.append(ds)

# Stack the yearly datasets along "year" and write the combined result to DATA_OUT.
full_ds = xr.concat(all_ds, dim="year")
full_ds.to_netcdf(os.path.join(DATA_OUT, "GFD_floods_yearly.nc"))

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [24]:
# NOTE(review): ad-hoc inspection of a different output file (ERA5 droughts). It rebinds `ds`
# and no visible cell uses the result - confirm whether this cell belongs in this notebook.
import xarray as xr
ds = xr.open_dataset(r"D:\World Bank\CLIENT v2\Data\Data_out\ERA5_droughts_yearly.nc")

In [26]:
import pandas as pd

# Ad-hoc preview of one zonal-statistics table; the file is not produced by any visible cell.
pd.read_parquet(r"D:\World Bank\CLIENT v2\Data\Data_proc\shocks\flooded_2000_6_zonal_stats.parquet")

Unnamed: 0,area_affected,cells_affected,total_cells,population_affected,population_affected_n,total_population
9.0,0.0,0.0,25,,0.0,0
10.0,0.0,0.0,19,,0.0,0
11.0,0.0,0.0,10,,0.0,0
12.0,0.0,0.0,7,,0.0,0
13.0,0.0,0.0,9,,0.0,0
...,...,...,...,...,...,...
37659.0,0.0,0.0,86,,0.0,0
37660.0,0.0,0.0,22,,0.0,0
37661.0,0.0,0.0,226,,0.0,0
37662.0,0.0,0.0,766,,0.0,0


In [15]:
# NOTE(review): `adm_id_full` is not defined in any visible cell, so this presumably relies on
# leftover kernel state - confirm where it comes from or remove the cell.
tuple(adm_id_full["ID"].sizes[d] for d in ['x', 'y'])


(43200, 17173)

In [None]:
# Track progress on the dashboard: http://127.0.0.1:8787/status
# NOTE(review): in the visible cells `tasks` is only ever assigned an empty list, so this call
# presumably computes nothing - confirm whether it is still needed.
dask.compute(*tasks)