執行前請先修改 `DIR` 與 `model_save_location` 這兩個變數，並把訓練用的檔案放進 `DIR` 所指定的資料夾。

每個 .wav 音檔與其對應的 .txt 標註檔必須放在同一個目錄（`DIR`）。

兩者的主檔名必須相同，副檔名分別為 .wav 與 .txt（例如 `take1.wav` 對應 `take1.txt`）。

設定完成後，可直接執行全部儲存格（Run All）。

In [69]:
# --- Data and output locations ---------------------------------------------
# Directory that is passed to prepare_data(); holds the .wav/.txt pairs.
DIR = 'snare_drum'
LABELS = ['no', 'yes']  # Has onset: no = 0, yes = 1
NUM_CLASSES = len(LABELS)
# Location handed to plot() for saved figures.
PLOT_SAVE_LOCATION = '.'
# Path handed to model.save() after training.
model_save_location = './model_sd'

# --- Feature extraction ----------------------------------------------------
# STFT window lengths; each entry is passed as win_length to
# DatasetGenerator.process_wav_file. Other values tried: [1024, 2048, 4096].
CHANNELS = [2048]
# Number of mel bands (n_mels) of the mel spectrogram.
MEL_BANDS = 80
# Window width handed to SpectrogramProcessor.split_spectrogram.
TIME_FRAMES = 12
# Tolerance handed to SpectrogramProcessor.get_onsets.
# NOTE(review): the name says milliseconds, but 0.03 looks like seconds
# (i.e. 30 ms) - confirm against SpectrogramProcessor.
DIFF_FROM_ONSET_MS = 0.03
# Upper frequency bound (fmax) of the mel filter bank.
THRESHOLD_FREQ = 15000

# --- Training --------------------------------------------------------------
# Batch sizes iterated over by main().
BATCHES = [64]  # other sizes tried: 256, 512
EPOCHS = 50
# Patience of the EarlyStopping callback.
# NOTE(review): PATIENCE is larger than EPOCHS, so early stopping cannot
# trigger within a run - confirm this is intended.
PATIENCE = 150
# Fraction of the windows held out as test data.
TRAIN_TEST_SPLIT = 0.10
# Fraction of the training windows used as validation data by model.fit.
TRAIN_VAL_SPLIT = 0.20

# Categorical cross-entropy expects labels to be provided in a one-hot representation (0, 1).
LOSS_FUNCTION = 'categorical_crossentropy'
# Intended learning rate for the Adam optimizer built in get_model().
LEARNING_RATE = 0.001

# --- Model -----------------------------------------------------------------
# Shape of one input sample: (mel bands, time frames, number of window lengths).
INPUT_SHAPE = (MEL_BANDS, TIME_FRAMES, len(CHANNELS))
# Activation of the prediction layer.
# NOTE(review): sigmoid combined with categorical cross-entropy on one-hot
# labels is unusual (softmax is the conventional pairing) - confirm.
PRED_LAYER_ACTIVATION = 'sigmoid'
METRICS = ['acc']
# Averaging mode for precision/recall/F-score.
# Alternatives: None, 'weighted', 'micro'.
PREC_REC_FSCORE_AVERAGE = 'macro'


# Some functions for processing data

In [70]:
from processing import SpectrogramProcessor

In [71]:
import os
import os.path
import numpy as np
import pandas as pd
import random
from pathlib import Path

import librosa
import librosa.display
from pydub import AudioSegment
from scipy.io import wavfile
from scipy.signal import stft, spectrogram

from sklearn.model_selection import train_test_split
from keras.utils import to_categorical


