Before you turn this exercise in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says "YOUR ANSWER HERE" or `YOUR CODE HERE` and remove the `raise NotImplementedError()` lines. Please add your name and student ID below:

In [None]:
NAME = "Peter Rjabcsenko"
STUDENT_ID = "1228563"

---

# Assignment 2 - Automatic Drum Transcription
## Intelligent Audio and Music Analysis
### WS 2019


# 0. Introduction

Welcome to the second assignment. 
In this exercise you will build four different algorithms to extract drum note onsets from audio.

## 0.1. Naming notebooks

Set your name and matriculation number in the title of the notebook above, replacing the placeholders. The title should be then e.g. "Assignment 2 - 12345678 Hans Meiser".

## 0.2. Download and extract the dataset
Again, you will need a dataset consisting of audio files and annotations for this exercise. Download the .zip file from TUWEL and unpack it into the folder where you run this jupyter notebook from. The folder should have the "data" folder as a subfolder. Alternatively, set the DATA_PATH constant accordingly. Please make sure that you do not use backward slashes if working on Windows. Either use normal forward slashes in path names or use the functions provided by the os.path module.

If you use Google Colab, upload the data to the Google Colab folder in Google Drive.

## 0.3. Other requirements
Make sure your python(3) installation contains the following packages:
* cython
* numpy
* scipy
* sklearn
* torch (PyTorch)
* madmom

In Google Colab you can use the following code block to install the required packages:

In [None]:
! pip uninstall madmom magenta mido
! pip install mido numpy scipy madmom torch matplotlib sklearn --upgrade

## 0.4. Fill in the missing code blocks
Read and work through the whole notebook from top to bottom and fill in missing code marked with TODO comments.
The comments along the code provide many hints, so it is recommended to also read through the provided code pieces, since they will help solve the exercise.

Execute *ALL* code cells below from top to bottom after each other, by either using `Ctrl-Enter`, `Shift-Enter`, or the GUI buttons above.

# 1. Dependencies, imports, and global variables
First we will import all required libraries and define globally used parameters for spectrogram calculation and paths.

In [None]:
# lets import everything we will need first...
# some generic stuff, numpy will help us with math!
import os
import numpy as np
import time

# filters, might be useful for separate and detect
from scipy.signal import butter, freqz
from scipy.ndimage.filters import maximum_filter, uniform_filter

# classifier for segment and classify method
from sklearn.neighbors import KNeighborsClassifier

# madmom audio processing stuff and evaluation
import madmom
from madmom.audio.spectrogram import LogarithmicFilteredSpectrogram
from madmom.audio import Signal
from madmom.features.onsets import OnsetPeakPickingProcessor
from madmom.evaluation import OnsetEvaluation, OnsetSumEvaluation
from madmom.features import CNNOnsetProcessor
from madmom.utils import search_files

# pytorch, deep learning library
import torch
import torch.nn as nn
import torch.nn.functional as torch_func
import torch.optim as optim
from torch.utils.data import Dataset as Dataset

# plotting library for visualization for debugging
import matplotlib.pyplot as plt
plt.rcParams.update({'pgf.rcfonts': False})

COLAB_DRIVE_BASE = "/content/g-drive"
import sys
IN_COLAB = 'google.colab' in sys.modules

# if in colab, mount gdrive
if IN_COLAB:
  from google.colab import drive
  print('trying to mount google drive...')
  drive.mount(COLAB_DRIVE_BASE, force_remount=True)

#
# some global parameter settings we will need along the way
#
EPSILON = np.finfo(np.float32).eps  # small epsilon needed sometimes for computational stability (div by zeros)

SETTINGS = {  # settings for spectrogram (feature) calculation
    'fps': 100,  # frames per second of our resulting spectrograms
    'fmin': 30,  # minimum frequency
    'fmax': 15000,  # maximum frequency of spectrogram
    'frame_size': 2048,  # frame size for spectrogram
    'sample_rate': 44100,  # input sample rate - input audio will be resampled to this sample rate.
    'num_bands': 12,  # bands per octave (freq. factor 2)
    'num_channels': 1,  # input audio will be converted to mono
    'norm_filters': True,  # normalize triangular filters for log/log spectrogram to have equal area
}

# drum label names
# all arrays and lists containing instruments will always follow this index system, 0:KD (kick/bass drum),
# 1:SD (snare drum), 2: HH (hi-hat).
names_3_map = ['KD', 'SD', 'HH']
num_3_drum_notes = len(names_3_map)

# paths to our small example dataset
PATH = os.getcwd()

if IN_COLAB:
  PATH = os.path.join(COLAB_DRIVE_BASE, 'My Drive/Colab Notebooks')

DATA_PATH = os.path.join(PATH, 'data/drums_simple')  # change this value if you copied the dataset somewhere else!
ANNOTATIONS_PATH = os.path.join(DATA_PATH, 'annotations')
SAMPLE_ANNOTATIONS_PATH = os.path.join(DATA_PATH, 'sample_annotations')
AUDIO_PATH = os.path.join(DATA_PATH, 'audio')
SAMPLES_PATH = os.path.join(DATA_PATH, 'samples')
CACHE_PATH = os.path.join(DATA_PATH, 'feat_cache')
if not os.path.exists(CACHE_PATH):
    os.makedirs(CACHE_PATH)
MODEL_PATH = os.path.join(DATA_PATH, 'models')
if not os.path.exists(MODEL_PATH):
    os.makedirs(MODEL_PATH)
CNN_MODEL_NAME = 'cnn_model'

