## 내장된 메서드로 훈련 및 평가하기

훈련 및 유효성 검증을 위해 내장 API를 사용할 때의 훈련, 평가 및 예측(추론) 모델에 대한 설명

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers



## API 개요 : 첫 번째 엔드 투 엔드 예제

* 데이터를 모델의 내장 훈련 루프로 전달할 때는 NumPy 배열 또는 `tf.data Dataset` 객체를 사용해야 함

In [2]:
inputs = keras.Input(shape=(784,), name="digits")

# Dense Layer: 입력 레이어 1개, 은닉 레이어 2개, 출력 레이더 1개로 구성 (완전 연결층이라고 정의)
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, activation="softmax", name="predictions")(x)

# 모델 생성
model = keras.Model(inputs=inputs, outputs=outputs)

**일반적인 엔드 투 엔드 워크플로**

1. 훈련
2. 원래 훈련 데이터에서 생성된 홀드아웃 세트에 대한 유효성 검사
3. 테스트 데이터에 대한 평가

예) 다음 예시는 MNIST 데이터세트를 NumPy 배열로 사용

In [3]:
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# 데이터를 NumPy 배열로 변환
x_train = x_train.reshape(60000, 784).astype("float32") / 255
x_test = x_test.reshape(10000, 784).astype("float32") / 255

y_train = y_train.astype("float32")
y_test = y_test.astype("float32")

# Validation을 위한 10000개의 샘플 보류
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

훈련 구성 (최적화, 손실, 메트릭) 지정

In [4]:
model.compile(
    optimizer=keras.optimizers.RMSprop(),  # 최적화 (Optimizer)
    # 손실함수 (SparseCategoricalCrossentropy)
    loss=keras.losses.SparseCategoricalCrossentropy(),
    # 메트릭 (List of metrics to monitor)
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

`fit()`을 호출하여 데이터를 배치로 분할하고, `epochs`에 대해 전체 데이터세트를 반복처리하여 모델 훈련

In [5]:
print("Fit model on training data")
history = model.fit(
    x_train,
    y_train,
    batch_size=64,
    epochs=2,
    # 각 에포크가 끝날 때 손실, 메트릭을 모니터링하기 위해 일부 validation 은 통과
    validation_data=(x_val, y_val),
)

Fit model on training data
Epoch 1/2
Epoch 2/2


반환되는 `history` 객체는 훈련 중 손실 값과 메트릭 값에 대한 레코드 유지

In [6]:
history.history

{'loss': [0.33363837003707886, 0.1597750037908554],
 'sparse_categorical_accuracy': [0.9042999744415283, 0.9525399804115295],
 'val_loss': [0.16875500977039337, 0.14108921587467194],
 'val_sparse_categorical_accuracy': [0.9524000287055969, 0.9593999981880188]}

`evaluate()`를 통해 테스트 데이터에 대해 모델 평가

In [7]:
# `evaluate`을 사용하여 테스트 데이터에 대한 모델 평가
print("Evaluate on test data")
results = model.evaluate(x_test, y_test, batch_size=128)
print("test loss, test acc:", results)

# 새로운 데이터를 사용하여 예측
print("Generate predictions for 3 samples")
predictions = model.predict(x_test[:3])
print("predictions shape:", predictions.shape)

Evaluate on test data
test loss, test acc: [0.14275218546390533, 0.9598000049591064]
Generate predictions for 3 samples
predictions shape: (3, 10)


## compile() 메서드 : 손실, 메트릭 및 최적화 프로그램 지정하기

`fit()`으로 모델 훈련을 시키기 위해 손실 함수, 최적화 프로그램, 그리고 선택적으로 모니터링할 일부 메트릭을 지정해야 함

이러한 내용들은 `compile()` 메서드에 대한 인수로 모델에 전달해야 함

In [8]:
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

`metrics` 인수는 목록이어야 함 (모델 메트릭 수는 얼마든지 가능)


모델에 여러 개의 출력이 있는 경우, 각 출력에 대해 로 다른 손실 및 메트릭을 지정하고 모델의 총 손실에 대한 각 출력의 기여도를 조정할 수 있음

In [9]:
model.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)

나중에 재사용하기 위해 모델 정의와 컴파일 단계를 함수에 넣음

In [10]:
# 컴파일하지 않은 모델
def get_uncompiled_model():
    inputs = keras.Input(shape=(784,), name="digits")
    x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
    x = layers.Dense(64, activation="relu", name="dense_2")(x)
    outputs = layers.Dense(10, activation="softmax", name="predictions")(x)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# 컴파일 한 모델
def get_compiled_model():
    model = get_uncompiled_model()
    model.compile(
        optimizer="rmsprop",
        loss="sparse_categorical_crossentropy",
        metrics=["sparse_categorical_accuracy"],
    )
    return model

