# [2020年3月] CRBMのパラメータ学習

正常時のシステムの時系列データを学習し、学習済みパラメータを保存する。

## 必要なモジュールのimport

In [4]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import random
from sklearn.model_selection import train_test_split
from tqdm import tqdm_notebook as tqdm

## 条件付き制限ボルツマンマシン (CRBM)

In [5]:
class CRBM(object):
    def __init__(self, input_data=None, input_history=None, n_hidden=100, input_dim=1, delay=6, A=None, B=None, W=None, hbias=None, vbias=None):

        self.input_dim = input_dim  # 入力層データの次元数
        self.n_hidden = n_hidden  # 隠れ層のユニット数
        self.delay = delay # 遡るデータ数

        if A is None: # [入力層-出力層] 間の重みの初期化
            A = np.array(np.random.uniform(low = -0.01, high = 0.01, size = (input_dim * delay, input_dim)))

        if B is None: # [入力層-隠れ層] 間の重みの初期化
            B = np.array(np.random.uniform(low = -0.01, high = 0.01, size = (input_dim * delay, n_hidden)))

        if W is None: # [出力層-隠れ層] 間の重みの初期化
            W = np.array(np.random.uniform(low = -0.01, high = 0.01, size = (input_dim, n_hidden)))

        if hbias is None: # 隠れ層のバイアスの初期化
            hbias = np.zeros(n_hidden)

        if vbias is None: # 出力層のバイアスの初期化
            vbias = np.zeros(input_dim)

        # インスタンス変数の作成
        self.input_data = input_data
        self.input_history = input_history
        self.A = A
        self.B = B
        self.W = W
        self.hbias = hbias
        self.vbias = vbias

    # シグモイド関数 (OverFlow対策有り)
    def _sigmoid(self, x):
        sigmoid_range = 34.538776394910684
        x = np.clip(x, -sigmoid_range, sigmoid_range)
        return 1.0 / (1.0 + np.exp(-x))

    # CD法 (lr:学習率, k:反復回数)
    def contrastive_divergence(self, lr=0.1, k=1, input_data=None, input_history=None):

        if input_data is not None: # 出力層のデータ
            self.input_data = input_data

        if input_history is not None: # 入力層のデータ
            self.input_history = input_history

        # 隠れ層のサンプリング
        ph_mean, ph_sample = self.sample_h_given_v(self.input_data, self.input_history)
        nh_sample = ph_mean # 期待値がサンプル結果 (本来は0か1だが期待値を用いた方が結果が良くなる)

        # ギブスサンプリングの往復
        for step in range(k):
            nv_mean, nv_sample, nh_mean, nh_sample = self.gibbs_hvh(nh_sample, self.input_history)
            nh_sample = nh_mean # 隠れ層のサンプリング結果として期待値を用いる

        # パラメータの更新
        self.vbias += lr * np.mean(self.input_data - nv_sample, axis=0)
        self.hbias += lr * np.mean(ph_sample - nh_sample, axis=0)
        self.A += lr * (self.input_history.T @ (self.input_data - nv_sample))
        self.B += lr * (self.input_history.T @ (ph_sample - nh_sample))
        self.W += lr * (self.input_data.T @ ph_sample - nv_sample.T @ nh_sample)

    # 隠れ層の期待値
    def propup(self, v, v_history):
        return self._sigmoid(v @ self.W + v_history @ self.B + self.vbias)

    # 隠れ層のサンプリング
    def sample_h_given_v(self, v0_sample, v_history):
        h1_mean = self.propup(v0_sample, v_history) # 状態値の期待値
        h1_sample = np.random.binomial(size = h1_mean.shape, n=1, p=h1_mean) # 二項分布から期待値に従ってサンプリング
        return [h1_mean, h1_sample] # [期待値, 二値]

    # 出力層の期待値
    def propdown(self, h, v_history):
        mean_activation = h @ self.W.T + v_history @ self.A + self.vbias # ガウス分布の平均値(=期待値)
        return mean_activation

    # 出力層のサンプリング
    def sample_v_given_h(self, h0_sample, v_history):
        v1_mean = self.propdown(h0_sample, v_history)
        v1_sample = v1_mean # 期待値をサンプリング結果として用いる
        return [v1_mean, v1_sample] # [期待値, 期待値]

    # ギブスサンプリング
    def gibbs_hvh(self, h0_sample, v_history):
        v1_mean, v1_sample = self.sample_v_given_h(h0_sample, v_history)
        h1_mean, h1_sample = self.sample_h_given_v(v1_sample, v_history)
        return [v1_mean, v1_sample, h1_mean, h1_sample]

    # 出力値の生成
    def recon(self, v_history, k=None, threshold=1e-3):
        v = np.zeros(self.input_dim)

        if k is None:
            k = 5

        for _ in range(k):
            h_mean, h_sample = self.sample_h_given_v(v, v_history)
            _, nv = self.sample_v_given_h(h_mean, v_history)

            if np.abs(nv - v).all() < threshold:
                break

            v = nv

        return v

    # 誤差関数
    def mean_squared_error(self, v, v_history):
        return np.sum((v - self.recon(v_history,k=5)) **2)

    # 条件付き確率 P( v | v_history ) の計算関数
    #     ※注意: データが2次元以上の場合コード修正の必要あり
    def conditional_dist(self, v, v_history):
        #第一項
        temp1 = np.exp(-0.5 * (v - self.vbias - v_history@self.A)**2) # 入力データが標準化されているので、分散σ = 1 と置いている
        #第二項
        temp2 = np.prod(1 + np.exp(self.hbias + v_history@self.B + v*self.W))
        return temp1*temp2

    # 学習用関数
    def fit(self, X_data, y_data, batch_size, epochs, validation_split, learning_rate=1e-3):

        costs = [] # 学習途中の誤差のログ
        cnt = 0
        last_cost = 0

        X_train, X_val, y_train, y_val = train_test_split(X_data,y_data,test_size=validation_split) # 学習データと検証データの分割
        val_num = X_val.shape[0]

        for i in tqdm(range(epochs+1)):

            # batch_size の数だけ学習データからサンプリング
            batch_mask = np.random.choice(X_train.shape[0], batch_size)
            X_batch = X_train[batch_mask]
            y_batch = y_train[batch_mask]

            # CD法による学習
            self.contrastive_divergence(lr=learning_rate, k=5, input_data=y_batch, input_history=X_batch)

            if i%100==0:
              # 誤差のログの追加
              cost = self.mean_squared_error(y_val, X_val)/val_num
              costs.append(cost)

              # validationデータに対する誤差が5回連続で増えると打ち切り
              cnt = cnt+1 if cost > last_cost else 0
              last_cost = cost
              if cnt >= 5:
                break
              print("{}epoch cost={}".format(i+1, cost))

        # 誤差の表示
        plt.plot(costs)
        plt.show()

    def param(self):
        return self.A, self.B, self.W, self.hbias, self.vbias

