In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import numpy as np
import keras
from keras.preprocessing.image import ImageDataGenerator

In [None]:
basicpath = '/content/drive/Shareddrives/Project/CNN_spectrogram_our_dataset/spectrograms'

# 모든 position generator

In [None]:
def image_pos_gen_list(datapath, batch_size):

  pos_gen_list = list()
  datagen = ImageDataGenerator(rescale=1.0/255)

  positions = os.listdir(datapath)
  for pos in positions:
    pos_path = os.path.join(datapath, pos)
    batch_size_pos = batch_size // len(positions)
    print(pos_path)
    print(f'batch size: {batch_size_pos}')
    pos_gen = datagen.flow_from_directory(pos_path, target_size=(48,48), batch_size=batch_size_pos)
    pos_gen_list.append(pos_gen)
  
  return pos_gen_list

In [None]:
def image_pos_generator(img_pos_gen_list):
  while True:  
    batch_label_pairs = [ img_pos_gen.next() for img_pos_gen in img_pos_gen_list ]

    batches = list()
    labels = list()

    for batch, label in batch_label_pairs:
      batches.append(batch)
      labels.append(label)

    batch_size, width, height, channel = batches[0].shape
    batch_size, classes = labels[0].shape
    
    batch_concat = np.empty((batch_size * len(batches), width, height, channel))
    label_concat = np.empty((batch_size * len(labels), classes))

    for i, (batch, label) in enumerate(zip(batches, labels)):
      batch_concat[batch_size * i : batch_size * (i + 1)] = batches[i]
      label_concat[batch_size * i : batch_size * (i + 1)] = labels[i]

    yield batch_concat, label_concat

In [None]:
trainpath = os.path.join(basicpath, 'train')
valpath = os.path.join(basicpath, 'val')
testpath = os.path.join(basicpath, 'test')

In [None]:
batch_size = 64

In [None]:
train_pos_gen_list = image_pos_gen_list(trainpath, batch_size)
val_pos_gen_list = image_pos_gen_list(valpath, batch_size)
test_pos_gen_list = image_pos_gen_list(testpath, batch_size)

In [None]:
train_gen = image_pos_generator(train_pos_gen_list)
val_gen = image_pos_generator(val_pos_gen_list)
test_gen = image_pos_generator(test_pos_gen_list)

# Pocket position generator

In [None]:
trainpath = os.path.join(basicpath, 'train', 'Pocket')
valpath = os.path.join(basicpath, 'val', 'Pocket')
testpath = os.path.join(basicpath, 'test', 'Pocket')

In [None]:
train_datagen = ImageDataGenerator(rescale=1.0/255)
val_datagen = ImageDataGenerator(rescale=1.0/255)
test_datagen = ImageDataGenerator(rescale=1.0/255)

In [None]:
classes = ['bus', 'car', 'metro', 'powerChar', 'still', 'walking' ]

In [None]:
batch_size = 26

In [None]:
train_gen = train_datagen.flow_from_directory(trainpath, target_size=(48,48), batch_size=batch_size, classes=classes)
val_gen = val_datagen.flow_from_directory(valpath, target_size=(48,48), batch_size=batch_size, classes=classes)
test_gen = test_datagen.flow_from_directory(testpath, target_size=(48,48), batch_size=batch_size, classes=classes)

In [None]:
steps_per_epoch = int(2640/ batch_size)
validation_steps = int(286 / batch_size)
test_steps = int(286 / batch_size)

In [None]:
train_gen.class_indices

# F1-score

In [None]:
from keras import backend as K

def recall(y_target, y_pred):
    # clip(t, clip_value_min, clip_value_max) : clip_value_min~clip_value_max 이외 가장자리를 깎아 낸다
    # round : 반올림한다
    y_target_yn = K.round(K.clip(y_target, 0, 1)) # 실제값을 0(Negative) 또는 1(Positive)로 설정한다
    y_pred_yn = K.round(K.clip(y_pred, 0, 1)) # 예측값을 0(Negative) 또는 1(Positive)로 설정한다

    # True Positive는 실제 값과 예측 값이 모두 1(Positive)인 경우이다
    count_true_positive = K.sum(y_target_yn * y_pred_yn) 

    # (True Positive + False Negative) = 실제 값이 1(Positive) 전체
    count_true_positive_false_negative = K.sum(y_target_yn)

    # Recall =  (True Positive) / (True Positive + False Negative)
    # K.epsilon()는 'divide by zero error' 예방차원에서 작은 수를 더한다
    recall = count_true_positive / (count_true_positive_false_negative + K.epsilon())

    # return a single tensor value
    return recall


def precision(y_target, y_pred):
    # clip(t, clip_value_min, clip_value_max) : clip_value_min~clip_value_max 이외 가장자리를 깎아 낸다
    # round : 반올림한다
    y_pred_yn = K.round(K.clip(y_pred, 0, 1)) # 예측값을 0(Negative) 또는 1(Positive)로 설정한다
    y_target_yn = K.round(K.clip(y_target, 0, 1)) # 실제값을 0(Negative) 또는 1(Positive)로 설정한다

    # True Positive는 실제 값과 예측 값이 모두 1(Positive)인 경우이다
    count_true_positive = K.sum(y_target_yn * y_pred_yn) 

    # (True Positive + False Positive) = 예측 값이 1(Positive) 전체
    count_true_positive_false_positive = K.sum(y_pred_yn)

    # Precision = (True Positive) / (True Positive + False Positive)
    # K.epsilon()는 'divide by zero error' 예방차원에서 작은 수를 더한다
    precision = count_true_positive / (count_true_positive_false_positive + K.epsilon())

    # return a single tensor value
    return precision


