In [13]:
import functools
from typing import Callable, Dict, List, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scr.activation_funcs import ReLU, identity, sign, tanh
from scr.binary_operators import (
    get_x,
    get_y,
    operators_add,
    operators_diff,
    operators_max,
    operators_min,
    operators_multiple,
    x_is_greater_than_y,
)
from scr.trader import Trader
from tqdm.notebook import tqdm

In [1]:
# import torch
# from torch.nn import (Tanh,ReLU)

# def dynamics(y_t:torch.tensor,sigma):

#     a,b = y_t
#     y_next = torch.zeros_like(y_t)
#     size = y_t.shape[1]
#     relu = ReLU()

#     y_next[0] =  1.0 * torch.tanh(a) + 0.8 * a * b + 1.0 * b - 1.0 * relu(torch.minimum(a,b)) + sigma * torch.randn(size)
#     y_next[1] = 0.6 * torch.sign(b) + 0.5 * a * b - 1.0 * torch.maximum(a,b) + sigma * torch.randn(size)

#     return y_next

# sigma_true = 0.1

# T_total = 2000
# y = torch.zeros((2, T_total))
# y_without_noise = torch.zeros((2, T_total))

# name_stock = dynamics(y,sigma_true)
# noise = dynamics(name_stock,0)

# # Nomura = name_stock[0]
# # PFN = name_stock[1]
# name_stock[:,0] = torch.tensor([0.5,0.5])
# noise[:,0] = torch.tensor([0.5,-0.5])
# name_stock_label = ["Nomura", "PFN"]

# plt.figure(figsize=(18,5))
# plt.xlim([T_total - 100,T_total])
# plt.plot(name_stock[0].numpy(), color = "#cc0000", label = name_stock_label[0])
# plt.plot(name_stock[1].numpy(), color = "#083090", label = name_stock_label[1])
# plt.plot(noise[0].numpy(), color = "#cc0000", linestyle = "--", label = name_stock_label[0] + "(w/o noise)")
# plt.plot(noise[1].numpy(), color = "#083090", linestyle = "--", label = name_stock_label[1] + "(w/o noise)")
# plt.xlabel("time", fontsize = 18)
# plt.ylabel("y", fontsize = 18)
# plt.legend()

In [37]:
def dynamics(y_t: np.ndarray, sigma: float):
    """Apply one noisy update step to a pair of series.

    Args:
        y_t (np.ndarray): 2-D array that unpacks along axis 0 into exactly two
            rows; the length of axis 1 sets how many noise samples are drawn.
        sigma (float): Multiplier applied to the ``np.random.randn`` noise.

    Returns:
        np.ndarray: Array created with ``np.zeros_like(y_t)`` (same shape and
        dtype as the input) whose two rows hold the updated values.
    """
    first, second = y_t
    result = np.zeros_like(y_t)
    n_cols = y_t.shape[1]

    # Row 0: tanh(a) + 0.8*a*b + b - ReLU(min(a, b)), then noise.
    # The noise for row 0 is drawn before the noise for row 1.
    drift_0 = 1.0 * np.tanh(first) + 0.8 * first * second + 1.0 * second
    drift_0 = drift_0 - 1.0 * ReLU(np.minimum(first, second))
    result[0] = drift_0 + sigma * np.random.randn(n_cols)

    # Row 1: 0.6*sign(b) + 0.5*a*b - max(a, b), then noise.
    drift_1 = 0.6 * np.sign(second) + 0.5 * first * second
    drift_1 = drift_1 - 1.0 * np.maximum(first, second)
    result[1] = drift_1 + sigma * np.random.randn(n_cols)

    return result


# Simulation settings: number of columns (time steps) and noise scale.
T_total = 2000
sigma = 0.1

# Start from an all-zero (2, T_total) array and apply `dynamics` to it once.
# NOTE(review): `y` is all zeros and `dynamics` is applied a single time, so
# no state is carried from one column to the next -- confirm that this is the
# intended data set rather than an iterated trajectory.
y = np.zeros(shape=(2, T_total))
data = dynamics(y_t=y, sigma=sigma)

