In [2]:
import pathlib
import time
from importlib import reload
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import xarray as xr
import cf_xarray as cfxr

In [3]:
from numba import njit, guvectorize, prange
# Disabled alternative: use dask's plain threaded scheduler instead of a distributed client.
# import dask.config
# dask.config.set(scheduler='threads')

# Create a LocalCluster that uses threads (processes=False selects thread mode).
from dask.distributed import Client, performance_report
client = Client(processes=False)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://10.245.92.223:8787/status,

0,1
Dashboard: http://10.245.92.223:8787/status,Workers: 1
Total threads: 48,Total memory: 251.39 GiB
Status: running,Using processes: False

0,1
Comm: inproc://10.245.92.223/1357108/1,Workers: 1
Dashboard: http://10.245.92.223:8787/status,Total threads: 48
Started: Just now,Total memory: 251.39 GiB

0,1
Comm: inproc://10.245.92.223/1357108/4,Total threads: 48
Dashboard: http://10.245.92.223:41401/status,Memory: 251.39 GiB
Nanny: None,
Local directory: /tmp/dask-scratch-space/worker-pis8p82r,Local directory: /tmp/dask-scratch-space/worker-pis8p82r


In [4]:
@guvectorize(["void(boolean[:], boolean[:], uint16[:])"], "(n),(n)->()", nopython=True)
def eca_precursor(b1, b2wr, KRprec):
    """Count the positions at which both boolean series are True.

    Generalized ufunc with layout ``(n),(n)->()``: both inputs share the core
    dimension ``n`` and the count is written to ``KRprec[0]`` as ``uint16``.
    """
    KRprec[0] = np.sum(b1 & b2wr)

In [5]:
@guvectorize(["void(bool_[:,:], bool_[:,:], uint16[:,:])"], "(m,n),(l,n)->(m,l)", nopython=True,)
def eca_precursors_pair(b2, b1w, result):
    """Compute the precursor relation for all location pairs at once.

    Generalized ufunc with layout ``(m,n),(l,n)->(m,l)``: ``result[i, j]``
    receives the number of positions along ``n`` where ``b2[i, :]`` and
    ``b1w[j, :]`` are both True, stored as ``uint16``.
    """
    m = b2.shape[0]     # first dimension of the first array
    l = b1w.shape[0]    # first dimension of the second array
    
    for i in range(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])

In [6]:
@njit(cache=True)
def eca_precursor_njit(b1, b2wr):
    """Return the sum of the element-wise AND of ``b1`` and ``b2wr``.

    For boolean inputs this is the number of positions where both are True.
    """
    KRprec = np.sum(b1 & b2wr)
    return KRprec

# Warm up this njit function (not the guvectorize kernel) so that JIT
# compilation of the 1-D boolean specialization happens before it is timed.
_ = eca_precursor_njit(np.array([True]), np.array([True]))

In [7]:
@njit(cache=True)
def eca_precursors_pair_njit(b2, b1w):
    """Compute the precursor relation for all location pairs at once.

    Returns a ``uint16`` array of shape ``(b2.shape[0], b1w.shape[0])`` whose
    ``[i, j]`` entry is the sum of ``b2[i, :] & b1w[j, :]`` (for boolean
    inputs: the number of positions where both rows are True).
    """
    result = np.zeros((b2.shape[0], b1w.shape[0]), dtype=np.uint16)
    m = b2.shape[0]     # first dimension of the first array
    l = b1w.shape[0]    # first dimension of the second array
    
    for i in range(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])
    return result

# Call once on a tiny 2-D boolean input to trigger JIT compilation up front.
_ = eca_precursors_pair_njit(np.array([[True]]), np.array([[True]]))

In [8]:
@njit(cache=True, parallel=True)
def eca_precursors_pair_njitp(b2, b1w):
    """Compute the precursor relation for all location pairs at once (parallel).

    Same contract as the serial pair kernel: returns a ``uint16`` array of
    shape ``(b2.shape[0], b1w.shape[0])`` whose ``[i, j]`` entry is the sum of
    ``b2[i, :] & b1w[j, :]``. The outer loop over rows of ``b2`` uses
    ``prange`` so Numba may distribute it across threads.
    """
    result = np.zeros((b2.shape[0], b1w.shape[0]), dtype=np.uint16)
    m = b2.shape[0]     # first dimension of the first array
    l = b1w.shape[0]    # first dimension of the second array

    for i in prange(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])
    return result

# Warm up the parallel pair kernel itself so that its JIT compilation
# happens here rather than inside a timed cell.
_ = eca_precursors_pair_njitp(np.array([[True]]), np.array([[True]]))

## prepare data

In [36]:
# Open the SPI dataset, shorten the variable/dimension names and keep a
# 100 x 20 (lat x lon) corner for the serial benchmarks.
ds = (
    xr.open_dataset("tests/data/era5.reanalysis.spi30d.0p25deg.china.1950-1979.nc")
    .rename({"latitude": "lat", "longitude": "lon", "spi30d": "SPI1"})
    .isel(lat=slice(0, 100), lon=slice(0, 20))
)
ds