def f1score(y_target, y_pred):
    _recall = recall(y_target, y_pred)
    _precision = precision(y_target, y_pred)
    # K.epsilon()는 'divide by zero error' 예방차원에서 작은 수를 더한다
    _f1score = ( 2 * _recall * _precision) / (_recall + _precision+ K.epsilon())
    
    # return a single tensor value
    return _f1score

# 모델 정의

In [None]:
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout
from keras import optimizers

model = Sequential()
model.add(Conv2D(16, (3,3), activation='relu', input_shape = (48, 48, 3), padding='same'))
model.add(MaxPooling2D((2,2)))
model.add(Conv2D(32, (3,3), activation='relu', padding='same'))
model.add(MaxPooling2D((2,2)))
model.add(Conv2D(64, (3,3), activation='relu', padding='same'))
model.add(MaxPooling2D((2,2)))

model.add(Flatten())
model.add(Dropout(0.25))
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(len(classes), activation='sigmoid'))

In [None]:
model.summary()

# Callback 함수

In [None]:
from keras.callbacks import EarlyStopping, ModelCheckpoint
es = EarlyStopping(monitor='val_loss', patience=8)

In [None]:
basicpath_cp = '/content/drive/Shareddrives/Project/CNN_spectrogram_our_dataset/cp/05'
cp_path = os.path.join(basicpath_cp, 'pocket_{epoch}.ckpt')
cp = ModelCheckpoint(cp_path, monitor='val_loss', save_best_only=False)

In [None]:
lr_schedule = keras.optimizer_v2.learning_rate_schedule.ExponentialDecay(initial_learning_rate=0.001, decay_steps=steps_per_epoch * 4, decay_rate=0.8, staircase=True)

# 모델 학습

In [None]:
model.compile(optimizer=keras.optimizer_v2.adam.Adam(learning_rate=lr_schedule),
              loss='categorical_crossentropy',
              metrics=['acc', f1score])

In [None]:
history = model.fit_generator(train_gen,
                              steps_per_epoch=steps_per_epoch,
                              epochs=100,
                              validation_data=val_gen,
                              validation_steps=validation_steps,
                              callbacks=[es, cp])

# Plotting

In [None]:
import matplotlib.pyplot as plt

loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(loss) + 1)  #1~20

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.show()

plt.clf()

acc = history.history['acc']
val_acc = history.history['val_acc']

plt.plot(epochs, acc, 'bo', label='Training acc')
plt.plot(epochs, val_acc, 'b', label='Validation acc')
plt.title('Training and Validation Acc')
plt.xlabel('Epoch')
plt.ylabel('Acc')
plt.legend()

plt.show()

# 평가

In [None]:
for cp in os.listdir(basicpath_cp):
  print(cp)
  epoch = int(cp.split(sep='_')[1].split(sep='.')[0])
  if epoch < 15:
    print('pass')
    continue
  model.load_weights(os.path.join(basicpath_cp, cp))
  score = model.evaluate_generator(test_gen, steps=test_steps)
  print(f'Test loss: {score[0]}, Test acc: {score[1]}, Test f1-score: {score[2]}')

# Confusion matrix 생성

In [None]:
model.load_weights(os.path.join(basicpath_cp, 'pocket_29.ckpt' ))

In [None]:
import matplotlib.pyplot as plt
import itertools
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
  plt.imshow(cm, interpolation='nearest', cmap=cmap)
  plt.title(title)
  plt.colorbar()
  tick_marks=np.arange(len(classes))
  plt.xticks(tick_marks, classes, rotation=45)
  plt.yticks(tick_marks, classes)
  
  if normalize:
    cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
      
  thresh = cm.max() / 2.
  for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, cm[i, j], horizontalalignment="center", color="white" if cm[i, j] > thresh else "black")
  
  plt.tight_layout()
  plt.ylabel('True label')
  plt.xlabel('Predicted label')

In [None]:
def make_test_batch(test_gen, step):
  batches = list()
  labels = list()

  for i in range(step):
    batch, label = next(test_gen)
    batches.append(batch)
    labels.append(label)
  
  batch_size, width, height, channel = batches[0].shape
  batch_size, classes = labels[0].shape

  batch_concat = np.empty((batch_size * len(batches), width, height, channel))
  label_concat = np.empty((batch_size * len(batches), classes))

  for i, (batch, label) in enumerate(zip(batches, labels)):
    batch_concat[batch_size * i : batch_size * (i + 1)] = batches[i]
    label_concat[batch_size * i : batch_size * (i + 1)] = labels[i]
  
  return batch_concat, label_concat

In [None]:
X, Y = make_test_batch(test_gen, step=test_steps)

In [None]:
X.shape

In [None]:
# Predict the values from the validation dataset
Y_pred = model.predict(X)
# Convert predictions classes to one hot vectors
Y_pred_classes = np.argmax(Y_pred, axis = 1)
# Convert validation observations to one hot vectors
Y_true = np.argmax(Y, axis = 1)
# compute the confusion matrix
confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)
# plot the confusion matrix
plot_confusion_matrix(confusion_mtx, classes = classes)