In [10]:
import os
import numpy as np
import pandas as pd
from scipy.io import wavfile as wv
from scipy import signal


In [11]:
class WaveDataset():
    """Indexable collection of WAV clips described by a CSV annotation file.

    The CSV must have a ``filename`` column holding integers; clip ``n`` is
    read from ``<data_folder>/<n zero-padded to 4 digits>.wav``. When the CSV
    also has a ``label`` column its values are used as labels, otherwise every
    clip gets the placeholder label ``-1``.

    Items are ``(samples, label)`` tuples. Decoded clips are memoised in
    ``self.cache`` (keyed by the index used for the lookup), so each file is
    read from disk at most once per index.
    """

    def __init__(self, data_folder, annotation_file):
        annotations = pd.read_csv(annotation_file)
        self.data_folder = data_folder
        self.filenames = list(annotations['filename'])
        has_labels = 'label' in annotations.columns
        # Unlabelled sets (e.g. a test split) get -1 for every clip.
        self.labels = list(annotations['label']) if has_labels else [-1] * len(self.filenames)
        self.cache = {}

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        if index not in self.cache:
            # Cache miss: decode the clip and remember it together with its label.
            wav_name = "%04d.wav" % self.filenames[index]
            _, samples = wv.read(os.path.join(self.data_folder, wav_name))
            self.cache[index] = (samples, self.labels[index])
        data, label = self.cache[index]
        return data, label

In [12]:
def preprocessing(raw_signal, sample_freq=22050, cutoff_freq=5, filter_order=2):
    """Rectify a signal and extract its low-frequency amplitude envelope.

    Parameters
    ----------
    raw_signal : array_like
        Input samples; filtering runs along axis 0.
    sample_freq : float, optional
        Sampling rate in Hz used to normalise the cutoff (default 22050).
    cutoff_freq : float, optional
        Low-pass cutoff in Hz (default 5).
    filter_order : int, optional
        Order of the Butterworth low-pass filter (default 2).

    Returns
    -------
    envelope : numpy.ndarray
        Zero-phase low-pass filtered version of the rectified signal.
    rectified_signal : numpy.ndarray
        Absolute value of the input, as float64.
    """
    # Cast before rectifying: on int16 input np.abs(-32768) wraps around to
    # -32768, which would leave a negative sample in the "rectified" signal.
    samples = np.asarray(raw_signal, dtype=np.float64)

    # Rectification
    rectified_signal = np.abs(samples)

    # Low-pass filtering -> envelope. butter() expects the cutoff as a
    # fraction of the Nyquist frequency (half the sampling rate).
    norm_cutoff_freq = cutoff_freq / (sample_freq / 2.0)
    b, a = signal.butter(filter_order, norm_cutoff_freq, 'low')
    # filtfilt runs the filter forwards and backwards, so the envelope is not
    # delayed relative to the input.
    envelope = signal.filtfilt(b, a, rectified_signal, axis=0)

    return envelope, rectified_signal

In [59]:
def _threshold_views(envelope, threshold):
    """Return the masked slope sign and the 0/1 activity signal for `threshold`."""
    above_thres = envelope >= threshold  # samples to consider in computation
    # append=0 keeps the diff the same length as the envelope.
    envelope_slope_sign = np.sign(np.diff(envelope, append=0)) * above_thres
    duration_signal = np.ones(envelope.shape) * above_thres
    return envelope_slope_sign, duration_signal