class DatasetGenerator:
    """Turn WAV files and their annotation files into model inputs and labels.

    Typical use: call ``load_wav_and_annotation_files`` to index a directory,
    then ``apply_train_test_split_by_windows`` to obtain spectrogram windows
    and labels split into a training and a test portion.
    """

    def __init__(self,
                 label_set,
                 sample_rate=44100,
                 channels=None,
                 mel_bands=80,
                 time_frames=15,
                 diff_from_onset_ms=0.030,
                 threshold_freq=15500):
        """Store the feature-extraction settings.

        Args:
            label_set: Class labels; its length is the number of one-hot
                classes.
            sample_rate: Sample rate assumed for every WAV file.
            channels: STFT window lengths to process each file with.
                Defaults to ``[2048]``.
            mel_bands: Number of mel bands of the mel spectrogram.
            time_frames: Window width handed to
                ``SpectrogramProcessor.split_spectrogram``.
            diff_from_onset_ms: Tolerance handed to
                ``SpectrogramProcessor.get_onsets``.
            threshold_freq: Upper frequency bound of the mel filter bank.
        """
        self.label_set = label_set
        self.sample_rate = sample_rate
        # A None default avoids sharing one list object between instances.
        self.channels = [2048] if channels is None else channels
        self.mel_bands = mel_bands
        self.time_frames = time_frames
        self.diff_from_onset_ms = diff_from_onset_ms
        self.threshold_freq = threshold_freq

    @staticmethod
    def _collect_wav_annotation_pairs(directory, skip_accompaniment=False):
        """Return (wav path, annotation path) tuples for annotated WAV files.

        The annotation file is the ``.txt`` file with the same stem located
        next to the WAV file. WAV files without such a file are skipped.
        """
        pairs = []
        # Sorting makes the file order independent of the file system.
        for wav_path in sorted(Path(directory).rglob('*.wav')):
            # Files with '_acc' contain only the accompaniment track.
            if skip_accompaniment and '_acc' in wav_path.stem:
                continue
            annotation_path = wav_path.with_suffix('.txt')
            if annotation_path.is_file():
                pairs.append((str(wav_path), str(annotation_path)))
        return pairs

    def construct_wav_annotation_pair_data_frame(self, directory):
        """Index every annotated WAV file below ``directory``.

        Stores the result in ``self.data_frame`` and returns it.

        Returns:
            DataFrame with the columns ``wave_file`` and ``annotation``.
        """
        pairs = self._collect_wav_annotation_pairs(directory)
        data_frame = pd.DataFrame(pairs, columns=['wave_file', 'annotation'])
        self.data_frame = data_frame
        return data_frame

    def read_wav_file(self, x):
        """Read the WAV file at path ``x`` as a mono float32 signal.

        Integer samples are scaled by the maximum of their own integer type,
        so 16-bit and 32-bit files end up in the same value range. Floating
        point files are used as they are.
        """
        # Read wavfile using scipy wavfile.read.
        _, wav = wavfile.read(x)

        # Normalize.
        if wav.dtype == np.uint8:
            # 8-bit PCM is unsigned and centred on 128.
            wav = (wav.astype(np.float32) - 128.0) / 128.0
        elif np.issubdtype(wav.dtype, np.integer):
            wav = wav.astype(np.float32) / np.iinfo(wav.dtype).max
        else:
            wav = wav.astype(np.float32)

        if wav.ndim == 2:
            # Convert multi-channel audio to mono by averaging the channels.
            wav = wav.mean(axis=1)

        return wav

    def process_wav_file(self, wav_file, annotation_file, win_length=2048, eps=1e-10):
        """Compute spectrogram windows and onset labels for one file.

        Args:
            wav_file: Path of the WAV file.
            annotation_file: Path of the matching annotation file.
            win_length: STFT window length in samples.
            eps: Unused; kept so existing callers keep working.

        Returns:
            Tuple ``(spectrograms, onsets)`` as produced by
            ``SpectrogramProcessor``, with the time indices removed from the
            spectrogram windows.
        """
        wav = self.read_wav_file(wav_file)
        # NOTE: the sample rate stored in the file is ignored; every file is
        # assumed to be recorded at self.sample_rate.
        sample_rate = self.sample_rate
        hop_length = 441  # 10 ms at the default 44100 Hz.
        noverlap = win_length - hop_length

        freqs, times, spec = spectrogram(wav, sample_rate, window='hann', nperseg=win_length, noverlap=noverlap, mode='complex')
        # Keep only the percussive component of the spectrogram.
        _, S_percussive = librosa.decompose.hpss(spec, margin=(1.0, 5.0))
        S = librosa.feature.melspectrogram(S=np.abs(S_percussive), sr=sample_rate, window='hann', win_length=win_length, hop_length=hop_length, n_mels=self.mel_bands, center=False, fmax=self.threshold_freq)
        S_db = librosa.core.power_to_db(S, ref=np.max)
        # Add a trailing axis of size one.
        S_expanded = np.expand_dims(S_db, axis=2)

        sp = SpectrogramProcessor(S_expanded, times, annotation_file)
        spectrograms = sp.split_spectrogram(self.time_frames)
        annotations = sp.get_annotations()
        onsets = sp.get_onsets(spectrograms, annotations, self.diff_from_onset_ms)
        spectrograms = [s[0] for s in spectrograms]  # Remove time indices.

        return spectrograms, onsets

    def split_train_test_set(self, test_size, random_state):
        """Split ``self.df`` file-wise into ``self.df_train`` and ``self.df_test``."""
        self.df_train, self.df_test = train_test_split(
            self.df,
            test_size=test_size,
            random_state=random_state)

    def apply_train_test_split_by_windows(self, test_size, shuffle_train_data=True):
        """Process every indexed file and split the windows into train/test.

        The last ``test_size`` fraction of the windows becomes the test set.

        Returns:
            Tuple ``(train_data, train_labels, test_data, test_labels)``.
            The training labels are one-hot encoded, the test labels are
            class indices.
        """
        self.df_train = self.df
        data, labels = self.get_train_test_validation_data('train', shuffle_train_data=shuffle_train_data)

        larger_portion = int(len(data) * (1 - test_size))
        train_data = data[:larger_portion]
        train_labels = labels[:larger_portion]
        test_data = data[larger_portion:]
        test_labels = labels[larger_portion:]

        # Remove effects of to_categorical function, convert to binary.
        test_labels = np.argmax(test_labels, axis=1)
        return train_data, train_labels, test_data, test_labels

    def load_wav_and_annotation_files(self, dir):
        """Index the annotated WAV files below ``dir``, skipping '_acc' files.

        Stores the result in ``self.df`` and returns it.

        NOTE: Modify the annotation file format and paths to suit your needs.

        Returns:
            DataFrame with the columns ``wav_file`` and ``annotations``.
        """
        data = self._collect_wav_annotation_pairs(dir, skip_accompaniment=True)

        # Data Frames with wavs and matching annotation paths.
        df = pd.DataFrame(data, columns=['wav_file', 'annotations'])
        self.df = df
        return df

    def get_train_test_validation_data(self, mode, shuffle_train_data=True):
        """Return input data and labels for ``mode``.

        Args:
            mode: ``'train'``, ``'val'`` or ``'test'``; selects
                ``self.df_train``, ``self.df_val`` or ``self.df_test``.
                ``self.df_val`` is not set by any method of this class and
                has to be assigned by the caller.
            shuffle_train_data: Process the training files in random order.

        Raises:
            ValueError: If ``mode`` is none of the three supported values.
        """
        if mode == 'train':
            df = self.df_train
            # Shuffle input data.
            audiofile_ids = random.sample(range(df.shape[0]), df.shape[0]) if shuffle_train_data else list(range(df.shape[0]))
        elif mode == 'val':
            df = self.df_val
            audiofile_ids = list(range(df.shape[0]))
        elif mode == 'test':
            df = self.df_test
            audiofile_ids = list(range(df.shape[0]))
        else:
            raise ValueError('The mode should be either train, val or test.')
        return self.get_singlechannel_data(df, audiofile_ids, mode == 'test')

    def get_singlechannel_data(self, df, audiofile_ids, is_test):
        """Process the rows of ``df`` in the order given by ``audiofile_ids``.

        Args:
            df: DataFrame with the columns ``wav_file`` and ``annotations``.
            audiofile_ids: Positional row indices, in processing order.
            is_test: When true the labels are returned as collected;
                otherwise they are one-hot encoded.

        Returns:
            Tuple ``(input_data, labels)``; ``input_data`` is a numpy array.
        """
        input_data = []
        labels = []
        wav_files = df.wav_file.values
        annotation_files = df.annotations.values

        # Index by the id itself so that a shuffled id list takes effect.
        for file_id in audiofile_ids:
            for win_length in self.channels:
                spectrograms, onsets = self.process_wav_file(wav_files[file_id], annotation_files[file_id], win_length=win_length)
                input_data.extend(spectrograms)
                labels.extend(onsets)

        # Convert to numpy array.
        input_data = np.array(input_data)

        if not is_test:
            # Process labels to one-hot encoding.
            labels = to_categorical(labels, num_classes=len(self.label_set))

        return input_data, labels

