In [2]:
import warnings
warnings.filterwarnings('ignore')
import jupyter_helper
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score # K折交叉验证模块
from sklearn.preprocessing import MinMaxScaler

import QUANTAXIS as QA
import pandas as pd
import numpy as np
import pyecharts



In [74]:
s,e=jupyter_helper.get_start_end_date()
benchmark_code=jupyter_helper.get_benchmark_code()
stock_code='300378'
DAYS=5#计算时间段

In [75]:
def create_validate_df_close(df: pd.DataFrame,
                             days: int,
                             column: str = 'close') -> pd.DataFrame:
    """制作验证用数据。

    根据 df[column].shift(-days)` 查找指定天数后的指定列的数据作为验证数据。

    Args:
        df: 数据源
        days: 天数
        column: 使用的列
    """
    if column not in df.columns:
        raise ValueError('数据中不包含 {} 列。'.format(column))
    return pd.DataFrame(df[column].shift(-1 * days).values,
                        columns=[days],
                        index=df.index).dropna()


def get_calc_data(stock_code, s, e, fq='qfq',
                   drop_columns=['code', 'preclose', 'adj'],
                   scaler=['amount', 'volume'],
                   scaler_func=MinMaxScaler):
    """获取计算用数据源

    Args:
        fq: 是否采用复权数据。默认使用前复权。如果不需要复权则传''即可。
        stock_code:
        s:
        e:
        drop_columns: 丢弃的列
        scaler: 需要做归一化的数据列。
        scaler_func: 做归一化时默认使用的方法。
    """
    raw_data = QA.QA_fetch_stock_day_adv(stock_code, s, e)
    calc_data = raw_data.data.reset_index().set_index(
        'date').copy()
    if fq == 'qfq':
        qfq_data = raw_data.to_qfq().data.reset_index().set_index(
            'date')  # DataFrame格式
        calc_data = qfq_data.copy()
    elif fq == 'hfq':
        qfq_data = raw_data.to_qfq().data.reset_index().set_index(
            'date')  # DataFrame格式
        calc_data = qfq_data.copy()
    if drop_columns:
        calc_data = calc_data.drop(columns=drop_columns)  # 丢弃多余的列
    if scaler:
        calc_data[scaler] = scaler_func().fit_transform(
            calc_data[scaler])
    return calc_data

def packing_calc_data(df:pd.DataFrame,stock_code)->pd.DataFrame:
    return df


def get_fit_report(X, y, func, days, column: str = 'close') -> pd.DataFrame:
    """对测试数据集获取验证结果

    Args:
        X: 测试数据集
        func: 实现了`predict`方法的sklearn函数
        days: 天数

    Returns:

    """
    Z = X[[column]].rename(columns={column: '当日收盘价'}).join(
        y[[days]].rename(columns={days: '{}日后收盘价'.format(days)}))
    return Z.join(
        pd.DataFrame(func.predict(X), index=X.index).rename(columns={0: '预测值'}))


def plot_report(days, func, X,y, column: str = 'close', title='', subtitle=''):
    r = get_fit_report(X,y, func, days, column)
    line = pyecharts.Line('{} Days'.format(days) if not title else title,
                          subtitle=subtitle)
    for col in r.columns:
        line.add(col, r.index.date, r[col], datazoom_extra_type='both',
             is_datazoom_extra_show=True,
             datazoom_extra_orient='horizontal')
    # line.add('实际值', data.index.date, data[column], datazoom_extra_type='both',
    #          is_datazoom_extra_show=True,
    #          datazoom_extra_orient='horizontal')
    # line.add('预测值', r.index.date, r[0], datazoom_extra_type='both',
    #          is_datazoom_extra_show=True,
    #          datazoom_extra_orient='horizontal')
    return line

In [76]:
data=get_calc_data(stock_code,s,e)

留100条数据作为测试用

In [77]:
X=packing_calc_data(data,stock_code)
y=create_validate_df_close(X,DAYS)
X = X[X.index.isin(y.index)]

test_size=0.2
random_state=10

# 拆分数据源
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                        test_size=test_size,
                                                        random_state=random_state)

lr=LinearRegression()

lr.fit(X_train,y_train)

LinearRegression(copy_X=True, fit_intercept=True, n_jobs=None,
         normalize=False)

## model得分

In [78]:
lr.score(X_test, y_test)

0.9504624199353563

## 交叉验证：评估估算器性能

学习预测函数的参数并在相同的数据上进行测试是一个方法上的错误：一个只重复它刚才看到的样本标签的模型会得到一个完美的分数但是却无法预测任何有用的东西 - 看不见的数据。这种情况称为过度拟合。为了避免这种情况，通常的做法是执行（监督的）机器学习实验以将部分可用数据作为测试集保存 。

