In [1]:
import os

import multiprocessing
from multiprocessing import shared_memory, Pool
import sys
import matplotlib
import glob
from datetime import datetime
import numpy as np
import xarray as xr
from climate_indices import compute, indices, utils
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import logging
import rioxarray

logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)

In [2]:
data_dir = "D:/Documents and Settings/azvoleff/OneDrive - Conservation International Foundation/Data/GPCC/"
in_files = glob.glob(data_dir + '*.nc')
da_prcp = xr.open_mfdataset(in_files, combine='by_coords').precip

# Get the precipitation data and reshape the array to have the time dimension as the inner-most axis:
da_prcp = da_prcp.transpose('lat', 'lon', 'time')
da_prcp

Unnamed: 0,Array,Chunk
Bytes,2.27 GiB,474.61 MiB
Shape,"(720, 1440, 588)","(720, 1440, 120)"
Count,20 Tasks,5 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.27 GiB 474.61 MiB Shape (720, 1440, 588) (720, 1440, 120) Count 20 Tasks 5 Chunks Type float32 numpy.ndarray",588  1440  720,

Unnamed: 0,Array,Chunk
Bytes,2.27 GiB,474.61 MiB
Shape,"(720, 1440, 588)","(720, 1440, 120)"
Count,20 Tasks,5 Chunks
Type,float32,numpy.ndarray


In [3]:
scales = [1, 2, 3, 6, 9, 12, 24]
# Use 1981 - 2010 as calibration period based on UNCCD SO3 GPG
calibration_year_initial = 1981
calibration_year_final = 2010
periodicity = compute.Periodicity.monthly

initial_year = int(da_prcp['time'][0].dt.year)
if periodicity == compute.Periodicity.monthly:
    period_times = 12
    gamma_time_coord = "month"
elif periodicity == compute.Periodicity.daily:
    period_times = 366
    gamma_time_coord = "day"
gamma_coords={"lat": da_prcp.lat, "lon": da_prcp.lon, gamma_time_coord: range(period_times)}

fitting_shape = (da_prcp.shape[0], da_prcp.shape[1], period_times)

shm_prcp = shared_memory.SharedMemory(create=True, size=da_prcp.data.nbytes)
shm_name_prcp = shm_prcp.name
prcp_shape = da_prcp.shape
prcp = np.ndarray(prcp_shape, dtype=da_prcp.dtype, buffer=shm_prcp.buf)
prcp[:,:,:] = da_prcp[:,:,:]
da_prcp.data = prcp

shm_spi = shared_memory.SharedMemory(create=True, size=da_prcp.data.nbytes)
shm_name_spi = shm_spi.name
spi = np.ndarray(prcp_shape, dtype=da_prcp.dtype, buffer=shm_spi.buf)

attrs_to_copy = [
    'Conventions',
    'ncei_template_version',
    'naming_authority',
    'standard_name_vocabulary',
    'institution',
    'geospatial_lat_min',
    'geospatial_lat_max',
    'geospatial_lon_min',
    'geospatial_lon_max',
    'geospatial_lat_units',
    'geospatial_lon_units',
]
global_attrs = {key: value for (key, value) in da_prcp.attrs.items() if key in attrs_to_copy}

ds_gamma = xr.Dataset(coords={"lat": da_prcp.lat, "lon": da_prcp.lon, gamma_time_coord: range(period_times)},
                      attrs=global_attrs)

Define a function that can be used to compute the gamma fitting parameters for a particular month scale:

In [5]:
%%writefile parallel_functions.py

# Based on https://stackoverflow.com/a/62041888/871101

import sys
from datetime import datetime
import logging
import multiprocessing
from multiprocessing import shared_memory


import numpy as np
import xarray as xr
from climate_indices import compute, indices, utils

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger('parallel logger')
fh = logging.FileHandler('{}_{:%Y%m%d_%H%M%S}.log'.format(multiprocessing.current_process().name, datetime.now()))
logger.addHandler(fh)

