In [None]:
import matplotlib.pyplot as plt
import numpy as np
from timeit import default_timer as timer
import config
import dsp_utils
import datasets
from frontend import FrontEndProcessor

processor = FrontEndProcessor(config)
storage = datasets.HdfStorage(config.get_dataset_path('r'), "raw/tree")

def get_frames():
    return storage.fetch_subset("",0,1, "RANDOM")[0]

# Decorators and handlers
def plot_vector(title, vector):
    print(title)
    plt.plot(vector)
    plt.show()
    
def plot_matrix(title, matrix):
    print(title)
    plt.pcolormesh(range(matrix.shape[1]), range(matrix.shape[0]), matrix, cmap=plt.cm.Blues)
    plt.show()
        
def fund_freq_mean_error(frames, error_seq):
    sg = processor.make_spectrogram(frames)
    std_sqrt = np.sqrt(np.std(sg, axis = 1))
    mean_error = np.sqrt(np.sum(std_sqrt*np.square(error_seq)) / config.step_count)
    return mean_error

def before(fn):
    def wrapped(*args,**kwargs):
        dsp_utils.write("before.wav", args[0], config.framerate)
        sg = processor.make_spectrogram(args[0])
        plot_matrix("Spectrogram before", sg.T)
        return fn(*args,**kwargs)
    return wrapped
    
def after(fn):
    def wrapped(*args,**kwargs):
        out = fn(*args,**kwargs)
        if out[0].ndim==1:
            dsp_utils.write("after.wav", out[0], out[1])
            seg_length = dsp_utils.get_best_segment_length(out[1])
            sg = dsp_utils.make_spectrogram(out[0], seg_length=seg_length, step_count=config.step_count)
            plot_matrix("Spectrogram after", sg.T)
        else:
            if out[0].dtype == np.complex_:
                plot_matrix("Spectrogram after", np.abs(out[0]).T)
            else:
                plot_matrix("Spectrogram after", out[0].T)
            frames = dsp_utils.restore_frames(out[0], frame_count=out[2])
            dsp_utils.write("after.wav", frames, out[1])
        return out
    return wrapped

In [None]:
def read_and_write_test(in_path, out_path):
    data = dsp_utils.read(in_path)
    dsp_utils.write(out_path, data, cfg.framerate)

read_and_write_test("before.wav", "after.wav")

In [None]:
@before
def make_spectrogram_test(frames):
    pass

frames = get_frames()
make_spectrogram_test(frames)

In [None]:
@before
@after
def restore_frames_test(frames):
    sg = dsp_utils.make_spectrogram(frames, config.seg_length,
                            step_count=config.step_count, keep_phase=True)
    restored = dsp_utils.restore_frames(sg, frame_count=len(frames))
    return restored, config.framerate

frames = get_frames()
restore_frames_test(frames)

In [None]:
def spectrogram_speed_test(frames, n):
    start = timer()
    for i in range(n):
        processor.make_spectrogram(frames)
    end = timer()
    print("Time, ms")
    print((end-start)*1000/n)

frames = get_frames()
spectrogram_speed_test(frames, 1000)

In [None]:
@after
@before
def blur_test(frames):
    sg = processor.make_spectrogram(frames)
    sg = dsp_utils.blur(sg, config.blur_size)
    return sg, config.framerate, len(frames)

frames = get_frames()
blur_test(frames)

In [None]:
@before
@after
def framerate_test(frames, new_framerate):
    n = config.framerate // new_framerate
    new_frames = frames[::n]
    return new_frames, new_framerate

frames = get_frames()
framerate_test(frames, 8000)

In [None]:
@before
def deviation_test(frames):
    sg = processor.make_spectrogram(frames)
    std = np.std(sg, axis=1)
    plot_vector("Standart deviation", std)

frames = get_frames()
deviation_test(frames)

In [None]:
@before
def fund_freq_test(frames, new_framerate=None, new_step_count=None):
    if new_framerate is None:
        new_framerate = config.framerate
    if new_step_count is None:
        new_step_count = config.step_count

    n = config.framerate // new_framerate
    new_frames = frames[::n]
    exact_funds = processor.get_fund_freq(frames)
    plot_vector("Fundamental frequency", exact_funds)
    approx_funds = dsp_utils.get_fund_freq(new_frames, new_framerate, new_step_count,
                                   min_freq=config.min_fund_freq, max_freq=config.max_fund_freq,
                                   )
    approx_funds = np.repeat(approx_funds, config.step_count//new_step_count)
    error = exact_funds - approx_funds
    plot_vector("Error sequence", error)
    print("Mean squared error")
    mean_error = fund_freq_mean_error(frames, error)
    print(mean_error)

frames = get_frames()
fund_freq_test(frames)

In [None]:
def fund_freq_speed_test(frames, n):
    start = timer()
    for i in range(n):
        processor.get_fund_freq(frames)
    end = timer()
    print("Time, ms")
    print((end-start)*1000/n)

frames = get_frames()
fund_freq_speed_test(frames, 1000)

In [None]:
@before
def get_harmonics_test(frames):
    sg = processor.make_spectrogram(frames)
    fund_freqs = processor.get_fund_freq(frames)
    harmonics, amps = processor.get_harmonics(fund_freqs, sg)
    plot_vector("Fundamental frequency", fund_freqs)
    plot_matrix("Harmonics' amplitudes", amps.T)

frames = get_frames()
get_harmonics_test(frames)

In [None]:
@after
@before
def restore_spectrogram_test(frames):
    sg = dsp_utils.make_spectrogram(frames, seg_length=config.seg_length,
                            step_count=config.step_count, keep_phase=True)
    fund_freqs = processor.get_fund_freq(frames)
    _, amps = processor.get_harmonics(fund_freqs, sg)
    plot_matrix("Harmonics' amps", np.abs(amps).T)
    sg = dsp_utils.restore_spectrogram(amps, config.freq_count, config.freq_res)
    return sg, len(frames), config.framerate

frames = get_frames()
restore_spectrogram_test(frames)

In [None]:
@before
@after
def norm_harmonics_test(frames):
    sg = processor.make_spectrogram(frames)
    fund_freqs = processor.get_fund_freq(frames)
    _, amps = processor.get_harmonics(fund_freqs, sg)
    plot_matrix("Harmonics' amps", np.abs(amps).T)

    normed = dsp_utils.norm_harmonics(sg, amps)
    plot_matrix("Harmonics' amps normed", np.abs(normed).T)

    sg = dsp_utils.restore_spectrogram(normed, config.freq_count, config.freq_res)
    return sg, len(frames), config.framerate

frames = get_frames()
norm_harmonics_test(frames)

In [None]:
def bulk_frontend_processing_test(count):
    frames_list = []
    for i in range(count):
        frames_list.append(get_frames())
    frames = np.stack(frames_list, axis=0)

    flatten = np.reshape(frames, [-1])
    dsp_utils.write("before.wav", flatten, config.framerate)
    result = np.zeros((len(flatten)))
    for i in range(frames.shape[0]):
        hs = processor.process(frames[i])
        sg = dsp_utils.restore_spectrogram(hs, config.freq_count, config.freq_res)
        result[i*config.framerate:(i+1)*config.framerate] = dsp_utils.restore_frames(sg,
                                                                       frame_count=config.framerate)
    dsp_utils.write("after.wav", result, config.framerate)

bulk_frontend_processing_test(10)

In [None]:
def frontend_processing_speed_test(frames, n):
    start = timer()
    for i in range(n):
        processor.process(frames)
    end = timer()
    print("Time, ms")
    print((end-start)*1000/n)

frames = get_frames()
frontend_processing_speed_test(frames, 1000)