# Music classification
* 출처 : https://github.com/ZainNasrullah/music-artist-classification-crnn

# 라이브러리 설치 및 임포트

In [None]:
# Install numpy, librosa, and the packages listed in requirements.txt.
!pip install numpy
!pip install librosa
!pip install -r requirements.txt

In [None]:
import os
from os.path import isfile
import gc
import dill
import random
import itertools

import numpy as np
import pandas as pd
from numpy.random import RandomState
import matplotlib.pyplot as plt

import librosa
import librosa.display

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.utils import shuffle
from scipy import stats

from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Reshape, Permute
from keras.layers import Conv1D, Conv2D, MaxPooling1D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.layers.recurrent import GRU, LSTM
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.optimizers import Adam

# 데이터셋 관련 함수 정의

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.get_cmap('Blues')):
    """Draw a confusion matrix on the current matplotlib figure.

    Args:
        cm: 2-D array of counts, indexed as ``cm[true, predicted]``.
        classes: Tick labels, one per row/column of ``cm``.
        normalize: When true, every row is divided by its sum so that each
            cell shows the fraction of that true class. Rows without any
            samples are drawn as zeros.
        title: Title placed above the plot.
        cmap: Matplotlib colormap used for the image.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # A class with no true samples has an all-zero row. Dividing by its
        # zero total would produce NaN cells and a NaN colour threshold, so
        # those rows are left at zero instead.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=90)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half of the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')


def plot_history(history, title="model accuracy"):
    """Plot training and validation accuracy per epoch and show the figure.

    Args:
        history: Object returned by ``model.fit`` whose ``history`` attribute
            maps metric names to per-epoch values.
        title: Title placed above the plot.

    Raises:
        KeyError: If neither naming scheme for the accuracy metric is present.
    """
    metrics = history.history
    # The accuracy metric is logged as 'acc'/'val_acc' by some Keras versions
    # and as 'accuracy'/'val_accuracy' by others; accept either spelling.
    train_key = 'acc' if 'acc' in metrics else 'accuracy'
    val_key = 'val_acc' if 'val_acc' in metrics else 'val_accuracy'

    plt.plot(metrics[train_key])
    plt.plot(metrics[val_key])
    plt.title(title)
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='lower right')
    plt.show()

In [None]:
def create_dataset(artist_folder='artists', save_folder='song_data',
                   sr=16000, n_mels=128,
                   n_fft=2048, hop_length=512):
    """Convert every audio file into a mel spectrogram and save it with dill.

    The expected layout is ``artist_folder/<artist>/<audio file>``: each
    sub-directory of ``artist_folder`` is one class and every entry inside it
    is loaded as a song.

    Args:
        artist_folder: Directory containing one sub-directory per class.
        save_folder: Output directory (created if missing). Each song is
            stored as ``<artist>_%%-%%_<song>`` holding the tuple
            ``(artist, spectrogram, song)``.
        sr: Sampling rate the audio is resampled to when loaded.
        n_mels: Number of mel bands.
        n_fft: FFT window size.
        hop_length: Hop between successive frames, in samples.
    """

    # get list of all artists (sub-directories only)
    os.makedirs(save_folder, exist_ok=True)
    artists = [path for path in os.listdir(artist_folder) if
               os.path.isdir(os.path.join(artist_folder, path))]
    print("artists : ", artists)

    # iterate through all artists and songs and compute the mel spectrogram
    for artist in artists:
        artist_path = os.path.join(artist_folder, artist)
        artist_songs = os.listdir(artist_path)
        print("songs : ", artist_songs)

        for song in artist_songs:
            song_path = os.path.join(artist_path, song)

            # The audio must be passed by keyword: recent librosa releases
            # no longer accept it positionally.
            audio, _ = librosa.load(song_path, sr=sr)
            S = librosa.feature.melspectrogram(y=audio, sr=sr,
                                               n_mels=n_mels,
                                               n_fft=n_fft,
                                               hop_length=hop_length)
            # NOTE(review): melspectrogram returns a power spectrogram by
            # default, yet it is scaled with amplitude_to_db. Kept as-is so
            # features stay compatible with already-saved data and weights.
            log_S = librosa.amplitude_to_db(S, ref=1.0)
            data = (artist, log_S, song)

            # Save each song
            save_name = artist + '_%%-%%_' + song
            with open(os.path.join(save_folder, save_name), 'wb') as fp:
                dill.dump(data, fp)


def load_dataset(song_folder_name='song_data',
                 artist_folder='artists',
                 nb_classes=20, random_state=42):
    """Load saved spectrograms for a random subset of artists.

    Args:
        song_folder_name: Directory of dill files, each holding a tuple
            ``(artist, spectrogram, song_name)``.
        artist_folder: Directory whose sub-directories name the available
            artists (classes).
        nb_classes: Number of artists to draw, without replacement.
        random_state: Seed for the artist selection.

    Returns:
        Three parallel lists: artist names, spectrograms, and song names,
        restricted to the selected artists.

    Raises:
        ValueError: If ``nb_classes`` exceeds the number of artist
            directories (raised by the random choice).
    """

    # Get all songs saved in the given folder
    song_list = os.listdir(song_folder_name)

    # Only sub-directories are artists; stray files in the folder must not
    # be drawn as classes.
    artist_list = [name for name in os.listdir(artist_folder)
                   if os.path.isdir(os.path.join(artist_folder, name))]

    # select the appropriate number of classes
    prng = RandomState(random_state)
    chosen = prng.choice(artist_list, size=nb_classes, replace=False)
    artists = set(chosen.tolist())

    artist = []
    spectrogram = []
    song_name = []

    # Load each song into memory and keep it if its artist was selected
    for song in song_list:
        song_path = os.path.join(song_folder_name, song)
        if not os.path.isfile(song_path):
            # Skip sub-directories (e.g. notebook checkpoint folders).
            continue
        # SECURITY: dill.load executes arbitrary code from the file; only
        # point this at data produced by a trusted run.
        with open(song_path, 'rb') as fp:
            loaded_song = dill.load(fp)
        if loaded_song[0] in artists:
            artist.append(loaded_song[0])
            spectrogram.append(loaded_song[1])
            song_name.append(loaded_song[2])

    return artist, spectrogram, song_name


def load_dataset_song_split(song_folder_name='song_data',
                            artist_folder='artists',
                            nb_classes=20,
                            test_split_size=0.1,
                            validation_split_size=0.1,
                            random_state=42):
    """Load the dataset and split it by song into train/test/validation.

    Both splits are stratified on the artist label. The validation set is
    carved out of what remains after the test set has been removed.

    Returns:
        ``(Y_train, X_train, S_train, Y_test, X_test, S_test,
        Y_val, X_val, S_val)`` where Y are artist labels, X spectrograms
        and S song names.
    """
    labels, spectrograms, names = load_dataset(
        song_folder_name=song_folder_name,
        artist_folder=artist_folder,
        nb_classes=nb_classes,
        random_state=random_state)

    # First cut: hold out the test songs.
    (rest_X, X_test,
     rest_Y, Y_test,
     rest_S, S_test) = train_test_split(
        spectrograms, labels, names,
        test_size=test_split_size,
        stratify=labels,
        random_state=random_state)

    # Second cut: a validation set used to track training progress.
    (X_train, X_val,
     Y_train, Y_val,
     S_train, S_val) = train_test_split(
        rest_X, rest_Y, rest_S,
        test_size=validation_split_size,
        shuffle=True,
        stratify=rest_Y,
        random_state=random_state)

    return (Y_train, X_train, S_train,
            Y_test, X_test, S_test,
            Y_val, X_val, S_val)


def slice_songs(X, Y, S, length=911):
    """Cut each spectrogram into consecutive, non-overlapping slices.

    Args:
        X: Sequence of 2-D spectrograms; slicing is along axis 1.
        Y: Artist label for each spectrogram in ``X``.
        S: Song name for each spectrogram in ``X``.
        length: Number of columns (frames) per slice.

    Returns:
        Three arrays ``(spectrogram, artist, song_name)`` with one entry per
        slice. Trailing frames that do not fill a whole slice are dropped;
        songs shorter than ``length`` contribute no slices.
    """

    artist = []
    spectrogram = []
    song_name = []

    for i, song in enumerate(X):
        # Number of complete slices that fit into this song.
        n_slices = song.shape[1] // length
        # Iterate over every complete slice; stopping one short would throw
        # away valid data and drop songs holding exactly one slice.
        for j in range(n_slices):
            spectrogram.append(song[:, length * j:length * (j + 1)])
            artist.append(Y[i])
            song_name.append(S[i])

    return np.array(spectrogram), np.array(artist), np.array(song_name)


def _most_frequent(values):
    """Return the most common entry of ``values`` (smallest one on ties)."""
    uniques, counts = np.unique(np.ravel(values), return_counts=True)
    return uniques[np.argmax(counts)]


def predict_artist(model, X, Y, S,
                   le, class_names,
                   slices=None, verbose=False,
                   ml_mode=False):
    """Predict one artist per song by voting over the song's slices.

    Args:
        model: Fitted model exposing ``predict``.
        X: Array of slices.
        Y: One-hot encoded labels, one row per slice.
        S: Array of song names, one per slice; slices are grouped by it.
        le: Fitted label encoder used to print readable names.
        class_names: Display names, ordered by encoded class id.
        slices: If set and the song has at least this many slices, a random
            subset of that size is used instead of the whole song.
        verbose: Print the predicted and actual artist for every song.
        ml_mode: When true, ``model.predict`` output is treated as class
            labels directly instead of per-class probabilities.

    Returns:
        ``(class_report, class_report_dict)``: the text and dictionary forms
        of the per-song classification report. The confusion matrix is drawn
        on the current matplotlib figure as a side effect.
    """
    print("Test results when pooling slices by song and voting:")
    # Obtain the list of songs
    songs = np.unique(S)

    prediction_list = []
    actual_list = []

    # Iterate through each song
    for song in songs:

        # Grab all slices related to a particular song
        song_mask = S == song
        X_song = X[song_mask]
        Y_song = Y[song_mask]

        # If not using full song, shuffle and take up to a number of slices
        if slices and slices <= X_song.shape[0]:
            X_song, Y_song = shuffle(X_song, Y_song)
            X_song = X_song[:slices]
            Y_song = Y_song[:slices]

        # Get probabilities of each class
        predictions = model.predict(X_song, verbose=0)

        if not ml_mode:
            # Get list of highest probability classes and their probability
            class_prediction = np.argmax(predictions, axis=1)
            class_probability = np.max(predictions, axis=1)

            # keep only the slices the model is confident about
            prediction_summary_trim = class_prediction[class_probability > 0.5]

            # deal with edge case where there is no confident class
            if len(prediction_summary_trim) == 0:
                prediction_summary_trim = class_prediction
        else:
            prediction_summary_trim = predictions

        # Majority vote. The true class must be decoded per slice (axis=1);
        # a flat argmax over the whole one-hot matrix is not a class id.
        prediction = _most_frequent(prediction_summary_trim)
        actual = _most_frequent(np.argmax(Y_song, axis=1))

        # Keeping track of overall song classification accuracy
        prediction_list.append(prediction)
        actual_list.append(actual)

        # Print out prediction
        if verbose:
            print(song)
            # inverse_transform expects a 1-D sequence, not a scalar.
            print("Predicted:", le.inverse_transform([prediction])[0],
                  "\nActual:", le.inverse_transform([actual])[0])
            print('\n')

    # Print overall song accuracy
    actual_array = np.array(actual_list)
    prediction_array = np.array(prediction_list)
    # Pin the label set so rows/columns line up with class_names even when
    # some class has no song in this evaluation set.
    label_ids = np.arange(len(class_names))
    cm = confusion_matrix(actual_array, prediction_array, labels=label_ids)
    plot_confusion_matrix(cm, classes=class_names, normalize=True,
                          title='Confusion matrix for pooled results' +
                                ' with normalization')
    class_report = classification_report(actual_array, prediction_array,
                                         labels=label_ids,
                                         target_names=class_names)
    print(class_report)

    class_report_dict = classification_report(actual_array, prediction_array,
                                              labels=label_ids,
                                              target_names=class_names,
                                              output_dict=True)
    return (class_report, class_report_dict)


def encode_labels(Y, le=None, enc=None):
    """Turn class names into one-hot rows.

    Labels are first mapped to integer ids and then to one-hot vectors.
    Encoders passed in are reused as-is; missing ones are created and fitted
    on ``Y``.

    Returns:
        ``(Y_enc, le, enc)``: the dense one-hot array plus both encoders so
        they can be applied to further data.
    """
    n_samples = Y.shape[0]

    # Stage 1: names -> integer ids, shaped as a single column.
    if le is not None:
        integer_ids = le.transform(Y)
    else:
        le = preprocessing.LabelEncoder()
        integer_ids = le.fit_transform(Y)
    id_column = integer_ids.reshape(n_samples, 1)

    # Stage 2: integer ids -> one-hot matrix.
    if enc is not None:
        one_hot = enc.transform(id_column)
    else:
        enc = preprocessing.OneHotEncoder()
        one_hot = enc.fit_transform(id_column)

    return one_hot.toarray(), le, enc

# 구글 드라이브 연동

In [None]:
# Mount Google Drive at /content/drive (Colab only).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Make the project folder on Drive the working directory; the cells below
# build their paths from os.getcwd().
%cd /content/drive/MyDrive/졸작/mp3파일/

# 데이터셋 만들기 실행

In [None]:
# Path of the 'moods' directory under the current working directory,
# followed by a listing of its entries.
forder_name = os.path.join(os.getcwd(), 'moods')
os.listdir(forder_name)

['dramatic',
 'sadness',
 'darkness',
 'calm',
 'funky',
 'happiness',
 'angry',
 'brightness']

In [None]:
# Compute a mel spectrogram for every audio file under forder_name and
# save the results into ./song_data.
create_dataset(artist_folder=forder_name, save_folder='song_data',
               sr=16000, n_mels=128, n_fft=2048,
               hop_length=512)

# 모델 정의

In [None]:
def CRNN2D(X_shape, nb_classes):
    '''
    Build the convolutional-recurrent network used for evaluation.

    Four Conv2D stages (conv -> ELU -> batch norm -> max pool -> dropout)
    are followed by two GRU layers and a softmax output. Inspired by the
    K. Choi model in:
    https://github.com/keunwoochoi/music-auto_tagging-keras/blob/master/music_tagger_crnn.py

    X_shape is the shape of the full input array; entries 1..3 are used as
    the per-sample (frequency, time, channels) input shape.
    '''

    conv_filters = [64, 128, 128, 128]  # filters per convolutional stage
    pool_shapes = [(2, 2), (4, 2), (4, 2), (4, 2),
                   (4, 2)]  # pooling window per stage (the fifth is unused)
    kernel = (3, 3)
    act = 'elu'

    # axis positions within a batched (batch, frequency, time, channels) tensor
    freq_ax = 1
    time_ax = 2
    chan_ax = 3
    in_shape = (X_shape[1], X_shape[2], X_shape[3])

    net = Sequential()

    # Normalize the raw input along the frequency axis.
    net.add(BatchNormalization(axis=freq_ax, input_shape=in_shape))

    # Stage 1 carries the explicit input shape and data format.
    net.add(Conv2D(conv_filters[0], kernel_size=kernel, padding='same',
                   data_format="channels_last",
                   input_shape=in_shape))
    net.add(Activation(act))
    net.add(BatchNormalization(axis=chan_ax))
    net.add(MaxPooling2D(pool_size=pool_shapes[0], strides=pool_shapes[0]))
    net.add(Dropout(0.1))

    # Remaining stages share one recipe; zip stops after the last filter
    # count, so exactly three more stages are added.
    for n_filters, pool in zip(conv_filters[1:], pool_shapes[1:]):
        net.add(Conv2D(n_filters, kernel_size=kernel, padding='same'))
        net.add(Activation(act))
        net.add(BatchNormalization(axis=chan_ax))
        net.add(MaxPooling2D(pool_size=pool, strides=pool))
        net.add(Dropout(0.1))

    # Put time first, then flatten frequency and channels into one feature
    # axis so the recurrent layers see a (time, features) sequence.
    net.add(Permute((time_ax, freq_ax, chan_ax)))
    permuted_shape = net.output_shape
    features = permuted_shape[2] * permuted_shape[3]
    net.add(Reshape((permuted_shape[1], features)))

    # Recurrent part: only the second GRU collapses the sequence.
    net.add(GRU(32, return_sequences=True))
    net.add(GRU(32, return_sequences=False))
    net.add(Dropout(0.3))

    # Classification head.
    net.add(Dense(nb_classes))
    net.add(Activation("softmax"))
    return net

# 학습

In [None]:
def train_model(nb_classes=20,
                slice_length=911,
                artist_folder='artists',
                song_folder='song_data',
                plots=True,
                train=True,
                load_checkpoint=False,
                save_metrics=True,
                save_metrics_folder='metrics',
                save_weights_folder='weights',
                batch_size=16,
                nb_epochs=200,
                early_stop=10,
                lr=0.0001,
                album_split=False,
                random_states=42):
    """Train the CRNN on sliced spectrograms and evaluate it on the test set.

    Args:
        nb_classes: Number of artists/classes to load.
        slice_length: Frames per spectrogram slice.
        artist_folder: Directory with one sub-directory per class.
        song_folder: Directory with the saved spectrogram files.
        plots: Plot the accuracy history after training.
        train: Fit the model; when false only the saved weights are scored.
        load_checkpoint: Start from existing weights if the file exists.
        save_metrics: Write confusion-matrix images and a text report.
        save_metrics_folder: Output directory for metrics.
        save_weights_folder: Output directory for the weight checkpoint.
        batch_size: Training batch size.
        nb_epochs: Maximum number of epochs.
        early_stop: Early-stopping patience on validation loss, in epochs.
        lr: Learning rate for Adam.
        album_split: Use the album-level split instead of the song split.
        random_states: Seed used for class selection and splitting; also
            part of the output file names.

    Returns:
        ``(scores_dict, pooled_scores_dict)``: classification reports as
        dictionaries, per slice and per song (after voting).
    """

    weights = os.path.join(save_weights_folder, str(nb_classes) +
                           '_' + str(slice_length) + '_' + str(random_states))
    os.makedirs(save_weights_folder, exist_ok=True)
    os.makedirs(save_metrics_folder, exist_ok=True)

    print("Loading dataset...")

    if not album_split:
        # song split
        Y_train, X_train, S_train, Y_test, X_test, S_test, \
        Y_val, X_val, S_val = \
            load_dataset_song_split(song_folder_name=song_folder,
                                    artist_folder=artist_folder,
                                    nb_classes=nb_classes,
                                    random_state=random_states,
                                    test_split_size=0.05,
                                    validation_split_size=0.05)
    else:
        # NOTE(review): load_dataset_album_split is not defined in the
        # visible part of this file; confirm it exists before using
        # album_split=True.
        Y_train, X_train, S_train, Y_test, X_test, S_test, \
        Y_val, X_val, S_val = \
            load_dataset_album_split(song_folder_name=song_folder,
                                     artist_folder=artist_folder,
                                     nb_classes=nb_classes,
                                     random_state=random_states)

    print("Loaded and split dataset. Slicing songs...")

    # Create slices out of the songs
    X_train, Y_train, S_train = slice_songs(X_train, Y_train, S_train,
                                            length=slice_length)
    X_val, Y_val, S_val = slice_songs(X_val, Y_val, S_val,
                                      length=slice_length)
    X_test, Y_test, S_test = slice_songs(X_test, Y_test, S_test,
                                         length=slice_length)

    print("Training set label counts:", np.unique(Y_train, return_counts=True))

    # Encode the target vectors into one-hot encoded vectors; the encoders
    # fitted on the training labels are reused for test and validation.
    Y_train, le, enc = encode_labels(Y_train)
    Y_test, le, enc = encode_labels(Y_test, le, enc)
    Y_val, le, enc = encode_labels(Y_val, le, enc)

    # Reshape data as 2d convolutional tensor shape (append a channel axis)
    X_train = X_train.reshape(X_train.shape + (1,))
    X_val = X_val.reshape(X_val.shape + (1,))
    X_test = X_test.reshape(X_test.shape + (1,))

    # build the model
    model = CRNN2D(X_train.shape, nb_classes=Y_train.shape[1])
    #model = CRNN1D(X_train.shape, nb_classes=Y_train.shape[1])
    model.compile(loss='categorical_crossentropy',
                  optimizer=Adam(lr=lr),
                  metrics=['accuracy'])
    model.summary()

    # Initialize weights using checkpoint if it exists
    if load_checkpoint:
        print("Looking for previous weights...")
        if isfile(weights):
            print('Checkpoint file detected. Loading weights.')
            model.load_weights(weights)
        else:
            print('No checkpoint file detected.  Starting from scratch.')
    else:
        print('Starting from scratch (no checkpoint)')

    checkpointer = ModelCheckpoint(filepath=weights,
                                   verbose=1,
                                   save_best_only=True,
                                   monitor='val_accuracy')
    earlystopper = EarlyStopping(monitor='val_loss', min_delta=0,
                                 patience=early_stop, verbose=0, mode='auto')

    # Train the model
    if train:
        print("Input Data Shape", X_train.shape)
        history = model.fit(X_train, Y_train, batch_size=batch_size,
                            shuffle=True, epochs=nb_epochs,
                            verbose=1, validation_data=(X_val, Y_val),
                            callbacks=[checkpointer, earlystopper])
        if plots:
            plot_history(history)

    # Load weights that gave best performance on validation set
    model.load_weights(weights)
    filename = os.path.join(save_metrics_folder, str(nb_classes) + '_'
                            + str(slice_length)
                            + '_' + str(random_states))

    # Score test model. predict() returns the softmax probabilities;
    # predict_proba() is not available on newer Keras Sequential models.
    score = model.evaluate(X_test, Y_test, verbose=0)
    y_score = model.predict(X_test)

    # Calculate confusion matrix
    y_predict = np.argmax(y_score, axis=1)
    y_true = np.argmax(Y_test, axis=1)
    # Take the class ids from the fitted encoder and pin them as labels so
    # the matrix and report always line up with the class names, even when
    # a class has no slice in the test set.
    class_names = np.arange(len(le.classes_))
    class_names_original = le.inverse_transform(class_names)
    cm = confusion_matrix(y_true, y_predict, labels=class_names)

    # Plot the confusion matrix
    plt.figure(figsize=(14, 14))
    plot_confusion_matrix(cm, classes=class_names_original,
                          normalize=True,
                          title='Confusion matrix with normalization')
    if save_metrics:
        plt.savefig(filename + '.png', bbox_inches="tight")
    plt.close()
    # Fresh figure that predict_artist draws the pooled matrix on.
    plt.figure(figsize=(14, 14))

    # Print out metrics
    print('Test score/loss:', score[0])
    print('Test accuracy:', score[1])
    print('\nTest results on each slice:')
    scores = classification_report(y_true, y_predict,
                                   labels=class_names,
                                   target_names=class_names_original)
    scores_dict = classification_report(y_true, y_predict,
                                        labels=class_names,
                                        target_names=class_names_original,
                                        output_dict=True)
    print(scores)

    # Predict artist using pooling methodology
    pooling_scores, pooled_scores_dict = \
        predict_artist(model, X_test, Y_test, S_test,
                       le, class_names=class_names_original,
                       slices=None, verbose=False)

    # Save metrics
    if save_metrics:
        plt.savefig(filename + '_pooled.png', bbox_inches="tight")
        plt.close()
        with open(filename+'.txt', 'w') as f:
            f.write("Training data shape:" + str(X_train.shape))
            f.write('\nnb_classes: ' + str(nb_classes) +
                    '\nslice_length: ' + str(slice_length))
            f.write('\nweights: ' + weights)
            f.write('\nlr: ' + str(lr))
            f.write('\nTest score/loss: ' + str(score[0]))
            f.write('\nTest accuracy: ' + str(score[1]))
            f.write('\nTest results on each slice:\n')
            f.write(str(scores))
            f.write('\n\n Scores when pooling song slices:\n')
            f.write(str(pooling_scores))

    return (scores_dict, pooled_scores_dict)

In [None]:
# Experiment driver: trains and evaluates the model once per slice length
# and iteration, then writes the weighted-average scores to CSV.
#
# The string below is a reference table mapping slice duration to the
# number of spectrogram frames.
'''
1s 32 frames
3s 94 frames
5s 157 frames
6s 188 frames
10s 313 frames
20s 628 frames
29.12s 911 frames
'''

# Full sweep kept for reference; only the 3-second setting is run here.
#slice_lengths = [911, 628, 313, 157, 94, 32]
slice_lengths = [94] # 3 sec
#random_state_list = [0, 21, 42]
random_state_list = [21]
# random_state_list is indexed by iteration below, so it needs at least
# `iterations` entries.
iterations = 1
summary_metrics_output_folder = 'trials_song_split'

# Raw audio (one sub-folder per class) and the saved spectrogram files.
original_folder = os.path.join(os.getcwd(), 'moods')
processed_folder = os.path.join(os.getcwd(), 'song_data')

for slice_len in slice_lengths:
    # Weighted-average report rows collected across iterations.
    scores = []
    pooling_scores = []
    for i in range(iterations):
        print("Hello! Iteration : ", i)
        score, pooling_score = train_model(
            nb_classes=8,
            slice_length=slice_len,
            artist_folder=original_folder,
            song_folder=processed_folder,
            lr=0.001,
            train=True,
            load_checkpoint=True,
            plots=False,
            album_split=False,
            random_states=random_state_list[i],
            save_metrics=True,
            save_metrics_folder='metrics_song_split',
            save_weights_folder='weights_song_split')

        scores.append(score['weighted avg'])
        pooling_scores.append(pooling_score['weighted avg'])
        # Collect garbage between runs.
        gc.collect()

    os.makedirs(summary_metrics_output_folder, exist_ok=True)

    # One CSV per slice length for per-slice scores ...
    pd.DataFrame(scores).to_csv(
        '{}/{}_score.csv'.format(summary_metrics_output_folder, slice_len))

    # ... and one for the per-song (pooled) scores.
    pd.DataFrame(pooling_scores).to_csv(
        '{}/{}_pooled_score.csv'.format(
            summary_metrics_output_folder, slice_len))