logger.setLevel(logging.INFO)


def compute_gammas_lat(arguments: dict):
    logger.debug('at top of function "{}"'.format(sys._getframe(  ).f_code.co_name))
    dtype = arguments['dtype']
    lat_index = arguments['lat_index']
    lon_indices = arguments['lon_indices']
    scale = arguments['scale']
    initial_year = arguments['initial_year']
    calibration_year_initial = arguments['calibration_year_initial']
    calibration_year_final = arguments['calibration_year_final']
    periodicity = arguments['periodicity']
    prcp_shape = arguments['prcp_shape']
    fitting_shape = arguments['fitting_shape']
    
    shm_prcp = shared_memory.SharedMemory(arguments['shm_name_prcp'])
    prcp = np.ndarray(shape=prcp_shape, dtype=dtype, buffer=shm_prcp.buf)
    shm_alphas = shared_memory.SharedMemory(arguments['shm_name_alphas'])
    alphas = np.ndarray(shape=fitting_shape, dtype=dtype, buffer=shm_alphas.buf)
    shm_betas = shared_memory.SharedMemory(arguments['shm_name_betas'])
    betas = np.ndarray(shape=fitting_shape, dtype=dtype, buffer=shm_betas.buf)
    
    logger.debug("processing lat {}".format(lat_index))
    for lon_index in lon_indices:
        logger.debug("computing grid cell at lat {}, lon {}".format(lat_index, lon_index))
        # get the precipitation values for the lat/lon grid cell
        values = prcp[lat_index, lon_index]
        
        # skip over this grid cell if all NaN values
        if (np.ma.is_masked(values) and values.mask.all()) or np.all(np.isnan(values)):
            continue

        # convolve to scale
        scaled_values = \
            compute.scale_values(
                values,
                scale=scale,
                periodicity=periodicity,
            )

        # compute the fitting parameters on the scaled data
        alphas[lat_index, lon_index], betas[lat_index, lon_index] = \
            compute.gamma_parameters(
                scaled_values,
                data_start_year=initial_year,
                calibration_start_year=calibration_year_initial,
                calibration_end_year=calibration_year_final,
                periodicity=periodicity,
            )
        
def compute_spi_gamma_lat(arguments: dict):
    logger.debug('at top of function "{}"'.format(sys._getframe(  ).f_code.co_name))
    
    dtype = arguments['dtype']
    lat_index = arguments['lat_index']
    lon_indices = arguments['lon_indices']
    scale = arguments['scale']
    initial_year = arguments['initial_year']
    calibration_year_initial = arguments['calibration_year_initial']
    calibration_year_final = arguments['calibration_year_final']
    periodicity = arguments['periodicity']
    prcp_shape = arguments['prcp_shape']
    fitting_shape = arguments['fitting_shape']
    
    shm_prcp = shared_memory.SharedMemory(arguments['shm_name_prcp'])
    prcp = np.ndarray(shape=prcp_shape, dtype=dtype, buffer=shm_prcp.buf)
    shm_spi = shared_memory.SharedMemory(arguments['shm_name_spi'])
    spi = np.ndarray(shape=prcp_shape, dtype=dtype, buffer=shm_spi.buf)
    shm_alphas = shared_memory.SharedMemory(arguments['shm_name_alpha'])
    alphas = np.ndarray(shape=fitting_shape, dtype=dtype, buffer=shm_alphas.buf)
    shm_betas = shared_memory.SharedMemory(arguments['shm_name_beta'])
    betas = np.ndarray(shape=fitting_shape, dtype=dtype, buffer=shm_betas.buf)
        
    logger.debug("processing lat {}".format(lat_index))
    for lon_index in lon_indices:
        # get the values for the lat/lon grid cell
        values = prcp[lat_index, lon_index]

        # skip over this grid cell if all NaN values
        if (np.ma.is_masked(values) and values.mask.all()) or np.all(np.isnan(values)):
            continue

        gamma_parameters = {
            "alpha": alphas[lat_index, lon_index],
            "beta": betas[lat_index, lon_index],
        }

        # compute the SPI
        spi[lat_index, lon_index] = \
            indices.spi(
                values,
                scale=scale,
                distribution=indices.Distribution.gamma,
                data_start_year=initial_year,
                calibration_year_initial=calibration_year_initial,
                calibration_year_final=calibration_year_final,
                periodicity=periodicity,
                fitting_params=gamma_parameters,
            )

