In [None]:
import numpy as np
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical
def prepare_data(): 
    """ 
    準備資料
    Returns:
        X_train(ndarray)     : 訓練資料 (50000,32,32,3)
        X_test(ndarray)      : 測試資料 (10000,32,32,3) 
        y_train(ndarray)     : 將訓練資料的正確答案做One-hot encoding轉換 (50000,10) 
        y_test(ndarray)      : 將測試資料的正確答案做One-hot encoding轉換 (10000,10) 
        y_test_label(ndarray): 測試資料的正確答案 (10000)
    """
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()

    # 將用於訓練跟測試的圖像資料進行標準化
    # 對4維張量所有軸線方向求出平均值、標準差 
    # 可省略axis=(0,1,2,3)  
    mean = np.mean(X_train, axis=(0,1,2,3)) 
    std = np.std(X_train, axis=(0,1,2,3)) 
    # 要執行標準化時，在分母的標準差加上極小值
    x_train = (X_train - mean) / (std + 1e-7)
    x_test = (X_test - mean) / (std + 1e-7)
    # 將測試資料的正確答案拉直，從2維矩陣轉換為1維陣列
    y_test_label = np.ravel(y_test) 
    # 將訓練資料與測試資料的正確答案標籤做One-hot encoding轉換 (變成10個類別)
    y_train, y_test = to_categorical(y_train), to_categorical(y_test)

    return X_train, X_test, y_train, y_test, y_test_label 


In [None]:
from tensorflow.keras.layers import Input, Conv2D, Dense, Activation
from tensorflow.keras.layers import AveragePooling2D, GlobalAvgPool2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model