### (前処理済み) 学習データの読み込み

In [6]:
# ドライブのマウント
from google.colab import drive
drive.mount('/content/drive')

training_data_X = np.loadtxt('/content/drive/MyDrive/research/training_data_X.csv', delimiter=',')
training_data_y = np.loadtxt('/content/drive/MyDrive/research/training_data_y.csv', delimiter=',')

Mounted at /content/drive


FileNotFoundError: /content/drive/MyDrive/research/training_data_X.csv not found.

In [None]:
# データの整形
size, delay = training_data_X.shape
training_data_y = training_data_y.reshape(size,1)
print(delay)

### CRBMの学習

モデルパラメータ


*   n_hidden : 隠れ層のユニット数
*   input_dim : 入力データの次元数



学習パラメータ
*   batch_size : バッチサイズ
*   epochs : エポック数
*   validation_split : データ中の検証データの比率
*   learning_rate : 学習率




In [None]:
crbm = CRBM(input_data=None, input_history=None, n_hidden=10, input_dim=1, delay=delay)
crbm.fit(training_data_X, training_data_y, batch_size=100, epochs=30000, validation_split = 0.001, learning_rate=1e-5)

### 学習済みパラメータの保存

In [None]:
# 保存先パス
%cd /content/drive/My Drive/research

# パラメータ
A, B, W, hbias, vbias = crbm.param()

# 保存
np.savetxt('./crbm_param_A.csv', A, delimiter=',')
np.savetxt('./crbm_param_B.csv', B, delimiter=',')
np.savetxt('./crbm_param_W.csv', W, delimiter=',')
np.savetxt('./crbm_param_hbias.csv', hbias, delimiter=',')
np.savetxt('./crbm_param_vbias.csv', vbias, delimiter=',')

In [None]:
seq_len = 200 - 75 - delay # シーケンスの長さ
seq_num = size / seq_len # シーケンスの個数

fig = plt.figure()

#rand_seq = np.random.randint(0, seq_num) # ランダムなシーケンス番号
rand_seq = 3
seq_X = training_data_X[rand_seq*seq_len : (rand_seq+1)*seq_len]
seq_y = training_data_y[rand_seq*seq_len : (rand_seq+1)*seq_len]
log_eval = []
log_ans = []
for i in range(seq_len):
  c_dist = crbm.recon(seq_X[i],5)
  log_eval.append(c_dist)
  log_ans.append(seq_y[i])

plt.plot(log_ans)
plt.plot(log_eval)

In [None]:
array = np.random.rand(2, 20)
print(array)
print(array.shape)

In [None]:
array.sum(axis = 1)