### 많은 내장 최적화 프로그램, 손실 및 메트릭 사용 가능

* 최적화 프로그램 : `SGE()`, `RMSprop()`, `Adam()`, ...


* 손실 : `MeanSquaredError()`, `KLDivergence()`, `CosineSimilarity()`, ...


* 메트릭 : `AUC()`, `Precision()`, `Recall()`, ...

### 사용자 정의 손실

사용자 정의 손실을 만들어야 하는 경우, Keras는 두 가지 방법 제공


1. 입력 y_true 및 y_pred를 받아들이는 함수를 만듦

In [11]:
# 실제 데이터와 예측 사이의 오차에 대해 평균 제곱을 계산하는 손실함수
def custom_mean_squared_error(y_true, y_pred):
    return tf.math.reduce_mean(tf.square(y_true - y_pred))

model = get_uncompiled_model()
model.compile(optimizer=keras.optimizers.Adam(), loss=custom_mean_squared_error)

# MSE를 사용하기 위해 원-핫 인코딩
y_train_one_hot = tf.one_hot(y_train, depth=10)
model.fit(x_train, y_train_one_hot, batch_size=64, epochs=1)



<keras.src.callbacks.History at 0x2c23a237f40>

2. y_true 및 y_pred 이외의 매개변수를 사용하는 손실 함수가 필요한 경우 `tf.keras.losses.Loss` 클래스를 하위 클래스화하고 다음 두 메서드를 구현


- `__init__(self)` : 손실 함수 호출 중에 전달할 매개변수를 받아들임


- `call(self, y_true, y_pred)` : 목표(y_true)와 모델 예측(y_pred)을 사용하여 모델의 손실을 계산

오차의 평균 제곱을 사용하려고 하지만 예측 값을 0.5에서 멀어지게 하는 항이 추가되었다고 가정 (범주형 목표가 원-핫 인코딩되고 0과 1 사이의 값을 취하는 것으로 가정) 


이렇게 하면 모델이 너무 확신을 갖지 않게 하는 인센티브가 *과적합*을 줄이는 데 도움이 될 수 있음 (단, 시도 하기 전까진 효과가 있는지 알 수 없음)

In [12]:
class CustomMSE(keras.losses.Loss):
    def __init__(self, regularization_factor=0.1, name="custom_mse"):
        super().__init__(name=name)
        self.regularization_factor = regularization_factor

    def call(self, y_true, y_pred):
        mse = tf.math.reduce_mean(tf.square(y_true - y_pred))
        reg = tf.math.reduce_mean(tf.square(0.5 - y_pred))  # 예측 값을 0.5에서 멀어지게 함
        return mse + reg * self.regularization_factor


model = get_uncompiled_model()
model.compile(optimizer=keras.optimizers.Adam(), loss=CustomMSE())

y_train_one_hot = tf.one_hot(y_train, depth=10)
model.fit(x_train, y_train_one_hot, batch_size=64, epochs=1)



<keras.src.callbacks.History at 0x2c23d384730>

### 사용자 정의 메트릭

`tf.keras.metrics.Metric` : API의 일부가 아닌 메트릭이 필요한 경우, 이 클래스를 하위 클래스화하여 사용자 정의 메트릭 생성 가능


* *아래의 4가지 메서드 구현 필요*

    1. `__init__(self)` : 메트릭에 대한 상태 변수 생성
    2. `update_state(self, y_true, y_pred, sample_weight=None)` : y_true 목표값과 y_pred 모델 예측값을 사용하여 상태 변수 업데이트
    3. `result(self)` : 상태 변수를 사용하여 최종 경과 계산
    4. `reset_state(self)` : 메트릭 상태를 다시 초기화 함


* 상태 업데이트와 결과 계산은 개별적으로 수행 -> 일부의 경우, 결과 계산 부담이 크고, 주기적으로 수행되기 때문

In [13]:
# 올바르게 분류된 샘플 수를 계산하는 메트릭 구현 예제
class CategoricalTruePositives(keras.metrics.Metric):
    # 메트릭에 대한 상태 변수 생성
    def __init__(self, name="categorical_true_positives", **kwargs):
        super(CategoricalTruePositives, self).__init__(name=name, **kwargs)
        self.true_positives = self.add_weight(name="ctp", initializer="zeros")

    # 상태 변수 업데이트 (y_true, y_pred 사용)
    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.reshape(tf.argmax(y_pred, axis=1), shape=(-1, 1))
        values = tf.cast(y_true, "int32") == tf.cast(y_pred, "int32")
        values = tf.cast(values, "float32")
        if sample_weight is not None:
            sample_weight = tf.cast(sample_weight, "float32")
            values = tf.multiply(values, sample_weight)
        self.true_positives.assign_add(tf.reduce_sum(values))
    
    # 상태 변수를 사용하여 최종 경과 계산
    def result(self):
        return self.true_positives

    # 메트릭 상태를 다시 초기화
    def reset_state(self):
        # epoch 시작마다 메트릭 상태 초기화
        self.true_positives.assign(0.0)

