# 对完成处理的股票交易数据（分组、归一化）做lstm建模
输入：处理好的数据
输出：模型和模型评价

In [1]:
# !pip install scikit-learn

In [2]:
import pandas as pd
from pandas import DataFrame
import datetime
from sklearn.preprocessing import StandardScaler # pip3 install --upgrade --force-reinstall scikit-learn --target . -i https://pypi.mirrors.ustc.edu.cn/simple
from collections import deque
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential #pip3 install --upgrade --force-reinstall keras --target . -i https://pypi.mirrors.ustc.edu.cn/simple
from tensorflow.keras.models import load_model #pip3 install --upgrade --force-reinstall keras --target . -i https://pypi.mirrors.ustc.edu.cn/simple
from tensorflow.keras.layers import LSTM,Dense,Dropout
from tensorflow.compat.v1.keras.layers import CuDNNLSTM

from tensorflow.keras.callbacks import ModelCheckpoint,EarlyStopping,Callback,CSVLogger,ReduceLROnPlateau
# from sklearn.model_selection import train_test_split
# from keras.utils import multi_gpu_utils
import os
from io import StringIO
import gzip
import shutil
import matplotlib.pyplot as plt
import math
import time
import gc
from shutil import copyfile
# copy our file into the working directory (make sure it has .py suffix)
copyfile(src = "/kaggle/input/stocks-code/stocks.py", dst = "../working/stocks.py")
 
# import all our functions
from stocks import stocks_all
from stocks import bankuai

import pickle

import threading
from queue import Queue

threads = []

os.environ["CUDA_VISIBLE_DEVICES"] = '0,1'
plt.style.use('fivethirtyeight')

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' #消除tensorflow警告

model_saved_log_char = datetime.datetime.now().strftime('%Y%m%d%h%m%s')


In [3]:
#获取数据
start = datetime.datetime(2000,1,1)
end =  datetime.date.today()

#参数整理
EarlyStopping_monitor='val_loss' #monitor——被监测的量
EarlyStopping_patience=10 #检测值停止变化的次数

_mem_days=[1,3,5] #滑动区间，根据几天的数据做预测
_lstm_layers,_dense_layers=[1,5],[1,5] #图层数
# 这里我们设置的units=32的大小，其实代表得是LSTM单元内的隐藏层的尺寸。
# 对于LSTM而言，每个单元有3个门，对应了4个激活函数（3个sigmoid,一个tanh）。也就是说有4个神经元数量为32的前馈网络层。
_units= [32,64]

# #测试
# _mem_days=[3] #滑动区间，根据几天的数据做预测
# _lstm_layers,_dense_layers=[1],[1] #图层数
# _units= [32]


optimizer='adam' #优化器:控制梯度下降和梯度爆炸
loss = 'mse' #损失层
metrics=['mape'] #评价函数
batch_size=32 #每次训练在训练集中取batchsize个样本训练；.batch_size=1时为在线学习，也是标准的SGD,如果数据集比较小，则完全可以采用全数据集的形式;GPU对2的幂次的batch可以发挥更佳的性能，因此设置成16、32、64、128…时往往要比设置为整10、整100的倍数时表现更优
epochs=50 #一个 epoch（代）是指整个数据集正向反向训练一次。

model_verbose = 0

In [4]:
#文件路径 data_
# path = '/kaggle/input/stocks-data-20221216/'
log_file_name = '/kaggle/working/models'
model_saved_file='/kaggle/working/models_2'
INPUT_PATH = '/kaggle/input/stock-prefit-0306/data_pre_fit'



[os.makedirs(f"{log_file_name}/{klt}", exist_ok=True) for klt in [101, 102, 103]]
[os.makedirs(f"{model_saved_file}/{klt}", exist_ok=True) for klt in [101, 102, 103]]

for _klt_ in [101,102,103]:
    model_saved_log = f'{model_saved_file}/{_klt_}/{ model_saved_log_char}_models.csv'
    # #创建任务总模型目录
    log_csv_file = open(model_saved_log, 'a')

    # 写表头code,loss,mape,val_loss,val_mape,modelname
    model_log = f'code,klt,loss,mape,val_loss,val_mape,modelname\n'
    log_csv_file.write(model_log)
    log_csv_file.close()

