In [5]:
from importlib import reload
import numpy as np
import xarray as xr
from numba import njit, guvectorize, prange

In [8]:
@guvectorize(["void(boolean[:], boolean[:], uint16[:])"], "(n),(n)->()", nopython=True)
def eca_precursor(b1, b2wr, KRprec):
    KRprec[0] = np.sum(b1 & b2wr)

In [9]:
@guvectorize(["void(bool_[:,:], bool_[:,:], uint16[:,:])"], "(m,n),(l,n)->(m,l)", nopython=True,)
def eca_precursors_pair(b2, b1w, result):
    """一次性计算所有位置对的 precursor 关系"""
    m = b2.shape[0]     # 第一数组的第一维
    l = b1w.shape[0]    # 第二数组的第一维
    
    for i in range(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])

In [10]:
@njit(cache=True)
def eca_precursor_njit(b1, b2wr):
    KRprec = np.sum(b1 & b2wr)
    return KRprec

_ = eca_precursor_njit(np.array([True]), np.array([True]))

In [11]:
@njit(cache=True)
def eca_precursors_pair_njit(b2, b1w):
    """一次性计算所有位置对的 precursor 关系"""
    result = np.zeros((b2.shape[0], b1w.shape[0]), dtype=np.uint16)
    m = b2.shape[0]     # 第一数组的第一维
    l = b1w.shape[0]    # 第二数组的第一维
    
    for i in range(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])
    return result

_ = eca_precursors_pair_njit(np.array([[True]]), np.array([[True]]))

In [12]:
@njit(cache=True, parallel=True)
def eca_precursors_pair_njitp(b2, b1w):
    """一次性计算所有位置对的 precursor 关系"""
    result = np.zeros((b2.shape[0], b1w.shape[0]), dtype=np.uint16)
    m = b2.shape[0]     # 第一数组的第一维
    l = b1w.shape[0]    # 第二数组的第一维
    
    for i in prange(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])
    return result

_ = eca_precursors_pair_njitp(np.array([[True]]), np.array([[True]]))

## Loaded test

小型数据集测试

In [8]:
ds = xr.open_dataset("tests/data/era5.reanalysis.spi30d.0p25deg.china.1950-1979.nc").rename({"latitude": "lat", "longitude": "lon", "spi30d": "SPI1"})\
        .isel(lat=slice(0, 20), lon=slice(0, 20)).load()
ds

In [9]:
da_droughtA = (ds["SPI1"] < -1).copy().rename({"lon": "lonA", "lat": "latA"})
da_droughtB = (ds["SPI1"] < -1).copy().rename({"lon": "lonB", "lat": "latB"})

In [13]:
# da_droughtA_stack = da_droughtA.stack({"locA": ["latA", "lonA"]}).T
# da_droughtB_stack = da_droughtB.stack({"locB": ["latB", "lonB"]}).T

### `njit` cases

In [14]:
%%timeit -r 3
xr.apply_ufunc(eca_precursor_njit, da_droughtA, da_droughtB, vectorize=True,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]], 
               )

11 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)


In [15]:
%%timeit -r 3
xr.apply_ufunc(eca_precursors_pair_njit, da_droughtA, da_droughtB, vectorize=True, 
               input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
               output_core_dims=[["lonA", "lonB"]],
               )

9.97 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)


In [16]:
%%timeit -r 3
xr.apply_ufunc(eca_precursors_pair_njitp, da_droughtA, da_droughtB, vectorize=True, 
               input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
               output_core_dims=[["lonA", "lonB"]],
               )

1.29 s ± 6.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


结论：不并行条件下，直接二维计算会快一点，约15%。

### `guvectorize`

In [17]:
%%timeit -r 3
xr.apply_ufunc(eca_precursor, da_droughtA, da_droughtB, vectorize=False,
                input_core_dims=[["time"], ["time"]], output_core_dims=[[]], 
                )

8.69 s ± 571 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [18]:
%%timeit -r 3
xr.apply_ufunc(eca_precursors_pair, da_droughtA, da_droughtB, vectorize=False,
                input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                output_core_dims=[["lonA", "lonB"]],
                )

8.5 s ± 367 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [19]:
%%timeit -r 3
xr.apply_ufunc(eca_precursors_pair, da_droughtA, da_droughtB, vectorize=False,
                input_core_dims=[["latA", "time"], ["latB", "time"]], 
                output_core_dims=[["latA", "latB"]],
                )

8.72 s ± 191 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


结论：对于guvectorize，不并行时一维计算和二维计算效率差异不大

### guvectorize with built-in parallel for loaded dataset

In [47]:
@guvectorize(["void(boolean[:], boolean[:], uint16[:])"], "(n),(n)->()", nopython=True, target="parallel")
def eca_precursor_parallel(b1, b2wr, KRprec):
    KRprec[0] = np.sum(b1 & b2wr)

