In [None]:
import librosa
import librosa.display
import mir_eval
import pretty_midi
pretty_midi.pretty_midi.MAX_TICK = 1e10

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.io.wavfile

In [None]:
%matplotlib notebook

In [None]:
import os
import sys
import time
import math

In [None]:
import pickle

In [None]:
import tensorflow as tf

In [None]:
from midiutil import MIDIFile
from midi2audio import FluidSynth

In [None]:
config = tf.compat.v1.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.7
config.gpu_options.allow_growth = True
session = tf.compat.v1.Session(config=config)

In [None]:
tf.config.list_physical_devices()

In [None]:
tf.test.is_built_with_cuda()

In [None]:
from tqdm import tqdm

# Analiza

In [None]:
note_times_train = np.array([0.0 for i in range(88)])
note_times_test = np.array([0.0 for i in range(88)])
note_appearences_train = np.array([0 for i in range(88)])
note_appearences_test = np.array([0 for i in range(88)])

In [None]:
start = time.time()
inputpath = "C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS"

# Directories of these two pianos are counted as test data; everything else is training data.
test_pianos = ("ENSTDkAm", "ENSTDkCl")

for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2 and "MUS" in dirpath: # .mid, .txt, .wav

        # get working directory (relative to the notebook's working directory)
        pwd_read = "MAPS"+dirpath[len(inputpath):]
        print(pwd_read)

        # get unique file names
        names = {file[:-4] for file in filenames}
        names.discard("desktop")

        # Pick the accumulators once per directory instead of duplicating the parsing loop.
        if any(piano in dirpath for piano in test_pianos):
            appearances, durations = note_appearences_test, note_times_test
        else:
            appearances, durations = note_appearences_train, note_times_train

        for name in names:
            # generated transpositions are skipped here
            if "_lower" in name or "_higher" in name:
                continue
            txt_path = os.path.join(pwd_read, name)+".txt"
            with open(txt_path) as f:
                next(f, None)  # skip header; tolerate an empty file
                try:
                    for line in f:
                        if line == '\n':
                            continue
                        # columns: onset time, offset time, MIDI pitch (tab separated)
                        args = line.rstrip().split("\t")
                        note = int(args[2])-21
                        appearances[note] += 1
                        durations[note] += float(args[1])-float(args[0])
                except (ValueError, IndexError):
                    # malformed line or pitch outside the 88-key range: report the file and move on
                    print(txt_path)
                    continue

end = time.time()
print(end-start)

In [None]:
plt.bar([i+21 for i in range(88)], note_appearences_train)
plt.xlabel("MIDI številka")
plt.ylabel("Število pojavitev")

In [None]:
data = sum([(i+21)*note_appearences_train[i] for i in range(88)])
n = sum(note_appearences_train)
avg = data/n

In [None]:
librosa.note_to_hz(librosa.midi_to_note(avg))

In [None]:
sd = math.sqrt(sum([note_appearences_train[i]*(i+21-avg)**2 for i in range(88)])/n)

In [None]:
avg

In [None]:
sd

In [None]:
plt.bar([i for i in range(88)], note_appearences_test)

In [None]:
notes_shifted_train = note_appearences_train + np.roll(note_appearences_train, 30) + np.roll(note_appearences_train, -30)

In [None]:
plt.bar([i+21 for i in range(88)], notes_shifted_train)
plt.xlabel("MIDI številka")
plt.ylabel("Število pojavitev")

# Generiranje transponiranih skladb

In [None]:
inputpath = "C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS"
fs = FluidSynth()

In [None]:
start = time.time()
for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2 and "MUS" in dirpath: # .mid, .txt, .wav

        # get working directory (relative to the notebook's working directory)
        pwd_read = "MAPS"+dirpath[len(inputpath):]

        print(pwd_read)

        # get unique file names, excluding transpositions generated by a previous run
        # (both substrings must be absent, hence `and`; with `or` nothing was filtered out)
        names = {file[:-4] for file in filenames}
        names.discard("desktop")
        names = {n for n in names if "_lower" not in n and "_higher" not in n}

        # process those files and save them
        for name in names:
            print(name)
            source_mid = os.path.join(pwd_read, name)+".mid"

            # Pitches are rotated within the 88-key range (MIDI 21..108):
            # +58 mod 88 is a shift of -30 keys, +30 mod 88 is a shift of +30 keys.
            for suffix, shift in (("_lower", 58), ("_higher", 30)):
                midi = pretty_midi.PrettyMIDI(source_mid)
                for instrument in midi.instruments:
                    for note in instrument.notes:
                        note.pitch = ((note.pitch-21)+shift)%88+21
                target = os.path.join(pwd_read, name)+suffix
                midi.write(target+".mid")
                fs.midi_to_audio(target+".mid", target+".wav")


end = time.time()
print(end-start)

In [None]:
rate, data = scipy.io.wavfile.read("wav_outputs/output_MUSlh.wav")

In [None]:
d = data[0:rate*10,:]

In [None]:
scipy.io.wavfile.write("wav_outputs/transposed_10.wav", rate, d)

# Ime datoteke

In [None]:
file_name = "MAPS/AkPnBcht/ISOL/TR1/MAPS_ISOL_TR1_F_S0_M53_AkPnBcht"

# Branje .wav datotek

In [None]:
rate, data = scipy.io.wavfile.read(file_name+".wav")

In [None]:
rate

In [None]:
length = data.shape[0] / rate

In [None]:
length

In [None]:
data.shape[0]

In [None]:
t = np.linspace(0., length, data.shape[0])
plt.plot(t, data[:, 0], label="Levi kanal")
plt.plot(t, data[:, 1], label="Desni kanal")
plt.legend()
plt.xlabel("Čas [s]")
plt.ylabel("Amplituda")

In [None]:
s, e = 88200, 88641 #90405
t = np.linspace(s/rate, e/rate, e-s)
plt.plot(t, data[s:e, 0], label="Levi kanal")
plt.plot(t, data[s:e, 1], label="Desni kanal")
plt.legend()
plt.xlabel("Čas [s]")
plt.ylabel("Amplituda")

# To Mono

In [None]:
data_f = data.astype(float)
data_mono = librosa.to_mono(data_f.T)
#data_mono = librosa.to_mono(data_resampled)

In [None]:
data_mono.shape

In [None]:
data_mono.shape[0] / rate

# Fourier transform

In [None]:
stft_data_1 = np.abs(librosa.stft(data_mono,
                        n_fft = 2048,
                        hop_length = 1024,
                        pad_mode="wrap"))

In [None]:
stft_data_1.shape

In [None]:
tmp = stft_data_1[:256, :]
S_db = librosa.amplitude_to_db(tmp, ref=np.max)
librosa.display.specshow(S_db, x_axis='time', y_axis='linear', sr=rate*(256/1025), hop_length=1024/4)
plt.colorbar(format="%+2.f dB")
plt.xlabel("Čas [t]")
plt.ylabel("Frekvenca [Hz]")

# Constant Q Transform

