In [25]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

#   テンソルの作り方 : tf.Tensorはimmutable
tf.constant([[1.,2.,3.],[4.,5.,6.]])    #   2*3行列
tf.constant(42)                         #   スカラー

t = tf.constant([[1.,2.,3.],[4.,5.,6.]])
t.shape #   TensorShape([2, 3])
t.dtype #   tf.float32

t[:, 1:]    #   numpyと同様、[[2.,3.],[5.,6.]]
t[..., 1, tf.newaxis]   #   [[2.],[5.]]
t + 10  #   [[11., 12., 13.], [14., 15., 16.]]
tf.square(t)    #   [[ 1.,  4.,  9.],[16., 25., 36.]]
t @ tf.transpose(t) #   tとtの転置の行列積

#   テンソルとNumpy
#   デフォルトでは、numpyは64bit精度、tensorflowは32bit精度
a = np.array([2.,4.,5.])
tf.constant(a)
t.numpy()   #   np.array(t)
np.square(t)

#   tensorflowは暗黙の型変換をサポートしない(処理性能を下げないためのよう)
tf.constant(3.) + tf.constant(40)                   #   error
tf.constant(3.) + tf.constant(40, dtype=tf.float32) #   OK

t2 = tf.constant(40)
tf.constant(3.) + tf.cast(t2, tf.float32)   #   OK : cast


<tf.Tensor: shape=(), dtype=float32, numpy=43.0>

In [32]:
#   変数 : tf.Tensorと同様にふるまう
v = tf.Variable([[1.,2.,3.],[4.,5.,6.]])

#   tf.Tensorと異なり値を書き換えられる
v.assign(2*v)       #   [[ 2.,  4.,  6.],[ 8., 10., 12.]]
v[0,1].assign(42)   #   [[ 2., 42.,  6.],[ 8., 10., 12.]]
v[:,2].assign([0.,1.])  #   [[ 2., 42.,  0.],[ 8., 10.,  1.]]
v.scatter_nd_update(indices=[[0,0], [1,2]], updates=[100.,200.])    #   [[100.,  42.,   0.],[  8.,  10., 200.]]

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

In [34]:
#   カスタム損失関数
def huber_fn(y_true, y_pred):   #   y_true : ラベル, y_pred : 予測
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)
model.compile(loss=huber_fn, optimizer="nadam")
model.fit(X_train, y_train, [...])

#   上のようなカスタムコンポーネントを含むモデルの保存とロード
model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn": huber_fn})    #   ロードのために辞書が必要

#   閾値を変えたいとき
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):   #   y_true : ラベル, y_pred : 予測
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam")
model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",
                                custom_objects={"huber_fn": create_huber(2.0)}) #   modelの保存時、thresholdが保存されないので、ロード時に指定する必要がある


#   keras.losses.Lossのサブクラスを作りget_configを利用することでロード時の値の設定は必要なくなる(オーバーライド関数を編集して引数としてオブジェクトを渡すだけでよい)
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super.__init__(**kwargs)    #   標準ハイパーパラメータを処理
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        base_config = super().get_config()  #   ハイパーパラメータを持つ辞書を取得
        return {**base_config, "threshold": self.threshold} #   thresholdを追加した辞書を返す
    
model.compile(loss=HuberLoss(2.), optimizer="nadam")
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5", custom_objects={"HuberLoss": HuberLoss})

#   一般的なカスタムコンポーネント
#   カスタム活性化関数
def my_softplus(z):
    return tf.math.log(tf.exp(z) + 1.0)

#   カスタム初期化子
def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(1. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

#   カスタム正則化器
def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))    #   l1ノルム

#   カスタム制約
def my_positive_weights(weights):
    return tf.where(weights < 0., tf.zero_like(weights), weights)

layer = keras.layers.Dense(30, activation=my_softplus,
                           kernel_initializer=my_glorot_initializer,
                           kernel_regularizer=my_l1_regularizer,
                           kernel_constraint=my_positive_weights)

#   ハイパーパラメータを持つカスタムコンポーネントの定義にはサブクラス化を利用
class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):    #   正則化器、初期化子、制約ではcall()ではなく__call__メソッドの定義が必要
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor": self.factor}
    
#   カスタム指標
#   単に指標の平均を得たい場合
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

#   ストリーミング指標(全体の指標を逐一再計算する.正しい指標が得られる)
#   Precisionオブジェクトはストリーミング指標をを計算している好例
precision = keras.model.Precision()
precision([0,1,1,1,0,1,0,1],[1,1,0,1,0,1,0,1])  #バッチのラベルと予測を渡す
precision([0,1,0,0,1,0,1,1],[1,0,1,1,0,0,0,0])
precision.result()  #   現時点での指標
precision.valiables #   オブジェクトが管理する変数(指標計算に用いる変数)
precision.reset_states()    #   変数をリセット

#   ストリーミング指標を得たいときはkeras.metrics.Metricをサブクラス化
class HuberMetric(keras.metrrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")  #   変数を作る
        self.count = self.add_weight("count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
    #   reset_states()はデフォルトでは変数を0にリセットするが、必要ならオーバーライドできる
    
#   カスタムレイヤ
#   kers.layers.Flatttenのような重みをもたないカスタムレイヤを作りたいときはkeras.layers.Lambdaを利用
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

#   重みをもつレイヤを作りたい
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units  #   ニューロンの数
        self.activation = keras.activations.get(activation)
        
    def build(self, batch_input_shape): #   レイヤが初めて使用されるときに呼び出される
        self.kernel = self.add_weight(  #   接続重み行列
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal"
        )
        self.bias = self.add_weight(    #   バイアス項
            name="bias", shape=[self.units], initializer="zeros"
        )
        super().build(batch_input_shape)
        
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias) #   レイヤの出力。Xは入力
    
    def compute_output_shape(self, batch_input_shape):  #   レイヤの出力のshape。省略できる
        return tf.TensorShape(batch_input_shape.as_list()[:,-1] + [self.units]) #   as_list()によってPythonリストに変換できる
    
    def get_config(self):   #   他のカスタムクラスと同様、設定を保存する
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}
        