# some info about our data
NUM_KITS = 4  # we have 4 different drum kits
NUM_TRACKS = 4  # and 4 tracks per kit
FPS = SETTINGS['fps']  # shorthand to the FPS we use for our spectrogram
RANK = num_3_drum_notes  # we use three instruments

# turn on / off plotting (for debugging)
plot = False
plot_len = 400

# use GPU for NN training?
g_use_cuda = True

# seed for RNG for reproducible results
seed = 12345
print('done')


# 2. Helper functions
The next block defines some helper functions for audio file handling, spectrogram calculation, and annotation processing.

In [None]:


#
# some helper functions to handle data from our example dataset
#
def step_diff(array, step):
    """
    Calculates a 1st order difference between rows step values .

    Parameters
    ----------
    array : np.array
        Input array to calculate the 1st order difference.

    step : int
        Number of steps for offset for difference calculation.

    Returns
    -------
    difference : np.array
        Array containing the 1st order difference. Note that the number of rows will be steps less than the input's.
    """
    a = array[step:]
    b = array[:-step]
    return b-a


def load_audio(audio_file_list):
    """
    Load audio from the given files.

    Parameters
    ----------
    audio_file_list : list
        List with audio filenames.

    Returns
    -------
    audio_list : list
        List containing the actual audio.

    """
    audio_list = []
    for audio_file in audio_file_list:
        signal = Signal(audio_file, **SETTINGS)
        audio_list.append(signal)
    return audio_list


def compute_feature(file, **kwargs):
    """
    Compute (spectrogram) feature for the given audio file.

    Parameters
    ----------
    file : str
        Audio file name.
    kwargs : dict, optional
        Additional arguments used for feature computation.

    Returns
    -------
    feature : numpy array
        Computed feature

    """
    # create (filtered) spectrogram
    return LogarithmicFilteredSpectrogram(file, **kwargs)


def create_features(files, cache=True, cache_ext='.cache.npy', **kwargs):
    """
    Create features for given audio files or load them from cache.

    Parameters
    ----------
    files : list
        List with audio file names.
    cache : bool, optional
        Cache features or use cached ones if available.
    cache_ext : str, optional
        Extension used for caching.
    kwargs : dict, optional
        Additional arguments passed for feature computation.

    Returns
    -------
    feature_list : list
        List containing the computed/loaded features.

    """

    feature_list = []
    for audio_file in files:
        file_path, file_name = os.path.split(audio_file)
        file_base, file_ext = os.path.splitext(file_name)
        cache_file = os.path.join(CACHE_PATH, file_base + cache_ext)
        if cache and os.path.exists(cache_file):
            feat = np.load(cache_file)
            print('successfully loaded cached file:', cache_file)
        else:
            feat = compute_feature(audio_file, **kwargs)
            if cache:
                np.save(cache_file, feat)
                print('successfully stored cache for file:', audio_file)
        feature_list.append(feat)
    return feature_list


def load_annotations(files):
    """
    Load annotations from files.

    Parameters
    ----------
    files : list
        List with annotation filenames.

    Returns
    -------
    annotation_list : list
        List with annotations.

    """
    annotation_list = []
    for annotation_file in files:
        annotation = madmom.io.load_notes(annotation_file)
        annotation_list.append(annotation)
    return annotation_list


def compute_target_array_from_times(times, fps, num_frames, num_targets):
    """
    creates a numpy array with targets for neural network training
    :param times: list
    list of annotations for which the target should be 1. times in seconds.
    :param fps:
    sampling frequency of target array
    :param num_frames:
    total number of frames (all entries in times must fit into the total number of frames).
    :param num_targets:
    total number of targets (all entries in times must fit into the total number of labels).
    :return:
    """
    if len(times) > 0 and np.max(times, 0)[0] * fps > num_frames:
        print("Maximum time is larger than number of samples - cutting times.")
    if len(times) > 0 and np.max(times, 0)[1] >= num_targets:
        print("Maximum label index is larger than num_targets - cutting labels.")

    new_targets = np.zeros((num_frames, num_targets))
    for entry_nr, time_entry in enumerate(times):
        cur_time = time_entry[0]
        time_idx = int(cur_time*fps)
        inst_idx = int(time_entry[1])
        if 0 <= inst_idx < num_targets:
            if time_idx < num_frames:
                new_targets[time_idx, inst_idx] = 1

    return new_targets


def create_targets(annotation_list, feature_list, fps=FPS, num_classes=3):
    """
    Create targets for the given annotations.

    Parameters
    ----------
    annotation_list : list
        List with annotations
    feature_list : list
        List with features (needed to determine length)
    fps : float
        Frames per second
    num_classes

    Returns
    -------
    target_list : list
        List with targets for NN training.

    """
    target_list = []
    for annotation, feature in zip(annotation_list, feature_list):
        target = compute_target_array_from_times(annotation, fps, len(feature), num_classes)
        target_list.append(target)
    return target_list


