In [1]:
import os
import sys
import inspect

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import defaultdict

from basefactor import fetcher
from factorbase import FactorBase
from basicfunc import *
from localsimulator import *
from dataclean import *
from operation import Operation

c:\Users\hazc\anaconda3\Lib\site-packages\iFinDPy.pth


# 检查一个完整的模拟流程

## 1. 准备数据

In [15]:
class FactorPool(FactorBase):
    """Factor pool that defines two sample alphas for the simulation walkthrough.

    The constructor only sets configuration attributes on the instance; how
    ``FactorBase`` consumes them is not visible in this notebook.
    """

    def __init__(self):
        super().__init__()

        # Date range of the run, given as 8-digit date strings.
        self.start_date = "20200201"
        self.end_date = "20231231"

        # Pre-processing switches, all disabled for this check.
        # NOTE(review): the names suggest winsorization, z-scoring, industry
        # neutralization and NaN filling -- confirm against FactorBase.
        self.winsorize = False
        self.zscore = False
        self.ind_neu = False
        self.fillna = False

        # Export/reporting options.
        # NOTE(review): semantics of these options live in FactorBase -- confirm there.
        self.group_detail = "off"
        self.stockwise_export = "off"
        self.prefix = "tzt"
        self.path = "./pnl"

        # Field names the alphas below look up in ``self.need_data``.
        self.need_fields = ["OPEN", "CLOSE", "VOL", "VWAP"]

    def alpha_1(self):
        """Return -1 times ``Operation.corr`` of the OPEN and VOL fields."""
        open_px = self.need_data["OPEN"]
        volume = self.need_data["VOL"]
        # NOTE(review): 10 is presumably a rolling window; the meaning of the
        # trailing 2 is not visible here -- confirm in Operation.corr.
        return -1 * Operation.corr(open_px, volume, 10, 2)

    def alpha_2(self):
        """Return the difference CLOSE - OPEN."""
        close_px = self.need_data["CLOSE"]
        open_px = self.need_data["OPEN"]
        return close_px - open_px


# Single pool instance used by the rest of the notebook.
fpool = FactorPool()

In [16]:
# Run the FactorBase pipeline on the pool, in this order: constants, data, alphas.
# NOTE(review): presumably load_data() reads the fields listed in need_fields -- confirm in FactorBase.
fpool.get_constants()
fpool.load_data()
# Logs how many alphas will be calculated and a completion message (see the cell output).
fpool.calculate_alpha()

2024-05-29 15:48:42 [INFO] 2 alphas will be calculated (Simsummary in alphabetical order).
2024-05-29 15:48:42 [INFO] Alphas calculated.


## 检查单个因子回测是否存在错误

In [17]:
# Backtest window, taken from the factor pool configuration.
startdate = fpool.start_date
enddate = fpool.end_date

# NOTE(review): the (1, -1) arguments presumably extend the date list by one
# day before startdate -- confirm in basicfunc.get_datelist.
actdays = basicfunc.get_datelist(startdate, enddate, 1, -1)
cache_start = actdays[0]


def _cached(field):
    """Load one cached field over [cache_start, enddate]."""
    return basicfunc.loadcache(cache_start, enddate, field)


# NOTE(review): field meanings are inferred from their names only
# (ISZT/ISTP look like limit-up / suspension flags, IRE500 an index return,
# WIND01 an industry grouping) -- confirm against the cache definitions.
iszt = _cached("ISZT")
date_list = _cached("DAYS")
istp = _cached("ISTP")
# Divided by 100 -- presumably converts percent returns to fractions; confirm.
vwap_ret = _cached("VWAPRET") / 100
index_ret = _cached("IRE500")
group = _cached("WIND01")

# Position of actdays[1] inside the cached date list, and the end index
# derived from the length of actdays.
startdi_new = date_list.tolist().index(actdays[1])
enddi_new = startdi_new + len(actdays) - 1

# Note: the ticker universe is loaded up to actdays[-1] rather than enddate.
tickers = basicfunc.loadcache(cache_start, actdays[-1], "STOCKS")

In [26]:
simu = Simulator()
histdays = enddi_new - startdi_new + 1
bt_date = date_list[startdi_new : enddi_new + 1].copy()

# Work on a private copy of the second alpha held by the pool. The array is
# passed to Simulator helpers and NaN-filled in place below; without the copy
# the factor stored in fpool.alpha could be modified, making this cell
# non-idempotent when re-run.
alpha = fpool.alpha[1].copy()

# NOTE(review): keepzdt presumably masks names flagged by iszt/istp and
# scalebook presumably rescales the book -- confirm in localsimulator.
raw_alpha = simu.keepzdt(alpha.copy(), startdi_new, enddi_new, iszt, istp)
alpha = simu.keepzdt(alpha, startdi_new, enddi_new, iszt, istp)
alpha = simu.scalebook(alpha)

# Long side: keep strictly positive weights, everything else becomes NaN for now.
palpha = alpha[0:histdays].copy()
palpha[palpha <= 0] = np.nan

# Short side: keep non-positive weights, positive ones become NaN for now.
nalpha = alpha[0:histdays].copy()
nalpha[nalpha > 0] = np.nan

# Replace every NaN with a zero position in all three books.
alpha[np.isnan(alpha)] = 0.0
palpha[np.isnan(palpha)] = 0.0
nalpha[np.isnan(nalpha)] = 0.0

# Empty text buffers; not used in the cells visible here.
logresult = ""
recordstr = ""

In [27]:
# Number of non-zero positions per day, and the days holding at most 50 names.
dumpum = np.sum(alpha[0 : enddi_new - startdi_new + 1] != 0, axis=1)
dumdays = np.where(dumpum <= 50)[0]


