Date: 20230419  
Model_Selection  

想法：最近实习跑数据模型，觉得一个个数据和一个个模型的输入跑，太浪费时间，且很混乱，就写了个类来规整一下，昨天觉得有必要以后都这样规范的写，所以这里做个备份  

大抵构思为：DataModel类存储数据集，并保存必要信息，然后可以定义一系列的数据处理函数，以及模型fit函数，以及各具体模型的fit函数，由modelname属性来判断是要拟合哪个模型，并输入对应参数，这样就可以很方便的对于不同数据，不同模型拟合，并很好的展示其测试结果。  
然后使用了帖子https://blog.csdn.net/qq_52466006/article/details/127633149 定义的多分类的报告类，并稍作修改，用以展示测试结果，比如多分类的混淆矩阵，accuracy  

In [1]:
import pandas as pd
# XGB
import numpy as np
import xgboost as xgb
from sklearn.metrics import auc, roc_auc_score, roc_curve, f1_score, accuracy_score
from sklearn.model_selection import train_test_split

# SVM
import numpy as np                   
from sklearn import svm            
from sklearn.svm import SVC

# 全连接 softmax dropout，这里的神经网络是课程作业设置的简单的全连接网络，还需后续再改下代码整洁度，并嵌入DataModel类中
import torch
from torch import nn, optim

# softmax
from sklearn.linear_model import LogisticRegression

# 训练随机森林解决回归问题
from sklearn.ensemble import RandomForestRegressor

# lgbm
import lightgbm as lgb
from lightgbm import LGBMClassifier

In [7]:
class FCsoftmax(nn.Module):
    def __init__(self, input_dim):
        super(FCsoftmax, self).__init__()
        self.block = nn.Sequential(
            nn.Linear(input_dim, 32), nn.Dropout(p=0.5), nn.ReLU(),
            nn.Linear(32, 64), nn.Dropout(p=0.5), nn.ReLU(),
            nn.Linear(64, 128), nn.Dropout(p=0.5), nn.ReLU(),
            nn.Linear(128, 64), nn.Dropout(p=0.5), nn.ReLU(),
            nn.Linear(64, 32), nn.Dropout(p=0.5), nn.ReLU(),
            nn.Linear(32, 3), nn.Softmax(dim=1))  # 需指明dim维度，不然会warning
        
    def forward(self, x):
        x = self.block(x)
        return x

def train(model, optimizer, lossfunction, x, y):
    model.train()
    optimizer.zero_grad()
    output = model(x)
    loss = lossfunction(output, y)
    loss.backward()
    optimizer.step()
    return loss.item()  #转为标量浮点值

def Fnn(X_train, X_test, y_train):
    torch.manual_seed(1)  # 使用随机化种子使神经网络的初始化每次都相同
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
    train_x = torch.tensor(X_train.values, dtype=torch.float32).cuda()
    train_y = torch.tensor(y_train.to_numpy()[:,np.newaxis] == np.arange(3), dtype=torch.float32).cuda()  # 转为onehot
    
    x_test = torch.tensor(X_test.values, dtype=torch.float32)
    
    input_dim = train_x.shape[1]
    n_epochs = 1000 # 经由初步筛选避免过拟合后的次数
    lossfunction = nn.CrossEntropyLoss().cuda()
    model = FCsoftmax(input_dim)
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    for epoch in range(n_epochs):
        trainloss = train(model, optimizer, lossfunction, train_x, train_y)

        if (epoch+1) % 100 == 0:
            print('epoch:{} loss:{}'.format(epoch+1,trainloss))
    
    model.eval()
    model.to('cpu')
    
    #predictions from test data
    inputs = x_test 
    test_output = model(inputs)
    
    pred_y = torch.max(test_output, 1)[1].data.numpy()
    return pred_y

