In [2]:
import tensorflow as tf
import numpy as np

class DilatedResidualConv(tf.keras.Model):
    """Gated dilated convolution unit.

    A dilated convolution produces ``2 * n_channel`` feature maps. They are
    split into two halves along the channel axis; one half goes through
    ``tanh``, the other through ``sigmoid``, and their element-wise product is
    projected to ``n_channel`` maps by a 1x1 convolution.

    Args:
        n_channel: number of feature maps returned by the unit.
        filter_size: kernel extent along the first spatial axis (the kernel
            is ``(filter_size, 1)``).
        dilation_size: dilation rate along the first spatial axis.
    """

    def __init__(self, n_channel, filter_size, dilation_size):
        super().__init__()
        # Use the constructor argument directly: no `self.filter_size`
        # attribute exists at this point, so reading it raises AttributeError.
        self.dilated_conv = tf.keras.layers.Conv2D(
            n_channel * 2, (filter_size, 1),
            padding='same', dilation_rate=(dilation_size, 1)
        )
        self.merge_conv = tf.keras.layers.Conv2D(
            n_channel, (1, 1), padding='same',
        )
        # The doubled feature maps live on the channel axis, whose position
        # depends on the layer's data format (Keras default: channels_last).
        if self.dilated_conv.data_format == 'channels_first':
            self._channel_axis = 1
        else:
            self._channel_axis = -1

    @tf.function
    def call(self, x):
        """Apply the gated convolution to ``x`` and return the merged maps."""
        features = self.dilated_conv(x)
        # Split the feature maps (not a spatial axis) into filter and gate.
        filt, gate = tf.split(features, 2, axis=self._channel_axis)
        filt = tf.nn.tanh(filt)
        gate = tf.nn.sigmoid(gate)
        return self.merge_conv(gate * filt)
          
    
class ResidualConvNet(tf.keras.Model):
    """Stack of ``DilatedResidualConv`` units with residual accumulation.

    Args:
        n_depth: number of channels handed to every unit.
        n_layers: number of stacked units.
        filter_size: kernel extent handed to every unit.

    Layer ``i`` (0-based) uses dilation ``(i + 1) * 2``, i.e. 2, 4, 6, ...
    NOTE(review): WaveNet-style stacks commonly double the dilation per layer
    (``2 ** i``); the linear schedule is kept as found -- confirm the intent.
    """

    def __init__(self, n_depth, n_layers = 4, filter_size=2):
        super().__init__()
        # Pass `filter_size` (not `n_layers`) as the kernel size so the
        # constructor argument actually takes effect.
        self.conv_layers = [
            DilatedResidualConv(n_depth, filter_size, (i + 1) * 2)
            for i in range(n_layers)
        ]

    def call(self, x):
        """Run the stack and return the sum of every unit's output.

        Each unit's output is added to the running input before it is fed to
        the next unit.
        """
        results = []
        for conv in self.conv_layers:
            y = conv(x)
            results.append(y)
            # Rebind instead of `+=` so the caller's object is never touched.
            x = x + y
        return sum(results)

class WaveNetPredictor(tf.keras.Sequential):
    """Prediction head: ReLU -> 1x1 conv (ReLU) -> 1x1 conv -> Flatten -> Dense.

    Args:
        n_dim: number of filters of both 1x1 convolutions and number of units
            of the final dense layer.
    """

    def __init__(self, n_dim):
        super().__init__([
            # Sequential only accepts Layer instances, so the activation is
            # wrapped in a layer instead of passing the bare `tf.nn.relu`.
            tf.keras.layers.ReLU(),
            tf.keras.layers.Conv2D(n_dim, (1, 1), padding='same', activation='relu'),
            tf.keras.layers.Conv2D(n_dim, (1, 1), padding='same'),
            tf.keras.layers.Flatten(),
            # Dense lives in `tf.keras.layers`; `tf.keras.Dense` does not exist.
            tf.keras.layers.Dense(n_dim)
        ])
        
    
class WaveNet(tf.keras.Model):
    """Chains a ``ResidualConvNet`` with a ``WaveNetPredictor`` head.

    Args:
        n_pred_dim: size handed to ``WaveNetPredictor``.
        n_depth: channel count handed to ``ResidualConvNet``.
        n_layers: number of units in the residual stack.
        filter_size: kernel extent handed to the residual stack.
    """

    def __init__(self, n_pred_dim, n_depth, n_layers = 4, filter_size=2):
        # Keras requires the base initialiser to run before any layer or
        # sub-model is assigned as an attribute.
        super().__init__()
        self.resnet = ResidualConvNet(n_depth, n_layers, filter_size)
        self.predictor = WaveNetPredictor(n_pred_dim)

    @tf.function
    def call(self, x):
        """Return the predictor output for the residual-stack features of ``x``."""
        features = self.resnet(x)
        return self.predictor(features)
    
