In [8]:
import os
import re
import hashlib

MAX_NUM_WAVS_PER_CLASS = 2**27 - 1  # ~134M

def which_set(filename, validation_percentage, testing_percentage):
    """Pick 'training', 'validation' or 'testing' for a file from its name.

    The directory part of ``filename`` and everything from '_nohash_' onward
    are discarded; the remaining text is SHA-1 hashed and mapped onto a
    0-100 scale, which is then compared against the two percentages.
    The result therefore depends only on the file name, not on any state.

    Args:
        filename: path of the file.
        validation_percentage: share (0-100) assigned to 'validation'.
        testing_percentage: share (0-100) assigned to 'testing'.

    Returns:
        One of the strings 'validation', 'testing' or 'training'.
    """
    # Files differing only after '_nohash_' share one key, hence one partition.
    grouping_key = re.sub(r'_nohash_.*$', '', os.path.basename(filename))
    digest = hashlib.sha1(grouping_key.encode('utf-8')).hexdigest()
    bucket = int(digest, 16) % (MAX_NUM_WAVS_PER_CLASS + 1)
    percentage_hash = bucket * (100.0 / MAX_NUM_WAVS_PER_CLASS)

    if percentage_hash < validation_percentage:
        return 'validation'
    if percentage_hash < (testing_percentage + validation_percentage):
        return 'testing'
    return 'training'

def partition_dataset(data_dir, validation_percentage, testing_percentage):
    """Collect the validation and testing ``.wav`` files found under ``data_dir``.

    ``data_dir`` is walked recursively and ``which_set`` is asked for the
    partition of every file whose name ends in '.wav'. Files assigned to
    'training' are not collected.

    Directories and files are visited in sorted order so that the returned
    lists (and anything written from them) are the same on every run;
    ``os.walk`` on its own yields names in arbitrary, filesystem-dependent
    order.

    Args:
        data_dir: root directory to scan.
        validation_percentage: forwarded to ``which_set``.
        testing_percentage: forwarded to ``which_set``.

    Returns:
        (validation_list, testing_list): two lists of file paths.
    """
    validation_list = []
    testing_list = []
    collected = {'validation': validation_list, 'testing': testing_list}

    for root, dirs, files in os.walk(data_dir):
        # Sorting in place fixes the order in which os.walk descends.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith('.wav'):
                continue
            file_path = os.path.join(root, file)
            set_type = which_set(file_path, validation_percentage, testing_percentage)
            target = collected.get(set_type)
            if target is not None:
                target.append(file_path)

    return validation_list, testing_list

def write_list_to_file(file_list, file_path):
    """Write each entry of ``file_list`` to ``file_path``, one entry per line.

    The file is opened in 'w' mode, so any existing content is replaced.
    """
    with open(file_path, 'w') as out:
        out.writelines("%s\n" % entry for entry in file_list)

# Parameters
# NOTE: data_dir is an absolute, machine-specific path; it must be edited
# before this cell can run anywhere else.
data_dir = '/home/pmedur/strojnoUcenje/env/bin/TorchAudio/SpeechCommands/speech_commands_v0.02'  # Replace with the path to your wav files
# Both percentages are on a 0-100 scale (which_set compares them to a 0-100 hash value).
validation_percentage = 10.0
testing_percentage = 10.0

# Partition the dataset
# Only the validation and testing paths are returned; training files are not listed.
validation_list, testing_list = partition_dataset(data_dir, validation_percentage, testing_percentage)

# Write the lists to files
# The output names are relative paths, so they are resolved against the
# current working directory; existing files are overwritten ('w' mode).
write_list_to_file(validation_list, 'validation_list.txt')
write_list_to_file(testing_list, 'testing_list.txt')


In [9]:
import os
import re
import hashlib
import random

MAX_NUM_WAVS_PER_CLASS = 2**27 - 1  # ~134M

