In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
# Work from the dataset root: the CSV and "fold<N>/..." paths used below are relative.
%cd "/content/drive/MyDrive/UrbanSound8K"

/content/drive/MyDrive/UrbanSound8K


In [None]:
sampling_rate = 44100 # Sampling rate (Hz) the audio is loaded at
row_len = 513 # Number of rows (frequency bins): 1 + n_fft/2, with n_fft = 1024
col_len = 401 # Number of columns (time frames): 1 + (sampling_rate*audio_duration)/hop; hop = 0.01*sampling_rate
# audio_length = sampling_rate*audio_duration
# audio_duration = 4 seconds
folder = "test" # Split name: selects "UrbanSound8K_<folder>.csv" and names the output directory / .tfrec prefix

In [None]:
import numpy as np
import pandas as pd

import librosa
from keras.utils.np_utils import to_categorical

import tensorflow as tf

In [None]:
def _random_crop_or_pad(X, target_len, axis):
    """Randomly crop or zero-pad ``X`` along ``axis`` to exactly ``target_len`` entries."""
    extra = X.shape[axis] - target_len
    if extra == 0:
        return X

    if extra > 0:
        # Too long: keep a random contiguous window of target_len entries.
        # randint's upper bound is exclusive, so "+ 1" makes the last window reachable.
        offset = np.random.randint(extra + 1)
        window = [slice(None)] * X.ndim
        window[axis] = slice(offset, offset + target_len)
        return X[tuple(window)]

    # Too short: split the missing entries randomly between the two ends.
    missing = -extra
    offset = np.random.randint(missing + 1)
    pad_width = [(0, 0)] * X.ndim
    pad_width[axis] = (offset, missing - offset)
    return np.pad(X, pad_width, "constant")


def wav2feat(wavfile, Fs = sampling_rate):
    """Load an audio file and return its fixed-size STFT magnitude.

    Parameters
    ----------
    wavfile : str
        Path of the audio file to load.
    Fs : int
        Sampling rate the audio is resampled to on load. The hop (10 ms)
        and window (20 ms) lengths are derived from it.

    Returns
    -------
    numpy.ndarray
        Magnitude spectrogram of shape ``(row_len, col_len)``. Axes longer
        than the target are randomly cropped; shorter ones are zero-padded
        with the content placed at a random offset.
    """
    x, _ = librosa.load(wavfile, sr = Fs, mono = True)
    hop = int(0.01*Fs) # 10ms
    win = int(0.02*Fs) # 20ms
    X = librosa.stft(x, n_fft = 1024, hop_length = hop, win_length = win, window = 'hann', center = True, pad_mode = "reflect")
    X = np.abs(X)

    X = _random_crop_or_pad(X, row_len, axis = 0)
    X = _random_crop_or_pad(X, col_len, axis = 1)

    return X

In [None]:
def prepare_data():
    """Compute and save one STFT feature file and one one-hot target file per clip.

    Reads the module-level ``train_df`` (columns ``slice_file_name``, ``fold``
    and ``label_idx``), ``n_classes`` and ``folder``. For the clip at row
    position ``i`` it writes ``<folder>/STFT<i>.npy`` (the ``wav2feat`` output
    with a trailing channel axis added) and ``<folder>/target<i>.npy`` (the
    one-hot label). Progress is printed every 200 clips.

    Returns
    -------
    None
    """
    import os

    # np.save does not create missing directories.
    os.makedirs(folder, exist_ok = True)

    print("Number of training samples processed: ")
    Y = to_categorical(train_df["label_idx"], num_classes = n_classes)

    # Enumerate row positions (not index labels) so that Y[i] and the file
    # numbering stay aligned even if train_df has a non-default index, and
    # read the columns by name instead of by deprecated positional lookup.
    rows = zip(train_df["slice_file_name"], train_df["fold"])
    for i, (fname, fold) in enumerate(rows):
        fpath = "fold" + str(fold) + "/" + fname
        STFT = wav2feat(fpath)
        STFT = np.expand_dims(STFT, axis = -1)

        np.save(folder + "/STFT%d.npy"%i, STFT)
        np.save(folder + "/target%d.npy"%i, Y[i])

        if(i != 0 and i%200 == 0):
            print(i, end = ".. ")

    print(train_df.shape[0], end = ".. ")
    print("Done!")

    return

In [None]:
# Load the metadata CSV of the selected split and preview its first rows.
split_csv = "UrbanSound8K_" + folder + ".csv"
train_df = pd.read_csv(split_csv)
train_df.head(5)