def plot_peak_picking(onset_function, pre_avg = 0.05, post_avg = 0.05, pre_max = 0.02, post_max = 0.02, combine = 0.02,
                      thresh = 0.2, smooth = 0.0, plot_frames=1000):
    """
    helper function which visualizes the peak picking parameters, use to adapt peak picking settings if necessary.
    :param onset_function:
    :param pre_avg:
    :param post_avg:
    :param pre_max:
    :param post_max:
    :param combine:
    :param thresh:
    :param smooth:
    :param plot_frames:
    :return:
    """
    # plot example to investigate peak picking
    peak_picker = OnsetPeakPickingProcessor(threshold=thresh, smooth=smooth, pre_avg=pre_avg,
                                               post_avg=post_avg, pre_max=pre_max, post_max=post_max,
                                               combine=combine, fps=FPS)
    inst_det = [peak_picker.process(onset_function[:, inst]) for inst in range(3)]

    for inst in range(3):
        plt.subplot(3, 1, inst + 1)
        activations = onset_function[:plot_frames, inst]

        avg_length = (pre_avg + post_avg) * FPS + 1
        avg_origin = int(np.floor((pre_avg - post_avg) * FPS / 2))
        avg_filter_size = avg_length
        max_length = (pre_max + post_max) * FPS + 1
        max_filter_size = max_length
        max_origin = int(np.floor((pre_max - post_max) * FPS / 2))

        mov_avg = uniform_filter(activations, avg_filter_size, mode='constant', origin=avg_origin)
        mov_max = maximum_filter(activations, max_filter_size, mode='constant', origin=max_origin)

        select = inst_det[inst] * FPS < plot_frames
        peaks = inst_det[inst][select] * FPS
        plt.plot(activations)
        plt.plot(peaks, onset_function[np.asarray(peaks, dtype=int), inst], 'ro')
        plt.plot(mov_avg, 'g')
        plt.plot(mov_max, 'y')
    plt.show()


def plot_activation_functions(spectrogram, activation_functions, templates=None):
    """
    helper function that visualizes spectrogram alongside detected activation functions.
    use to debug your methods.
    :param spectrogram:
    :param activation_functions:
    :param templates:
    :return:
    """
    if templates is None:
        num_plots = 2
    else:
        num_plots = 3
    plt.figure()
    plt.subplot(num_plots, 1, 1)
    plt.imshow(spectrogram.T, aspect='auto', origin='lower')
    if templates is not None:
        plt.subplot(num_plots, 1, 2)
        plt.imshow(templates, aspect='auto', origin='lower')
    plt.subplot(num_plots, 1, num_plots)
    plt.plot(activation_functions[:, 0])
    plt.plot(activation_functions[:, 1] + 1)
    plt.plot(activation_functions[:, 2] + 2)

    plt.show()
    
print('done')



# 3. Dataset
The next code block will load all the data from the example dataset we will use for the exercise.

In [None]:


#
# load our example dataset
#

# load audio and calculate features
audio_files = search_files(AUDIO_PATH, '.wav')
audio_files += search_files(AUDIO_PATH, '.flac')
audio = load_audio(audio_files)
features = create_features(audio_files, **SETTINGS)

sample_files = search_files(SAMPLES_PATH, '.wav')
sample_files += search_files(SAMPLES_PATH, '.flac')
sample_audio = load_audio(sample_files)
sample_features = create_features(sample_files, **SETTINGS)


# load annotations and create targets
annotation_files = search_files(ANNOTATIONS_PATH, '.txt')
annotations = load_annotations(annotation_files)
targets = create_targets(annotations, features)

sample_annotation_files = search_files(SAMPLE_ANNOTATIONS_PATH, '.txt')
sample_annotations = load_annotations(sample_annotation_files)
sample_targets = create_targets(sample_annotations, features)
sample_times = [[0, 8], [11, 19], [21, 29]]  # these are the times within which the onsets for each instrumt
                                              # are in the audio (in spec seconds) detailed annotations exist too!

fs = SETTINGS['sample_rate']
test_audio = audio[:(NUM_TRACKS * NUM_KITS)]

print('done')


# 4. Separate and detect approach
The next code block contains the first method which requiers some code to be filled in.

In [None]:
def separate_and_detect():
    """
    this function runs the main loop over our dataset using a simple separate and detect approach
    :return:
    """

    """
    To separate the individual drum instruments you can either use bandpass filters, see 
        https://scipy-cookbook.readthedocs.io/items/ButterworthBandpass.html
    You can look at the spectrogram to decide where you want to place the cutoff-frequencies.
    For the kits used in our dataset, these frequencies might work ok: 
    low 0-100 Hz
    high 10k - 22k Hz
    mid  100-10k Hz
        
    or simply calculate a spectrogram and only use the relevant frequency bands, i.e. set the rest to 0 
    (recommended, will usually work better). 
    The following frequency bands should work with our dataset: 
    low: bands 0-4
    mid: bands 6-30
    high: bands 50-end
    
    Note: this doesn't work too well, aim for around 50% f-measure; don't spend to much time on this approach.
    """

    # peak picking settings, use these settings, only play around with them once you have a working system.
    peak_picking_sep = OnsetPeakPickingProcessor(threshold=0.15, smooth=0.0, combine=0.04, delay=0.0, fps=100,
                                                 pitch_offset=0, pre_max=0.02, post_max=0.02)

    results_sep = [None for _ in range(len(test_audio))]
    # iterate over tracks
    for idx, data in enumerate(test_audio):
        inst_eval = [None, None, None]
        # for each instrument
        for inst in range(3):
            # get the spectrogram for the current file
            filt_spec = np.copy(features[idx])

            # fiter the signal for current instrument
            # TODO

            # calculate a simple onset detection function
            # e.g. use the step_diff function to calculate the spectral diff, like discussed
            # in the onset detection part (sum of abs. diffs)
            # TODO

            onset_activations = np.zeros((100, ))  # TODO replace with onset activations!

            # peack picking and onset evaluation
            filt_detections = peak_picking_sep.process(onset_activations)
            inst_eval[inst] = OnsetEvaluation(filt_detections,
                                              [event[0] for event in annotations[idx] if event[1] == inst],
                                              window=0.05, combine=0.025)

        # evaluation over all isntruments for the current track
        results_sep[idx] = OnsetSumEvaluation(inst_eval)

    # evaluation over all tracks, ouput results
    overall_results_sep = OnsetSumEvaluation(results_sep)
    print('Separate and Detect f-measure: %f' % overall_results_sep.fmeasure)