def which_set(hash_name, validation_percentage, testing_percentage):
    """Map ``hash_name`` to 'training', 'validation' or 'testing'.

    ``hash_name`` is SHA-1 hashed and the digest is folded onto a 0-100
    scale; the two percentages are the thresholds on that scale. The same
    ``hash_name`` always yields the same partition.

    Args:
        hash_name: string key to hash (the caller in this cell passes the
            part of the file name before '_nohash_').
        validation_percentage: share (0-100) assigned to 'validation'.
        testing_percentage: share (0-100) assigned to 'testing'.

    Returns:
        One of the strings 'validation', 'testing' or 'training'.
    """
    digest_value = int(hashlib.sha1(hash_name.encode('utf-8')).hexdigest(), 16)
    bucket = digest_value % (MAX_NUM_WAVS_PER_CLASS + 1)
    percentage_hash = bucket * (100.0 / MAX_NUM_WAVS_PER_CLASS)

    partition = 'training'
    if percentage_hash < validation_percentage:
        partition = 'validation'
    elif percentage_hash < (testing_percentage + validation_percentage):
        partition = 'testing'
    return partition

def partition_dataset(data_dir, validation_percentage, testing_percentage):
    """Return the validation and testing ``.wav`` paths found under ``data_dir``.

    Files are first grouped by label (name of the containing directory) and
    speaker id (file name with everything from '_nohash_' onward removed).
    Each speaker id is then passed to ``which_set`` and all of that
    speaker's files for the label go to the returned partition; 'training'
    files are not collected.

    Speaker ids are shuffled per label with the global ``random`` state.
    This only affects the order of the returned lists, since the partition
    itself comes from ``which_set``.

    Returns:
        (validation_list, testing_list): two lists of file paths.
    """
    # label -> speaker id -> list of file paths
    grouped = {}
    for root, _, files in os.walk(data_dir):
        for name in files:
            if not name.endswith('.wav'):
                continue
            wav_path = os.path.join(root, name)
            label = os.path.basename(os.path.dirname(wav_path))
            speaker = re.sub(r'_nohash_.*$', '', os.path.basename(wav_path))
            grouped.setdefault(label, {}).setdefault(speaker, []).append(wav_path)

    validation_list, testing_list = [], []
    for per_speaker in grouped.values():
        shuffled_speakers = list(per_speaker)
        random.shuffle(shuffled_speakers)
        for speaker in shuffled_speakers:
            assigned = which_set(speaker, validation_percentage, testing_percentage)
            if assigned == 'validation':
                validation_list.extend(per_speaker[speaker])
            elif assigned == 'testing':
                testing_list.extend(per_speaker[speaker])

    return validation_list, testing_list

def write_list_to_file(file_list, file_path):
    """Write the organized form of ``file_list`` to ``file_path``, one per line.

    The paths are first converted by ``organize_file_entries``; the converted
    entries are what gets written. The file is opened in 'w' mode, so any
    existing content is replaced.
    """
    lines = organize_file_entries(file_list)
    with open(file_path, 'w') as out:
        out.writelines("%s\n" % line for line in lines)

def organize_file_entries(file_list):
    """Convert wav paths into '<label> <speaker_id> <utterance_number>' strings.

    * label: name of the directory containing the file.
    * speaker_id: the underscore-free text directly before '_nohash_'.
    * utterance_number: the digits between '_nohash_' and the final '.wav'.

    Paths whose file name lacks either part are reported with ``print`` and
    left out of the result.

    Returns:
        List of formatted strings, in the order of the accepted inputs.
    """
    speaker_pattern = re.compile(r'([^_]*)_nohash_')
    utterance_pattern = re.compile(r'_nohash_([0-9]+)\.wav$')

    entries = []
    for wav_path in file_list:
        parent_dir, wav_name = os.path.split(wav_path)
        label = os.path.basename(parent_dir)
        speaker = speaker_pattern.search(wav_name)
        utterance = utterance_pattern.search(wav_name)

        if not (speaker and utterance):
            print(f"Filename format issue: {wav_path}")
            continue
        entries.append(f"{label} {speaker.group(1)} {utterance.group(1)}")
    return entries

# Parameters
# NOTE: data_dir is an absolute, machine-specific path; it must be edited
# before this cell can run anywhere else.
data_dir = '/home/pmedur/strojnoUcenje/env/bin/TorchAudio/SpeechCommands/speech_commands_v0.02'  # Replace with the path to your wav files
# Both percentages are on a 0-100 scale (which_set compares them to a 0-100 hash value).
validation_percentage = 10.0
testing_percentage = 10.0

