In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
import os
from scipy import ndimage

# Enabling eager execution
# TensorFlow's eager execution is an imperative programming environment that
# runs operations immediately instead of building a graph first.
# It is enabled by default in TensorFlow 2.0, so the explicit call stays disabled.
# tf.enable_eager_execution()

# 1) Hyperparameter settings

# Step size handed to the Adam optimizer below.
learning_rate = 0.001
# Number of full passes over the training dataset.
training_epochs = 15
# Number of examples per batch for both the training and the test dataset.
batch_size = 100


# Directory where checkpoints are stored
# (optional)

cur_dir = os.getcwd()
ckpt_dir_name = 'checkpoints'
model_dir_name = 'minst_cnn_seq'

# <cwd>/checkpoints/minst_cnn_seq, created up front; exist_ok=True makes re-runs harmless.
checkpoint_dir = os.path.join(cur_dir, ckpt_dir_name, model_dir_name)
os.makedirs(checkpoint_dir, exist_ok=True)

# File prefix passed to checkpoint.save() at the end of every epoch.
checkpoint_prefix = os.path.join(checkpoint_dir, model_dir_name)

# Data augmentation
def data_augmentation(images, labels):
    """Return each image followed by four randomly rotated and shifted copies.

    For every ``(image, label)`` pair the original image is emitted first,
    then four augmented variants. Each variant is rotated by a random whole
    number of degrees drawn from ``[-15, 15)`` and then translated by a
    random integer offset drawn from ``[-2, 2)`` along each of two axes.
    Pixels uncovered by the transforms are filled with the image's median.

    Args:
        images: Iterable of image arrays. The shift vector has two
            components, so each image is expected to be 2-D.
        labels: Iterable of labels, paired with ``images`` by position.

    Returns:
        A tuple ``(aug_images, aug_labels)`` of NumPy arrays that hold five
        entries per input pair (the original plus four variants); every
        variant carries the label of the image it was derived from.
    """
    # Accumulators for the augmented images and their labels.
    aug_images = []
    aug_labels = []

    for x, y in zip(images, labels):
        # Keep the untouched original first.
        aug_images.append(x)
        aug_labels.append(y)

        # Fill value for the regions left empty by the rotation and the shift.
        bg_value = np.median(x)
        for _ in range(4):
            # Rotation. The angle must be a scalar: ndimage.rotate builds a
            # 2x2 rotation matrix from it, and a length-1 array (what
            # randint(..., size=1) returns) gives that matrix a stray
            # trailing axis that makes the call fail.
            angle = int(np.random.randint(-15, 15))
            rot_img = ndimage.rotate(x, angle, reshape=False, cval=bg_value)
            # Shift: one integer offset per image axis.
            shift = np.random.randint(-2, 2, 2)
            shift_img = ndimage.shift(rot_img, shift, cval=bg_value)

            aug_images.append(shift_img)
            aug_labels.append(y)

    # Convert the Python lists to NumPy arrays.
    aug_images = np.array(aug_images)
    aug_labels = np.array(aug_labels)
    return aug_images, aug_labels

# Load the MNIST dataset
mnist = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Convert the pixels to float32 and divide by 255 so they lie between 0 and 1
train_images = train_images.astype(np.float32) / 255.
test_images = test_images.astype(np.float32) / 255.

# The network expects a 4-D input, but the loaded arrays have no channel
# axis, so append one as the last dimension
train_images = train_images[..., np.newaxis]
test_images = test_images[..., np.newaxis]

# One-hot encode the labels over the 10 digit classes
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)

# Slice both sets into batches of batch_size; only the training set is shuffled
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=100000)
train_dataset = train_dataset.batch(batch_size)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_dataset = test_dataset.batch(batch_size)

# 3) Network definition

