In [1]:
"""
data.py
Function:
    some label functions
Class:
    GetData_index

"""
import sys
sys.path.append('./dataquery')
import DataQuery.MysqlAPI as MysqlAPI
import DataQuery.DataToolkit as DataToolkit 
import re
import pymysql
import pandas as pd
import numpy as np
import datetime
import argparse
import csv
import logging
import os
from tqdm import tqdm,tqdm_notebook, trange
from sklearn.model_selection import train_test_split
from sklearn.metrics import matthews_corrcoef, confusion_matrix, multilabel_confusion_matrix
from matplotlib import pyplot as plt 
import pickle  
import talib

import torch
from torch.nn import CrossEntropyLoss, MSELoss
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import (DataLoader, RandomSampler, SequentialSampler,TensorDataset, Subset)
from torchvision import datasets, models, transforms



# Configure the root logger at INFO level so the progress messages logged below are emitted.
logging.basicConfig(level = logging.INFO)
# Module-level logger used by the functions and classes in this notebook.
logger = logging.getLogger(__name__)

In [2]:
import os

import tushare as ts

print(ts.__version__)

# Initialise the Tushare Pro client.
# The API token is a credential and must not be committed with the notebook:
# export it as TUSHARE_TOKEN before starting the kernel. A missing variable
# raises KeyError naming the variable, which fails fast instead of sending
# unauthenticated requests later.
# NOTE(review): a token that was previously hardcoded here should be treated as
# leaked and revoked/regenerated in the Tushare account settings.
ts.set_token(os.environ['TUSHARE_TOKEN'])
pro = ts.pro_api()


1.2.45


In [3]:
import sys
# Display the interpreter version string as the cell output (recorded for reproducibility).
sys.version

'3.7.0 (default, Oct  9 2018, 10:31:47) \n[GCC 7.3.0]'

# label

In [4]:
# label news according to markets data
def label_direction_3(trend):
    """
    Three-class direction label for a relative price change.

    Returns:
        1 when |trend| <= 0.02 (flat), 2 when trend is a larger positive move,
        0 otherwise (larger negative move, or a NaN trend).
    """
    is_flat = abs(trend) <= 0.02
    if is_flat:
        return 1
    if trend > 0:
        return 2
    return 0

def label_direction_wy(trend):
    """
    Three-class direction label with a narrow +/-0.5% flat band.

    Returns:
        0 when trend < -0.005, 1 when -0.005 <= trend < 0.005, 2 otherwise
        (including a NaN trend, for which both comparisons are false).
    """
    if trend < -0.005:
        return 0
    if trend < 0.005:
        return 1
    return 2

def label_direction_5(trend):
    """
    Five-class direction label for a relative price change.

    The cut points -0.02, -0.005, 0.005 and 0.02 split the real line into five
    half-open buckets; the label is the position of the first cut point that
    trend is strictly below, or 4 when it is below none of them (which is also
    the result for a NaN trend).
    """
    cut_points = (-0.02, -0.005, 0.005, 0.02)
    first_bucket = (pos for pos, bound in enumerate(cut_points) if trend < bound)
    return next(first_bucket, len(cut_points))
    
def label_direction_2(trend):
    """
    Binary direction label: 0 when trend <= 0, otherwise 1.

    A NaN trend fails the `<= 0` comparison and is therefore labelled 1.
    """
    if trend <= 0:
        return 0
    return 1

def label_swing(trend):
    """
    Binary swing label: 0 when |trend| <= 0.02, otherwise 1.

    A NaN trend fails the comparison and is therefore labelled 1.
    """
    return 0 if abs(trend) <= 0.02 else 1

In [5]:
# Boundary check: 0.005 is not strictly below the 0.005 cut point, so it lands in class 3.
label_direction_5(0.005)

3

