# 요약
## 공통  
- `__init__(self, **kwargs)`
- `get_config(self)`   
## 사용자 정의 손실 함수  
- `keras.losses.Loss`를 상속받으며, `call(self,y_true,y_pred)`에서 새로운 loss 결과를 리턴한다.  
- `model.compile(loss=Loss클래스(), optimizer=..., metrics=[...])`    
## 사용자 정의 활성화함수(activation) / 초기화(initializer) / 규제(regularizer) / 제한(constraint)   
- `model.layers.Dense(안에, activation='', kernel_regularizer='', kernel_initializer='', kernel_constraint='')`
- initializer함수: shape를 받아서 초기화하는 함수
- regularizer함수: weights을 받아서, 원래의 Loss에 더해지는 값
- constraint함수: weights를 받아서, weights와 같은 차원을 값으로 변형해서 리턴해주는 함수  
## 사용자 정의 metric 함수  
- `keras.metrics.Metric`을 상속받으며, `update_state(self,y_true,y_pred)` -> `result(self)`순서로 호출
- `result(self)`에 새로운 metric 결과를 리턴한다
- `model.complie(loss=..., optimizer=..., weighted_metrics=[Metric클래스()])`  
## 사용자 지정 레이어
- `keras.layers.Layer`를 상속 받음
- `build()`: 가중치마다 add_weights() 메서드를 호출하여 층의 변수를 만든다. 층이 처음 사용될 때 호출된다. 필수 메서드는 아니다.
- `call()`: 층에 필요한 연산을 수행한다. 활성화 함수까지 들어간다. 이 값이 층의 출력에 해당한다. 
- `call()`안에 `training=None` 옵션을 넣으면 학습과 검증을 다르게 할 수 있다. 

In [1]:
# 파이썬 ≥3.5 필수
import sys
assert sys.version_info >= (3, 5)

# 사이킷런 ≥0.20 필수
import sklearn
assert sklearn.__version__ >= "0.20"

try:
    # %tensorflow_version은 코랩 명령입니다.
    %tensorflow_version 2.x
except Exception:
    pass

# 이 노트북은 텐서플로 ≥2.4이 필요합니다
# 2.x 버전은 대부분 동일한 결과를 만들지만 몇 가지 버그가 있습니다.
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.4"

# 공통 모듈 임포트
import numpy as np
import os

# 노트북 실행 결과를 동일하게 유지하기 위해
np.random.seed(42)
tf.random.set_seed(42)

# 깔끔한 그래프 출력을 위해
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# 그림을 저장할 위치
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deep"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("그림 저장:", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

In [21]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

# 사용자 정의 손실함수(Loss)
- `keras.losses.Loss`를 상속
- `def __init__(self, **kwargs)`: 어떤 매개변수를 받아서 전달할지를 결정하는 함수. 
  - `super().__init__(**kwargs)`
- `def call(self, y_true, y_pred)`: 레이블과 예측을 받고 모든 샘플의 손실을 계산하여 반환하는 함수.
- `def get_config(self)`: 매개변수를 딕셔너리로 만들어서 반환하는 함수

In [5]:
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
        
    def call(self, y_true, y_pred):
        error = y_true - y_pred #차이를 계산
        is_small_error = tf.abs(error) < self.threshold #차이가 역치보다 작음
        squared_loss = tf.square(error) / 2 #차이의 제곱근 나누기 2
        linear_loss  = self.threshold * tf.abs(error) - self.threshold**2 / 2 #역치*차이-역치^2
        return tf.where(is_small_error, squared_loss, linear_loss) #tf.where(차이가 작은가?, 작을경우 차이의 제곱근 나누기 2, 클경우 역치*차이-역치^2)
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold} #매개변수를 딕셔너리로 만들어서 반환하는 함수

input_shape = X_train.shape[1:]

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

keras.utils.plot_model(model, "-.png", show_shapes=True)

model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"])

model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

# model.save("my_model_with_a_custom_loss_class.h5")

# model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",
#                                 custom_objects={"HuberLoss": HuberLoss})

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x23790757e50>

# 사용자 정의 활성화함수(activation) / 초기화(initializer) / 규제(regularizer) / 제한(constraint) 

- `activation`: keras.activatons 참고
  - 일반예시: `softmax, elu, selu, softplus, softsign, relu, tanh, sigmoid, linear`  

- `kernel_initializer`
  - 일반예시: `"he_normal"`, `"glorot_uniform"`
  - shape를 인풋으로 받아 shape에 맞게끔 최초 값이 초기화되어 리턴되는 함수가 들어간다.   

