In [1]:
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from IPython.display import display
import numpy as np
import torch
import matplotlib.pyplot as plt
import scipy.signal as signal
from helpers import audio
from nn_modules import nn_proc
import os
import librosa

# Pick the compute device: the first CUDA GPU when one is available, otherwise the CPU,
# and make newly created tensors default to the matching float tensor type.
# NOTE(review): a later cell reassigns `device` to CPU and resets the default tensor type,
# so the CUDA choice made here does not survive past that cell.
# NOTE(review): torch.set_default_tensor_type is reported as deprecated in recent PyTorch
# releases — confirm against the installed version.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
    torch.set_default_tensor_type('torch.cuda.FloatTensor')
else:
    device = torch.device("cpu")
    torch.set_default_tensor_type('torch.FloatTensor')

In [2]:
# Alias for np.random.rand; not referenced by any of the cells visible in this notebook.
randfunc=np.random.rand

def synth_input_sample(t, chooser):
    """Generate one synthetic test signal on the time axis ``t``.

    Parameters
    ----------
    t : 1-D numpy array
        Time axis handed to the ``audio`` generators; ``t.shape[0]`` also sets the
        length of the uniform noise used by the 'noisy*' variants.
    chooser : str
        Which signal to build: 'sine', 'box', 'noisysine', 'noisybox' or 'pluck'.

    Returns
    -------
    The output of the matching ``audio`` helper (presumably an array with one value
    per entry of ``t`` — TODO confirm against helpers.audio).

    Raises
    ------
    ValueError
        If ``chooser`` is not one of the supported names. Previously the function fell
        off the end and returned None, which only failed later inside the caller.
    """
    if 'sine' == chooser:
        return audio.randsine(t)
    elif 'box' == chooser:
        return audio.box(t)
    elif 'noisysine' == chooser:
        # additive uniform noise in [-0.1, 0.1)
        return audio.randsine(t) + 0.1*(2*np.random.rand(t.shape[0])-1)
    elif 'noisybox' == chooser:
        # multiplicative uniform noise in [-1, 1): the box acts as an envelope for the noise
        return audio.box(t) * (2*np.random.rand(t.shape[0])-1)
    elif 'pluck' == chooser:
        return audio.pluck(t)

    raise ValueError(
        "Unknown signal type {!r}; expected one of "
        "'sine', 'box', 'noisysine', 'noisybox', 'pluck'".format(chooser))
    
    
chunk_size = 4096   # samples per window sent to the network

def torch_chunkify(x, chunk_size=chunk_size):
    """Zero-pad a 1-D signal and reshape it into a 2-D batch of fixed-size windows.

    Parameters
    ----------
    x : 1-D numpy array
        Signal to split up.
    chunk_size : int
        Number of samples per row of the result.

    Returns
    -------
    torch.Tensor
        float32 tensor of shape (rows, chunk_size) on the global ``device``, where
        ``rows = ceil(len(x) / chunk_size)`` and the tail of the last row is zero.
    """
    n_samples = x.shape[0]
    rows = int(np.ceil(n_samples/chunk_size))  # this will be the batch size
    padded = np.zeros(rows*chunk_size)
    padded[:n_samples] = x[:n_samples]
    padded = padded.reshape(rows, chunk_size)
    # Plain tensors replace the deprecated torch.autograd.Variable wrapper; a tensor made
    # by from_numpy does not require grad. Cast to float32 before the device transfer so
    # only half as many bytes are moved as with the float64 buffer.
    return torch.from_numpy(padded).float().to(device)



sr = 44100   # sample rate
# Time axis for the synthetic demo: chunk_size points evenly spaced over [0, 1].
# NOTE(review): this axis is not derived from `sr` — confirm the intended time units.
t = np.linspace(0,1,chunk_size)
# Remembers which synthetic signal `x` currently holds; demowidget1 compares against it.
old_signal_type = 'box'
x = synth_input_sample(t,old_signal_type)

