In [1]:
import os
import librosa
import numpy as np
import torch
import torch.utils.data as data

In [65]:
#------ encoding-decoding section ------#
def one_hot_encode(data:np.ndarray, num_possible_value:int = 256) -> np.ndarray:
    '''
    One-hot encode an array of integer class indices.

    Assuming data is an np.ndarray; it is reshaped to (num_sample, 1) whatever its original shape
    Assuming num_possible_value is an integer representing how many different possible values (by default it's 256)
    The return value is a float32 np.ndarray with shape (num_sample, num_possible_value)
    A sample whose value is not in 0..num_possible_value-1 produces an all-zero row
    '''
    possible_values = np.arange(num_possible_value)
    column = data.reshape((-1,1)) #column vector so the comparison below broadcasts per sample

    #np.cast was removed in NumPy 2.0; astype gives the same float32 result on every version
    return (column == possible_values).astype(np.float32)

def one_hot_decode(data:np.ndarray, axis=1) -> np.ndarray:
    '''
    Collapse one-hot (or score) vectors back to class indices.

    Assuming data is an np.ndarray with shape (num_sample, num_possible_value)
    Shape is not checked
    axis is the axis holding the num_possible_value entries (by default it's 1)
    The return value is the argmax along that axis, i.e. shape (num_sample,) for the default layout
    '''
    decoded = data.argmax(axis=axis)
    return decoded

def mu_law_encode(data:np.ndarray, num_possible_value:int=256) -> np.ndarray:
    '''
    Quantize a waveform to integer codes with mu-law companding.

    Assuming data is an np.ndarray with shape (num_sample,), it should be normalized to -1~1
    Assuming num_possible_value is an integer representing how many different possible values (by default it's 256)
    The return value is an int32 np.ndarray with the same shape as data, values in 0..num_possible_value-1
    Raises ValueError if any sample lies outside -1~1
    Based on a tensorflow implementation:
        https://github.com/ibab/tensorflow-wavenet/blob/master/wavenet/ops.py
    The reference clips |data| to 1; here out-of-range data is rejected instead, so no clipping is needed
    '''
    #check if data is normalized
    if (data>1.0).any() or (data<-1.0).any():
        raise ValueError('Normalize Data First')

    mu = float(num_possible_value-1)
    if mu == 0:
        #a single quantization level: every sample maps to code 0 (and log1p(0) would divide by zero)
        return np.zeros(np.shape(data), dtype=np.int32)

    #companding: sign(x) * ln(1 + mu*|x|) / ln(1 + mu)
    #the normaliser must be log1p(mu) so that |x| == 1 maps to exactly 1 and mu_law_decode inverts it
    magnitude = np.log1p(mu*np.abs(data)) / np.log1p(mu)
    signal = np.sign(data) * magnitude

    #map -1~1 onto 0~mu; adding 0.5 before the truncating cast rounds to the nearest code
    quantized = (signal+1)/2 * mu + 0.5

    #np.cast was removed in NumPy 2.0; astype is the portable equivalent
    return quantized.astype(np.int32)

def mu_law_decode(data:np.ndarray, num_possible_value:int=256) -> np.ndarray:
    '''
    Expand mu-law codes back to waveform amplitudes.

    Assuming data is an np.ndarray with shape (num_sample,) holding codes in 0..num_possible_value-1
    Assuming num_possible_value is an integer representing how many different possible values (by default it's 256)
    The return value is a float32 np.ndarray with the same shape as data, values in -1~1 for in-range codes
    Based on a tensorflow implementation:
        https://github.com/ibab/tensorflow-wavenet/blob/master/wavenet/ops.py
    '''
    mu = num_possible_value - 1

    #np.cast was removed in NumPy 2.0; asarray with a dtype is its documented replacement
    codes = np.asarray(data, dtype=np.float32)

    #map 0~mu back onto -1~1
    signal = 2*(codes/mu) - 1
    #inverse companding: |x| = ((1 + mu)**|y| - 1) / mu
    magnitude = (1 / mu) * ((1 + mu)**np.abs(signal) - 1)

    return np.sign(signal) * magnitude