In [15]:

# For the I-th stock of each of the N traders, create_formulae generates one
# set of formulae (terms).
def create_formulae(M: int,
                    A: List[Callable],
                    O: List[Callable],
                    stock_num: int,
                    max_lag: int = 9,
                    l: int = 1,
                    seed: int = None) -> List[Callable]:
    r"""Randomly build the terms of $\Theta$.

       $\Theta=\sum^{M}_{j}w_{j}A_{j}(O_{j}(r_{P_{j}}[t-D_{j}],r_{Q_{j}}[t-F_{j}]))$

    Only the un-weighted terms $A_j(O_j(\cdot,\cdot))$ are built here; the
    weights $w_j$ are handled by the caller.

    Args:
        M (int): Maximum number of terms in the expression. The actual number
            is drawn uniformly from 1..M (inclusive).
        A (List[Callable]): Pool of activation functions to sample from.
        O (List[Callable]): Pool of binary operator functions to sample from.
        stock_num (int): Number of stocks; P and Q are drawn from
            ``range(stock_num)``.
        max_lag (int, optional): Maximum data lag. Defaults to 9.
        l (int, optional): Trading delay, i.e. trading cannot happen
            immediately after the data is observed. Defaults to 1.
        seed (int, optional): Seed for NumPy's global RNG. ``None`` leaves the
            RNG state untouched; any integer (including 0) reseeds it.
            Defaults to None.

    Returns:
        List[Callable]: One callable per term. Each takes ``data`` (axis 0 is
        the stock, axis 1 is time) and returns the term evaluated at every
        time index from ``max_lag + l`` to the end of the series.

    Raises:
        ValueError: If ``M`` is smaller than 1.
    """
    if M < 1:
        raise ValueError('M must be at least 1, got %s' % M)

    def _formula_func(data: np.ndarray, active_func: Callable,
                      binary_oper: Callable, P: int, Q: int, F: int,
                      D: int) -> np.ndarray:
        """Evaluate one term on ``data`` (axis 0: stock, axis 1: time).

        Raises:
            ValueError: If the series is shorter than ``max_lag + l``.
        """
        t = data.shape[1]  # length of the time axis
        if t < (max_lag + l):
            raise ValueError('数据时间序列长度(t=%s)不能小于max_lag + l(max_lag+l=%s)!' %
                             (t, max_lag + l))

        # Lags never exceed max_lag + l, so starting here keeps every lagged
        # index non-negative.
        indices: np.ndarray = np.arange(max_lag + l, t)

        x: np.ndarray = np.take(data[P], indices - D)
        y: np.ndarray = np.take(data[Q], indices - F)

        return active_func(binary_oper(x, y))

    # `if seed:` would silently ignore seed=0.
    if seed is not None:
        np.random.seed(seed)

    # Number of terms: at least one and at most M. The previous
    # np.random.choice(M) drew from 0..M-1, which could produce an empty
    # formula and could never reach M.
    m = np.random.randint(1, M + 1)

    # Candidate lags, at least 1 step.
    # NOTE(review): the lower bound is 1 regardless of `l`; confirm whether it
    # should be `l` when l > 1.
    lags = np.arange(1, max_lag + l + 1)
    D: np.ndarray = np.random.choice(lags, m)
    F: np.ndarray = np.random.choice(lags, m)

    # Randomly pick the stocks feeding each term.
    P: np.ndarray = np.random.choice(stock_num, m)
    Q: np.ndarray = np.random.choice(stock_num, m)

    # Randomly pick the "operators" making up each term.
    activations: np.ndarray = np.random.choice(A, m)
    operators: np.ndarray = np.random.choice(O, m)

    # Each returned callable only needs `data`. D and F are bound to their own
    # draws (they were previously swapped).
    formulae_ls_func: List[Callable] = [
        functools.partial(_formula_func,
                          active_func=act,
                          binary_oper=oper,
                          P=p,
                          Q=q,
                          F=f,
                          D=d)
        for act, oper, p, q, d, f in zip(activations, operators, P, Q, D, F)
    ]

    return formulae_ls_func


