<a href="https://colab.research.google.com/github/heros-lab/colaboratory/blob/master/Model_optimization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from tensorflow.keras.backend import clear_session
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

# Mount Google Drive into the Colab runtime at /content/drive so the notebook
# can read its datasets and write its result files there.
from google.colab import drive
drive.mount('/content/drive')

# Base directory on the mounted drive; the data files, CSV logs and the Optuna
# SQLite database used by the cells below are all addressed relative to it.
work_path = "/content/drive/My Drive/Colab Notebooks"


In [None]:
!pip install optuna

import optuna
from optuna.integration import KerasPruningCallback


In [17]:
# Column selection: x_index picks the input columns, y_index the target column.
# The commented-out alternatives are the column sets of the other model variants.
x_index = [0,1,2,3,4,5,6] # model_n1, n2
#x_index = [0,3,4,6] # model_s1
#x_index = [1,2,5,6] # model_s2
y_index = [2]

# File-name prefixes of the dataset used for fitting and the one used for scoring.
learn_name = "ms2a"
test_name  = "ms1a"

# Fitting set: load the full arrays, then keep only the selected columns.
learn_x, learn_y = (
    np.load(f"{work_path}/learn/{learn_name}_x.npy")[:, x_index],
    np.load(f"{work_path}/learn/{learn_name}_y.npy")[:, y_index],
)

# Scoring set: read from the same "learn" directory, with the same column selection.
test_x, test_y = (
    np.load(f"{work_path}/learn/{test_name}_x.npy")[:, x_index],
    np.load(f"{work_path}/learn/{test_name}_y.npy")[:, y_index],
)


