In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold*tf.abs(error) - (threshold**2)*0.5
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

# 사용자 정의 모델과 훈련 알고리즘
## 활성화 함수, 초기화, 규제, 제한을 커스터마이징

In [9]:
def my_softplus(z): # == tf.nn.softplus(z), keras.activations.softplus()
    return tf.math.log(tf.exp(z)+1.0)
def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2./ shape[0]+shape[1])
    return tf.random.normal(shape, stddev = stddev, dtype=dtype)
def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01*weights))
def my_positive_weigth(weights): ## relu
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

In [3]:
layer = keras.layers.Dense(30, activation=my_softplus,
                          kernel_constraint=my_positive_weigth,
                          kernel_initializer=my_glorot_initializer,
                          kernel_regularizer=my_l1_regularizer)

# factor 하이퍼파라미터를 저장하는 l1 규제를 위한 클래스
class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self,factor):
        self.factor = factor
    # ()를 붙여 실행시 동작하는 메서드
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor*weights))
    def get_config(self):
        return {'factor':self.factor}

(모델을 저장할 때 keras는 손실 객체의 get_config() 메서드를 호출한다.)

## 사용자 정의 지표
mse, accuracy 등을 말한다.

In [None]:
# 미리 정의한 huber loss를 지표로 사용할 수도 있다.
model.compile(loss='mse',optimizer='nadam',metrics[create_huber(2.0)])

In [5]:
# 스트리밍 지표 만들기
# (배치마다 점진적으로 업데이트되기 때문에 스트리밍 지표라고 부른다.)
class HuberMetric(keras.metrics.Metric):
    def __init__(self,threshold=1.0,**kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight('total',initializer='zeros')
        self.count = self.add_weight('count',initializer='zeros')
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true),tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, 'threshold':self.threshold}

## 사용자 정의 층
### 가중치가 없는 층(ex) Flatten, ReLU 등)
python 함수를 만든 뒤 keras.layers.Lambda로 감싸주면 된다.

In [6]:
exponential_layer = keras.layers.Lambda(lambda x:tf.exp(x))

### 가중치가 있는 층(상태가 있는 층)
keras.layers.Layer를 상속해야함

In [7]:
# Dense 층의 간소화 버전을 구현
# 다른 층과 동일하게 사용가능

class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None,**kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
    # 가중치마다 add_weight 변수를 호출해 층의 변수를 만든다.
    # backward와 관련된 부분은 상속을 통해 구현되어있을 것
    def build(self, batch_input_shape):
        self.kernel = self.add_weight(
            name = 'kernel',shape = [batch_input_shape[-1], self.units],
            initializer='glorot_normal')
        self.bias = self.add_weight(
            name = 'bias',shape = [self.units], initializer='zeros')
        # 마지막에 호출해야
        super().build(batch_input_shape)
    # 해당 층에 필요한 연산
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    # 해당 층의 출력 크기 반환
    # ex) 300*20 -> Dense(10) -> 300*10
    def compute_output_shape(self, batch_input_shape):
        return tf.Tensorshape(batch_input_shape.as_list()[:-1] + [self.units])
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units":self.units,
               "activation":keras.activations.serialize(self.activation)}

여러 출력을 가진 층을 만드려며뉴 call() 메서드가 출력의 리스트를 반환해야하고 <br>
compute_output_shape()는 배치 출력의 크기의 리스트를 반환해야한다.<br>
다음 예시는 두 개의 입력과 세 개의 출력

In [None]:
class MyMultiLayer(keras.layers.Layer):
    def call(self, X):
        X1, X2 = X
        return [X1+X2, X1*X2, X1/X2]
    def compute_output_shape(self, batch_input_shape):
        b1,b2 = batch_input_shape
        return [b1,b1,b1]

만일 훈련과 테스트에서 다르게 동작하는 층이 필요하다면 call() 메서드에 training 매개변수를 추가해 훈련인지 테스트인지 결정하면 된다.<br>
(ex) dropout, batch normalization)<br>
다음 예시는 훈련 중에는 규제를 위해 가우스 잡음을 추가하고 테스트 시에는 아무것도 하지 않는 층

In [8]:
class MyGaussianNoise(keras.layers.Layer):
    def __init__(self,stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev
    def call(self, X, training=None):
        if training:
            noise = tf.random.normal(tf.shape(X),stddev=self.stddev)
            return X + noise
        else:
            return X
    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape

## 사용자 정의 모델
1. keras.Model 클래스 상속
2. 생성자에서 층과 변수를 만듦
3. 모델이 해야할 작업을 call() 메서드에 구현

다음 예시는 input -> Dense1 -> ResidualBlock1 (ResidualBlock1만 *3번 반복) -> ResidualBlock2 -> Dense2 ->...<br>
ResidualBlock : input -> Dense_1 -> Dense_2 -> (input + Dense_2의 output)

keras.models.load_model()을 이용해 저장된 모델을 로드하고 싶다면 두 클래스 모두에 get_config() 메서드 구현해야함

In [16]:
class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layer, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation='elu',
                                         kernel_initializer='he_normal') for _ in layer(n_layers)]
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation='elu',
                                         kernel_initializer='he_normal')
        self.block1 = ResidualBlock(2,30)
        self.block2 = ResidualBlock(2,30)
        self.out = keras.layers.Dense(output_dim)
        
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1+3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