# CNN model

In [72]:
# -*- coding: utf-8 -*-

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import InputLayer

def deep_cnn_sequential(features_shape, num_classes, act='relu'):
    """Build the sequential CNN used for training a single drum instrument.

    The same architecture may be used for e.g. snare, bass drum and hi-hat
    onset training. It consists of two convolution / max-pooling /
    batch-normalization blocks, a 256-unit dense layer with batch
    normalization and dropout, and a prediction layer.

    Args:
        features_shape: Shape of one input sample.
        num_classes: Number of units of the prediction layer.
        act: Activation of the prediction layer only; the hidden layers
            always use ReLU.

    Returns:
        The (uncompiled) model. Its summary is printed as a side effect.
    """
    kl = tf.keras.layers

    layer_stack = [
        InputLayer(name='inputs', input_shape=features_shape, dtype='float32'),
        # Block 1
        kl.Conv2D(10, (3, 7), activation='relu', padding='same', strides=1, name='block1_conv', input_shape=features_shape),
        kl.MaxPooling2D((3, 1), strides=(2,2), padding='same', name='block1_pool'),
        kl.BatchNormalization(name='block1_norm'),
        # Block 2
        kl.Conv2D(20, (3, 3), activation='relu', padding='same', strides=1, name='block2_conv'),
        kl.MaxPooling2D((3, 1), strides=(2,2), padding='same', name='block2_pool'),
        kl.BatchNormalization(name='block2_norm'),
        # Flatten
        kl.Flatten(name='flatten'),
        # Fully connected layer 1
        kl.Dense(256, activation='relu', name='dense'),
        kl.BatchNormalization(name='dense_norm'),
        kl.Dropout(0.5, name='dropout'),
        # Prediction (fully connected layer 2): onset or no onset.
        kl.Dense(num_classes, activation=act, name='pred'),
    ]

    model = keras.Sequential()
    for layer in layer_stack:
        model.add(layer)

    # Print network summary
    model.summary()

    return model

