<a href="https://colab.research.google.com/github/chiyeon01/Hands_On_Machine_Learning/blob/main/12.%20%ED%85%90%EC%84%9C%ED%94%8C%EB%A1%9C%EB%A5%BC_%EC%82%AC%EC%9A%A9%ED%95%9C_%EC%82%AC%EC%9A%A9%EC%9E%90_%EC%A0%95%EC%9D%98_%EB%AA%A8%EB%8D%B8%EA%B3%BC_%ED%9B%88%EB%A0%A8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 넘파이처럼 텐서플로 사용하기

## 텐서와 연산

In [1]:
import tensorflow as tf

# Create a tensor with tf.constant().
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])

In [2]:
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
t.shape

TensorShape([2, 3])

In [4]:
t.dtype

tf.float32

In [5]:
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [6]:
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [7]:
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [8]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [9]:
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

In [10]:
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

## 텐서와 넘파이

In [11]:
import numpy as np

a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [None]:
t.numpy() # or np.array(t)

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [None]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [None]:
np.square(t)

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

## 타입 변환

In [None]:
# The operand dtypes must match: adding a float32 and an int32 tensor
# raises InvalidArgumentError (shown below).
tf.constant(2.) + tf.constant(40)

InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2] name: 

In [None]:
# The bit width must match too: float32 + float64 also raises
# InvalidArgumentError (shown below).
tf.constant(2.) + tf.constant(40, dtype=tf.float64)

InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2] name: 

