#自作モデルの実装

In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import jax.numpy as jnp
from jax import jit as jjit
from functools import partial
from jax import vmap

#自作モデルの実装

#誤差を求める関数
@partial(jjit)
def rmse(y, p):
    return jnp.sqrt(((y - p)**2).mean())

@partial(jjit)
def mse(y, p):
    return ((y - p)**2).mean()


@partial(jjit)
def mae(y, p):
    return jnp.abs(y - p).mean()

@partial(jjit)
def mape(y, p):
    return jnp.abs((y - p) / y).mean()

@partial(jjit)
def r2(y, p):
    return 1 - (jnp.sum((y - p)**2) / jnp.sum((y - jnp.mean(y))**2))

@partial(jjit)
def r2_(y, p):
    return jnp.sum((y - p)**2) / jnp.sum((y - jnp.mean(y))**2)



#まとめて表示
def display_err(y, p):
    print("rmse", rmse(y, p))
    print("mse", mse(y, p))
    print("mae", mae(y, p))
    print("mape", mape(y, p))
    print("r2", r2(y, p))
    print("")

#予測する関数(１行)
@partial(jjit)
def predict(row, model_x, model_y, T):
    model_y_mean = jnp.mean(model_y)
    t = jnp.abs(model_x - row)
    w = (1/2)**(t / T)
    w = jnp.prod(w, axis=1)
    p = jnp.sum(w * model_y) / jnp.sum(w)
    p = jnp.nan_to_num(p, model_y_mean)
    return p

#予測する関数（全行）
@partial(jjit)
def predict_array(x, model_x, model_y, T):
    return vmap(lambda row : predict(row, model_x, model_y, T))(x)

#学習関数modelとtunerを引数で設定する必要がある（ランダムな要素を排除）
def learn_set_tuner(
    model_x,
    model_y,
    tuner_x,
    tuner_y,
    trial_cnt = 100, #最大試行回数
    rate = 0.5, #学習率
    err_func = rmse, #誤差の計算方法
    max_same_cnt = 3, #学習が停滞してからの最大試行回数
    update_rate = False, #学習率自体を更新するかどうか
    disp = False #経過の表示
):
    #初期状態の予測値・誤差・半減期
    same_cnt = 0 #制度が改善できなかった連続回数
    T = jnp.max(model_x, axis=0) - jnp.min(model_x, axis=0)
    tuner_p = predict_array(tuner_x, model_x, model_y, T)
    err = err_func(tuner_y, tuner_p)
    if disp:
        print("first err", err)
    
    #半減期と学習率の更新
    for n in range(trial_cnt):
        err_ex = err
        for i in range(len(T)):
            T_ = T.at[i].set(T[i] * rate)
            tuner_p_ = predict_array(tuner_x, model_x, model_y, T_)
            err_ = err_func(tuner_y, tuner_p_)
            if err > err_:
                err = err_
                T = T_.copy()
                tuner_p = tuner_p_.copy()
        if disp:
            print(n, err)
        if err_ex == err:
            same_cnt += 1
        else:
            same_cnt = 0
        if update_rate and same_cnt >= 2:
            rate = (rate + 1) / 2
        if disp and same_cnt >= max_same_cnt:
            print("learning stopped")
            print("self predict")
            display_err(tuner_y, tuner_p)
            break
    return T

#学習関数一連の作業をすべてしてくれる（ランダムな要素あり）
def learn(
    train_x,
    train_y,
    trial_cnt = 100, #最大試行回数
    rate = 0.5, #学習率
    err_func = rmse, #誤差の計算方法
    tuner_size = 0.2, #調整用データのサイズ
    max_same_cnt = 3, #学習が停滞してからの最大試行回数
    update_rate = False, #学習率自体を更新するかどうか
    disp = False
):
    #訓練データをモデル用とその調整用に分割
    model_x, tuner_x, model_y, tuner_y = train_test_split(train_x, train_y, test_size = tuner_size)
    
    #学習
    T = learn_set_tuner(
        model_x,
        model_y,
        tuner_x,
        tuner_y,
        trial_cnt,
        rate,
        err_func, 
        max_same_cnt,
        update_rate,
        disp
    )
    return T

