# 데이터 가져오기

In [None]:
# 구글 드라이브에 있는 데이터 가져오기

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 1. 장르별 데이터 가져오기

import numpy as np
import tensorflow as tf
from keras.preprocessing import image # 이미지파일 사용 위한 라이브러리
from keras.preprocessing.image import ImageDataGenerator # 이미지 한꺼번에 불러오기
import matplotlib.pyplot as plt

# 데이터 가져오는 객체 생성
data_img = ImageDataGenerator(
    rescale= 1./255
    )

# 훈련데이터
train_img = data_img.flow_from_directory(
    '/content/drive/MyDrive/images_original/train', # 데이터 경로
    target_size=(288, 432), # melspectrogram 이미지 하나의 크기
    batch_size = 1, # 배치사이즈 1
    class_mode='categorical',# 다중 분류
    shuffle=True, # 데이터 섞기
    classes=['blues','classical','country','disco','hiphop','jazz','metal','pop','reggae','rock'], # 장르명시
    subset='training' # flow_from_directory()가 훈련데이터를 로드
  )


# 테스트데이터
test_img_load = ImageDataGenerator(
    rescale=1./255
)
test_img = test_img_load.flow_from_directory(
    '/content/drive/MyDrive/images_original/test', # 테스트데이터 경로
    target_size=(288, 432), # 데이터 하나 크기
    batch_size=1, # 배치사이즈 1
    class_mode='categorical', # 다중분류
    shuffle=False, # 데이터 섞기
    classes=['blues','classical','country','disco','hiphop','jazz','metal','pop','reggae','rock'] # 장르명 명시
)

# 각 데이터 파일에 대한 softmax값을 출력하기 위한
# shuffle 하지 않은 훈련데이터
not_shuffle = data_img.flow_from_directory(
    '/content/drive/MyDrive/images_original/train', # 데이터 경로
    target_size=(288, 432), # melspectrogram 이미지 하나의 크기
    batch_size = 1, # 배치사이즈 1
    class_mode='categorical',# 다중 분류
    shuffle=False, # 데이터 섞기
    classes=['blues','classical','country','disco','hiphop','jazz','metal','pop','reggae','rock'] # 장르명시
  )


# 클래스 별 이미지 개수 확인(chatgpt 활용)
import collections # 필요한 라이브러리
class_indices = train_img.class_indices # 클래스 부류 불러오기

class_counts = collections.Counter(train_img.classes) # 클래스별 이미지 개수 세기
print("\n train image 개수")
for class_name, class_index in class_indices.items() : # 출력
  print(f"{class_name}: {class_counts[class_index]} images")

test_cnt = collections.Counter(test_img.classes)
print("\n test image 개수")
for class_name, test_index in class_indices.items() :
  print(f"{class_name}: {test_cnt[test_index]} images ")

In [None]:
# 훈련데이터 x, y 나누기
x_train = []
y_train = []

# test_img의 samples 속성을 사용하여 전체 샘플 수를 가져옵니다.
total_samples_train = train_img.samples
print(total_samples_train)

# 전체 데이터를 가져옵니다.
for i in range(total_samples_train):
    img, label = train_img.next()
    x_train.append(img)  # img는 배치 크기만큼의 배열이므로, 첫 번째 항목만 추가합니다.
    y_train.append(label)  # label도 마찬가지로 첫 번째 항목만 추가합니다.

# 리스트를 NumPy 배열로 변환합니다.
x_train = np.array(x_train)
y_train = np.array(y_train)

x_train = np.squeeze(x_train)
y_train = np.squeeze(y_train)

print("squeeze후 x_shape:", x_train.shape)
print("squeeze후 y_shape:", y_train.shape)

print(len(x_train))

# 테스트데이터 x, y 나누기

x_test = []
y_test = []
x_test_name = []

# test_img의 samples 속성을 사용하여 전체 샘플 수를 가져옵니다.
total_samples_test = test_img.samples
print(total_samples_test)

