# 基于pandas和numpy的自定义多列rolling+apply函数

# Modules

In [None]:
# python env: miniconda, python==3.9.17, pandas==1.5.2, numpy==1.23.4
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import as_strided as stride

# 数据准备

In [None]:
# demo-data1：部分行情切片
from io import StringIO
data_str = '''
,open,high,low,close
0,132.960,133.340,132.940,133.105
1,133.110,133.255,132.710,132.755
2,132.755,132.985,132.640,132.735 
3,132.730,132.790,132.575,132.685
4,132.685,132.785,132.625,132.755
'''
dates = pd.date_range('20130101', periods=5, freq='D')
df_prc = pd.read_csv(StringIO(data_str), index_col=[0])
df_prc.index = dates
df_prc.index.name = 'datetime'
df_prc

In [None]:
# demo-data2：数值序列
dates = pd.date_range('20130101', periods=13, freq='D')
df = pd.DataFrame({'C': [1.6, 4.1, 2.7, 4.9, 5.4, 1.3, 6.6, 9.6, 3.5, 5.4, 10.1, 3.08, 5.38],
                   'D': [2.6, 4.1, 2.3, 4.9, 5.1, 1.3, 3.6, 5.6, 3.5, 4.4, 2.1, 1.08, 3.38]}, 
                   index=dates)
df['E'] = df.median(axis=1)
df.index.name = 'datetime'
df

# 解决思路

## DataFrame版

In [None]:
def rolling_group(df: pd.DataFrame, window: int, **kwargs):
    """
    To run rolling apply with multi-columns input
    :param df: pd.DataFrame with ONLY 2-dims like below:
                    open     high     low      close
        datetime                                      
        2013-01-01  132.960  133.340  132.940  133.105
        2013-01-02  133.110  133.255  132.710  132.755
        2013-01-03  132.755  132.985  132.640  132.735
        2013-01-04  132.730  132.790  132.575  132.685
        2013-01-05  132.685  132.785  132.625  132.755
    :param window: rolling windows, also means min_periods
    :param kwargs: other kwargs to be input to the apply func
    :return: pd.DataFrame.groupby object with muli-level index values like below:
                               open     high     low    close
    2013-01-03  2013-01-01   132.96   133.34  132.94  133.105
                2013-01-02   133.11  133.255  132.71  132.755
                2013-01-03  132.755  132.985  132.64  132.735
    """
    # move index to values
    v = df.reset_index().values
    
    # rolling cut sample
    dim0, dim1 = v.shape
    stride0, stride1 = v.strides

    stride_values = stride(v, (dim0 - (window - 1), window, dim1), (stride0, stride0, stride1))
    
    # concat as new DataFrame
    rolled_df = pd.concat({
        row: pd.DataFrame(values[:, 1:], columns=df.columns, index=values[:, 0].flatten())
        for row, values in zip(df.index[window - 1:], stride_values)
    })

    # return groupby object with level 0 the original index level 
    return rolled_df.groupby(level=0, **kwargs)


# groupby object
rolling_group(df, 3)
# iter check
print(next(iter(rolling_group(df, 3))))

In [None]:
# 功能测试
# add basic lambda func
# rolling_group(df, 3).apply(lambda df:df['C'].mean() + df['D'].max())  # 单一返回值
# rolling_group(df, 3).apply(lambda df:(df['C'].mean() , df['D'].max()))  # 单一tuple返回值
# rolling_group(df, 3).apply(lambda df:pd.Series((df['C'].mean() , df['D'].max())))  # 多列返回

# rolling_group(df_prc, 3).apply(lambda df:df.min(axis=1))  # dataframe返回,此时因为是groupby返回对象，所以索引会再增加一级

### 运用1：简单举例

In [None]:
def own_func(df):
    """
    attention: df has MultiIndex
    :param df:
    :return:
    """

    return pd.Series([df["C"].mean(), df["C"].max() + df["D"].min()])

rolling_group(df, 4).apply(own_func)

### 运用2：Beta计算

+ Beta是以股票ret为y，市场收益率为X回归的$\beta$

In [None]:
import statsmodels.api as sm

# 权重w和市场收益率mkt_ret模拟
w = np.array(range(4, 0, -1))/10
s_mkt_dt = pd.Series(range(1, len(dates)+1), index=dates)