# Force CPU from here on, overriding whatever the CUDA auto-detection in the first cell chose.
device = torch.device('cpu')
torch.set_default_tensor_type('torch.FloatTensor')
x_torch = torch_chunkify(x)
# Placeholders for the globals that forward_and_plot overwrites with the target and
# predicted signals, so the audio cells at the end of the notebook can play them.
y_true, y_pred = 0, 0   # saving these for later

In [9]:
# Set up the model and knob arrays


# Data settings
shrink_factor = 2  # reduce dimensionality of run by this factor
# 8192 // 2 = 4096, which equals the chunk_size used by torch_chunkify.
time_series_length = 8192 // shrink_factor
# NOTE(review): sampling_freq is not referenced by any other visible cell; audio loading
# and playback elsewhere in the notebook use 44100 directly.
sampling_freq = 44100. // shrink_factor

# Analysis parameters
ft_size = 1024 // shrink_factor
hop_size = 384 // shrink_factor
# With the values above: ceil(4096/192) + ceil(512/192) = 22 + 3 = 25 frames.
expected_time_frames = int(np.ceil(time_series_length/float(hop_size)) + np.ceil(ft_size/float(hop_size)))

# Define effect
effect = audio.Compressor_new()
#knobranges = np.array([[-30,0], [1,5], [1,40],[1,40]])
# Per-knob [min, max] pairs taken from the effect; indexed below as knobranges[i][0] / [i][1].
knobranges = effect.knob_ranges

# Define model
model = nn_proc.MPAEC(expected_time_frames, ft_size=ft_size, hop_size=hop_size, n_knobs=len(effect.knob_names))
model.to(device)

# Load model weights
# NOTE(review): hard-coded absolute path; the notebook fails here on a machine that has
# no checkpoint at this location.
checkpoint_file = '/tmp/modelcheckpoint.tar'
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
checkpoint = torch.load(checkpoint_file, map_location=device)
model.load_state_dict(checkpoint['state_dict'])

In [12]:
# Define interactive widgets and their handler routine

def forward_and_plot(x, x_torch, threshold, ratio, attack, release, refresh=True):
    """Apply the reference compressor and the network to the same input and plot both.

    Parameters
    ----------
    x : continuous 1D array of mono audio (plotted against the global time axis ``t``)
    x_torch : ``x`` "chunkified" into a 2D tensor of windows to send to the network;
        its first dimension is used as the batch size
    threshold, ratio : knob values, in the units of ``knobranges[0]`` / ``knobranges[1]``
    attack, release : in milliseconds (only due to IPython display limitations); they
        are divided by 1000 before being handed to the effect and the network
    refresh : when true, reload the weights from ``checkpoint_file`` before running
        (useful for checking progress during model training)

    Side effects
    ------------
    Overwrites the globals ``y_true`` and ``y_pred`` (for playing audio later) and
    draws a matplotlib figure. Returns None.
    """
    global y_true, y_pred   # for playing audio later

    # update the model (useful for checking progress during model training!)
    if refresh:
        checkpoint = torch.load(checkpoint_file, map_location=device)
        model.load_state_dict(checkpoint['state_dict'])

    attack_s, release_s = attack/1000, release/1000
    y_true = audio.compressor_new_fast(x, threshold, ratio, attack_s, release_s)

    def _normalize(value, knob_index):
        # map a knob value from its [min, max] range onto [-0.5, 0.5]
        lo, hi = knobranges[knob_index][0], knobranges[knob_index][1]
        return (value - lo)/(hi - lo) - 0.5

    # use the same knob settings for all chunks
    knobs = np.array([_normalize(threshold, 0), _normalize(ratio, 1),
                      _normalize(attack_s, 2), _normalize(release_s, 3)])
    rows = x_torch.size()[0]
    knobs = np.tile(knobs,(rows,1))
    # plain tensor instead of the deprecated torch.autograd.Variable wrapper
    knobs_torch = torch.from_numpy(knobs).float().to(device)

    # call the network; this is inference only, so skip building the autograd graph,
    # and call the module itself (not .forward) so registered hooks run
    with torch.no_grad():
        y_pred_torch, _, _ = model(x_torch, knobs_torch)

    # Plot
    # flattened numpy version, trimmed to the length of the time axis (drops zero padding)
    y_pred = y_pred_torch.detach().cpu().numpy().flatten()[0:t.shape[0]]

    plt.figure(figsize=(8,5))
    plt.plot(t,x,c='b',lw=1.5, label='Input')
    plt.plot(t,y_true,c='r',lw=1.5, label='Target')
    plt.plot(t,y_pred,c=(0,0.5,0,0.75),lw=1.5, label='Predicted')

    # show threshold line: threshold is converted from dB to linear amplitude
    thresh_line = 10**(threshold/20.0)*np.ones(2)
    plt.plot([t[0],t[-1]],thresh_line,c='k',lw=1, linestyle='dashed', label='Threshold')
    plt.legend(loc='lower right')
    plt.ylim(-1,1)
    plt.show()
    return