# Seed the random number generator for reproducibility.
# A fixed value is required: random.seed() with no argument seeds from system
# entropy/time, which makes the shuffle order different on every run.
random.seed(42)

# Partition the dataset
# Only the validation and testing paths are returned; training files are not listed.
validation_list, testing_list = partition_dataset(data_dir, validation_percentage, testing_percentage)

# Write the lists to files
# The output names are relative paths, resolved against the current working
# directory; existing files are overwritten.
write_list_to_file(validation_list, 'validation_list.txt')
write_list_to_file(testing_list, 'testing_list.txt')


In [3]:
import os
import numpy as np
import librosa
import python_speech_features
from os import listdir
from os.path import isdir, join
import random
from collections import defaultdict
from sklearn.model_selection import KFold

class SpeechDataLoader:
    """Builds speaker-grouped MFCC cross-validation folds from a wav dataset.

    ``load_data`` lists one sub-directory of ``dataset_path`` per target word
    and groups the ``.wav`` files inside by
    ``(target, text before the first '_' in the file name)``. Splitting is
    done on these groups, so all files of one group land in the same
    partition.

    Typical use::

        loader = SpeechDataLoader(path, 'unused.npz', n_splits=10)
        loader.load_data()
        folds = loader.k_fold_split()
        loader.save_folds(folds)

    Note: ``feature_sets_file``, ``test_ratio`` and the ``x_*``/``y_*``
    placeholder attributes are stored on the instance but are not read by
    any method of this class.
    """

    # Sub-directories of the dataset root that are not target words.
    _NON_TARGET_DIRS = ('_background_noise_', '.ipynb_checkpoints')

    def __init__(self, dataset_path, feature_sets_file, sample_rate=8000, num_mfcc=16, len_mfcc=16, val_ratio=0.1, test_ratio=0.1, n_splits=2):
        """Store the configuration; no file access happens here.

        Args:
            dataset_path: root directory holding one sub-directory per target.
            feature_sets_file: stored only (see class note).
            sample_rate: rate passed to ``librosa.load`` in ``calc_mfcc``.
            num_mfcc: number of cepstral coefficients requested per frame.
            len_mfcc: required number of columns of a sample's MFCC matrix;
                samples with any other count are dropped.
            val_ratio: share of each fold's non-test groups used for validation.
            test_ratio: stored only (see class note).
            n_splits: number of folds produced by ``k_fold_split``.
        """
        self.dataset_path = dataset_path
        self.feature_sets_file = feature_sets_file
        self.sample_rate = sample_rate
        self.num_mfcc = num_mfcc
        self.len_mfcc = len_mfcc
        self.val_ratio = val_ratio
        self.test_ratio = test_ratio
        self.n_splits = n_splits
        # Filled in by load_data().
        self.target_list = None
        self.data_by_id = None
        self.filenames = None
        self.y = None
        # Placeholders; not assigned by any method of this class.
        self.x_train = None
        self.y_train = None
        self.x_val = None
        self.y_val = None
        self.x_test = None
        self.y_test = None

    def load_data(self):
        """Scan ``dataset_path`` and populate the target/file/label attributes.

        Sets:
            target_list: names of the target directories. Label ``i`` means
                ``target_list[i]``. The order is whatever ``listdir``
                returns, which is filesystem-dependent.
            data_by_id: ``{(target, id_part): [(file_name, label), ...]}``.
            filenames / y: parallel lists of ``target/file`` paths (relative
                to ``dataset_path``) and labels, ordered by randomly shuffled
                group (global ``random`` state).
        """
        # Filter instead of list.remove(): remove() raises ValueError when the
        # directory is absent (e.g. a dataset copy without .ipynb_checkpoints).
        self.target_list = [
            name for name in listdir(self.dataset_path)
            if isdir(join(self.dataset_path, name)) and name not in self._NON_TARGET_DIRS
        ]

        # Group file names and labels by (target, id)
        self.data_by_id = defaultdict(list)
        for index, target in enumerate(self.target_list):
            for file in listdir(join(self.dataset_path, target)):
                if not file.endswith('.wav'):
                    continue  # skip non-wav files
                id_part = file.split('_')[0]
                self.data_by_id[(target, id_part)].append((file, index))

        # Flatten in shuffled group order so files of one group stay adjacent.
        ids = list(self.data_by_id.keys())
        random.shuffle(ids)
        self.filenames = []
        self.y = []
        for group_key in ids:
            target = group_key[0]
            for file, label in self.data_by_id[group_key]:
                self.filenames.append(join(target, file))
                self.y.append(label)

    def extract_features(self, filenames, y_orig):
        """Compute MFCCs for ``filenames`` and keep those of the expected width.

        Args:
            filenames: paths relative to ``dataset_path``.
            y_orig: labels, index-aligned with ``filenames``.

        Returns:
            (out_x, out_y): list of MFCC arrays and list of their labels.
            A sample is dropped from both lists when its MFCC matrix does not
            have exactly ``len_mfcc`` columns.
        """
        out_x = []
        out_y = []

        for index, filename in enumerate(filenames):
            path = join(self.dataset_path, filename)
            mfccs = self.calc_mfcc(path)

            if mfccs.shape[1] == self.len_mfcc:
                out_x.append(mfccs)
                out_y.append(y_orig[index])

        return out_x, out_y

    def calc_mfcc(self, path):
        """Load the audio at ``path`` and return its transposed MFCC matrix.

        The audio is loaded with ``librosa.load`` at ``self.sample_rate`` and
        passed to ``python_speech_features``' ``mfcc`` with fixed window
        settings. Presumably the result is (num_mfcc, n_frames) after the
        transpose - confirm against the python_speech_features documentation.
        """
        signal, fs = librosa.load(path, sr=self.sample_rate)
        mfccs = python_speech_features.base.mfcc(signal,
                                                 samplerate=fs,
                                                 winlen=0.256,
                                                 winstep=0.050,
                                                 numcep=self.num_mfcc,
                                                 nfilt=26,
                                                 nfft=2048,
                                                 preemph=0.0,
                                                 ceplifter=0,
                                                 appendEnergy=False,
                                                 winfunc=np.hanning)
        return mfccs.transpose()

    def k_fold_split(self, further_test_ratio=0.05, further_test_path='further_test_data.npz'):
        """Split the groups from ``load_data`` into ``n_splits`` folds.

        Steps:
          1. Shuffle the group keys (global ``random`` state).
          2. Set aside the first ``further_test_ratio`` share as a held-out
             set that takes no part in the folds.
          3. Split the rest with ``KFold(n_splits=self.n_splits)``. In each
             fold, the first ``self.val_ratio`` share of the non-test groups
             is the validation set and the remainder the training set.
          4. Extract features for the held-out set and save them to
             ``further_test_path`` via ``np.savez`` (keys ``x_test``/``y_test``).

        ``load_data`` must have been called first (``self.data_by_id`` is read).

        Args:
            further_test_ratio: share of groups held out (default 0.05).
            further_test_path: where the held-out features are written.

        Returns:
            List with one ``(x_train, y_train, x_val, y_val, x_test, y_test)``
            tuple per fold.
        """
        ids = list(self.data_by_id.keys())
        random.shuffle(ids)

        # Take a portion of the groups for further testing
        further_test_size = int(len(ids) * further_test_ratio)
        further_test_ids = ids[:further_test_size]
        remaining_ids = ids[further_test_size:]

        # Use the fold count and validation share configured on the instance
        # rather than fixed literals, so the constructor arguments take effect.
        kf = KFold(n_splits=self.n_splits, shuffle=True, random_state=None)
        folds = []
        print("started folding")
        for train_val_index, test_index in kf.split(remaining_ids):
            train_val_ids = [remaining_ids[i] for i in train_val_index]
            test_ids = [remaining_ids[i] for i in test_index]

            # Split train_val_ids into training and validation sets
            val_size = int(len(train_val_ids) * self.val_ratio)
            val_ids = train_val_ids[:val_size]
            train_ids = train_val_ids[val_size:]

            # Extract features for each partition
            x_train_fold, y_train_fold = self.extract_partition(train_ids)
            x_val_fold, y_val_fold = self.extract_partition(val_ids)
            x_test_fold, y_test_fold = self.extract_partition(test_ids)

            folds.append((x_train_fold, y_train_fold, x_val_fold, y_val_fold, x_test_fold, y_test_fold))
            print("fold split made")

        # Save the further testing data
        x_further_test, y_further_test = self.extract_partition(further_test_ids)
        np.savez(further_test_path, x_test=x_further_test, y_test=y_further_test)

        return folds

    def extract_partition(self, ids):
        """Extract features for every file belonging to the group keys ``ids``.

        Args:
            ids: iterable of ``(target, id_part)`` keys of ``self.data_by_id``.

        Returns:
            The ``(out_x, out_y)`` pair produced by ``extract_features``.
        """
        filenames = []
        labels = []
        for (target, id_part) in ids:
            for file, label in self.data_by_id[(target, id_part)]:
                filenames.append(join(target, file))
                labels.append(label)
        return self.extract_features(filenames, labels)

    def save_folds(self, folds):
        """Write each fold to ``fold_<n>.npz`` (n starts at 1) in the working directory.

        Args:
            folds: list of 6-tuples as returned by ``k_fold_split``.
        """
        for i, (x_train_fold, y_train_fold, x_val_fold, y_val_fold, x_test_fold, y_test_fold) in enumerate(folds):
            fold_file = f'fold_{i + 1}.npz'
            np.savez(fold_file,
                     x_train_fold=x_train_fold,
                     y_train_fold=y_train_fold,
                     x_val_fold=x_val_fold,
                     y_val_fold=y_val_fold,
                     x_test_fold=x_test_fold,
                     y_test_fold=y_test_fold)
            print(f"Saved {fold_file}")