def extract_features(raw_signal, plot_features=False, threshold=0.0115 * (2 ** 16)):
    """Compute envelope-based features of a signal.

    Parameters
    ----------
    raw_signal : array_like
        Signal samples, passed straight to ``preprocessing``.
    plot_features : bool, optional
        When true, draw the signal, its envelope, the masked slope sign and
        the activity signal on a 4-row figure.
    threshold : float, optional
        Envelope level at or above which a sample counts as active. Defaults
        to ``0.0115 * 2**16``. If no sample reaches it, the mean of the
        envelope is used instead.

    Returns
    -------
    list
        ``[duration, zero_crossing, amplitude, ratio, duration_signal]``:

        * ``duration`` - number of samples whose envelope is >= the threshold.
        * ``zero_crossing`` - number of positions where the thresholded sign
          of the envelope slope changes value.
        * ``amplitude`` - maximum of the rectified signal, as ``float``.
        * ``ratio`` - area under the envelope before its maximum divided by
          the total area, as ``float`` (``0.0`` when the total area is zero).
        * ``duration_signal`` - array of 1.0 / 0.0 flags marking the samples
          counted in ``duration``.
    """
    envelope, rectified_signal = preprocessing(raw_signal)
    max_pos = np.argmax(envelope)

    envelope_slope_sign, duration_signal = _threshold_views(envelope, threshold)
    duration = np.sum(duration_signal)

    if duration == 0:
        # Nothing reached the requested threshold: fall back to the mean of
        # the envelope so that the features are still defined.
        envelope_slope_sign, duration_signal = _threshold_views(envelope, np.mean(envelope))
        duration = np.sum(duration_signal)

    zero_crossing = np.count_nonzero(np.diff(envelope_slope_sign))
    # np.max instead of the builtin max: same value, no Python-level loop.
    amplitude = np.max(rectified_signal)

    # np.trapz is deprecated since NumPy 2.0 in favour of np.trapezoid;
    # use whichever this NumPy provides.
    integrate = getattr(np, "trapezoid", None) or np.trapz
    total_area = integrate(envelope, axis=0)
    if total_area == 0:
        # Avoid 0/0 -> NaN on an all-zero envelope.
        ratio = 0.0
    else:
        ratio = integrate(envelope[:max_pos], axis=0) / total_area

    features = [duration, zero_crossing, float(amplitude), float(ratio), duration_signal]

    # Illustrate features
    if plot_features:
        # NOTE(review): `plt` is not imported in the cells visible here; this
        # branch needs `import matplotlib.pyplot as plt` to have been run.
        fig, axes = plt.subplots(nrows=4, ncols=1, figsize=(8,6), sharex=True)
        axes[0].plot(raw_signal); axes[0].set_title("Señal sonora")
        axes[1].plot(envelope); axes[1].set_title("Envolvente")
        axes[2].plot(envelope_slope_sign); axes[2].set_title("Signo de la pendiente de la envolvente")
        axes[3].plot(duration_signal); axes[3].set_title("Duración")
        axes[3].set_xlabel("muestras")
        fig.tight_layout()

    return features

In [60]:
# Training set: clips under data/raw/ listed in train_labels.csv.
train = WaveDataset("data/raw/", "data/raw/train_labels.csv")


In [66]:
# Number of above-threshold samples in each training clip (first feature
# returned by extract_features); the maximum sets the padded row width.
waves = [int(extract_features(sig)[0]) for sig, _ in train]

In [68]:
# Zero matrix with one row per training clip, as wide as the longest
# above-threshold region. NOTE(review): the next cell repeats this allocation.
X = np.zeros([len(train), np.max(waves)])

In [71]:
# Fill a zero-padded matrix with the above-threshold samples of every
# training clip, and collect the matching labels.
X = np.zeros([len(train), np.max(waves)])
collected_labels = []
for row, (sig, label) in enumerate(train):
    # The last feature is the 0/1 activity signal; keep samples flagged 1.
    active = extract_features(sig)[-1] == 1
    kept = sig[active]
    X[row, :len(kept)] = kept
    collected_labels.append(label)

y = np.asarray(collected_labels)

In [77]:
# Persist the padded training matrix.
# NOTE(review): this path is under ../../input/ while the other cells use data/ - confirm.
np.save('../../input/processed/X_slice.npy',X)

In [78]:
# Persist the training labels.
# NOTE(review): this path is under ../../input/ while the other cells use data/ - confirm.
np.save('../../input/processed/y_slice.npy',y)

In [79]:
# Submission set: clips under data/raw/ listed in test_files.csv.
submission = WaveDataset("data/raw/", "data/raw/test_files.csv")

In [80]:
# Number of above-threshold samples in each submission clip (first feature
# returned by extract_features); the maximum sets the padded row width.
waves = [int(extract_features(sig)[0]) for sig, _ in submission]

In [81]:
# Fill a zero-padded matrix with the above-threshold samples of every
# submission clip.
S = np.zeros([len(submission), np.max(waves)])

for row, (sig, _) in enumerate(submission):
    # The last feature is the 0/1 activity signal; keep samples flagged 1.
    active = extract_features(sig)[-1] == 1
    kept = sig[active]
    S[row, :len(kept)] = kept

In [82]:
# Persist the padded submission matrix.
np.save('data/processed/submission_slice.npy',S)