In [None]:
HOP_LENGTH = 1024 #1024 # 512
BINS_PER_OCTAVE = 12 * 2 # 60
N_BINS = BINS_PER_OCTAVE * 8 # 60 * 8

FRAMES_PER_BIN = HOP_LENGTH / rate

In [None]:
cq_data_1 = np.abs(librosa.cqt(data_mono,
                        sr = rate,
                        hop_length = HOP_LENGTH, 
                        fmin = librosa.note_to_hz(librosa.midi_to_note(21)),
                        bins_per_octave = BINS_PER_OCTAVE,
                        n_bins = N_BINS,
                        pad_mode = "wrap"))
# https://dsp.stackexchange.com/questions/71874/understanding-cqt-constant-q-transformation-parameters-for-piano-amr-automati

In [None]:
cq_data_1.shape

In [None]:
rate

In [None]:
S_db = librosa.amplitude_to_db(cq_data_1, ref=np.max)
librosa.display.specshow(S_db, x_axis='time', y_axis='cqt_hz', sr=rate, fmin=librosa.note_to_hz(librosa.midi_to_note(21)),
                         hop_length=HOP_LENGTH, bins_per_octave=BINS_PER_OCTAVE)
plt.colorbar(format="%+2.0f dB")
plt.xlabel("Čas [t]")
plt.ylabel("Frekvenca [Hz]")

In [None]:
# highest piano note in hz
librosa.note_to_hz(librosa.midi_to_note(54))

In [None]:
cq_data_1.max()

In [None]:
cq_data_1 = np.float64(cq_data_1)

In [None]:
cq_data_1.dtype

In [None]:
def minMaxNorm(d):
    """Rescale array `d` linearly so that its minimum maps to 0 and its maximum to 1.

    The minimum and maximum are taken over the whole array (not per row).
    A constant array has zero range; it is returned as all zeros instead of
    dividing by zero and producing NaNs.
    """
    M = d.max()
    m = d.min()
    span = M-m
    if span == 0:
        return np.zeros_like(d, dtype=float)
    return (d-m)/span

# Normalize - log

In [None]:
cq_log = np.log(cq_data_1+1e-10)

In [None]:
plt.matshow(cq_log, aspect='auto', origin='lower')
plt.xlabel("Time")
plt.ylabel("Pitch")

In [None]:
S_db = librosa.amplitude_to_db(cq_log, ref=np.max)
librosa.display.specshow(S_db, x_axis='time', y_axis='cqt_hz', sr=rate/2, bins_per_octave=BINS_PER_OCTAVE)
plt.colorbar(format="%+2.f dB")
plt.xlabel("Čas [t]")
plt.ylabel("Frekvenca [Hz]")

In [None]:
cq_log_Mm = minMaxNorm(cq_log)

In [None]:
plt.matshow(cq_log_Mm, aspect='auto', origin='lower')

In [None]:
cq_log_Mm.dtype

In [None]:
cq_log_Mm.shape

# Normalize - z

In [None]:
avg = cq_data_1.mean(axis=1)
sd = cq_data_1.std(axis=1)

In [None]:
cq_z = ((cq_data_1.T-avg)/sd).T

In [None]:
plt.matshow(cq_z, aspect='auto', origin='lower')

In [None]:
cq_z_Mm = minMaxNorm(cq_z)

In [None]:
plt.matshow(cq_z_Mm, aspect='auto', origin='lower')

# txt to "spectrogram"

In [None]:
cq_data_1.shape

In [None]:
# Boolean piano roll (96 rows x CQT frames); `np.bool` was removed in NumPy 1.24, so use the builtin.
of_data_1 = np.zeros((96, cq_data_1.shape[1]), dtype=bool)

In [None]:
FRAMES_PER_BIN

