<a href="https://colab.research.google.com/github/JHWannabe/keras-dl/blob/main/7%EC%9E%A5_%EC%BC%80%EB%9D%BC%EC%8A%A4_%EC%B0%BD%EC%8B%9C%EC%9E%90%EC%97%90%EA%B2%8C_%EB%B0%B0%EC%9A%B0%EB%8A%94_%EB%94%A5%EB%9F%AC%EB%8B%9D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 7.2 케라스 모델을 만드는 여러 방법

### 7.2.1 Sequential Model

In [1]:
from tensorflow import keras 
from tensorflow.keras import layers 

In [2]:
model = keras.Sequential([
    layers.Dense(64, activation="relu"),
    layers.Dense(10, activation="softmax") 
])

In [3]:
model = keras.Sequential()
model.add(layers.Dense(64, activation="relu")) 
model.add(layers.Dense(10, activation="softmax"))

In [4]:
model.weights

ValueError: ignored

In [5]:
model.build(input_shape=(None, 3))
model.weights

[<tf.Variable 'dense_2/kernel:0' shape=(3, 64) dtype=float32, numpy=
 array([[-0.2686492 , -0.24285959,  0.06223121, -0.22246557,  0.16302067,
          0.09676817, -0.23660305, -0.1504798 ,  0.26210344, -0.2838178 ,
         -0.00785035,  0.04243773, -0.11484368,  0.15731409, -0.27222782,
          0.17358607,  0.19266623, -0.22870882, -0.00284854, -0.29410174,
         -0.14484484,  0.2599514 , -0.16433989, -0.08289145,  0.15266436,
          0.27884412,  0.11119348,  0.23713851, -0.09264135, -0.09515913,
          0.19220042, -0.28560618, -0.11530523, -0.18288808,  0.26122767,
          0.28912598, -0.25331178,  0.06329381, -0.11808649,  0.08839062,
          0.00524747, -0.20392635, -0.05487552, -0.25779504,  0.25556433,
         -0.26428172, -0.2311146 , -0.10836811,  0.01453298,  0.07692194,
          0.09363702,  0.01142159, -0.16591409, -0.07849509,  0.1700885 ,
         -0.04985531, -0.12276731, -0.06643316, -0.17557111,  0.22758895,
          0.11903   ,  0.22958761, -0.12503

In [6]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_2 (Dense)             (None, 64)                256       
                                                                 
 dense_3 (Dense)             (None, 10)                650       
                                                                 
Total params: 906
Trainable params: 906
Non-trainable params: 0
_________________________________________________________________


In [7]:
model = keras.Sequential(name="my_example_model")
model.add(layers.Dense(64, activation="relu", name="my_first_layer"))
model.add(layers.Dense(10, activation="softmax", name="my_last_layer"))
model.build((None, 3))
model.summary()

Model: "my_example_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 my_first_layer (Dense)      (None, 64)                256       
                                                                 
 my_last_layer (Dense)       (None, 10)                650       
                                                                 
Total params: 906
Trainable params: 906
Non-trainable params: 0
_________________________________________________________________


In [8]:
model = keras.Sequential()
model.add(keras.Input(shape=(3,)))
model.add(layers.Dense(64, activation="relu"))
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_4 (Dense)             (None, 64)                256       
                                                                 
Total params: 256
Trainable params: 256
Non-trainable params: 0
_________________________________________________________________


### 7.2.2 함수형 API

In [9]:
inputs = keras.Input(shape=(3,), name="my_input")
features = layers.Dense(64, activation="relu")(inputs)
outputs = layers.Dense(10, activation="softmax")(features) 
model = keras.Model(inputs=inputs, outputs=outputs)
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 my_input (InputLayer)       [(None, 3)]               0         
                                                                 
 dense_5 (Dense)             (None, 64)                256       
                                                                 
 dense_6 (Dense)             (None, 10)                650       
                                                                 
Total params: 906
Trainable params: 906
Non-trainable params: 0
_________________________________________________________________


In [10]:
vocabulary_size = 10000 
num_tags = 100 
num_departments = 4   

title = keras.Input(shape=(vocabulary_size,), name="title")  
text_body = keras.Input(shape=(vocabulary_size,), name="text_body")        
tags = keras.Input(shape=(num_tags,), name="tags")       

features = layers.Concatenate()([title, text_body, tags])
features = layers.Dense(64, activation="relu")(features)

priority = layers.Dense(1, activation="sigmoid", name="priority")(features)
department = layers.Dense(
    num_departments, activation="softmax", name="department")(features)   
    
model = keras.Model(inputs=[title, text_body, tags], outputs=[priority, department])

In [11]:
import numpy as np  

num_samples = 1280   

title_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size))    
text_body_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size)) 
tags_data = np.random.randint(0, 2, size=(num_samples, num_tags))