@guvectorize(["void(bool_[:,:], bool_[:,:], uint16[:,:])"], "(m,n),(l,n)->(m,l)", nopython=True, target="parallel")
def eca_precursors_pair_parallel(b2, b1w, result):
    """一次性计算所有位置对的 precursor 关系"""
    m = b2.shape[0]     # 第一数组的第一维
    l = b1w.shape[0]    # 第二数组的第一维
    
    for i in range(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])

In [11]:
%%timeit -r 3
xr.apply_ufunc(eca_precursor_parallel, da_droughtA, da_droughtB, vectorize=False,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]],  
               )

629 ms ± 6.84 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [12]:
%%timeit -r 3
xr.apply_ufunc(eca_precursors_pair_parallel, da_droughtA, da_droughtB, vectorize=False,
               input_core_dims=[["lonA", "time"], ["lonB", "time"]], output_core_dims=[["lonA", "lonB"]],  
               )

1.39 s ± 34.3 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


结论：对于guvectorize，并行二维计算效率反而出现了损失，损失率超100%

总结：
1. 计算效率最高的是一维并行guvectorize：0.63ms，其次是二维并行njit：1.29s
2. 不并行条件下，guvectorize一二维差异不大，njit二维略快

## Dask parallel: General

利用Dask并行计算

In [44]:
from dask.distributed import Client, performance_report
client = Client(processes=False)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://10.245.92.223:8787/status,

0,1
Dashboard: http://10.245.92.223:8787/status,Workers: 1
Total threads: 48,Total memory: 251.39 GiB
Status: running,Using processes: False

0,1
Comm: inproc://10.245.92.223/1749322/38,Workers: 1
Dashboard: http://10.245.92.223:8787/status,Total threads: 48
Started: Just now,Total memory: 251.39 GiB

0,1
Comm: inproc://10.245.92.223/1749322/41,Total threads: 48
Dashboard: http://10.245.92.223:38975/status,Memory: 251.39 GiB
Nanny: None,
Local directory: /tmp/dask-scratch-space/worker-x5ah17gc,Local directory: /tmp/dask-scratch-space/worker-x5ah17gc


In [6]:
ds_chunk = xr.open_dataset("tests/data/era5.reanalysis.spi30d.0p25deg.china.1950-1979.nc").rename({"latitude": "lat", "longitude": "lon", "spi30d": "SPI1"})\
        .isel(lat=slice(0, 100), lon=slice(0, 100)).chunk({"lat": 20, "lon": 20})
da_droughtA_chunk = (ds_chunk["SPI1"] < -1).copy().rename({"lon": "lonA", "lat": "latA"})
da_droughtB_chunk = (ds_chunk["SPI1"] < -1).copy().rename({"lon": "lonB", "lat": "latB"})
# print basic info about chunk sizes and number of chunks
print(da_droughtA_chunk.shape, da_droughtA_chunk.chunksizes)

(10957, 100, 100) Frozen({'time': (10957,), 'latA': (20, 20, 20, 20, 20), 'lonA': (20, 20, 20, 20, 20)})


### njit cases

In [9]:
%%timeit -r 3 # 实际上并没有并行
with performance_report(filename="tests/logs/njit_single.html"):
    task = xr.apply_ufunc(eca_precursor_njit, da_droughtA_chunk, da_droughtB_chunk, vectorize=True,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]],  
               dask="parallelized", output_dtypes=[np.uint16]
               ).compute()
print(task.shape)

  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)
21min 3s ± 4.3 s per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [19]:
%%timeit -r 3
with performance_report(filename="tests/logs/njit_pair.html"):
    task = xr.apply_ufunc(eca_precursors_pair_njit, da_droughtA_chunk, da_droughtB_chunk, vectorize=True,
               input_core_dims=[["lonA", "time"], ["lonB", "time"]], output_core_dims=[["lonA", "lonB"]],  
               dask="parallelized", 
               dask_gufunc_kwargs={"allow_rechunk": True},
               output_dtypes=[np.uint16]
               ).compute()
print(task.shape)

  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)
5min 17s ± 2.72 s per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [None]:
%%timeit -r 3
with performance_report(filename="tests/logs/njit_pair_parallel.html"):
    task = xr.apply_ufunc(eca_precursors_pair_njitp, da_droughtA_chunk, da_droughtB_chunk, vectorize=True,
                    input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                    output_core_dims=[["lonA", "lonB"]],
                    dask="parallelized",
                    dask_gufunc_kwargs={"allow_rechunk": True},
                    output_dtypes=[np.uint16]
                    ).compute()
print(task.shape)

  tmp = blockwise(
  tmp = blockwise(
  tmp = blockwise(
  tmp = blockwise(


20.5 s ± 262 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


结论：
+ 一维njit在dask下计算效率低下（并行但CPU负载极低）；非并行二维njit计算效率仍然低下；并行二维njit
+ njit会导致dask client反应极其迟缓

### guvectorize

In [13]:
%%timeit -r 3
with performance_report(filename="tests/logs/guvectorize_single.html"):
    result = xr.apply_ufunc(eca_precursor, da_droughtA_chunk, da_droughtB_chunk, vectorize=False,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]],  
               dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True},
               ).compute()
print(result.shape)

  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)
