<a href="https://colab.research.google.com/github/clearpoem/practice/blob/main/stock_forecast.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
# Switch the process working directory so that relative paths used later
# resolve against this folder (presumably the Colab sample-data directory —
# confirm against the runtime environment).
path="/content/sample_data/"
os.chdir(path)


In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt


def get_local_data(file_name, test_split, feature_dim, sc):
    """Read a local CSV file and return scaled training and test arrays.

    Only the first ``feature_dim`` columns of the file are used. Rows are
    split in file order: the leading ``int(len * (1 - test_split))`` rows
    form the training part and the remaining rows form the test part.

    Args:
        file_name: Path of the GBK-encoded CSV file to read.
        test_split: Fraction of rows reserved for the test part.
        feature_dim: Number of leading columns used as features.
        sc: Scaler object exposing ``fit_transform`` and ``transform``.

    Returns:
        A ``(train_data, test_data)`` tuple holding the scaler outputs.
    """
    frame = pd.read_csv(file_name, encoding='gbk')
    features = frame.iloc[:, :feature_dim]
    split_at = int(len(frame) * (1 - test_split))
    # The scaler is fitted on the training rows only and then re-used,
    # unchanged, on the test rows.
    train_data = sc.fit_transform(features.iloc[:split_at].values)
    test_data = sc.transform(features.iloc[split_at:].values)
    return train_data, test_data


def get_online_data():
    """Fetch training and test data from an online source.

    Not implemented yet: the function does nothing and returns ``None``.
    """
    return None


def make_data_set(train_data, test_data, len_seq, feature_dim):
    """Turn scaled series into sliding-window training and test sets.

    Every sample is a window of ``len_seq`` consecutive rows; its target is
    the value in column ``feature_dim - 1`` of the row right after the
    window. Training samples and targets are shuffled with the same seed so
    that they stay aligned; test samples keep their original order.

    Args:
        train_data: 2-D array of training rows.
        test_data: 2-D array of test rows.
        len_seq: Number of rows per input window.
        feature_dim: Number of feature columns; the last one is the target.

    Returns:
        ``(x_train, y_train, x_test, y_test)`` where the ``x`` arrays are
        reshaped to ``(samples, len_seq, feature_dim)``.
    """

    def sliding_windows(series):
        # One window per start offset that still leaves a following row.
        starts = range(len(series) - len_seq)
        inputs = [series[s:s + len_seq] for s in starts]
        targets = [series[s + len_seq, feature_dim - 1] for s in starts]
        return inputs, targets

    # Training set: build, then shuffle inputs and targets identically by
    # re-seeding NumPy's global generator before each shuffle.
    x_train, y_train = sliding_windows(train_data)
    for samples in (x_train, y_train):
        np.random.seed(7)
        np.random.shuffle(samples)
    tf.random.set_seed(7)
    x_train = np.array(x_train)
    y_train = np.array(y_train)
    # Shape expected by the LSTM layers: (samples, time steps, features).
    x_train = x_train.reshape(len(x_train), len_seq, feature_dim)

    # Test set: same windowing, no shuffling.
    x_test, y_test = sliding_windows(test_data)
    x_test = np.array(x_test)
    y_test = np.array(y_test)
    x_test = x_test.reshape(len(x_test), len_seq, feature_dim)

    print("数据集制作完毕！")
    return x_train, y_train, x_test, y_test


def get_real_and_pre_data(sc, test_data, len_seq, fea_dim, model, x_test):
    """Return ground-truth and predicted target values in original units.

    Args:
        sc: Fitted scaler exposing ``inverse_transform``.
        test_data: Scaled 2-D test rows.
        len_seq: Window length used when the test samples were built.
        fea_dim: Number of feature columns; the last one is the target.
        model: Object exposing ``predict``.
        x_test: Model inputs passed to ``model.predict``.

    Returns:
        ``(real_data, predict_data)``: a list of un-scaled target values and
        an array of un-scaled predictions taken from column ``fea_dim - 1``.
    """
    target_col = fea_dim - 1

    # Ground truth: undo the scaling and pick the row after each window.
    restored = sc.inverse_transform(test_data)
    real_data = [
        restored[start + len_seq, target_col]
        for start in range(len(restored) - len_seq)
    ]

    # The scaler works on ``fea_dim`` columns, so the model output is
    # repeated column-wise to that width before the scaling is undone.
    raw_prediction = model.predict(x_test)
    widened = raw_prediction
    if fea_dim > 1:
        widened = np.column_stack([raw_prediction] * fea_dim)
    predict_data = sc.inverse_transform(widened)[:, target_col]
    return real_data, predict_data