## 모델 구성 요소에 기반한 손실과 지표
다음 모델은 맨 위의 은닉층에 보조 출력을 갖는다. 이 보조 출력에 연결된 손실을 $\textbf{재구성 손실}$이라고 부른다.(=재구성과 입력 사이의 mse)<br>
이 재구성 손실은 주 손실에 더해 회귀 작업에 직접적인 도움은 되지 않더라도 모델이 은닉층을 통과하며 가능한 많은 정보를 유지하도록 유도한다. 이런 손실이 이따금 일반화 성능을 향상시킨다.

In [None]:
class ReconstructingRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation='selu',
                                         kernel_initializer='lecun_normal') for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        
    # 완전 연결층을 하나 더 추가하여 모델의 입력을 재구성하는데 사용
    # 완전연결층의 유닛 개수는 입력 개수와 같아야한다.
    # 재구성 층을 굳이 build() 메서드에서 만드는 이뉴는 이 메서드가 호출되기 전까지 입력 개수를 알 수 없기 때문
    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(batch_input_shape)
    
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        # 모델의 손실 리스트에 추가(단 재구성 손실이 주 손실을 압도하지 않도록 0.005를 곱해 크기를 줄임)
        self.add_loss(0.05*recon_loss)
        return self.out(Z)

## 자동 미분을 사용하여 그래디언트 계산

In [2]:
def f(w1,w2):
    return (3*w1**2) + (2*w1*w2)

In [3]:
w1, w2 = 5, 3
eps = 1e-6
print((f(w1+eps,w2)-f(w1,w2))/eps)
print((f(w1,w2+eps)-f(w1,w2))/eps)

36.000003007075065
10.000000003174137


근사값은 약간의 오차를 포함하는데다가 매 파라미터마다 적어도 한번씩 f()가 호출되므로 대규모 신경망에서는 적용하기 어렵기 때문에 자동미분을 사용해야한다.

In [4]:
w1,w2 = tf.Variable(5.), tf.Variable(3.)

# 메모리를 아끼기 위해서는 다음 블럭에 최소한만 담아야한다.
# 블록 안에서 with tape.stop_recording() 블록을 만들어 계산을 기록하지 않을 수 있음
with tf.GradientTape() as tape:
    z = f(w1,w2)
gradients = tape.gradient(z, [w1,w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [5]:
# 해당 인자가 없으면 gradient 메서드를 한번 밖에 호출하지 못함
with tf.GradientTape(persistent=True) as tape:
    z = f(w1,w2)
    
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
print(dz_dw1)
print(dz_dw2)

del tape

tf.Tensor(36.0, shape=(), dtype=float32)
tf.Tensor(10.0, shape=(), dtype=float32)


tf.Variable이 아닌 다른 객체(ex)tf.constant)에 대한 그래디언트를 계산하면 None이 반환됨

In [6]:
# 모든 tensor를 감시해 연산을 기록할 수 있음
with tf.GradientTape() as tape:
    tape.watch(w1)
    tape.watch(w2)
    z = f(w1,w2)
    
gradients = tape.gradient(z, [w1,w2])
gradients    

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

위와 같은 기법은 입력이 작을 때 변동 폭이 큰 활성화 함수에 대한 규제 손실을 구현하는 경우 유용함<br>
입력은 변수가 아니므로 테이프에 기록을 명시적으로 알려주어야 한다.

(자세한 것은 hands on machine learning - github -'자동 미분으로 그래디언트 계산하기'를 참조)

In [8]:
# 신경망 일부에 그래디언트의 역전파를 막을 수도 있음
def f(w1,w2):
    return (3*w1**2) + tf.stop_gradient(2*w1*w2)

with tf.GradientTape() as tape:
    z = f(w1,w2)

gradients = tape.gradient(z,[w1,w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [10]:
# 자동미분으로 다음 함수의 그래디언트를 계산하는 것이 수치적으로 불안정하기 때문에 NaN이 반환(부동소수점 오류로 무한 나누기 무한 계산)
x = tf.Variable([100.])
with tf.GradientTape() as tape:
    z = my_softplus(x)
    
tape.gradient(z,[x])

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>]

In [11]:
# 수치적으로 안전한 softplus의 도함수(1/(1+1/exp(x)))를 해석적으로 구하고, 다음 데코레이터를 사용하고, 
# 일반 출력과 도함수를 계산하는 함수를 반환하여 안전하게 그래디언트를 계산하는 함수를 만들 수 있음
@tf.custom_gradient
def my_better_softplus(z):
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        return grad/(1+1/exp)
    return tf.math.log(exp+1), my_softplus_gradients
# 여전히 지수함수이기 때문에 폭주할 수 있는데, tf.where을 사용해 값이 클 때 입력을 그대로 반환하는 방법도 있음

x = tf.Variable([100.])
with tf.GradientTape() as tape:
    z = my_better_softplus(x)
    
tape.gradient(z,[x])

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>]

## 사용자 정의 훈련 반복
두개의 다른 옵티마이저를 사용하고 싶은데 .fit() 메서드는 하나의 옵티마이저만 사용, 해당 기능을 위해 훈련 반복을 직접 구현<br>
혹은 의도한 대로 잘 동작하는지 확신을 갖기 위해 사용자 정의 훈련 반복을 사용할 수도 있다.

(다만 사용자 정의 훈련은 버그가 발생하기 쉽고, 유지 보수하기 어려운 코드가 되므로 극도의 유연성이 필요한 경우가 아니라면 fit() 메서드 사용)

In [2]:
from sklearn.datasets import load_boston
boston = load_boston()
df = pd.DataFrame(boston.data, columns=boston.feature_names)
df['target'] = boston.target

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
scaler = StandardScaler()
X_train, X_test, y_train, y_test = train_test_split(df.iloc[:,:-1], df.iloc[:,-1])
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
X_train_scaled = X_train_scaled.astype(np.float32)

In [3]:
# 모델 생성
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',kernel_initializer='he_normal',
                      kernel_regularizer=l2_reg),
    keras.layers.Dense(1,kernel_regularizer=l2_reg)
])