model = get_uncompiled_model()  # 컴파일 되지 않은 모델 함수
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[CategoricalTruePositives()],
)
model.fit(x_train, y_train, batch_size=64, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.src.callbacks.History at 0x2c23e6d40d0>

### 표준 서명에 맞지 않는 손실 및 메트릭 처리

* 거의 대부분의 손실과 메트릭은 `y_true`와 `y_pred`에서 계산할 수 있지만 모두 그런 것은 아님


* 이러한 경우, 사용자 정의 레이어의 호출 메서드 내에서 `self.add_loss(loss_value)`를 호출 가능<br> 추가된 손실은 훈련 중 "주요" 손실(compile()로 전달되는 손실)에 추가

In [14]:
# 활동 정규화를 추가하는 간단한 예
class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(tf.reduce_sum(inputs) * 0.1)
        return inputs  # Pass-through layer.


inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)

# Layer에 활동 정규화 삽입
x = ActivityRegularizationLayer()(x)

x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)

# 정규화 요소를 추가했으므로, 보여지는 손실은 전보다 더 커짐
model.fit(x_train, y_train, batch_size=64, epochs=1)



<keras.src.callbacks.History at 0x2c23a197c70>

* `add_metric()`을 사용하여 메트릭 값 로깅에 대해 같은 작업 수행 가능

In [15]:
class MetricLoggingLayer(layers.Layer):
    def call(self, inputs):
        # `aggregation`은 각 epoch마다 각 배치 값을 합치는 방법을 정의
        self.add_metric(
            keras.backend.std(inputs), name="std_of_activation", aggregation="mean"
        )
        return inputs  # Pass-through layer.

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)

# Insert std logging as a layer.
x = MetricLoggingLayer()(x)

x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)
model.fit(x_train, y_train, batch_size=64, epochs=1)



<keras.src.callbacks.History at 0x2c23a177610>

* Functional API에서 `model.add_loss(loss_tensor)` 또는 `model.add_metric(metric_tensor, name, aggregation)`을 호출 가능
    - `add_loss()`를 통해, 손실 전달하면 모델에 이미 최소화할 수 있는 손실이 있으므로 손실 함수 없이 `compile()` 호출 가능

In [16]:
inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x2 = layers.Dense(64, activation="relu", name="dense_2")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

# Function API
model.add_loss(tf.reduce_sum(x1) * 0.1)

model.add_metric(keras.backend.std(x1), name="std_of_activation", aggregation="mean")

model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)
model.fit(x_train, y_train, batch_size=64, epochs=1)



<keras.src.callbacks.History at 0x2c2383bcc10>

`LogisticEndpoint` 레이어 : 입력으로  targets 및 logits를 받아들이고 `add_loss()`를 통해 교차 엔트로피 손실을 추적

- `add_metric()`을 통해 분류 정확도도 추적

In [17]:
class LogisticEndpoint(keras.layers.Layer):
    def __init__(self, name=None):
        super(LogisticEndpoint, self).__init__(name=name)
        self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
        self.accuracy_fn = keras.metrics.BinaryAccuracy()

    def call(self, targets, logits, sample_weights=None):
        # training-time loss 값을 계산하고
        # 'self.add_loss()'를 사용한 레이어를 추가
        loss = self.loss_fn(targets, logits, sample_weights)
        self.add_loss(loss)
        
        # 메트릭으로 정확도를 기록하고
        # 'self.add_metric()'을 사용하여 레이어에 추가
        acc = self.accuracy_fn(targets, logits, sample_weights)
        self.add_metric(acc, name="accuracy")

        # 추론 시간 예측 텐서 반환 (.predict()에 대해)
        return tf.nn.softmax(logits)

* loss 인수 없이 컴파일된 두 개의 입력(입력 data 및 targets)이 있는 모델에서 사용 가능

In [18]:
import numpy as np

inputs = keras.Input(shape=(3,), name="inputs")
targets = keras.Input(shape=(10,), name="targets")
logits = keras.layers.Dense(10)(inputs)
predictions = LogisticEndpoint(name="predictions")(logits, targets)

model = keras.Model(inputs=[inputs, targets], outputs=predictions)
model.compile(optimizer="adam")  # No loss argument!

data = {
    "inputs": np.random.random((3, 3)),
    "targets": np.random.random((3, 10)),
}
model.fit(data)