In [5]:
exception_file_full_name = f'{model_saved_file}/{ model_saved_log_char}_exception.txt'

#创建异常文件
exception_file = open(exception_file_full_name, 'a')

# 写表头code,loss,mape,val_loss,val_mape,modelname
exception_log = f'---------------Exception:{str(end)}------------------\n'
exception_file.write(exception_log)
exception_file.close()

In [6]:
#模型callback类
class CustomCallback(Callback):
#     print('-----------------CustomCallback-----------------')
    code = ''
    the_mem_days=0
    the_lstm_layers=0
    the_dense_layers=0
    the_units = 0
    csv_file_name = ''
    model_path = ''
    saveModelFile = False
    saveModelLog = True

    #epoch,loss,mape,val_loss,val_mape,code,the_mem_days,the_lstm_layers,the_dense_layers,the_units
    csv_file = DataFrame()



    def __init__(self,path,csv_file_name,code,the_mem_days,the_lstm_layers,the_dense_layers,the_units,
                 saveModelFile=False,saveModelLog=True,klt=101):
#         print(f'-----------------path:{path},klt:{klt},code:{code}-----------------\n')
        self.model_path = path
        self.csv_file_name = csv_file_name
        self.code = code
        self.the_mem_days = the_mem_days
        self.the_lstm_layers = the_lstm_layers
        self.the_dense_layers = the_dense_layers
        self.the_units = the_units
        self.saveModelFile = saveModelFile
        self.saveModelLog=saveModelLog
        self.klt = klt
#         print(f'-----------------CustomCallback__init__,klt:{klt},code:{code}-----------------\n')
        #
        if not os.path.exists(csv_file_name):
#             print(f'-----------------os.path.exists(csv_file_name),klt:{klt},code:{code}-----------------\n')
            # #创建任务总模型目录
            _temp_file = open(csv_file_name, 'a') 
            _temp_file_header = f'epoch,loss,mape,val_loss,val_mape,code,klt,the_mem_days,the_lstm_layers,the_dense_layers,the_units\n'
#             print(f'-----------------_temp_file_header:{_temp_file_header},klt:{klt},code:{code}-----------------\n')
            _temp_file.write(_temp_file_header)
            _temp_file.close()
#         print(f'-----------------self.csv_file,klt:{klt},code:{code}-----------------\n')
        self.csv_file = pd.read_csv(csv_file_name, lineterminator='\n', header=0)  
                

    def on_epoch_end(self, epoch, logs={}):
#         print(f'-----------------self.on_epoch_end,klt:{self.klt},code:{self.code}-----------------\n')
        if self.saveModelFile == True:
#             print(f'-----------------self.saveModelFile,klt:{self.klt},code:{self.code}-----------------\n')
            loss = logs['loss']
            filepath =  f'{self.model_path}/{loss:.2f}_{self.code}_{epoch:02}_mem_{self.the_mem_days}_ltsm_{self.the_lstm_layers}_dense_{self.the_dense_layers}_unit_{self.the_units}.h5'
#             print(f'-----------------filepath:{filepath}-----------------\n')
            loss = logs['loss']
            mape = logs['mape']
            val_loss = logs['val_loss']
            val_mape = logs['val_mape']
            model_saved_log1 = f'{model_saved_file}/{self.klt}/{ model_saved_log_char}_models.csv'
#             print(f'-----------------model_saved_log:{model_saved_log1},klt:{self.klt},code:{self.code}-----------------\n')
            log_csv_file = open(model_saved_log1, 'a+')
            # code,loss,mape,val_loss,val_mape,modelname
            model_log = f'c{self.code},{self.klt},{loss:.2f},{mape:.2f},{val_loss:.2f},{val_mape:.2f},{filepath}\n'
            log_csv_file.write(model_log)
            log_csv_file.close()
            