# Some functions for training

In [73]:
import time
import numpy as np
from datetime import datetime

import matplotlib
from matplotlib import pyplot as plt
# Disable showing figures to prevent random GPU failures.
matplotlib.use('Agg')

import os
import random as python_random
import codecs

#from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support, precision_recall_curve, plot_precision_recall_curve
from sklearn.utils import class_weight

import tensorflow as tf
from keras.callbacks import EarlyStopping

def prepare_data(directory):
    """Load the data set below ``directory`` into module-level globals.

    Sets ``TRAIN_DATA``, ``TRAIN_LABELS`` (one-hot), ``TRAIN_LABELS_1D``
    (class indices of the training labels), ``TEST_DATA`` and
    ``TEST_LABELS`` (class indices), and prints the size of both portions.
    """
    # Set to global scope for easy access in other functions.
    global TRAIN_DATA
    global TRAIN_LABELS
    global TRAIN_LABELS_1D
    global TEST_DATA
    global TEST_LABELS

    dsGen = DatasetGenerator(label_set=LABELS,
                             sample_rate=44100,
                             channels=CHANNELS,
                             mel_bands=MEL_BANDS,
                             time_frames=TIME_FRAMES,
                             diff_from_onset_ms=DIFF_FROM_ONSET_MS,
                             threshold_freq=THRESHOLD_FREQ)

    dsGen.load_wav_and_annotation_files(directory)
    TRAIN_DATA, TRAIN_LABELS, TEST_DATA, TEST_LABELS = dsGen.apply_train_test_split_by_windows(test_size=TRAIN_TEST_SPLIT, shuffle_train_data=True)

    # Previously declared global but never assigned; class-weight
    # computation needs the training labels as class indices.
    TRAIN_LABELS_1D = np.argmax(TRAIN_LABELS, axis=1)

    print('Training data size: ', len(TRAIN_DATA))
    print('Test data size: ', len(TEST_DATA))


prepare_data(DIR)

  _, wav = wavfile.read(x)


Training data size:  118910
Test data size:  13213


In [74]:
def plot(metric1, metric2, label1, label2, save_location, id, batch_size):
    """Plot two per-epoch metric curves and save the figure as a PDF.

    Plotting is best effort: any failure is printed and not raised.

    Args:
        metric1: Values drawn as a solid line.
        metric2: Values drawn as a dashed line.
        label1: Legend label of ``metric1``.
        label2: Legend label of ``metric2``.
        save_location: Directory the PDF is written to.
        id: Run identifier used as the first part of the file name.
        batch_size: Batch size, used as the last part of the file name.
    """
    fig = None
    try:
        # DRUM_INSTRUMENT is optional; fall back to the data directory name.
        instrument = globals().get('DRUM_INSTRUMENT', DIR)
        instrument = os.path.basename(os.path.normpath(str(instrument)))
        file_name = '_'.join([str(id), instrument, str(EPOCHS), str(batch_size)]) + '.pdf'

        fig = plt.figure()
        plt.plot(metric1, label=label1)
        plt.plot(metric2, label=label2, linestyle='dashed')
        plt.legend()
        plt.xlabel('Epoch')
        plt.grid(linestyle='dotted')
        # Join as a path so that '.' means "current directory" rather than
        # becoming a file name prefix.
        plt.savefig(os.path.join(save_location, file_name))
    except Exception as e:
        print('Failed to create plot: ', e)
    finally:
        # Release the figure even when plotting or saving failed.
        if fig is not None:
            plt.close(fig)


