In [1]:
# 파이썬 2와 3의 버전 차이로 인해 생기는 문제를 방지하고 호환이 되도록 하기 위해 사용.\n
from __future__ import absolute_import, division, print_function

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, Flatten, Conv2D, Dropout, MaxPool2D
import numpy as np
from tensorflow.keras import Model #

from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint

import matplotlib.pyplot as plt
import os

In [None]:
learning_rate = 0.001
training_epochs = 100
batch_size = 64

In [None]:
## Checkpoint directory 생성

In [None]:
cur_dir = os.getcwd()
ckpt_dir_name = 'checkpoints'
model_dir_name = 'dual_cnn_major'

checkpoint_dir = os.path.join(cur_dir, ckpt_dir_name, model_dir_name)
os.makedirs(checkpoint_dir, exist_ok=True)

checkpoint_prefix = os.path.join(checkpoint_dir, model_dir_name)

In [None]:
## 우리가 사용할 데이터 (MFCC) - 추후 변경

In [None]:
acoustic_sound = keras.datasets.acoustic_sound
class_names = ['vacuum_cleaner', 'water_drops', 'clock_alarm', 'glass_breaking', 'baby_crying', 'door_knock', 'toilet_flush', 'washing_machine']


In [None]:
## dataset

In [None]:
# image dataset 만들어서 로드하는거 넣기

In [None]:
(train_images, train_labels), (test_images, test_labels) = acoustic_sound.load_data()    

# 0~1로 정규화
train_images = train_images.astype(np.float32) / 255.
test_images = test_images.astype(np.float32) / 255.

# channel 차원 추가
train_images = np.expand_dims(train_images, axis=-1)
test_images = np.expand_dims(test_images, axis=-1)

# one-hot encoding
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)

# dataset 섞고 batch 만들기
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(
                buffer_size=100000).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)

In [None]:
##### conv 3, pooling 3
class AcousticSoundModel(tf.keras.Model):
    def __init__(self):
        super(AcousticSoundModel, self).__init__()
        self.conv1 = Conv2D(filters=64, kernel_size=[3, 3], padding='SAME', activation=tf.nn.relu)
        self.drop1 = Dropout(rate=0.2)
        self.pool1 = MaxPool2D(padding='SAME') ###### pooling 2x2. stride는 표기 x, 확인 ######
        
        self.conv2 = Conv2D(filters=64, kernel_size=[3, 3], padding='SAME', activation=tf.nn.relu)
        self.drop2 = Dropout(rate=0.2) #20% dropout
        self.pool2 = MaxPool2D(padding='SAME')
        
        self.conv3 = Conv2D(filters=64, kernel_size=[3, 3], padding='SAME', activation=tf.nn.relu)
        self.drop3 = Dropout(rate=0.2) #20% dropout
        self.pool3 = MaxPool2D(padding='SAME')
        
        self.pool3_flat = keras.layers.Flatten()
        self.dense4 = Dense(units=128, activation=tf.nn.relu)
        self.dense5 = Dense(units=8, activation=tf.nn.sigmoid) ### 일단 8. class 개수 추가되는 대로 변경
        
    def call(self, inputs, training=False):
        net = self.conv1(inputs)
        net = self.drop1(net)
        net = self.pool1(net)
        
        net = self.conv2(net)
        net = self.drop2(net)
        net = self.pool2(net)
        
        net = self.conv3(net)
        net = self.drop3(net)
        net = self.pool3(net)
        
        net = self.pool3_flat(net)
        net = self.dense4(net)
        net = self.dense5(net) #
        return net #
        # return self.dense5(net)

In [None]:
model = AcousticSoundModel()
#temp_inputs = keras.Input(shape=(28, 28, 1)) #### MFCC 이미지 가로(Time), 세로(Frequency) ex) 28x28x1
#model(temp_inputs)
#model.summary()

In [None]:
@tf.function
def loss_fn(model, images, labels):
    logits = model(images, training=True)
    loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(
        y_pred=logits, y_true=labels, from_logits=True))     
    return loss

In [None]:
@tf.function
def grad(model, images, labels):
    with tf.GradientTape() as tape:
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.variables)

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [None]:
def evaluate(model, images, labels):
    logits = model(images, training=False)
    correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    return accuracy

checkpoint = tf.train.Checkpoint(cnn=model)

In [None]:
@tf.function
def train(model, images, labels):
    grads = grad(model, images, labels)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

In [None]:
#early_stopping = EarlyStopping(monitor='val_loss', mode='min', patience=10, verbose=1)
#model_choice = ModelCheckpoint('best_model.h5', monitor='val_loss', mode='min', save_best_only=True)

print('Learning started.')
for epoch in range(training_epochs):
    test_loss = 0.
    train_acc = 0.
    test_acc = 0.
    train_step = 0
    test_step = 0

    for images, labels in train_dataset:
        train(model, images, labels)
        loss = loss_fn(model, images, labels)
        acc = evaluate(model, images, labels)
        test_loss = test_loss + loss
        train_acc = train_acc + acc
        train_step += 1

    test_loss = test_loss / train_step
    train_acc = train_acc / train_step

    for images, labels in test_dataset:
        acc = evaluate(model, images, labels)
        test_acc = test_acc + acc
        test_step += 1

    test_acc = test_acc / test_step

    print('Epoch:', '{}'.format(epoch + 1), 
          'loss =', '{:.8f}'.format(test_loss),
          'train accuracy = ', '{:.4f}'.format(train_acc),
          'test accuracy = ', '{:.4f}'.format(test_acc))

    checkpoint.save(file_prefix=checkpoint_prefix)
        
    #if early_stopping.validate(val_loss):
        #break
print('Learning Finished!')


In [None]:
#######################################################################################################################
#######################################################################################################################

In [None]:
# 모델의 손실과 성능 측정 지표
# epoch가 진행되는 동안 수집된 측정 지표를 바탕으로 최종 결과를 출력

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

In [None]:
es = EarlyStopping(monitor='val_loss', mode='min', patience=10, verbose=1) #
model_choice = ModelCheckpoint('best_model.h5', monitor='val_loss', mode='min', save_best_only=True)
print('Learning started.')
hist = model.fit(train_images, train_labels, 
                 epochs=training_epochs, 
                 batch_size=batch_size, verbose=2, validation_split=0.2, 
                 callbacks=[early_stopping, model_choice])

print('Learning started.')
for epoch in range(training_epochs):
    avg_loss = 0.
    avg_train_acc = 0.
    avg_test_acc = 0.
    train_step = 0
    test_step = 0
    
    for images, labels in train_dataset:
        train(model, images, labels)
        loss = loss_fn(model, images, labels)
        acc = evaluate(model, images, labels)
        avg_loss += loss
        avg_train_acc += acc
        train_step += 1
    avg_loss = avg_loss / train_step
    avg_train_acc = avg_train_acc / train_step
    
    for images, labels in test_dataset:        
        acc = evaluate(model, images, labels)        
        avg_test_acc = avg_test_acc + acc
        test_step += 1    
    avg_test_acc = avg_test_acc / test_step    

    print('Epoch:', '{}'.format(epoch + 1), 'loss =', '{:.8f}'.format(avg_loss), 
          'train accuracy = ', '{:.4f}'.format(avg_train_acc), 
          'test accuracy = ', '{:.4f}'.format(avg_test_acc))
    
    checkpoint.save(file_prefix=checkpoint_prefix)

print('Learning Finished!')