## 4.1. Run method
Once the TODO blocks are implemented, run the method in the next code cell:

In [None]:
separate_and_detect()


# 5. Segment and classify approach
In the next code block, implement the missing code marked with "TODO" to create a simple segment and classify approach.

In [None]:
def segment_and_classify():
    """
    this function runs the main loop over our dataset using a simple segment and classify approach
    :return:
    """

    # train the classifier
    #
    # we first have to train a classifier which is able to classify the different instruments and also
    # combined onsets of them. Since we only have single instrument onsets as sample, we first build a dictionary
    # of instrument hit combinations.
    # We will then use a simple KNN-classifier; you can experiment with other classifiers after the KNN version 
    # works, if you want (e.g. try SVMs from sklearn)

    # the hits are separated by about one second:
    hits_len = int(1*fs)

    # list of our features and labels for the classes used to train our classifier
    knn_feats = []
    knn_labels = []

    # onset combinations we want to use. 1..use instrument in combo 0..don't use
    # indices are: KD, SD, HH
    combos = [[1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 1, 0], [0, 1, 1], [1, 0, 1], [1, 1, 1]]
    # iterate over kits and create combinations:
    for kit_idx in range(NUM_KITS):
        cur_audio = sample_audio[kit_idx]
        cur_annot = np.asarray(sample_annotations[kit_idx][:, 0]*fs, dtype=int)
        # create combinations:
        # there are four onsetes per instrument, with different loudness.
        # we will only combine onsetes with the same loudness, but use
        # ever loudness value for our classifier.
        for onset_idx in range(4):
            kd = cur_annot[onset_idx]
            sd = cur_annot[onset_idx + 4]
            hh = cur_annot[onset_idx + 8]

            for combo_idx, combo in enumerate(combos):
                # add audio to create combined onset
                combo_audio = cur_audio[kd:(kd+hits_len)] * combo[0] +\
                              cur_audio[sd:(sd+hits_len)] * combo[1] +\
                              cur_audio[hh:(hh+hits_len)] * combo[2]

                # calculate features (mean spectrogram), add features and add lable
                cur_mean = np.sum(compute_feature(combo_audio, **SETTINGS), axis=0)
                cur_mean = cur_mean / np.max(cur_mean)
                knn_feats.append(cur_mean)
                knn_labels.append(combo_idx)

    # for the no instrument class (7) use noise with five different levels of volume:
    for idx in range(5):
        factor = idx*0.2+0.1
        combo_audio = np.random.randint(low=int(-32768*factor), high=int(32767*factor),
                                        size=(hits_len,), dtype=np.int16)

        cur_mean = np.sum(compute_feature(combo_audio, **SETTINGS), axis=0)
        cur_mean = cur_mean / np.max(cur_mean)
        knn_feats.append(cur_mean)
        knn_labels.append(7)

    # Train classifier
    # Use a KNeighborsClassifier from sklearn (e.g. with 5 neighbours)
    # TODO

    # initialize an onset detector (use the one from madmom: CNNOnsetProcessor)
    # TODO

    # and a peak picking method (use the one from madmom: OnsetPeakPickingProcessor)
    # TODO

    # results list
    results_class = [None for _ in range(len(test_audio))]
    # iterate over dataset
    for idx, data in enumerate(test_audio):
        # Detect onsets, using your onset detector of choice.
        # TODO
        onsets = []  # TODO replace with list of onset positions

        # Calculate features for onsets.
        onset_feats = [None for _ in range(len(onsets))]
        for onsets_idx, onset in enumerate(onsets):
            # TODO
            onset_feats[onsets_idx] = np.zeros((10,))  # TODO replace with features (mean spectrogram) for the current
                                                       # TODO do it as it is done when creating the training data

        # Predict class labels for onsets using the trained KNN classifier.
        # TODO

        # Translate labels back to instrument combinations.
        # TODO

        # Finally, fill this list with sub lists for each instrument with the corresponding onset times.
        # inst_det = [[<times for KD>], [<times for SD>], [<times for HH>]]
        # TODO
        inst_det = []  # TODO replace this empty list

        # Evaluate onsets for this track.
        inst_eval = [OnsetEvaluation(inst_det[inst], [event[0] for event in annotations[idx] if event[1] == inst],
                                     window=0.05, combine=0.025) for inst in range(3)]
        results_class[idx] = OnsetSumEvaluation(inst_eval)

    # Evaluate over all tracks and print results.
    overall_results_class = OnsetSumEvaluation(results_class)
    print('Segment and Classify f-measure: %f' % overall_results_class.fmeasure)

## 5.1. Run method
Once the TODO blocks are implemented, run the method in the next code cell:

In [None]:
segment_and_classify()


# 6. NMF based methods
For the NMF based methods we will use one function that compares (i) a simple NMF method with random initialization, (ii) NMF with template initialization, (iii) fixed bases NMF, and (iv) semi-adaptive bases NMF.

## 6.1. NMF base class
Implement a class that performs NMF. Fill in the missing code blocks marked with "TODO".

In [None]:


# implement a class which performs NMF
class NMFBase:
    """
    class that is used to perform NMF variants
    """

    def __init__(self, X, B_init, G_init, updateB=True, verbose=False, beta=None):
        """
        :param X: input spectrogram
        :param B_init: initialization for base matrix B
        :param G_init: initialization for activations matrix G
        :param updateB: update matrix B (true) or not (false)
        :param verbose: provide more output for debugging
        :param beta: use a semi-adaptive update of B using the provided beta
                     (None = don't use semi adaptive updates). udpateB has to be True for this.
        """
        self.B = np.copy(B_init)
        self.B_init = np.copy(B_init)
        self.G = np.copy(G_init)
        self.X = X
        self.X_hat = np.zeros(X.shape)
        self.updateB = updateB
        self.verbose = verbose
        self.alpha = 1.0
        self.beta = beta

    # Calculates loss according to generalized Kullback-Leibler divergence between X and X_hat.
    def loss(self):
        # Implement the generalized Kullback-Leibler divergence as provided in the slides of the lecture.
        # Use the classe's fields' current values as inputs (self.B, self.G, ...).
        # Use numpy functions np.dot(...), np.devide(), etc. for the calculation!
        # TODO
        return np.zeros((1,))  # TODO replace this with the real loss

    def update(self):
        # Implement the update for B and G as discussed in the slides of the lecture.
        # Make sure you consider the options self.updateB (fixed bases) and self.beta (semi-adaptive bases).
        # Use the classes fields current values as inputs and update them in-place (self.B, self.G, ...).
        # Also note the extra slide from the lecture providing implementation hints for the update calculation.
        # TODO
        pass

    # call this method from outside the class to run the NMF algorithm
    def optimize(self, tol=1e-4, max_iter=100):
        n_iter = 0
        errors = []
        start_time = time.time()
        error_at_init = self.loss()
        previous_error = error_at_init
        while n_iter < max_iter:
            if self.beta is not None:
                self.alpha = (n_iter / max_iter) ** self.beta
            
            self.update()
            error = self.loss()
            errors.append(error)
            n_iter += 1

            # test convergence criterion every 10 iterations
            if tol > 0 and n_iter % 10 == 0:
                if self.verbose:
                    iter_time = time.time()
                    print("Epoch %02d reached after %.3f seconds, error: %f" %
                          (n_iter, iter_time - start_time, error))

                if (previous_error - error) / error_at_init < tol:
                    break
                previous_error = error

        # do not print if we have already printed in the convergence test
        if self.verbose and (tol == 0 or n_iter % 10 != 0):
            end_time = time.time()
            print("Epoch %02d reached after %.3f seconds." %
                  (n_iter, end_time - start_time))

            if plot:
                # plot NMF loss development along with X, X_hat, B and G for debugging
                max_plot_frames = 2000
                plt.figure()
                plt.subplot(5, 1, 1)
                plt.imshow(self.X[:, :max_plot_frames], origin='lower', aspect='auto')
                plt.subplot(5, 1, 2)
                plt.imshow(self.X_hat[:, :max_plot_frames], origin='lower', aspect='auto')
                plt.subplot(5, 1, 3)
                plt.plot(self.G[:, :max_plot_frames].T / np.max(self.G[:, :5000].T, axis=0) + [0, 1, 2])
                plt.xlim([0, max_plot_frames])
                plt.subplot(5, 1, 4)
                plt.plot(self.B / np.max(self.B, axis=0) + [0, 1, 2])
                plt.subplot(5, 1, 5)
                plt.plot(errors)
                plt.xlim([0, max_iter])
                plt.show(block=False)


## 6.2 NMF experiment function
Similar as with the first two approaches, we now implement a function that runs the different NMF approaches on our test dataset. Fill in the missing code blocks marked with "TODO".