# 전체 데이터를 가져옵니다.
for i in range(total_samples_test):
    img, label = test_img.next()
    x_test.append(img)  # img는 배치 크기만큼의 배열이므로, 첫 번째 항목만 추가합니다.
    y_test.append(label)  # label도 마찬가지로 첫 번째 항목만 추가합니다.
    x_test_name.append(test_img.filenames[test_img.index_array[i]])

# 리스트를 NumPy 배열로 변환합니다.
x_test = np.array(x_test)
y_test = np.array(y_test)

x_test = np.squeeze(x_test)
y_test = np.squeeze(y_test)

print(x_test.shape)
print(y_test.shape)

In [None]:
# softmax 출력을 위해 순서대로 가져온 train 데이터

# 훈련데이터 x, y 나누기
x_not_shuffle = []
y_not_shuffle = []
not_shuffle_name = []

# test_img의 samples 속성을 사용하여 전체 샘플 수를 가져옵니다.
total_samples_original = not_shuffle.samples
print(total_samples_original)

# 전체 데이터를 가져옵니다.
for i in range(total_samples_original):
    img, label = not_shuffle.next()
    x_not_shuffle.append(img)  # img는 배치 크기만큼의 배열이므로, 첫 번째 항목만 추가합니다.
    y_not_shuffle.append(label)  # label도 마찬가지로 첫 번째 항목만 추가합니다.
    not_shuffle_name.append(not_shuffle.filenames[not_shuffle.index_array[i]])

# 리스트를 NumPy 배열로 변환합니다.
x_not_shuffle = np.array(x_not_shuffle)
y_not_shuffle = np.array(y_not_shuffle)

x_not_shuffle = np.squeeze(x_not_shuffle)
y_not_shuffle = np.squeeze(y_not_shuffle)

print("squeeze후 x_shape:", x_not_shuffle.shape)
print("squeeze후 y_shape:", y_not_shuffle.shape)

print(len(x_not_shuffle))
print(not_shuffle_name)

# CNN 구현
Music Genre classification on GTZAN dataset using CNNs

github 주소: https://github.com/Hguimaraes/gtzan.keras/blob/master/nbs/1.1-custom_cnn_2d.ipynb

레퍼런스 모델

In [None]:
# 1. 필요한 라이브러리 불러오기

import os
import h5py
import librosa
import itertools
from copy import copy
import numpy as np
import matplotlib.pyplot as plt
from collections import OrderedDict
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Add
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import PReLU
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.layers import GlobalMaxPooling2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau
from sklearn.metrics import f1_score

#모델 구성

In [None]:
# 1. CNN layer 블록 만드는 함수
def conv_block(x, n_filters, pool_size=(2,2)) :
  x = Conv2D(n_filters, (3,3), strides=(1,1), padding='same')(x)  # 필터크기 3x3, 보폭 1, 입력크기=출력크기
  x = Activation('relu')(x) # 활성화함수로 ReLU 함수 사용
  x = MaxPooling2D(pool_size=pool_size, strides=pool_size)(x) # max pooling 적용
  x = Dropout(0.5)(x) # 과대적합 방지
  return x

In [None]:
def create_model(input_shape, num_genres):
    inpt = Input(shape=input_shape)
    x = conv_block(inpt, 16)
    x = conv_block(x, 32)
    x = conv_block(x, 64)
    x = conv_block(x, 128)

    # x = conv_block(x, 192)

    # Global Pooling and MLP
    x = Flatten()(x)
    x = Dropout(0.5)(x)
    x = Dense(192, activation='relu',
              kernel_regularizer=tf.keras.regularizers.l2(0.02))(x)
    x = Dropout(0.25)(x)
    predictions = Dense(num_genres,
                        activation='softmax',
                        kernel_regularizer=tf.keras.regularizers.l2(0.02))(x)

    model = Model(inputs=inpt, outputs=predictions)
    return model

model = create_model((288, 432, 3), 10)
model.summary()

In [None]:
model.compile(loss = tf.keras.losses.categorical_crossentropy, # 다중분류, 원핫인코딩.
              optimizer = tf.keras.optimizers.Adam(),
              metrics = ['accuracy'])