# Model factory
def create_model():
    """Build the CNN classifier with the Keras Sequential API.

    The stack is three Conv2D (3x3, ReLU, 'SAME' padding) + MaxPool2D stages
    with 32, 64 and 128 filters, followed by Flatten, a 256-unit ReLU Dense
    layer, Dropout(0.4) and a final 10-unit Dense layer. The last layer has
    no activation, so the model outputs raw logits.

    Returns:
        A new ``keras.Sequential`` model taking 28x28x1 inputs.
    """
    layers = keras.layers
    return keras.Sequential([
        layers.Conv2D(filters=32, kernel_size=3, activation=tf.nn.relu,
                      padding='SAME', input_shape=(28, 28, 1)),
        layers.MaxPool2D(padding='SAME'),
        layers.Conv2D(filters=64, kernel_size=3, activation=tf.nn.relu,
                      padding='SAME'),
        layers.MaxPool2D(padding='SAME'),
        layers.Conv2D(filters=128, kernel_size=3, activation=tf.nn.relu,
                      padding='SAME'),
        layers.MaxPool2D(padding='SAME'),
        # Flatten the feature maps into a vector before the fully connected layers.
        layers.Flatten(),
        layers.Dense(256, activation=tf.nn.relu),
        # The dense layer has many parameters, so dropout is applied after it.
        layers.Dropout(0.4),
        layers.Dense(10),
    ])

# Build the single model instance used by the loss, gradient, evaluation and
# training code in the rest of this file.
model = create_model()
# Uncomment to print the layer-by-layer summary:
# model.summary()

# Loss function
def loss_fn(model, images, labels):
    """Return the mean softmax cross-entropy of the model on one batch.

    Args:
        model: Callable model producing logits for ``images``.
        images: Batch of inputs fed to the model.
        labels: Targets compared against the logits.

    Returns:
        Scalar tensor: the cross-entropy averaged over the batch.
    """
    # training=True so that the dropout layer is active in this forward pass.
    predictions = model(images, training=True)
    # Softmax is applied inside the cross-entropy op, on the raw logits.
    per_example = tf.nn.softmax_cross_entropy_with_logits(
        labels=labels, logits=predictions)
    return tf.reduce_mean(per_example)


def grad(model, images, labels):
    """Return the gradients of the batch loss w.r.t. ``model.variables``.

    The loss is evaluated by ``loss_fn`` while a ``tf.GradientTape`` records
    the forward pass; the result lines up one-to-one with ``model.variables``.
    """
    with tf.GradientTape() as recorder:
        batch_loss = loss_fn(model, images, labels)
    # Backpropagation: play the recorded operations in reverse.
    variables = model.variables
    return recorder.gradient(batch_loss, variables)

# Use the Adam optimizer, with the step size taken from the learning_rate
# hyperparameter defined above.
optimizer = tf.optimizers.Adam(learning_rate=learning_rate)

def evaluate(model, images, labels):
    """Return the classification accuracy of the model on one batch.

    The forward pass uses ``training=False``. A prediction counts as correct
    when the argmax of the logits along axis 1 equals the argmax of the
    labels along axis 1.

    Returns:
        Scalar float32 tensor: the fraction of correct predictions.
    """
    predicted = tf.argmax(model(images, training=False), 1)
    expected = tf.argmax(labels, 1)
    hits = tf.cast(tf.equal(predicted, expected), tf.float32)
    return tf.reduce_mean(hits)

# Checkpoint object tracking the model under the key 'cnn'.
checkpoint = tf.train.Checkpoint(cnn=model)

for epoch in range(training_epochs):
    # Running sums and batch counters, reset at the start of every epoch.
    avg_loss, avg_train_acc, avg_test_acc = 0., 0., 0.
    train_step, test_step = 0, 0

    for images, labels in train_dataset:
        # One optimization step; this alone is all that training requires.
        grads = grad(model, images, labels)
        optimizer.apply_gradients(zip(grads, model.variables))
        # Loss and accuracy are measured on the same batch after the update.
        loss = loss_fn(model, images, labels)
        acc = evaluate(model, images, labels)
        avg_loss += loss
        avg_train_acc += acc
        train_step += 1
    # Turn the sums into per-batch means.
    avg_loss /= train_step
    avg_train_acc /= train_step

    for images, labels in test_dataset:
        # Accuracy on the test batches.
        acc = evaluate(model, images, labels)
        avg_test_acc += acc
        test_step += 1
    avg_test_acc /= test_step

    print('Epoch:', '{}'.format(epoch + 1), 'loss =', '{:.8f}'.format(avg_loss),
          'train accuracy = ', '{:.4f}'.format(avg_train_acc),
          'test accuracy = ', '{:.4f}'.format(avg_test_acc))

    # Save a checkpoint at the end of every epoch.
    checkpoint.save(file_prefix=checkpoint_prefix)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch: 1 loss = 0.18518402 train accuracy =  0.9546 test accuracy =  0.9864
Epoch: 2 loss = 0.04571076 train accuracy =  0.9897 test accuracy =  0.9895


KeyboardInterrupt: ignored