In [25]:
class optimize_manager:
    """Optuna objective plus CSV logging for a two-hidden-layer Keras regressor.

    For every trial the objective trains the same architecture repeatedly,
    collects the test loss of each run, and appends both the raw losses and
    their summary statistics to per-study CSV files under ``work_path``.
    """

    def __init__(self, study_name):
        """Derive the per-study output paths from the global ``work_path``.

        Args:
            study_name: Used as the suffix of the score and result CSV files.
        """
        self.score_path  = f"{work_path}/score_{study_name}.csv"
        self.result_path = f"{work_path}/result_{study_name}.csv"

    def filtering_with_IQR(self, data_list):
        """Return the values of ``data_list`` that lie within the Tukey fences.

        The fences are ``Q1 - 1.5*IQR`` and ``Q3 + 1.5*IQR``. Values exactly on
        a fence are kept, so constant data (IQR == 0) is returned unchanged
        instead of being filtered down to nothing.

        Args:
            data_list: Sequence of numeric values.

        Returns:
            A ``pandas.Series`` with the retained values (original index kept).
        """
        series = pd.Series(data_list)
        q1 = series.quantile(.25)
        q3 = series.quantile(.75)
        iqr = q3 - q1
        lim_upper = q3 + iqr * 1.5
        lim_lower = q1 - iqr * 1.5
        # Vectorized, inclusive comparison; NaN fences (empty input) keep nothing.
        return series[(series >= lim_lower) & (series <= lim_upper)]

    def save_scores(self, count, units, data_list):
        """Write one line with the raw scores of a trial to the score file.

        The file is truncated when ``count`` is 0 and appended to otherwise.

        Args:
            count: Trial number, written as ``#<count>``.
            units: Iterable of layer sizes, written after the trial number.
            data_list: Iterable of scores, written in ``.6e`` notation.
        """
        fields = [f"#{count}"]
        fields.extend(f"{num_unit}" for num_unit in units)
        fields.extend(f"{data:.6e}" for data in data_list)
        with open(self.score_path, "w" if count == 0 else "a") as file:
            file.write(", ".join(fields) + "\n")

    def save_results(self, trial_id, units, samples, mean, std, mean_f, std_f):
        """Write one summary line for a trial to the result file.

        For trial 0 the file is truncated and a header line is written first;
        later trials are appended.

        Args:
            trial_id: Trial number, written as ``#<trial_id>``.
            units: Sequence of layer sizes; one ``Unit-k`` column each.
            samples: Number of scores that survived the IQR filter.
            mean: Mean of all scores.
            std: Standard deviation of all scores.
            mean_f: Mean of the filtered scores.
            std_f: Standard deviation of the filtered scores.
        """
        is_first = trial_id == 0
        row = [f"#{trial_id}"]
        row.extend(f"{num_unit}" for num_unit in units)
        row.extend([
            f"{samples}", f"{mean:.6e}", f"{std:.6e}",
            f"{mean_f:.6e}", f"{std_f:.6e}"])

        with open(self.result_path, "w" if is_first else "a") as file:
            if is_first:
                columns = ["Trials"]
                columns.extend(f"Unit-{k}" for k in range(1, len(units) + 1))
                # Every column, including the first one after the unit columns,
                # is separated by ", " so the header matches the data rows.
                columns.extend([
                    "Samples(Full:101)", "Estimated loss", "Standard-deviation",
                    "Estimated loss(filter)", "Standard-deviation(filter)"])
                file.write(", ".join(columns) + "\n")
            file.write(", ".join(row) + "\n")

    def _build_model(self, num_units):
        """Build and compile a tanh MLP with one hidden layer per entry of ``num_units``."""
        model = Sequential()
        # Only the first layer needs the input size; it is taken from the
        # global training inputs.
        model.add(Dense(
            input_dim=learn_x.shape[1], units=num_units[0],
            activation="tanh", kernel_initializer="glorot_uniform"))
        for hidden_units in num_units[1:]:
            model.add(Dense(
                units=hidden_units,
                activation="tanh", kernel_initializer="glorot_uniform"))
        model.add(Dense(units=1))
        model.compile(loss="mse", optimizer=Adam(learning_rate=0.001))
        return model

    def objective(self, trial):
        """Optuna objective: mean test loss over repeated trainings.

        Suggests two hidden-layer sizes in [1, 200], trains ``num_sample``
        freshly initialised models on the global ``learn_x``/``learn_y``,
        evaluates each on the global ``test_x``/``test_y``, logs the scores
        and their statistics to the CSV files, and returns the unfiltered mean.

        Args:
            trial: The ``optuna`` trial supplying the hyper-parameters.

        Returns:
            Mean of the test losses over all samples (the IQR-filtered
            statistics are only logged).
        """
        epochs = 200
        num_batch = 256
        num_sample = 101
        num_units = [
            trial.suggest_int("num_unit1", 1, 200),
            trial.suggest_int("num_unit2", 1, 200)]

        score_list = []
        for sample_idx in range(num_sample):
            # Reset the Keras backend state before building the next model.
            clear_session()
            print(f"\r#{trial.number:2} -- unit: {num_units}, sampling: {sample_idx+1}/{num_sample}", end="")

            model = self._build_model(num_units)
            model.fit(learn_x, learn_y, batch_size=num_batch, epochs=epochs, verbose=0)

            # Evaluate the whole test set as a single batch.
            score = model.evaluate(test_x, test_y, batch_size=test_x.shape[0], verbose=0)
            score_list.append(score)

        # Filter out outliers
        score_list_flt = self.filtering_with_IQR(score_list)

        # Compute mean and standard deviation (raw and filtered)
        mean, std = pd.Series(score_list).describe().loc[["mean","std"]]
        samples, mean_f, std_f = score_list_flt.describe().loc[["count","mean","std"]]

        # Save and report the results
        self.save_scores(trial.number, num_units, score_list)
        self.save_results(trial.number, num_units, samples, mean, std, mean_f, std_f)
        print(f"\r#{trial.number:2} -- unit: {num_units}, samples: {samples}/{num_sample}, mean: {mean:.4e}, std: {std:.4e}")

        return mean

In [None]:
# Set to True to resume an existing study from the SQLite storage instead of
# creating a new one.
STUDY_LOADING = False

# The study is persisted in an SQLite database under work_path.
storage_path = f"sqlite:///{work_path}/opt_model_s2.db"
study_name = "s2_ver4.1"
# The manager writes its CSV logs under the same study name.
manager = optimize_manager(study_name)


# study load or create
# NOTE(review): the objective visible in this notebook never reports
# intermediate values to the trial, so the MedianPruner presumably has no
# effect -- confirm before relying on pruning.
if STUDY_LOADING:
    # Keyword arguments: positional arguments to load_study are deprecated.
    study = optuna.load_study(study_name=study_name, storage=storage_path, pruner=optuna.pruners.MedianPruner())
else:
    study = optuna.create_study(study_name=study_name, storage=storage_path, direction="minimize", pruner=optuna.pruners.MedianPruner())


In [None]:
# Run 50 trials of the manager's objective on the study; this call blocks
# until all of them have completed.
study.optimize(manager.objective, n_trials=50)

# Leave blank lines after the trial progress output before the final banner.
print("\n")
print("*** All Trial are finished!! ***")

# 0 -- unit: [76, 76], sampling: 8/101