In [6]:
def show_label_distribution(self, data_df, code=None):
    """
    Log and return the per-class counts of the 'label' column.

    Args:
        self: unused; kept as the first positional parameter so existing
            call sites keep working.
        data_df: DataFrame containing a 'label' column.
        code: optional identifier (e.g. an index code) printed as the first
            line of the log message. The body previously read an undefined
            name `code`, which raised NameError unless a global of that name
            happened to exist in the kernel.

    Returns:
        pandas Series of label counts, ordered by label value.
    """
    counts = data_df['label'].value_counts(sort=False).sort_index()
    # Share of the most frequent class: the accuracy of a majority-class baseline.
    max_class_ratio = counts.max() / counts.sum()
    header = '' if code is None else '%s\n' % code
    logger.info(header + str(counts) + '\nmax_class = %s' % max_class_ratio)
    return counts

In [7]:
def get_device(n):
    """
    Pick the torch device to run on.

    Args:
        n: CUDA device ordinal to use when CUDA is available.

    Returns:
        torch.device for "cuda:<n>" when CUDA is available, otherwise the CPU
        device. The chosen device is also logged at INFO level.
    """
    if torch.cuda.is_available():
        device_name = "cuda:%d" % n
    else:
        device_name = "cpu"
    device = torch.device(device_name)
    logger.info(device)
    return device

# data_prepare

In [8]:

class GetData_index():
    """
    Build model-ready tensors for a list of index codes.

    Key method:
        generate() -> [basic, technical, output]
            A list of three torch tensors placed on ``self.device``:
            per-day "basic" features, windowed "technical" features and the
            class labels.

    Data comes from the project-local ``MysqlAPI`` / ``DataToolkit`` helpers
    and from the module-level Tushare client ``pro``; those must be available
    in the notebook namespace before ``generate`` is called.
    """

    def __init__(self,
                 task_name='index_P',
                 datapath='/home/wy506wd/data/',
                 train_date=["2016-01-01", "2019-07-01"],
                 index_code_list=['000300.SH', '000905.SH', '000016.SH'],
                 device=torch.device("cpu"),
                 days_for_train=15,
                 train_proportion=0.8,
                 fromtime='2009-10-01',
                 endtime='2019-07-01',
                 label_back_days=5,
                 label_func=label_direction_2,
                 normalization_parameters=None
                 ):
        """
        Args:
            task_name: stored on the instance; not read by the methods of this class.
            datapath: stored as ``datapath`` and ``data_dir``; not read by the
                methods of this class.
            train_date: accepted for compatibility; not stored or used.
            index_code_list: index codes to fetch and process.
            device: torch device the output tensors are moved to.
            days_for_train: window length (in rows) of the technical features.
            train_proportion: stored on the instance; not read by the methods
                of this class.
            fromtime, endtime: date range passed to the database query.
            label_back_days: horizon (in rows) of the forward return used for labels.
            label_func: maps a forward return to a class label.
            normalization_parameters: optional ``{index_code: {column: [max, min]}}``
                dict of previously fitted min-max parameters. The dict passed
                in is updated in place by ``get_basic_data`` so it can be
                reused for another instance. When omitted, every instance gets
                its own fresh dict (a shared ``{}`` default would leak fitted
                parameters between unrelated instances).
        """
        self.datapath = datapath
        # self.model_dir = '/home/wy506wd/download/'
        self.data_dir = self.datapath
        self.task_name = task_name
        self.fromtime = fromtime
        self.endtime = endtime
        self.index_code_list = index_code_list
        self.device = device
        self.days_for_train = days_for_train
        self.train_proportion = train_proportion
        self.label_back_days = label_back_days
        self.label_func = label_func

        # Filled in while the dataset is produced.
        self.dynamic_input_size = None
        self.basic_input_size = None
        self.output_size = None
        self.features = {}
        self.normalization_parameters = (
            {} if normalization_parameters is None else normalization_parameters
        )

    def generate(self):
        """
        Run the full pipeline for every code in ``self.index_code_list``.

        Per index: fetch -> technical features -> labels -> normalization ->
        merge Tushare daily-basic data. The processed frames are kept in
        ``self.data_dict`` and then turned into tensors.

        Returns:
            [basic, technical, output] as produced by ``produce_dataset``.
        """
        data_dict = {}
        for index_code in self.index_code_list:
            logger.info(index_code)
            index_data = self.fetch_data(index_code)
            index_data = self._calculate_features(index_data)
            index_data = self.set_label(index_data, self.label_func, self.label_back_days)
            index_data = self._normalization(index_data)
            index_data = self.get_tushare_data(index_code, index_data)
            data_dict[index_code] = index_data

        self.data_dict = data_dict
        return self.produce_dataset(data_dict)

    def set_label(self, index_data, label_func, label_back_days):
        """
        Add a 'label' column derived from the forward return of 'close'.

        The forward return is (close[t + label_back_days] - close[t]) / close[t].
        Also records the number of distinct labels in ``self.output_size``.

        NOTE(review): the last ``label_back_days`` rows have no future close,
        so ``label_func`` receives NaN for them and whatever it returns for
        NaN becomes their label - confirm this is intended.

        Returns:
            The same DataFrame, with the 'label' column added.
        """
        logger.info('label_back_days = %s' % label_back_days)

        close = index_data['close']
        forward_return = (close.shift(-label_back_days) - close) / close
        # index_data['return_mkt'] would be look-ahead ("future") data here.
        index_data['label'] = forward_return.map(label_func)
        self.output_size = index_data['label'].value_counts().shape[0]
        return index_data

    def produce_dataset(self, data_dict):
        """
        Convert the per-index frames into stacked tensors.

        Per-index numpy arrays are cached in ``self.features[code]``; the
        stacked arrays are kept in ``self.all_basic`` / ``self.all_technical`` /
        ``self.all_output`` and the tensors in ``self.dataset``.

        Args:
            data_dict: ``{index_code: DataFrame}`` as built by ``generate``.

        Returns:
            [basic, technical, output] torch tensors on ``self.device``.

        Raises:
            ValueError: if ``data_dict`` is empty.
        """
        if not data_dict:
            raise ValueError('data_dict is empty: there is no index data to build a dataset from')

        basic_parts, technical_parts, output_parts = [], [], []
        for code, index_data in data_dict.items():
            logger.info(code)
            basic = self.get_basic_data(code, index_data)
            technical = self.get_technical_data(index_data)
            output = self.get_output_data(index_data)

            self.features[code] = [basic, technical, output]
            basic_parts.append(basic)
            technical_parts.append(technical)
            output_parts.append(output)

        # Stack once instead of re-copying the accumulated array per index.
        all_basic = np.vstack(basic_parts)
        all_technical = np.vstack(technical_parts)
        all_output = np.hstack(output_parts)

        # NOTE(review): the recorded run of this notebook logs 13337 rows for
        # basic/output but 13185 for technical, so the three arrays may not be
        # row-aligned. Surface that instead of letting it pass silently.
        if not (len(all_basic) == len(all_technical) == len(all_output)):
            logger.warning('sample counts differ: basic=%s technical=%s output=%s',
                           len(all_basic), len(all_technical), len(all_output))

        self.all_basic = all_basic
        self.all_technical = all_technical
        self.all_output = all_output
        basic, technical, output = self.cpu_2_device(all_basic, all_technical, all_output)
        logger.info(basic.shape)
        logger.info(technical.shape)
        logger.info(output.shape)
        self.dataset = [basic, technical, output]

        return self.dataset

    def get_input_by_index(self, index_code):
        """
        Return [basic, technical, output] tensors for one index code.

        Requires ``produce_dataset`` to have populated ``self.features``;
        an unknown code raises KeyError.
        """
        basic, technical, output = self.features[index_code]
        basic, technical, output = self.cpu_2_device(basic, technical, output)
        return [basic, technical, output]

    def cpu_2_device(self, all_basic, all_technical, all_output):
        """
        Wrap numpy arrays as tensors and move them to ``self.device``.

        The two feature arrays are cast to float32; the label array keeps its
        numpy dtype.
        """
        basic = torch.from_numpy(all_basic).float().to(self.device)
        technical = torch.from_numpy(all_technical).float().to(self.device)
        output = torch.from_numpy(all_output).to(self.device)
        return basic, technical, output

    def _normalization(self, index_data):
        """
        Placeholder normalization step: logs the frame shape and returns it.

        The list of columns to standardize is currently empty, so no column is
        modified.
        """
        # Previously considered: min-max scaling of
        # ['open', 'close', 'high', 'low', 'volume', 'money'].
        columns_to_standardize = []
        for column in columns_to_standardize:
            index_data[column], _df_max, _df_min = DataToolkit.standard_scale(index_data[column])
        logger.info(index_data.shape)
        return index_data

    def get_macd_factors(self, index_data: pd.DataFrame):
        '''
        MACD-style factors computed on 'close'.
        https://en.wikipedia.org/wiki/MACD

        Note that simple rolling means (windows 12, 26 and 9) are used here,
        not exponential moving averages.

        Returns:
            (macd_dif, macd_macd): the 12/26 mean difference and its 9-row
            rolling mean.
        '''
        close = index_data['close']

        mean_12 = close.rolling(window=12).mean()
        mean_26 = close.rolling(window=26).mean()

        macd_dif = mean_12 - mean_26
        macd_macd = macd_dif.rolling(window=9).mean()

        return macd_dif, macd_macd

    def get_rsi_factors(self, index_data: pd.DataFrame):
        """
        14-period RSI of 'close', computed with TA-Lib.

        The call is qualified as ``talib.RSI``: the module is imported as
        ``talib`` and no bare ``RSI`` name exists, so the unqualified call
        raised NameError.
        """
        close = index_data['close']
        return talib.RSI(close, timeperiod=14)

    def get_KDJ_factors(self, index_data: pd.DataFrame):
        '''
        Stochastic oscillator (K and D lines).
        https://wiki.mbalib.com/wiki/%E9%9A%8F%E6%9C%BA%E6%8C%87%E6%A0%87

        K = (close - lowest low of 9 rows) / (highest high - lowest low of 9 rows),
        D = 3-row rolling mean of K. K is left as a ratio (not scaled by 100).

        Returns:
            (kd_k, kd_d)
        '''
        close = index_data['close']
        highest_high = index_data['high'].rolling(window=9).max()
        lowest_low = index_data['low'].rolling(window=9).min()

        kd_k = (close - lowest_low) / (highest_high - lowest_low)
        kd_d = kd_k.rolling(window=3).mean()
        return kd_k, kd_d

    def get_DMI_factors(self, index_data: pd.DataFrame):
        """
        Add directional-movement (DMI) columns to ``index_data`` in place.

        Columns written: 'dmi_tr_ma14', 'dmi_di_ma14_positive',
        'dmi_di_ma14_negative', 'dmi_dx14', 'dmi_adx_ma14'.

        NOTE(review): 'dmi_tr_ma14' stores the raw true range, not its 14-row
        mean, despite the name. Left unchanged because downstream features
        depend on the current values.

        Returns:
            The same DataFrame.
        """
        close = index_data['close']
        high = index_data['high']
        low = index_data['low']
        prev_close = close.shift(1)

        # Upward / downward movement versus the previous row, floored at 0.
        high_diff = np.maximum(high - high.shift(1), 0)
        low_diff = np.maximum(low.shift(1) - low, 0)

        # Only the larger of the two movements counts on a given row.
        dm_positive = high_diff.copy()
        dm_negative = low_diff.copy()
        dm_positive[high_diff <= low_diff] = 0
        dm_negative[high_diff > low_diff] = 0

        # True range: the largest of the three candidate ranges.
        tr = np.maximum(np.maximum(abs(high - low), abs(low - prev_close)),
                        abs(high - prev_close))
        tr_ma14 = tr.rolling(window=14).mean()
        di_positive = dm_positive.rolling(window=14).mean() / tr_ma14 * 100
        di_negative = dm_negative.rolling(window=14).mean() / tr_ma14 * 100
        dx = abs(di_positive - di_negative) / abs(di_positive + di_negative)

        index_data['dmi_tr_ma14'] = tr
        index_data['dmi_di_ma14_positive'] = di_positive
        index_data['dmi_di_ma14_negative'] = di_negative
        index_data['dmi_dx14'] = dx
        index_data['dmi_adx_ma14'] = dx.rolling(window=14).mean()

        return index_data

    def get_standardized_moment(self, index_data: pd.DataFrame):
        '''
        Rolling skewness and kurtosis of 'return_mkt' over 30 rows.
        https://en.wikipedia.org/wiki/Standardized_moment

        Adds 'standardized_moment_skewness' and 'standardized_moment_kurtosis'
        to ``index_data`` in place and returns it.
        '''
        returns = index_data['return_mkt']
        index_data['standardized_moment_skewness'] = returns.rolling(window=30).skew()
        index_data['standardized_moment_kurtosis'] = returns.rolling(window=30).kurt()
        return index_data

    def get_chaikin_oscillator(self, index_data: pd.DataFrame):
        '''
        Chaikin oscillator.
        https://en.wikipedia.org/wiki/Chaikin_Analytics

        Money-flow volume is accumulated into an accumulation/distribution
        line; the oscillator is its 3-row rolling mean minus its 10-row rolling
        mean (simple means). Adds 'chaikin_oscillator' in place and returns
        the frame.
        '''
        close = index_data['close']
        high = index_data['high']
        low = index_data['low']

        money_flow_volume = ((close - low) - (high - close)) / (high - low) * index_data['volume']
        adl = money_flow_volume.cumsum()
        index_data['chaikin_oscillator'] = adl.rolling(window=3).mean() - adl.rolling(window=10).mean()
        return index_data

    def _calculate_features(self, index_data: pd.DataFrame):
        """
        Add all technical-indicator columns to one index's frame.

        Reads 'open', 'close', 'high', 'low', 'volume' and 'return_mkt'.
        Rows with missing values are then handled by
        ``DataToolkit.fillna_data(..., fillna_drop_narows=True)``.

        Returns:
            The frame returned by ``DataToolkit.fillna_data``.
        """
        # Momentum over n rows. Computed once per lag: the value does not
        # depend on the index code, so looping over the code list only
        # repeated the same assignment.
        for lag in [2, 4, 8, 16]:
            index_data[f'momentum_{lag}'] = index_data.close - index_data.close.shift(lag)

        # Psychological line: share of the last `back_days` rows with a positive return.
        back_days = 10
        index_data['psychological_line'] = 0
        for d in range(back_days):
            index_data['psychological_line'] += index_data.return_mkt.shift(d).map(lambda x: 1 if x > 0 else 0)
        index_data['psychological_line'] = index_data['psychological_line'] / back_days

        # Volume ratio: volume on up rows / total volume over the last `back_days` rows.
        back_days = 5
        up_volume = index_data.open.map(lambda x: 0)
        total_volume = index_data.open.map(lambda x: 0)
        for d in range(back_days):
            shifted_volume = index_data.volume.shift(d)
            up_volume += index_data.return_mkt.shift(d).map(lambda x: 1 if x > 0 else 0) * shifted_volume
            total_volume += shifted_volume
        index_data['VR'] = up_volume / total_volume

        # Remaining factor families.
        index_data['macd_dif'], index_data['macd_macd'] = self.get_macd_factors(index_data)
        index_data['kd_k'], index_data['kd_d'] = self.get_KDJ_factors(index_data)
        index_data = self.get_DMI_factors(index_data)
        index_data = self.get_standardized_moment(index_data)
        index_data = self.get_chaikin_oscillator(index_data)

        index_data = DataToolkit.fillna_data(index_data, fillna_drop_narows=True)
        return index_data

    def fetch_data(self, index_code):
        """
        Query one index's price/volume data from the database.

        Columns requested from the 'index_data' table: open, close, high, low,
        volume, money, return_mkt, for dates between ``self.fromtime`` and
        ``self.endtime``. The result is passed through
        ``DataToolkit.fillna_data(..., fillna_drop_narows=True)``.
        """
        m = MysqlAPI.MysqlAPI()
        table_column_dict = {'index_data':
                                 ['open',
                                  'close',
                                  'high',
                                  'low',
                                  'volume',
                                  'money',
                                  'return_mkt']}

        index_data = m.query_index_data(index_code_list=[index_code],
                                        trade_date_list=[[self.fromtime, self.endtime]],
                                        table_column_dict=table_column_dict)
        # Remove missing values.
        index_data = DataToolkit.fillna_data(index_data, fillna_drop_narows=True)

        return index_data

    def get_tushare_data(self, index_code, index_data):
        """
        Left-join Tushare ``index_dailybasic`` columns onto ``index_data``.

        Two requests (end dates 2007-06-03 and 2020-01-01) are concatenated,
        re-indexed by (index_code, trade_date) and merged on the index.
        Uses the module-level Tushare client ``pro``.

        NOTE(review): the two responses are concatenated without
        de-duplication; if they overlap, the left merge would duplicate rows -
        confirm the ranges are disjoint.
        """
        dailybasic1 = pro.index_dailybasic(end_date='20070603', ts_code=index_code)
        dailybasic2 = pro.index_dailybasic(end_date='20200101', ts_code=index_code)

        dailybasic = pd.concat([dailybasic1, dailybasic2])
        dailybasic = dailybasic.rename(columns={'ts_code': 'index_code'})
        # Tushare returns trade_date as a string; convert it to Timestamps for the join.
        dailybasic['trade_date'] = dailybasic['trade_date'].map(lambda raw: pd.Timestamp(raw))
        dailybasic = dailybasic.set_index(['index_code', 'trade_date'])

        index_data = pd.merge(index_data, dailybasic, left_index=True, right_index=True, how='left')
        return index_data

    def get_technical_data(self, index_data, drop_list=None):
        """
        Build the windowed technical-feature array for one index.

        Args:
            index_data: processed frame for one index.
            drop_list: columns to exclude from the technical features. When
                None, the label, the raw price/money columns from the database
                and the Tushare valuation/size columns are dropped. (A list
                passed by the caller was previously overwritten and ignored.)

        Returns:
            numpy array of shape (samples, days_for_train, dynamic_input_size).
            Also sets ``self.dynamic_input_size``.
        """
        logger.info('GETTING TECHNICAL DATA')
        if drop_list is None:
            # database data
            drop_list = ['label', 'open', 'close', 'high', 'low', 'money']
            # tushare data
            drop_list += ['pe', 'pb', 'total_mv', 'float_mv', 'total_share', 'float_share', 'free_share']
        index_data = index_data.drop(drop_list, axis=1)
        self.dynamic_input_size = index_data.shape[1]
        logger.info('dynamic_input_size = %s' % self.dynamic_input_size)

        # presumably yields one row per sample with every feature repeated for
        # each of the `days_for_train` lags - TODO confirm against DataToolkit.
        shifted = DataToolkit.shift_by_stock_code(index_data, self.days_for_train - 1, 'forward')
        print(shifted.shape)
        # Unflatten to (samples, features, days), drop the last sample, then
        # reorder to (samples, days, features).
        all_technical = shifted.values.reshape(shifted.shape[0], self.dynamic_input_size, self.days_for_train)[:-1]
        all_technical = all_technical.swapaxes(1, 2)
        print(all_technical.shape)
        return all_technical

    def get_basic_data(self, index_code, index_data):
        """
        Build the per-day "basic" feature array for one index.

        The size/price columns are min-max scaled. Parameters fitted here are
        stored in ``self.normalization_parameters[index_code][column]`` as
        ``[max, min]``; parameters already present are reused instead of
        being re-fitted.

        Side effect: the scaled values overwrite the corresponding columns of
        ``index_data`` in place.

        Returns:
            numpy array with the first ``days_for_train`` rows removed.
            Also sets ``self.basic_input_size``.
        """
        logger.info('GETTING BASIC DATA')
        basic_list = ['pb', 'pe', 'pe_ttm', 'turnover_rate', 'turnover_rate_f']
        normalization_list = ['total_mv', 'float_mv', 'total_share', 'float_share', 'free_share', 'close']

        fitted = self.normalization_parameters.setdefault(index_code, {})

        for column in normalization_list:
            if column not in fitted:
                # Fit new parameters on this data and remember them.
                index_data[column], df_max, df_min = DataToolkit.min_max_scale(index_data[column])
                fitted[column] = [df_max, df_min]
                print(column)
                print(df_max, df_min)
            else:
                # Reuse previously fitted parameters.
                df_max, df_min = fitted[column]
                index_data[column], df_max, df_min = DataToolkit.min_max_scale(index_data[column], df_max=df_max, df_min=df_min)

        all_basic = index_data[basic_list + normalization_list]
        all_basic = all_basic[self.days_for_train:].values
        self.basic_input_size = all_basic.shape[1]
        return all_basic

    def get_output_data(self, index_data):
        """
        Return the label array for one index, with the first
        ``days_for_train`` rows removed.
        """
        logger.info('GETTING OUTPUT DATA')
        output = index_data['label'][self.days_for_train:].values
        logger.info(output.shape)
        return output