# 훈련세트에서 샘플 배치를 랜덤하게 추출하는 함수
def random_batch(X,y,batch_size=32):
    idx = np.random.randint(len(X),size=batch_size)
    return X[idx], y[idx]
# 현재 스텝 수, 전체 스텝 횟수, 에포크 시작부터 mse 등을 출력하는 함수 생성
def print_status_bar(iteration, total, loss, metrics=None):
    metrics = '-'.join(['{}: {:.4f}'.format(m.name, m.result())
                       for m in [loss]+(metrics or [])])
    end = "" if iteration < total else '\n'
    print('\r{}/{} - '.format(iteration,total)+metrics,end=end)

In [None]:
# 적용 (뭔가 잘 안되니까 생략)
n_epochs = 5
batch_size = 4
n_steps = len(X_train)// batch_size
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

for epoch in range(1,n_epochs+1):
    print(f"epoch {epoch}/{n_epochs}")
    for step in range(1, n_steps+1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss]+model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train),mean_loss, metrics)
    print_status_bar(len(y_train),len(y_train),mean_loss,metrics)
    for metric in [mean_loss]+ metrics:
        metric.reset_states()

# 텐서플로우의 함수와 그래프

In [11]:
def cube(x):
    return x**3
cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [12]:
# 텐서플로우 함수(데코레이터로 더 많이 사용)
tf_cube = tf.function(cube)

# 동일한 결과 반환
tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

tf.function()은 cube() 함수에서 수행되는 계산을 분석하고 동일한 작업을 수행하는 계산 그래프 생성<br>
복잡한 연산을 수행시 파이썬 함수를 빠르게 실행하려면 텐서플로 함수로 바꾸는 것이 좋다.

다만 사용자 정의 손실, 지표, 층 등 사용자 정의 함수 작성 후 이를 keras 모델에 사용할 때 케라스는 이 함수를 텐서플로 함수로 변환하므로 tf.constant()를 사용할 필요가 없다.

In [13]:
# 원본 파이썬 함수처럼 사용
tf_cube.python_function(2)

8

## 오토그래프와 트레이싱
오토그래프 : 텐서플로우가 파이썬 소스를 분석해 while, if, break, return 등의 요소를 찾음<br>
--> 파이썬 함수와 제어문을 텐서플로우의 연산으로 바꾼 업데이트된 버전을 만듦(ex) 반복문 -> tf.while_loop())

In [None]:
# 예시 (실행하지 마시오)
@tf.function
def sum_squares(n):
    s = 0
    for i in tf.range(n+1):
        s += i ** 2
    return s

# --> 오토그래프
def tf__sum_squares(n):
    s = 0
    def loop_body(i,s):
        s += i**2
        return s,
    # 축약
    s, = ag__.for_stmt(...,loop_body,(s,))
    return s

# --> 트레이싱
# 노드(연산)와 화살표(텐서) 연결된 형태의 그래프로 만든다.

tf.autograph.to_code(sum_squares.python_function)을 호출하면 생성된 함수의 소스코드를 볼 수 있다. <br>
깔끔한 코드는 아니지만 디버깅에 도움이 될 수 있음