class MuLaw(object):
    """Mu-law companding between float signals and integer bin indices.

    Args:
        mu: companding parameter; also the number of bin edges used when
            quantising.
        int_type: dtype of the indices returned by ``transform``.
        float_type: dtype used for the floating-point computations and for
            the result of ``inverse_transform``.
    """

    def __init__(self, mu=255, int_type=np.int32, float_type=np.float32):
        self.mu, self.int_type, self.float_type = mu, int_type, float_type

    def transform(self, x):
        """Compress array ``x`` with the mu-law curve and quantise it.

        The compressed value is located among the edges ``2 * k / mu - 1``
        (``k = 0 .. mu - 1``) and the zero-based bin index is returned as
        ``int_type``.
        """
        mu = self.mu
        signal = x.astype(self.float_type)
        magnitude = np.log(1 + mu * np.abs(signal))
        companded = np.sign(signal) * magnitude / np.log(1 + mu)
        bin_edges = 2 * np.arange(mu) / mu - 1
        codes = np.digitize(companded, bin_edges) - 1
        return codes.astype(self.int_type)

    def inverse_transform(self, y):
        """Map bin indices ``y`` back to float values via the inverse curve.

        Indices are first rescaled with ``2 * y / mu - 1`` and then expanded;
        the result is returned as ``float_type``.
        """
        mu = self.mu
        codes = y.astype(self.float_type)
        scaled = 2 * codes / mu - 1
        expanded = (1 + mu) ** np.abs(scaled) - 1
        signal = np.sign(scaled) / mu * expanded
        return signal.astype(self.float_type)

In [7]:
# Evaluation task and cross-validation fold read by load_dataset() further down.
task = 'home'
fold_index = 0  # zero-based; get_annotation() adds 1 when building the file name

In [25]:
# Load training setup
import pandas as pd
from pathlib import Path
import wavio
from tqdm.notebook import tqdm

# Directory holding the per-task, per-fold annotation files read by get_annotation().
# Relative path: it is resolved against the kernel's current working directory.
root_dir = Path('evaluation_setup')

def get_annotation(task, fold_index, target):
    """Read the tab-separated annotation list for one task / fold / split.

    The file ``<root_dir>/<task>_fold<fold_index + 1>_<target>.txt`` is parsed
    without a header row into the columns ``file``, ``class``, ``start``,
    ``end`` and ``event``. An additional ``id`` column holds the stem of each
    ``file`` path.

    Returns:
        The resulting ``pandas.DataFrame``.
    """
    annotation_path = root_dir / f'{task}_fold{fold_index+1}_{target}.txt'
    columns = ['file', 'class', 'start', 'end', 'event']
    annotations = pd.read_csv(annotation_path, sep='\t', header=None, names=columns)

    def stem_of(path):
        # File name without directory and extension.
        return Path(path).stem

    annotations['id'] = annotations['file'].apply(stem_of)
    return annotations

def load_dataset(target):
    """Load the annotations of one split together with its audio files.

    The task and fold come from the module-level ``task`` and ``fold_index``
    variables. Every distinct entry of the ``file`` column is read with
    ``wavio.readwav`` and stored under the stem of its path.

    Returns:
        ``(annotations, wavs)``: the annotation frame and the stem -> audio
        mapping.
    """
    annotations = get_annotation(task, fold_index, target)
    wavs = {}
    progress = tqdm(annotations['file'].unique())
    for wav_file in progress:
        audio = wavio.readwav(str(wav_file))
        wavs[Path(wav_file).stem] = audio
    return annotations, wavs

# Load the 'train' split: `df` is the annotation table and `wav_dict` maps each
# file stem to the object returned by wavio.readwav for that file.
df, wav_dict = load_dataset('train')


HBox(children=(FloatProgress(value=0.0, max=7.0), HTML(value='')))




In [1]:
# NOTE(review): no use of n_sampling is visible in this part of the notebook --
# presumably a sample count; confirm against the cells that consume it.
n_sampling = 2 ** 13  # 8192