priority_data = np.random.random(size=(num_samples, 1))
department_data = np.random.randint(0, 2, size=(num_samples, num_departments))

model.compile(optimizer="rmsprop",
              loss=["mean_squared_error", "categorical_crossentropy"],
              metrics=[["mean_absolute_error"], ["accuracy"]])
model.fit([title_data, text_body_data, tags_data],
          [priority_data, department_data],
          epochs=1)
model.evaluate([title_data, text_body_data, tags_data],
               [priority_data, department_data])
priority_preds, department_preds = model.predict([title_data, text_body_data, tags_data])



In [12]:
model.compile(optimizer="rmsprop",
              loss={"priority": "mean_squared_error", "department": "categorical_crossentropy"},
              metrics={"priority":["mean_absolute_error"], "department":["accuracy"]})
model.fit({"title": title_data, "text_body": text_body_data, "tags": tags_data},
          {"priority": priority_data, "department": department_data}, epochs=1)
model.evaluate({"title": title_data, "text_body": text_body_data, "tags": tags_data},
               {"priority": priority_data, "department": department_data})
priority_preds, department_preds = model.predict({"title": title_data, "text_body": text_body_data, "tags": tags_data})



In [13]:
model.layers

[<keras.engine.input_layer.InputLayer at 0x7f1c59c72eb0>,
 <keras.engine.input_layer.InputLayer at 0x7f1c59c6d040>,
 <keras.engine.input_layer.InputLayer at 0x7f1c59c724f0>,
 <keras.layers.merging.concatenate.Concatenate at 0x7f1c59ce3be0>,
 <keras.layers.core.dense.Dense at 0x7f1c59c72700>,
 <keras.layers.core.dense.Dense at 0x7f1c59c72520>,
 <keras.layers.core.dense.Dense at 0x7f1c59c72490>]

In [14]:
model.layers[3].input

[<KerasTensor: shape=(None, 10000) dtype=float32 (created by layer 'title')>,
 <KerasTensor: shape=(None, 10000) dtype=float32 (created by layer 'text_body')>,
 <KerasTensor: shape=(None, 100) dtype=float32 (created by layer 'tags')>]

In [15]:
model.layers[3].output 

<KerasTensor: shape=(None, 20100) dtype=float32 (created by layer 'concatenate')>

In [17]:
features = model.layers[4].output
difficulty = layers.Dense(3, activation="softmax", name="difficulty")(features)  
new_model = keras.Model(
    inputs=[title, text_body, tags], 
    outputs=[priority, department, difficulty])

### 7.2.3 Model 서브클래싱

In [18]:
class CustomerTicketModel(keras.Model):  
    def __init__(self, num_departments):
        super().__init__()
        self.concat_layer = layers.Concatenate()              
        self.mixing_layer = layers.Dense(64, activation="relu")  
        self.priority_scorer = layers.Dense(1, activation="sigmoid") 
        self.department_classifier = layers.Dense(num_departments, activation="softmax") 

    def call(self, inputs):
        title = inputs["title"]
        text_body = inputs["text_body"]
        tags = inputs["tags"]  
        features = self.concat_layer([title, text_body, tags])
        features = self.mixing_layer(features)
        priority = self.priority_scorer(features)
        department = self.department_classifier(features) 
        return priority, department

In [19]:
model = CustomerTicketModel(num_departments=4) 
priority, department = model({"title": title_data, "text_body": text_body_data, "tags": tags_data})

In [20]:
model.compile(optimizer="rmsprop",
              loss=["mean_squared_error", "categorical_crossentropy"],
              metrics=[["mean_absolute_error"], ["accuracy"]])
model.fit({"title": title_data,
           "text_body": text_body_data,
           "tags": tags_data},
          [priority_data, department_data], epochs=1)
model.evaluate({"title": title_data,
                "text_body": text_body_data,
                "tags": tags_data},
               [priority_data, department_data])
priority_preds, department_preds = model.predict({"title": title_data,
                                                 "text_body": text_body_data, 
                                                 "tags": tags_data})



### 7.2.4 여러 방식을 혼합하여 사용하기

In [21]:
class Classifier(keras.Model):  
    def __init__(self, num_classes=2):
        super().__init__()
        if num_classes == 2:
            num_units = 1 
            activation = "sigmoid" 
        else:
            num_units = num_classes
            activation = "softmax" 
        self.dense = layers.Dense(num_units, activation=activation)  
    def call(self, inputs):
        return self.dense(inputs)  