- `kernel_regularizer`
  - https://light-tree.tistory.com/125 
  - 일반예시: `keras.regularizer.l1(0.01)`, `keras.regularizer.l2(0.01)`  
  - 기존의 Loss 결과(Loss(y_true,y_pred))에 더해지는 수치 함수가 들어간다.   

- `kernel_constraints`
  - 일반예시: `keras.constraints.max_norm(1.)`
  - 기존의 weights에 변형을 가한 결과 함수가 들어간다.   

In [16]:
def my_softplus(z): # tf.nn.softplus(z) 값을 반환합니다
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights): # tf.nn.relu(weights) 값을 반환합니다
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1, activation=my_softplus,
                       kernel_regularizer=my_l1_regularizer,
                       kernel_constraint=my_positive_weights,
                       kernel_initializer=my_glorot_initializer),
])
model.compile(loss="mse", optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x2379abaa880>

# 사용자 정의 지표(Metric)

- Loss처럼 `__init__()`과 `get_config()`를 가지고 있음
- Loss의 `call()`대신 `update_state()`, `result()` 
- `update_state()`: 함수처럼 사용할 때 호출된다. 
  - y_true, y_pred를 받아서 metric을 계산
  - `metric = self.huber_fn(y_true, y_pred)`
  - `self.total.assign_add(tf.reduce_sum(metric))`
- `result()` 최종 결과를 계산하고 반환함. 호출 순서는 update_state -> result()순서

In [23]:
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # 기본 매개변수 처리 (예를 들면, dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")#상태변수 추가하는 함수
        self.count = self.add_weight("count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold} 

keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

model.compile(loss=keras.losses.Huber(2.0), optimizer="nadam", 
              weighted_metrics=[HuberMetric(2.0)])
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x2379066e7c0>

# 사용자 지정 레이어
- `keras.layers.Layer`를 상속 받음
- `build()`: 가중치마다 add_weights() 메서드를 호출하여 층의 변수를 만든다. 층이 처음 사용될 때 호출된다. 필수 메서드는 아니다.
- `call()`: 층에 필요한 연산을 수행한다. 활성화 함수까지 들어간다. 이 값이 층의 출력에 해당한다. 
- `call()`안에 `training=None` 옵션을 넣으면 학습과 검증을 다르게 할 수 있다. 

In [34]:
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        self.kernel = self.add_weight(
            name="kernel", 
            shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
        self.bias = self.add_weight(
            name="bias", 
            shape=[self.units], 
            initializer="zeros")
        super().build(batch_input_shape) # must be at the end

    def call(self, X, training=None):
        result = self.activation(X @ self.kernel + self.bias)
        if training:
            stddev=1.0
            noise = tf.random.normal(tf.shape(result), stddev=stddev)
            return result + noise
        return result

    def compute_output_shape(self, batch_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}
        
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    MyDense(30, activation="relu", input_shape=input_shape),
    MyDense(1)
])

model.compile(loss="mse", optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.7047481536865234

In [28]:
#사용자가 정의한 레이어
class AddGaussianNoise(keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape
    
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    AddGaussianNoise(stddev=1.0),
    keras.layers.Dense(30, activation="selu"),
    keras.layers.Dense(1)
])

model.compile(loss="mse", optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.7559615969657898

# 사용자 정의 모델
- 복잡하니까 나중에 정리

In [35]:
class ReconstructingRegressor(keras.models.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu",
                                          kernel_initializer="lecun_normal")
                       for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        self.reconstruct = keras.layers.Dense(8) # TF 이슈 #46858에 대한 대책
        self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error")

#     TF 이슈 #46858 때문에 주석 처리
#     def build(self, batch_input_shape):
#         n_inputs = batch_input_shape[-1]
#         self.reconstruct = keras.layers.Dense(n_inputs, name='recon')
#         super().build(batch_input_shape)

    def call(self, inputs, training=None):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        self.recon_loss = 0.05 * tf.reduce_mean(tf.square(reconstruction - inputs))
        
        if training:
           result = self.reconstruction_mean(recon_loss)
           self.add_metric(result)
        return self.out(Z)
    
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x)
            loss = self.compiled_loss(y, y_pred, regularization_losses=[self.recon_loss])

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        return {m.name: m.result() for m in self.metrics}
    
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = ReconstructingRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=2)
y_pred = model.predict(X_test_scaled)

Epoch 1/2
Epoch 2/2