In [None]:
# run nmf variants
def nmf():
    name = 'NMF'

    # peak picking for NMF, values should work, only change if you have a working method.
    nmf_peak_picking = OnsetPeakPickingProcessor(threshold=0.2, smooth=0.0, pre_avg=0.05,
                                                 post_avg=0.05, pre_max=0.02, post_max=0.02,
                                                 combine=0.02, fps=FPS)

    # base-templates from train data which can be used for initialization of B for fixed and semi-adaptive approaches
    # extract mean spectrograms from training samples...
    means = [[None, None, None] for _ in range(NUM_KITS)]
    for cur_kit_idx in range(NUM_KITS):
        sample_feat = sample_features[cur_kit_idx]
        cur_means = [np.sum(sample_feat[slice(sample_times[idx][0] * FPS, sample_times[idx][1] * FPS), :], axis=0) for
                     idx in range(len(sample_times))]
        means[cur_kit_idx] = [cur_mean / np.max(cur_mean) for cur_mean in cur_means]

    # result lists to hold results for individual tracks for each method
    results_mnf_rand = [None for _ in range(len(test_audio))]
    results_mnf_init = [None for _ in range(len(test_audio))]
    results_mnf_fixed = [None for _ in range(len(test_audio))]
    results_mnf_semi_adaptive = [None for _ in range(len(test_audio))]

    # iterate over songs...
    for idx, data in enumerate(test_audio):
        print('%s, track %d' % (name, idx))
        cur_kit_idx = idx % NUM_KITS

        # plain NMF, random initalization of B and G
        # create a NMFBase object and initialize it accordingly. 
        # run the optimize method of the NMFBase object.
        # normalize the activation output G, (divide by its maximum) for consistent peak picking!
        # TODO
        nmf_trans_rand = np.zeros((1,))  # TODO replace this with the activation functions provided by NMF 
        # for evaluation, the indices of the output are expected to be 0..KD, 1..SD, 2..HH;
        # find a simple method to sort the activation functions correctly, you might reuse the KNN
        # classifier from the segment and classify method (make it a global variable in that case).

        # evaluate results
        inst_det_rand = [nmf_peak_picking.process(nmf_trans_rand[:, inst]) for inst in range(3)]
        inst_eval_rand = [OnsetEvaluation(inst_det_rand[inst], [event[0] for event in annotations[idx] if event[1] == inst],
                                       window=0.05, combine=0.025) for inst in range(3)]
        results_mnf_rand[idx] = OnsetSumEvaluation(inst_eval_rand)

        # plain NMF, initialize B with templates
        # create a NMFBase object and initialize it accordingly. 
        # run the optimize method of the NMFBase object.
        # normalize the activation output G, (divide by its maximum) for consistent peak picking!
        # TODO
        nmf_trans_init = np.zeros((1,))  # TODO replace this with the activation functions provided by NMF 

        # evaluate results
        inst_det_init = [nmf_peak_picking.process(nmf_trans_init[:, inst]) for inst in range(3)]
        inst_eval_init = [OnsetEvaluation(inst_det_init[inst], [event[0] for event in annotations[idx] if event[1] == inst],
                                       window=0.05, combine=0.025) for inst in range(3)]
        results_mnf_init[idx] = OnsetSumEvaluation(inst_eval_init)

        # perform fixed bases NMF, initialize B with templates
        # create a NMFBase object and initialize it accordingly. 
        # run the optimize method of the NMFBase object.
        # normalize the activation output G, (divide by its maximum) for consistent peak picking!
        # TODO
        nmf_trans_fixed = np.zeros((1,))  # TODO replace this with the activation functions provided by NMF 

        # evaluate results
        inst_det_fixed = [nmf_peak_picking.process(nmf_trans_fixed[:, inst]) for inst in range(3)]
        inst_eval_fixed = [OnsetEvaluation(inst_det_fixed[inst], [event[0] for event in annotations[idx] if event[1] == inst],
                                     window=0.05, combine=0.025) for inst in range(3)]
        results_mnf_fixed[idx] = OnsetSumEvaluation(inst_eval_fixed)

        # semi adaptive NMF, initialize B with templates
        # create a NMFBase object and initialize it accordingly. 
        # run the optimize method of the NMFBase object.
        # normalize the activation output G, (divide by its maximum) for consistent peak picking!
        # TODO
        nmf_trans_semi_adaptive = np.zeros((1,))  # TODO replace this with the activation functions provided by NMF 

        # evaluate results
        inst_det_semi_adaptive = [nmf_peak_picking.process(nmf_trans_semi_adaptive[:, inst]) for inst in range(3)]
        inst_eval_semi_adaptive = [OnsetEvaluation(inst_det_semi_adaptive[inst], [event[0] for event in annotations[idx] if event[1] == inst],
                                       window=0.05, combine=0.025) for inst in range(3)]
        results_mnf_semi_adaptive[idx] = OnsetSumEvaluation(inst_eval_semi_adaptive)

    # collect overall results and print output
    overall_results_nmf_rand = OnsetSumEvaluation(results_mnf_rand)
    print('%s rand-init f-measure: %f' % (name, overall_results_nmf_rand.fmeasure))

    overall_results_nmf_init = OnsetSumEvaluation(results_mnf_init)
    print('%s template init f-measure: %f' % (name, overall_results_nmf_init.fmeasure))

    overall_results_nmf_fixed = OnsetSumEvaluation(results_mnf_fixed)
    print('%s fixed f-measure: %f' % (name, overall_results_nmf_fixed.fmeasure))

    overall_results_nmf_semi_adaptive = OnsetSumEvaluation(results_mnf_semi_adaptive)
    print('%s semi-adapative f-measure: %f' % (name, overall_results_nmf_semi_adaptive.fmeasure))


## 6.3. Run method
Once the TODO blocks are implemented, run the method in the next code cell:

In [None]:
nmf()


# 7. CNN based method
In the next couple of code blocks we will implement a neural network based (CNN) ADT approach.
Please fill in the missing code marked with "TODO".

## 7.1. CNN class
Implement a convolutional neural network as shown in the slides of the lecture. Implement the missing parts marked with "TODO".


In [None]:
# transcription base class
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        #
        # In this constructor, create the layers needed to build the network.
        # Use the pytorch components nn.Conv2d, nn.BatchNorm2d, nn.Dropout2d, nn.Linear, nn.BatchNorm1d
        # The network should have the same architecture as presented in the lecture slides (CNN)
        # Note that one convolutional block (yellowish blocks) consists of TWO layers of 3x3 convolutions with
        # batch normalization for EACH layer and max pooling and dropout after the convolutional block (after the 2nd
        # convolutional layer. The whole network consists of two convolutional blocks, where in the first each layer
        # contains 32 filters, and in the second each layer contains 64 filters.
        # After that, a dense layer (nn.Linear) with 50 neurons and the output dense layer with 3 neurons follow.
        # TODO
        # e.g. for the first convolutional layer we will need something like:
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        # ...

    def forward(self, x):
        # This function calculates a forward pass through the network (i.e. calculates the output for given input x).
        # Hand x through the layers of the network and calculate the output.
        # Don't forget to apply the nonlinearities (activation functions).
        # Use ReLU activation function ( torch_func.relu() ) except for the ouput of the network where we need
        # sigmoid activations (0-1) for our activation functions ( torch_func.sigmoid() or torch.sigmoid() )
        # TODO
        # e.g. to calculate the hidden output of the first convolutional layer:
        h1 = torch_func.relu(self.conv1_bn(self.conv1(x)))

        # Note that you should always apply the batch normalization, max pooling (torch_func.max_pool2d),
        # and dropout BEFORE you apply the activation function!!
        # TODO
        hn = 0  # TODO replace this with the last layer of the network
        y = torch_func.sigmoid(hn)
        return y


## 7.2 CNN helper functions, dataset preparation, and data formatter
The next code block defines some helper functions, read through the code---there is nothing to implement in this code block.

