# Drought Indicator Calculation

@author: Caroline Gasten

The present notebook caculates the SPI and SPEI per grid cell and then aggregates these DIs to the admin1-level and ethnic group territory level by applying different statistics (10th, 25th, 75th, 90th percentile, median and mean).

## Settings

In [None]:
#import packages
import xarray as xr
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gamma, norm, fisk
import datetime as dt
from datetime_periods import period
import geopandas as gpd
from scipy.special import binom
from scipy.special import gamma as gam
import progressbar
import rioxarray

In [None]:
#paths
path_prec_input = #path to daily precipitation data
path_pet_input = #path to PET data
path_out = #path where gridded DI data shall be stored

In [None]:
# define fitting time period and calculation time period
time_fit = ['1993', '2023'] # for fitting the distribution a 30 year period is used from 1993 and 2023
time_calc = ['2002', '2023'] # SPI and SPEI are calculated from 2002 - 2023 (2 years prior to the period of investigation to account for aggregation periods and lags needed)

## Aggregation to monthly

In [None]:
i=0
for year in range(1993, 2023):
    #opening files
    file_tp = os.path.join(path_prec_input, 'era5_tp_%04d_daily.nc'%(year))
    file_pet = os.path.join(path_pet_input, 'PET_%04d_daily.nc'%(year))
    ds_tp_d = xr.open_dataarray(file_tp).rename({'x':'longitude', 'y':'latitude'}) * 10**3 #mm/day
    ds_pet_d = xr.open_dataarray(file_pet).rename({'x':'longitude', 'y':'latitude'}) #mm/day
    
    #resampling to monthly time step
    ds_tp_m_year = ds_tp_d.resample(time='M').sum()
    ds_pet_m_year = ds_pet_d.resample(time='M').sum(skipna=False)
    
    #concatenating yearly time series to one single time series
    if i == 0:
        ds_tp_m = ds_tp_m_year
        ds_pet_m = ds_pet_m_year
    else:
        ds_tp_m = xr.concat([ds_tp_m, ds_tp_m_year], dim='time')
        ds_pet_m = xr.concat([ds_pet_m, ds_pet_m_year], dim='time')
        
    i = 1

## Calculation of SPI

In [None]:
def spi_calc(ds_tp_monthly, n_months, time_fit, time_calc):
    """
    The present function aggregates the input dataset to the aggregation time period of interest (defined by n_months) and fits a gamma distribution based on the
    time period specified in time_fit. Then, it calculates the SPI values for the time period specified in time_calc.
    """
    # aggregation of monthly precipitation values based on aggregation time period
    ds_tp = ds_tp_monthly.rolling(time=n_months, min_periods=n_months).sum()
    
    # slicing datasets according to defined time periods of interest for fitting and calculating SPI values
    ds_fit = ds_tp.sel(time=slice(time_fit[0], time_fit[1]))
    ds_calc = ds_tp.sel(time=slice(time_calc[0], time_calc[1]))
    
    #set up xr.Dataset to store spi values in the same format as precipitation values
    xr_shape = [len(ds_calc.time), len(ds_calc.latitude), len(ds_calc.longitude)]
    spi_ds = xr.DataArray(np.empty(xr_shape),coords = dict(time = ds_calc.time, latitude=ds_calc.latitude,longitude=ds_calc.longitude), attrs = dict(unit='-', long_name=f'Standardized Precipitation Index {n_months} months'))
    
    #progress bar visualization
    n_calcs = len(ds_calc.longitude) * len(ds_calc.latitude) * 12
    bar = progressbar.ProgressBar(maxval=n_calcs)
    i=0
    bar.start()
    
    #fitting and calculating SPI values by separately fitting a gamma distribution for each grid cell and month of the year
    for lon in ds_tp.longitude:
        for lat in ds_tp.latitude:
            
            #selecting grid cell of interest
            df_fit = ds_fit.sel(longitude=lon, latitude=lat).to_dataframe().tp
            df_calc = ds_calc.sel(longitude=lon, latitude=lat).to_dataframe().tp
            for m in df_fit.index.month.unique():
                i  += 1
                bar.update(i)
                #selecting month of interest
                df_fit_m = df_fit[df_fit.index.month==m]
                df_calc_m = df_calc[df_calc.index.month==m]
                
                #fitting SPI values and calculating them for latxlonxmonth combination and writing it to a datarray which is iteratively filled with values for all grid cells and months
                spi_m = spi_for_month(df_fit_m, df_calc_m)
                spi_ds.loc[dict(time=spi_m.index, latitude=lat, longitude=lon)] = xr.DataArray(spi_m)
    bar.finish()          
    return spi_ds