In [None]:
# Convert the dtype explicitly with tf.cast() before adding.
t2 = tf.constant(40, dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

## 변수

In [None]:
# tf.Variable: a tensor whose values can be modified after creation.
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [None]:
# Replace the whole variable with twice its current value.
v.assign(2 * v)
print(v)
# Assign a single element (row 0, column 1).
v[0, 1].assign(42)
print(v)
# Assign a whole column (column 2) through a slice.
v[:, 2].assign([0., 1.])
print(v)
# Update several individual cells at once: (0, 0) -> 100, (1, 2) -> 200.
v.scatter_nd_update(
    indices=[[0, 0], [1, 2]], updates=[100., 200.]
)
print(v)

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>


In [None]:
# Direct item assignment is not supported and raises TypeError (shown below);
# use the assign() family of methods instead.
v[1] = [7., 8., 9.]

TypeError: 'ResourceVariable' object does not support item assignment

# 사용자 정의 모델과 훈련 알고리즘

## 사용자 정의 손실 함수

In [None]:
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Elements whose absolute error is below 1 use the quadratic branch
    ``error**2 / 2``; all other elements use the linear branch
    ``|error| - 0.5``. The result has one loss value per element.
    """
    residual = y_true - y_pred
    abs_residual = tf.abs(residual)
    quadratic_branch = tf.square(residual) / 2
    linear_branch = abs_residual - 0.5
    return tf.where(abs_residual < 1, quadratic_branch, linear_branch)

In [None]:
# The custom loss function can be passed to compile() like this.
# NOTE(review): `model`, `X_train` and `y_train` are not defined in the visible
# part of this notebook, and `[...]` is a placeholder for the remaining fit()
# arguments -- this cell is illustrative and will not run as-is.
model.compile(loss=huber_fn, optimizer="nadam")
model.fit(X_train, y_train, [...])

## 사용자 정의 요소를 가진 모델을 저장하고 로드하기

In [None]:
model = tf.keras.models.load_model("my_model_with_a_custom_loss",
                                   custom_objects={"huber_fn": huber_fn})

In [None]:
def create_huber(threshold=1.0):
    """Build a Huber loss function configured with the given threshold.

    Args:
        threshold: Absolute-error value at which the loss switches from the
            quadratic branch to the linear branch. Defaults to 1.0.

    Returns:
        A function ``huber_fn(y_true, y_pred)`` returning the element-wise
        Huber loss.
    """
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        abs_error = tf.abs(error)
        # The threshold must come from the enclosing scope; hard-coding 1 here
        # would make create_huber(2.0) behave exactly like create_huber(1.0).
        is_small_error = abs_error < threshold
        squared_loss = tf.square(error) / 2
        # Linear branch chosen so both branches meet at |error| == threshold.
        linear_loss = threshold * abs_error - threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam")

In [None]:
model = tf.keras.models.load_model(
    "my_model_with_a_custom_loss_threshold_2",
    custom_objects={"huber_fn": create_huber(2.0)}
)

In [None]:
class HuberLoss(tf.keras.losses.Loss):
    """Huber loss whose threshold is stored in the loss configuration.

    Args:
        threshold: Absolute-error value at which the loss switches from the
            quadratic branch to the linear branch. Defaults to 1.0.
        **kwargs: Forwarded unchanged to ``tf.keras.losses.Loss``.
    """

    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss using ``self.threshold``."""
        error = y_true - y_pred
        abs_error = tf.abs(error)
        # Use the configured threshold; a hard-coded 1 would silently ignore
        # the constructor argument.
        is_small_error = abs_error < self.threshold
        squared_loss = tf.square(error) / 2
        # Linear branch chosen so both branches meet at |error| == threshold.
        linear_loss = self.threshold * abs_error - self.threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        """Return the base config extended with the ``threshold`` entry."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [None]:
# With the HuberLoss class the threshold is part of the loss config
# (see get_config), so it is kept together with the loss when it is serialized.
model.compile(loss=HuberLoss(2.), optimizer="nadam")

In [None]:
model = tf.keras.models.load_model("my_model_with_a_custom_loss_class",
                                   custom_objects={"HuberLoss": HuberLoss})

## 활성화 함수, 초기화, 규제, 제한을 커스터마이징하기

In [None]:
def my_softplus(z):
    """Softplus activation, computed directly as log(1 + exp(z))."""
    exp_z = tf.exp(z)
    return tf.math.log(1.0 + exp_z)

def my_glorot_initializer(shape, dtype=tf.float32):
    """Sample initial weights from a zero-mean normal with Glorot scaling.

    The standard deviation is sqrt(2 / (shape[0] + shape[1])), so ``shape``
    needs at least two entries.
    """
    fan_in, fan_out = shape[0], shape[1]
    scale = tf.sqrt(2. / (fan_in + fan_out))
    return tf.random.normal(shape, stddev=scale, dtype=dtype)

def my_l1_regularizer(weights):
    """L1 penalty: sum of the absolute values of the weights scaled by 0.01."""
    scaled_weights = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled_weights))

def my_positive_weights(weights):
    """Constraint that replaces negative weights with zero, keeping the rest."""
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

In [None]:
layer = tf.keras.layers.Dense(1, activation=my_softplus,
                              kernel_initializer=my_glorot_initializer,
                              kernel_regularizer=my_l1_regularizer,
                              kernel_constraint=my_positive_weights)

In [None]:
class MyL1Regularizer(tf.keras.regularizers.Regularizer):
    """L1 regularizer with a configurable scaling factor.

    Args:
        factor: Multiplier applied to the weights before taking absolute
            values and summing.
    """

    def __init__(self, factor):
        self.factor = factor

    def get_config(self):
        """Return the constructor arguments needed to rebuild this object."""
        return dict(factor=self.factor)

    def __call__(self, weights):
        """Return the sum of |factor * weights|."""
        scaled_weights = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled_weights))

## 사용자 정의 지표

In [None]:
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

In [None]:
# A streaming metric: it accumulates state across calls instead of simply
# averaging per-batch values.
precision = tf.keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.800000011920929>

In [None]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [None]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [None]:
precision.variables

[<Variable path=precision_2/true_positives, shape=(1,), dtype=float32, value=[4.]>,
 <Variable path=precision_2/false_positives, shape=(1,), dtype=float32, value=[4.]>]

In [None]:
precision.reset_state()

In [None]:
class HuberMetric(tf.keras.metrics.Metric):
    """Streaming mean of the Huber loss over all elements seen so far.

    Args:
        threshold: Threshold passed to ``create_huber``. Defaults to 1.0.
        **kwargs: Forwarded unchanged to ``tf.keras.metrics.Metric``.
    """

    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # Pass the name by keyword: the first positional parameter of
        # add_weight is not the name in every Keras version.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the summed Huber values and the number of elements.

        ``sample_weight`` is accepted for interface compatibility but is not
        used.
        """
        sample_metrics = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(sample_metrics))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the accumulated total divided by the accumulated count."""
        return self.total / self.count

    def get_config(self):
        """Return the base config extended with the ``threshold`` entry."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