inputs = keras.Input(shape=(3,))
features = layers.Dense(64, activation="relu")(inputs)
outputs = Classifier(num_classes=10)(features) 
model = keras.Model(inputs=inputs, outputs=outputs)

In [22]:
inputs = keras.Input(shape=(64,))
outputs = layers.Dense(1, activation="sigmoid")(inputs)
binary_classifier = keras.Model(inputs=inputs, outputs=outputs)

class MyModel(keras.Model):  
    def __init__(self, num_classes=2):
        super().__init__()
        self.dense = layers.Dense(64, activation="relu")
        self.classifier = binary_classifier  
    def call(self, inputs):
        features = self.dense(inputs)
        return self.classifier(features)  
         
model = MyModel()

## 7.3 내장된 훈련 루프와 평가 루프 사용하기

In [23]:
from tensorflow.keras.datasets import mnist  

def get_mnist_model():
    inputs = keras.Input(shape=(28 * 28,))
    features = layers.Dense(512, activation="relu")(inputs)
    features = layers.Dropout(0.5)(features)
    outputs = layers.Dense(10, activation="softmax")(features)
    model = keras.Model(inputs, outputs)
    return model  

(images, labels), (test_images, test_labels) = mnist.load_data()
images = images.reshape((60000, 28 * 28)).astype("float32") / 255 
test_images = test_images.reshape((10000, 28 * 28)).astype("float32") / 255 
train_images, val_images = images[10000:], images[:10000]
train_labels, val_labels = labels[10000:], labels[:10000]

model = get_mnist_model()
model.compile(optimizer="rmsprop",                  
              loss="sparse_categorical_crossentropy",            
              metrics=["accuracy"])                          
model.fit(train_images, train_labels,
          epochs=3,                                              
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)
predictions = model.predict(test_images)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch 1/3
Epoch 2/3
Epoch 3/3


### 7.3.1 사용자 정의 지표 만들기

In [32]:
import tensorflow as tf  

class RootMeanSquaredError(keras.metrics.Metric):
    def __init__(self, name="rmse", **kwargs):
        super().__init__(name=name, **kwargs)                     
        self.mse_sum = self.add_weight(name="mse_sum", initializer="zeros")
        self.total_samples = self.add_weight(                   
            name="total_samples", initializer="zeros", dtype="int32")
       
    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.one_hot(y_true, depth=tf.shape(y_pred)[1])
        mse = tf.reduce_sum(tf.square(y_true - y_pred))
        self.mse_sum.assign_add(mse)
        num_samples = tf.shape(y_pred)[0] 
        self.total_samples.assign_add(num_samples)

In [33]:
def result(self): 
    return tf.sqrt(self.mse_sum / tf.cast(self.total_samples, tf.float32))

In [34]:
def reset_state(self):
    self.mse_sum.assign(0.) 
    self.total_samples.assign(0)

In [35]:
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy", RootMeanSquaredError()])
model.fit(train_images, train_labels,
          epochs=3,
          validation_data=(val_images, val_labels)) 
test_metrics = model.evaluate(test_images, test_labels)

TypeError: ignored

### 7.3.2 콜백 사용하기

In [36]:
callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        patience=2,
    ),
    keras.callbacks.ModelCheckpoint(
        filepath="checkpoint_path.keras",
        monitor="val_loss",
        save_best_only=True,
    )
]
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,                                   
          callbacks=callbacks_list,                     
          validation_data=(val_images, val_labels))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f1c522ff4c0>

In [37]:
model = keras.models.load_model("checkpoint_path.keras")

on_epoch_begin(epoch, logs)  
on_epoch_end(epoch, logs)  
on_batch_begin(batch, logs)  
on_batch_end(batch, logs)  
on_train_begin(logs)  
on_train_end(logs)




In [None]:
from matplotlib import pyplot as plt  

class LossHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs):
        self.per_batch_losses = []  
    def on_batch_end(self, batch, logs):
        self.per_batch_losses.append(logs.get("loss"))  
    def on_epoch_end(self, epoch, logs):
        plt.clf()
        plt.plot(range(len(self.per_batch_losses)), self.per_batch_losses,
                 label="Training loss for each batch")
        plt.xlabel(f"Batch (epoch {epoch})")
        plt.ylabel("Loss")
        plt.legend()
        plt.savefig(f"plot_at_epoch_{epoch}") 
        self.per_batch_losses = []

In [38]:
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=[LossHistory()], 
          validation_data=(val_images, val_labels))