In [None]:
def spi_for_month(df_fit_m, df_calc_m):
    """
    The present function receives as input the dataframe of precipitation values for a certain aggregation time period, grid cell and month of the year 
    (over all years in question) and fits the gamma distribution to the values provided in df_fit_m. Based on the fitted distribution it calculates the 
    SPI values for the precipitation values in df_calc_m.
    """
    #remove zero values: gamma distribution not defined for zero, it is expected that in the arid environment especially 1month and 3month aggregation time periods may equal zero. Therefore, the mixed distribution approach needs to be used. see paper by European Commission
    df_fit_nonzero = df_fit_m[df_fit_m > 0]
    
    #probability of zero 
    n_samples = len(df_fit_m)
    n_zero = n_samples - len(df_fit_nonzero)
    prob_zero = n_zero/n_samples
    
    #fit gamma distribution
    a, loc, scale = gamma.fit(df_fit_nonzero, floc=0) #max likelihood method
    
    #cumulative distribution function for calculation time period to derive SPI values
    cdf_nonzero = gamma.cdf(df_calc_m[df_calc_m>0], a=a, loc=loc, scale=scale)
    cdf = pd.DataFrame(data=np.ones(len(df_calc_m))*prob_zero, index=df_calc_m.index, columns=['P_cum']) #probability assigned to precipitation values of 0 (or lower)
    cdf[df_calc_m>0] = cdf[df_calc_m>0] + (1 - prob_zero)*cdf_nonzero.reshape(len(cdf_nonzero), 1) #adding probability based on nonzero cdf to all other precipitation values
    spi = pd.Series(norm.ppf(cdf.P_cum), index = cdf.index) #SPI as values which correspond to the values resulting in the probabilities of the normalized cdf function
    
    return spi

In [None]:
#SPI-1
spi_1 = spi_calc(ds_tp_m, 1, time_fit, time_calc)
spi_1.to_netcdf(os.path.join(path_out,'SPI', 'SPI-1.nc'))
#problem when there are no really positive values -> cannot fit a distribution -> nan values but only limited to very few cells and months of the year, for statistical analysis at larger level these values are not considered to play a large difference

In [None]:
spi_1.isnull().sum()

In [None]:
#SPI-3
spi_3 = spi_calc(ds_tp_m, 3, time_fit, time_calc)
spi_3.to_netcdf(os.path.join(path_out,'SPI', 'SPI-3.nc'))

In [None]:
spi_3.isnull().sum()

In [None]:
#SPI-6
spi_6 = spi_calc(ds_tp_m, 6, time_fit, time_calc)
spi_6.to_netcdf(os.path.join(path_out,'SPI', 'SPI-6.nc'))

In [None]:
spi_6.isnull().sum()

In [None]:
#SPI-12
spi_12 = spi_calc(ds_tp_m, 12, time_fit, time_calc)
spi_12.to_netcdf(os.path.join(path_out,'SPI', 'SPI-12.nc'))

In [None]:
spi_12.isnull().sum()

## SPEI calculation