def ols_func(exog: np.ndarray, endog: np.ndarray) -> np.ndarray:
    """Fit ``endog`` on ``exog`` by least squares with an intercept column.

    Args:
        exog (np.ndarray): Regressors, one row per observation.
        endog (np.ndarray): Target values, one entry (or row) per observation.

    Returns:
        np.ndarray: Least-squares coefficients; the first entry belongs to the
        prepended column of ones, the rest follow the columns of ``exog``.
    """
    intercept = np.ones(len(exog))
    design = np.column_stack((intercept, exog))
    coef, _residuals, _rank, _singular = np.linalg.lstsq(design,
                                                         endog,
                                                         rcond=None)
    return coef

In [None]:
# NOTE(review): this notebook also imports `Trader` from `scr.trader`; running
# this cell rebinds the name to the class below.
class Trader(object):
    """One trader: a random set of formula terms and weights per stock.

    Attributes set by the methods:
        formulate: per-stock list of term callables (from ``create_formulae``).
        weight: per-stock weight vector.
        factors: per-stock matrix of evaluated terms (set by ``calc_factors``).
    """

    def __init__(self,
                 M: int,
                 A: List[Callable],
                 O: List[Callable],
                 stock_num: int,
                 max_lag: int = 9,
                 l: int = 1,
                 seed: int = None) -> None:
        """Draw random formulae and random initial weights for every stock.

        Args:
            M (int): Maximum number of terms, forwarded to ``create_formulae``.
            A (List[Callable]): Activation functions to sample from. When
                ``None`` or empty, the built-in pool
                ``[identity, ReLU, sign, tanh]`` is used.
            O (List[Callable]): Binary operators to sample from. When ``None``
                or empty, the built-in operator pool is used.
            stock_num (int): Number of stocks; one formula set is drawn each.
            max_lag (int, optional): Forwarded to ``create_formulae``.
            l (int, optional): Forwarded to ``create_formulae``.
            seed (int, optional): Seed for NumPy's global RNG; ``None`` leaves
                the RNG untouched, any integer (including 0) reseeds it.
        """
        # The caller's pools used to be ignored in favour of hard-coded lists;
        # the hard-coded lists are now only a fallback.
        if A is not None and len(A) > 0:
            activation_funcs: List[Callable] = list(A)
        else:
            activation_funcs = [identity, ReLU, sign, tanh]

        if O is not None and len(O) > 0:
            binary_operators: List[Callable] = list(O)
        else:
            binary_operators = [
                operators_max, operators_min, operators_add, operators_diff,
                get_x, get_y, operators_multiple, x_is_greater_than_y
            ]

        # `if seed:` would silently ignore seed=0.
        if seed is not None:
            np.random.seed(seed)

        # Build the formulae.
        ## The list index corresponds to the stock index in the data.
        self.formulate: List[List[Callable]] = [
            create_formulae(M,
                            activation_funcs,
                            binary_operators,
                            stock_num=stock_num,
                            max_lag=max_lag,
                            l=l) for _ in range(stock_num)
        ]
        # Build the weights.
        ## The list index corresponds to the stock index in the data.
        self.weight: List[np.ndarray] = self.get_randn_weight()

    def get_randn_weight(self) -> List[np.ndarray]:
        """Draw a standard-normal weight vector per stock, one entry per term."""

        return [np.random.randn(len(formula)) for formula in self.formulate]

    def calc_factors(self, data: np.ndarray) -> None:
        """Evaluate every term on ``data`` and store the result in ``self.factors``.

        Each stock gets a matrix whose columns are that stock's terms.

        Args:
            data (np.ndarray): Passed unchanged to every term callable.

        Raises:
            ValueError: If a stock has no terms to evaluate.
        """
        factors: List[np.ndarray] = []

        for stock_idx, stocki_formula in enumerate(self.formulate):

            columns: List[np.ndarray] = [
                formula(data) for formula in stocki_formula
            ]
            if not columns:
                # np.vstack([]) raises ValueError too; this message says why.
                raise ValueError('stock %s has no formula terms to evaluate' %
                                 stock_idx)

            factors.append(np.vstack(columns).T)

        self.factors: List[np.ndarray] = factors

    def learn(self, endog: np.ndarray, func: Callable = ols_func) -> None:
        """Refit each stock's weights from its factor matrix.

        Args:
            endog (np.ndarray): Regression target handed to ``func``.
                NOTE(review): the same ``endog`` is used for every stock --
                confirm whether a per-stock target is intended.
            func (Callable, optional): ``func(factor, endog)`` returning the
                new weights. Defaults to ``ols_func``.
        """
        for num, factor in enumerate(self.factors):
            # Use the supplied fitter; it used to be ignored.
            self.weight[num] = func(factor, endog)

    def predict(self) -> np.ndarray:
        """Combine the most recent factor row of each stock with its weights.

        Weights with one more entry than there are terms (as returned by
        ``ols_func``, which prepends an intercept) are applied as
        ``w[0] + row @ w[1:]``; otherwise ``row @ w`` is used.

        Returns:
            np.ndarray: The per-stock predictions stacked with ``np.vstack``
            and transposed.
        """
        y_pred: List = []
        for factor, w in zip(self.factors, self.weight):
            latest = factor[-1]
            w = np.asarray(w)
            if w.shape[0] == latest.shape[0] + 1:
                # The first weight is the intercept.
                y_pred.append(w[0] + latest @ w[1:])
            else:
                y_pred.append(latest @ w)

        return np.vstack(y_pred).T


