Audiospec

In [1]:
import numpy as np
from scipy import signal
import scipy.io.wavfile as wav
from numpy.lib import stride_tricks
import matplotlib.pyplot as plt

""" short time fourier transform of audio signal """
def stft(sig, frameSize, overlapFac=0.5, window=np.hanning):
    win = window(frameSize)
    hopSize = int(frameSize - np.floor(overlapFac * frameSize))
    
    # zeros at beginning (thus center of 1st window should be for sample nr. 0)
    samples = np.append(np.zeros(np.int64(np.floor(frameSize/2.0))), sig)    
    # cols for windowing
    cols = np.int64(np.ceil( (len(samples) - frameSize) / float(hopSize)) + 1)
    # zeros at end (thus samples can be fully covered by frames)
    samples = np.append(samples, np.zeros(frameSize))
    
    frames = stride_tricks.as_strided(
            samples, shape=(cols, frameSize),
            strides=(samples.strides[0]*hopSize, samples.strides[0])).copy()
    frames *= win
    
    return np.fft.rfft(frames)    
    
""" scale frequency axis logarithmically """    
def logscale_spec(spec, sr=44100, factor=20.):
    timebins, freqbins = np.shape(spec)

    scale = np.linspace(0, 1, freqbins) ** factor
    scale *= (freqbins-1)/max(scale)
    scale = np.unique(np.round(scale))
    scale = np.int64(scale)
    
    # create spectrogram with new freq bins
    newspec = np.complex128(np.zeros([timebins, len(scale)]))
    for i in range(0, len(scale)):
        if i == len(scale)-1:
            newspec[:,i] = np.sum(spec[:,scale[i]:], axis=1)
        else:        
            newspec[:,i] = np.sum(spec[:,scale[i]:scale[i+1]], axis=1)
    
    # list center freq of bins
    allfreqs = np.abs(np.fft.fftfreq(freqbins*2, 1./sr)[:freqbins+1])
    freqs = []
    for i in range(0, len(scale)):
        if i == len(scale)-1:
            freqs += [np.mean(allfreqs[scale[i]:])]
        else:
            freqs += [np.mean(allfreqs[scale[i]:scale[i+1]])]
    
    return newspec, freqs

""" plot spectrogram"""
def plotstft(samples, samplerate, binsize=2**10, 
        plotpath=None, colormap="jet", plot_artifacts=True):
    s = stft(samples, binsize)
    
    sshow, freq = logscale_spec(s, factor=1.0, sr=samplerate)
    ims = 20.*np.log10(np.abs(sshow)/10e-6) # amplitude to decibel
    
    timebins, freqbins = np.shape(ims)
    
    plt.figure(figsize=(15, 7.5))
    plt.imshow(np.transpose(ims), origin="lower", 
            aspect="auto", cmap=colormap, interpolation="none")

    if not plot_artifacts:
      plt.axis('off')
    else:
      plt.colorbar()

      plt.xlabel("time (s)")
      plt.ylabel("frequency (hz)")
      plt.xlim([0, timebins-1])
      plt.ylim([0, freqbins])

      xlocs = np.float32(np.linspace(0, timebins-1, 5))
      plt.xticks(xlocs, ["%.02f" % l for l in 
          ((xlocs*len(samples)/timebins)+(0.5*binsize))/samplerate])

      ylocs = np.int16(np.round(np.linspace(0, freqbins-1, 10)))
      plt.yticks(ylocs, ["%.02f" % freq[i] for i in ylocs])
    
    if plotpath:
        plt.savefig(plotpath, bbox_inches="tight")
    else:
        plt.show()
    plt.clf()
    plt.close('all')

Convert

In [2]:
import numpy as np
from scipy import signal
import scipy.io.wavfile as wav
from numpy.lib import stride_tricks
import matplotlib.pyplot as plt