def get_model():
    """Build and compile a CNN model.

    The model is created by ``deep_cnn_sequential`` from ``INPUT_SHAPE``,
    ``NUM_CLASSES`` and ``PRED_LAYER_ACTIVATION`` and compiled with an Adam
    optimizer, ``LOSS_FUNCTION`` and ``METRICS``.

    Returns:
        The compiled model.
    """
    # Use the configured learning rate instead of a hard-coded copy of it.
    # A schedule may be substituted here, e.g.
    # CustomScheduleTanh(warmup_steps=3000, phase_step=25000, max_lr=LEARNING_RATE).
    learning_rate_schedule = LEARNING_RATE
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate_schedule, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

    model = deep_cnn_sequential(INPUT_SHAPE, NUM_CLASSES, act=PRED_LAYER_ACTIVATION)
    model.compile(optimizer=optimizer, loss=LOSS_FUNCTION, metrics=METRICS)
    return model


def train(model, batch_size):
    """Fit ``model`` on the global training data and return the fit history.

    Uses ``TRAIN_DATA`` / ``TRAIN_LABELS`` and holds out ``TRAIN_VAL_SPLIT``
    of them for validation. An early-stopping callback monitors the
    validation loss.
    """
    early_stopping = EarlyStopping(
        monitor='val_loss',
        min_delta=0.01,
        patience=PATIENCE,
        verbose=1,
        mode='auto',
    )
    # Class balancing is currently disabled; it would use
    # class_weight.compute_class_weight('balanced', ...) on TRAIN_LABELS_1D.

    fit_arguments = {
        'x': TRAIN_DATA,
        'y': TRAIN_LABELS,
        'batch_size': batch_size,
        'epochs': EPOCHS,
        'verbose': 1,
        'callbacks': [early_stopping],
        'validation_split': TRAIN_VAL_SPLIT,
    }
    return model.fit(**fit_arguments)


def predict(model):
    """Predict the class of every sample in the global ``TEST_DATA``.

    Returns:
        Tuple ``(y_true, y_pred)``: the global ``TEST_LABELS`` and the index
        of the highest-scoring class per test sample.
    """
    class_scores = model.predict(TEST_DATA, verbose=1)
    predicted_classes = np.argmax(class_scores, axis=1)
    return TEST_LABELS, predicted_classes


from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support, precision_recall_curve
def get_metrics(y_true, y_pred):
    """Return ``(precision, recall, fscore, accuracy)`` for the predictions.

    Precision, recall and F-score are averaged according to
    ``PREC_REC_FSCORE_AVERAGE``.
    """
    accuracy = accuracy_score(y_true, y_pred)
    scores = precision_recall_fscore_support(
        y_true,
        y_pred,
        average=PREC_REC_FSCORE_AVERAGE,
    )
    # The fourth entry (support) is not needed.
    return scores[0], scores[1], scores[2], accuracy


def get_confusion_matrix(y_true, y_pred):
    """Return ``(tn, fp, fn, tp)`` for binary labels 0 (no onset) and 1 (onset).

    The label set is passed explicitly so that the matrix is always 2x2,
    even when only one class occurs in ``y_true`` and ``y_pred``; without it
    the four-way unpacking fails for single-class inputs.
    """
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    return tn, fp, fn, tp

In [75]:
def run(batch_size):
    """Train one model, save it, evaluate it on the test data and report.

    Prints the elapsed wall-clock time and the accuracy, precision, recall
    and F-score on the test data.

    Returns:
        Tuple ``(min_val_loss, precision, recall, fscore, acc, val_acc,
        loss, val_loss, id)``, where the four curves are the per-epoch
        training history and ``id`` is a timestamp string.
    """
    started_at = time.time()

    model = get_model()
    history = train(model, batch_size)
    model.save(model_save_location)

    y_true, y_pred = predict(model)
    precision, recall, fscore, acc_score = get_metrics(y_true, y_pred)
    # Confusion-matrix reporting is disabled:
    # tn, fp, fn, tp = get_confusion_matrix(y_true, y_pred)

    curves = history.history
    acc, val_acc = curves['acc'], curves['val_acc']
    loss, val_loss = curves['loss'], curves['val_loss']

    # Write your own logging.
    # log(history, start, precision, recall, fscore, acc_score, tn, fp, fn, tp)
    min_val_loss = min(curves['val_loss'])

    run_id = datetime.now().strftime('%Y%m%d%H%M%S')
    elapsed = time.strftime('%H:%M:%S', time.gmtime(time.time() - started_at))
    print(elapsed)

    report = (
        ('Accuracy: ', acc_score),
        ('Precision: ', precision),
        ('Recall: ', recall),
        ('F-score: ', fscore),
    )
    for caption, value in report:
        print(caption, value)

    return min_val_loss, precision, recall, fscore, acc, val_acc, loss, val_loss, run_id


