In [72]:
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical

def prepare_data():
    """Load CIFAR-10 and return it in the form the training code consumes.

    Images are cast to float32 (pixel values are otherwise left untouched)
    and the integer class labels are one-hot encoded.

    Returns:
        Tuple ``(x_train, x_test, y_train, y_test)``.
    """
    train_split, test_split = cifar10.load_data()
    train_images, train_labels = train_split
    test_images, test_labels = test_split

    train_images = train_images.astype('float32')
    test_images = test_images.astype('float32')

    train_onehot = to_categorical(train_labels)
    test_onehot = to_categorical(test_labels)

    return train_images, test_images, train_onehot, test_onehot

In [73]:
import tensorflow as tf

class CNN(tf.keras.Model):
    """VGG-style image classifier made of three convolutional stages.

    Every stage is Conv -> BatchNorm -> Conv -> BatchNorm -> MaxPool -> Dropout
    (32, 64 and 128 filters, dropout 0.2 / 0.3 / 0.4), followed by a 128-unit
    ReLU dense layer, dropout 0.4 and a softmax output layer.

    Args:
        num_classes: Number of units in the softmax output layer.
        weight_decay: L2 factor attached to every convolution kernel through
            ``kernel_regularizer``. Keras collects these penalties in
            ``self.losses``; they are not part of the value returned by
            ``call``.
    """

    def __init__(self, num_classes=10, weight_decay=1e-4):
        super().__init__()

        def conv3x3(filters):
            # The six convolutions differ only in their filter count.
            # No input_shape is passed: a subclassed model builds its layers
            # from the first batch it is called on, so the constructor does
            # not need any dataset to exist.
            return tf.keras.layers.Conv2D(
                filters=filters,
                kernel_size=(3, 3),
                padding="same",
                kernel_regularizer=tf.keras.regularizers.l2(weight_decay),
                activation="relu",
            )

        # The BatchNorm layers are assigned first. Keras tracks sublayers in
        # attribute-assignment order, so this order determines how
        # model.layers and model.summary() list them.
        self.std1 = tf.keras.layers.BatchNormalization()
        self.std2 = tf.keras.layers.BatchNormalization()
        self.std3 = tf.keras.layers.BatchNormalization()
        self.std4 = tf.keras.layers.BatchNormalization()
        self.std5 = tf.keras.layers.BatchNormalization()
        self.std6 = tf.keras.layers.BatchNormalization()

        # Stage 1: 32 filters.
        self.conv2D_1 = conv3x3(32)
        self.conv2D_2 = conv3x3(32)
        self.pool1 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))
        self.dropout1 = tf.keras.layers.Dropout(0.2)

        # Stage 2: 64 filters.
        self.conv2D_3 = conv3x3(64)
        self.conv2D_4 = conv3x3(64)
        self.pool2 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))
        self.dropout2 = tf.keras.layers.Dropout(0.3)

        # Stage 3: 128 filters.
        self.conv2D_5 = conv3x3(128)
        self.conv2D_6 = conv3x3(128)
        self.pool3 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))
        self.dropout3 = tf.keras.layers.Dropout(0.4)

        # Classifier head.
        self.flatten = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(128, activation="relu")
        self.dropout4 = tf.keras.layers.Dropout(0.4)
        self.fc2 = tf.keras.layers.Dense(num_classes, activation="softmax")

    @tf.function
    def call(self, x, training=None):
        """Run the forward pass.

        Args:
            x: Batch of images. Assumed to be channels-last, e.g.
                (batch, 32, 32, 3) for CIFAR-10 -- TODO confirm against caller.
            training: Forwarded explicitly to every BatchNormalization and
                Dropout layer so they switch between training and inference
                behaviour together. ``None`` lets Keras fall back to its
                default (inference unless an outer call context says
                otherwise).

        Returns:
            Softmax output of the final dense layer, one row per input image.
        """
        x = self.std1(self.conv2D_1(x), training=training)
        x = self.std2(self.conv2D_2(x), training=training)
        x = self.dropout1(self.pool1(x), training=training)

        x = self.std3(self.conv2D_3(x), training=training)
        x = self.std4(self.conv2D_4(x), training=training)
        x = self.dropout2(self.pool2(x), training=training)

        x = self.std5(self.conv2D_5(x), training=training)
        x = self.std6(self.conv2D_6(x), training=training)
        x = self.dropout3(self.pool3(x), training=training)

        x = self.flatten(x)
        x = self.dropout4(self.fc1(x), training=training)
        return self.fc2(x)


In [74]:
"""
set error function and optimizer
"""

import tensorflow as tf

# The network ends in a softmax layer and the labels are one-hot encoded, so
# the loss consumes probabilities (from_logits=False is the Keras default,
# spelled out here for the reader).
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

In [75]:
"""
update the parameter
"""

# Accumulators for the training loss and accuracy; train_step() feeds them one
# batch at a time.
train_loss = tf.keras.metrics.Mean()
train_accuracy = tf.keras.metrics.CategoricalAccuracy()