#             print(f'-----------------filepath:{filepath},klt:{self.klt},code:{self.code}-----------------\n')

            self.model.save(filepath,save_format='h5')
#         print(f'-----------------self.saveModelLog:{self.saveModelLog},klt:{self.klt},code:{self.code}-----------------\n')
        if self.saveModelLog == True:
#             print(f'-----------------logs.loss{logs['loss']}-----------------\n')
            if not math.isnan(logs['loss']) :
#                 print(f'-----------------self.csv_file{self.csv_file},klt:{klt},code:{code}-----------------\n')
                _i_ = len(self.csv_file)
#                 print(f'-----------------_i_:{_i_},klt:{self.klt},code:{self.code}-----------------\n')
                row = {
                    'epoch':epoch,
                    'loss' : float(round(logs['loss'],2) ),
                    'mape':round(logs['mape'],2)  ,
                    'val_loss': round(logs['val_loss'],2) ,
                    'val_mape': round(logs['val_mape'],2)  ,

                    'code': self.code,
                    'klt':self.klt,
                    'the_mem_days': self.the_mem_days,
                    'the_lstm_layers': self.the_lstm_layers,
                    'the_dense_layers': self.the_dense_layers,
                    'the_units': self.the_units
                }
#                 print(f'-----------------row{row},klt:{self.klt},code:{self.code}-----------------\n')
                row_index = len(self.csv_file)
#                 print(f'-----------------self.csv_file.loc:{313},klt:{self.klt},code:{self.code}-----------------\n')
                self.csv_file.loc[row_index] = row
#                 print(f'-----------------{self.csv_file_name},klt:{self.klt},code:{self.code}-----------------\n')
                self.csv_file.to_csv(self.csv_file_name,index=False)
 

In [7]:
_time_start = time.time()
_time_limit = 10

In [8]:
#建模
def build_models(file_path,code,mem_days,lstm_layers,dense_layers,units,saveModelFile ,saveModelLog,thread_count,klt ):
    
    build_models_times = 0

    for the_mem_days in mem_days:
#         new_df = f
        x, y = open_data_processd(file_path,klt)
        x_train, x_test, y_train, y_test = train_test_split(x, y, shuffle=False, test_size=0.2, random_state=42)
        
        # 转换为 Dataset 对象
        train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size)

        for the_lstm_layers in lstm_layers:
            for the_dense_layers in dense_layers:
                for the_units in units:
                    
                    callback = [EarlyStopping(monitor=EarlyStopping_monitor, patience=EarlyStopping_patience),
                        # CSVLogger(filename, separator=',', append=True),
                        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, verbose=0, mode='auto',
                                          min_delta=0.0001, cooldown=0, min_lr=0),
                        CustomCallback(f'{model_saved_file}/{klt}',f'{log_file_name}/{klt}/{code}.csv',code,the_mem_days,the_lstm_layers,the_dense_layers,the_units,
                                       saveModelFile=saveModelFile,saveModelLog=saveModelLog,klt=klt)]
                    
                     #构建神经网络
                    model = Sequential()
                    model.add(LSTM(the_units,input_shape=x.shape[1:],return_sequences=True)) #第一层
                    model.add(Dropout(0.1)) #防止过拟合

                    for i in range(the_lstm_layers):
                        model.add(LSTM(the_units,return_sequences=True)) #要有返回值
                        model.add(Dropout(0.1)) #防止过拟合

                    model.add(LSTM(the_units))
                    model.add(Dropout(0.1)) #防止过拟合

                    for i in range(the_dense_layers):
                        model.add(Dense(the_units,activation='relu'))  #全连接层
                        model.add(Dropout(0.1)) #防止过拟合

                    model.add(Dense(1)) #输出层

                    model.compile(optimizer='adam' ,#优化器
                                  loss = 'mse' ,#损失层
                                  metrics=['mape'])#评价函数) #编译

                    print(f'thread{thread_count},{code},NO.{build_models_times}:{the_mem_days},{the_lstm_layers},{the_dense_layers},{the_units},{str(datetime.datetime.now())}')