#   複数入力(concatenate)を持つレイヤを作りたい
class MyMultiLayer(keras.layers.layer):
    def call(self, X):  #   Xはすべての入力を含むtuple
        X1, X2 = X
        return [X1 + X2, X1 * X2, X1 / X2]
    def compute_output_shape(self, batch_input_shape):
        b1, b2 = batch_input_shape
        return [b1, b1, b1]
    
#   BN層やDropout層など、訓練中とテスト中でレイヤのふるまいを変えたいとき、call()にtraining引数を追加する
class MyGaussianNoise(keras.layers.layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev
    
    def call(self, X ,training=None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X
    
    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape
    
#   カスタムモデル
#   まずカスタムレイヤを定義
class ResidualBlock(keras.layers.layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hiddens = [keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal") for _ in range(n_layers)]
    
    def call(self, inputs):
        Z = inputs
        for layer in self.hiddens:
            Z = layer(Z)
        return inputs + Z
    
class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)
        
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        Z = self.out(Z)
        return Z
    
#   再構築ロス : 隠れ層の重みなどの補助出力を行う
#   再構築ロスの計算を含むカスタムモデル
class ReconstructingRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal") for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        
    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(batch_input_shape)
        
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))   #   再構築データと入力との平均二乗誤差
        self.add_loss(0.05 * recon_loss)    #   損失リストに含める
        return self.out(Z)

<tf.Tensor: shape=(2, 1), dtype=int64, numpy=
array([[0],
       [2]], dtype=int64)>

In [22]:
#   自動微分
def f(w1, w2):
    return 3*w1**2 + 2*w1*w2

w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)
    
gradients = tape.gradient(z, [w1, w2])  #   w1, w2で偏微分した結果をtf.Tensor型の1*2行列を出力
#   tape.gradient(z, [w1, w2])  #   error! tapeはgradient()が呼ばれた直後に自動で消去される

with tf.GradientTape(persistent=True) as tape:  #   tapeは永続化させられる
    z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape

c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])  #   変数以外のとき勾配を計算するとNoneが出力される

with tf.GradientTape() as tape:
    tape.watch(c1)  #   任意のテンソルを監視するよう指示すると、変数であるかのようにふるまわせることができる
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])  #   [tensor 36., tensor 10.]を出力

def g(w1, w2):
    return 3*w1**2 + tf.stop_gradient(2*w1*w2)  #   stop_gradient()により定数としてふるまうので、w1で偏微分すると6w1となるイメージ
with tf.GradientTape() as tape:
    z = g(w1, w2)
gradients = tape.gradient(z, [w1, w2])  #   [tensor 30., tensor None]

x = tf.Variable(100.)
with tf.GradientTape() as tape:
    z = my_softplus(x)
tape.gradient(z, [x])    #   [NaN] 自動微分では計算不可な場合がある。

#   上の問題の対策として、関数をデコレートして通常の出力に加えて導関数を返すように編集できる
@tf.custom_gradient
def my_better_softplus(z):
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        return grad / (1 + 1 / exp)
    return tf.math.log(exp + 1), my_softplus_gradients

x = tf.Variable(0.)
with tf.GradientTape() as tape:
    z = my_better_softplus(x)
tape.gradient(z, [x])    #   0.5となり計算ができるようになる




[<tf.Tensor: shape=(), dtype=float32, numpy=0.5>]

In [None]:
#   カスタム訓練ループ : fit()を使わず実装する戦略
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

def print_status_bar(iteration, total, loss, metrics=None):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                          for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics, end=end)

n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size    #   整数除算
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

for epoch in range(1, n_epochs + 1):    #   epochループ
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):  #   epoch内のバッチループ
        X_batch, y_batch = random_batch(X_train_scaled, y_train)    #   バッチを抽出
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses) #   正則化ロス
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))    #   tapeによって計算した勾配をoptimizerに適用
        for variable in model.variables:    #   重みに制約を加える
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        mean_loss(loss) #   平均損失の更新
        for metric in metrics:  #   指標の更新
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:    #   リセットして次のepochへ
        metric.reset_states()


In [23]:
#   tensorflow関数とグラフ
#   tensorflow関数は計算グラフを効率よく実行するのでpython関数より高速。複雑な計算には有効。
def cube(x):
    return x**3
tf_cube = tf.function(cube) #   python関数をtensorflow関数に変換
#   カスタムコンポーネントをkerasで使用したときは自動的にtensorflow関数に変換されるので、tf.functionを使う必要がない
#   カスタムコンポーネントを作るときにdynamic=Trueを指定するか、compile()時にrun_eagerly=Trueを指定することでこの変換を防げる

#   tensorflow関数の注意
#   外部ライブラリはグラフの一部にはならないので、np.sum()よりもtf.reduce_sum()、sorted()よりtf.sort()を使うべき
#   tensorflowはテンソルかデータセットを処理するfor文しか捕捉しないので、for i in range()よりfor i in tf.range()



<tensorflow.python.eager.def_function.Function at 0x27f8a78e0e0>