In [None]:
def spei_calc(ds_tp_monthly, ds_pet_monthly, n_months, time_fit, time_calc):
    """
    The present function calculates the SPEI for each grid cell and month based on a ... distribution which is fitted for each latxlonxmonth-of-the-year 
    combination of the difference between n_months aggregation in precipitation and evapotranspiration over the time period specified in time_fit and 
    which is then used to calculate the SPEI values for the same latxlonxmonth-of-the-year combination but for the time period specified in time_calc.
    """
    #aggregation of TP-PET over the number of months specified in n_months
    ds_diff = ds_tp_monthly.rolling(time=n_months, min_periods=n_months).sum() - ds_pet_monthly.rolling(time=n_months, min_periods=n_months).sum().dropna(dim='time')
    ds_diff = ds_diff.rename('wb')
    
    #select the time periods to use for fitting and for calculation from the newly created dataset
    ds_fit = ds_diff.sel(time=slice(time_fit[0], time_fit[1]))
    ds_calc = ds_diff.sel(time=slice(time_calc[0], time_calc[1]))

    #set up xr.Dataarray to store spei values in the same format as precipitation and pet values
    xr_shape = [len(ds_calc.time), len(ds_calc.latitude), len(ds_calc.longitude)]
    spei_ds = xr.DataArray(np.zeros(xr_shape),coords = dict(time = ds_calc.time, latitude=ds_calc.latitude,longitude=ds_calc.longitude), attrs = dict(unit='-', long_name=f'Standardized Precipitation Evapotranspiration Index {n_months} months'))
    
    #progress bar visualization
    n_calcs = len(ds_calc.longitude) * len(ds_calc.latitude) * 12
    bar = progressbar.ProgressBar(maxval=n_calcs)
    i=0
    bar.start()
    
    #fitting and calculating SPEI values by separately fitting a log-logistic distribution for each grid cell and month of the year
    for lon in ds_calc.longitude:
        for lat in ds_calc.latitude:
            #selecting grid cell of interst
            df_fit = ds_fit.sel(longitude=lon, latitude=lat).to_dataframe().wb
            df_calc = ds_calc.sel(longitude=lon, latitude=lat).to_dataframe().wb
            for m in df_fit.index.month.unique():
                i  += 1
                bar.update(i)
                #selecting month of interest
                df_fit_m = df_fit[df_fit.index.month==m]
                df_calc_m = df_calc[df_calc.index.month==m]
                #fitting SPEI values and calculating them for latxlonxmonth combination and writing it to a datarray which is iteratively filled with values for all grid cells and months
                spei_m = spei_for_month(df_fit_m, df_calc_m)
                if spei_m.isnull().sum()>0:
                    print(f'null value at longitude {lon}, latitude{lat} and month {m}')
                spei_ds.loc[dict(time=spei_m.index, latitude=lat, longitude=lon)] = xr.DataArray(spei_m)
    bar.finish()
    return spei_ds

In [None]:
def spei_for_month(df_fit_m, df_calc_m):
    # fit log-logistic distribution using unbiased Probability Weighted Moments (PWM) -> based on Vicente-Serrano for parameter calculation, Begueria for using unbiased and Hosking 1986 for the formula for the PWM
    
    df_fit_m_sorted= df_fit_m.sort_values()
    
    #calculate unbiased PWM 0 to 2
    w_0=0
    w_1=0
    w_2=0
    i=0
    n = len(df_fit_m_sorted)
    for val in df_fit_m_sorted:
        i+=1
        w_0 += 1/n * binom(n-i, 0) * val/binom(n-1, 0)
        w_1 += 1/n * binom(n-i, 1) * val/binom(n-1, 1)
        w_2 += 1/n * binom(n-i, 2) * val/binom(n-1, 2)
    
    #calculate scale, shape and loc parameter for 3-parameter log-logistic distribution
    beta = (2*w_1 - w_0) / (6*w_1 - w_0 - 6*w_2) #shape
    alpha = (w_0 - 2 * w_1) * beta / (gam(1 + 1/beta) * gam(1 - 1/beta)) #scale
    gamma = w_0 - alpha * gam(1 + 1/beta) * gam(1 - 1/beta) #loc
    
    
    #obtain distribution for time period of interest
    prob_df =pd.DataFrame()
    prob_df['P-PET']=df_calc_m
    prob_df['pdf'] = beta / alpha * ((prob_df['P-PET'] - gamma)/alpha)**(beta - 1) * (1 + ((prob_df['P-PET'] - gamma)/alpha)**beta)**(-2)#fisk.pdf(prob_df['P-PET'], beta, loc=gamma, scale=alpha)
    prob_df['Fx'] = (1 + ((prob_df['P-PET'] - gamma)/alpha)**(-beta))**(-1)#fisk.cdf(prob_df['P-PET'], beta, loc=gamma, scale=alpha)

  
    #setting cumulative probability for values below origin to 0
    if prob_df.Fx.isnull().any()==1:
        df_debug = prob_df[prob_df.Fx.isnull()]
        for idx in df_debug.index:
            if (df_debug.loc[idx, 'P-PET'] - gamma)<0 and alpha>0:
                prob_df.loc[idx, 'Fx'] = 0
                prob_df.loc[idx, 'pdf'] = 0
    
                
    #standardize following procedure in Vicente-Serrano et al 2010 based on Abramowitz Stegun
    prob_df['P_ex'] = 1 - prob_df.Fx
    prob_df['SPEI'] = np.ones(len(prob_df))
    prob_df.loc[prob_df.P_ex>0.5, 'SPEI'] = -1
    prob_df.loc[prob_df.P_ex>0.5, 'P_ex'] = 1 - prob_df.loc[prob_df.P_ex>0.5, 'P_ex']
    prob_df.P_ex.where(prob_df.P_ex!=0, 10**(-10), inplace=True) #avoid that P=0
    prob_df['W'] = np.sqrt(-2 * np.log(prob_df.P_ex))
    C_0 = 2.515517
    C_1 = 0.802853
    C_2 = 0.010328
    d_1 = 1.432788
    d_2 = 0.189269
    d_3 = 0.001308
    prob_df['SPEI'] = prob_df.SPEI * (prob_df.W - (C_0 + C_1 * prob_df.W + C_2 * prob_df.W**2)/(1+d_1*prob_df.W + d_2 * prob_df.W**2 + d_3 * prob_df.W**3))
    #prob_df.loc[prob_df.SPEI<-4, 'SPEI'] = -4
    return prob_df.SPEI