@interact(
    signal_type=['box','sine','pluck','noisybox','noisysine'],
    threshold=(knobranges[0][0], knobranges[0][1], 1),
    ratio=(knobranges[1][0], knobranges[1][1], 0.1),
    attack=(knobranges[2][0]*1000, knobranges[2][1]*1000, knobranges[2][0]*1000),
    release=(knobranges[3][0]*1000, knobranges[3][1]*1000, knobranges[3][0]*1000),
)
def demowidget1(signal_type, threshold, ratio, attack, release):
    """Widget handler for the synthetic-signal demo.

    The slider ranges come from ``knobranges``; the attack and release sliders are the
    knob ranges multiplied by 1000, stepping by the range minimum times 1000. A new
    input is synthesized only when the selected signal type differs from the one the
    global ``x`` currently holds, then everything is passed to forward_and_plot.
    """
    global old_signal_type, x, x_torch

    input_changed = signal_type != old_signal_type
    if input_changed:
        # don't regen x unless input changed
        x = synth_input_sample(t, signal_type)
        x_torch = torch_chunkify(x)
        old_signal_type = signal_type

    forward_and_plot(x, x_torch, threshold, ratio, attack, release)


A Jupyter Widget

## How about operating on 'real' audio?

In [13]:
def read_audio_file(filename, sr=44100):
    """Load ``filename`` through librosa as mono audio, passing ``sr`` along.

    Returns the pair (samples, sample_rate) exactly as librosa.load reports it.
    """
    samples, sample_rate = librosa.load(filename, sr=sr, mono=True)  # convert to mono
    return samples, sample_rate


