In [3]:
# Matplotlib backend: only the last %matplotlib call takes effect, so the former
# `%matplotlib inline` line right before this one was dead code. The interactive
# `notebook` (nbagg) backend is kept; it is not supported in JupyterLab /
# Notebook 7+, where `%matplotlib widget` (ipympl) is the replacement.
%matplotlib notebook
# Re-import edited modules automatically before each cell executes.
%load_ext autoreload
%autoreload 2

import vectorbtpro as vbt
import numpy as np
import pandas as pd

from numba import njit
import talib

# 管道优化
from vectorbtpro.returns import nb as ret_nb
from vectorbtpro.portfolio import nb as pf_nb
from vectorbtpro.portfolio.enums import Direction

# Switch vectorbtpro's global settings to its built-in 'dark' theme;
# presumably affects vbt figures created after this call — confirm in vbt docs.
vbt.settings.set_theme('dark')

# 总纲
* [Chunking Module](#Chunking-Module)
* Execution Module

# Chunking Module

## Column-wise Multi-Index Process

### 生成示例 DataFrame

In [4]:
import pandas as pd
import numpy as np

# Parameters
ARRAY_SIZE = 2000000  # rows per column (one row per hour)
SEED = 42             # fixed seed so a fresh Restart & Run All reproduces the data

rng = np.random.default_rng(SEED)

# Example data: 2 indicator groups x 2 indicators, each uniform on [0, 1).
# Tuple keys make pandas build a two-level column MultiIndex.
data = {
    ('Indicator_Group_1', 'Indicator 1'): rng.uniform(size=ARRAY_SIZE),
    ('Indicator_Group_1', 'Indicator 2'): rng.uniform(size=ARRAY_SIZE),
    ('Indicator_Group_2', 'Indicator 1'): rng.uniform(size=ARRAY_SIZE),
    ('Indicator_Group_2', 'Indicator 2'): rng.uniform(size=ARRAY_SIZE),
}

# Build the DataFrame and name the column levels; assigning the renamed index
# (instead of inplace=True) keeps the cell idempotent on re-run.
exp_df = pd.DataFrame(data)
exp_df.columns = exp_df.columns.set_names(['Indicator_Group_Name', 'Indicator_Name'])

# Hourly UTC DatetimeIndex. `periods=ARRAY_SIZE` guarantees exactly one
# timestamp per row, and a Timedelta frequency avoids the deprecated '1H' alias.
start_date = pd.to_datetime('2021-01-03 22:00:00+00:00')
exp_df.index = pd.date_range(start=start_date, periods=ARRAY_SIZE, freq=pd.Timedelta(hours=1))
end_date = exp_df.index[-1]  # still defined for any later cell that references it

# Quick look at the result
print(exp_df.columns)
print(exp_df.shape)
print(exp_df.iloc[:5])
print(exp_df.iloc[-5:])

MultiIndex([('Indicator_Group_1', 'Indicator 1'),
            ('Indicator_Group_1', 'Indicator 2'),
            ('Indicator_Group_2', 'Indicator 1'),
            ('Indicator_Group_2', 'Indicator 2')],
           names=['Indicator_Group_Name', 'Indicator_Name'])
(2000000, 4)
Indicator_Group_Name      Indicator_Group_1             Indicator_Group_2  \
Indicator_Name                  Indicator 1 Indicator 2       Indicator 1   
2021-01-03 22:00:00+00:00          0.206350    0.458267          0.323652   
2021-01-03 23:00:00+00:00          0.260635    0.376705          0.276636   
2021-01-04 00:00:00+00:00          0.399095    0.547079          0.852816   
2021-01-04 01:00:00+00:00          0.612698    0.420465          0.558242   
2021-01-04 02:00:00+00:00          0.171315    0.758769          0.523526   

Indicator_Group_Name                   
Indicator_Name            Indicator 2  
2021-01-03 22:00:00+00:00    0.831481  
2021-01-03 23:00:00+00:00    0.833149  
2021-01-04 00:00:00+00:00

### 分块 (@vbt.chunked) 实验

#### Decorator 装饰器 写法

In [5]:
def decorator1(func):
    """Decorator that prints the call's arguments, then calls ``func``.

    The wrapper keeps ``func``'s name/docstring (via ``functools.wraps``) and
    returns whatever ``func`` returns.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("Decorator 1")
        print("args:", args)
        print("kwargs:", kwargs)
        # Forward the result; otherwise every decorated function returns None.
        return func(*args, **kwargs)
    return wrapper

def decorator2(func):
    """Decorator that logs the call and brackets it with pre/post-processing hooks.

    Prints a header, runs ``preprocessing``, prints the arguments, calls
    ``func``, runs ``postprocessing`` and returns ``func``'s result. If ``func``
    raises, ``postprocessing`` is skipped and the exception propagates.
    """
    import functools

    def preprocessing():
        print("Dec 2 preprocessing!")

    def postprocessing():
        print("Dec 2 postprocessing")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("Decorator 2")
        preprocessing()
        print("args:", args)
        print("kwargs:", kwargs)
        result = func(*args, **kwargs)
        postprocessing()
        # Forward the wrapped function's return value instead of dropping it.
        return result

    return wrapper

def my_function(*args, **kwargs):
    """Demo target: just announces itself."""
    print("Original function")


# Explicit form of stacking @decorator1 above @decorator2: decorator2 wraps the
# function first, so decorator1's wrapper is the outermost and prints first.
my_function = decorator1(decorator2(my_function))

my_function(1, 2, 3, x=4, y=5, z=6)

Decorator 1
args: (1, 2, 3)
kwargs: {'x': 4, 'y': 5, 'z': 6}
Decorator 2
Dec 2 preprocessing!
args: (1, 2, 3)
kwargs: {'x': 4, 'y': 5, 'z': 6}
Original function
Dec 2 postprocessing


In [6]:
%%time

import vectorbtpro as vbt
import pandas as pd
import numpy as np
import numba as nb
from numba import njit

@njit(parallel=True)
def rolling_mean_nb(np_array_in, window_size):
    """Trailing simple moving average of every column of a 2-D array.

    Parameters
    ----------
    np_array_in : 2-D numeric array; each column is an independent series.
    window_size : int >= 1, number of rows averaged for each output value.

    Returns
    -------
    float64 array with the same shape as ``np_array_in``. Row ``t`` holds the
    mean of rows ``t - window_size + 1 .. t``. The first ``window_size - 1``
    rows are NaN, and every row is NaN if ``window_size`` exceeds the row count.

    Raises
    ------
    ValueError if ``window_size < 1``.
    """
    if window_size < 1:
        raise ValueError("window_size must be >= 1")
    n_rows, n_cols = np_array_in.shape
    # float64 output so NaN padding and fractional means also work for integer input.
    rolling_mean_res = np.full((n_rows, n_cols), np.nan)
    if window_size > n_rows:
        # No complete window exists: leave the whole result as NaN.
        return rolling_mean_res
    # Loop-invariant averaging kernel, built once instead of once per column.
    window = np.ones(window_size) / window_size
    # prange lets parallel=True actually split the columns across threads;
    # each iteration writes only its own column.
    for i in nb.prange(n_cols):
        # Full-convolution index k (window_size-1 <= k < n_rows) is the mean of
        # rows k-window_size+1 .. k, i.e. the trailing window ending at row k.
        full_conv = np.convolve(np_array_in[:, i], window)
        rolling_mean_res[window_size - 1:, i] = full_conv[window_size - 1:n_rows]
    return rolling_mean_res

@vbt.chunked(
    n_chunks=4,
    merge_func="column_stack",
    size=vbt.ArraySizer(arg_query='df', axis=1),
    arg_take_spec={
        'df': vbt.ArraySlicer(axis=1),
        'window_size': None,
    },
    engine="dask",
)
def rolling_mean_executor(
    df: pd.DataFrame,
    window_size: int
) -> np.ndarray:
    """Per-chunk worker: rolling mean of one column-slice of ``df``.

    ``vbt.chunked`` sizes the work by ``df``'s column count, slices ``df`` along
    axis 1 into 4 chunks, passes ``window_size`` unchanged, runs the chunks with
    the dask engine and column-stacks the returned arrays.
    """
    values = df.values
    print("df_vals_shape:", values.shape)
    return rolling_mean_nb(values, window_size)

def rolling_mean(
    df: pd.DataFrame,
    window_size: int
) -> pd.DataFrame:
    """Rolling mean of every column of ``df``, labelled like ``df``.

    Delegates the computation to ``rolling_mean_executor``, prints the shape of
    the raw array it returns, and re-wraps that array with ``df``'s original
    index and columns.
    """
    frame_index = df.index
    frame_columns = df.columns
    raw = rolling_mean_executor(df, window_size)
    print(raw.shape)
    return pd.DataFrame(raw, index=frame_index, columns=frame_columns)

# Input shape followed by a blank line, then the result (last expression is displayed).
print(exp_df.shape, end="\n\n")
rolling_mean(exp_df, 3)

(2000000, 4)

df_vals_shape: (2000000, 1)
df_vals_shape: (2000000, 1)
df_vals_shape: (2000000, 1)
df_vals_shape: (2000000, 1)
(2000000, 4)
CPU times: user 4.02 s, sys: 95.5 ms, total: 4.12 s
Wall time: 3.95 s


Indicator_Group_Name,Indicator_Group_1,Indicator_Group_1,Indicator_Group_2,Indicator_Group_2
Indicator_Name,Indicator 1,Indicator 2,Indicator 1,Indicator 2
2021-01-03 22:00:00+00:00,,,,
2021-01-03 23:00:00+00:00,,,,
2021-01-04 00:00:00+00:00,0.288693,0.460684,0.484368,0.879638
2021-01-04 01:00:00+00:00,0.424143,0.448083,0.562565,0.724725
2021-01-04 02:00:00+00:00,0.394369,0.575438,0.644861,0.611541
...,...,...,...,...
2249-03-03 01:00:00+00:00,0.412668,0.811004,0.417788,0.489488
2249-03-03 02:00:00+00:00,0.398466,0.837783,0.451421,0.562898
2249-03-03 03:00:00+00:00,0.404473,0.684371,0.649716,0.371291
2249-03-03 04:00:00+00:00,0.615616,0.824658,0.494113,0.363082


In [9]:
%%time
# 单抽process_df的decorator出来

import vectorbtpro as vbt
import pandas as pd
import numpy as np
import numba as nb
from numba import njit

@njit
def rolling_mean_single_nb(ser_in, window_size):
    """Trailing simple moving average of a 1-D array.

    Parameters
    ----------
    ser_in : 1-D numeric array.
    window_size : int >= 1, number of elements averaged for each output value.

    Returns
    -------
    float64 array of the same length as ``ser_in``. Element ``t`` is the mean of
    ``ser_in[t - window_size + 1 : t + 1]``. The first ``window_size - 1``
    elements are NaN, and all elements are NaN if ``window_size > len(ser_in)``
    (the original slicing produced a wrong-length array in that case).

    Raises
    ------
    ValueError if ``window_size < 1``.
    """
    if window_size < 1:
        raise ValueError("window_size must be >= 1")
    n = ser_in.shape[0]
    rolling_mean_res = np.full(n, np.nan)
    if window_size > n:
        return rolling_mean_res
    window = np.ones(window_size) / window_size
    # Indices window_size-1 .. n-1 of the full convolution are the complete
    # trailing windows ending at those positions.
    rolling_mean_res[window_size - 1:] = np.convolve(ser_in, window)[window_size - 1:n]
    return rolling_mean_res

@njit(parallel=True)
def rolling_mean_nb(np_array_in, window_size):
    """Apply ``rolling_mean_single_nb`` to every column of a 2-D array in parallel.

    Returns a float64 array with the same shape as ``np_array_in`` whose column
    ``i`` is ``rolling_mean_single_nb(np_array_in[:, i], window_size)``.

    Raises ValueError if ``window_size < 1``.
    """
    # Validate once, before the parallel region, instead of inside each thread.
    if window_size < 1:
        raise ValueError("window_size must be >= 1")
    n_rows, n_cols = np_array_in.shape
    # float64 (not empty_like) so NaN padding/means also fit integer input.
    rolling_mean_res = np.empty((n_rows, n_cols), dtype=np.float64)
    # prange is required for parallel=True to distribute the column loop;
    # each iteration writes only its own column.
    for i in nb.prange(n_cols):
        rolling_mean_res[:, i] = rolling_mean_single_nb(np_array_in[:, i], window_size)
    return rolling_mean_res

def process_df(func):
    """Decorator: re-wrap a function's array result as a DataFrame labelled like its input.

    The decorated ``func`` must take the source DataFrame as its first parameter
    (positionally or by keyword) and return an array with the same number of
    rows and columns. The wrapper returns ``pd.DataFrame(result)`` carrying the
    input's original index and columns.

    Raises
    ------
    TypeError if the wrapper is called without the input DataFrame.
    """
    import functools
    import inspect

    # Name of func's first parameter, used when the frame is passed by keyword.
    try:
        first_param = next(iter(inspect.signature(func).parameters), None)
    except (TypeError, ValueError):  # signature not introspectable
        first_param = None

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if args:
            df = args[0]
        elif first_param is not None and first_param in kwargs:
            df = kwargs[first_param]
        else:
            raise TypeError(
                f"{getattr(func, '__name__', 'function')}() missing the input DataFrame argument"
            )
        # Capture labels before calling func, in case func mutates df.
        idx_ori, cols_ori = df.index, df.columns
        res_np_array = func(*args, **kwargs)
        return pd.DataFrame(data=res_np_array, index=idx_ori, columns=cols_ori)

    return wrapper

@process_df
@vbt.chunked(
    n_chunks=4,
    # chunk_len=3,
    merge_func="column_stack",
    size=vbt.ArraySizer(arg_query='df', axis=1),
    arg_take_spec={
        'df': vbt.ArraySlicer(axis=1),
        'window_size': None,
    },
    engine="dask",
)
def rolling_mean_executor(
    df: pd.DataFrame,
    window_size: int
) -> np.ndarray:
    """Per-chunk worker: rolling mean of one column-slice of ``df``.

    ``vbt.chunked`` slices ``df`` along axis 1 into 4 chunks, passes
    ``window_size`` unchanged, runs the chunks with the dask engine and
    column-stacks the arrays. ``process_df`` then turns the stacked array back
    into a DataFrame with the caller's original index and columns.
    """
    values = df.values
    print("df_vals_shape:", values.shape)
    return rolling_mean_nb(values, window_size)



# Input shape followed by a blank line, then the result (last expression is displayed).
print(exp_df.shape, end="\n\n")
rolling_mean_executor(exp_df, 3)

(2000000, 4)

df_vals_shape: (2000000, 1)
df_vals_shape: (2000000, 1)
df_vals_shape: (2000000, 1)
df_vals_shape: (2000000, 1)
CPU times: user 2.23 s, sys: 80.1 ms, total: 2.31 s
Wall time: 2.23 s


Indicator_Group_Name,Indicator_Group_1,Indicator_Group_1,Indicator_Group_2,Indicator_Group_2
Indicator_Name,Indicator 1,Indicator 2,Indicator 1,Indicator 2
2021-01-03 22:00:00+00:00,,,,
2021-01-03 23:00:00+00:00,,,,
2021-01-04 00:00:00+00:00,0.288693,0.460684,0.484368,0.879638
2021-01-04 01:00:00+00:00,0.424143,0.448083,0.562565,0.724725
2021-01-04 02:00:00+00:00,0.394369,0.575438,0.644861,0.611541
...,...,...,...,...
2249-03-03 01:00:00+00:00,0.412668,0.811004,0.417788,0.489488
2249-03-03 02:00:00+00:00,0.398466,0.837783,0.451421,0.562898
2249-03-03 03:00:00+00:00,0.404473,0.684371,0.649716,0.371291
2249-03-03 04:00:00+00:00,0.615616,0.824658,0.494113,0.363082


## 接口参数说明

```python
chunked(
    *args,
    n_chunks=None,
    size=None,
    min_size=None,
    chunk_len=None,
    chunk_meta=None,
    skip_one_chunk=None,
    arg_take_spec=None,
    template_context=None,
    prepend_chunk_meta=None,
    merge_func=None,
    merge_kwargs=None,
    return_raw_chunks=False,
    silence_warnings=None,
    disable=None,
    forward_kwargs_as=None,
    execute_kwargs=None,
    **kwargs
)
```

* 函数功能:
    * 将函数的输入分块的装饰器。
    * 返回一个与被装饰函数具有相同函数签名 (Function Signature) 的新函数。

In [57]:
# Take spec mirroring f's signature: which arguments (or parts of them) are
# split per chunk (ChunkSelector) and which are passed unchanged (None).
arg_take_spec = {
    'a': vbt.ChunkSelector(),
    'args': vbt.ArgsTaker(None, vbt.ChunkSelector()),    # *args: keep 1st, chunk 2nd
    'b': vbt.SequenceTaker([None, vbt.ChunkSelector()]),  # b: keep 1st item, chunk 2nd
    'kwargs': vbt.KwargsTaker(
        c=vbt.MappingTaker({'d': vbt.ChunkSelector(), 'e': None}),  # c: chunk 'd', keep 'e'
    ),
}


@vbt.chunked(
    n_chunks=vbt.LenSizer(arg_query='a'),
    arg_take_spec=arg_take_spec
)
def f(a, *args, b=None, **kwargs):
    """Sum ``a``, the extra positional args, the items of ``b`` and the values of ``kwargs['c']``."""
    args_total = sum(args)
    b_total = sum(b)
    c_total = sum(kwargs['c'].values())
    return a + args_total + b_total + c_total

In [58]:
f(
    [1, 2, 3],                    # a
    10, [1, 2, 3],                # *args
    b=(100, [1, 2, 3]),
    c=dict(d=[1, 2, 3], e=1000),  # collected into **kwargs
)

[1114, 1118, 1122]

In [None]:
@vbt.chunked(
    n_chunks=2,
    size=vbt.LenSizer(arg_query='a'),
    arg_take_spec={'a': vbt.ChunkSlicer()},
)
def f(a):
    """Mean of the slice of ``a`` handed to this chunk."""
    chunk_mean = np.mean(a)
    return chunk_mean

In [None]:
@vbt.chunked(
    n_chunks=2,
    size=vbt.LenSizer(arg_query='a'),
    arg_take_spec={'a': None},
    merge_func="concat",
)
def f(chunk_meta, a):
    """Return the part of ``a`` that belongs to the current chunk.

    ``a`` itself is not split (its take spec is ``None``). Instead, the chunk
    metadata passed as the first argument selects the range, and the per-chunk
    slices are concatenated via ``merge_func="concat"``.
    """
    lo = chunk_meta.start
    hi = chunk_meta.end
    return a[lo:hi]


f(np.arange(10))

In [None]:
# Variant with prepend_chunk_meta=False: vbt is told NOT to prepend the chunk
# metadata automatically, even though the first parameter is named `chunk_meta`.
# NOTE(review): the caller therefore presumably has to supply `chunk_meta`
# itself (e.g. through a vbt template) — confirm against the vbt chunking docs.
@vbt.chunked(
    n_chunks=2,
    size=vbt.LenSizer(arg_query='a'),  # total size = len(a)
    arg_take_spec=dict(a=None),        # None: `a` is not split per chunk
    merge_func="concat",
    prepend_chunk_meta=False
)
def f(chunk_meta, a):
    # Return the portion of `a` covered by this chunk's [start, end) range.
    return a[chunk_meta.start:chunk_meta.end]