## 사용자 정의 층

In [None]:
exponential_layer = tf.keras.layers.Lambda(lambda x: tf.exp(x))

In [None]:
class MyDense(tf.keras.layers.Layer):
    """Minimal fully connected layer: ``activation(X @ kernel + bias)``.

    Args:
        units: Number of output units.
        activation: Anything accepted by ``tf.keras.activations.get``
            (``None`` by default).
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias once the input feature size is known."""
        n_inputs = batch_input_shape[-1]
        self.kernel = self.add_weight(
            name="kernel",
            shape=[n_inputs, self.units],
            initializer="glorot_normal",
        )
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units],
            initializer="zeros",
        )

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        pre_activation = X @ self.kernel + self.bias
        return self.activation(pre_activation)

    def get_config(self):
        """Return the base config plus ``units`` and the serialized activation."""
        config = dict(super().get_config())
        config["units"] = self.units
        config["activation"] = tf.keras.activations.serialize(self.activation)
        return config

In [None]:
...

## 사용자 정의 모델

In [None]:
# Custom layer
class ResidualBlock(tf.keras.layers.Layer):
    """Stack of dense ReLU layers whose output is added to the block input.

    Args:
        n_layers: Number of hidden Dense layers in the block.
        n_neurons: Units per hidden layer. Because ``call`` adds the input to
            the output, this must be compatible with the input feature size.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, n_layers, n_neurons, **kwargs):
        # Forward kwargs so options such as `name` are not silently dropped.
        super().__init__(**kwargs)
        self.hidden = [
            tf.keras.layers.Dense(n_neurons, activation="relu",
                                  kernel_initializer="he_normal")
            for _ in range(n_layers)
        ]

    def call(self, inputs):
        """Run the inputs through the hidden layers and add the skip connection."""
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

In [None]:
# Custom model
class ResidualRegressor(tf.keras.models.Model):
    """Regressor made of a dense layer, two residual blocks and an output layer.

    Args:
        output_dim: Number of units of the final Dense layer.
        **kwargs: Forwarded to ``tf.keras.models.Model`` (e.g. ``name``).
    """

    def __init__(self, output_dim, **kwargs):
        # Forward kwargs so options such as `name` are not silently dropped.
        super().__init__(**kwargs)
        self.hidden1 = tf.keras.layers.Dense(30, activation="relu",
                                             kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = tf.keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Compute the model output for a batch of inputs."""
        Z = self.hidden1(inputs)

        # block1 is applied four times in a row, reusing the same layers
        # (and therefore the same weights) on every pass.
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)

        return self.out(Z)

## 모델 구성 요소에 기반한 손실과 지표

In [12]:
class ReconstructRegressor(tf.keras.Model):
    """Regressor with an auxiliary loss for reconstructing its own inputs.

    Args:
        output_dim: Number of units of the final Dense layer.
        **kwargs: Forwarded to ``tf.keras.Model`` (e.g. ``name``).
    """

    def __init__(self, output_dim, **kwargs):
        # Forward kwargs so options such as `name` are not silently dropped.
        super().__init__(**kwargs)
        self.hidden = [tf.keras.layers.Dense(30, activation="relu",
                                             kernel_initializer="he_normal")]
        self.out = tf.keras.layers.Dense(output_dim)
        self.reconstruction_mean = tf.keras.metrics.Mean(
            name="reconstruction_error"
        )

    def build(self, batch_input_shape):
        """Create the reconstruction layer once the input size is known."""
        n_inputs = batch_input_shape[-1]
        self.reconstruct = tf.keras.layers.Dense(n_inputs)

    def call(self, inputs, training=False):
        """Return the predictions and register the reconstruction loss."""
        Z = inputs

        for layer in self.hidden:
            Z = layer(Z)

        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05 * recon_loss)  # add the scaled loss to the model's losses

        if training:
            # Only during training: update the running mean of the
            # reconstruction error and register it as a metric.
            # NOTE(review): add_metric may be unavailable in newer Keras
            # releases -- confirm against the Keras version in use.
            result = self.reconstruction_mean(recon_loss)
            self.add_metric(result)

        return self.out(Z)

