# Theil-Sen slopes for NDVI with dask

## Overview

- **Key ideas:** 
    - Learning how to use [dask](https://docs.dask.org/en/latest/) with [xarray](http://xarray.pydata.org/en/stable/) for climate data (vegetation, actually).
    - This notebook is a dask experiment for better performance when computing Theil-Sen slopes for GIMMS NDVI scaled data in South America.
- **References:**
    - [Dask's best practices](https://docs.dask.org/en/latest/best-practices.html).
    - [Dask's setup](https://docs.dask.org/en/latest/setup.html).
    - [Dask' scheduling](https://docs.dask.org/en/latest/scheduling.html).
    - [Ask Ubuntu: Cores vs Threads: How many threads should I run on this machine?](https://askubuntu.com/questions/668538/cores-vs-threads-how-many-threads-should-i-run-on-this-machine)
    - [How to efficiently parallelize Dask Dataframe computation on a Single Machine](https://medium.com/analytics-vidhya/how-to-efficiently-parallelize-dask-dataframe-computation-on-a-single-machine-1f10b5b02177).

## Initial setups

In [1]:
# Load packages.
import sys
import time
import warnings

import xarray as xr
import dask as da

from dask.distributed import LocalCluster, Client, progress

# Better visualization.
xr.set_options(display_style="text")

# Do not print warnings.
warnings.filterwarnings("ignore")

%matplotlib inline

In [2]:
%%time

print(">>> Versions:")
print("* xarray:", xr.__version__)
print("* dask:", da.__version__)
print("\n>>> Computer:")
!echo "* Hostname: ${HOSTNAME}"
!echo "* User: ${USER}"
!echo "* Directory: $(pwd)"
!echo "* Date: $(date)"
!echo "* CPUs: $(nproc)"

>>> Versions:
* xarray: 0.16.0
* dask: 2.25.0

>>> Computer:
* Hostname: n007
* User: sandroal
* Directory: /work/sandroal/notebooks/single_variables
* Date: Sun Jan 17 21:19:21 BRST 2021
* CPUs: 40
CPU times: user 16 ms, sys: 32 ms, total: 48 ms
Wall time: 620 ms


In [3]:
# Display information about the CPU architecture.
!lscpu

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                40
On-line CPU(s) list:   0-39
Thread(s) per core:    2
Core(s) per socket:    10
Socket(s):             2
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 62
Stepping:              4
CPU MHz:               1200.000
BogoMIPS:              4988.39
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0-9,20-29
NUMA node1 CPU(s):     10-19,30-39


In [4]:
# My repository.
repository = "/work/sandroal/cdlearn/"

# Include once my repository in the path for searching libraries.
if repository not in sys.path:
    sys.path.append(repository)
    
# Import my package.
import cdlearn

In [5]:
%%time

# Best performance (9 min 14.8s):
# n_workers = 8
# threads_per_worker = 5

# Setup cluster.
params = {
    "n_workers": 8,           
    "threads_per_worker": 5,
    "processes": True
}
cluster = LocalCluster(**params)
client = Client(cluster)

CPU times: user 684 ms, sys: 104 ms, total: 788 ms
Wall time: 2.37 s


In [6]:
print(cluster)

LocalCluster('tcp://127.0.0.1:54249', workers=8, threads=40, memory=67.46 GB)


In [7]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:54249  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 40  Memory: 67.46 GB


In [8]:
# Shuffled trends data files will be located here.
OUTPUT_FOLDER = "/work/sandroal/data_sets/GIMMS/shuffled_trends/"

# Data files location.
MAIN_FOLDER = "/work/sandroal/data_sets/GIMMS/"

## Load scaled NDVI data

In [9]:
%%time

# Into memory.
DS_NDVI_SCALED = xr.open_dataset(
    filename_or_obj=MAIN_FOLDER + "ppdata_ndvi_scaled.nc"
).load()

# Only land pixels.
DS_NDVI_SCALED = DS_NDVI_SCALED.where(
    DS_NDVI_SCALED.land_mask==True, drop=True
)

# Show it.
print(DS_NDVI_SCALED)

<xarray.Dataset>
Dimensions:    (lat: 912, lon: 666, time: 828)
Coordinates:
    land_mask  (lat, lon) int8 0 0 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0 0
  * lon        (lon) float64 -89.96 -89.88 -89.79 ... -33.88 -32.46 -32.38
  * lat        (lat) float64 -55.96 -55.88 -55.79 -55.71 ... 19.79 19.88 19.96
  * time       (time) datetime64[ns] 1981-07-01 1981-07-15 ... 2015-12-15
Data variables:
    ndvi       (time, lat, lon) float32 nan nan nan nan nan ... nan nan nan nan
CPU times: user 5.44 s, sys: 3.02 s, total: 8.47 s
Wall time: 12.1 s


In [10]:
print(">>> Size in memory:", DS_NDVI_SCALED.nbytes / 1e9, "GB")

>>> Size in memory: 2.012308944 GB


In [11]:
%%time

# Coerce all arrays in this dataset into dask arrays with the given chunks.
DS_NDVI_SCALED_CHUNKED = DS_NDVI_SCALED.chunk({"lat": 128, "lon": 100})

CPU times: user 3.86 s, sys: 72 ms, total: 3.93 s
Wall time: 3.69 s


In [12]:
# The array's data as a dask or numpy array.
DS_NDVI_SCALED_CHUNKED.ndvi.data

Unnamed: 0,Array,Chunk
Bytes,2.01 GB,42.39 MB
Shape,"(828, 912, 666)","(828, 128, 100)"
Count,57 Tasks,56 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.01 GB 42.39 MB Shape (828, 912, 666) (828, 128, 100) Count 57 Tasks 56 Chunks Type float32 numpy.ndarray",666  912  828,

Unnamed: 0,Array,Chunk
Bytes,2.01 GB,42.39 MB
Shape,"(828, 912, 666)","(828, 128, 100)"
Count,57 Tasks,56 Chunks
Type,float32,numpy.ndarray


## Performance test

In [13]:
%%time

# Lazy calculation.
DS_TREND_TS = cdlearn.statistics.theil_slopes_boosted(
    data_set=DS_NDVI_SCALED_CHUNKED, 
    var_code="ndvi", 
    dim="time"
)

# Show it.
print(DS_TREND_TS)

<xarray.Dataset>
Dimensions:      (lat: 912, lon: 666)
Coordinates:
    land_mask    (lat, lon) int8 dask.array<chunksize=(128, 100), meta=np.ndarray>
  * lon          (lon) float64 -89.96 -89.88 -89.79 ... -33.88 -32.46 -32.38
  * lat          (lat) float64 -55.96 -55.88 -55.79 -55.71 ... 19.79 19.88 19.96
Data variables:
    slopes       (lat, lon) float64 dask.array<chunksize=(128, 100), meta=np.ndarray>
    intercept    (lat, lon) float64 dask.array<chunksize=(128, 100), meta=np.ndarray>
    lower_slope  (lat, lon) float64 dask.array<chunksize=(128, 100), meta=np.ndarray>
    upper_slope  (lat, lon) float64 dask.array<chunksize=(128, 100), meta=np.ndarray>
CPU times: user 16 ms, sys: 4 ms, total: 20 ms
Wall time: 20.4 ms


In [14]:
# Trigger computation, keeping data as dask arrays.
DS_TREND_TS = DS_TREND_TS.persist()
    
# Watch progress.
progress(DS_TREND_TS)

VBox()

In [15]:
# End cluster instance.
client.close()
cluster.close()