""" short time fourier transform of audio signal """
def stft(sig, frameSize, overlapFac=0.5, window=np.hanning):
    win = window(frameSize)
    hopSize = int(frameSize - np.floor(overlapFac * frameSize))
    
    # zeros at beginning (thus center of 1st window should be for sample nr. 0)
    samples = np.append(np.zeros(np.int64(np.floor(frameSize/2.0))), sig)    
    # cols for windowing
    cols = np.int64(np.ceil( (len(samples) - frameSize) / float(hopSize)) + 1)
    # zeros at end (thus samples can be fully covered by frames)
    samples = np.append(samples, np.zeros(frameSize))
    
    frames = stride_tricks.as_strided(
            samples, shape=(cols, frameSize),
            strides=(samples.strides[0]*hopSize, samples.strides[0])).copy()
    frames *= win
    
    return np.fft.rfft(frames)    
    
""" scale frequency axis logarithmically """    
def logscale_spec(spec, sr=44100, factor=20.):
    timebins, freqbins = np.shape(spec)

    scale = np.linspace(0, 1, freqbins) ** factor
    scale *= (freqbins-1)/max(scale)
    scale = np.unique(np.round(scale))
    scale = np.int64(scale)
    
    # create spectrogram with new freq bins
    newspec = np.complex128(np.zeros([timebins, len(scale)]))
    for i in range(0, len(scale)):
        if i == len(scale)-1:
            newspec[:,i] = np.sum(spec[:,scale[i]:], axis=1)
        else:        
            newspec[:,i] = np.sum(spec[:,scale[i]:scale[i+1]], axis=1)
    
    # list center freq of bins
    allfreqs = np.abs(np.fft.fftfreq(freqbins*2, 1./sr)[:freqbins+1])
    freqs = []
    for i in range(0, len(scale)):
        if i == len(scale)-1:
            freqs += [np.mean(allfreqs[scale[i]:])]
        else:
            freqs += [np.mean(allfreqs[scale[i]:scale[i+1]])]
    
    return newspec, freqs

""" plot spectrogram"""
def plotstft(samples, samplerate, binsize=2**10, 
        plotpath=None, colormap="jet", plot_artifacts=True):
    s = stft(samples, binsize)
    
    sshow, freq = logscale_spec(s, factor=1.0, sr=samplerate)
    ims = 20.*np.log10(np.abs(sshow)/10e-6) # amplitude to decibel
    
    timebins, freqbins = np.shape(ims)
    
    plt.figure(figsize=(15, 7.5))
    plt.imshow(np.transpose(ims), origin="lower", 
            aspect="auto", cmap=colormap, interpolation="none")

    if not plot_artifacts:
      plt.axis('off')
    else:
      plt.colorbar()

      plt.xlabel("time (s)")
      plt.ylabel("frequency (hz)")
      plt.xlim([0, timebins-1])
      plt.ylim([0, freqbins])

      xlocs = np.float32(np.linspace(0, timebins-1, 5))
      plt.xticks(xlocs, ["%.02f" % l for l in 
          ((xlocs*len(samples)/timebins)+(0.5*binsize))/samplerate])

      ylocs = np.int16(np.round(np.linspace(0, freqbins-1, 10)))
      plt.yticks(ylocs, ["%.02f" % freq[i] for i in ylocs])
    
    if plotpath:
        plt.savefig(plotpath, bbox_inches="tight")
    else:
        plt.show()
    plt.clf()
    plt.close('all')

CQT

In [3]:
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

def plot_cqt(song, path):
  plt.figure(figsize=(7.5, 3.75))
  y, sr = librosa.load(song)
  C = librosa.cqt(y, sr=sr)
  librosa.display.specshow(librosa.amplitude_to_db(C, ref=np.max),
                            sr=sr)
  plt.axis('off')
  plt.savefig(path, bbox_inches="tight")
  plt.close('all')

Split midi

In [8]:
import mido
from mido import MidiFile, MidiTrack, Message, MetaMessage
from os import listdir
from os.path import isfile, split, join
import argparse