Unnamed: 0,slice_file_name,fsID,start,end,salience,fold,classID,class
0,88121-8-1-0.wav,88121,10.767184,11.870933,2,10,8,siren
1,7389-1-2-0.wav,7389,78.753483,82.753483,2,4,1,car_horn
2,159751-8-0-16.wav,159751,9.150495,13.150495,2,4,8,siren
3,188004-8-0-1.wav,188004,0.5,4.5,2,6,8,siren
4,159751-8-0-6.wav,159751,4.150495,8.150495,2,4,8,siren


In [None]:
# Derive the class-name -> integer mapping from the training split, so the
# encoding does not depend on which split is being processed.
train_df = pd.read_csv("UrbanSound8K_train.csv")
LABELS = list(train_df["class"].unique())
n_classes = len(LABELS)
label_idx = dict(zip(LABELS, range(n_classes)))

# Reload the split selected by `folder` and attach the integer label to each row.
train_df = pd.read_csv("UrbanSound8K_" + folder + ".csv")
train_df["label_idx"] = [label_idx[name] for name in train_df["class"]]

In [None]:
# One metadata row per clip, so the row count is the number of audio files.
print("Number of .wav files = ", len(train_df))

Number of .wav files =  836


In [None]:
# Display the class-name -> integer-index mapping.
label_idx

{'air_conditioner': 0,
 'car_horn': 8,
 'children_playing': 4,
 'dog_bark': 2,
 'drilling': 5,
 'engine_idling': 3,
 'gun_shot': 9,
 'jackhammer': 6,
 'siren': 7,
 'street_music': 1}

In [None]:
# Extract the features of every clip in the selected split and save them as .npy files.
prepare_data()

Number of training samples processed: 
200.. 400.. 600.. 800.. 836.. Done!


In [None]:
def _bytes_feature(value):
    """Wrap a string / byte value in a tf.train.Feature holding a bytes_list."""
    eager_tensor_type = type(tf.constant(0))
    # BytesList cannot unpack an EagerTensor, so pull out its numpy value first.
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value
    bytes_list = tf.train.BytesList(value = [raw])
    return tf.train.Feature(bytes_list = bytes_list)

def _float_feature(value):
    """Wrap a float / double value in a tf.train.Feature holding a float_list."""
    float_list = tf.train.FloatList(value = [value])
    return tf.train.Feature(float_list = float_list)

def _int64_feature(value):
    """Wrap a bool / enum / int / uint value in a tf.train.Feature holding an int64_list."""
    int64_list = tf.train.Int64List(value = [value])
    return tf.train.Feature(int64_list = int64_list)

In [None]:
def serialize_example(feature0, feature1):
    """Serialize one example to a tf.train.Example byte string.

    feature0 is stored under the key "STFT" and feature1 under "target",
    both as bytes features.
    """
    features = tf.train.Features(feature = {
        "STFT": _bytes_feature(feature0),
        "target": _bytes_feature(feature1),
    })
    return tf.train.Example(features = features).SerializeToString()

In [None]:
# Total number of examples to distribute over the TFRecord shards.
LEN = len(train_df)

In [None]:
SIZE = 250 # maximum number of examples per TFRecord shard
full_shards, leftover = divmod(LEN, SIZE)
CT = full_shards + (1 if leftover else 0) # total shard count, i.e. ceil(LEN / SIZE)

for j in range(CT):
    print(); print('Writing TFRecord %i of %i...'%(j + 1, CT))
    CT2 = min(SIZE , LEN - j*SIZE) # examples in this shard; the last shard may be shorter
    shard_path = folder + "fold1_%.2i-%i.tfrec"%(j, CT2)

    with tf.io.TFRecordWriter(shard_path) as writer:
        for k in range(CT2):
            idx = SIZE*j + k
            # Store raw array bytes; shape and dtype are not kept, so a reader
            # has to know them to decode the records.
            X_train = np.load(folder + "/STFT%d.npy"%idx).tobytes()
            Y_train = np.load(folder + "/target%d.npy"%idx).tobytes()

            example = serialize_example(X_train, Y_train)
            writer.write(example)

            if k%100 == 0: print(k, ', ', end = '')


Writing TFRecord 1 of 4...
0 , 100 , 200 , 
Writing TFRecord 2 of 4...
0 , 100 , 200 , 
Writing TFRecord 3 of 4...
0 , 100 , 200 , 
Writing TFRecord 4 of 4...
0 , 