In [4]:
class Myreport:
    def __init__(self):
        self.confusion = None

    def statistics_confusion(self,y_true,y_predict):
        self.confusion = np.zeros((3,3))
        for i in range(y_true.shape[0]):
            self.confusion[int(y_predict[i])][int(y_true[i])] += 1

    def __cal_Acc(self):
        return np.sum(self.confusion.diagonal()) / np.sum(self.confusion)

    def __cal_Pc(self):
        return self.confusion.diagonal() / np.sum(self.confusion, axis=1)

    def __cal_Rc(self):
        return self.confusion.diagonal() / np.sum(self.confusion, axis=0)

    def __cal_F1score(self,PC,RC):
        return 2 * np.multiply(PC, RC) / (PC + RC)

    def report(self,y_true,y_predict,classNames):
        self.statistics_confusion(y_true,y_predict)
        Acc = self.__cal_Acc()
        Pc = self.__cal_Pc()
        Rc = self.__cal_Rc()
        F1score = self.__cal_F1score(Pc,Rc)
        str = "Class Name\t\tprecision\t\trecall\t\tf1-score\n"
        for i in range(len(classNames)):
           str += f"{classNames[i]}   \t\t\t{format(Pc[i],'.2f')}   \t\t\t{format(Rc[i],'.2f')}" \
                  f"   \t\t\t{format(F1score[i],'.2f')}\n"
        str += f"accuracy is {format(Acc,'.2f')}"
        return str

In [2]:
class Datamodel:
    def __init__(self, data_name, model_name):
        self.data = None
        self.label = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.y_pred = None
        self.data_name = data_name
        self.confusion = None
#         self.Report = None
        self.pcrc = None
        self.modelname = model_name
        
    def data_read(self, path, label):
        self.label = label
        self.data = pd.read_csv(path, header=0)
        self.data = self.data[['ask1', 'bid1', 'asize1', 'bsize1', 'spread', 'mid_price', 'micro_price', 'dPa', 'dPb', 'dPmicro', 'OFI','LogquoteSlope1_new', 'SOIR1', 'MPC1', 'MPC5', 'label_next5', 'label_next10', 'label_next20']]
        self.data = self.data.dropna()
        self.data['label_next5'] = self.data['label_next5']+1
        self.data['label_next10'] = self.data['label_next10']+1
        self.data['label_next20'] = self.data['label_next20']+1  
        
    def sample_balance(self):
        sample_balance_index = self.data[self.data[self.label]==1].index
        # 对序列随机排序
#         np.random.seed(20303)
        indices_random = np.random.permutation(sample_balance_index)
                
        # 随机抽取与label_next5==+-1的总个数两倍作为样本平衡
        pnum = self.data[self.data[self.label]==2].shape[0]
        nnum = self.data[self.data[self.label]==0].shape[0]
        
        zero_index = pd.Index(indices_random[:int(np.floor(0.5*(pnum+nnum)))])
        p_index = self.data[self.data[self.label]==2].index
        n_index = self.data[self.data[self.label]==0].index

        balance_index=zero_index.append(n_index.append(p_index))
        balance_index = [i for i in balance_index if i < self.data.shape[0]]
        self.data = self.data.iloc[balance_index[:],:]   
    
    def data_split(self):
        # split data into features and target，label_next5-20为预测的target
        X = self.data.drop(['label_next5', 'label_next10', 'label_next20'], axis=1)
        y = self.data[self.label] # choose any of the target variables to train on
        
        # split into train and test sets