#                      
                    model.fit(train_dataset.prefetch(tf.data.experimental.AUTOTUNE),epochs=epochs,validation_data=(x_test,y_test),verbose=model_verbose,callbacks=callback)
#                     
                    build_models_times+=1
                    del  x_train, x_test, y_train, y_test,train_dataset,model
                    
    
    return build_models_times

In [9]:
def open_data_processd(file_path,klt):
    
    with gzip.open(file_path, 'rb') as f:
        x, y = pickle.load(f)
        
    return x, y
       

In [10]:
def lstm_model_fit(begin,files,thread_count,klt):
    thread_count = thread_count
    try:
        
        for index in range(begin,len(files)):
            #超时打包
#             print(f'already:{time.time()-_time_start},limit:{_time_limit*60*60}')
            if time.time()-_time_start >_time_limit*60*60 :
                print('time out')
                break
            
            if os.path.exists('/kaggle/working/stop'):
                
              #退出循环
                break
            
            code = files[index]
            file_path = f'{INPUT_PATH}/{klt}/{code}.pkl'
            
            if os.path.exists(file_path):
#                  
                fit_model = build_models(file_path,code,_mem_days,_lstm_layers,_dense_layers,_units,True,True,f'{thread_count},{index}',klt)
                 
                log_df = pd.read_csv(f'{log_file_name}/{klt}/{code}.csv', lineterminator='\n', header=0)
#                  
                min_loss_row = log_df.sort_values(by='loss',ascending=True)[0:1].to_dict(orient='records')[0]
                 
                loss = min_loss_row['loss']
                mape = min_loss_row['mape']
                val_loss = min_loss_row['val_loss']
                val_mape = min_loss_row['val_mape']
               
                _mem_day = int(min_loss_row['the_mem_days'])
                _lstm_layer = int(min_loss_row['the_lstm_layers'])
                _dense_layer = int(min_loss_row['the_dense_layers'])
                _unit = int(min_loss_row['the_units'])
                save_model = fit_model
 
                # 把不符合标准的模型从csv和文件列表中删除
                model_saved_log2 = f'{model_saved_file}/{klt}/{ model_saved_log_char}_models.csv'
                save_model_csv = pd.read_csv(model_saved_log2)
            
                #code被解析为int，再文件保存时，加上字符c保证解析为code
                min_loss = save_model_csv.loc[save_model_csv['code'] == 'c'+code].sort_values('loss',ascending=True)[0:1].to_dict(orient='records')[0]['loss']
                rows = save_model_csv.loc[(save_model_csv['code'] == 'c'+code )& (save_model_csv['loss'] > min_loss)]
                for row in rows.to_dict(orient='records'):
                    
                    filename = row['modelname']
                    if os.path.exists(filename):
                        os.remove(filename)

                save_model_csv = save_model_csv.drop(rows.index)
                save_model_csv.to_csv(model_saved_log2, index=False)
                print(f'thread{thread_count},{index},{code}:save_model_csv_{model_saved_log2}')
                del log_df,save_model_csv
                gc.collect()
            else:
                print(f'code:{code};not exit')
 

    except Exception as reason:
        print(f'-----------------Exception-----------------')
        if reason != '超时':
            print(f'Exception:thread{thread_count},{index}:{str(reason)}')
            exception_file = open(exception_file_full_name, 'a')

            # 写表头code,loss,mape,val_loss,val_mape,modelname
            exception_log = f'\'{code}\':{reason}\n'
            exception_file.write(model_log)
            exception_file.close()

            lstm_model_fit(index+1,files,thread_count,klt)
        else:
            print(str(reason))

In [11]:
# 时间测试
 
_mem_days=[3] #滑动区间，根据几天的数据做预测
_lstm_layers,_dense_layers=[1],[1] #图层数
_units= [64]