In [37]:
# Boolean mask SPI1 < -1, duplicated under A- and B-suffixed dimension names
# so the two copies broadcast against each other.
da_droughtA = (ds["SPI1"] < -1).copy().rename(lon="lonA", lat="latA")
da_droughtB = (ds["SPI1"] < -1).copy().rename(lon="lonB", lat="latB")

In [38]:
# Collapse (lat, lon) into a single location dimension, then reverse the
# dimension order.
da_droughtA_stack = da_droughtA.stack(locA=["latA", "lonA"]).transpose()
da_droughtB_stack = da_droughtB.stack(locB=["latB", "lonB"]).transpose()

### `njit` cases

In [40]:
%%timeit -n 3 -r 3
xr.apply_ufunc(eca_precursor_njit, da_droughtA, da_droughtB, vectorize=True,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]], 
               ) #dask="parallelized", output_dtypes=[np.uint16] 

3min 34s ± 272 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)


In [41]:
%%timeit -n 1 -r 1
xr.apply_ufunc(eca_precursors_pair_njit, da_droughtA, da_droughtB, vectorize=True, 
               input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
               output_core_dims=[["lonA", "lonB"]],
               )

3min 6s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [None]:
%%timeit -n 1 -r 1
eca_precursors_pair_njit(da_droughtA_stack.values, da_droughtB_stack.values)

1741330015.1323948
1741330220.0367894


### `guvectorize`

In [42]:
%%timeit -n 1 -r 1
xr.apply_ufunc(eca_precursor, da_droughtA, da_droughtB, vectorize=False,
                input_core_dims=[["time"], ["time"]], output_core_dims=[[]], 
                # dask="parallelized", output_dtypes=[np.uint16]
                )

3min 16s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [43]:
%%timeit -n 1 -r 1
xr.apply_ufunc(eca_precursors_pair, da_droughtA, da_droughtB, vectorize=False,
                input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                output_core_dims=[["lonA", "lonB"]],
                # dask="parallelized"
                )

3min 5s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [44]:
%%timeit -n 1 -r 1
xr.apply_ufunc(eca_precursors_pair, da_droughtA, da_droughtB, vectorize=False,
                input_core_dims=[["latA", "time"], ["latB", "time"]], 
                output_core_dims=[["latA", "latB"]],
                # dask="parallelized"
                )

5min 56s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Dask parallel

In [13]:
# 100 x 100 (lat x lon) subset, split into 20 x 20 dask chunks.
ds_chunk = (
    xr.open_dataset("tests/data/era5.reanalysis.spi30d.0p25deg.china.1950-1979.nc")
    .rename({"latitude": "lat", "longitude": "lon", "spi30d": "SPI1"})
    .isel(lat=slice(0, 100), lon=slice(0, 100))
    .chunk({"lat": 20, "lon": 20})
)
da_droughtA_chunk = (ds_chunk["SPI1"] < -1).copy().rename(lon="lonA", lat="latA")
da_droughtB_chunk = (ds_chunk["SPI1"] < -1).copy().rename(lon="lonB", lat="latB")

In [14]:
%%timeit -n 1 -r 1
with performance_report(filename="tests/logs/guvectorize_single.html"):
    result = xr.apply_ufunc(eca_precursor, da_droughtA_chunk, da_droughtB_chunk, vectorize=False,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]],  
               dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True},
               ).compute()

  tmp = blockwise(


49.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [33]:
# Same 100 x 100 subset, chunked as full latitude columns that are 5 longitudes wide.
ds_largechunk = (
    xr.open_dataset("tests/data/era5.reanalysis.spi30d.0p25deg.china.1950-1979.nc")
    .rename({"latitude": "lat", "longitude": "lon", "spi30d": "SPI1"})
    .isel(lat=slice(0, 100), lon=slice(0, 100))
    .chunk({"lat": -1, "lon": 5})
)
da_droughtA_largechunk = (ds_largechunk["SPI1"] < -1).copy().rename(lon="lonA", lat="latA")
da_droughtB_largechunk = (ds_largechunk["SPI1"] < -1).copy().rename(lon="lonB", lat="latB")

In [34]:
%%timeit -n 1 -r 1
with performance_report(filename="tests/logs/guvectorize_single.html"):
    result = xr.apply_ufunc(eca_precursor, da_droughtA_largechunk, da_droughtB_largechunk, vectorize=False,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]],  
               dask="parallelized",
               ).compute()


  tmp = blockwise(


50.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [11]:
%%timeit -n 1 -r 1 # 实际上并没有并行起来
task = xr.apply_ufunc(eca_precursor_njit, da_droughtA_chunk, da_droughtB_chunk, vectorize=True,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]],  
               dask="parallelized", output_dtypes=np.uint16
               ).compute()