def show_loss(history):
    """Plot the training and validation loss curves of a training run.

    Args:
        history: Object whose ``history`` mapping holds the ``'loss'`` and
            ``'val_loss'`` series.
    """
    curves = (('loss', 'Training Loss'), ('val_loss', 'Validation Loss'))
    # Look up both series before drawing anything.
    series = [(history.history[key], caption) for key, caption in curves]
    for values, caption in series:
        plt.plot(values, label=caption)
    plt.title('train and val loss')
    plt.legend()
    plt.show()


def show_acc(real_data, pre_data):
    """Print the share of predictions that fall within relative tolerances.

    A prediction counts as a hit for tolerance ``t`` when
    ``abs(real - predicted) < real * t``. Three tolerances are reported:
    0.0005, 0.001 and 0.01.

    Args:
        real_data: Indexable sequence of ground-truth values.
        pre_data: Indexable sequence of predictions, aligned by position
            with ``real_data``.

    Returns:
        None. The accuracies are printed to standard output.
    """
    # (label shown in the output, relative tolerance)
    tolerances = (("0.0005", 0.0005), ("0.001", 0.001), ("0.01", 0.01))
    total = len(real_data)
    if total == 0:
        # Nothing to evaluate; avoid dividing by zero below.
        print("没有可用于计算准确率的数据！")
        return

    hits = [0] * len(tolerances)
    for i, real in enumerate(real_data):
        error = abs(real - pre_data[i])
        for k, (_, tolerance) in enumerate(tolerances):
            if error < real * tolerance:
                hits[k] += 1

    # Each line reports its own counter and the label matches the threshold.
    for (label, _), count in zip(tolerances, hits):
        print("准确率(%s):%f" % (label, count / total))
    print("准确率计算完毕！")


def show_test_res(real_data, pre_data):
    """Plot predicted values against real values for the test period.

    Args:
        real_data: Sequence of real values, drawn in blue.
        pre_data: Sequence of predicted values, drawn in red.
    """
    lines = (
        (pre_data, 'red', "pre_price"),
        (real_data, 'blue', "real_price"),
    )
    for values, colour, caption in lines:
        plt.plot(values, color=colour, label=caption)
    plt.title("stoce_price_forecast")
    plt.xlabel('Trading_Day')
    plt.ylabel('Price')
    print("预测效果图绘制完毕！")
    plt.legend()
    plt.show()


def get_weights(model):
    """Dump every trainable variable of ``model`` to ``./wights.txt``.

    For each variable three lines are written: its name, its shape and the
    string form of its value as returned by ``numpy()``.

    Args:
        model: Object exposing ``trainable_variables``; each variable must
            provide ``name``, ``shape`` and ``numpy()``.
    """
    # The context manager closes the file even if a write fails part-way.
    with open("./wights.txt", 'w', encoding="utf-8") as file:
        for v in model.trainable_variables:
            file.write(str(v.name) + '\n')
            file.write(str(v.shape) + '\n')
            file.write(str(v.numpy()) + '\n')
    print("参数提取完毕！")


if __name__ == '__main__':
    # Nothing is executed when this cell/module is run directly.
    pass

    # Disabled debugging output (``x`` and ``y`` are not defined here):
    # print(len(x))
    # print(len(y))
    # print(x)
    # print(y)


In [3]:
from keras.layers import Dense, LSTM, Dropout
from keras.callbacks import EarlyStopping
import keras
from keras.models import load_model