#------ encoding-decoding section finished ------#


In [69]:
#------ data-loading section ------#
def load_audio_file(file:str, sr:int=16000, trim:bool=True) -> np.ndarray:
    '''
    Read one audio file as a mono waveform, optionally trimmed.

    Assuming file is a string representing the path of one audio file
    Assuming sr is the sample rate passed to librosa.load (by default it's 16000)
    Assuming trim is a boolean; when True the waveform is passed through librosa.effects.trim
    librosa supports resampling, so the file's own rate does not have to equal sr
    However, according to https://github.com/ibab/tensorflow-wavenet/blob/master/wavenet/ops.py,
    resampling might cause error, so using the file's native sample rate may be the safer choice
    The return value should be an np.ndarray with shape (num_sample,)
    The return value should be normalized to -1~1 (done by librosa)
    The file path and the resulting shape are printed on every call
    '''
    waveform = librosa.load(file, sr=sr, mono=True)[0] #second element (the sample rate) is not needed

    if trim:
        waveform = librosa.effects.trim(waveform)[0] #keep the trimmed signal, drop the interval indices

    print(file)
    print(waveform.shape)

    return waveform


In [27]:
class AudioDataset(data.Dataset):
    '''
    Sub-class of data.Dataset
    Each item is one sound file from source_folder, mu-law quantized and one-hot encoded
    Only regular files directly under source_folder are listed (sub-folders are skipped),
    in sorted order so that an index always refers to the same file
    However, program WILL NOT check if it is a sound file,
    plz make sure all files under source folder are sound files

    Assuming source_folder is a string representing the path to the folder with sound files
    Assuming sr is an integer representing the sample rate (by default it's 16000)
    Assuming num_possible_value is an integer representing how many different possible values (by default it's 256)
    Assuming trim is a boolean representing whether removing blank at the beginning and the end or not
    '''
    def __init__(self, source_folder:str='AudioData', sr:int=16000, num_possible_value:int=256, trim=True):
        super().__init__()

        self.num_possible_value = num_possible_value
        self.sr = sr
        self.trim = trim
        self.source_folder = source_folder
        #os.listdir returns entries in arbitrary order and includes directories
        #(e.g. .ipynb_checkpoints), which cannot be loaded as audio
        self.file_list = sorted(
            name for name in os.listdir(source_folder)
            if os.path.isfile(os.path.join(source_folder, name))
        )

    def __getitem__(self, index):
        '''
        Override
        Returning an np.ndarray representing one audio file
        The waveform is mu-law quantized and then one-hot encoded, so the result has
        shape (num_sample, num_possible_value) instead of (num_sample,)
        '''
        file_path = os.path.join(self.source_folder, self.file_list[index])

        waveform = load_audio_file(file_path, self.sr, self.trim)
        quantized = mu_law_encode(waveform, self.num_possible_value) #integer codes
        return one_hot_encode(quantized, self.num_possible_value) #one row per sample

    def __len__(self):
        '''
        Override
        Returning the size of dataset (number of listed files)
        '''
        return len(self.file_list)