In [4]:
# n_splits is passed explicitly: the recorded run of this cell produced 10
# folds, while the constructor default is 2. Stating it here keeps the fold
# count visible and independent of any literal inside k_fold_split.
loader = SpeechDataLoader(dataset_path='/home/pmedur/strojnoUcenje/env/bin/TorchAudio/SpeechCommands/speech_commands_v0.02 (copy)',
                          feature_sets_file='all_targets_mfcc_sets.npz',
                          n_splits=10)
# Scan the dataset directory and group files by (target, id).
loader.load_data()
# Extract MFCC features per fold; also writes the held-out set to disk.
folds = loader.k_fold_split()
# Writes fold_1.npz ... fold_N.npz into the current working directory.
loader.save_folds(folds)

started folding
fold split made
fold split made
fold split made
fold split made
fold split made
fold split made
fold split made
fold split made
fold split made
fold split made
Saved fold_1.npz
Saved fold_2.npz
Saved fold_3.npz
Saved fold_4.npz
Saved fold_5.npz
Saved fold_6.npz
Saved fold_7.npz
Saved fold_8.npz
Saved fold_9.npz
Saved fold_10.npz


In [8]:
# Load a previously saved train/val/test feature archive and report its sizes.
# NOTE: this cell has no imports of its own; it relies on `np` and `join`
# having been imported by an earlier cell in the same kernel session.
# NOTE: the path below is absolute and machine-specific.
feature_sets_path = '/home/pmedur/strojnoUcenje/env/bin/Tensorflow_speech_recognition/tflite-speech-recognition-master'
feature_sets_filename = 'all_targets_mfcc_sets.npz'
feature_sets = np.load(join(feature_sets_path, feature_sets_filename))
# List the array names stored in the archive.
print(feature_sets.files)
# Assigning feature sets
x_train = feature_sets['x_train']
y_train = feature_sets['y_train']
x_val = feature_sets['x_val']
y_val = feature_sets['y_val']
x_test = feature_sets['x_test']
y_test = feature_sets['y_test']
# Report the shape of each feature array.
print(x_train.shape)
print(x_val.shape)
print(x_test.shape)

