In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import librosa.display
import math 
import pickle
import random

from sklearn.preprocessing import minmax_scale
import librosa
import pickle
from scipy.io import loadmat
import sklearn.utils

In [None]:
# Seed NumPy's global RNG so the later sklearn.utils.shuffle calls (which fall
# back to that global state when no random_state is given) are reproducible.
np.random.seed(42)
target_sampling_rate = 4000  # sampling rate (Hz) every audio clip is resampled to
volunteer_id = 1 # For dataset with all volunteers use volunteer_id=5

input_dir = "../Dataset/Non-Interfered"
google_dir = "../Dataset/Interference"
# Derive the output folder from volunteer_id so that switching volunteers does
# not silently overwrite the files written for volunteer 1.
output_dir = "../Dataset/Interfered/Volunteer_" + str(volunteer_id)
# exist_ok=True creates the folder when missing and is a no-op otherwise,
# without the separate exists() check.
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Full vocabulary, in the order used to index words throughout the notebook.
label_arr = [
    "backward", "bed", "bird", "cat", "dog", "down", "eight", "five",
    "follow", "forward", "four", "go", "happy", "house", "learn", "left",
    "marvin", "nine", "no", "off", "on", "one", "right", "seven", "sheila",
    "six", "stop", "three", "tree", "two", "up", "visual", "wow", "yes",
    "zero", "google", "siri", "bixby", "alexa",
]
# Positions in label_arr of the words routed to the test split; every other
# word goes to the training split.
test_label_ids = [
    label_arr.index(held_out)
    for held_out in ("down", "go", "left", "off", "right", "stop")
]

# Reference word -> class-id mapping. This is a bare expression: it is not
# assigned to any name, so later cells cannot read it.
# Ten words carry distinct ids 2-11 (yes, no, up, down, left, right, on, off,
# stop, go), '_silence_' is 0, and every other word maps to 1.
# NOTE(review): presumably the label encoding used by a downstream classifier;
# confirm, then either assign it to a variable or move it to a markdown cell.
{'bird': 1,
 'happy': 1,
 'cat': 1,
 'dog': 1,
 'follow': 1,
 'house': 1,
 'forward': 1,
 'bed': 1,
 'backward': 1,
 'sheila': 1,
 'tree': 1,
 'two': 1,
 'down': 5,
 'four': 1,
 'eight': 1,
 'visual': 1,
 'five': 1,
 'marvin': 1,
 'go': 11,
 'learn': 1,
 'wow': 1,
 'left': 6,
 'one': 1,
 'seven': 1,
 'off': 9,
 'nine': 1,
 'right': 7,
 'up': 4,
 'stop': 10,
 'zero': 1,
 'three': 1,
 'on': 8,
 'yes': 2,
 'six': 1,
 'no': 3,
 '_silence_': 0}

In [None]:
def get_stft(x, fs, n_fft, hop_length):
    """Return ``librosa.stft`` of ``x`` for the given window and hop sizes.

    Parameters
    ----------
    x : signal passed straight through to ``librosa.stft``.
    fs : accepted so existing call sites keep working; it is not used.
    n_fft : FFT window length forwarded to ``librosa.stft``.
    hop_length : hop size forwarded to ``librosa.stft``.
    """
    return librosa.stft(x, n_fft=n_fft, hop_length=hop_length)

# Read Target Data

In [None]:
# Build the list of volunteer directories that read_target_data() will scan.
if volunteer_id > 4:
    # "All volunteers" mode: take every sub-directory of input_dir.
    path_dirs = []
    # Sorted so the volunteer order does not depend on the filesystem.
    for entry in sorted(os.listdir(input_dir)):
        # Skip dotted entries such as ".ipynb_checkpoints". The check must look
        # at the entry name only: input_dir itself begins with "..", so testing
        # the joined path would reject every directory.
        if "." in entry:
            continue
        candidate = os.path.join(input_dir, entry)
        if os.path.isdir(candidate):
            path_dirs.append(candidate)