def split_midi(mid_file, target_dir, default_tempo=500000, target_segment_len=1):
  '''Split midi file into many chunks'''
  song_name = split(mid_file)[-1][:-4]
  mid = MidiFile(mid_file)

  # identify the meta messages
  metas = []
  tempo = default_tempo
  for msg in mid:
    if msg.type == 'set_tempo':
      tempo = msg.tempo
    if msg.is_meta:
      metas.append(msg)
  for meta in metas:
    meta.time = int(mido.second2tick(meta.time, mid.ticks_per_beat, tempo))

  target = MidiFile()
  track = MidiTrack()
  track.extend(metas)
  target.tracks.append(track)
  prefix = 0
  time_elapsed = 0
  for msg in mid:
    # Skip non-note related messages
    if msg.is_meta:
      continue
    time_elapsed += msg.time
    if msg.type != 'end_of_track':
      msg.time = int(mido.second2tick(msg.time, mid.ticks_per_beat, tempo))
      track.append(msg)
    if msg.type == 'end_of_track' or time_elapsed >= target_segment_len:
      track.append(MetaMessage('end_of_track'))
      target.save(join(target_dir, song_name + '_{}.mid'.format(prefix)))
      target = MidiFile()
      track = MidiTrack()
      track.extend(metas)
      target.tracks.append(track)
      time_elapsed = 0
      prefix += 1


def merge_midi(midis, input_dir, output, default_tempo=500000):
  '''Merge midi files into one'''
  pairs = [(int(x[:-4].split('_')[-1]), x) for x in midis]
  pairs = sorted(pairs, key=lambda x: x[0])
  midis = [join(input_dir, x[1]) for x in pairs]

  mid = MidiFile(midis[0])
  # identify the meta messages
  metas = []
  # tempo = default_tempo
  tempo = default_tempo // 2
  for msg in mid:
    if msg.type == 'set_tempo':
      tempo = msg.tempo
    if msg.is_meta:
      metas.append(msg)
  for meta in metas:
    meta.time = int(mido.second2tick(meta.time, mid.ticks_per_beat, tempo))
  
  target = MidiFile()
  track = MidiTrack()
  track.extend(metas)
  target.tracks.append(track)
  for midi in midis:
    mid = MidiFile(midi)
    for msg in mid:
      if msg.is_meta:
        continue
      if msg.type != 'end_of_track':
        msg.time = int(mido.second2tick(msg.time, mid.ticks_per_beat, tempo))
        track.append(msg)

  track.append(MetaMessage('end_of_track'))
  target.save(output)


def parse_args():
  '''Parse arguments'''
  parser = argparse.ArgumentParser()
  parser.add_argument('input_dir', type=str)
  parser.add_argument('target_dir', type=str)
  parser.add_argument('-l', '--length', default=1, type=float)
  parser.add_argument('-m', '--merge', action='store_true')
  return parser.parse_args()


def main():
  args = parse_args()
  input_dir = args.input_dir
  target_dir = args.target_dir
  length = args.length

  # Get all the input midi files
  midis = [x for x in listdir(input_dir) if x.endswith('.mid')]

  if args.merge:
    merge_midi(midis, input_dir, target_dir)
  else:
    for midi in midis:
      print(midi)
      try:
        split_midi(join(input_dir, midi), target_dir, target_segment_len=length)
      except:
        print('\tProblem!')

utils

In [10]:
from __future__ import division
"""
Simple function for converting Pretty MIDI object into one-hot encoding
/ piano-roll-like to be used for machine learning.
"""
import pretty_midi
import numpy as np
import sys
import argparse

def pretty_midi_to_one_hot(pm, fs=100):
    """Compute a one hot matrix of a pretty midi object
    Parameters
    ----------
    pm : pretty_midi.PrettyMIDI
        A pretty_midi.PrettyMIDI class instance describing
        the piano roll.
    fs : int
        Sampling frequency of the columns, i.e. each column is spaced apart
        by ``1./fs`` seconds.
    Returns
    -------
    one_hot : np.ndarray, shape=(128,times.shape[0])
        Piano roll of this instrument. 1 represents Note Ons,
        -1 represents Note offs, 0 represents constant/do-nothing
    """

    # Allocate a matrix of zeros - we will add in as we go
    one_hots = []

    if len(pm.instruments) < 1:
        return 0

    for instrument in pm.instruments:
        one_hot = np.zeros((128, int(fs*instrument.get_end_time())+1))
        for note in instrument.notes:
            # note on
            one_hot[note.pitch, int(note.start*fs)] = 1
            # print('note on',note.pitch, int(note.start*fs))
            # note off
            one_hot[note.pitch, int(note.end*fs)] = 0
            # print('note off',note.pitch, int(note.end*fs))
        one_hots.append(one_hot)

    one_hot = np.zeros((128, np.max([o.shape[1] for o in one_hots])))
    for o in one_hots:
        one_hot[:, :o.shape[1]] += o

    one_hot = np.clip(one_hot,-1,1)
    return one_hot

