<img align="centre" src="../../Supplementary_data/Github_banner.jpg" width="100%">

# Enhanced Combined Drought Index - Soil Moisture Drought Index


## Background 

Drought is an extended period, during which, fresh water availability and accessibility for the ecosystem at a given time and place is below normal, due to unfavourable spatial and temporal distribution of rainfall, temperature, soil moisture and wind characteristics [(Balint et al., 2013)](https://doi.org/10.1016/B978-0-444-59559-1.00023-2). Severe droughts can affect large populations, leading to a long-term threat to people’s livelihoods and result in tremendous economic loss [(Enenkel et al., 2016)](https://doi.org/10.3390/rs8040340).

The **Enhanced Drought Index** is a combination of the following components: Vegetation, Precipitation, Temperature and Soil moisture.

This notebook will focus on the **Soil moisture** component, which considers soil moisture deficits and deficit persistence. The index based on soil moisture is named as the Soil Moisture Drought Index (SMDI). The Soil Moisture Drought Index (SMDI) is used to assess drought conditions based on soil moisture levels.SMDI is an essential for monitoring drought conditions, especially in areas where soil moisture directly affects agriculture and natural resources. Droughts can have serious impacts on agriculture, water resources, and ecosystems, and soil moisture is a key factor in understanding drought severity, as it influences crop growth and the availability of water in the soil.


The notebook outlines:

***
1. Loading Python and DE Africa Packages
2. Setting dask cluster
3. Loading area on interest
5. Load the WAPOR soil moisture dataset
6. Resampling of data to dekadal (10-day) intervals
7. Calculate the Soil Moisture Drought Index
8. Export the result as zip

## Getting started

To run this analysis, run all the cells in the notebook, starting with the "Load packages" cell. 

### Load packages

In [1]:
import json
import os

import dask
import geopandas as gpd
import numpy as np
import pandas as pd
import rioxarray
import toolz
import xarray as xr
import datacube
from deafrica_tools.bandindices import calculate_indices
from deafrica_tools.dask import create_local_dask_cluster
from deafrica_tools.datahandling import load_ard
from deafrica_tools.load_wapor import get_all_WaPORv3_mapsets, get_WaPORv3_info, load_wapor_ds
from dekad import group_by_dekad, bin_by_interest_period, get_dekad_no_in_year, get_no_data_mask, max_consecutive_ones
from odc.geo.geobox import GeoBox
from odc.geo.geom import Geometry
from pyarrow import Table
from pyarrow.parquet import write_table
from wapordl import wapor_map

### Set up a Dask cluster

Dask can be used to better manage memory use and conduct the analysis in parallel. 
For an introduction to using Dask with Digital Earth Africa, see the [Dask notebook](../../Beginners_guide/08_Parallel_processing_with_dask.ipynb).

>**Note**: We recommend opening the Dask processing window to view the different computations that are being executed; to do this, see the *Dask dashboard in DE Africa* section of the [Dask notebook](../../Beginners_guide/08_Parallel_processing_with_dask.ipynb).

To use Dask, set up the local computing cluster using the cell below.

In [2]:
create_local_dask_cluster()

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /user/nanaboamah89@gmail.com/proxy/8787/status,

0,1
Dashboard: /user/nanaboamah89@gmail.com/proxy/8787/status,Workers: 1
Total threads: 4,Total memory: 26.21 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:44389,Workers: 1
Dashboard: /user/nanaboamah89@gmail.com/proxy/8787/status,Total threads: 4
Started: Just now,Total memory: 26.21 GiB

0,1
Comm: tcp://127.0.0.1:43459,Total threads: 4
Dashboard: /user/nanaboamah89@gmail.com/proxy/45407/status,Memory: 26.21 GiB
Nanny: tcp://127.0.0.1:43433,
Local directory: /tmp/dask-scratch-space/worker-u7l2b0gg,Local directory: /tmp/dask-scratch-space/worker-u7l2b0gg


### Analysis parameters

The following cell sets important parameters for the analysis:

* `country`: The country to run the analysis over.
* `resolution`: The x and y cell resolution of the satellite data in metres (spatial resolution). We'll use 5,000 m, which is approximately equal to the default CHIRPS resolution.
* `output_crs` : The coordinate reference system that the loaded data is to be reprojected to.
* `dask_chunks`:  the size of the dask chunks, dask breaks data into manageable chunks that can be easily stored in memory, e.g. `dict(x=1000,y=1000)`
* `time_range` : Time range to load data for.
* `ip` : The interest period to use to calculate the drought indices e.g. (3, 4, 5 dekads). It defines to what extent past observations are considered. Longer IPs detect more severe drought events. For example, if the IP=3 dekads, then the drought index (say PDI) of 0.35 for dekad 2 of 2006 implies actual drought for dekad 36 of 2005, dekad 1 of 2006 and dekad 2 of 2006.
* `output_dir` :  The directory in which to store results from the analysis.

**If running the notebook for the first time**, keep the default settings below.
This will demonstrate how the analysis works and provide meaningful results.

In [3]:
resolution = (-5000, 5000)
output_crs = "EPSG:6933"
dask_chunks = dict(x=3200,y=3200)
time_range = ("2014-01-01", "2015-12-31")
output_dir = "results"
# Corresponding to the six-months Standardized Precipitation Evapotranspiration Index
ip = 18

# Create the outpur directory if it does not exist.
os.makedirs(output_dir, exist_ok=True)

In [4]:
#loading the countries shapefiles
african_countries_url = "https://raw.githubusercontent.com/digitalearthafrica/deafrica-sandbox-notebooks/main/Supplementary_data/Rainfall_anomaly_CHIRPS/african_countries.geojson"
african_countries = gpd.read_file(african_countries_url)
print(np.unique(african_countries.COUNTRY))

['Algeria' 'Angola' 'Benin' 'Botswana' 'Burkina Faso' 'Burundi' 'Cameroon'
 'Cape Verde' 'Central African Republic' 'Chad' 'Comoros'
 'Congo-Brazzaville' 'Cote d`Ivoire' 'Democratic Republic of Congo'
 'Djibouti' 'Egypt' 'Equatorial Guinea' 'Eritrea' 'Ethiopia' 'Gabon'
 'Gambia' 'Ghana' 'Guinea' 'Guinea-Bissau' 'Kenya' 'Lesotho' 'Liberia'
 'Libya' 'Madagascar' 'Malawi' 'Mali' 'Mauritania' 'Morocco' 'Mozambique'
 'Namibia' 'Niger' 'Nigeria' 'Rwanda' 'Sao Tome and Principe' 'Senegal'
 'Sierra Leone' 'Somalia' 'South Africa' 'Sudan' 'Swaziland' 'Tanzania'
 'Togo' 'Tunisia' 'Uganda' 'Western Sahara' 'Zambia' 'Zimbabwe']


In [5]:
# Select the country of interest by providing the name below
country = "Ethiopia"
idx = african_countries[african_countries['COUNTRY'] == country].index[0]
geopolygon = Geometry(geom=african_countries.iloc[idx].geometry, crs=african_countries.crs)

# Create the output path
output_path = os.path.join(output_dir, f"SMDI_{country}.parquet")

### Load WaPOR soil moisture data
Load the WaPOR soil moisture data using the analysis parameters set in the previous section.

In [6]:
# List all available WAPOR datasets.
get_all_WaPORv3_mapsets()

Unnamed: 0,Mapset Code,Mapset Description
0,L1-AETI-A,Actual EvapoTranspiration and Interception (Gl...
1,L1-AETI-D,Actual EvapoTranspiration and Interception (Gl...
2,L1-AETI-M,Actual EvapoTranspiration and Interception (Gl...
3,L1-E-A,Evaporation (Global - Annual - 300m)
4,L1-E-D,Evaporation (Global - Dekadal - 300m)
5,L1-GBWP-A,Gross biomass water productivity (Annual - 300m)
6,L1-I-A,Interception (Global - Annual - 300m)
7,L1-I-D,Interception (Global - Dekadal - 300m)
8,L1-NBWP-A,Net biomass water productivity (Annual - 300m)
9,L1-NPP-D,Net Primary Production (Global - Dekadal - 300m)


In [7]:
# Select the Soil Moisture variable
variable = "L2-RSM-D"

In [8]:
# Get the bounding box of the area of interest.
region = [*geopolygon.boundingbox]

In [9]:
# Folder to store the downloaded wapor data in.
download_folder = os.path.join("WaPOR_v3", variable)
os.makedirs(download_folder, exist_ok=True)

In [10]:
%%time
# Download the required netcdf files to disk.
# Takes about 3hrs 45min on the 100GB sandbox.
# The nc_fp object is the file path to the stored netCDF
nc_fp = wapor_map(region=region, variable=variable, period=time_range, folder=download_folder, extension=".nc")

ValueError: No files found for selected region, variable and period.

In [11]:
# Load the netCDF or TIF file downloaded.
ds_sm_dekadal = load_wapor_ds(filename=nc_fp, variable=variable).rename({'time': 'dekad'})
ds_sm_dekadal

NameError: name 'nc_fp' is not defined

In [None]:
# Create a geobox to use to reproject the data.
geobox = GeoBox.from_geopolygon(geopolygon=geopolygon, resolution=abs(resolution[0]), crs=output_crs)

# Reproject using the geobox
ds_sm_dekadal = ds_sm_dekadal.odc.reproject(how=geobox)
ds_sm_dekadal

In [None]:
# Rasterize the country geopolygon and mask the loaded data.
country_mask = rasterize(poly=geopolygon, how=ds_sm_dekadal.odc.geobox)
ds_sm_dekadal = ds_sm_dekadal.where(country_mask)

In [None]:
# To DataArray
da_sm_dekadal = ds_sm_dekadal[variable]
da_sm_dekadal

In [None]:
# Set -9999 no-data values to NaN
da_sm_dekadal = da_sm_dekadal.where(da_sm_dekadal != da_sm_dekadal.attrs["nodata"])

da_sm_dekadal

### Group the modified timeseries using the `ip` parameter

The `ip` parameter determines to what extent past observations are considered. Longer IPs detect more severe drought events. The default `ip` used of **18 dekads** corresponds to the 6-month Standardized Precipitation Evapotranspiration Index.

For example, calculating the Soil Moisture Drought Index for the dekad `2011-01-11` using an interest period of `ip=3` requires rainfall data from the dekad `2010-12-21`, `2011-01-01` and `2011-01-11`.


**Each interest period is labelled using its end dekad.**

In [None]:
# Group the soil moisture dekadal timeseries by the interest period.
# The dictionary keys are the end dekad for each interest period.
# The values of the dictionary are the soil moisture data that belongs to the interest period
da_sm_binned_by_interest_period = bin_by_interest_period(ds=da_sm_dekadal, ip=ip)

In [None]:
# Get the interest periods
interest_periods = list(da_sm_binned_by_interest_period.keys())

### Get the average values for each interest period

Get the actual average soil moisture for each interest period.

In [None]:
da_sm_actual_avg_for_ip = xr.concat([da_sm_binned_by_interest_period[i].mean(dim="dekad").assign_coords(dekad=i).expand_dims({"dekad": 1}) for i in interest_periods], dim="dekad")
da_sm_actual_avg_for_ip

### Get the long term average for each interest period over the years of available data


In [None]:
# Group the interest periods by year by getting the dekad number in the year for the end dekad of
# the interest period. 
grouped_by_year = toolz.groupby(lambda dekad: get_dekad_no_in_year(date=dekad), interest_periods)

Get the long term average soil moisture for each interest period.

In [None]:
long_term_avg_sm = {}
for dekad_no_in_year, interest_periods_list in grouped_by_year.items():
    long_term_avg_for_period = xr.concat([da_sm_binned_by_interest_period[i] for i in interest_periods_list], dim="dekad").mean(dim="dekad")
    long_term_avg_sm[dekad_no_in_year] = long_term_avg_for_period
    
da_sm_long_term_avg_for_ip = xr.concat([long_term_avg_sm[get_dekad_no_in_year(i)].assign_coords(dekad=i).expand_dims({"dekad": 1}) for  i in interest_periods], dim="dekad")

assert all(da_sm_actual_avg_for_ip.dekad.values == da_sm_long_term_avg_for_ip.dekad.values)

da_sm_long_term_avg_for_ip

### Get the actual length of continous deficit in each interest period

For soil moisture the run length is the number of successive dekads in an interest period below the long term average for the same interest period.

In [None]:
%%time
# For each interest period get the maximum number of successive dekads below long term average soil moisture in the interest period.
sm_run_length = []
for interest_period in interest_periods:
    # Get the dekads in the interest period
    ds = da_sm_binned_by_interest_period[interest_period]
    
    # Identify pixels which are empty for all dekads
    no_data_mask = xr.apply_ufunc(
        get_no_data_mask,
        ds,
        input_core_dims=[["dekad"]],
        vectorize=True,
        dask="allowed",
    )
    
    # Get the long term average for the interest period
    long_term_avg = da_sm_long_term_avg_for_ip.sel(dekad=interest_period)
    
    # Get the pixels where the soil moisture is below the long term average soil moisture
    mask = xr.where(ds < long_term_avg, 1, 0)
    
    # Get the maximum number of successive dekads below long term average soil moisture.
    actual_run_length = xr.apply_ufunc(
        max_consecutive_ones,
        mask,
        input_core_dims=[["dekad"]],
        vectorize=True,
        dask="allowed",
    )
    
    # Modify the run length
    modified_run_length = (actual_run_length.max() + 1) - actual_run_length
    modified_run_length = modified_run_length.where(~no_data_mask)
    sm_run_length.append(modified_run_length.assign_coords(dekad=interest_period).expand_dims({"dekad": 1}))

da_sm_actual_run_length = xr.concat(sm_run_length, dim="dekad")
da_sm_actual_run_length

### Get the long term average of continous deficit in each interest period

In [None]:
# Get the long term average soil moisture run length for each interest period 
long_term_avg_run_lenth_sm = {}
for dekad_no_in_year, interest_periods_list in grouped_by_year.items():
    long_term_avg_for_period = da_sm_actual_run_length.sel(dekad=interest_periods_list).mean(dim="dekad")
    long_term_avg_run_lenth_sm[dekad_no_in_year] = long_term_avg_for_period
    
da_sm_long_term_avg_run_length_for_ip = xr.concat([long_term_avg_run_lenth_sm[get_dekad_no_in_year(i)].assign_coords(dekad=i).expand_dims({"dekad": 1}) for  i in interest_periods], dim="dekad")
da_sm_long_term_avg_run_length_for_ip

### Calculate the Soil Moisture Drought Index

In [None]:
SMDI = (da_sm_actual_avg_for_ip / da_sm_long_term_avg_for_ip) * np.sqrt((da_sm_actual_run_length / da_sm_long_term_avg_run_length_for_ip))
SMDI

In [None]:
#Scaled the Soil Moisture Drought Index
SMDI_scaled = (SMDI - SMDI.min()) / (SMDI.max() - SMDI.min())
SMDI_scaled

In [None]:
# Convert to DataFrame.
df = SMDI_scaled.to_dataframe().drop(columns="spatial_ref")
df

In [None]:
# Convert the DataFrame to an Arrow table
table = Table.from_pandas(df)

# Get the tables existing metadata
existing_meta = table.schema.metadata

# Dump the crs of the DataArray as new metadata to JSON.
meta_json = json.dumps(dict(crs=str(SMDI_scaled.rio.crs)))

# Combine the metadata
combined_meta = {
b"xarray.metadata": meta_json.encode(),
**existing_meta,
}

# Replace the metadata.
table = table.replace_schema_metadata(combined_meta)

### Export the results in a Zip file

In [None]:
# Write the table
write_table(table, output_path, compression="GZIP")

---

## Additional information

<b> License </b> The code in this notebook is licensed under the [Apache License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0).

Digital Earth Africa data is licensed under the [Creative Commons by Attribution 4.0](https://creativecommons.org/licenses/by/4.0/) license.

<b> Contact </b> If you need assistance, please post a question on the [DE Africa Slack channel](https://digitalearthafrica.slack.com/) or on the [GIS Stack Exchange](https://gis.stackexchange.com/questions/ask?tags=open-data-cube) using the `open-data-cube` tag (you can view previously asked questions [here](https://gis.stackexchange.com/questions/tagged/open-data-cube)).

If you would like to report an issue with this notebook, you can file one on [Github](https://github.com/digitalearthafrica/deafrica-sandbox-notebooks).

<b> Compatible datacube version :</b>

In [None]:
print(datacube.__version__)

**Last Tested:**

In [None]:
from datetime import datetime
datetime.today().strftime('%Y-%m-%d')