In [76]:
N = 1  # Number of outer loops; each contributes one value to the mean and standard deviation (e.g. 8).
M = 1  # Number of runs per outer loop from which the best (minimum validation loss) is picked (e.g. 4).


def main():
    """Run the evaluation framework for every batch size in ``BATCHES``.

    For each batch size, ``N`` outer rounds are executed. Every round trains
    ``M`` models and keeps the scores of the run with the lowest validation
    loss. Mean and standard deviation over the rounds are collected per
    batch size.

    Returns:
        Dict mapping each batch size to a dict with the keys ``p_mean``,
        ``p_std``, ``r_mean``, ``r_std``, ``f_mean``, ``f_std``,
        ``min_val_loss_mean`` and ``min_val_loss_std``.
    """
    table_data = {}
    for batch_size in BATCHES:
        precisions = []
        recalls = []
        fscores = []
        min_val_losses = []

        # Evaluation framework.
        for n in range(N):
            best_precision = float('-inf')
            best_recall = float('-inf')
            best_fscore = float('-inf')
            best_min_val_loss = float('inf')

            # The learning algorithm.
            # Choosing the best run based on training results among M runs.
            for m in range(M):
                min_val_loss, precision, recall, fscore, acc, val_acc, loss, val_loss, run_id = run(batch_size)

                # Pick the best run based on the minimum validation loss.
                if min_val_loss < best_min_val_loss:
                    best_min_val_loss = min_val_loss
                    best_fscore = fscore
                    best_precision = precision
                    best_recall = recall

                # Plotting is disabled; to enable:
                # plot(acc, val_acc, 'Accuracy', 'Validation accuracy', PLOT_SAVE_LOCATION, run_id, batch_size)
                # plot(loss, val_loss, 'Loss', 'Validation  loss', PLOT_SAVE_LOCATION, run_id, batch_size)

            precisions.append(best_precision)
            recalls.append(best_recall)
            fscores.append(best_fscore)
            min_val_losses.append(best_min_val_loss)

        # Results for the LaTeX table. The validation-loss statistics were
        # previously computed and then discarded; they are reported too.
        table_data[batch_size] = {
            'p_mean': np.mean(precisions),
            'p_std': np.std(precisions),
            'r_mean': np.mean(recalls),
            'r_std': np.std(recalls),
            'f_mean': np.mean(fscores),
            'f_std': np.std(fscores),
            'min_val_loss_mean': np.mean(min_val_losses),
            'min_val_loss_std': np.std(min_val_losses),
        }

    # Enable automatic LaTeX table creation of the results.
    # create_latex_table(table_data, run_id)

    # Return the collected results instead of discarding them.
    return table_data

In [77]:
main()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 block1_conv (Conv2D)        (None, 80, 12, 10)        220       
                                                                 
 block1_pool (MaxPooling2D)  (None, 40, 6, 10)         0         
                                                                 
 block1_norm (BatchNormaliza  (None, 40, 6, 10)        40        
 tion)                                                           
                                                                 
 block2_conv (Conv2D)        (None, 40, 6, 20)         1820      
                                                                 
 block2_pool (MaxPooling2D)  (None, 20, 3, 20)         0         
                                                                 
 block2_norm (BatchNormaliza  (None, 20, 3, 20)        80        
 tion)                                                

2023-06-10 23:31:12.060851: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'inputs' with dtype float and shape [?,256]
	 [[{{node inputs}}]]
2023-06-10 23:31:12.272409: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'inputs' with dtype float and shape [?,256]
	 [[{{node inputs}}]]


INFO:tensorflow:Assets written to: ./model_sd/assets


INFO:tensorflow:Assets written to: ./model_sd/assets


01:52:29
Accuracy:  0.9918262317414668
Precision:  0.9820103594343281
Recall:  0.8783250453637136
F-score:  0.9235186673955686