In [82]:
class AudioDataLoader(data.DataLoader):
    '''
    Sub-class of data.DataLoader
    It will generate training data and label pairs according to given params
    The collate function is a generator, so every item obtained by iterating the loader
    is itself an iterable of (inputs, targets) pairs

    Assuming receptive_field is an integer: the number of zero samples padded in front of
        each clip and the number of leading samples of every window that get no target
    Assuming source_folder, sr, num_possible_value and trim are forwarded to AudioDataset
    Assuming batch_size is the number of files collated together (by default it's 1);
        files of different length cannot be stacked by np.pad
    Assuming sample_size is the window length in samples (by default it's 10);
        a falsy value (0 or None) turns windowing off and yields the whole clip as one pair
    If sample_size is not larger than receptive_field no pair is produced at all
    '''
    def __init__(self, receptive_field:int,\
                        source_folder:str='AudioData',\
                        batch_size:int=1,\
                        sr:int=16000,\
                        num_possible_value:int=256,\
                        trim=True,\
                        sample_size:int=10):

        dataset = AudioDataset(source_folder, sr, num_possible_value, trim)
        #zero-argument super() keeps working even if the name AudioDataLoader is rebound by another cell
        super().__init__(dataset,
                         batch_size=batch_size,
                         shuffle=True,
                         collate_fn=self.generate_training_pairs)

        self.receptive_field = receptive_field
        self.has_gpu = torch.cuda.is_available()
        self.sample_size = sample_size

    def numpy_to_variable(self, data: np.ndarray) -> torch.Tensor:
        '''
        Convert an np.ndarray to a float tensor, moved to the GPU when self.has_gpu is True
        '''
        tensor = torch.from_numpy(data).float()
        return tensor.cuda() if self.has_gpu else tensor

    @staticmethod
    def _variable(data):
        '''
        Same conversion as numpy_to_variable, but usable without an instance:
        the GPU check is made at call time
        torch.autograd.Variable is deprecated and returns a plain tensor, so no wrapper is applied
        '''
        tensor = torch.from_numpy(data).float()
        return tensor.cuda() if torch.cuda.is_available() else tensor

    def calc_sample_size(self, audio):
        '''
        Window length for the remaining audio: sample_size, or the number of
        remaining samples (len(audio[0])) when fewer are left
        '''
        return min(self.sample_size, len(audio[0]))

    def generate_training_pairs(self, stacked_input):
        '''
        Customized collate_fn (a generator)
        Assuming stacked_input stacks to shape (batch, num_sample, num_possible_value)
        Yields (inputs, targets) tensors:
            inputs  are one-hot windows of the zero-padded audio
            targets are the class indices (as floats) of each window after its first
                    receptive_field samples
        Consecutive windows overlap by receptive_field samples; iteration stops once the
        remaining audio is not longer than receptive_field
        '''
        #receptive_field zero rows in front of every clip, along the sample axis only
        audio = np.pad(stacked_input, [[0, 0], [self.receptive_field, 0], [0, 0]], 'constant')

        if not self.sample_size:
            #a generator cannot hand a pair back through return: the value would be
            #swallowed by StopIteration, so the whole clip has to be yielded
            targets = audio[:, self.receptive_field:, :]
            yield self._variable(audio),\
                  self._variable(one_hot_decode(targets, 2))
            return

        sample_size = self.calc_sample_size(audio)

        while sample_size > self.receptive_field:
            inputs = audio[:, :sample_size, :]
            targets = audio[:, self.receptive_field:sample_size, :]

            yield self._variable(inputs),\
                  self._variable(one_hot_decode(targets, 2))

            #advance so that the next window starts receptive_field samples before the first unseen one
            audio = audio[:, sample_size-self.receptive_field:, :]
            sample_size = self.calc_sample_size(audio)
#------ data-loading section finished ------#