def one_hot_to_pretty_midi(one_hot, fs=100, program=1,bpm=120):
    '''Convert a Piano Roll array into a PrettyMidi object
     with a single instrument.
    Parameters
    ----------
    piano_roll : np.ndarray, shape=(128,time)
        Piano roll of one instrument
    fs : int
        Sampling frequency of the columns, i.e. each column is spaced apart
        by ``1./fs`` seconds.
    program : int
        The program number of the instrument.
    bpm : int
        Beats per minute, used to decide when to re-emphasize notes left on.
    Returns
    -------
    midi_object : pretty_midi.PrettyMIDI
        A pretty_midi.PrettyMIDI class instance describing
        the piano roll.
    '''
    notes, frames = one_hot.shape
    pm = pretty_midi.PrettyMIDI()
    instrument = pretty_midi.Instrument(program=program)

    # prepend, append zeros so we can acknowledge inital and ending events
    piano_roll = np.hstack((np.zeros((notes, 1)),
                            one_hot,
                            np.zeros((notes, 1))))

    # use changes to find note on / note off events
    changes = np.nonzero(np.diff(piano_roll).T)

    # keep track of note on times and notes currently playing
    note_on_time = np.zeros(notes)
    current_notes = np.zeros(notes)

    bps = bpm / 60
    beat_interval = fs / bps
    strong_beats = beat_interval * 2 #(for 4/4 timing)

    last_beat_time = 0

    for time, note in zip(*changes):
        change = piano_roll[note, time + 1]

        if time >= last_beat_time + beat_interval:
            for note in current_notes:
                time = time / fs

        time = time / fs
        if change == 1:
            # note on
            if current_notes[note] == 0:
                # from note off
                note_on_time[note] = time
                current_notes[note] = 1
            else:
                #re-articulate (later in code)
                '''pm_note = pretty_midi.Note(
                        velocity=100, #don't care fer now
                        pitch=note,
                        start=note_on_time[note],
                        end=time)
                instrument.notes.append(pm_note)
                note_on_time[note] = time
                current_notes[note] = 1'''
        elif change == 0:
            #note off
            pm_note = pretty_midi.Note(
                    velocity=100, #don't care fer now
                    pitch=note,
                    start=note_on_time[note],
                    end=time)
            current_notes[note] = 0
            instrument.notes.append(pm_note)
    pm.instruments.append(instrument)
    return pm

def slice_to_categories(piano_roll):
    notes_list = np.zeros(128)
    notes = np.nonzero(piano_roll)[0]
    notes = np.unique(notes)

    for note in notes:
        notes_list[note] = 1

    return notes_list

cnn

In [13]:
import numpy as np
import keras
from keras.layers import Dense, Flatten, Reshape, Input
from keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dropout
from keras.models import Sequential
from keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, CSVLogger
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from  skimage.measure import block_reduce

from PIL import Image
#import utils
import pretty_midi
import os, os.path
import sys

def create_model():
    img_x, img_y = 145, 49
    input_shape = (img_x, img_y, 3)
    num_classes = 128

    model = Sequential()
    model.add(Conv2D(32, kernel_size=(5,5), strides=(1,1),
        activation='tanh',
        input_shape=input_shape))
    model.add(Dropout(0.5))
    model.add(MaxPooling2D(pool_size=(2,2), strides=(2,2)))
    model.add(Conv2D(64, (3,3), activation='tanh'))
    model.add(Dropout(0.5))
    model.add(MaxPooling2D(pool_size=(2,2)))
    #model.add(Conv2D(64, (5,5), activation='relu'))
    # Final output layer
    #model.add(Conv2D(128, (5,5), activation='sigmoid'))
    #model.add(Flatten())
    model.add(Flatten())
    #model.add(Dense(64, activation='sigmoid'))
    model.add(Dense(num_classes, activation='sigmoid'))
    return model
    

class AccuracyHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.acc = []

    def on_epoch_end(self, batch, logs={}):
        self.acc.append(logs.get('acc'))


def train(x_train, y_train, x_test, y_test):
    batch_size = 64
    num_classes = 128
    epochs = 100

    # input image dimensions
    img_x, img_y = 624, 1222

    path = '/mnt/d/Workspace/EE379K/DSL_Final/models'
    #path = '/mnt/c/Users/chau/Documents/models'
    model_ckpt = os.path.join(path,'ckpt.h5')
    
    #x_train = x_train.reshape(x_train.shape[0], img_x, img_y, 3)
    #x_test = x_test.reshape(x_train.shape[0], img_x, img_y, 3)
    
    # Convert data to right type
    #x_train = x_train.astype('float32')
    #x_test = x_test.astype('float32')
    #x_train /= 255
    #x_test /= 255
    #print(x_train)
    #print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')

    model = create_model()
    model.compile(loss=keras.losses.binary_crossentropy,
            optimizer=keras.optimizers.Adam(lr=.0001, decay=1e-6),
            metrics=['accuracy'])

    history = AccuracyHistory()

    checkpoint = ModelCheckpoint(model_ckpt,
            monitor='val_loss',
            verbose=1,
            save_best_only=True,
            mode='min')
    early_stop = EarlyStopping(patience=5, 
            monitor='val_loss',
            verbose=1, mode='min')
    callbacks = [history, checkpoint,early_stop]
    
    model.fit(x_train, y_train,
            batch_size=batch_size,
            epochs=epochs,
            verbose=1,
            validation_data=(x_test, y_test),
            callbacks=callbacks)

    score = model.evaluate(x_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    plt.plot(range(1,101), history.acc)
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.savefig('loss.png')
    plt.show()


def run_cnn(jpg_path, midi_path):
    # x is spectrogram, y is MIDI
    #jpg_path = '/mnt/d/Workspace/EE379K/data/spectrograms'
    #midi_path = '/mnt/d/Workspace/EE379K/data/split_midi'
    #jpg_path = '/mnt/c/Users/chau/Documents/spectrograms'
    #midi_path = '/mnt/c/Users/chau/Documents/split_midi'
    x_train, y_train = [], []
    img = []
    i = 0
    for filename in os.listdir(jpg_path):
        print(filename)
        # filename = "daylight_128.jpg"
        m_fn = filename.replace(".jpg", ".mid")
        if os.path.isfile(os.path.join(midi_path, m_fn)):
            pm = pretty_midi.PrettyMIDI(os.path.join(midi_path, m_fn))
            oh = utils.pretty_midi_to_one_hot(pm)
            if type(oh) is not int:
                oh = utils.slice_to_categories(oh)
                #oh = oh.reshape(1, 128)
                y_train.append(oh)
        
                im = Image.open(os.path.join(jpg_path, filename))
                im = im.crop((14, 13, 594, 301))
                resize = im.resize((49, 145), Image.NEAREST)
                resize.load()
                #result = Image.fromarray((visual * 255).astype(numpy.uint8))
                #resize.save("images/" + str(i) + ".jpg")
                arr = np.asarray(resize, dtype="float32")
                #print(arr)
                #arr = block_reduce(arr, block_size=(2,2,1), func=np.mean)
                x_train.append(arr)
                #if len(x_train) > 0:
                #    break
                i += 1

    x_train = np.array(x_train)
    #x_train = x_train.reshape(len(x_train), 1)
    y_train = np.array(y_train)
    #print(y_train)
    #print(x_train.shape)
    #print(y_train.shape)
    #print(len(x_train))
    #print(np.shape(x_train))
    #im_array = np.array([np.array
    #x_train = np.array(x_train)
    x_test = np.copy(x_train)
    y_test = np.copy(y_train)
    #x_train, x_test, y_train, y_test = train_test_split(
    #        x_train, y_train, test_size=0.2, random_state=1)
    #print(x_train.shape)
    #print(y_train.shape)
    x_train /= 255.0
    x_test /= 255.0
    train(x_train, y_train, x_test, y_test)


run_cnn(sys.argv[1], sys.argv[2])

FileNotFoundError: [Errno 2] No such file or directory: '-f'