#カリフォルニア住宅価格データのロード

テストデータは２割

In [2]:
import xgboost as xgb
import lightgbm as lgb
from sklearn.ensemble import RandomForestRegressor#ランダムフォレスト

#カリフォルニア住宅価格データをロード
from sklearn.datasets import fetch_california_housing
from sklearn.datasets import load_boston
housing = fetch_california_housing()
#housing = load_boston()
#訓練データと検証データに分割
x = housing.data
y = housing.target
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size = 0.2)
print("X_test", X_test.shape)
print("y_test", y_test.shape)
print("X_train", X_train.shape)
print("y_train", y_train.shape)

X_test (4128, 8)
y_test (4128,)
X_train (16512, 8)
y_train (16512,)


#各モデルの学習と予測

木のアンサンブル系のモデルと比較
1. lightGBM
2. XGboost
3. ランダムフォレスト

In [3]:
#自作モデル
#学習
T = learn(X_train, y_train)
#予測
p_hl = predict_array(X_test, X_train, y_train, T) #制度を上げるためtarindataをmodelにそのまま突っ込む

In [4]:
#lightGBM
#学習
model = lgb.LGBMRegressor(
    random_state = 42,
)

model.fit(
    X_train, 
    y_train,
    eval_set=[(X_test, y_test), (X_train, y_train)],
    verbose=-1 # 学習ログを省略
)

#予測
p_lgb = model.predict(X_test)

In [5]:
#XGboost
#学習
xgbX_train, xgbX_test, xgby_train, xgby_test = train_test_split(X_train, y_train, test_size = 0.2)
dtrain = xgb.DMatrix(xgbX_train, label=xgby_train)
dvalid = xgb.DMatrix(xgbX_test, label=xgby_test)

params = {
        'objective': 'reg:squarederror','silent':1, 'random_state':1234, 
        # 学習用の指標 (RMSE)
        'eval_metric': 'rmse',
    }
num_round = 500
watchlist = [(dtrain, 'train'), (dvalid, 'eval')]#訓練データはdtrain、評価用のテストデータはdvalidと設定
model = xgb.train(
    params,
    dtrain,#訓練データ
    num_round,#設定した学習回数
    early_stopping_rounds=20,
    evals=watchlist,
    verbose_eval=0
)

#予測
dtest = xgb.DMatrix(X_test)
p_xgb = model.predict(dtest, ntree_limit = model.best_ntree_limit)

In [6]:
#ランダムフォレスト
#学習
clf = RandomForestRegressor(random_state=0)
clf.fit(X_train, y_train)

#予測
p_rf = clf.predict(X_test)

#各モデルの比較

1.   rmse : 平均二乗偏差
2.   mse : 平均二乗誤差
3.   mae : 平均絶対誤差
4.   mape : 平均絶対誤差率
5.   r2 : 決定係数

r2は大きいほど良い。他は小さいほど良い


In [8]:
#比較
def errs(y, p):
    return [
        rmse(y, p),
        mse(y, p),
        mae(y, p),
        mape(y, p),
        r2(y, p)
    ]
pd.DataFrame(np.array([errs(y_test, p_hl), errs(y_test, p_lgb), errs(y_test, p_xgb), errs(y_test, p_rf)]),
                  columns=["rmse", "mse", "mae", "mape", "r2"],
                  index=["half life", "lightGBM", "XGboost", "Random Forest"])

Unnamed: 0,rmse,mse,mae,mape,r2
half life,0.441684,0.195085,0.269033,0.143852,0.856298
lightGBM,0.455528,0.207505,0.308097,0.175372,0.847149
XGboost,0.467212,0.218287,0.314924,0.178134,0.839208
Random Forest,0.49705,0.247059,0.329939,0.189865,0.818014