def __wls_beta(ret, w=None):
    ret = ret.reset_index(0, drop=True).astype(float)
    if w is None:
        w = np.ones(ret.shape[0])

    s_mkt = s_mkt_dt[ret.index].astype(float)
    X = sm.add_constant(s_mkt, prepend=False)
    # print(X)
    model = sm.WLS(ret, X, weights=w).fit()
    # param0 for beta_mkt, param1 for _cons
    params = model.params.values
    error_std = (ret - model.predict(X)).std()
    return params[0], params[1], error_std


# rolling WLS with multi returns
df_reg_params = rolling_group(df_prc, window=4).agg(lambda i:__wls_beta(i, w=w))

# extract rolling wls reg beta_mkt
df_beta_mkt = df_reg_params.applymap(lambda x:x[0])
# extract rolling wls reg _cons
df_cons = df_reg_params.applymap(lambda x:x[1])
# extract rolling wls reg predict_error_std
df_error_std = df_reg_params.applymap(lambda x:x[2])

In [None]:
df_error_std, df_beta_mkt

### 运用3：区间相对强弱指标计算

+ 定义：当日收盘价位于N日高低点的相对位置

+ 公式：$$RS_t(N)=\frac{close_t-Min(N)}{Max(N)-Min(N)},$$其中$Min(N)=Min(\{low_{\tau}, \tau \in [t,..., t-(N-1)]\})$, $Max(N)=Max(\{high_{\tau}, \tau \in [t,..., t-(N-1)]\})$

In [None]:
def RS(df: pd.DataFrame):
    """
    
    :param df:
    :return:
    """
    # 这里有未来数据调用的情况，因为每次计算使用的对象是区间完整的DataFrame,所以最后返回时只能最后一行
    value = (df['close'] - df['low'].min()).div((df['high'].max()-df['low'].min()))
    return value[-1]


# 全样本，N=shape[0]
# RS(df_prc)  # 返回一个值

# 滚动样本
df_prc['RS'] = rolling_group(df_prc, 3).apply(RS)
df_prc

In [None]:
# RS的第二种计算：显式计算
df_prc_exp = df_prc.copy(deep=True)
df_prc_exp['_div'] = (df_prc['high'].rolling(3).max() - df_prc['low'].rolling(3).min())
df_prc_exp['div'] = df_prc['close'].sub(df_prc['low'].rolling(3).min())
df_prc_exp['rs'] = df_prc_exp['div'].div(df_prc_exp['_div'])
df_prc_exp

## 纯Numpy版

In [None]:
# 首先将通过numpy从内存提取数据单独写成函数
def get_stride_values(df: pd.DataFrame, window=5):
    # v的第一列默认索引
    v = df.reset_index().values
    dim0, dim1 = v.shape
    stride0, stride1 = v.strides
    
    stride_values = stride(v, 
                           (dim0 - (window - 1), window, dim1), 
                           (stride0, stride0, stride1))
    return stride_values


window = 3
stride_values = get_stride_values(df, 3)
stride_values.shape, stride_values[0]

In [None]:
# 返回测试
def own_func_np(narr, **kwargs):
    """
    :param narr:
    :return:
    """
    idx = narr[-1, 0]
    
    c = narr[:, 1]
    d = narr[:, 2]
    return idx, (np.mean(c), np.max(c) + np.min(d))


dim0  = df.shape[0]
result_values = np.full((dim0, 2), np.nan)
result_idx = np.full((dim0, 2), pd.Timestamp(1999, 1, 1))

for i, values in enumerate(stride_values, window - 1):
    break

idx, data = own_func_np(values)


result_values[i,] = data
result_idx[i] = idx
# result_values, result_idx

In [None]:
# 编写纯numpy版rolling函数
def rolling_np(df: pd.DataFrame, apply_func: callable, *ret_sub_shapes,
               window=3, **kwargs):
    """
    rolling with multiple columns on 2 dim pd.Dataframe
      - pure numpy process
      - available for multi-level return matrix
      - call apply function with numpy ndarray
    :param df:
    :param window:
    :param apply_func:
    :param ret_sub_shapes: dims of returns' shapes
    :param kwargs:
    :return:
    """
    stride_values = get_stride_values(df, window=window)
    
    dim0 = df.shape[0]
    
    # 设置返回的值列和索引列
    result_values = np.full((dim0, *ret_sub_shapes), np.nan)
    result_index = np.full(dim0, pd.Timestamp(1990, 1,1)) # 这里index必须和apply_func返回的index类型一致
    
    # 循环读取
    for i, values in enumerate(stride_values, window - 1):
        # col 0 是索引列，其余是值列
        idx, data = apply_func(values, **kwargs)
        
        result_values[i,] = data
        result_index[i] = idx
    return result_index, result_values