In [None]:
def train_nn(args, model, device, train_loader, optimizer, epoch):
    """
    Training loop for one epoch of NN training.
    Within one epoch, all the data is used once, we use mini-batch gradient descent.
    :param args: NN parameters for training and inference
    :param model: The model to be trained
    :param device: PyTorch device: CPU or GPU
    :param train_loader: Data provider
    :param optimizer: Optimizer (Gradient descent update algorithm)
    :param epoch: Current epoch number
    :return:
    """
    # set model to training mode (activate dropout layers for example).
    model.train()
    # we measure the needed time
    t = time.time()
    # iterate over training data
    for batch_idx, (data, target) in enumerate(train_loader):
        # move data to device (GPU) if necessary
        data, target = data.to(device), target.to(device)
        # reset optimizer
        optimizer.zero_grad()
        # forward pass (calculate output of network for input)
        output = model(data)
        # calculate loss
        loss = torch_func.binary_cross_entropy(output, target)
        # do a backward pass (calculate gradients using automatic differentiation and backpropagation)
        loss.backward()
        # udpate parameters of network using our optimizer
        optimizer.step()
        # print some outputs if we reached our logging intervall
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}, took {:.2f}s'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item(), time.time()-t))
            t = time.time()


def test_nn(args, model, device, test_loader):
    """
    Function wich iterates over test data (eval or test set) and calculates loss.
    Here no parameter update is done
    :param args: NN parameters for training and inference
    :param model: The model to be tested
    :param device: PyTorch device: CPU or GPU
    :param test_loader: Data provider
    :return: cumulative test loss
    """
    # set model to inference mode (deactivate dropout layers for example).
    model.eval()
    # init cumulative loss
    test_loss = 0
    # do not calculate gradients since we do not want to do updates
    with torch.no_grad():
        # iterate over test data
        for data, target in test_loader:
            # move data to device (GPU) if necessasry
            data, target = data.to(device), target.to(device)
            # forward pass (calculate output of network for input)
            output = model(data)
            # claculate loss and add it to our cumulative loss
            test_loss += torch_func.binary_cross_entropy(output, target, reduction='sum').item()  # sum up batch loss

    # output results of test run
    test_loss /= len(test_loader.dataset)
    print('Average loss: {:.4f}\n'.format(
        test_loss, len(test_loader.dataset)))

    return test_loss


def inference_cnn(args, model, device, data, outnum=3):
    """
    Function calculating the actual output of the network, given some input.
    :param args: NN parameters for training and inference
    :param model: The network to be used
    :param device: PyTorch device: CPU or GPU
    :param data: Data for which the output should be calculated
    :param outnum: Number of network outputs to reserve output memory
    :return: output of network
    """
    # set model to inference mode (deactivate dropout layers for example).
    model.eval()
    # reserve output memory
    in_shape = data.shape
    side = int((args.context-1)/2)
    outlen = in_shape[0] - 2*side
    output = np.zeros((outlen, outnum))
    # move input to device if necessary
    data = torch.from_numpy(data[None, None, :, :])
    data = data.to(device)
    # do not calculate gradients since we do not want to do updates
    with torch.no_grad():
        # iterate over input data
        for idx in range(outlen):
            # calculate output for input data
            output[idx, :] = model(data[:, :, idx:(idx+args.context), :])[0, :]
    # compensate for convolutional context and return output
    return np.vstack((np.zeros((side, outnum)), output, np.zeros((side, outnum))))


# split data into train, valid and test sets.
# for a hard but fair evaluation, we can not reuse a track or kit we trained on for validation or testing.
# therefore, we split the data in respect to tracks and kits, only using kit 0 and 1 as well as track 0 and 1 for
# training; kit 2 and track 2 for validation; and kit 3 and track 3 for testing.
# you can also test on the whole set, but you will see, that for the tracks we trained on, the performance is even
# better since the model usually overfits a bit on the training data.
train_tracks = [0, 1]
train_kits = [0, 1]
valid_tracks = [2]
valid_kits = [2]
test_tracks = [3]
test_kits = [3]

train_idxs = [track * NUM_KITS + kit for kit in train_kits for track in train_tracks]
valid_idxs = [track * NUM_KITS + kit for kit in valid_kits for track in valid_tracks]

test_idxs = [track * NUM_KITS + kit for kit in test_kits for track in test_tracks]

train_feat = [np.asarray(features[cur_idx], dtype='float32') for cur_idx in train_idxs]
train_feat.extend([np.asarray(feat, dtype='float32') for feat in sample_features])
valid_feat = [np.asarray(features[cur_idx], dtype='float32') for cur_idx in valid_idxs]
test_feat = [np.asarray(features[cur_idx], dtype='float32') for cur_idx in test_idxs]

train_annot = [annotations[cur_idx] for cur_idx in train_idxs]
train_annot.extend(sample_annotations)
valid_annot = [annotations[cur_idx] for cur_idx in valid_idxs]
test_annot = [annotations[cur_idx] for cur_idx in test_idxs]

train_targets = [np.asarray(targets[cur_idx], dtype='float32') for cur_idx in train_idxs]
train_targets.extend([np.asarray(targ, dtype='float32') for targ in sample_targets])
valid_targets = [np.asarray(targets[cur_idx], dtype='float32') for cur_idx in valid_idxs]
test_targets = [np.asarray(targets[cur_idx], dtype='float32') for cur_idx in test_idxs]