def readaudio_generator(seconds=2,  path=os.path.expanduser('~')+'/datasets/signaltrain/Val', sr=44100,
    random_every=True):
    """
    Infinite generator that reads audio from the files sitting in directory 'path' and
    supplies one window of length "seconds" per next() call.

    Parameters
    ----------
    seconds : window length in seconds (may be fractional)
    path : directory holding the audio files (can be Train, Val or Test)
    sr : sample rate passed to read_audio_file
    random_every : if True, a random window of the current file is returned every time;
        otherwise the generator steps sequentially through the file and wraps around to
        the start when fewer than one full window remains

    Yields
    ------
    1-D numpy array of exactly int(round(seconds * sr)) samples. Files shorter than one
    window are zero-padded at the end instead of raising.

    Control
    -------
    Calling ``gen.send(True)`` makes the generator pick a new random file before the
    next window; sending any non-bool value (or using plain next()) keeps the file.

    Raises
    ------
    ValueError (on the first next()) if 'path' contains no regular, non-hidden files.
    """
    # Skip sub-directories and dotfiles (e.g. .DS_Store): they cannot be loaded as audio.
    files = [f for f in os.listdir(path)
             if not f.startswith('.') and os.path.isfile(os.path.join(path, f))]
    if not files:
        raise ValueError("No audio files found in directory: " + path)

    read_new_file = True
    while True:
        if read_new_file:
            filename = os.path.join(path, np.random.choice(files))  # pick a random audio file in the directory
            data, sr = read_audio_file(filename, sr=sr)
            seq_size = int(round(seconds * sr))  # slice bounds must be integers
            start = -seq_size                    # sequential mode restarts at the top of the new file
            read_new_file = False                # don't keep switching files every time generator is called

        n_samples = data.shape[0]
        if random_every:  # grab a random window of the signal
            max_start = n_samples - seq_size
            # randint needs high > low; a file no longer than one window is read from 0
            start = np.random.randint(0, max_start) if max_start > 0 else 0
        else:
            start += seq_size
            if start + seq_size > n_samples:  # ran off the end: wrap around
                start = 0

        xraw = data[start:start+seq_size]
        if xraw.shape[0] < seq_size:
            # keep the window length fixed for short files
            xraw = np.pad(xraw, (0, seq_size - xraw.shape[0]), mode='constant')

        # Note: any 'windowing' happens after the effects are applied, later
        rc = ( yield xraw )         # rc is set by generator's send() method.  YIELD here is the output
        if isinstance(rc, bool):    # can set read_new by calling send(True)
            read_new_file = rc
        
# get new audio
ra_gen = readaudio_generator(seconds=4)
x = next(ra_gen)
# Build the time axis from integer sample indices so it always has exactly x.shape[0]
# points. np.arange with a fractional step (1/sr) can come out one element long or short
# because of floating-point rounding, which would break plotting t against x.
t = np.arange(x.shape[0]) / sr
# reshape it
x_torch = torch_chunkify(x)

In [14]:
@interact(
    threshold=(knobranges[0][0], knobranges[0][1], 1),
    ratio=(knobranges[1][0], knobranges[1][1], 0.1),
    attack=(knobranges[2][0]*1000, knobranges[2][1]*1000, knobranges[2][0]*1000),
    release=(knobranges[3][0]*1000, knobranges[3][1]*1000, knobranges[3][0]*1000),
)
def demowidget2(threshold, ratio, attack, release):
    """Widget handler for the real-audio demo.

    Passes the current globals ``x`` and ``x_torch`` together with the slider values to
    forward_and_plot. The attack and release sliders are the knob ranges times 1000.
    """
    forward_and_plot(x, x_torch, threshold, ratio, attack, release)

A Jupyter Widget

## So what does it sound like?

Note that there's a "bug/feature" in Jupyter Notebook's audio "display" whereby it rescales (normalizes) the audio, which makes it almost useless for checking how a compressor performs. So first we're going to define our own.