def make_convlayer(input, fsize, layers): 
    """ 
    建立卷積層
    Parameters: 
        inp(Input)  : 輸入層 
        fsize(int)  : 過濾器尺寸 
        layers(int) : 層數
    Returns: 卷積層物件
"""
    x = input
    for i in range(layers):
        x = Conv2D(filters=fsize,
                   kernel_size=3,
                   padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
    return x

def create_model(): 
    """
    建立模型
    Returns: 含有卷積層的模型
    """
    input = Input(shape=(32,32,3))
    x = make_convlayer(input, 64, 3)
    x = AveragePooling2D(2)(x)
    x = make_convlayer(x, 128, 3)
    x = AveragePooling2D(2)(x)
    x = make_convlayer(x, 256, 3)
    x = GlobalAvgPool2D()(x)
    x = Dense(10, activation="softmax")(x)

    model = Model(input, x)
    return model


In [None]:
from scipy.stats import mode

def ensemble_majority(models, X):
    """
    多數決集成
    Parameters:
        models(list): Model 物件清單
        X(array): 驗證資料
    Returns: np.ndarray收容了(10000)各個圖像的正確標籤
    """
    # 製作(資料數量, 模型數量) 的零矩陣
    pred_labels = np.zeros((X.shape[0],   # 列數同圖像張數
                            len(models))) # 行數為模型數量
    # 從models取出的索引值跟凍結更新的模型
    for i, model in enumerate(models):
        # 從每個模型的預測機率(資料數量 ,類別數量 ) 的各列(axis=1)當中
        # 取最大值的索引(資料數量 ,模型數量 )
        # 並在模型的行中放入各列的資料
        pred_labels[:, i] = np.argmax(model.predict(X), axis=1)
        # 在mode()中僅指定pred_labels的各列眾數為[0]並取得數據
        # 將(資料數量, 1)的形狀以ravel()拉平為(, 資料數量)的形狀
    return np.ravel(mode(pred_labels, axis=1)[0])


In [None]:
from tensorflow.keras.callbacks import Callback
class Checkpoint(Callback): 
    """Callback 的子類別
    Attributes: 
        model(object): 訓練中的模型 
        filepath(str): 儲存權重的資料夾路徑 
        best_val_acc : 目前最高準確率
    """
    def __init__(self, model, filepath): 
        """ 
        Parameters: 
            model(Model): 訓練中的模型
            filepath(str): 儲存權重的資料夾路徑  
            best_val_acc(int): 前最高準確率
        """
        self.model = model
        self.filepath = filepath
        self.best_val_acc = 0.0

    def on_epoch_end(self, epoch, logs): 
        """ 
        重新定義訓練週期結束時所呼出的函式
        從剛剛的訓練週期當中儲存準確率較高的權重
        Parameters:
            epoch(int): 訓練次數
            logs(dict): {'val_acc': 損失 , 'val_acc': 準確率 }
        """
        if self.best_val_acc < logs['val_acc']:
            # 儲存比前一次的訓練準確率還要高的權重
            self.model.save_weights(self.filepath)
            # 儲存準確率
            self.best_val_acc = logs['val_acc']
            print('Weights saved.', self.best_val_acc)


In [None]:
import math
import pickle
import numpy as np
from sklearn.metrics import accuracy_score
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.callbacks import History
def train(X_train, X_test, y_train, y_test, y_test_label):
    """ 
    進行學習
    Parameters:
        X_train(ndarray): 訓練資料
        X_test(ndarray) : 測試資料
        y_train(ndarray): 訓練資料的正確答案
        y_test(ndarray) : 測試資料的正確答案
        y_test_label(ndarray): 測試資料的正確答案
    """
    models_num = 5 # 集成的模型數量
    batch_size = 1024 # 批次大小
    epoch = 80 # 訓練次數
    models = [] # 模型的清單
    # 各個模型的訓練歷程dict 
    history_all = {"hists":[], "ensemble_test":[]} 
    # 初始化各個模型預測結果的2維矩陣
    # (資料數量, 模型數量)
    model_predict = np.zeros((X_test.shape[0], # 列數為圖像張數 
                              models_num))     # 行數為模型數量 
    # 模型有幾個、就重複幾次
    for i in range(models_num):
        # 顯示現在是第幾個模型 
        print('Model',i+1) 
        # 建立卷積神經網路
        train_model = create_model() 
        # 編譯模型
        train_model.compile(optimizer='adam',
                            loss='categorical_crossentropy',
                            metrics=["acc"])

        # 將編譯後的模型追加到清單
        models.append(train_model)

        # 建立回呼當中的History物件
        hist = History()
        # 建立回呼當中的Checkpoint物件
        cpont = Checkpoint(train_model,       # 模型
                           f'weights_{i}.h5') # 儲存權重的檔名
        # 步驟衰減函數t
        def step_decay(epoch):
            initial_lrate = 0.001 # 基礎學習率
            drop = 0.5            # 衰減率
            epochs_drop = 10.0    # 每10次訓練週期執行步驟衰減
            lrate = initial_lrate * math.pow(drop,
                                             math.floor((1+epoch)/epochs_drop))
            return lrate

        lrate = LearningRateScheduler(step_decay)

        # 資料增補
        datagen = ImageDataGenerator(rotation_range=15,      # 在15度範圍內隨機旋轉
                                     width_shift_range=0.1,  # 以圖像寬度的0.1比例隨機橫向移動
                                     height_shift_range=0.1, # 以圖像高度的0.1比例隨機上下移動
                                     horizontal_flip=True,   # 朝水平方向隨機翻轉、左右對調
                                     zoom_range=0.2)         # 以原始尺寸的0.2倍比例隨機放大
        
        # 進行訓練
        train_model.fit(datagen.flow(X_train, y_train, batch_size=batch_size), 
                        epochs=epoch, 
                        steps_per_epoch=X_train.shape[0] // batch_size, 
                        validation_data=(X_test, y_test), 
                        verbose=1, 
                        callbacks=[hist, cpont, lrate]) # 回呼

        # 讀入訓練完成的模型所獲得最高準確率時的權重
        train_model.load_weights(f'weights_{i}.h5')
        
        # 凍結模型的所有權重更新
        for layer in train_model.layers:
            layer.trainable = False
            
        # 預測測試資料
        # 求取每列的最大值
        model_predict[:, i] = np.argmax(train_model.predict(X_test), axis=-1)
        # 將訓練完成模型的訓練歷程登錄到history_all
        history_all['hists'].append(hist.history)
        # 執行多數決集成
        ensemble_test_pred = ensemble_majority(models, X_test)
        # 用scikit-learn.accuracy_score()取得集成的準確率
        ensemble_test_acc = accuracy_score(y_test_label, ensemble_test_pred)
        # 將集成準確率追加到global_hist
        history_all['ensemble_test'].append(ensemble_test_acc) 
        # 輸出現在的集成精確度
        print('Current Ensemble Accuracy : ', ensemble_test_acc)

    history_all['corrcoef'] = np.corrcoef(model_predict, rowvar=False) # 求取每行的相關係數
    print('Correlation predicted value')
    print(history_all['corrcoef'])


In [None]:
X_train, X_test, y_train, y_test, y_test_label = prepare_data()

In [None]:
train(X_train, X_test, y_train, y_test, y_test_label)