In [None]:
with open("MAPS/AkPnBcht/ISOL/TR1/MAPS_ISOL_TR1_F_S0_M53_AkPnBcht.txt") as f:
    next(f) # skip header
    
    for line in f:
        if line == '\n':
            continue
        args = line.rstrip().split("\t")
        # print(args)
        onset_bin = int(float(args[0])//FRAMES_PER_BIN)
        ofset_bin = int(float(args[1])//FRAMES_PER_BIN)
        note = int(args[2])-21
        of_data_1[note, onset_bin:ofset_bin+1] = True
        # print(note, onset_bin, ofset_bin+1)

In [None]:
of_data_1.shape

In [None]:
plt.imshow(of_data_1, aspect='auto', origin='lower')

In [None]:
plt.matshow(of_data_1, aspect='auto', origin='lower')

# MIDI to "spectrogram"

In [None]:
# Boolean piano roll (88 keys x CQT frames); `np.bool` was removed in NumPy 1.24, so use the builtin.
of_data_1_midi = np.zeros((88, cq_data_1.shape[1]), dtype=bool)

In [None]:
midi_data = pretty_midi.PrettyMIDI(file_name+".mid")
for instrument in midi_data.instruments: # imamo samo 1 instrument
    for note in instrument.notes:
        print(note)
        onset_bin = int(float(note.start)//FRAMES_PER_BIN)
        ofset_bin = int(float(note.end)//FRAMES_PER_BIN)
        note = int(note.pitch)-21
        of_data_1_midi[note, onset_bin:ofset_bin+1] = True

In [None]:
of_data_1_midi.shape

In [None]:
plt.matshow(of_data_1_midi, aspect=0.08, origin='lower', extent=[0, length, 21, 108])
plt.xlabel("Čas [t]")
plt.ylabel("MIDI številka")

In [None]:
# Boolean piano roll (96 rows x CQT frames); `np.bool` was removed in NumPy 1.24, so use the builtin.
of_data_1_midi = np.zeros((96, cq_data_1.shape[1]), dtype=bool)

In [None]:
midi_data = pretty_midi.PrettyMIDI("MAPS/AkPnBcht/MUS/MAPS_MUS-alb_se3_AkPnBcht.mid")
for instrument in midi_data.instruments: # imamo samo 1 instrument
    for note in instrument.notes:
        onset_bin = int(float(note.start)//FRAMES_PER_BIN)
        ofset_bin = int(float(note.end)//FRAMES_PER_BIN)
        note = int(note.pitch)-21
        of_data_1_midi[note, onset_bin:ofset_bin+1] = True

In [None]:
plt.matshow(of_data_1_midi, aspect='auto', origin='lower')

# Save CQT and "spectrogram"

In [None]:
np.save("MAPS_processed/AkPnBcht/ISOL/CH/MAPS_ISOL_CH0.1_F_AkPnBcht_wav", cq_data_1) # save CQT

In [None]:
np.save("MAPS_processed/AkPnBcht/ISOL/CH/MAPS_ISOL_CH0.1_F_AkPnBcht_mid", of_data_1) # save spectrogram

In [None]:
tmp = scipy.sparse.csc_matrix(of_data_1)
scipy.sparse.save_npz("MAPS_processed/AkPnBcht/ISOL/CH/MAPS_ISOL_CH0.1_F_AkPnBcht_mid", tmp) # save spectrogram as sparse

In [None]:
# creates same folder structure as MAPS
"""
inputpath = 'C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS'
outputpath = 'C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS_z'

for dirpath, dirnames, filenames in os.walk(inputpath):
    structure = outputpath+dirpath[len(inputpath):]
    #print(structure)
    if not os.path.isdir(structure):
        os.mkdir(structure)
    else:
        print("Folder does already exits!")
"""

# Process all files

In [None]:
inputpath = "C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS"

In [None]:
def minMaxNorm(d):
    """Rescale array `d` linearly so that its minimum maps to 0 and its maximum to 1.

    The minimum and maximum are taken over the whole array (not per row).
    A constant array has zero range; it is returned as all zeros instead of
    dividing by zero and producing NaNs.
    """
    M = d.max()
    m = d.min()
    span = M-m
    if span == 0:
        return np.zeros_like(d, dtype=float)
    return (d-m)/span

In [None]:
HOP_LENGTH = 1024 # 128*n
BINS_PER_OCTAVE = 24 # 12*m
N_BINS = BINS_PER_OCTAVE*8
FMIN = librosa.note_to_hz(librosa.midi_to_note(21))

In [None]:
start = time.time()

for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2 and "MUS" in dirpath: # .mid, .txt, .wav

        # get working directories (relative to the notebook's working directory)
        pwd_read = "MAPS"+dirpath[len(inputpath):]
        pwd_write = "MAPS_processed"+dirpath[len(inputpath):]
        print(pwd_read)

        # get unique file names
        # (_lower/_higher transpositions are kept on purpose: they have .mid/.wav but no .txt,
        #  which is why labels are read from the MIDI file below)
        names = {file[:-4] for file in filenames}
        names.discard("desktop")

        # process those files and save them
        for name in names:
            print(os.path.join(pwd_read, name)+" --> "+os.path.join(pwd_write, name))

            # process .wav file: stereo -> mono -> magnitude CQT
            rate, data = scipy.io.wavfile.read(os.path.join(pwd_read, name)+".wav")
            data = np.float32(data)

            data_mono = librosa.to_mono(data.T)
            cq_data = np.abs(librosa.cqt(data_mono, sr = rate, hop_length = HOP_LENGTH,
                                           fmin = FMIN, bins_per_octave = BINS_PER_OCTAVE,
                                           n_bins = N_BINS, pad_mode = "wrap"))

            # No normalisation is applied here; the raw magnitudes are stored as float32.
            cq_data = np.float32(cq_data)

            # duration of one CQT frame in seconds
            frames_per_bin = HOP_LENGTH/rate

            # process .mid file: rows are MIDI pitch - 21, columns are CQT frames
            # (`np.bool` was removed in NumPy 1.24, so the builtin `bool` is used)
            of_data = np.zeros((96, cq_data.shape[1]), dtype=bool)
            midi_data = pretty_midi.PrettyMIDI(os.path.join(pwd_read, name)+".mid")
            for instrument in midi_data.instruments:
                for note in instrument.notes:
                    onset_bin = int(float(note.start)//frames_per_bin)
                    ofset_bin = int(float(note.end)//frames_per_bin)
                    key = int(note.pitch)-21  # separate name: do not rebind the loop variable
                    of_data[key, onset_bin:ofset_bin+1] = True

            # save CQT and piano roll
            np.save(os.path.join(pwd_write, name)+"_wav", cq_data) # save CQT
            tmp = scipy.sparse.csc_matrix(of_data)
            scipy.sparse.save_npz(os.path.join(pwd_write, name)+"_mid", tmp) # save as sparse

end = time.time()
print(end-start)

downsample in HOP_SIZE(256) data ===== samo HOP_SIZE(512) {hitreje}

HOP_LENGTH = 256, BINS_PER_OCTAVE = 24
1.23GB -> 476MB
144[s]
BINS_PER_OCTAVE nima vpliva na čas, poveča pa velikost linearno

In [None]:
# Statistics of the first note onset over all annotation (.txt) files.
n_files = 0
sum_start = 0
avg_start = 0
min_start = math.inf  # a fixed initial value of 1 would hide the true minimum whenever every onset is > 1
max_start = 0

for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2:
        names = {file[:-4] for file in filenames}
        names.discard("desktop")

        pwd_read = "MAPS"+dirpath[len(inputpath):]
        for file in names:
            # generated transpositions have no .txt annotation
            if "_lower" in file or "_higher" in file:
                continue
            txt_path = os.path.join(pwd_read, file)+".txt"
            with open(txt_path) as f:
                next(f, None)  # skip header
                first_line = next(f, None)  # only the first data line is inspected
            if first_line is None:
                continue
            try:
                first_onset = float(first_line.rstrip().split("\t")[0])
            except ValueError:
                print(txt_path)
                continue
            # count the file only once its onset was parsed successfully
            n_files += 1
            sum_start += first_onset
            max_start = max(max_start, first_onset)
            min_start = min(min_start, first_onset)

avg_start = sum_start/n_files if n_files else math.nan

In [None]:
print(n_files, min_start, avg_start, max_start)

# Branje podatkov nazaj

In [None]:
file_name = "MAPS_log/AkPnBcht/ISOL/TR1/MAPS_ISOL_TR1_F_S0_M53_AkPnBcht"
data_wav = np.load(file_name+"_wav.npy")
data_mid = scipy.sparse.load_npz(file_name+"_mid.npz").toarray()

In [None]:
S_db = librosa.amplitude_to_db(data_wav, ref=np.max)
librosa.display.specshow(S_db, x_axis='time', y_axis='cqt_hz', sr=rate, fmin=librosa.note_to_hz(librosa.midi_to_note(21)),
                         hop_length=HOP_LENGTH, bins_per_octave=BINS_PER_OCTAVE)
plt.colorbar(format="%+2.f dB")
plt.xlabel("Čas [t]")
plt.ylabel("Frekvenca [Hz]")

In [None]:
plt.matshow(data_mid, aspect='auto', origin='lower')

In [None]:
data_wav.dtype

In [None]:
data_wav.shape

In [None]:
data_wav.sum(axis=1).shape

In [None]:
np.mean(data_wav, axis=1).shape

In [None]:
# convert mean vector into matrix
np.array([np.mean(data_wav, axis=1),]*data_wav.shape[1]).T.shape

In [None]:
np.zeros((192,)).shape

# MAPS_z

In [None]:
inputpath = "C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS_processed"

In [None]:
HOP_LENGTH = 1024 # 128*n
BINS_PER_OCTAVE = 24 # 12*m
N_BINS = BINS_PER_OCTAVE*8
FMIN = librosa.note_to_hz(librosa.midi_to_note(21))

In [None]:
start = time.time()

note_average = np.zeros((192,))
note_N = 0

# get mean note values
for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2 and "MUS" in dirpath: # .mid, .txt, .wav
        
        # get working direcotry
        pwd_read = "MAPS_processed"+dirpath[len(inputpath):]
        
        print(pwd_read)
        
        if "ENSTDkAm" in dirpath or "ENSTDkCl" in dirpath:
            continue
        
        # get unique file names
        names = {file[:-8] for file in filenames}
        names.discard("desktop")
        
        lst = list(names)
        
        # process those files and save them
        for name in names:
            #print(os.path.join(pwd_read, name))
            
            data_wav = np.load(os.path.join(pwd_read, name)+"_wav.npy")
            
            note_average += data_wav.sum(axis=1)
            note_N += data_wav.shape[1]
            
note_average /= note_N
            
end = time.time()
print(end-start)

In [None]:
note_average

In [None]:
start = time.time()

note_std = np.zeros((192,))

# get standard deviation of every CQT bin over the training files
# (second pass: relies on `note_average` and `note_N` computed by the mean pass)
for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2 and "MUS" in dirpath: # .mid, .txt, .wav

        # get working directory
        pwd_read = "MAPS_processed"+dirpath[len(inputpath):]

        print(pwd_read)

        # the two ENSTDk* pianos are excluded from the statistics
        if "ENSTDkAm" in dirpath or "ENSTDkCl" in dirpath:
            continue

        # get unique file names (strip the 8-character "_wav.npy" / "_mid.npz" suffix)
        names = {file[:-8] for file in filenames}
        names.discard("desktop")

        # accumulate squared deviations from the per-bin mean
        for name in names:
            data_wav = np.load(os.path.join(pwd_read, name)+"_wav.npy")

            # broadcast the mean over the frame axis instead of materialising a tiled copy
            note_std += np.power(data_wav-note_average[:, None], 2).sum(axis=1)

note_std /= note_N
note_std = np.sqrt(note_std)

end = time.time()
print(end-start)

In [None]:
note_std

In [None]:
start = time.time()

# Bins whose standard deviation is zero are divided by 1 instead, so the output stays finite.
safe_std = np.where(note_std > 0, note_std, 1.0)

# z-normalise every CQT with the per-bin mean/std and write the result to MAPS_z
for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2 and "MUS" in dirpath: # .mid, .txt, .wav

        # get working directories
        pwd_read = "MAPS_processed"+dirpath[len(inputpath):]
        pwd_write = "MAPS_z"+dirpath[len(inputpath):]

        print(pwd_read)

        # get unique file names (strip the 8-character "_wav.npy" / "_mid.npz" suffix)
        names = {file[:-8] for file in filenames}
        names.discard("desktop")

        # process those files and save them
        for name in names:
            data_wav = np.load(os.path.join(pwd_read, name)+"_wav.npy")
            # broadcast mean/std over the frame axis instead of tiling them per file
            out = (data_wav-note_average[:, None])/safe_std[:, None]
            np.save(os.path.join(pwd_write, name)+"_wav", np.float32(out))

            # labels are copied unchanged
            data_mid = scipy.sparse.load_npz(os.path.join(pwd_read, name)+"_mid.npz")
            scipy.sparse.save_npz(os.path.join(pwd_write, name)+"_mid", data_mid)

end = time.time()
print(end-start)

# Train / Test oblika

In [None]:
def data_process(data_x, data_y, window):
    """Cut a spectrogram into overlapping windows with one label row per window.

    Parameters
    ----------
    data_x : 2-D array, indexed as (bins, frames).
    data_y : 2-D array, indexed as (rows, frames); only the first 88 rows are used.
    window : number of consecutive frames per sample.

    Returns
    -------
    output_x : float32 array of shape (frames-window+1, bins, window).
    output_y : int8 array of shape (frames-window+1, 88); row i is the label
        column at the centre frame (i + (window-1)//2) of window i.

    The number of bins is taken from `data_x` rather than being fixed at 384,
    so inputs of any height are accepted.
    """
    n_bins, n_frames = data_x.shape[0], data_x.shape[1]
    size = n_frames-window+1
    pad = (window-1)//2

    # allocate in the final dtypes directly instead of float64 followed by a cast
    output_x = np.zeros((size, n_bins, window), dtype=np.float32)
    output_y = np.zeros((size, 88), dtype=np.int8)

    for i in range(size):
        output_x[i, :, :] = data_x[:, i:i+window]
        output_y[i, :] = data_y[:88, i+pad]

    return output_x, output_y

In [None]:
train_x, train_y = data_process(data_wav, data_mid, 7)

In [None]:
train_x.shape

In [None]:
train_y.shape

In [None]:
train_x.dtype

In [None]:
train_y.dtype

In [None]:
inputpath = "C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS_processed"

In [None]:
start = time.time()

for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) > 2 and "MUS" in dirpath: # .mid, .txt, .wav

        # get working directories
        pwd_read = "MAPS_processed"+dirpath[len(inputpath):]
        pwd_write = "MAPS_traintest"+dirpath[len(inputpath):]

        print(pwd_read)

        # get unique file names: files in MAPS_processed end in "_wav.npy" / "_mid.npz",
        # i.e. an 8-character suffix. Stripping only 4 characters would leave "_wav"/"_mid"
        # in the name and the loads below would look for "<name>_wav_wav.npy".
        names = {file[:-8] for file in filenames}
        names.discard("desktop")

        # process those files and save them
        for name in names:
            data_wav = np.load(os.path.join(pwd_read, name)+"_wav.npy")
            data_mid = scipy.sparse.load_npz(os.path.join(pwd_read, name)+"_mid.npz").toarray()

            train_x, train_y = data_process(data_wav, data_mid, 7)

            np.save(os.path.join(pwd_write, name)+"_wav", train_x)
            tmp = scipy.sparse.csr_matrix(train_y)
            scipy.sparse.save_npz(os.path.join(pwd_write, name)+"_mid", tmp)

end = time.time()
print(end-start)

# Priprava na fit_generator

In [None]:
data_train = dict()
data_test = dict()
window = 7
inputpath = "C:/Users/dmoho/Documents/FRI/DIPLOMA/MAPS_processed"

In [None]:
start = time.time()
for dirpath, dirnames, filenames in os.walk(inputpath):
    if len(filenames) >= 2 and "MUS" in dirpath: # .npy, .npz
        pwd_read = "MAPS_processed"+dirpath[len(inputpath):]
        print(pwd_read)

        names = {file[:-8] for file in filenames}
        names.discard("desktop")
        lst = list(names)
        
        if "ENSTDkAm" in dirpath or "ENSTDkCl" in dirpath: # test data
            for name in names:
                if "_higher" in name or "_lower" in name: # skip those for testing
                    continue
                n = os.path.join(pwd_read, name)
                data_wav = np.load(n+"_wav.npy")
                n = n.replace("\\", "/")
                data_test[n] = data_wav.shape[1]-window+1
        else: # train data
            for name in names:
                n = os.path.join(pwd_read, name)
                data_wav = np.load(n+"_wav.npy")
                n = n.replace("\\", "/")
                data_train[n] = data_wav.shape[1]-window+1
        
end = time.time()
print(end-start)

In [None]:
len(data_train.keys())

In [None]:
sum([x for x in data_train.values()])

In [None]:
sys.getsizeof(data_train)

In [None]:
# save dict
#pickle.dump(data_train, open("tt_files/train_p"+str(window)+".pickle", "wb"))
#pickle.dump(data_test, open("tt_files/test_p"+str(window)+".pickle", "wb"))

In [None]:
# read dict
data_train = pickle.load(open("tt_files/MUSlh_train_log.pickle", "rb"))
data_test = pickle.load(open("tt_files/MUSlh_test_log.pickle", "rb"))

In [None]:
def prepareForGenerator(data):
    """Split a {file name: sample count} dict into two parallel arrays.

    Returns the keys as an array and the running (cumulative) sum of the
    values, both in the dict's iteration order.
    """
    file_names = np.array(list(data))
    cumulative_counts = np.cumsum(list(data.values()))
    return file_names, cumulative_counts

In [None]:
names_train, values_train = prepareForGenerator(data_train)
names_test, values_test = prepareForGenerator(data_test)

In [None]:
names_train

In [None]:
values_train

In [None]:
# find the file based on index...
names_train[np.argmin(values_train <= 38092)]

In [None]:
index = 0
idx = np.argmin(values_train <= index*128)
if idx == 0:
    sample = index*128
else:
    sample = index*128-values_train[idx-1]
    if sample+128+7 > values_train[idx]-values_train[idx-1]:
        sample = values_train[idx]-values_train[idx-1]-(128+7)
sample

In [None]:
idx

In [None]:
values_train[idx]-values_train[idx-1]

In [None]:
filename = names_train[np.argmin(values_train <= index*128)]
data_wav = np.load(filename+"_wav.npy")
data_mid = scipy.sparse.load_npz(filename+"_mid.npz").toarray()

In [None]:
pad = int((7-1)/2)
output_x = np.zeros((128, 384, 7, 1))
output_y = np.zeros((128, 88))

In [None]:
for i in range(128):
    output_x[i, :, :, :] = np.reshape(data_wav[:, sample+i:sample+i+7], (384, 7, 1))
    output_y[i, :] = data_mid[:88, sample+i+pad]*1

# Funkcije za Generator

In [None]:
def getRandomSample(data_x, data_y, window, size = 32):
    """Draw `size` random windows (with replacement) and their centre-frame labels.

    Parameters
    ----------
    data_x : 2-D array, indexed as (bins, frames).
    data_y : 2-D array, indexed as (rows, frames); only the first 88 rows are used.
    window : number of consecutive frames per sample.
    size : number of samples to draw.

    Returns
    -------
    output_x : float32 array of shape (size, bins, window, 1).
    output_y : int8 array of shape (size, 88).

    Raises
    ------
    ValueError : if `data_x` has fewer than `window` frames.

    Window start positions are drawn from [0, frames-window], so every window
    is complete. Drawing from [pad, frames-pad) could return a start too close
    to the end, giving a truncated slice that cannot be reshaped.
    """
    pad = (window-1)//2
    n_bins, n_frames = data_x.shape[0], data_x.shape[1]
    n_starts = n_frames-window+1
    if n_starts < 1:
        raise ValueError("data_x has fewer frames than the window length")

    output_x = np.zeros((size, n_bins, window, 1), dtype=np.float32)
    output_y = np.zeros((size, 88), dtype=np.int8)

    starts = np.random.randint(0, n_starts, size)

    for i, j in enumerate(starts):
        output_x[i, :, :, 0] = data_x[:, j:j+window]
        output_y[i, :] = data_y[:88, j+pad]

    return output_x, output_y

In [None]:
file_name = "MAPS_processed/ENSTDkAm/MUS/MAPS_MUS-schub_d760_3_ENSTDkAm"
data_wav = np.load(file_name+"_wav.npy")
data_mid = scipy.sparse.load_npz(file_name+"_mid.npz").toarray()

In [None]:
X, y = getRandomSample(data_wav, data_mid, 7, 32)

In [None]:
y

# Generator za keras

In [None]:
class CustomGenerator(tf.keras.utils.Sequence):
    """Keras Sequence that serves random windows from preprocessed files.

    `names` holds file-name stems (without the "_wav.npy" / "_mid.npz" suffix)
    and `values` the cumulative number of samples up to and including each
    file, as produced by `prepareForGenerator`. Each batch is drawn from a
    single file chosen by the batch index; the windows inside that file are
    sampled at random.
    """

    def __init__(self, names, values, batch_size = 32, window = 7):
        super().__init__()
        self.names = names
        self.values = values
        self.batch_size = batch_size
        self.window = window
        # total number of samples over all files (last cumulative sum)
        self.samples = self.values[-1]

    # number of batches per epoch
    def __len__(self):
        return int(self.samples // self.batch_size)

    def getRandomSample(self, data_x, data_y):
        """Draw `batch_size` random windows from one file.

        `data_x` is indexed as (bins, frames) and `data_y` as (rows, frames);
        only the first 88 label rows are used. Returns a float32 array of
        shape (batch_size, bins, window, 1) and an int8 array of shape
        (batch_size, 88) holding the label column at each window's centre.

        Raises ValueError if the file has fewer frames than the window.
        """
        pad = (self.window-1)//2
        n_bins, n_frames = data_x.shape[0], data_x.shape[1]

        # valid start positions are 0 .. n_frames-window inclusive
        n_starts = n_frames-self.window+1
        if n_starts < 1:
            raise ValueError("data_x has fewer frames than the window length")

        output_x = np.zeros((self.batch_size, n_bins, self.window, 1), dtype=np.float32)
        output_y = np.zeros((self.batch_size, 88), dtype=np.int8)

        starts = np.random.randint(0, n_starts, self.batch_size)

        for i, j in enumerate(starts):
            output_x[i, :, :, 0] = data_x[:, j:j+self.window]
            output_y[i, :] = data_y[:88, j+pad]

        return output_x, output_y

    def __getitem__(self, index):
        """Return one (X, y) batch taken from the file that contains sample index*batch_size."""
        # first file whose cumulative sample count exceeds the batch's first sample index
        filename = self.names[np.argmin(self.values <= index*self.batch_size)]

        # load data
        data_wav = np.load(filename+"_wav.npy")
        data_mid = scipy.sparse.load_npz(filename+"_mid.npz").toarray()

        X, y = self.getRandomSample(data_wav, data_mid)

        return X, y

# Binary Crossentropy loss

In [None]:
# Binary cross-entropy on logits with reduction disabled.
bce = tf.keras.losses.BinaryCrossentropy(
    from_logits=True,
    reduction=tf.keras.losses.Reduction.NONE,
)

def custom_binaryCrossentropy(y_true, y_pred):
    """Evaluate the module-level `bce` loss and return the result as a NumPy array.

    NOTE(review): `.numpy()` is only available on eager tensors, so this helper
    looks intended for interactive inspection rather than as a compiled loss.
    """
    return bce(y_true, y_pred).numpy()

In [None]:
def custom_bce(y_true, y_pred):
    """Element-wise weighted binary cross-entropy computed from logits.

    Parameters
    ----------
    y_true : array or tensor of 0/1 targets.
    y_pred : array or tensor of logits with the same shape as `y_true`.

    Returns
    -------
    Tensor of the same shape holding the un-reduced loss, where positive
    targets are weighted by 0.99 and negative targets by 0.01.

    Uses the numerically stable form max(x, 0) - x*z + log(1 + exp(-|x|)).
    The max(x, 0) term matters: using plain x instead gives wrong (too small)
    losses whenever a logit is negative.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    # integer labels would otherwise fail to multiply with float logits
    y_true = tf.cast(y_true, y_pred.dtype)

    per_element = (tf.nn.relu(y_pred) - y_pred*y_true
                   + tf.math.log1p(tf.math.exp(-tf.math.abs(y_pred))))
    weights = y_true*0.99 + (1-y_true)*0.01
    return per_element*weights

In [None]:
y_true = np.array([[1, 0], [1, 1], [0, 1]])
y_pred = np.random.rand(3, 2)

In [None]:
custom_bce(y_true, y_pred)

# CNN

In [None]:
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Conv2D(32, (48, 3), activation="relu", input_shape=(384, 7, 1), data_format="channels_last", padding="valid"))
model.add(tf.keras.layers.MaxPooling2D((2, 1)))
model.add(tf.keras.layers.Conv2D(32, (24, 3), activation="relu", padding="valid"))
model.add(tf.keras.layers.MaxPooling2D((2, 1)))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(88, activation="relu"))
model.add(tf.keras.layers.Dense(88, activation="sigmoid"))

In [None]:
model.summary()

In [None]:
# The output layer already applies a sigmoid and each of the 88 outputs is an independent
# on/off label, so the matching loss is binary cross-entropy on probabilities
# (not categorical cross-entropy on logits, which assumes one mutually exclusive class).
model.compile(optimizer="adam", loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
              metrics=[tf.keras.metrics.Precision(name='precision'), tf.keras.metrics.Recall(name='recall')])
#metrics=["precision", "recall"]

In [None]:
#model.compile(optimizer="adam", loss=loss_function, metrics=["crossentropy"])

In [None]:
model_name = "cnnModels/conv32_48_3_mp2_1_conv64_24_3_mp2_1_flat_den132_den88"
checkpoint = tf.keras.callbacks.ModelCheckpoint(model_name, monitor="val_recall", verbose=1,
                                                save_best_only=True, mode="max", save_weights_only=True)

In [None]:
generator_train = CustomGenerator(names_train, values_train, 128, 7)
generator_test = CustomGenerator(names_test, values_test, 128, 7)

In [None]:
history = model.fit(generator_train, validation_data=generator_test, epochs=100, verbose=1, callbacks=[checkpoint])

In [None]:
model.save(model_name)

# CNN load model

In [None]:
model = tf.keras.models.load_model("hteModels/l_43_3")

In [None]:
model.summary()

In [None]:
def data_test(data_x, window):
    """Cut a spectrogram into every overlapping window, ready for `model.predict`.

    Parameters
    ----------
    data_x : 2-D array, indexed as (bins, frames).
    window : number of consecutive frames per sample.

    Returns
    -------
    float32 array of shape (frames-window+1, bins, window, 1).

    The number of bins is taken from `data_x` rather than being fixed at 192,
    so inputs of any height are accepted.
    """
    n_bins, n_frames = data_x.shape[0], data_x.shape[1]
    size = n_frames-window+1
    output_x = np.zeros((size, n_bins, window, 1), dtype=np.float32)

    for i in range(size):
        output_x[i, :, :, 0] = data_x[:, i:i+window]

    return output_x

In [None]:
# Choose the recording to run the model on. Other candidate paths kept for reference:
# MAPS_log/ENSTDkAm/ISOL/CH/MAPS_ISOL_CH0.3_F_ENSTDkAm
# MAPS_log/ENSTDkCL/MUS/MAPS_MUS-bk_xmas4_ENSTDkCl
# MAPS_log/AkPnBsdf/MUS/MAPS_MUS-alb_se4_AkPnBsdf
#file_name = "MAPS_log/ENSTDkAm/MUS/MAPS_MUS-schub_d760_3_ENSTDkAm"
file_name = "MAPS_log/AkPnBcht/ISOL/TR1/MAPS_ISOL_TR1_F_S0_M53_AkPnBcht"
# "<file_name>_wav.npy": model input features (presumably a (bins, frames) spectrogram -- confirm).
data_wav = np.load(file_name+"_wav.npy")
# "<file_name>_mid.npz": SciPy sparse matrix; it is densified and only its first 88 rows are kept.
data_mid = scipy.sparse.load_npz(file_name+"_mid.npz").toarray()[:88,:]

In [None]:
# Shape of the loaded input features.
data_wav.shape

In [None]:
# Shape of the reference matrix after trimming it to 88 rows.
data_mid.shape

In [None]:
# Cut the input features into overlapping 5-frame windows, one model example per window.
test_x = data_test(data_wav, 5)

In [None]:
# Per data_test the layout is (examples, bins, window, 1).
test_x.shape

In [None]:
# Run the model on every window; the transpose puts the model outputs on axis 0 and the
# windows (time) on axis 1.
test_y = model.predict(test_x, verbose = 1).T

In [None]:
# Shape of the transposed prediction: (model outputs, windows).
test_y.shape

In [None]:
# Show the raw model outputs as an image; the y-extent 21..108 labels the rows with MIDI numbers.
# NOTE(review): the x-extent 281*0.04307/2 is hard-coded -- confirm that 281 matches the
# number of frames of the selected file and that the per-frame duration is correct.
plt.matshow(test_y, aspect='auto', origin='lower', extent=[0, 281*0.04307/2, 21, 108])
# Axis labels are in Slovenian: "Čas" = time, "MIDI številka" = MIDI number.
plt.xlabel("Čas [t]")
plt.ylabel("MIDI številka")

In [None]:
# Write the current matplotlib figure to disk with a tight bounding box.
plt.savefig("img/output_example.png", bbox_inches="tight")

In [None]:
# Same view after binarising the outputs at a fixed 0.5 (independent of THRESHOLD).
# NOTE(review): the x-extent is the same hard-coded 281*0.04307/2 -- confirm it fits this file.
plt.matshow(test_y>0.5, aspect='auto', origin='lower', extent=[0, 281*0.04307/2, 21, 108])
# Axis labels are in Slovenian: "Čas" = time, "MIDI številka" = MIDI number.
plt.xlabel("Čas [t]")
plt.ylabel("MIDI številka")

In [None]:
# Write the current matplotlib figure to disk with a tight bounding box.
plt.savefig("img/output_threshold_example.png", bbox_inches="tight")

In [None]:
# Reference matrix drawn with the same axes for visual comparison with the prediction.
# NOTE(review): data_mid is wider than test_y (the prediction is later written into
# columns 2..-3 of an array of data_mid's shape), yet the same hard-coded x-extent is used -- confirm.
plt.matshow(data_mid, aspect='auto', origin='lower', extent=[0, 281*0.04307/2, 21, 108])
# Axis labels are in Slovenian: "Čas" = time, "MIDI številka" = MIDI number.
plt.xlabel("Čas [t]")
plt.ylabel("MIDI številka")

In [None]:
# Write the current matplotlib figure to disk with a tight bounding box.
plt.savefig("img/output_true_example.png", bbox_inches="tight")

In [None]:
# Sanity check of the prediction's shape and dtype before it is thresholded.
print(test_y.shape)
print(test_y.dtype)

In [None]:
# Convert the reference to np.byte so it has the same dtype as the binarised prediction
# it is compared with below.
data_mid = np.asarray(data_mid, dtype=np.byte)
print(data_mid.shape)
print(data_mid.dtype)

In [None]:
# Decision threshold: model outputs strictly above this value count as "note active".
THRESHOLD = 0.6

In [None]:
# Binarise the prediction into an array shaped like the reference. The prediction fills
# every column except the first two and the last two, which stay zero.
data_y = np.zeros(data_mid.shape)
data_y[:, 2:-2] = test_y > THRESHOLD
data_y = data_y.astype(np.byte)
print(data_y.shape)
print(data_y.dtype)

In [None]:
# Sweep the decision threshold over 0.00 .. 0.99 and report the value with the fewest
# cells that differ from the reference.
best_i = 0
lowest = np.inf  # `np.Inf` was removed in NumPy 2.0; `np.inf` works on every version
r = np.arange(0, 1.0, 0.01)
arr = np.zeros(len(r))  # mismatch count for every tried threshold

# Use a scratch array for the sweep. Writing into `data_y` (as before) left it
# binarised at the last swept value (0.99) instead of THRESHOLD, which silently
# changed the input of every later cell that reads `data_y`.
data_y_candidate = np.zeros(data_mid.shape, dtype=np.byte)
for i in range(len(r)):
    data_y_candidate[:, 2:data_y_candidate.shape[1]-2] = test_y >= r[i]
    diff = np.sum(np.abs(data_mid-data_y_candidate))
    arr[i] = diff
    if diff < lowest:
        lowest = diff
        best_i = i
print(r[best_i], lowest)

In [None]:
def evaluateResult(y_true, y_pred):
    """Print confusion counts, accuracy, precision and recall for a prediction.

    Every Keras metric is updated once with ``(y_true, y_pred)`` and its result is
    printed as ``[label]: value`` with three decimals. Nothing is returned.
    """
    # (printed label, metric) pairs; an AUC metric ("auc") is intentionally left out.
    labelled_metrics = (
        ("tp", tf.keras.metrics.TruePositives(name='tp')),
        ("fp", tf.keras.metrics.FalsePositives(name='fp')),
        ("tn", tf.keras.metrics.TrueNegatives(name='tn')),
        ("fn", tf.keras.metrics.FalseNegatives(name='fn')),
        ("acc", tf.keras.metrics.BinaryAccuracy(name='accuracy')),
        ("prec", tf.keras.metrics.Precision(name='precision')),
        ("rec", tf.keras.metrics.Recall(name='recall')),
    )

    for label, metric in labelled_metrics:
        metric.update_state(y_true, y_pred)
        print("[%s]: %.3f" % (label, metric.result().numpy()))

In [None]:
# Cell-by-cell comparison of the binarised prediction with the reference.
evaluateResult(data_mid, data_y)

# Output to events

In [None]:
# Constants used when converting the model output into timed note events.
HOP_LENGTH = 1024 # alternative value noted by the author: 512
BINS_PER_OCTAVE = 12 * 2 # alternative value noted by the author: 60
N_BINS = BINS_PER_OCTAVE * 8 # 24 * 8 = 192 with the values above
WINDOW = 5 # presumably the model's input window in frames (data_test is called with 5) -- confirm

# Factor that turns a frame index into a time: HOP_LENGTH divided by 44100
# (presumably the audio sample rate in Hz, which would give seconds per frame -- confirm).
# NOTE(review): the name reads "frames per bin", which is not what the expression computes.
FRAMES_PER_BIN = HOP_LENGTH / 44100 # 44100 is the "rate" of the original note

In [None]:
def output_to_events(data_y, window=None, frames_per_bin=None):
    """Convert a binary piano roll into note intervals and MIDI pitches.

    Parameters
    ----------
    data_y : array-like, shape (88, n_frames)
        Truthy cells mark an active note; row ``k`` maps to MIDI pitch ``k + 21``.
    window : int, optional
        Window width used for the frame shift ``(window - 1) / 2``.
        Defaults to the module-level ``WINDOW``.
    frames_per_bin : float, optional
        Factor that converts a (shifted) frame index into a time.
        Defaults to the module-level ``FRAMES_PER_BIN``.

    Returns
    -------
    intervals : numpy.ndarray, shape (n_notes, 2)
        ``[onset, offset]`` of every note, i.e. ``(frame + shift) * frames_per_bin``.
        The shape is ``(0, 2)`` when no note is found.
    pitch : numpy.ndarray, shape (n_notes,)
        MIDI pitch of every note, aligned with ``intervals``.

    Notes
    -----
    A note that is still active in the last frame is closed at the end of the roll;
    previously such notes were dropped.
    NOTE(review): if ``data_y`` already spans the full frame range (the prediction is
    written into columns 2..-3 of a reference-shaped array), the ``(window - 1) / 2``
    shift may be applied twice -- confirm.
    """
    if window is None:
        window = WINDOW
    if frames_per_bin is None:
        frames_per_bin = FRAMES_PER_BIN

    frame_shift = (window-1)/2
    n_frames = data_y.shape[1]

    tracing = np.zeros(88, dtype=bool)  # True while a note is currently sounding
    onsets = np.zeros(88)               # onset time of the currently sounding note

    intervals = list()
    pitch = list()

    for frame in range(n_frames):
        for note in range(88):
            if data_y[note, frame]:
                if not tracing[note]:
                    tracing[note] = True
                    onsets[note] = (frame+frame_shift)*frames_per_bin
            elif tracing[note]:
                tracing[note] = False
                intervals.append([onsets[note], (frame+frame_shift)*frames_per_bin])
                pitch.append(note+21)

    # Close every note that never turned off before the roll ended.
    for note in np.flatnonzero(tracing):
        intervals.append([onsets[note], (n_frames+frame_shift)*frames_per_bin])
        pitch.append(int(note)+21)

    # reshape keeps the (n, 2) layout even when no note was found.
    return np.array(intervals, dtype=float).reshape(-1, 2), np.array(pitch, dtype=int)

In [None]:
def clean_events(intervals, pitch, min_duration=0.05):
    """Drop notes whose duration does not exceed ``min_duration``.

    Parameters
    ----------
    intervals : array-like, shape (n_notes, 2)
        ``[onset, offset]`` per note.
    pitch : array-like, shape (n_notes,)
        Pitch per note, aligned with ``intervals``.
    min_duration : float, optional
        A note is kept only if ``offset - onset > min_duration`` (default 0.05,
        the cutoff that used to be hard-coded).

    Returns
    -------
    intervals : numpy.ndarray, shape (n_kept, 2)
    pitch : numpy.ndarray, shape (n_kept,)

    Raises
    ------
    ValueError
        If ``intervals`` and ``pitch`` describe a different number of notes.
    """
    # reshape keeps the (n, 2) layout for empty input as well.
    intervals = np.asarray(intervals, dtype=float).reshape(-1, 2)
    pitch = np.asarray(pitch)
    if len(intervals) != len(pitch):
        raise ValueError("intervals and pitch must have the same length")

    keep = (intervals[:, 1] - intervals[:, 0]) > min_duration
    return intervals[keep], pitch[keep]

In [None]:
# Turn the binary roll into note events. Despite its name, `onsets` holds
# [onset, offset] pairs (one row per note).
onsets, pitch = output_to_events(data_y)
# Optional removal of very short notes, currently disabled:
#onsets, pitch = clean_events(onsets, pitch)

In [None]:
# Number of detected notes and columns per note.
onsets.shape

In [None]:
# Load the reference note list that belongs to the recording being transcribed.
# The path is derived from `file_name` so prediction and reference cannot refer to
# different recordings: the path used to be hard-coded to the schub_d760_3 piece
# while `file_name` selects another file.
# Assumes the "MAPS" tree mirrors "MAPS_log" and stores "<name>.txt" next to the audio -- TODO confirm.
truth_path = file_name.replace("MAPS_log", "MAPS", 1) + ".txt"

with open(truth_path) as f:
    next(f) # skip header

    intervals = list()
    notes = list()

    for line in f:
        if not line.strip():
            continue  # ignore blank lines
        # Tab-separated columns: onset, offset, MIDI pitch.
        args = line.rstrip().split("\t")
        intervals.append(np.array([float(args[0]), float(args[1])]))
        notes.append(int(args[2]))
    onsets_real = np.array(intervals)
    pitch_real = np.array(notes)

In [None]:
# Number of reference notes and columns per note.
onsets_real.shape

In [None]:
# Compare the detected notes with the reference using mir_eval's transcription metrics
# (reference intervals/pitches first, estimates second) and print every score.
scores = mir_eval.transcription.evaluate(onsets_real, pitch_real, onsets, pitch)
for key, val in scores.items():
    print("[%s]: %f" % (key, val))

# From MIDI to WAV

In [None]:
# Derive a tempo factor from the shortest detected note (1 / its duration).
# NOTE(review): the next cell overwrites `tempo` with a constant, so this value is only
# used if that cell is skipped; it also fails when no notes were detected.
tempo = 1/(min(onsets[:, 1]-onsets[:, 0]))

In [None]:
# Fixed tempo factor; replaces the value computed from the shortest note.
tempo = 14.33

In [None]:
track = 0
channel = 0
time = 0
duration = 2
volume = 90

In [None]:
midi = MIDIFile()
midi.addTempo(track, time, tempo*50)

In [None]:
# Add one MIDI note per detected event; start and length are the event's onset and
# duration multiplied by `tempo`.
for e, p in zip(onsets, pitch):
    l = tempo*(e[1]-e[0])  # note length (offset - onset, scaled)
    t = tempo*e[0]  # note start (onset, scaled)
    midi.addNote(track, channel, p, t, l, volume)

In [None]:
# Serialise the MIDI data to "test.mid" (binary mode is required by writeFile).
with open("test.mid", "wb") as output_file:
    midi.writeFile(output_file)

In [None]:
# Render the MIDI file written above to a WAV file with FluidSynth's default settings.
FluidSynth().midi_to_audio("test.mid", "output_MUSlh.wav")

# Replicate other models

In [None]:
# Reference CNN: two tanh convolution stages (each followed by dropout and (3, 1) max
# pooling) on a (252, 7, 1) input, then two sigmoid dense layers and 88 sigmoid outputs.
ConvNet = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(50, (25, 5), activation="tanh", input_shape=(252, 7, 1),
                           data_format="channels_last", padding="valid"),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.MaxPooling2D((3, 1)),
    tf.keras.layers.Conv2D(50, (5, 3), activation="tanh", padding="valid"),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.MaxPooling2D((3, 1)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(1000, activation="sigmoid"),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(500, activation="sigmoid"),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(88, activation="sigmoid"),
])

In [None]:
# Print the layer shapes and parameter counts of the reference CNN.
ConvNet.summary()

In [None]:
# Convolutional stack on a (229, 7, 1) input: three 3x3 convolutions with (2, 1) max
# pooling and dropout, followed by Dense(512) and Dense(88).
# NOTE(review): no layer is given an activation, so all of them are linear, and there is
# no Flatten between the last pooling layer and Dense(512). Dense therefore acts on the
# last axis of a 4-D tensor and the model output is not a flat 88-vector -- confirm intent
# (ConvNet above does flatten before its dense layers).
stack = tf.keras.models.Sequential()
stack.add(tf.keras.layers.Conv2D(32, (3, 3), input_shape=(229, 7, 1), data_format="channels_last", padding="valid"))
stack.add(tf.keras.layers.Conv2D(32, (3, 3)))
stack.add(tf.keras.layers.MaxPooling2D((2, 1)))
stack.add(tf.keras.layers.Dropout(0.25))
stack.add(tf.keras.layers.Conv2D(64, (3, 3)))
stack.add(tf.keras.layers.MaxPooling2D((2, 1)))
stack.add(tf.keras.layers.Dropout(0.25))
stack.add(tf.keras.layers.Dense(512))
stack.add(tf.keras.layers.Dropout(0.5))
stack.add(tf.keras.layers.Dense(88))

In [None]:
# Recurrent head on an (88, 1) input: a 128-unit LSTM that reads the 88 steps in reverse
# order (go_backwards=True) and returns its final state, followed by a linear Dense(88).
onset = tf.keras.models.Sequential()
onset.add(tf.keras.layers.InputLayer(input_shape=(88,1)))
onset.add(tf.keras.layers.LSTM(128, go_backwards=True))
onset.add(tf.keras.layers.Dense(88))

In [None]:
# Single linear layer mapping an 88-vector to an 88-vector (no activation specified).
frames = tf.keras.models.Sequential()
frames.add(tf.keras.layers.InputLayer(input_shape=(88,)))
frames.add(tf.keras.layers.Dense(88))

In [None]:
# Recurrent head on a (176, 1) input (176 = 2 * 88, presumably two 88-vectors
# concatenated -- confirm): reverse-direction 128-unit LSTM followed by a linear Dense(88).
frames_ = tf.keras.models.Sequential()
frames_.add(tf.keras.layers.InputLayer(input_shape=(176,1)))
frames_.add(tf.keras.layers.LSTM(128, go_backwards=True))
frames_.add(tf.keras.layers.Dense(88))

In [None]:
# Print the layer shapes and parameter counts of the recurrent head defined above.
frames_.summary()