In [11]:
if __name__ == '__main__':
    # Demo run: build the dataset for four indices from 2005 onwards,
    # leaving every other constructor argument at its default.
    data_class = GetData_index(
        index_code_list=['399001.SZ', '000905.SH', '000001.SH', '000300.SH'],
        fromtime='2005-01-01',
    )
    data_class.generate()

    # Show the stacked "basic" feature matrix produced by generate().
    print(data_class.all_basic)

INFO:__main__:399001.SZ
INFO:__main__:label_back_days = 5
INFO:__main__:(3487, 26)
INFO:__main__:000905.SH
INFO:__main__:label_back_days = 5
INFO:__main__:(2997, 26)
INFO:__main__:000001.SH
INFO:__main__:label_back_days = 5
INFO:__main__:(3487, 26)
INFO:__main__:000300.SH
INFO:__main__:label_back_days = 5
INFO:__main__:(3426, 26)
INFO:__main__:399001.SZ
INFO:__main__:GETTING BASIC DATA
INFO:__main__:GETTING TECHNICAL DATA
INFO:__main__:dynamic_input_size = 23
INFO:__main__:GETTING OUTPUT DATA
INFO:__main__:(3472,)
INFO:__main__:000905.SH
INFO:__main__:GETTING BASIC DATA
INFO:__main__:GETTING TECHNICAL DATA
INFO:__main__:dynamic_input_size = 23