# Concentration check: share of each side's book held by its single largest
# position. nalpha is non-positive, so the largest short is the minimum
# (nanmax would pick a value near zero and the short-side check could never fire).
with np.errstate(divide="ignore", invalid="ignore"):
    ta = np.nanmax(palpha, axis=1) / np.nansum(palpha, axis=1)
    tb = np.nanmin(nalpha, axis=1) / np.nansum(nalpha, axis=1)

# Days with an empty side give 0/0 = NaN (or inf); treat them as "no concentration".
ta[~np.isfinite(ta)] = 0.0
tb[~np.isfinite(tb)] = 0.0

# Days on which one name exceeds 10% of the long / short book.
tadays = np.where(ta > 0.1)[0]
tbdays = np.where(tb > 0.1)[0]

In [28]:
# Per-name return matrix: row 0 is all zeros, row t (t >= 1) is the position
# alpha[t - 1] multiplied by vwap_ret on day startdi_new + t.
n_tickers = alpha.shape[1]
next_day_ret = vwap_ret[startdi_new + 1 : enddi_new + 1]
retmatrix = np.concatenate(
    (np.zeros((1, n_tickers)), alpha[: histdays - 1] * next_day_ret),
    axis=0,
)

# Holding PnL: the first two days are zero; for day t >= 2 it is the
# element-wise minimum of alpha[t - 1] and alpha[t - 2] times that day's return.
# NOTE(review): np.minimum on signed positions selects the more negative value
# for shorts -- confirm this is the intended "held" position.
carried_pos = np.minimum(alpha[1 : histdays - 1], alpha[: histdays - 2])
holdpnl = np.concatenate(
    (
        np.zeros(2),
        np.nansum(carried_pos * vwap_ret[startdi_new + 2 : enddi_new + 1], axis=1),
    )
)

# Trading PnL is whatever part of the daily total is not holding PnL.
tradepnl = np.nansum(retmatrix, axis=1) - holdpnl

# Traded capital: the gross book on day 0, then the gross day-over-day
# position change, rounded to 2 decimals.
opening_book = np.nansum(np.abs(alpha[0]))
daily_turnover = np.nansum(np.abs(np.diff(alpha[:histdays], axis=0)), axis=1)
tradecap = np.round(np.concatenate(([opening_book], daily_turnover)), decimals=2)

In [31]:
# Day-over-day position changes for the whole book and for each side.
delta = np.diff(alpha[:histdays], axis=0)
pdelta = np.diff(palpha[:histdays], axis=0)
ndelta = np.diff(nalpha[:histdays], axis=0)

# Cost rates: 0.0002 is applied to position increases, 0.0012 to decreases.
buy_cost_rate = 0.0002
sell_cost_rate = 0.0012

# Summed position increases (>= 0) and decreases (<= 0) per day.
bought = np.nansum(np.where(delta > 0, delta, 0), axis=1)
sold = np.nansum(np.where(delta < 0, delta, 0), axis=1)

# Day 0 is charged the buy rate on the opening traded capital.
poscost = np.concatenate(([tradecap[0] * buy_cost_rate], bought * buy_cost_rate))
# `sold` is non-positive, so the negative rate yields a non-negative cost.
negcost = np.concatenate(([0.0], sold * -sell_cost_rate))

# Long-side PnL: the daily sum of retmatrix over names whose previous-day
# position was positive (day 0 is zero), net of poscost, rounded to 2 decimals.
# poscost is subtracted exactly once, matching how the total pnl subtracts
# each cost component once; the earlier version subtracted it twice.
pospnl = np.around(
    np.r_[
        0.0,
        np.nansum(
            np.where(alpha[0 : histdays - 1] > 0, retmatrix[1:], 0), axis=1
        ),
    ]
    - poscost,
    decimals=2,
)

# Daily IC: per-date correlation across tickers between the position held and
# the following day's return, with a leading 0.0 for day 0, rounded to
# 6 decimals. xarray is used because, per the original note, it behaves like
# df.corr but is roughly 300x faster.
# NOTE(review): `xr` is not imported in the notebook's import cell; it
# presumably arrives through one of the star imports -- confirm.
ic_dims = ("date", "ticker")
held_alpha = xr.DataArray(alpha[0 : enddi_new - startdi_new], dims=ic_dims)
following_ret = xr.DataArray(
    vwap_ret[startdi_new + 1 : enddi_new + 1], dims=ic_dims
)
daily_ic = xr.corr(held_alpha, following_ret, dim="ticker")
IC = np.around(np.r_[0.0, daily_ic], decimals=6)

# Index return series aligned with the backtest days (0.0 on day 0).
inx_ret = np.round(np.r_[0.0, index_ret[startdi_new + 1 : enddi_new + 1]], 6)

# Coverage: number of names with a positive / negative position per day.
poscov = np.round(np.nansum(alpha > 0, axis=1), 1)
negcov = np.round(np.nansum(alpha < 0, axis=1), 1)

# Size of the long and short books per day.
longsize = np.round(np.nansum(palpha, axis=1), 1)
shortsize = np.round(np.nansum(nalpha, axis=1), 1)

# Daily PnL net of both cost components.
net_of_cost = np.nansum(retmatrix, axis=1) - poscost - negcost
pnl = np.round(net_of_cost, 2)

# Daily return on the gross book; NaN entries are reported as 0.
# NOTE(review): assumes alpha has the same number of rows as pnl -- confirm.
gross_book = np.nansum(np.abs(alpha), axis=1)
ret = np.round(pnl / gross_book, 6)
ret = np.where(np.isnan(ret), 0.0, ret)

## 2. 开始回测