ret_idx, ret_values = rolling_np(df, own_func_np, 2, window=3)
result_df = pd.DataFrame(ret_values, index=ret_idx)
result_df

### 运用1：Beta计算

#### 全样本计算

In [None]:
# 权重w和市场收益率mkt_ret模拟
w = np.array(range(4, 0, -1))/10
s_mkt_dt = pd.Series(range(1, len(df.index)+1), index=df.index)

df['_mkt'] = s_mkt_dt
df

In [None]:
# 方法1：矩阵乘法求解（下文集成为__mat_B函数
# Y: t*n，每一列是个股地时间序列回报，每一行是时间
# X: t*K, 每一列是因子时间序列收益率，每一行是时间
# B: K*n, 每一列是个股的因子暴露，每一行是一个因子
# 这里最好都是没有index的数据
X, Y = df['_mkt'], df[['C', 'D', 'E']].values
# 加入常数项
X = np.c_[np.ones(X.shape[0]), X] 
# 这里使用伪逆，避免矩阵不满秩带来的错误
# B = np.linalg.pinv(X.T@X)@X.T@Y

# 抽象为加权最小二乘回归（等权时即为OLS）
w = np.array(range(X.shape[0], 0, -1))
w = w/w.sum()

W = np.diag(w)
B = np.linalg.pinv(X.T@W@X)@X.T@W@Y


# E: 残差，alpha矩阵，t*n，每一列是个股时间序列上的alpha
E = Y - X@B
B

In [None]:
# 方法2：基于sm.WLS计算，结果同矩阵计算一致
# X = sm.add_constant(Y, prepend=False)
print(X)
model = sm.WLS(Y, X, weights=w).fit()
# 第一个是ret的系数，第二个是常数项，因为后续要用到残差，所以都保留
params = model.params
params

#### 滚动计算

In [None]:
def __mat_B(narr, w=None):
    """
    computing Weighted-Least-Square with matrix method
      - pure numpy process
    :param narr: numpy ndarray with col 0 the 
    :param w: weights series
    :return:
    """
    # col 0 是索引列
    idx = narr[:,0]
    # col -1 是X列，本例里是市场收益率s_mkt_ret
    X = narr[:, -1].astype(float)
    # col 1:-1 是Y列，本例里是个股的收益率矩阵
    Y = narr[:,1:-1].astype(float)
    
    t, n = Y.shape[0], Y.shape[1]
    
    # X的第一列是常数列，其余列为X列
    X = np.c_[np.ones(t), X]
    if w is None:
        w = np.ones(t)
    W = np.diag(w)
    B = np.linalg.pinv(X.T@W@X)@X.T@W@Y

    return idx[-1], B


# 全样本
__mat_B(df.reset_index().values)

# 滚动样本1 结果测试
stride1 = get_stride_values(df, window=3)[0]
__mat_B(stride1)

In [None]:
# 这里2*3是B的形状
ret_idx, ret_values = rolling_np(df, __mat_B, 2, 3, w=None, window=3)
# ret_idx, ret_values

In [None]:
# 这里列的顺序如下：
# 资产1的常数项系数、X1系数...Xm系数，资产2的常数项系数、X1系数...Xm系数，...，资产N的常数项系数、X1系数...Xm系数
df_betas = (
    pd.DataFrame(ret_values.reshape(ret_values.shape[0], 1, -1).squeeze(), index=ret_idx)
    # .values.reshape(values.shape[0], 2, -1)  # 转回原形状
)
df_betas

### 运用2：区间相对强弱指标计算

In [None]:
strides_prc = get_stride_values(df_prc, window=3)
strides_prc

In [None]:
def __np_RS(narr):
    
    # col 0 为索引列
    idx = narr[:, 0]
    
    # OHLC对应1-4列
    o, h, l, c = narr.T[1:5]
    
    value = (c - np.nanmin(l))/(np.nanmax(h)-np.nanmin(l))
    return idx[-1], value[-1]


idx_rs, values_rs = rolling_np(df_prc, __np_RS, 1, window=3)
df_prc['RS_np'] = values_rs
df_prc