Overwriting parallel_functions.py


In [6]:
import parallel_functions


def compute_gammas(da_prcp: xr.DataArray,
                   scale: int,
                   calibration_year_initial,
                   calibration_year_final,
                   periodicity: compute.Periodicity,
                   shm_name_prcp,
                   prcp_shape,
                   fitting_shape,
                   logger) -> (xr.DataArray, xr.DataArray):
    logger.info('at top of function "{}"'.format(sys._getframe(  ).f_code.co_name))
    
    # create a shared memory array for the gamma distribution alpha and beta fitting parameters
    memory_size = int(np.prod(fitting_shape) * da_prcp.dtype.itemsize)

    shm_alphas = shared_memory.SharedMemory(create=True, size=memory_size)
    shm_name_alphas = shm_alphas.name
    alphas = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype, buffer=shm_alphas.buf)
    alphas[:,:,:] = np.NaN
    
    shm_betas = shared_memory.SharedMemory(create=True, size=memory_size)
    shm_name_betas = shm_betas.name
    betas = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype, buffer=shm_betas.buf)
    betas[:,:,:] = np.NaN
    
    arguments_list = []
    for lat_index in range(da_prcp.shape[0]):
        arguments = {
            'lat_index': lat_index,
            'lon_indices': list(range(da_prcp.shape[1])),
            'dtype': da_prcp.dtype,
            'shm_name_prcp': shm_name_prcp,
            'shm_name_alphas': shm_name_alphas,
            'shm_name_betas': shm_name_betas,
            'scale': scale,
            'initial_year': initial_year,
            'calibration_year_initial': calibration_year_initial,
            'calibration_year_final': calibration_year_final,
            'periodicity': periodicity,
            'prcp_shape': da_prcp.shape,
            'fitting_shape': fitting_shape,
        }
        arguments_list.append(arguments)

    logger.debug('arguments list length is {}'.format(len(arguments_list)))
    logger.debug('arguments list [0] length is {}'.format(len(arguments_list[0])))
    with Pool(8) as p:
        p.map(parallel_functions.compute_gammas_lat, arguments_list)
    
    alphas_non_shm = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype)
    alphas_non_shm[:, :, :] = alphas[:,:,:]
    betas_non_shm = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype)
    betas_non_shm[:, :, :] = betas[:,:,:]
    
    alpha_attrs = {
        'description': 'shape parameter of the gamma distribution (also referred to as the concentration) ' + \
        f'computed from the {scale}-month scaled precipitation values',
    }
    da_alpha = xr.DataArray(
        data=alphas_non_shm,
        coords=gamma_coords,
        dims=tuple(gamma_coords.keys()),
        name=f"alpha_{str(scale).zfill(2)}",
        attrs=alpha_attrs,
    )
    beta_attrs = {
        'description': '1 / scale of the distribution (also referred to as the rate) ' + \
        f'computed from the {scale}-month scaled precipitation values',
    }
    da_beta = xr.DataArray(
        data=betas_non_shm,
        coords=gamma_coords,
        dims=tuple(gamma_coords.keys()),
        name=f"beta_{str(scale).zfill(2)}",
        attrs=beta_attrs,
    )

    return da_alpha, da_beta