### 7.3.4 텐서보드를 사용한 모니터링과 시각화

In [49]:
model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])  
tensorboard = keras.callbacks.TensorBoard(
    log_dir="/full_path_to_your_log_dir",
)
model.fit(train_images, train_labels,
          epochs=10,
          validation_data=(val_images, val_labels), 
          callbacks=[tensorboard])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f1c51f3a0a0>

In [51]:
tensorboard --logdir /full_path_to_your_log_dir

NameError: ignored

In [52]:
%load_ext tensorboard 
%tensorboard --logdir /full_path_to_your_log_dir

ModuleNotFoundError: ignored

## 7.4 사용자 정의 훈련, 평가 루프 만들기

### 7.4.1 훈련 vs 추론

In [53]:
def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)
    gradients = tape.gradients(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(model.trainable_weights, gradients))

### 7.4.2 측정 지표의 저수준 사용법

In [54]:
metric = keras.metrics.SparseCategoricalAccuracy()
targets = [0, 1, 2]
predictions = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
metric.update_state(targets, predictions)
current_result = metric.result() 
print(f"결과: {current_result:.2f}")

결과: 1.00


In [55]:
values = [0, 1, 2, 3, 4]
mean_tracker = keras.metrics.Mean() 
for value in values:
    mean_tracker.update_state(value)  
print(f"평균 지표: {mean_tracker.result():.2f}")

평균 지표: 2.00


### 7.4.3 완전한 훈련과 평가 루트

In [56]:
model = get_mnist_model() 

loss_fn = keras.losses.SparseCategoricalCrossentropy()
optimizer = keras.optimizers.RMSprop()
metrics = [keras.metrics.SparseCategoricalAccuracy()]
loss_tracking_metric = keras.metrics.Mean()

def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)               
        loss = loss_fn(targets, predictions)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs[metric.name] = metric.result()
    loss_tracking_metric.update_state(loss)
    logs["loss"] = loss_tracking_metric.result()
    return logs

In [57]:
def reset_metrics():
    for metric in metrics:
        metric.reset_state() 
    loss_tracking_metric.reset_state()

In [58]:
training_dataset = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels))
training_dataset = training_dataset.batch(32)
epochs = 3 
for epoch in range(epochs):
    reset_metrics()
    for inputs_batch, targets_batch in training_dataset:
        logs = train_step(inputs_batch, targets_batch)
    print(f"{epoch}번째 에포크 결과")
    for key, value in logs.items(): 
        print(f"...{key}: {value:.4f}")

0번째 에포크 결과
...sparse_categorical_accuracy: 0.9141
...loss: 0.2908
1번째 에포크 결과
...sparse_categorical_accuracy: 0.9546
...loss: 0.1665
2번째 에포크 결과
...sparse_categorical_accuracy: 0.9624
...loss: 0.1411


In [59]:
def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)  

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()
    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs  
    
val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics() 
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch) 
print("평가 결과:") 
for key, value in logs.items():  
     print(f"...{key}: {value:.4f}")

평가 결과:
...val_sparse_categorical_accuracy: 0.9657
...val_loss: 0.1367


### 7.4.4 tf.fuction으로 성능 높이기

In [60]:
@tf.function
def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)  

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result() 
 
    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs  

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics() 

for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch) 
print("평가 결과:") 
for key, value in logs.items(): 
    print(f"...{key}: {value:.4f}")

평가 결과:
...val_sparse_categorical_accuracy: 0.9657
...val_loss: 0.1367


### 7.4.5 fit() 메서드를 사용자 정의 루프로 활용하기

In [61]:
loss_fn = keras.losses.SparseCategoricalCrossentropy()
loss_tracker = keras.metrics.Mean(name="loss")

class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = loss_fn(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))  
        loss_tracker.update_state(loss)
        return {"loss": loss_tracker.result()}
        
    @property 
    def metrics(self):
        return [loss_tracker]

In [62]:
inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)  

model.compile(optimizer=keras.optimizers.RMSprop()) 
model.fit(train_images, train_labels, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7f1c613d73a0>

In [63]:
class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.compiled_loss(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))
        self.compiled_metrics.update_state(targets, predictions)
        return {m.name: m.result() for m in self.metrics}

In [64]:
inputs = keras.Input(shape=(28 * 28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)  
model.compile(optimizer=keras.optimizers.RMSprop(),
              loss=keras.losses.SparseCategoricalCrossentropy(),
              metrics=[keras.metrics.SparseCategoricalAccuracy()]) 
model.fit(train_images, train_labels, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7f1c612bee20>