## 자동 미분으로 그레디언트 계산하기

In [13]:
def f(w1, w2):
    """Evaluate 3 * w1**2 + 2 * w1 * w2."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [16]:
# Finite-difference approximation of the partial derivative of f with respect
# to w1 at (w1, w2) = (5, 3), using a small step eps.
w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps

36.000003007075065

In [19]:
(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [22]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)

# TensorFlow's reverse-mode automatic differentiation
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])

In [23]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [24]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)

# TensorFlow's reverse-mode automatic differentiation
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])
gradients = tape.gradient(z, [w1, w2]) # raises RuntimeError: a non-persistent tape can only be used once

RuntimeError: A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)

In [32]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)

# TensorFlow's reverse-mode automatic differentiation; persistent=True allows
# gradient() to be called more than once on the same tape.
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

gradients_w1 = tape.gradient(z, w1)
gradients_w2 = tape.gradient(z, w2)
# Drop the reference to the persistent tape once it is no longer needed.
del tape

In [33]:
gradients_w1

<tf.Tensor: shape=(), dtype=float32, numpy=36.0>

In [34]:
gradients_w2

<tf.Tensor: shape=(), dtype=float32, numpy=10.0>

In [35]:
c1, c2 = tf.constant(5.), tf.constant(3.)

with tf.GradientTape() as tape:
    z = f(c1, c2)

# Without tape.watch() these constants are not tracked, so the gradients
# with respect to them come back as None (shown below).
tape.gradient(z, [c1, c2])

[None, None]

In [36]:
with tf.GradientTape() as tape:
    # Explicitly ask the tape to record operations involving the constants.
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])

In [37]:
def f(w1, w2):
    """Evaluate 3 * w1**2 + 2 * w1 * w2 with the second term wrapped in tf.stop_gradient."""
    frozen_term = tf.stop_gradient(2 * w1 * w2)
    return 3 * w1 ** 2 + frozen_term

# The term inside tf.stop_gradient contributes nothing to the gradients: the
# output below shows 30.0 for w1 (only the 3 * w1**2 term) and None for w2.
with tf.GradientTape() as tape:
    z = f(w1, w2)

tape.gradient(z, [w1, w2])

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [38]:
...

Ellipsis

## 사용자 정의 훈련 반복

In [40]:
# Small two-layer regression model; both Dense layers are given the same
# L2 kernel regularizer object (factor 0.05).
l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    tf.keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [43]:
def random_batch(X, y, batch_size=32):
    """Draw ``batch_size`` random positions (with replacement) and return the
    matching entries of ``X`` and ``y``.
    """
    n_samples = len(X)
    picks = np.random.randint(n_samples, size=batch_size)
    X_batch = X[picks]
    y_batch = y[picks]
    return X_batch, y_batch

In [None]:
def print_status_bar(step, total, loss, metrics=None):
    """Print a one-line progress report that overwrites itself in place.

    Args:
        step: Current step number.
        total: Total number of steps; the line is ended with a newline once
            ``step`` reaches ``total``.
        loss: Object exposing ``name`` and ``result()``; always shown first.
        metrics: Optional iterable (list, tuple, ...) of further objects with
            the same ``name`` / ``result()`` interface.
    """
    # Unpack instead of concatenating lists so any iterable of metrics works,
    # not only a list.
    tracked = [loss, *(metrics or ())]
    summary = " - ".join(f"{m.name}: {m.result():.4f}" for m in tracked)
    # Stay on the same line (carriage return, no newline) until the last step.
    end = "" if step < total else "\n"
    print(f"\r{step}/{total} - " + summary, end=end)

In [None]:
...

# 텐서플로 함수와 그래프

In [None]:
...