In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
print(tf.__version__)

In [None]:
# Load the raw price table; the file is read with the gb2312 encoding.
data = pd.read_csv('5min_data.csv', encoding="gb2312")
# Rename the six columns positionally, then discard the timestamp so that only
# the numeric open/high/low/close/volume columns remain as model inputs.
data.columns = ['date', 'open', 'high', 'low', 'close', 'volumn']
data = data.drop(columns=['date'])
data.head()

In [None]:
# Feature count: number of columns in `data` minus one.
features_num = data.shape[1] - 1
# Length of the observation window, in rows (candidate settings: 120/170/220/270).
observe_time = 120
# Length of the prediction window, in rows (candidate settings: 5/10/15).
predict_time = 5
# Rows spanned by one sample: the observed part plus the prediction horizon.
group_time = observe_time + predict_time

### 分割特征及标签

In [None]:
# One sample per start row: every start for which row `start + group_time` still exists.
window_starts = range(len(data.close) - group_time)

# Model input: the `observe_time` consecutive rows beginning at `start`.
features = np.array([
    np.array(data[start:start + observe_time])
    for start in window_starts
])

# Target: close-price change between row `start + observe_time` and row
# `start + group_time` (a price difference, not a percentage).
returns = np.array([
    data.close[start + group_time] - data.close[start + observe_time]
    for start in window_starts
])

print(features.shape, returns.shape)

### 分割训练集和测试集

In [None]:
# Fraction of the samples, taken from the front in their original order,
# that goes to the training set; the remainder becomes the test set.
alpha = 0.8
train_length = int(len(features) * alpha)

# Cut both arrays at the same index so windows and targets stay aligned.
# NOTE(review): consecutive windows start one row apart, so the windows just
# before and just after the cut cover largely the same rows.
train_data, test_data = np.split(features, [train_length])
train_return, test_return = np.split(returns, [train_length])

### 根据收益率实现三分类打标签

In [None]:
def segmentation(features,returns,per):
    """Turn continuous returns into three classes and keep only the labelled samples.

    Four thresholds are read from the ascending-sorted returns, each rounded to
    two decimals (``n`` is the number of returns):

    * ``neg_value``       -- element at rank ``int(n * per)``
    * ``pos_value``       -- element at rank ``int(n * (1 - per))``
    * ``mid_left_value``  -- element at rank ``int(n * 0.5 * (1 - per))``
    * ``mid_right_value`` -- element at rank ``int(n * 0.5 * (1 + per))``

    A rank that would fall past the end (e.g. ``per=0``) is clamped to the last
    element.  Each sample then receives the first label that matches:

    * ``0`` if ``return <= neg_value``
    * ``1`` if ``mid_left_value <= return <= mid_right_value``
    * ``2`` if ``return >= pos_value``

    Samples matching none of the three rules are dropped.  The thresholds are
    also printed.

    Args:
        features: Array-like with one entry per sample along the first axis.
            The stacked kept samples must be 3-D, because a trailing axis of
            length 1 is appended by an explicit reshape.
        returns: Array-like holding one return value per sample.
        per: Fraction in ``[0, 1]`` controlling the width of each class band.

    Returns:
        Tuple ``(data_x, data_y)``: ``data_x`` holds the kept samples in their
        original order with shape ``(kept, d1, d2, 1)``; ``data_y`` is a 1-D
        integer array of the matching labels.

    Raises:
        ValueError: If ``returns`` is empty or ``per`` is outside ``[0, 1]``.
    """
    features = np.asarray(features)
    returns = np.asarray(returns).ravel()
    n = len(returns)
    if n == 0:
        raise ValueError("returns must contain at least one value")
    if not 0 <= per <= 1:
        raise ValueError("per must lie in [0, 1], got %r" % (per,))

    # Sort once; every threshold is read from the same ordered array.
    ordered = np.sort(returns)

    def _threshold(fraction):
        # Clamp so that a fraction of exactly 1.0 selects the maximum instead
        # of indexing one past the end.
        rank = min(int(n * fraction), n - 1)
        return round(float(ordered[rank]), 2)

    neg_value = _threshold(per)
    pos_value = _threshold(1-per)
    mid_left_value = _threshold(0.5*(1-per))
    mid_right_value = _threshold(0.5*(1+per))
    print('正样本最小值:%.2f\t中样本范围:%.2f~%.2f\t负样本最大值:%.2f'%(pos_value,mid_left_value,mid_right_value,neg_value))

    # Assign labels from lowest to highest priority so that, where the bands
    # overlap, a later assignment wins: negative over neutral over positive.
    labels = np.full(n, -1, dtype=int)
    labels[returns >= pos_value] = 2
    labels[(returns >= mid_left_value) & (returns <= mid_right_value)] = 1
    labels[returns <= neg_value] = 0

    # Integer positions (rather than a boolean mask) keep the original
    # tolerance for a `features` array that is longer than `returns`.
    kept = np.flatnonzero(labels >= 0)
    data_x = features[kept]
    data_y = labels[kept]

    # Append a channel axis of length 1.
    data_x = data_x.reshape(data_x.shape[0],data_x.shape[1],data_x.shape[2],1)
    return data_x,data_y