['x_train', 'y_train', 'x_val', 'y_val', 'x_test', 'y_test']
(77441, 16, 16)
(9689, 16, 16)
(9726, 16, 16)


In [5]:
# Access the data for each fold
# `folds` must already exist in the kernel (it is assigned by the cell that
# calls loader.k_fold_split()). Each element is a 6-tuple of
# train/val/test samples and targets; only the lengths are printed.
for i, (x_train_fold, y_train_fold, x_val_fold, y_val_fold, x_test_fold, y_test_fold) in enumerate(folds):
    print(f"Fold {i + 1}")
    print("Training samples:", len(x_train_fold))
    print("Training targets:", len(y_train_fold))
    print("Validation samples:", len(x_val_fold))
    print("Validation targets:", len(y_val_fold))
    print("Testing samples:", len(x_test_fold))
    print("Testing targets:", len(y_test_fold))


# Load the held-out "further test" archive and print how many samples and
# targets it holds.
# NOTE: this rebinds the kernel-global names x_test / y_test, which the
# feature-set loading cell also assigns; whichever cell ran last wins.
# NOTE: the path below is absolute and machine-specific.
test_sets_path = '/home/pmedur/strojnoUcenje/env/bin/Tensorflow_speech_recognition/tflite-speech-recognition-master'
test_sets_filename = 'further_test_data.npz'
test_sets = np.load(join(test_sets_path, test_sets_filename))
x_test = test_sets['x_test']
y_test = test_sets['y_test']
print(len(x_test), len(y_test))