print(task.shape)

  tmp = blockwise(


20min 53s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [18]:
%%timeit -n 1 -r 1
xr.apply_ufunc(eca_precursors_pair, da_droughtA_chunk, da_droughtB_chunk, vectorize=False,
                input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                output_core_dims=[["lonA", "lonB"]],
                dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True}
                ).compute()

  tmp = blockwise(


1min 2s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


41.9s vs. 48.4s

In [15]:
%%timeit -n 1 -r 1 # really slow, not acctually launch parallel
with performance_report(filename="tests/logs/njit_pair.html"):
    task = xr.apply_ufunc(eca_precursors_pair_njit, da_droughtA_chunk, da_droughtB_chunk, vectorize=True,
                    input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                    output_core_dims=[["lonA", "lonB"]],
                    dask="parallelized",
                    dask_gufunc_kwargs={"allow_rechunk": True},
                    output_dtypes=np.uint16
                   ).compute()
print(task.shape)

  tmp = blockwise(


(100, 100, 100, 100)
5min 23s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [16]:
%%timeit -n 1 -r 1
with performance_report(filename="tests/logs/njit_parallel_pair.html"):
    xr.apply_ufunc(eca_precursors_pair_njitp, da_droughtA_chunk, da_droughtB_chunk, vectorize=True,
                    input_core_dims=[["lonA", "time"], ["lonB", "time"]], 
                    output_core_dims=[["lonA", "lonB"]],
                    dask="parallelized",
                    dask_gufunc_kwargs={"allow_rechunk": True},
                    output_dtypes=np.uint16
                    ).compute()

  tmp = blockwise(


20.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Numba parallel with loaded dataset

In [38]:
# Stack (lat, lon) into one location dimension, reverse the dimension order
# and pull the result out as plain NumPy arrays.
da_droughtA_stack_load = da_droughtA_chunk.stack(locA=["latA", "lonA"]).transpose().values
da_droughtB_stack_load = da_droughtB_chunk.stack(locB=["latB", "lonB"]).transpose().values

In [43]:
%%timeit -n 1 -r 1
eca_precursors_pair_njitp(da_droughtA_unchunk_stack, da_droughtB_unchunk_stack)

11.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [39]:
# Copy the chunked drought masks and load the copies into memory.
da_droughtA_load = da_droughtA_chunk.copy().load()
da_droughtB_load = da_droughtB_chunk.copy().load()

In [36]:
@guvectorize(["void(boolean[:], boolean[:], uint16[:])"], "(n),(n)->()", nopython=True, target="parallel")
def eca_precursor_parallel(b1, b2wr, KRprec):
    """Count the positions at which both boolean series are True.

    Same layout ``(n),(n)->()`` and ``uint16`` output as the serial kernel,
    but compiled with ``target="parallel"``.
    """
    KRprec[0] = np.sum(b1 & b2wr)

@guvectorize(["void(bool_[:,:], bool_[:,:], uint16[:,:])"], "(m,n),(l,n)->(m,l)", nopython=True, target="parallel")
def eca_precursors_pair_parallel(b2, b1w, result):
    """Compute the precursor relation for all location pairs at once.

    Layout ``(m,n),(l,n)->(m,l)``: ``result[i, j]`` receives the number of
    positions along ``n`` where ``b2[i, :]`` and ``b1w[j, :]`` are both True,
    stored as ``uint16``. Compiled with ``target="parallel"``.
    """
    m = b2.shape[0]     # first dimension of the first array
    l = b1w.shape[0]    # first dimension of the second array
    
    # NOTE(review): prange is used here under guvectorize(target="parallel"),
    # not njit(parallel=True); presumably it then behaves like a plain range
    # and this loop is not split across threads - confirm against Numba docs.
    for i in prange(m):
        for j in range(l):
            result[i, j] = np.sum(b2[i, :] & b1w[j, :])

In [40]:
%%timeit -n 1 -r 1
xr.apply_ufunc(eca_precursor_parallel, da_droughtA_load, da_droughtB_load, vectorize=False,
               input_core_dims=[["time"], ["time"]], output_core_dims=[[]],  
               )

4min 58s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [41]:
# Stacked-location masks, loaded into memory but kept as DataArrays.
da_droughtA_stack_load = da_droughtA_chunk.stack(locA=["latA", "lonA"]).transpose().load()
da_droughtB_stack_load = da_droughtB_chunk.stack(locB=["latB", "lonB"]).transpose().load()

In [42]:
%%timeit -n 1 -r 1
xr.apply_ufunc(eca_precursors_pair_parallel, da_droughtA_stack_load, da_droughtB_stack_load, vectorize=False,
               input_core_dims=[["locA", "time"], ["locB", "time"]], output_core_dims=[["locA", "locB"]],  
               dask="parallelized",
               ).compute()

15min 11s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [45]:
# Shut down the dask distributed client once all benchmarks are done.
client.close()