In [None]:
#SPEI-1
spei_1 = spei_calc(ds_tp_m, ds_pet_m, 1, time_fit, time_calc)
spei_1.to_netcdf(os.path.join(path_out, 'SPEI', 'SPEI-1.nc'))

In [None]:
#SPEI-3
spei_3 = spei_calc(ds_tp_m, ds_pet_m, 3, time_fit, time_calc)
spei_3.to_netcdf(os.path.join(path_out, 'SPEI', 'SPEI-3.nc'))

In [None]:
#SPEI-6
spei_6 = spei_calc(ds_tp_m, ds_pet_m, 6, time_fit, time_calc)
spei_6.to_netcdf(os.path.join(path_out, 'SPEI', 'SPEI-6.nc'))

In [None]:
#SPEI-12
spei_12 = spei_calc(ds_tp_m, ds_pet_m, 12, time_fit, time_calc)
spei_12.to_netcdf(os.path.join(path_out, 'SPEI', 'SPEI-12.nc'))

## SPI & SPEI per adm-1 unit

### Settings

In [None]:
import geopandas as gpd
import rioxarray
from shapely.geometry import mapping
import math as mt
from geocube.api.core import make_geocube
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime as dt
import xarray as xr
import progressbar

In [None]:
path_prec_input = #path to daily precipitation data
path_pet_input = #path to daily PET data
path_out = #path where spatial statistics shall be stored
path_adm = #path to administrative unit shapefiles

### Functions

In [None]:
def clip_to_shape(drought_xds, mask_shapefile, crs='epsg:4326',  incl_all_touching=True):
    """
    clips drought dataset based on mask extents
    drought_xds: xarray Dataset containing the spi or spei values along dimensions time, latitude and longitude
    crs: string of the coordinate reference system in EPSG or WSG format
    mask_shapefile: mask shapefile loaded into python
    incl_all_touching: boolean which indicates if all cells touching the border should be included even if their centroid lies outside
    """
    drought_xds.rio.write_crs(crs, inplace=True)
    drought_clipped = drought_xds.rio.clip(mask_shapefile.geometry, all_touched=incl_all_touching, from_disk=True)
    return drought_clipped