# class which formats the spectrogram data in the way needed for convolutional neural network training
class DrumSet(Dataset):
    def __init__(self, feat_list, targ_list, context, hop):
        """
        Create spectrogram based drum dataset for CNN training
        :param feat_list: list with spectrograms (np.array) for individual tracks
        :param targ_list: list with targets (np.array) for individual tracks
        :param context: length of context for CNN
        :param hop: hop size between two consecutive spectrogram snippets. this is usually 1, especiall for testing
                    and inference. For very large datasets it can make sense to increase this, to speed up training.
        """
        self.features = feat_list
        self.targets = targ_list
        self.context = context
        self.side = int((context-1)/2)
        self.hop_size = hop
        # list with snippet count per track
        self.snip_cnt = []
        # overall snippet count
        total_snip_cnt = 0
        # calculate overall number of snippets we can get from our data
        for feat in feat_list:
            cur_len = int(np.ceil((feat.shape[0] - self.side*2) / self.hop_size))
            self.snip_cnt.append(cur_len)
            total_snip_cnt += cur_len
        self.length = int(total_snip_cnt)
        super(DrumSet, self).__init__()

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        """
        Get a snipped by index, from the whole dataset
        :param index: index of the snipped to be returned
        :return: a snipped for CNN training
        """
        # find track which contains snipped with index [index]
        overal_pos = 0
        for idx, cnt in enumerate(self.snip_cnt):
            # check if this track contains the snipped with index [index]
            if index < overal_pos+cnt:
                break
            else:
                # if not, add the current tracks snipped count to the overall snipped count already visited
                overal_pos += cnt

        # calculate the position of the snipped within the track nr. [idx]
        position = index-overal_pos
        position += self.side

        # get snipped and target
        sample = self.features[idx][(position-self.side):(position+self.side+1), :]
        target = self.targets[idx][position, :]
        # convert to PyTorch tensor and return
        return torch.from_numpy(sample).unsqueeze_(0), torch.from_numpy(target)

# helper class for arguments
class Args:
    pass

print('done')


## 7.3 CNN experiment function
The next code block contains the experiment using the CNN. First we train the CNN, then we use it to transcribe the test split of the dataset.
Implement the missing blocks marked with "TODO".

In [None]:
# cnn drum transcription experiment
def cnn():
    print('Training CNN...')
    # peak picking for CNN outputs
    cnn_peak_picking = OnsetPeakPickingProcessor(threshold=0.05, smooth=0.0, pre_avg=0.01,
                                                 post_avg=0.01, pre_max=0.02, post_max=0.02,
                                                 combine=0.02, fps=FPS)
    # parameters for NN training
    name = 'CNN'
    args = Args()
    args.batch_size = 64
    args.test_batch_size = 1000
    args.max_epochs = 1000
    args.patience = 4
    args.lr = 0.01
    args.momentum = 0.5
    args.no_cuda = not g_use_cuda
    args.seed = 1
    args.log_interval = 100
    args.context = 25
    args.step_size = 1

    # setup pytorch
    use_cuda = not args.no_cuda and torch.cuda.is_available()
    torch.manual_seed(seed)
    device = torch.device("cuda" if use_cuda else "cpu")

    # create model and optimizer, we use plain SGD with momentum
    model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    # setup our datasets for training, evaluation and testing
    kwargs = {'num_workers': 4, 'pin_memory': True} if use_cuda else {'num_workers': 4}
    train_loader = torch.utils.data.DataLoader(DrumSet(train_feat, train_targets, args.context, args.step_size),
                                               batch_size=args.batch_size, shuffle=True, **kwargs)
    valid_loader = torch.utils.data.DataLoader(DrumSet(valid_feat, valid_targets, args.context, 1),
                                               batch_size=args.batch_size, shuffle=False, **kwargs)
    test_loader = torch.utils.data.DataLoader(DrumSet(test_feat, test_targets, args.context, 1),
                                              batch_size=args.batch_size, shuffle=False, **kwargs)

    # main training loop
    best_test_loss = 9999
    cur_patience = args.patience
    for epoch in range(1, args.max_epochs + 1):
        # run one epoch of NN training
        train_nn(args, model, device, train_loader, optimizer, epoch)
        # validate on validation set
        print('\nValidation Set:')
        test_loss = test_nn(args, model, device, valid_loader)
        # check for early stopping
        if test_loss < best_test_loss:
            torch.save(model.state_dict(), os.path.join(MODEL_PATH, CNN_MODEL_NAME + '.model'))
            best_test_loss = test_loss
            cur_patience = args.patience
        else:
            # if performance does not improve, we do not stop immediately but wait for 4 iterations (patience)
            if cur_patience <= 0:
                print('Early stopping, no improvement for %d epochs...' % args.patience)
                break
            else:
                print('No improvement, patience: %d' % cur_patience)
                cur_patience -= 1

    # testing on test data
    print('Evaluate CNN...')
    print('Test Set:')
    # calculate loss for test set
    test_nn(args, model, device, test_loader)

    # calculate actual output for the test data and generate a transcript and evaluate it.
    results_cnn = [None for _ in range(len(test_feat))]
    # iterate over test tracks
    for test_idx, cur_test_feat in enumerate(test_feat):
        # run the inference method
        # TODO

        # evaluate results, compare how it was done with NMF
        # TODO
        results_cnn[test_idx] = 0  # replace with OnsetSumEvaluation

    # calculate overall results and print output
    overall_results_cnn = OnsetSumEvaluation(results_cnn)
    print('%s f-measure: %f' % (name, overall_results_cnn.fmeasure))


## 7.4. Run experiment
The next and last code block runs the CNN experiments.

In [None]:
cnn()

# 8. Results and discussion
Briefly note your observations about the evaluation.
How did the methods perform compared to each other?
What can you say in terms of runtime?
What was surprising? Can you give explanations for the differences in performance?
How would you expect to perform the individual methods on other, unseen data?

Your answer goes here...