49.2 s ± 240 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [14]:
%%timeit -r 3
with performance_report(filename="tests/logs/guvectorize_pair.html"):
    result = xr.apply_ufunc(eca_precursors_pair, da_droughtA_chunk, da_droughtB_chunk, vectorize=False,
                input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                output_core_dims=[["lonA", "lonB"]],
                dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}
                ).compute()
print(result.shape)

  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)
1min 7s ± 1.82 s per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [16]:
%%timeit -r 3
with performance_report(filename="tests/logs/guvectorize_single_parallel.html"):
    result = xr.apply_ufunc(eca_precursor_parallel, da_droughtA_chunk, da_droughtB_chunk, vectorize=False,
                input_core_dims=[["time"], ["time"]], output_core_dims=[[]],
                dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}
                ).compute()
print(result.shape)

  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)
58.3 s ± 125 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [17]:
%%timeit -r 3
with performance_report(filename="tests/logs/guvectorize_pair_parallel.html"):
    result = xr.apply_ufunc(eca_precursors_pair_parallel, da_droughtA_chunk, da_droughtB_chunk, vectorize=False,
                input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                output_core_dims=[["lonA", "lonB"]],
                dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}
                ).compute()
print(result.shape)

  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)
47.6 s ± 105 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [20]:
%%timeit -r 3
with performance_report(filename="tests/logs/guvectorize_pair_parallel(lat).html"):
    result = xr.apply_ufunc(eca_precursors_pair_parallel, da_droughtA_chunk, da_droughtB_chunk, vectorize=False,
                input_core_dims=[["latA", "time"], ["latB", "time"]], 
                output_core_dims=[["latA", "latB"]],
                dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}
                ).compute()
print(result.shape)

  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)


  tmp = blockwise(


(100, 100, 100, 100)
47.4 s ± 72.3 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


结论：
+ 对于非并行的guvectorize: 二维并不比一维快，尽管graph size小得多，但是仍然损失20%左右的性能；log显示，二维时guvectorize的CPU利用效率不佳，而一维能几乎达到100%
+ 对于并行的guvectorize：二维略快于一维10%

## Large chunks on only one spatial dimention

In [45]:
ds_largechunk = xr.open_dataset("tests/data/era5.reanalysis.spi30d.0p25deg.china.1950-1979.nc").rename({"latitude": "lat", "longitude": "lon", "spi30d": "SPI1"})\
        .isel(lat=slice(0, 100), lon=slice(0, 100)).chunk({"lat": 20, "lon": -1})
da_droughtA_largechunk = (ds_largechunk["SPI1"] < -1).copy().rename({"lon": "lonA", "lat": "latA"})
da_droughtB_largechunk = (ds_largechunk["SPI1"] < -1).copy().rename({"lon": "lonB", "lat": "latB"})

In [51]:
%%timeit -r 3
test = xr.apply_ufunc(eca_precursors_pair_njitp, da_droughtA_largechunk, da_droughtB_largechunk, vectorize=True,
            input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
            output_core_dims=[["lonA", "lonB"]],
            dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True},
            output_dtypes=[np.uint16]
            ).compute()
print(test.shape)

(100, 100, 100, 100)
(100, 100, 100, 100)
(100, 100, 100, 100)
(100, 100, 100, 100)
11.1 s ± 109 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [52]:
%%timeit -r 3
test = xr.apply_ufunc(eca_precursor_parallel, da_droughtA_largechunk, da_droughtB_largechunk, vectorize=False,
            input_core_dims=[["time"], ["time"]], 
            output_core_dims=[[]],
            dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}
            ).compute()
print(test.shape)

(100, 100, 100, 100)
(100, 100, 100, 100)
(100, 100, 100, 100)
(100, 100, 100, 100)
47.7 s ± 319 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [None]:
%%timeit -r 3
test = xr.apply_ufunc(eca_precursors_pair_parallel, da_droughtA_largechunk, da_droughtB_largechunk, vectorize=False,
            input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
            output_core_dims=[["lonA", "lonB"]],
            dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}
            ).compute()
print(test.shape)

(100, 100, 100, 100)


(100, 100, 100, 100)
(100, 100, 100, 100)
(100, 100, 100, 100)
43 s ± 50.3 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


总结：
1. 计算效率最高的是二维并行njit：11.1s；其次是二维并行guvectorize：43s
2. 仅在一个空间维度上设置chunk不会出现 `PerformanceWarning: Increasing number of chunks by factor`
3. 相比于general，在非input_core_dims上进行chunks效率略高

In [53]:
client.close()