def compute_spi_gamma(da_prcp,
                      scale: int,
                      calibration_year_initial,
                      calibration_year_final,
                      periodicity,
                      shm_name_prcp,
                      prcp_shape,
                      shm_name_spi,
                      shm_name_alpha,
                      shm_name_beta,
                      fitting_shape,
                      logger) -> xr.DataArray:
    logger.info('at top of function "{}"'.format(sys._getframe(  ).f_code.co_name))
    
    shm_alphas = shared_memory.SharedMemory(shm_name_alpha)
    alphas = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype, buffer=shm_alphas.buf)
    
    shm_betas = shared_memory.SharedMemory(shm_name_beta)
    betas = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype, buffer=shm_betas.buf)
    
    arguments_list = []
    for lat_index in range(da_prcp.shape[0]):
        arguments = {
            'lat_index': lat_index,
            'lon_indices': list(range(da_prcp.shape[1])),
            'dtype': da_prcp.dtype,
            'shm_name_prcp': shm_name_prcp,
            'shm_name_spi': shm_name_spi,
            'shm_name_alpha': shm_name_alpha,
            'shm_name_beta': shm_name_beta,
            'scale': scale,
            'initial_year': initial_year,
            'calibration_year_initial': calibration_year_initial,
            'calibration_year_final': calibration_year_final,
            'periodicity': periodicity,
            'prcp_shape': da_prcp.shape,
            'fitting_shape': fitting_shape,
        }
        arguments_list.append(arguments)
    
    logger.debug('arguments list length is {}'.format(len(arguments_list)))
    logger.debug('arguments list [0] length is {}'.format(len(arguments_list[0])))
    with Pool(8) as p:
        p.map(parallel_functions.compute_spi_gamma_lat, arguments_list)

    shm_spi = shared_memory.SharedMemory(arguments['shm_name_spi'])
    spi = np.ndarray(shape=prcp_shape, dtype=da_prcp.dtype, buffer=shm_spi.buf)
    spi_non_shm = np.ndarray(shape=prcp_shape, dtype=da_prcp.dtype)
    spi_non_shm[:, :, :] = spi[:,:,:]
    
    # build a DataArray for this scale's SPI
    da_spi = xr.DataArray(
        data=spi_non_shm,
        coords=da_prcp.coords,
        dims=da_prcp.dims,
        name=f"spi_gamma_{str(scale).zfill(2)}",
    )
    da_spi.attrs = {
        'description': f'SPI ({scale}-{periodicity} gamma) computed from monthly precipitation ' + \
            f'data for the period {da_prcp.time[0]} through {da_prcp.time[-1]} using a ' + \
            f'calibration period from {calibration_year_initial} through {calibration_year_final}',
        'valid_min': -3.09,
        'valid_max': 3.09,
        'scale': scale,
        'long_name': f'{scale}-{periodicity} SPI(gamma)',
        'calibration_year_initial': calibration_year_initial,
        'calibration_year_final': calibration_year_final,
    }

    return da_spi

Compute the gamma fitting parameters for all scales and add these into a Dataset that we'll write to NetCDF:

In [7]:
%%time

logger.setLevel(logging.INFO)

for scale in scales:
    logger.info('processing scale {}'.format(scale))
    var_name_alpha = f"alpha_{str(scale).zfill(2)}"
    var_name_beta = f"beta_{str(scale).zfill(2)}"
    da_alpha, da_beta = compute_gammas(da_prcp,
                                       scale,
                                       calibration_year_initial,
                                       calibration_year_final,
                                       periodicity,
                                       shm_name_prcp,
                                       prcp_shape,
                                       fitting_shape,
                                       logger)    
    ds_gamma[f"alpha_{str(scale).zfill(2)}"] = da_alpha
    ds_gamma[f"beta_{str(scale).zfill(2)}"] = da_beta
netcdf_gamma = "D:/Documents and Settings/azvoleff/OneDrive - Conservation International Foundation/Data/GPCC/GPCC_monthly_v2020_1971-2019_monthly_gamma.nc"
ds_gamma.to_netcdf(netcdf_gamma)
print(np.nanmean(ds_gamma.alpha_12))
print(np.nanmean(ds_gamma.beta_12))