def build_model(x_train, y_train, batch_size, epochs, validation_data):
    """Build a stacked-LSTM regression model, train it and return both.

    The network is ``LSTM(60, return_sequences=True) -> LSTM(50) ->
    Dense(1)``, compiled with the Adam optimizer and mean-absolute-error
    loss. During training a checkpoint is written whenever the training
    loss improves.

    Args:
        x_train: Training inputs passed to ``model.fit``.
        y_train: Training targets passed to ``model.fit``.
        batch_size: Mini-batch size.
        epochs: Number of training epochs.
        validation_data: Validation data forwarded to ``model.fit``.

    Returns:
        ``(model, history)``: the trained model and the object returned by
        ``model.fit``.
    """
    model = keras.Sequential(
        [
            LSTM(60, return_sequences=True),
            # Dropout(0.2) can be inserted here if regularisation is needed.
            LSTM(50),
            # Dropout(0.2) can be inserted here if regularisation is needed.
            Dense(1)
        ]
    )

    model.compile(
        optimizer='adam',
        loss='mae',
        # Metrics are given as a list: a bare string is rejected by newer
        # Keras releases, while the list form is accepted by old and new.
        metrics=['acc']
    )
    print("模型建立完毕,开始训练...")
    # Callbacks: keep only the best checkpoint, judged by training loss.
    # NOTE(review): newer Keras releases may require the checkpoint path to
    # end in ``.keras`` or ``.weights.h5`` — confirm for the target version.
    check_save_path = "./stock_LSTM.ckpt"
    callback = [
        # EarlyStopping(monitor='loss', patience=5) can be enabled here.
        keras.callbacks.ModelCheckpoint(
            filepath=check_save_path,
            save_best_only=True,
            monitor='loss'
        )
    ]
    history = model.fit(x_train, y_train, batch_size=batch_size,
                        epochs=epochs, validation_data=validation_data,
                        callbacks=callback
                        )
    print("模型训练完毕！")
    return model, history


def save_model(model, code):
    """Save ``model`` to an HDF5 file named after the stock code.

    Args:
        model: Object exposing ``save(path)``.
        code: Stock code used as the file name stem.
    """
    model.save(f"./{code}.h5")
    print("模型保存完毕！")


def load_models(code):
    """Load the saved model that belongs to a stock code.

    Args:
        code: Stock code; the model is read from ``./data/<code>.h5``.

    Returns:
        The model object returned by ``load_model``.
    """
    print("模型加载中....！")
    save_path = "./data/%s.h5" % code
    model = load_model(save_path)
    # The placeholder must be ``%s``; the previous ``s%`` made the format
    # operation raise ValueError after every successful load.
    print("%s模型加载完毕！" % code)
    return model


if __name__ == '__main__':
    # Nothing is executed when this cell/module is run directly.
    pass








In [None]:
"""
模型训练主函数
"""
import my_model as ml
import data_process as dp
from sklearn.preprocessing import MinMaxScaler
import os


def main():
    """Run the full pipeline: load data, train, save, plot and score.

    The working directory is changed to ``./data``; the CSV file is read
    from there and the trained model is saved there.
    """
    # 1. Load the data (the stock code is the first six characters of the
    #    file name).
    os.chdir("./data")
    file_name = "600021-short.csv"
    code = file_name[:6]
    feature_dim = 4
    test_split = 0.2

    # Scale every feature into the [0, 1] range.
    sc = MinMaxScaler(feature_range=(0, 1))
    train_data, test_data = dp.get_local_data(
        file_name, test_split, feature_dim, sc
    )

    # 2. Build the sliding-window data sets.
    len_seq = 5
    x_train, y_train, x_test, y_test = dp.make_data_set(
        train_data, test_data, len_seq, feature_dim
    )

    # 3. Build and train the model; the test windows serve as validation
    #    data.
    batch_size, epochs = 8, 10
    model, history = ml.build_model(
        x_train, y_train, batch_size, epochs, (x_test, y_test)
    )

    # Persist the trained model under the stock code.
    ml.save_model(model, code)

    # Plot predictions against real values.
    real_data, pre_data = dp.get_real_and_pre_data(
        sc, test_data, len_seq, feature_dim, model, x_test
    )
    dp.show_test_res(real_data, pre_data)

    # Print the accuracy figures.
    dp.show_acc(real_data, pre_data)




if __name__ == '__main__':
    # Run the training pipeline when this cell/module is executed directly.
    main()