Fold 1
Training samples: 74661
Training targets: 74661
Validation samples: 8373
Validation targets: 8373
Testing samples: 9086
Testing targets: 9086
Fold 2
Training samples: 74446
Training targets: 74446
Validation samples: 8351
Validation targets: 8351
Testing samples: 9323
Testing targets: 9323
Fold 3
Training samples: 74395
Training targets: 74395
Validation samples: 8348
Validation targets: 8348
Testing samples: 9377
Testing targets: 9377
Fold 4
Training samples: 74502
Training targets: 74502
Validation samples: 8375
Validation targets: 8375
Testing samples: 9243
Testing targets: 9243
Fold 5
Training samples: 74387
Training targets: 74387
Validation samples: 8367
Validation targets: 8367
Testing samples: 9366
Testing targets: 9366
Fold 6
Training samples: 74634
Training targets: 74634
Validation samples: 8297
Validation targets: 8297
Testing samples: 9189
Testing targets: 9189
Fold 7
Training samples: 74595
Training targets: 74595
Validation samples: 8370
Validation targets: 8370
T

In [1]:
import numpy as np
from os.path import join

# Load the test data
# NOTE: the path below is absolute and machine-specific.
test_sets_path = '/home/pmedur/strojnoUcenje/env/bin/Tensorflow_speech_recognition/tflite-speech-recognition-master'
test_sets_filename = 'further_test_data.npz'
test_sets = np.load(join(test_sets_path, test_sets_filename))

# Print all keys in the .npz file
print("Keys in the .npz file:", test_sets.files)

# Print the shape of each dataset in the .npz file
for key in test_sets.files:
    print(f"Shape of {key}: {test_sets[key].shape}")

# Optionally, print some samples from each dataset
# (prints the first five entries of every array in full, so the output can be long)
for key in test_sets.files:
    print(f"First 5 samples from {key}: {test_sets[key][:5]}")


Keys in the .npz file: ['x_test', 'y_test']
Shape of x_test: (4736, 16, 16)
Shape of y_test: (4736,)
First 5 samples from x_test: [[[-8.70189777e+01 -6.48526714e+01 -3.68849118e+01 ... -4.69820656e+01
   -4.74103109e+01 -5.28231571e+01]
  [-9.87812317e-01  1.84348644e+00  5.28241541e+00 ... -7.23295317e+00
   -4.92526404e+00 -1.89042105e+00]
  [ 2.81920711e+00  1.05662018e+01  1.06800097e+01 ...  1.17842296e+00
    2.63428594e+00  4.35082154e+00]
  ...
  [-1.50546452e-02  7.66339290e-01 -9.23205255e-01 ... -8.73862684e-01
   -5.30805788e-01  9.14999231e-02]
  [-1.31747528e+00 -7.49570258e-01  3.75609763e-02 ... -8.38265017e-01
   -9.17036677e-01 -5.80532993e-01]
  [-5.43645768e-01 -8.43938676e-01 -8.04250199e-01 ...  3.26841319e-01
    4.15967951e-01  3.43705418e-01]]

 [[-1.79392601e+01 -1.50582367e+01 -1.42862228e+01 ... -4.69396647e+01
   -4.22841649e+01 -4.36599941e+01]
  [ 4.74644730e+00  4.93073893e+00  5.11698782e+00 ... -1.16546350e+01
   -1.22623247e+01 -1.21386413e+01]
  [ 7.