#         这里因为是时序数据所以直接按时间顺序分为测试集训练集，可以随机分，或进一步分出验证集
        lenth = X.shape[0]
        self.X_train = X[:int(np.floor(lenth*0.8))].reset_index(drop=True)
        self.X_test = X[int(np.floor(lenth*0.8)):].reset_index(drop=True)
        self.y_train = y[:int(np.floor(lenth*0.8))].reset_index(drop=True)
        self.y_test = y[int(np.floor(lenth*0.8)):].reset_index(drop=True)
                
    def XGBoost(self, booster):
        '''
        booster: 'gblinear','gbtree'
        '''
        model = xgb.XGBClassifier(booster=booster, n_estimators=1000, learning_rate=0.05, objective='multi:softmax', num_class=3)
        model.fit(self.X_train, self.y_train)
        self.y_pred = model.predict(self.X_test)
        
    def SVM(self, C, k):
        '''
        kernel == 'linear','rbf','sigmoid'
        '''
        model = svm.SVC(C=C,                         #误差项惩罚系数,默认值是1
                        kernel=k,               #线性核 kenrel="rbf":高斯核
                        class_weight='balanced',
                        decision_function_shape='ovr') #决策函数
        model.fit(self.X_train, self.y_train)
        self.y_pred = model.predict(self.X_test)
        
    def FC(self):
        self.y_pred = Fnn(self.X_train, self.X_test, self.y_train)
    
    def softmax(self):
        '''
        softmax regression
        '''
        classifier = LogisticRegression()
        classifier.fit(self.X_train, self.y_train)
        self.y_pred = classifier.predict(self.X_test)

    def knn(self):
        knn = KNeighborsClassifier()    #实例化KNN模型
        knn.fit(self.X_train, self.y_train) 
        self.y_pred = knn.predict(self.X_test)
        
        
    def randomforest(self, n_estimator):
        regressor = RandomForestRegressor(n_estimators=n_estimator, random_state=0, bootstrap=True)
#         因样本量太小，所以尝试使用bootstrap
        regressor.fit(self.X_train, self.y_train)
        self.y_pred = regressor.predict(self.X_test)
        
    def lgbm(self):
        model = LGBMClassifier(
                max_depth=3,
                learning_rate=0.1,
                n_estimators=200, # 使用多少个弱分类器
                objective='multiclass',
                num_class=3,
                booster='gbtree',
                min_child_weight=2,
                subsample=0.8,
                colsample_bytree=0.8,
                reg_alpha=0,
                reg_lambda=1,
                seed=0 # 随机数种子
                )
        model.fit(self.X_train,self.y_train, eval_set=[(self.X_train, self.y_train), (self.X_test, self.y_test)], 
              verbose=100, early_stopping_rounds=50)

        # 对测试集进行预测
        self.y_pred = model.predict(self.X_test)

    def model_fit(self, *args, **kwargs):
        '''
        modelname = 'XGBoost','SVM','FC','softmaxregression','knn','randomforest','lightgbm'
        '''
        if self.modelname == 'XGBoost':
            self.XGBoost(*args, **kwargs)
        elif self.modelname == 'SVM':
            self.SVM(*args, **kwargs)
        elif self.modelname == 'FC':
            self.FC(*args, **kwargs)
        elif self.modelname == 'softmaxregression':
            self.softmax(*args, **kwargs)
        elif self.modelname == 'knn':
            self.knn(*args, **kwargs)
        elif self.modelname == 'randomforest':
            self.randomforest(*args, **kwargs)
        elif self.modelname == 'lightgbm':
            self.lgbm(*args, **kwargs)
            
    def result(self):
        Report = Myreport()
        self.pcrc = Report.report(y_true=self.y_test, y_predict=self.y_pred, classNames=['-1','0','1'])
        self.confusion = Report.confusion

In [6]:
# 示例
# random forest
label5rf = Datamodel(data_name='label',model_name='randomforest')
label5rf.data_read(path='D:/self_file/2303SW/230413/IF2303.csv',label='label_next5')
label5rf.sample_balance()
label5rf.data_split()

label5rf.model_fit(n_estimator=500)
label5rf.result()

print('RF')
print(label5rf.pcrc)
print(label5rf.confusion)

RF
Class Name		precision		recall		f1-score
-1   			0.05   			0.92   			0.09
0   			0.97   			0.31   			0.47
1   			nan   			0.00   			nan
accuracy is 0.33
[[ 12. 239.   2.]
 [  1. 107.   2.]
 [  0.   0.   0.]]


  return self.confusion.diagonal() / np.sum(self.confusion, axis=1)