batch_size=10 #每次训练在训练集中取batchsize个样本训练；.batch_size=1时为在线学习，也是标准的SGD,如果数据集比较小，则完全可以采用全数据集的形式;GPU对2的幂次的batch可以发挥更佳的性能，因此设置成16、32、64、128…时往往要比设置为整10、整100的倍数时表现更优
epochs=100 #一个 epoch（代）是指整个数据集正向反向训练一次。


model_verbose = 0

_time_limit = 10
 
_time_start = time.time()
 

# lstm_model_fit(0,stocks_all[_file_begin:_file_end],0,101)

stock_list = stocks_all+bankuai

for ep in [25,50,100]:
    for batch  in [10,20,50,100,150,200]:
        batch_size = batch #batch_size 越小，训练时间越长。
        epochs = ep #epochs越小，训练时间越少

        _time_start = time.time()
#         print(f'本轮开始时间：{_time_start},batch_size：{batch},epochs:{ep} ')

#         lstm_model_fit(0,stock_list[0:2],1,101)
 
        comp_time = datetime.datetime.now()
        _time_end = time.time()

#         print(f'完成时间：{comp_time},用时：{(_time_end-_time_start)/60} min|||batch_size：{batch},epochs:{ep} ')

In [12]:
# _data =lstm_cleanm_data( '/kaggle/input/stocks-data-20221216/301089.gzip')
# ttt = build_models(_data.copy(deep=True),'301089',_mem_days,_lstm_layers,_dense_layers,_units,True,True,0)
# #测试
_mem_days=[3] #滑动区间，根据几天的数据做预测
_lstm_layers,_dense_layers=[1],[1] #图层数
_units= [64]

batch_size=150 #每次训练在训练集中取batchsize个样本训练；.batch_size=1时为在线学习，也是标准的SGD,如果数据集比较小，则完全可以采用全数据集的形式;GPU对2的幂次的batch可以发挥更佳的性能，因此设置成16、32、64、128…时往往要比设置为整10、整100的倍数时表现更优
epochs=50 #一个 epoch（代）是指整个数据集正向反向训练一次。


model_verbose = 0

_time_limit = 10
# print(str(_lstm_layers))
_time_start = time.time()

_file_begin = 0
_file_end = 99999
#stocks_all,bankuai
# files = os.listdir(path)
print(f'文件范围{_file_begin}-{_file_end}')

# lstm_model_fit(0,stocks_all[_file_begin:_file_end],0,101)

# stock_list = stocks_all+bankuai


# lock = threading.Lock()

# t1 = threading.Thread(target=lstm_model_fit, args=(80,stock_list[_file_begin:_file_end],1,101))
# t1.start()
# threads.append(t1)

# t2 = threading.Thread(target=lstm_model_fit, args=(94,stock_list[_file_begin:_file_end],2,102))
# t2.start()
# threads.append(t2)

# t3 = threading.Thread(target=lstm_model_fit, args=(115,stock_list[_file_begin:_file_end],3,103))
# t3.start()
# threads.append(t3)

# for thread in threads:
#     thread.join()

comp_time = datetime.datetime.now()
_time_end = time.time()

print(f'完成时间：{comp_time},用时：{_time_end-_time_start} s')

文件范围0-99999
完成时间：2023-03-12 03:38:07.773289,用时：0.00022292137145996094 s


In [13]:
def scanfiles():
    # 使用 os.walk 函数遍历目录及其下所有文件和子目录
    for root, dirs, files in os.walk('/kaggle/working/'):
        for file in files:
            file_path = os.path.join(root, file)
            print(file_path)
scanfiles()

/kaggle/working/stocks.py
/kaggle/working/__notebook__.ipynb
/kaggle/working/models_2/20230312Mar031678592287_exception.txt
/kaggle/working/models_2/102/20230312Mar031678592287_models.csv
/kaggle/working/models_2/101/20230312Mar031678592287_models.csv
/kaggle/working/models_2/103/20230312Mar031678592287_models.csv
/kaggle/working/__pycache__/stocks.cpython-37.pyc


In [14]:
import shutil
# 定义要删除的目录路径
dir_path = '/kaggle/working/'

# 删除目录及其下所有文件和子目录
# shutil.rmtree(dir_path)