else:
    # Single-volunteer mode: one fixed directory named after the volunteer id.
    path_dirs = [os.path.join(input_dir,"Volunteer"+str(volunteer_id))]

print("path dirs", path_dirs)


In [None]:
# Despite the name, this is a complete file name: read_target_data() joins it
# onto each directory in path_dirs to locate that volunteer's .mat recording.
file_extension = "general_aud.mat"

In [None]:
def _extract_sample_features(sample):
    """Turn one row of the 'word' table into normalised signals and their STFTs.

    ``sample[0]`` is treated as the microphone recording (resampled from
    44100 Hz to ``target_sampling_rate``). ``sample[3]`` is treated as the IMU
    recording: its first 400 rows and columns 1..3 are reduced to a per-row
    Euclidean magnitude (presumably the x/y/z axes; confirm against the
    recording format). Both signals are min-max scaled to [-1, 1].

    Returns a tuple ``(mic, imu, mic_stft, imu_stft)``.
    """
    # Sample rates are passed by keyword: librosa >= 0.10 makes them
    # keyword-only, and older releases accept the same names.
    resampled_mic = librosa.resample(
        sample[0].reshape(-1), orig_sr=44100, target_sr=target_sampling_rate
    )
    mic = minmax_scale(resampled_mic, feature_range=(-1, 1))

    imu_axes = sample[3][0:400, 1:4]
    imu_magnitude = np.sqrt(
        imu_axes[:, 0] ** 2 + imu_axes[:, 1] ** 2 + imu_axes[:, 2] ** 2
    )
    imu = minmax_scale(imu_magnitude, feature_range=(-1, 1))

    imu_stft = get_stft(x=imu, fs=imu.shape[0], n_fft=40, hop_length=20)
    mic_stft = get_stft(mic, fs=mic.shape[0], n_fft=400, hop_length=200)
    return mic, imu, mic_stft, imu_stft


def read_target_data():
    """Load every volunteer recording in ``path_dirs`` and split it by word.

    For each directory the file ``file_extension`` is read with ``loadmat`` and
    the first five columns of its ``'word'`` entry are used. Rows are addressed
    as ``word + len(label_arr) * repetition``, i.e. the table cycles through
    the whole vocabulary once per repetition. Words whose index is listed in
    ``test_label_ids`` go to the test split, all others to the training split.

    Returns
    -------
    (train, test) : tuple of two lists
        Each list is ``[mic_data, imu_data, mic_stft, imu_stft, labels]``,
        where every element is built with ``np.asarray`` from nested lists
        ordered as volunteer -> word -> repetition.
    """
    # Order matters: it is the order of the returned lists.
    fields = ("mic_data", "imu_data", "mic_stft", "imu_stft", "labels")
    all_train = {name: [] for name in fields}
    all_test = {name: [] for name in fields}

    for fpath in path_dirs:
        if ".ipynb_checkpoints" in fpath:
            continue
        print(fpath)

        train = {name: [] for name in fields}
        test = {name: [] for name in fields}

        gen = os.path.join(fpath, file_extension)
        gen_data = loadmat(gen)['word'][:, 0:5]
        # Number of complete passes over the vocabulary present in the table.
        sample_count = gen_data.shape[0] // len(label_arr)

        for word in range(len(label_arr)):
            per_word = {name: [] for name in fields}

            for ind in range(sample_count):
                d = gen_data[word + len(label_arr) * ind, :]
                mic, imu, mic_stft, imu_stft = _extract_sample_features(d)
                per_word["mic_data"].append(mic)
                per_word["imu_data"].append(imu)
                per_word["mic_stft"].append(mic_stft)
                per_word["imu_stft"].append(imu_stft)
                per_word["labels"].append(label_arr[word])

            split = test if word in test_label_ids else train
            for name in fields:
                split[name].append(np.asarray(per_word[name]))

        for name in fields:
            all_train[name].append(train[name])
            all_test[name].append(test[name])

    train_arrays = {name: np.asarray(all_train[name]) for name in fields}
    test_arrays = {name: np.asarray(all_test[name]) for name in fields}

    print(train_arrays["mic_data"].shape, train_arrays["imu_data"].shape, test_arrays["mic_data"].shape, test_arrays["imu_data"].shape)
    print(train_arrays["mic_stft"].shape, train_arrays["imu_stft"].shape, test_arrays["mic_stft"].shape, test_arrays["imu_stft"].shape)
    print(train_arrays["labels"].shape, test_arrays["labels"].shape)

    return [train_arrays[name] for name in fields], [test_arrays[name] for name in fields]

