In [1]:
import tensorflow as tf
import numpy as np
from tensorflow import keras 

# NumpyのようなTensor

In [2]:
tf.constant([[1., 2., 3.], [4., 5., 6.]])

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t.shape

TensorShape([2, 3])

In [5]:
t.dtype

tf.float32

In [6]:
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [7]:
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [8]:
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [9]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [10]:
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

In [11]:
a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [12]:
t.numpy()

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [13]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [14]:
# 型変換
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

# カスタム損失関数

- モデルをロードするときは、名前をオブジェクトにマッピングする辞書が必要。

```python
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                               custom_objects={"huber_fn": huber_fn})
```

In [15]:
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

# model.complile(loss=huber_fn, optimizer="nadam")...

In [16]:
# 閾値を変更できるカスタム関数
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

# model.complile(loss=create_huber(2.0), optimizer="nadam")...

# モデルロード時に閾値を設定しなければいけない
# model = keras.models.load_model("my_model_with_a_custom_loss.h5",
#                                custom_objects={"huber_fn": create_huber(2.0)})

# これの対処
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        """
        個々のハイパーパラメータ名を値にマッピングする辞書を返す。
        モデル保存時、HDF5ファイルにJSOnとして設定を保存する。
        """
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
    
# コンパイル
# model.complile(loss=HuberLoss(2.), optimizer="nadam")...

# モデルロード(値ごとロードしてくれる)
# model = keras.models.load_model("my_model_with_a_custom_loss.h5",
#                                custom_objects={"HuberLoss": HuberLoss})

# カスタム活性化関数、初期化子、正則化器、制約

In [17]:
def my_softplus(z):
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initalizer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights):
    return tf.where(weights < 0., tf.zeros_loke(weights), weights)

layer = keras.layers.Dense(30, activation=my_softplus,
                          kernel_initializer=my_glorot_initalizer,
                          kernel_regularizer=my_l1_regularizer,
                          kernel_constraint=my_positive_weights
                          )

# カスタム指標
- 適合率
    - モデルが最初のバッチで5個のインスタンスを陽性として予測し、そのうち4個が正しかったすると、適合率は80%。
    - 第２のバッチで3個のインスタンスを陽性と予測したものの、全て誤っていた場合、第２バッチの適合率は0%。
    - 適合率の平均を取れば40%だが、モデル2個のバッチに対する適合率ではなく、8個の要請予測のうち4個の真陽性があったので、全体としての適合率は50%となる。
    - このようにバッチごとに少しずつ更新されるため、この指標を「**ストリーミング指標**」と呼ぶ。

In [18]:
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [19]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [20]:
# 現時点での指標の値
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [21]:
# 変数を確認
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [22]:
# 変数をリセット
precision.reset_states()

In [23]:
# 全体としてのフーバー損失とそれまでに処理したインスタンス数を管理する単純な例。
class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0, **keargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        """
        このクラスのインスタンスを関数として使用した時に呼び出される。
        """
        metric = self.huber_fn(y_true, y_pred)
        self.total.adssign_add(tf.reduce_sum(metric))
        self.count.adssign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        """
        最終結果を計算して返す。
        全インスタンスの平均フーバー損失。
        """
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

# カスタムレイヤ

- 最も単純なもの
```python
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))
```

In [24]:
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activation.get(activation)
    def build(self, batch_input_shape):
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal"
        )
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros"
        )
        super().build(batch_input_shape)
    def call(self, X):
        return self.activation(X @ self.kenel + self.bias)
    def compute_output_shape(self, batch_input_shape):
        """
        レイヤの出力の形を返す。
        """
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units, 
                "activation": keras.activation.serialize(self.activation)}

# カスタムモデル

In [25]:
class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
                                         kenel_initializer="he_normal"
                                         )
                      for _ in range(n_layers)
                      ]
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z
    
class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)
        
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in rangeg(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

# 自動微分

In [26]:
def f(w1, w2):
    return 3 * w1 ** 2 + 2 * w1 * w2

w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps

36.000003007075065

In [27]:
(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [28]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])

In [29]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]