# K-fold cross validation and Ensemble

In [1]:
# import packages
from sklearn import metrics
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

In [2]:
# import algorithm models
from lib.models import models
from lib.test import Check,CrossCheck
# import report object
from utils.reports import Report

In [3]:
# path of the CSV data set (read below with pandas; holds the features and
# the stock_price_movement label column)
params_path = "./data/aapl_data_set.csv"

# path handed to the Report object when it is created
reports_path = "./log/test_2019_5_24.log"

In [4]:
# report object built from the log path; passed to Check as report_obj later on
report_object = Report(reports_path)

In [35]:
# load the data set from CSV, using the "date" column as the row index
params = pd.read_csv(params_path,index_col="date")

In [37]:
# target: the label column; features: every other column.
# drop() without inplace returns a new frame, so params itself is not modified.
y = params["stock_price_movement"].copy()
X = params.drop(["stock_price_movement"],axis=1)

In [7]:
from sklearn.model_selection import KFold

In [95]:
def simple_ensemble(models_list,X_train,y_train,X_test):
    """
    Fit every model on the training data and combine their test-set
    predictions with a simple voting ensemble.

    The predictions of all models are summed row by row; a positive sum votes
    1 and anything else (including a tie at 0) votes -1. This only makes sense
    for numeric labels -- presumably -1/1, TODO confirm against the data set.

    :param models_list: list of dicts, each with a "name" (str) and a "model"
        (the object handed to model_fit_and_predict)
    :param X_train: pandas.DataFrame training data of x
    :param y_train: training data of y
    :param X_test: pandas.DataFrame testing data of x
    :return: pandas.Series named "vote", indexed like X_test, holding 1 or -1
    :raises ValueError: if models_list is empty
    """
    if not models_list:
        raise ValueError("models_list must contain at least one model")

    # np.asarray strips any index from the predictions, so the columns are
    # placed positionally against X_test.index instead of being re-aligned
    predictions = {
        model_item["name"]: np.asarray(
            model_fit_and_predict(model_item["model"],X_train,y_train,X_test)
        )
        for model_item in models_list
    }
    votes = pd.DataFrame(predictions, index=X_test.index)

    # vectorised vote instead of row-wise apply; skipna=False keeps a NaN
    # prediction from silently counting as 0 (a NaN total still votes -1)
    totals = votes.sum(axis=1, skipna=False)
    return pd.Series(np.where(totals > 0, 1, -1), index=X_test.index, name="vote")

In [92]:
def model_fit_and_predict(model,X_train,y_train,X_test):
    """
    Train a single model and predict the test set with it.

    :param model: object exposing fit(X, y) and predict(X), e.g. a scikit-learn model
    :param X_train: training data of x
    :param y_train: training data of y
    :param X_test: testing data of x
    :return: whatever model.predict returns for X_test
    """
    # fit mutates the model in place; its return value is not used
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    return y_pred

In [93]:
def ensemble_test(X,y,cv,models_list):
    """
    Evaluate the voting ensemble with shuffled k-fold cross validation.

    :param X: pandas.DataFrame of features
    :param y: pandas.Series of labels, row-aligned with X
    :param cv: int, number of folds
    :param models_list: models list passed on to simple_ensemble
    :return: list of accuracy scores, one per fold
    """
    # fixed random_state keeps the shuffled folds reproducible between runs
    outer_cv = KFold(n_splits=cv, shuffle=True, random_state=1)
    scores = []
    for train_idx, test_idx in outer_cv.split(X, y):
        # KFold yields positional row numbers, so select with .iloc; .loc would
        # treat them as index labels and fail (or pick the wrong rows) when the
        # data is indexed by anything other than 0..n-1, e.g. by date
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
        y_pred = simple_ensemble(models_list,X_train,y_train,X_test)
        scores.append(accuracy_score(y_test, y_pred))
    return scores

In [12]:
# 5-fold cross validation of the voting ensemble; returns one accuracy per fold
ensemble_test(X,y,5,models)

[0.6, 0.6, 0.9, 0.8888888888888888, 0.6666666666666666]

In [13]:
from sklearn import metrics
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
import numpy as np
import pandas as pd

In [18]:
import datetime,time