target_train, target_test = read_target_data()
# Write through context managers so each file is flushed and closed as soon as
# its pickle has been written, instead of leaving the handle open.
with open(os.path.join(output_dir, "target_train.p"), "wb") as target_train_file:
    pickle.dump(target_train, target_train_file)
with open(os.path.join(output_dir, "target_test.p"), "wb") as target_test_file:
    pickle.dump(target_test, target_test_file)

# Read Noise Data

In [None]:
def read_noise_data():
    """Load the interference clips found under ``google_dir``, grouped by word.

    For every word in ``label_arr`` the folder ``google_dir/<word>`` is scanned
    for ``.wav`` files. Words without a folder are recorded by their
    ``label_arr`` index in ``not_present_ind`` and skipped. Up to 500 clips are
    kept per word; a clip is kept only if it is exactly 22050 samples long
    after loading at 22050 Hz. Each kept clip is resampled to
    ``target_sampling_rate`` and min-max scaled to [-1, 1].

    Words whose index is in ``test_label_ids`` go to the test split, all others
    to the training split.

    Returns
    -------
    (train, test) : tuple of two lists
        Each list is ``[noise_data, not_present_ind, labels]``. The same
        ``not_present_ind`` list object appears in both.

    NOTE(review): if some word ends up with fewer than 500 usable clips the
    nested lists are ragged and ``np.asarray`` will not produce a regular
    array; confirm every word folder holds at least 500 one-second clips.
    """
    noise_sr = 22050           # rate the clips are loaded at (librosa.load's default)
    max_clips_per_word = 500   # cap on the clips kept for each word

    all_train_noise_data = []
    all_test_noise_data = []
    all_train_labels = []
    all_test_labels = []

    not_present_ind = []

    for word_id, words in enumerate(label_arr):
        temp_dir = os.path.join(google_dir, words)
        print(temp_dir)
        if not os.path.exists(temp_dir):
            not_present_ind.append(word_id)
            continue

        labeled_data = []
        labels = []
        # Sorted so the clips that are kept do not depend on filesystem order.
        for file in sorted(os.listdir(temp_dir)):
            if not file.endswith(".wav"):
                continue
            if len(labeled_data) == max_clips_per_word:
                break
            audio, _ = librosa.load(os.path.join(temp_dir, file), sr=noise_sr)
            if len(audio) != noise_sr:
                continue
            # Sample rates are passed by keyword: librosa >= 0.10 makes them
            # keyword-only, and older releases accept the same names.
            resampled_mic = librosa.resample(
                audio.reshape(-1), orig_sr=noise_sr, target_sr=target_sampling_rate
            )
            labeled_data.append(minmax_scale(resampled_mic, feature_range=(-1, 1)))
            labels.append(words)

        if word_id in test_label_ids:
            all_test_noise_data.append(labeled_data)
            all_test_labels.append(labels)
        else:
            all_train_noise_data.append(labeled_data)
            all_train_labels.append(labels)

    all_train_noise_data = np.asarray(all_train_noise_data)
    all_test_noise_data = np.asarray(all_test_noise_data)

    all_train_labels = np.asarray(all_train_labels)
    all_test_labels = np.asarray(all_test_labels)

    print(all_train_noise_data.shape, all_test_noise_data.shape)

    return [all_train_noise_data, not_present_ind, all_train_labels], [all_test_noise_data, not_present_ind, all_test_labels]