[INFO/MainProcess] processing scale 1
[INFO/MainProcess] at top of function "compute_gammas"
[INFO/MainProcess] processing scale 2
[INFO/MainProcess] at top of function "compute_gammas"
[INFO/MainProcess] processing scale 3
[INFO/MainProcess] at top of function "compute_gammas"
[INFO/MainProcess] processing scale 6
[INFO/MainProcess] at top of function "compute_gammas"
[INFO/MainProcess] processing scale 9
[INFO/MainProcess] at top of function "compute_gammas"
[INFO/MainProcess] processing scale 12
[INFO/MainProcess] at top of function "compute_gammas"
[INFO/MainProcess] processing scale 24
[INFO/MainProcess] at top of function "compute_gammas"


inf
26.876635
Wall time: 6min 27s


Compute the SPI using the pre-computed gamma fitting parameters for all scales and add these into a SPI(gamma) Dataset that we'll write to NetCDF:

In [8]:
ds_gamma = xr.open_dataset("D:/Documents and Settings/azvoleff/OneDrive - Conservation International Foundation/Data/GPCC/GPCC_monthly_v2020_1971-2019_monthly_gamma.nc")
ds_gamma

In [9]:
%%time
ds_spi = xr.Dataset(
    coords=da_prcp.coords,
    attrs=global_attrs,
)
for scale in scales:
    logger.info('processing scale {}'.format(scale))

    # Clear the SPI array in shared memory before calculating values for each scale - note that compute_spi_gamma
    # will return an array that does not use this shared memory (and hence ds_spi can store spi series for multiple
    # scales). The shared memory spi array however IS used to store values temporarily while the SPI is calculated
    # in parallel by multiple processes
    spi[:,:,:] = np.NaN
    
    ds_gamma_betas = ds_gamma[f"beta_{str(scale).zfill(2)}"]
    ds_gamma_alphas = ds_gamma[f"alpha_{str(scale).zfill(2)}"]
    
    memory_size = int(np.prod(fitting_shape) * da_prcp.dtype.itemsize)
    var_name_alpha = f"alpha_{str(scale).zfill(2)}"
    var_name_beta = f"beta_{str(scale).zfill(2)}"    
        
    shm_alphas = shared_memory.SharedMemory(create=True, size=memory_size)
    shm_name_alpha = shm_alphas.name
    alphas = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype, buffer=shm_alphas.buf)
    alphas[:,:,:] = ds_gamma_alphas
    
    shm_betas = shared_memory.SharedMemory(create=True, size=memory_size)
    shm_name_beta = shm_betas.name
    betas = np.ndarray(shape=fitting_shape, dtype=da_prcp.dtype, buffer=shm_betas.buf)
    betas[:,:,:] = ds_gamma_betas
    
    da_spi = compute_spi_gamma(da_prcp,
                               scale,
                               calibration_year_initial,
                               calibration_year_final,
                               periodicity,
                               shm_name_prcp,
                               prcp_shape,
                               shm_name_spi,
                               shm_name_alpha,
                               shm_name_beta,
                               fitting_shape,
                               logger)
    ds_spi[f"spi_gamma_{str(scale).zfill(2)}"] = da_spi
    
netcdf_spi = "D:/Documents and Settings/azvoleff/OneDrive - Conservation International Foundation/Data/GPCC/GPCC_monthly_v2020_1971-2019_monthly_gamma_SPI.nc"
ds_spi.to_netcdf(netcdf_spi)

[INFO/MainProcess] processing scale 1
[INFO/MainProcess] at top of function "compute_spi_gamma"
[INFO/MainProcess] processing scale 2
[INFO/MainProcess] at top of function "compute_spi_gamma"
[INFO/MainProcess] processing scale 3
[INFO/MainProcess] at top of function "compute_spi_gamma"
[INFO/MainProcess] processing scale 6
[INFO/MainProcess] at top of function "compute_spi_gamma"
[INFO/MainProcess] processing scale 9
[INFO/MainProcess] at top of function "compute_spi_gamma"
[INFO/MainProcess] processing scale 12
[INFO/MainProcess] at top of function "compute_spi_gamma"
[INFO/MainProcess] processing scale 24
[INFO/MainProcess] at top of function "compute_spi_gamma"