# PyTorch의 학습률 감소 기법 중 하나로, val_loss가 더이상 개선되지 않을 때
# 학습률을 동적으로 감소시켜 모델의 학습을 돕는 기법
# ReduceLROnPlateau에 대한 설명 링크 : https://wikidocs.net/195132
reduceLROnPlat = ReduceLROnPlateau(
    monitor = 'val_loss',
    factor = 0.895, # lr을 감소시킬 비율 (기본값 0.1 = val_loss 개선되지 않을 때 현재 lr에 0.1을 곱하여 감소시킴)
    patience = 3, # val_loss 값이 개선되지 않은 상태를 얼마나 허용할 것인지를 설정
    verbose = 1, # True : 감소된 lr에 대한 정보를 출력
    mode = 'min', # 성능 개선을 어떻게 측정할지 지정. val_loss값이 감소할 때 성능이 개선되었다고 판단.
    min_delta = 0.0001,
    cooldown = 2, # lr을 감소시킨 후, 새로운 lr을 적용하기 전에 몇 epoch동안 학습을 일시 정지할지 횟수를 지정
    min_lr = 1e-5 # lr을 감소시킬 최소값을 지정 (lr이 이 값보다 작아지지 않도록 함)
)

batch_size = 32
steps_per_epoch = np.ceil(len(x_train)/batch_size)

history = model.fit(
    x_train, y_train,
    batch_size=32,
    validation_split = 0.2,
    epochs=150,
    verbose=1,
    callbacks=[reduceLROnPlat]
)

model.save('/content/drive/MyDrive/deep24_model/is_this_best_question.h5')

In [None]:
score = model.evaluate(x_test, y_test, verbose=0)
print("test_loss={:.3f} and test_acc = {:.3f}".format(score[0],score[1]))

plt.figure(figsize=(15,7))

plt.subplot(1,2,1)
plt.plot(history.history['accuracy'], label='train')
plt.plot(history.history['val_accuracy'], label='validation')
plt.title('Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(1,2,2)
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='validation')
plt.title('Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()

# 혼동행렬, softmax 출력

In [None]:
#http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
genres = {'blues': 0, 'classical': 1, 'country': 2, 'disco': 3, 'hiphop': 4,
          'jazz': 5, 'metal': 6, 'pop': 7, 'reggae': 8, 'rock': 9}
# train
train_predictions_matrix = model.predict(x_not_shuffle)
train_preds_matrix = np.argmax(train_predictions_matrix, axis = 1)
y_orig = np.argmax(y_not_shuffle, axis = 1)
cm = confusion_matrix(train_preds_matrix, y_orig)

keys = OrderedDict(sorted(genres.items(), key=lambda t: t[1])).keys()

plt.figure(figsize=(10,10))
plot_confusion_matrix(cm, keys, normalize=True)

# DNN

In [None]:
# 필요한 라이브러리 임포트
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder,OneHotEncoder,StandardScaler
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.layers import Dropout, BatchNormalization, Dense
from tensorflow.keras.models import Sequential, load_model
from sklearn.preprocessing import StandardScaler

In [None]:
import pandas as pd

# 데이터 세트 읽어들기
data_feature = pd.read_csv('/content/drive/MyDrive/features_30_sec.csv')

# 데이터프레임으로 변환
data_df = pd.DataFrame(data_feature)

# 결측값이 있는 행 삭제
data_df = data_df.dropna()
data_df = data_df.reset_index(drop=True)
print(data_df)

In [None]:
# 사이킷런을 이용한 라벨인코딩
# 참고문헌 : https://dlearner.tistory.com/22
from sklearn.preprocessing import LabelEncoder
data_labels = data_df['label']

# 먼저 레이블 인코딩 과정을 거쳐서 문자열에 숫자 하나씩 매핑
data_encoder = LabelEncoder()
data_encoder.fit(data_labels)
data_labels = data_encoder.transform(data_labels)

print("data_labels:")
print(data_labels)

In [None]:
from sklearn.model_selection import train_test_split

# data : 특성과 레이블 분리
X_data30 = data_df.drop(['label', 'filename'], axis=1)
y_data30 = data_labels


print(X_data30.shape, y_data30.shape)

x_train, x_test, y_train, y_test = train_test_split(X_data30, y_data30, test_size=0.2, random_state=42)

print(x_train.shape, x_test.shape)
print(y_train.shape, y_test.shape)

# 데이터 정규화
ss30 = StandardScaler()
ss30.fit(x_train)
train_scaled30 = ss30.transform(x_train)
test_scaled30 = ss30.transform(x_test)

In [None]:
import os
from keras.models import load_model

# Keras 모델 구성
model = tf.keras.Sequential()

model.add(tf.keras.layers.Dense(1024, activation='relu', input_shape=(58, )))
model.add(tf.keras.layers.Dropout(0.5))
model.add(BatchNormalization())

model.add(tf.keras.layers.Dense(1024, activation='relu'))
model.add(tf.keras.layers.Dropout(0.5))
model.add(BatchNormalization())

model.add(tf.keras.layers.Dense(512, activation='relu'))
model.add(tf.keras.layers.Dropout(0.5))
model.add(BatchNormalization())

model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dropout(0.5))
model.add(BatchNormalization())