(3435, 345)
(3434, 15, 23)


INFO:__main__:GETTING OUTPUT DATA
INFO:__main__:(2982,)
INFO:__main__:000001.SH
INFO:__main__:GETTING BASIC DATA
INFO:__main__:GETTING TECHNICAL DATA
INFO:__main__:dynamic_input_size = 23


(2945, 345)
(2944, 15, 23)


INFO:__main__:GETTING OUTPUT DATA
INFO:__main__:(3472,)
INFO:__main__:000300.SH
INFO:__main__:GETTING BASIC DATA
INFO:__main__:GETTING TECHNICAL DATA
INFO:__main__:dynamic_input_size = 23


(3435, 345)
(3434, 15, 23)


INFO:__main__:GETTING OUTPUT DATA
INFO:__main__:(3411,)
INFO:__main__:torch.Size([13337, 10])
INFO:__main__:torch.Size([13185, 15, 23])
INFO:__main__:torch.Size([13337])


(3374, 345)
(3373, 15, 23)
[[ 2.16       14.36       12.87       ... -0.03826821 -0.05046827
  -0.05714737]
 [ 2.15       14.33       12.85       ... -0.03826821 -0.05046827
  -0.05714737]
 [ 2.16       14.36       12.88       ... -0.03826821 -0.05046827
  -0.05714737]
 ...
 [ 1.49       12.84       12.48       ...  0.99822568  0.9813687
   0.97791415]
 [ 1.48       12.8        12.44       ...  0.9999973   0.98321114
   0.97958727]
 [ 1.52       13.11       12.74       ...  1.          0.98371623
   0.98066164]]


In [10]:
# data_class.t_all_basic[0]