Wall time: 28min 21s


In [10]:
np.nanmean(ds_spi.spi_gamma_12)

0.021683106

Plot a time step to validate that the SPI values look reasonable:

In [None]:
# test versus this map: https://spei.csic.es/map/maps.html#months=4#month=8#year=2012
ds_spi["spi_gamma_12"].isel(time=500).plot(cmap='BrBG')

In [None]:
# test versus this map: https://spei.csic.es/map/maps.html#months=4#month=8#year=2012
ds_spi["spi_gamma_12"].isel(lat=399, lon=560).plot()

In [8]:
ds_spi = xr.open_dataset("D:/Documents and Settings/azvoleff/OneDrive - Conservation International Foundation/Data/GPCC/GPCC_monthly_v2020_1971-2019_monthly_gamma_SPI.nc")
ds_spi = ds_spi.transpose('time', 'lat', 'lon')
ds_spi.rio.write_crs('EPSG:4326', inplace=True)
for var in ds_spi.data_vars:
    logger.info('processing var {}'.format(var))
    ds_spi[var].data = ds_spi[var].data * 1000
    ds_spi[var].data[np.isnan(ds_spi[var].data)] = -32768
    ds_spi[var].rio.write_nodata(-32768, inplace=True)
    ds_spi[var] = ds_spi[var].astype('int16')
    year_start = int(ds_spi[var]["time"].dt.year.min())
    year_end = int(ds_spi[var]["time"].dt.year.max())
    ds_spi[var].rio.to_raster(os.path.join(data_dir, 'GPCC_monthly_v2020_{}-{}_monthly_gamma_SPI_lag{}.tif'.format(year_start, year_end, ds_spi[var].scale)),
                             compress='deflate', tiled=True, dtype='int16')

[INFO/MainProcess] processing var spi_gamma_01
[INFO/MainProcess] processing var spi_gamma_02
[INFO/MainProcess] processing var spi_gamma_03
[INFO/MainProcess] processing var spi_gamma_06
[INFO/MainProcess] processing var spi_gamma_09
[INFO/MainProcess] processing var spi_gamma_12
[INFO/MainProcess] processing var spi_gamma_24


In [59]:

ds_spi[var].rio.nodata
print(ds_spi[var])
ds_spi[var].rio.write_nodata(-32768, inplace=True)
ds_spi[var].rio.set_nodata(-32768, inplace=True)
print(ds_spi[var].rio.set_nodata(-32768, inplace=True))
print(ds_spi[var].rio.nodata)
print(ds_spi[var])

<xarray.DataArray 'spi_gamma_01' (time: 588, lat: 720, lon: 1440)>
[609638400 values with dtype=float32]
Coordinates:
  * lat          (lat) float64 89.88 89.62 89.38 89.12 ... -89.38 -89.62 -89.88
  * lon          (lon) float64 -179.9 -179.6 -179.4 -179.1 ... 179.4 179.6 179.9
  * time         (time) datetime64[ns] 1971-01-01 1971-02-01 ... 2019-12-01
    spatial_ref  int32 0
Attributes:
    description:               SPI (1-monthly gamma) computed from monthly pr...
    valid_min:                 -3.09
    valid_max:                 3.09
    scale:                     1
    long_name:                 1-monthly SPI(gamma)
    calibration_year_initial:  1981
    calibration_year_final:    2010
    grid_mapping:              spatial_ref
    _FillValue:                -32768.0
<xarray.DataArray 'spi_gamma_01' (time: 588, lat: 720, lon: 1440)>
[609638400 values with dtype=float32]
Coordinates:
  * lat          (lat) float64 89.88 89.62 89.38 89.12 ... -89.38 -89.62 -89.88
  * lon         

nan