In [15]:
# Redefine IPython audio display widget to disable normalization
from IPython.core.display import DisplayObject
class Audio(DisplayObject):
    """Audio player display object whose peak normalization can be switched off.

    Raw sample data (anything that is not already ``bytes``) is encoded as a mono
    16-bit PCM WAV. With ``norm=True`` the samples are rescaled so the peak hits full
    scale; with ``norm=False`` they are only clipped to [-1, 1], so relative loudness
    is preserved.

    Parameters
    ----------
    data : sequence / numpy array of samples, or bytes of an already encoded file
    filename : path of an audio file to embed
    url : URL of an audio file to embed or link to
    embed : False links to ``url`` instead of embedding; a ``url`` is also only linked
        unless embed is exactly True
    rate : sample rate in Hz, required when ``data`` holds raw samples
    autoplay : start playing as soon as the player is displayed
    norm : rescale raw samples to full scale (True) or only clip them (False)
    """
    def __init__(self, data=None, filename=None, url=None, embed=None, rate=None, autoplay=False, norm=True):
        if filename is None and url is None and data is None:
            raise ValueError("No audio data found. Expecting filename, url, or data.")
        if embed is False and url is None:
            raise ValueError("No url found. Expecting url when embed=False")

        if url is not None and embed is not True:
            self.embed = False
        else:
            self.embed = True
        self.autoplay = autoplay
        super(Audio, self).__init__(data=data, url=url, filename=filename)

        # anything that is not already an encoded byte string is treated as raw samples
        if self.data is not None and not isinstance(self.data, bytes):
            self.data = self._make_wav(data,rate,norm)

    def reload(self):
        """Reload the raw data from file or URL."""
        import mimetypes
        if self.embed:
            super(Audio, self).reload()

        if self.filename is not None:
            self.mimetype = mimetypes.guess_type(self.filename)[0]
        elif self.url is not None:
            self.mimetype = mimetypes.guess_type(self.url)[0]
        else:
            # raw samples are always encoded as WAV by _make_wav
            self.mimetype = "audio/wav"

    def _make_wav(self,data,rate,norm):
        """ Transform a numpy array to a PCM bytestring

        Returns the bytes of a mono, 16-bit little-endian WAV file at ``rate`` Hz.
        Raises ValueError when ``rate`` is None.
        """
        from io import BytesIO
        import wave

        if rate is None:
            raise ValueError("rate must be specified when data is a numpy array or list of audio samples.")

        samples = np.asarray(data)
        if norm:
            peak = np.abs(samples).max() if samples.size else 0
            if peak > 0:
                scaled = samples/peak*32767
            else:
                # empty or all-zero input: nothing to rescale (avoids dividing by zero)
                scaled = np.zeros(samples.shape)
        else:
            scaled = np.clip(samples,-1,1)*32767
        # astype truncates toward zero like int() did, but in one vectorized step
        frames = scaled.astype('<i2').tobytes()

        fp = BytesIO()
        # Closing the writer finalizes the header; it does not close fp because the
        # wave module only closes files it opened itself.
        with wave.open(fp,mode='wb') as waveobj:
            waveobj.setnchannels(1)
            waveobj.setframerate(rate)
            waveobj.setsampwidth(2)
            waveobj.setcomptype('NONE','NONE')
            waveobj.writeframes(frames)
        return fp.getvalue()

    def _data_and_metadata(self):
        """shortcut for returning metadata with url information, if defined"""
        md = {}
        if self.url:
            md['url'] = self.url
        if md:
            return self.data, md
        else:
            return self.data

    def _repr_html_(self):
        """Render the HTML <audio> element shown in the notebook."""
        src = """
                <audio controls="controls" {autoplay}>
                    <source src="{src}" type="{type}" />
                    Your browser does not support the audio element.
                </audio>
              """
        return src.format(src=self.src_attr(),type=self.mimetype, autoplay=self.autoplay_attr())

    def src_attr(self):
        """Return the value for the <source src=...> attribute.

        A base64 data URI when embedding data, otherwise the url, otherwise "".
        """
        import base64
        if self.embed and (self.data is not None):
            return """data:{type};base64,{base64}""".format(type=self.mimetype,
                                                            base64=base64.b64encode(self.data).decode('ascii'))
        elif self.url is not None:
            return self.url
        else:
            return ""

    def autoplay_attr(self):
        """Return the autoplay HTML attribute, or an empty string when autoplay is off."""
        if(self.autoplay):
            return 'autoplay="autoplay"'
        else:
            return ''

Input audio:

In [16]:
# Play the current input x; norm=False selects clipping to [-1, 1] instead of peak rescaling.
Audio(x, rate=44100, norm=False)

'Real' compressor output, i.e. Target audio:

In [17]:
# y_true is the global that forward_and_plot overwrites with the reference compressor output.
Audio(y_true, rate=44100,norm=False)

Network output, i.e. Predicted audio:

In [18]:
# y_pred is the global that forward_and_plot overwrites with the network's output.
Audio(y_pred, rate=44100, norm=False)