In [84]:
class Check(object):
    """
    Train the models, test the models, output the results.

    The data is split by row order: the leading rows become the training set
    and the trailing rows the test set (no shuffling). Messages are sent to the
    report object and/or printed, depending on the constructor arguments.
    """
    def __init__(self, test_models,epochs,X,y,test_size=0.2,report_obj=None,ensemble=False,show=True,result_saving_path=None):
        """
        :param test_models: list of dicts, each with a "name" and a "model" key
        :param epochs: int, number of fit/predict rounds per model (or per ensemble)
        :param X: pandas.DataFrame of features
        :param y: pandas.Series of labels, row-aligned with X
        :param test_size: fraction of the rows (rounded up) used as the test set
        :param report_obj: optional object whose add(str) method receives every message
        :param ensemble: if True, train() runs the voting ensemble; otherwise
            every model is tested on its own
        :param show: if True, messages are also printed
        :param result_saving_path: optional directory for the prediction CSV files
        """
        self.test_models = test_models
        self.epochs = epochs

        self.split(X, y, test_size)
        self.show = bool(show)
        self.print_func_list = []
        self.output_functions(report_obj)
        self.ensemble = ensemble
        self.result_saving_path=result_saving_path

    def output_functions(self, report_obj):
        """
        Build the list of callables every output message is sent to:
        report_obj.add when a report object is given, print when show is set.
        """
        self.print_func_list = []
        if report_obj is not None:
            self.print_func_list.append(report_obj.add)
        if self.show:
            self.print_func_list.append(print)

    def output(self,*output_str):
        """
        Join the arguments with single spaces and send the resulting message
        to every registered output function. Empty messages are skipped.
        """
        message = " ".join(str(x) for x in output_str)
        if message:
            for emit in self.print_func_list:
                emit(message)

    def split(self,X,y,test_size=0.2):
        """
        Split the dataset into train and test data.

        The last ceil(rows * test_size) rows become the test set, all rows
        before them the training set. Results are stored as copies in
        self.X_train, self.X_test, self.y_train and self.y_test.
        """
        n_rows = X.shape[0]

        # number of test rows, rounded up when the fraction is not a whole number
        test_shape = int(n_rows*test_size)
        if test_shape != n_rows*test_size:
            test_shape += 1

        train_shape = n_rows - test_shape

        # iloc slices are end-exclusive: [:train_shape] is exactly the first
        # train_shape rows (slicing to train_shape-1 lost the last training row)
        self.X_train = X.iloc[:train_shape].copy()
        self.X_test = X.iloc[train_shape:].copy()

        self.y_train = y.iloc[:train_shape].copy()
        self.y_test = y.iloc[train_shape:].copy()

    def ensemble_test(self):
        """
        Use the ensemble model to train & test the data, once per epoch.

        Does nothing but emit a message when the number of models is even.
        Predictions of every epoch are saved when result_saving_path is set.
        """
        if len(self.test_models)%2 !=1:
            self.output("Ensemble needs odd number of models!")
            return
        result = {"y_test":self.y_test}
        for i in range(self.epochs):
            self.output("ensemble epoch ",i+1)
            self.output("="*80)
            y_pred = simple_ensemble(self.test_models,
                                     self.X_train,
                                     self.y_train,
                                     self.X_test)

            pred_result_key = "epoch_{}".format(i+1)
            result[pred_result_key] = y_pred

            self.model_accuracy("ensemble test accuracy: ",self.y_test,y_pred)
            self.output('-'*80)

        if self.result_saving_path:
            self.save_testing_results("ensemble",result)

    def model_accuracy(self,classifier,y_test,y_pred):
        """
        Output the classification report and the accuracy score of y_pred
        against y_test; classifier is the label used in the report header.
        """
        self.output("Classification report for classifier %s:\n%s\n"
            % (classifier, metrics.classification_report(y_test, y_pred)))
        self.output("model accuracy: ", metrics.accuracy_score(y_test, y_pred))

    def save_testing_results(self,model_name,result):
        """
        Save the testing result and predicting result as
        <result_saving_path>/<timestamp>_<model_name>.csv.

        :param model_name: name used in the file name
        :param result: dict of columns (true labels and per-epoch predictions)
        """
        import os  # local import: os is not among the file's visible imports

        # create the target directory on first use so that to_csv cannot fail on it
        os.makedirs(self.result_saving_path, exist_ok=True)

        now = time.strftime("%Y_%m_%d_%H_%M_%S")
        file_path = "{0}/{1}_{2}.csv".format(self.result_saving_path,now,model_name)
        pd.DataFrame(result).to_csv(file_path)

    def seperate_test(self):
        """
        Test the models separately: every model is fitted and evaluated
        `epochs` times, and its predictions are saved when result_saving_path
        is set.
        """
        for model_item in self.test_models:
            self.output("Using {} model to train and predict the data".format(model_item["name"]))
            result = {"y_test":self.y_test}
            for i in range(self.epochs):
                self.output("="*80)
                self.output("testing epoch ",i+1)
                y_pred = model_fit_and_predict(model_item["model"],
                                               self.X_train,
                                               self.y_train,
                                               self.X_test)
                pred_result_key = "epoch_{}".format(i+1)
                result[pred_result_key] = y_pred
                self.model_accuracy(model_item["name"],self.y_test,y_pred)
                self.output('-'*80)

            if self.result_saving_path:
                self.save_testing_results(model_item["name"],result)

    def train(self):
        """
        Train and test the data: announce the models, then run either the
        ensemble test or the separate per-model tests.
        """
        models_name = [item["name"] for item in self.test_models]
        self.output("Testing these models: ", models_name)
        if self.ensemble:
            self.output("Using ensemble techniques to predict the stock price movement.")
        self.output('*'*90)
        if self.ensemble:
            self.ensemble_test()
        else:
            self.seperate_test()

In [79]:
# test every model on its own (ensemble=False) for 3 epochs with 20% of the
# rows held out; messages go to report_object only (show=False) and the
# predictions are saved under ./result
check = Check(models,3,X,y,test_size=0.2,report_obj=report_object,
              ensemble=False,show=False,result_saving_path="./result")

In [80]:
# run the per-model tests
check.train()

  'precision', 'predicted', average, warn_for)


In [81]:
# copy of models (presumably a list, so a shallow copy) -- entries can then be
# removed for the ensemble without changing models itself
ensemble_models = models.copy()

In [82]:
# remove the entry at position 3 -- presumably to leave an odd number of models,
# which Check.ensemble_test requires; TODO confirm
ensemble_models.pop(3)

{'model': LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
           intercept_scaling=1, max_iter=100, multi_class='multinomial',
           n_jobs=1, penalty='l2', random_state=0, solver='lbfgs',
           tol=0.0001, verbose=0, warm_start=False),
 'name': 'logistic_regression'}

In [89]:
# same setup as the per-model check, but with ensemble=True so that train()
# runs the voting ensemble over ensemble_models
ensemble_check = Check(ensemble_models,3,X,y,test_size=0.2,
                       report_obj=report_object,ensemble=True,
                       show=False,result_saving_path="./result")

In [97]:
# run the ensemble test
ensemble_check.train()