model.add(tf.keras.layers.Dense(10, activation='softmax'))

# 옵티마이저 값 설정
# 깃허브 참고 : https://github.com/KunalVaidya99/Music-Genre-Classification/blob/master/DNN_Music_Genre_Classification.ipynb
Adam = tf.keras.optimizers.Adam(learning_rate=0.0003, beta_1=0.9, beta_2=0.999, epsilon = 1e-08)

# 모델 컴파일
model.compile(optimizer=Adam, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

reduceLROnPlat = ReduceLROnPlateau(
    monitor = 'val_loss',
    factor = 0.895, # lr을 감소시킬 비율 (기본값 0.1 = val_loss 개선되지 않을 때 현재 lr에 0.1을 곱하여 감소시킴)
    patience = 3, # val_loss 값이 개선되지 않은 상태를 얼마나 허용할 것인지를 설정
    verbose = 1, # True : 감소된 lr에 대한 정보를 출력
    mode = 'min', # 성능 개선을 어떻게 측정할지 지정. val_loss값이 감소할 때 성능이 개선되었다고 판단.
    min_delta = 0.0001,
    cooldown = 2, # lr을 감소시킨 후, 새로운 lr을 적용하기 전에 몇 epoch동안 학습을 일시 정지할지 횟수를 지정
    min_lr = 1e-5 # lr을 감소시킬 최소값을 지정 (lr이 이 값보다 작아지지 않도록 함)
)


# 모델 훈련
history = model.fit(train_scaled30, y_train, epochs=150, batch_size=32, #64
                        validation_split=0.2, callbacks=[reduceLROnPlat])  # 콜백에 추가

In [None]:
# 모델 평가
test_loss30, test_accuracy30 = model.evaluate(test_scaled30, y_test, verbose=2)
print('Test Loss = {:.5f}'.format(test_loss30))
print('Test Accuracy = {:.5f}'.format(test_accuracy30))

In [None]:
# 훈련 손실과 검증 손실 그래프
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# 훈련 정확도와 검증 정확도 그래프
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

#혼동행렬 출력

In [None]:
#http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html
from collections import OrderedDict
import itertools
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

genres = {'blues': 0, 'classical': 1, 'country': 2, 'disco': 3, 'hiphop': 4,
          'jazz': 5, 'metal': 6, 'pop': 7, 'reggae': 8, 'rock': 9}

test_predictions_matrix = model.predict(test_scaled30)
test_preds_matrix = np.argmax(test_predictions_matrix, axis = 1)
y_orig = y_test
cm = confusion_matrix(test_preds_matrix, y_orig)

keys = OrderedDict(sorted(genres.items(), key=lambda t: t[1])).keys()

plt.figure(figsize=(10,10))
plot_confusion_matrix(cm, keys, normalize=True)