@tf.function
def train_step(x, t):
    """Run one optimization step on a single mini-batch.

    The objective is the cross-entropy between ``t`` and the model output plus
    every regularization penalty the model collected during the forward pass.
    Only the cross-entropy term is recorded in ``train_loss`` so the logged
    value stays directly comparable with the validation loss.

    Args:
        x: Batch of input images.
        t: One-hot target labels for ``x``.
    """
    with tf.GradientTape() as tape:
        outputs = model(x, training=True)
        data_loss = loss_fn(t, outputs)

        # Layer regularizers (kernel_regularizer=l2(...)) are only gathered in
        # model.losses; a hand-written loop must add them to the objective or
        # the configured weight decay never influences the gradients. They are
        # read inside the tape so their gradients are recorded too.
        reg_losses = model.losses
        total_loss = data_loss + tf.add_n(reg_losses) if reg_losses else data_loss

    variables = model.trainable_variables
    grads = tape.gradient(total_loss, variables)
    optimizer.apply_gradients(zip(grads, variables))

    train_loss(data_loss)
    train_accuracy(t, outputs)

In [76]:
# Accumulators for the validation loss and accuracy; valid_step() feeds them
# one batch at a time.
val_loss = tf.keras.metrics.Mean()
val_accuracy = tf.keras.metrics.CategoricalAccuracy()

@tf.function
def valid_step(val_x, val_y):
    """Score one validation mini-batch and fold it into the validation metrics.

    The model runs with ``training=False`` and no gradients are computed.

    Args:
        val_x: Batch of input images.
        val_y: One-hot target labels for ``val_x``.
    """
    predictions = model(val_x, training=False)
    batch_loss = loss_fn(val_y, predictions)

    val_loss.update_state(batch_loss)
    val_accuracy.update_state(val_y, predictions)

In [77]:
from sklearn.utils import shuffle

# Load the data, build the model and the input pipelines, then train for a
# fixed number of epochs while recording per-epoch loss and accuracy.
x_train, x_test, y_train, y_test = prepare_data()

epochs = 120
batch_size = 64

model = CNN()
history = {'loss': [], 'accuracy': [], 'val_loss': [], 'val_accuracy': []}

# Training images are standardized and randomly augmented (shift, rotation,
# zoom, horizontal flip).
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    featurewise_center=True,
    featurewise_std_normalization=True,
    width_shift_range=0.1,
    height_shift_range=0.1,
    rotation_range=10,
    zoom_range=0.1,
    horizontal_flip=True,
)
# Evaluation images are only standardized, never augmented.
test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    featurewise_center=True,
    featurewise_std_normalization=True,
)

# Normalize the data. Both generators take their featurewise mean/std from the
# TRAINING images: the model must see evaluation data on the same scale it was
# trained on, and test-set statistics must not leak into preprocessing.
train_datagen.fit(x_train)
test_datagen.fit(x_train)

# Wrap the arrays in batch generators.
train_generator = train_datagen.flow(
    x_train,
    y_train,
    batch_size=batch_size,
)
# shuffle=False keeps the evaluation order fixed, so every epoch scores exactly
# the same samples.
validation_generator = test_datagen.flow(
    x_test,
    y_test,
    batch_size=batch_size,
    shuffle=False,
)

# The generators yield batches forever. len() is the number of batches in one
# full pass (the final, smaller batch included), so an epoch covers the whole
# dataset and the generator wraps around exactly at the epoch boundary.
train_steps = len(train_generator)
val_steps = len(validation_generator)


def _reset_metric(metric):
    """Clear a Keras metric's accumulated state.

    ``reset_state`` is the current method name; ``reset_states`` is the older
    spelling used by earlier TensorFlow 2 releases. Whichever exists is called.
    """
    reset = getattr(metric, "reset_state", None) or metric.reset_states
    reset()


for epoch in range(epochs):

    # Without a reset the metrics keep averaging over every epoch seen so far,
    # and the logged numbers would not describe the current epoch.
    for metric in (train_loss, train_accuracy, val_loss, val_accuracy):
        _reset_metric(metric)

    # range() comes first in zip() so that no extra batch is pulled from the
    # endless generator once the step budget is used up.
    for _, (x_batch, t_batch) in zip(range(train_steps), train_generator):
        train_step(x_batch, t_batch)

    for _, (x_val_batch, t_val_batch) in zip(range(val_steps), validation_generator):
        valid_step(x_val_batch, t_val_batch)

    # Epoch averages, converted to plain floats so the history is easy to plot
    # or serialize.
    avg_train_loss = float(train_loss.result())
    avg_train_acc = float(train_accuracy.result())
    avg_val_loss = float(val_loss.result())
    avg_val_acc = float(val_accuracy.result())

    # Record the loss history.
    history['loss'].append(avg_train_loss)
    history['val_loss'].append(avg_val_loss)
    # Record the accuracy history.
    history['accuracy'].append(avg_train_acc)
    history['val_accuracy'].append(avg_val_acc)

    # Report the results after every epoch.
    print(
        'epoch({}) train_loss: {:.4} train_acc: {:.4} val_loss: {:.4} val_acc: {:.4}'.format(
            epoch+1,
            avg_train_loss,
            avg_train_acc,
            avg_val_loss,
            avg_val_acc,
        )
    )

# Print the model summary.
model.summary()


epoch(1) train_loss: 0.1921 train_acc: 0.3662 val_loss: 0.3187 val_acc: 0.1028


KeyboardInterrupt: 