In [None]:
def stats_by_adm(gadm_file, adm_name, DI_name, country, crs='epsg:4326', incl_all_touching=True):
    """
    function calculates the spatial statistics of drought indicator over admin unit
    gadm_file: shapefile of the admin unit
    adm_name: admin-unit name
    DI_name: name of the drought indicator
    crs: coordinate reference system of the drought raster dataset, default epsg:4326
    incl_all_touching: method to use to cut out values of drought indicators
    """
    #open drought dataset
    if 'SPI' in DI_name:
        DI = xr.open_dataarray(os.path.join(path_out, 'SPI', '%s.nc'%DI_name)).rename('spi')
    elif 'SPEI' in DI_name:
        DI = xr.open_dataarray(os.path.join(path_out, 'SPEI', '%s.nc'%DI_name)).rename('spei')
        
    #retrieve admin-unit from country dataframe of admin1-units
    mask = gadm_file[gadm_file.index == adm_name]
    
    #clip drought data and save to dataset
    DI_county = clip_to_shape(DI, mask, crs, incl_all_touching)
    if 'SPI' in DI_name:
        DI_county.to_netcdf(os.path.join(path_out, 'SPI', 'clipped', '%s-%s.nc'%(DI_name, adm_name)))
    elif 'SPEI' in DI_name:
        DI_county.to_netcdf(os.path.join(path_out, 'SPEI', 'clipped', '%s-%s.nc'%(DI_name, adm_name)))
        
    #calculate spatial statistics    
    df_di_county = pd.DataFrame(DI_county.time.values, columns=['time']).set_index('time')
    df_di_county['Mean'] = DI_county.mean(dim=['longitude', 'latitude'])
    df_di_county['Median'] = DI_county.median(dim=['longitude','latitude'])
    df_di_county['Min'] = DI_county.min(dim=['longitude', 'latitude'])
    df_di_county['Max'] = DI_county.max(dim=['longitude', 'latitude'])
    df_di_county['P10'] = DI_county.quantile(0.1, dim=['longitude', 'latitude'])
    df_di_county['P25'] = DI_county.quantile(0.25, dim=['longitude', 'latitude'])
    df_di_county['Median'] = DI_county.quantile(0.5, dim=['longitude', 'latitude'])
    df_di_county['P75'] = DI_county.quantile(0.75, dim=['longitude', 'latitude'])
    df_di_county['P90'] = DI_county.quantile(0.9, dim=['longitude', 'latitude'])
    df_di_county['Std'] = DI_county.std(dim=['longitude', 'latitude'])
    
    #save spatial statistics
    if 'SPI' in DI_name:
        df_di_county.to_csv(os.path.join(path_out, 'SPI', 'stats', '%s-%s-%s.csv'%(country, DI_name, adm_name)))
    elif 'SPEI' in DI_name:
        df_di_county.to_csv(os.path.join(path_out, 'SPEI', 'stats', '%s-%s-%s.csv'%(country, DI_name, adm_name)))

### Statistics Calculations

In [None]:
#for Kenya, Somalia, South Sudan and Uganda
for country in ['KEN', 'UGA', 'SOM', 'SSD']:
    
    #reading and dissolving GADM 3.6 shapefiles to admin-1 level
    adm_bound = gpd.read_file(os.path.join(path_adm, country, 'gadm36_%s.gpkg'%(country)))
    adm1 = adm_bound.dissolve(by='NAME_1').loc[:, ['GID_1', 'geometry']]
    
    #progress bar visualization
    print(country)
    n_calcs = 8 * len(adm1)
    bar = progressbar.ProgressBar(maxval=n_calcs)
    i=0
    bar.start()
    
    #calculation of statistics for each DI per month and administrative unit
    for di in ['SPI-1', 'SPI-3', 'SPI-6', 'SPI-12', 'SPEI-1', 'SPEI-3', 'SPEI-6', 'SPEI-12']:
        for county in np.unique(adm1.index):
            stats_by_adm(adm1, county, di, country)
            i+=1
            bar.update(i)

In [None]:
#for Ethiopia because of different file strucutre

#reading admin-1 level shapefiles as for Ethiopia saved differently
country = 'ETH'
adm1 = gpd.read_file(os.path.join(path_adm, country, 'gadm36_%s_1.shp'%(country))).loc[:, ['NAME_1', 'GID_1', 'geometry']].set_index('NAME_1')

#visualization of progress bar
n_calcs = 8 * len(adm1)
bar = progressbar.ProgressBar(maxval=n_calcs)
i=0
bar.start()