noise_train, noise_test = read_noise_data()

# Write through context managers so each file is flushed and closed as soon as
# its pickle has been written, instead of leaving the handle open.
with open(os.path.join(output_dir, "noise_train.p"), "wb") as noise_train_file:
    pickle.dump(noise_train, noise_train_file)
with open(os.path.join(output_dir, "noise_test.p"), "wb") as noise_test_file:
    pickle.dump(noise_test, noise_test_file)

# Making Datasets

In [None]:
%%capture output1
# Enumerate every (target sample, noise clip) pairing for the test split.
# Each entry is [[user, word, i], [noise_word, j]]:
#   user       - volunteer position (only 0 is generated here)
#   word       - position of the target word within the test split (0..5)
#   i          - repetition of that target word (0..9)
#   noise_word - running count of the noise classes accepted so far (see note)
#   j          - clip position within that noise class (0..499)
not_present_ind = noise_test[1]
test_index = []
for user in range(1):
    for word in range(6):
        # Noise class candidates 0..3, minus those listed as missing and the
        # one whose number equals the target word position.
        # NOTE(review): the stored noise_word is a compacted counter, not the
        # candidate number, so the "== word" exclusion shifts later classes
        # down rather than reliably excluding one class. not_present_ind holds
        # label_arr indices, while the candidates are small local numbers.
        # Confirm this matches the intended pairing before relying on it.
        accepted = [
            candidate
            for candidate in range(4)
            if candidate not in not_present_ind and candidate != word
        ]
        for i in range(10):
            for noise_word in range(len(accepted)):
                for j in range(500):
                    test_index.append([[user, word, i], [noise_word, j]])

shuffled_test_index = sklearn.utils.shuffle(test_index)
print(len(shuffled_test_index))
# Context manager closes the pickle file once it has been written.
with open(os.path.join(output_dir, "combination_index_test.p"), "wb") as test_index_file:
    pickle.dump(shuffled_test_index, test_index_file)


In [None]:
# Spot-check one entry of the unshuffled test index; entries have the form
# [[user, word, i], [noise_word, j]].
print(test_index[1050])

In [None]:

# Enumerate every (target sample, noise clip) pairing for the training split.
# Each entry is [[user, word, i], [noise_word, j]]:
#   user       - volunteer position (only 0 is generated here)
#   word       - position of the target word within the training split (0..32)
#   i          - repetition of that target word (0..9)
#   noise_word - running count of the noise classes accepted so far (see note)
#   j          - clip position within that noise class (0..499)
not_present_ind = noise_train[1]
train_index = []
for user in range(1):
    for word in range(33):
        # Noise class candidates 0..25, minus those listed as missing and the
        # one whose number equals the target word position.
        # NOTE(review): the stored noise_word is a compacted counter, not the
        # candidate number, so the "== word" exclusion shifts later classes
        # down rather than reliably excluding one class. not_present_ind holds
        # label_arr indices, while the candidates are small local numbers.
        # Confirm this matches the intended pairing before relying on it.
        accepted = [
            candidate
            for candidate in range(26)
            if candidate not in not_present_ind and candidate != word
        ]
        for i in range(10):
            for noise_word in range(len(accepted)):
                for j in range(500):
                    train_index.append([[user, word, i], [noise_word, j]])

shuffled_train_index = sklearn.utils.shuffle(train_index)
print(len(shuffled_train_index))
# Context manager closes the pickle file once it has been written.
with open(os.path.join(output_dir, "combination_index_train.p"), "wb") as train_index_file:
    pickle.dump(shuffled_train_index, train_index_file)

In [None]:
# Spot-check one entry of the unshuffled training index; entries have the form
# [[user, word, i], [noise_word, j]].
print(train_index[10050])