In [None]:
# Label the training windows; thresholds are taken from the training returns.
train_x, train_y = segmentation(features=train_data, returns=train_return, per=0.1)
print(train_x.shape, train_y.shape)

In [None]:
# Label the test windows.
# NOTE(review): thresholds here are recomputed from the test returns, so the
# class boundaries differ from those used for the training set.
test_x, test_y = segmentation(features=test_data, returns=test_return, per=0.1)
print(test_x.shape, test_y.shape)

In [None]:
# First convolutional stage: 3x3 convolution with 32 filters, batch
# normalisation, then 2x2 max pooling. The input shape is the per-sample shape
# of `train_x` (every axis after the batch axis).
model = keras.Sequential([
    layers.Conv2D(
        input_shape=train_x.shape[1:4],
        filters=32,
        kernel_size=(3, 3),
        strides=(1, 1),
        padding='same',
        activation='relu',
    ),
    layers.BatchNormalization(),
    layers.MaxPool2D(pool_size=(2, 2)),
])

In [None]:
# Second convolutional stage followed by the classifier head.
# No `input_shape` here: only the first layer of a Sequential model declares
# it. This layer's input is the output of the layer added before it, not the
# raw sample shape.
model.add(layers.Conv2D(filters=16, kernel_size=(3,3), strides=(1,1), padding='same',
                        activation='relu'))
model.add(layers.MaxPool2D(pool_size=(2,2)))
model.add(layers.Flatten())
model.add(layers.Dense(16, activation='relu'))
# Three softmax outputs, one per class label (0, 1, 2).
model.add(layers.Dense(3, activation='softmax'))

In [None]:
# The labels are plain integer class ids, hence the sparse variant of the
# cross-entropy loss. keras.losses.CategoricalCrossentropy() would require
# one-hot labels (e.g. produced with to_categorical) instead.
model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)
model.summary()

In [None]:
# Train for 20 epochs with mini-batches of 64; 10% of the training samples are
# held out by Keras for validation.
history = model.fit(
    x=train_x,
    y=train_y,
    batch_size=64,
    epochs=20,
    validation_split=0.1,
)

In [None]:
import matplotlib.pyplot as plt

# Plot the per-epoch training and validation accuracy recorded by `model.fit`.
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], label='training')
ax.plot(history.history['val_accuracy'], label='validation')
ax.set(xlabel='epoch', ylabel='accuracy', title='Training vs. validation accuracy')
ax.legend(loc='upper left')
# Render the figure explicitly; a bare `plt` expression only echoes the module repr.
plt.show()

In [None]:
# Evaluate on the held-out test samples; `res` holds the loss followed by the
# compiled metrics (accuracy).
res = model.evaluate(x=test_x, y=test_y)