#calculation of statistics for each DI per month
for di in ['SPI-1', 'SPI-3', 'SPI-6', 'SPI-12', 'SPEI-1', 'SPEI-3', 'SPEI-6', 'SPEI-12']:
    for county in np.unique(adm1.index):
        stats_by_adm(adm1, county, di, country)
        i+=1
        bar.update(i)

## SPI and SPEI for ethnic group territory

### Settings

In [None]:
path_eth = #path to ethnic group territory shapefiles

### Functions

In [None]:
def stats_by_eth(eth_file, eth_name, DI_name, crs='epsg:4326', incl_all_touching=True):
      
    """
    function calculates the spatial statistics of drought indicator over ethnic group level
    eth_file: shapefile of the ethnic group territory
    eth_name: ethnic group name
    DI_name: name of the drought indicator
    crs: coordinate reference system of the drought raster dataset, default epsg:4326
    incl_all_touching: method to use to cut out values of drought indicators
    """
    
    #open drought dataset
    if 'SPI' in DI_name:
        DI = rioxarray.open_rasterio(os.path.join(path_out, 'SPI', '%s.nc'%DI_name)).rename('spi')
    elif 'SPEI' in DI_name:
        DI = rioxarray.open_rasterio(os.path.join(path_out, 'SPEI', '%s.nc'%DI_name)).rename('spei')
        
    #clip drought dataset to ethnic group territory shape
    mask = eth_file
    DI_eth = clip_to_shape(DI, mask, crs, incl_all_touching)
    if 'SPI' in DI_name:
        DI_eth.to_netcdf(os.path.join(path_out, 'SPI', 'clipped', 'Eth_%s-%s.nc'%(DI_name, eth_name)))
    elif 'SPEI' in DI_name:
        DI_eth.to_netcdf(os.path.join(path_out, 'SPEI', 'clipped', 'Eth_%s-%s.nc'%(DI_name, eth_name)))                                                        
    
    df_di_eth = pd.DataFrame(DI_eth.time.values, columns=['time']).set_index('time')
    
    #calculate spatial statistics
    df_di_eth['Mean'] = DI_eth.mean(dim=['x', 'y'])
    df_di_eth['Median'] = DI_eth.median(dim=['x','y'])
    df_di_eth['Min'] = DI_eth.min(dim=['x', 'y'])
    df_di_eth['Max'] = DI_eth.max(dim=['x', 'y'])
    df_di_eth['P10'] = DI_eth.quantile(0.1, dim=['x', 'y'])
    df_di_eth['P25'] = DI_eth.quantile(0.25, dim=['x', 'y'])
    df_di_eth['Median'] = DI_eth.quantile(0.5, dim=['x', 'y'])
    df_di_eth['P75'] = DI_eth.quantile(0.75, dim=['x', 'y'])
    df_di_eth['P90'] = DI_eth.quantile(0.9, dim=['x', 'y'])
    df_di_eth['Std'] = DI_eth.std(dim=['x', 'y'])
    
    #save output datafiles
    if 'SPI' in DI_name:
        df_di_eth.to_csv(os.path.join(path_out, 'SPI', 'stats','Ethnic_group',  'Eth_%s-%s.csv'%(DI_name, eth_name)))
    elif 'SPEI' in DI_name:
        df_di_eth.to_csv(os.path.join(path_out, 'SPEI', 'stats', 'Ethnic_group', 'Eth_%s-%s.csv'%(DI_name, eth_name)))
    
    return df_di_eth

### Statistics Calculations

In [None]:
#names of ethnic groups under investigation
eth_groups = ["Toposa", "Dassanetch", "Pokot", "Borana", "Gabra", "Turkana"] 

In [None]:
#progress bar
n_calcs = 8 * len(eth_groups)
bar = progressbar.ProgressBar(maxval=n_calcs)
i=0
bar.start()

#loop through ethnic groups and DIs to calculate spatial statistics of each DI over respective territory
for eth in eth_groups:
    eth_shape = gpd.read_file(os.path.join(path_eth, 'Eth_%s.shp'%(eth)))
    for di in ['SPI-1', 'SPI-3', 'SPI-6', 'SPI-12', 'SPEI-1', 'SPEI-3', 'SPEI-6', 'SPEI-12']:
        i+=1
        bar.update(i)
        DI_eth = stats_by_eth(eth_shape, eth, di)