class Company(object):
    """A pool of ``Trader`` instances; training is not implemented yet."""

    def __init__(self,
                 stock_names: List[str],
                 M: int,
                 max_lag: int,
                 l: int,
                 activation_funcs: List[Callable],
                 binary_operators: List[Callable],
                 traders_num: int,
                 Q: float = 0.5,
                 seed: int = None) -> None:
        """Create ``traders_num`` traders over ``len(stock_names)`` stocks.

        Args:
            stock_names (List[str]): Stock names; only their count is used.
            M (int): Forwarded to each ``Trader``.
            max_lag (int): Forwarded to each ``Trader``.
            l (int): Forwarded to each ``Trader``.
            activation_funcs (List[Callable]): Forwarded to each ``Trader``.
            binary_operators (List[Callable]): Forwarded to each ``Trader``.
            traders_num (int): Number of traders to create.
            Q (float, optional): Currently unused. Defaults to 0.5.
            seed (int, optional): Seed for NumPy's global RNG; ``None`` leaves
                the RNG untouched, any integer (including 0) reseeds it.
        """
        # `if seed:` would silently ignore seed=0.
        if seed is not None:
            np.random.seed(seed)

        stock_num = len(stock_names)
        self.traders: List[Trader] = [
            Trader(M, activation_funcs, binary_operators, stock_num, max_lag,
                   l) for _ in range(traders_num)
        ]

    def fit(self, train_data: np.ndarray):
        """Placeholder: does nothing and returns ``None``."""
        pass

In [40]:
# Walk-through with a single trader:
# the case of stock 0 for the first trader.

# Pools the formula terms are sampled from.
activation_funcs: List[Callable] = [identity, ReLU, sign, tanh]
binary_operators: List[Callable] = [
    operators_max,
    operators_min,
    operators_add,
    operators_diff,
    get_x,
    get_y,
    operators_multiple,
    x_is_greater_than_y,
]

# Theta of one trader.
formulate: List = create_formulae(M=10,
                                  A=activation_funcs,
                                  O=binary_operators,
                                  stock_num=2,
                                  max_lag=2,
                                  l=1,
                                  seed=42)
# Number of factors actually generated for the given M.
factor_num: int = len(formulate)
# Initial weights, one per factor.
w: np.ndarray = np.random.randn(factor_num)

In [55]:
# Model training process (train step):
# evaluate every term on the first 20 time steps of `data`, then stack the
# results so that each column of `p` is one term.
p: np.ndarray = np.vstack(
    [formula(data[:, :20]) for formula in formulate]).T