https://scikit-learn.org/stable/modules/cross_validation.html

拟合模型并连续5次计算得分（每次使用不同的分割）来估计线性回归在数据集上的准确性：

In [80]:
scores = cross_val_score(lr,X,y,cv=10)
scores

array([ 0.40535   ,  0.75784643,  0.67425224,  0.61941016,  0.51683564,
        0.85384503,  0.89255133,  0.60100887,  0.63329812, -0.97775685])

得分估计的平均得分和95％置信区间由下式给出：

In [81]:
"Accuracy: %0.2f (+/- %0.2f)" % (scores.mean(), scores.std() * 2)

'Accuracy: 0.50 (+/- 1.02)'

### 指定 score 估算器方法

默认情况下，在每次CV迭代时计算的分数是score 估算器的方法。可以使用评分参数更改此设置：

https://scikit-learn.org/stable/modules/model_evaluation.html#common-cases-predefined-values

In [85]:
scores = cross_val_score(lr,X,y,cv=10,scoring='explained_variance')
scores

array([0.43702676, 0.77600904, 0.67552203, 0.62755732, 0.53546717,
       0.85751608, 0.91834929, 0.69412397, 0.66520303, 0.06237667])

### 指定不同的数据划分方式

https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection

In [90]:
from sklearn.model_selection import TimeSeriesSplit

scores = cross_val_score(lr,X,y,cv=TimeSeriesSplit(500))
scores.mean()

-2.6014992905099784e+26

## 数据集预处理

https://scikit-learn.org/stable/data_transforms.html#data-transforms

In [73]:
from sklearn.model_selection import LeavePOut
cross_val_score(lr,X,y,cv=LeavePOut(p=100)).mean()

KeyboardInterrupt: 

In [68]:
from sklearn.model_selection import ShuffleSplit

cross_val_score(lr,X,y,cv=ShuffleSplit(n_splits=500)).mean()

0.9762945917065329

In [69]:
plot_report(DAYS,lr,X,y)

In [71]:
X1=packing_calc_data(data[-100:],stock_code)
y1=create_validate_df_close(X1,DAYS)
X1 = X1[X1.index.isin(y1.index)]

plot_report(DAYS,lr,X1,y1)

In [62]:
new_df=QA.QAFetch.QATdx.QA_fetch_get_stock_day(stock_code,'2019-01-01',QA.QAUtil.QA_util_datetime_to_strdate(QA.QAUtil.QA_util_date_today()))
new_df=new_df[['open','close','high','low','vol','amount']]
new_df[['amount', 'vol']]=MinMaxScaler().fit_transform(new_df[['amount', 'vol']])
new_df=new_df.rename(columns={'vol':'volume'})
new_df.index=pd.to_datetime(new_df.index)

new_df=packing_calc_data(new_df,stock_code)

new_valid_df=create_validate_df_close(new_df,DAYS)

In [63]:
plot_report(DAYS,lr,new_df,new_valid_df)

In [60]:
def plot_report(days, func, X,y, column: str = 'close', title='', subtitle=''):
    r = get_fit_report(X,y, func, days, column)
    line = pyecharts.Line('{} Days'.format(days) if not title else title,
                          subtitle=subtitle)
    for col in r.columns:
        line.add(col, r.index.date, r[col], datazoom_extra_type='both',
             is_datazoom_extra_show=True,
             datazoom_extra_orient='horizontal',
            yaxis_min='dataMin')
    # line.add('实际值', data.index.date, data[column], datazoom_extra_type='both',
    #          is_datazoom_extra_show=True,
    #          datazoom_extra_orient='horizontal')
    # line.add('预测值', r.index.date, r[0], datazoom_extra_type='both',
    #          is_datazoom_extra_show=True,
    #          datazoom_extra_orient='horizontal')
    return line

In [50]:
get_fit_report(new_df,new_valid_df,lr,DAYS)

Unnamed: 0_level_0,当日收盘价,5日后收盘价,预测值
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2019-01-24,5.46,5.66,5.453533
2019-01-25,5.52,5.65,5.500801
2019-01-28,5.5,5.64,5.474241
2019-01-29,5.54,5.65,5.514067
2019-01-30,5.55,5.71,5.519596
2019-01-31,5.66,5.65,5.570472
2019-02-01,5.65,5.51,5.602343
2019-02-11,5.64,5.6,5.593567
2019-02-12,5.65,5.57,5.626585
2019-02-13,5.71,5.57,5.635974


In [54]:
lr.score(new_df[:-5],new_valid_df)

-4.342201649185873

In [52]:
new_df.shape,new_valid_df.shape

((16, 6), (11, 1))

In [61]:
plot_report(DAYS,lr,new_df,new_valid_df)