In [80]:
class AudioDataLoader(data.DataLoader):
    '''
    Draft loader that builds a batch of targets out of a single sound file
    The collate function is unfinished: it prints the targets it cut out and returns (None, None)

    NOTE(review): this cell defines the same name as the generator-based AudioDataLoader
    in an earlier cell; when the notebook is run top to bottom this definition wins, and
    code that unpacks pairs from each loader item then fails on (None, None)

    Assuming receptive_field is an integer: number of zero samples padded in front of the clip
    Assuming source_folder, sr, num_possible_value and trim are forwarded to AudioDataset
    Assuming batch_size is the pseudo batch size, i.e. how many targets are cut from one file;
        the underlying DataLoader always reads one file at a time
    '''
    def __init__(self, receptive_field:int,\
                        source_folder:str='AudioData',\
                        batch_size:int=1,\
                        sr:int=16000,\
                        num_possible_value:int=256,\
                        trim=True):

        dataset = AudioDataset(source_folder, sr,num_possible_value, trim)
        #zero-argument super() keeps working even if the name AudioDataLoader is rebound by another cell
        super().__init__(dataset, 1, True) #one file per batch, True for shuffling

        self.receptive_field = receptive_field
        self.pseudo_batch = batch_size #batch NOT created by reading several sound file at a time
                                        #instead, it should be created from a single file
        self.collate_fn = self.generate_training_pairs
        self.has_gpu = torch.cuda.is_available()

    def numpy_to_variable(self, data: np.ndarray) -> torch.Tensor:
        '''
        Convert an np.ndarray to a float tensor, moved to the GPU when self.has_gpu is True
        torch.autograd.Variable is deprecated and returns a plain tensor, so no wrapper is applied
        '''
        tensor = torch.from_numpy(data).float()
        return tensor.cuda() if self.has_gpu else tensor

    def calculate_batch_size(self, data:np.ndarray):
        '''
        Assuming data is an np.ndarray representing remaining data with shape (num_samples, num_possible_values)
        Returns how many targets can actually be cut: the pseudo batch size, capped by the
        number of rows that exist after index receptive_field+1 (never negative)
        The cap has to match the slice in generate_training_pairs, which starts at
        receptive_field+1; counting from receptive_field over-counts by one on short clips
        '''
        available = len(data) - self.receptive_field - 1
        return max(0, min(self.pseudo_batch, available))

    def generate_training_pairs(self, stacked_input):
        '''
        Customized collate_fn (unfinished)
        Assuming stacked_input has shape (1, num_samples, num_possible_values)
        Prints the target rows before and after reshaping them to
        (batch_size, 1, num_possible_values), then returns (None, None)
        '''
        #zero-padding in front of the sample axis
        stacked_input = np.pad(stacked_input, [[0,0], [self.receptive_field,0], [0,0]], 'constant')
        #now stacked_input.shape = (1, num_samples+receptive_field, num_possible_values)

        samples = stacked_input[0] #samples = (num_samples+receptive_field, num_possible_values)
                                   #num_possible_values is the output dim, the input dim should always be 1
        actual_batch_size = self.calculate_batch_size(samples)

        #NOTE(review): targets start one row after the padding; confirm the +1 offset is intended
        first_target = self.receptive_field + 1
        targets = samples[first_target:first_target+actual_batch_size,]
        print(targets)
        print('next')
        #explicit feature width instead of -1, so an empty batch reshapes cleanly as well
        targets = targets.reshape((actual_batch_size, 1, samples.shape[-1]))
        print(targets)

        return None, None

In [84]:
    # Smoke test: build a loader and print the first (inputs, targets) pair it produces.
    print('testing')
    # NOTE(review): absolute, machine-specific Windows path; the cell cannot run elsewhere
    # without editing it.
    a = AudioDataLoader(source_folder=r'D:\LIGN167_Final\AudioData',receptive_field=8, batch_size=1,num_possible_value=16)
    # Each item obtained from the loader is iterated again for its pairs, which requires the
    # AudioDataLoader whose generate_training_pairs yields pairs (not the one returning (None, None)).
    for dataset in a:
        for i, n in dataset:
            print(i)
            print('next')
            print(n)
            # only the first pair is inspected
            break
        # only the first loader item is inspected
        break

testing
D:\LIGN167_Final\AudioData\untitled.wav
(190882,)
tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.]]],
       device='cuda:0')
next
tensor([[8., 8.]], device='cuda:0')


In [74]:
# Scratch check: a row slice that starts at the end of the array is empty, not an error.
np.array([[1, 2, 3], [3, 2, 1], [2, 3, 1]])[3:3, ].shape[0]

0