<keras.src.callbacks.History at 0x2c23850dc10>

### 유효성 검사 홀드아웃 세트를 자동으로 분리

In [None]:
model = get_compiled_model()
model.fit(x_train, y_train, batch_size=64, validation_split=0.2, epochs=1)

## tf.data 데이터세트로부터 훈련 및 평가

In [None]:
model = get_compiled_model()

# First, let's create a training Dataset instance.
# For the sake of our example, we'll use the same MNIST data as before.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Shuffle and slice the dataset.
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

# Now we get a test dataset.
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_dataset = test_dataset.batch(64)

# Since the dataset already takes care of batching,
# we don't pass a `batch_size` argument.
model.fit(train_dataset, epochs=3)

# You can also evaluate or predict on a dataset.
print("Evaluate")
result = model.evaluate(test_dataset)
dict(zip(model.metrics_names, result))

In [None]:
model = get_compiled_model()

# Prepare the training dataset
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

# Only use the 100 batches per epoch (that's 64 * 100 samples)
model.fit(train_dataset, epochs=3, steps_per_epoch=100)

### 유효성 검사 데이터세트 사용

In [None]:
model = get_compiled_model()

# Prepare the training dataset
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

# Prepare the validation dataset
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)

model.fit(train_dataset, epochs=1, validation_data=val_dataset)

In [None]:
model = get_compiled_model()

# Prepare the training dataset
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

# Prepare the validation dataset
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)

model.fit(
    train_dataset,
    epochs=1,
    # Only run validation using the first 10 batches of the dataset
    # using the `validation_steps` argument
    validation_data=val_dataset,
    validation_steps=10,
)

## 지원되는 다른 입력 형식

## keras.utils.Sequence 객체를 입력으로 사용하기

In [None]:
from skimage.io import imread
from skimage.transform import resize
import numpy as np

# Here, `filenames` is list of path to the images
# and `labels` are the associated labels.

class CIFAR10Sequence(Sequence):
    def __init__(self, filenames, labels, batch_size):
        self.filenames, self.labels = filenames, labels
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.filenames) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.filenames[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.labels[idx * self.batch_size:(idx + 1) * self.batch_size]
        return np.array([
            resize(imread(filename), (200, 200))
               for filename in batch_x]), np.array(batch_y)

sequence = CIFAR10Sequence(filenames, labels, batch_size)
model.fit(sequence, epochs=10)

---

## 샘플 가중치 및 클래스 가중치 사용

### 클래스 가중치

In [None]:
import numpy as np

class_weight = {
    0: 1.0,
    1: 1.0,
    2: 1.0,
    3: 1.0,
    4: 1.0,
    # Set weight "2" for class "5",
    # making this class 2x more important
    5: 2.0,
    6: 1.0,
    7: 1.0,
    8: 1.0,
    9: 1.0,
}

print("Fit with class weight")
model = get_compiled_model()
model.fit(x_train, y_train, class_weight=class_weight, batch_size=64, epochs=1)

### 샘플 가중치

In [None]:
sample_weight = np.ones(shape=(len(y_train),))
sample_weight[y_train == 5] = 2.0

print("Fit with sample weight")
model = get_compiled_model()
model.fit(x_train, y_train, sample_weight=sample_weight, batch_size=64, epochs=1)

In [None]:
sample_weight = np.ones(shape=(len(y_train),))
sample_weight[y_train == 5] = 2.0

# Create a Dataset that includes sample weights
# (3rd element in the return tuple).
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train, sample_weight))

# Shuffle and slice the dataset.
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

model = get_compiled_model()
model.fit(train_dataset, epochs=1)

## 다중 입력, 다중 출력 모델로 데이터 전달하기

In [None]:
image_input = keras.Input(shape=(32, 32, 3), name="img_input")
timeseries_input = keras.Input(shape=(None, 10), name="ts_input")

x1 = layers.Conv2D(3, 3)(image_input)
x1 = layers.GlobalMaxPooling2D()(x1)

x2 = layers.Conv1D(3, 3)(timeseries_input)
x2 = layers.GlobalMaxPooling1D()(x2)

x = layers.concatenate([x1, x2])

score_output = layers.Dense(1, name="score_output")(x)
class_output = layers.Dense(5, name="class_output")(x)

model = keras.Model(
    inputs=[image_input, timeseries_input], outputs=[score_output, class_output]
)

## 콜백 사용하기

### 많은 내장 콜백 사용 가능

### 고유한 콜백 작성

## 모델 검사점 설정하기

## 학습률 일정 사용하기

### 최적화 프로그램으로 일정 전달하기

### 콜백을 사용하여 동적 학습률 일정 구현하기

## 훈련 중 손실 및 메트릭 시각화하기

### TensorBoard 콜백 사용하기