In [1]:
import os
import pydub
import librosa
import matplotlib.pyplot as plt
import numpy as np
from scipy.io import wavfile
from scipy import signal
import tensorflow as tf
from tensorflow.keras.layers import Dense, Activation, TimeDistributed, GRU, Conv1D, Dropout, BatchNormalization
from tensorflow.keras import Model

2025-02-11 02:08:20.261928: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
DIR = r"/media/shiva/ML/NLP/trigger_word_detection/dataset/raw_data"

In [3]:
def load_audio_file(DIR):
    pos = []
    neg = []
    back = []
    for i in os.listdir(DIR):
        d = os.path.join(DIR, i)
        for aud in os.listdir(d):
            audio_file = os.path.join(d, aud)
            if audio_file.endswith('wav'):
                audio = pydub.AudioSegment.from_wav(audio_file)
                if i == 'activates':
                    pos.append(audio)
                elif i == 'backgrounds':
                    back.append(audio)
                elif i == 'negatives':
                    neg.append(audio)

    return pos, neg, back

In [4]:
def get_random_time(segment_ms, background_len_ms):
    segment_start  = np.random.randint(low = 0, high = background_len_ms - segment_ms)
    segment_end = segment_start + segment_ms - 1

    return (segment_start, segment_end)

In [5]:
def is_overlapping(segment_time, previos_segment_time):
    segment_start, segment_end = segment_time
    overlap = False
    for prev_str, prev_end in previos_segment_time:
        if segment_start <= prev_end and segment_end >= prev_str:
            overlap = True
            break
    
    return overlap

In [6]:
def insert_audio_clip(background, audio_clip, previous_time):
    segment_ms = len(audio_clip)
    random_time = get_random_time(segment_ms, len(background))
    retry = 5

    while is_overlapping(random_time, previous_time) and retry >= 0:
        random_time = get_random_time(segment_ms, len(background))
        retry -= 1
    
    if not is_overlapping(random_time, previous_time):
        previous_time.append(random_time)
        new_background = background.overlay(audio_clip, position = random_time[0])

    else:
        new_background = background
        random_time = (len(background), len(background))

    return new_background, random_time

In [7]:
def insert_ones(y, segment_end_ms):
    _, ty = y.shape
    segment_end_y = int(segment_end_ms * ty / 10000.0)

    if segment_end_y < ty:
        for i in range(segment_end_y + 1, segment_end_y + 51):
            if i < ty:
                y[0, i] = 1

    return y

In [8]:
def get_wav_info(wav_file):
    rate, data = wavfile.read(wav_file)
    return rate, data

In [9]:
def graph_spectrogram_librosa(wav_file):
    # rate, data = get_wav_info(wav_file)
    nfft = 200 # Length of each window segment
    # fs = 8000 # Sampling frequencies
    noverlap = 120 # Overlap between windows
    aud, _ = librosa.load(wav_file, sr = 44100, mono = True)
    stft = librosa.stft(aud, n_fft = nfft, hop_length = nfft - noverlap)
    
    stft = np.abs(stft)
    stft = librosa.power_to_db(stft ** 2, ref = np.max)
    
    return stft

In [10]:
def graph_spectrogram_pydub(wav_file):
    rate, data = get_wav_info(wav_file)
    nfft = 200 # Length of each window segment
    fs = 8000 # Sampling frequencies
    noverlap = 120 # Overlap between windows
    nchannels = data.ndim
    if nchannels == 1:
        pxx, freqs, bins, im = plt.specgram(data, nfft, fs, noverlap = noverlap)
    elif nchannels == 2:
        pxx, freqs, bins, im = plt.specgram(data[:,0], nfft, fs, noverlap = noverlap)
    return pxx

In [11]:
def create_training_ex(background, positive, negative, ty):
    background -= 20
    y = np.zeros((1, ty))
    previous_segment = []

    number_of_positives = np.random.randint(0, 5)
    random_indices = np.random.randint(len(positive), size=number_of_positives)
    random_positives = [positive[i] for i in random_indices]
    number_of_negatives = np.random.randint(0, 3)
    random_indices = np.random.randint(len(negative), size=number_of_negatives)
    random_negatives = [negative[i] for i in random_indices]
    print(f"no_of_psoitives: {number_of_positives}")
    print(f"random_psoitives: {random_positives}")
    print(f"no_of_negatives: {number_of_negatives}")
    print(f"random_negetives: {random_negatives}")

    for random_activate in random_positives:
        background, segment_time = insert_audio_clip(background, random_activate, previous_segment)
        segment_start, segment_end = segment_time
        y = insert_ones(y, segment_end)

    for random_neg in random_negatives:
        background, segment_time = insert_audio_clip(background, random_neg, previous_segment)

    background = background.apply_gain(-20 -background.dBFS)

    file_handle = background.export("train" + ".wav", format="wav")

    x = graph_spectrogram_librosa("train.wav")
    
    return x, y
        

In [12]:
positive, negative, backgrounds = load_audio_file(DIR)
Ty = 1375

In [13]:
x, y = create_training_ex(backgrounds[0], positive, negative, Ty)

no_of_psoitives: 0
random_psoitives: []
no_of_negatives: 2
random_negetives: [<pydub.audio_segment.AudioSegment object at 0x7f438a7c7940>, <pydub.audio_segment.AudioSegment object at 0x7f4420f05cc0>]


In [14]:
def create_dataset():
    X, Y = [], []
    batch_samples = 32
    for i in range(0, batch_samples):
        x, y = create_training_ex(backgrounds[i % 2], positive, negative, Ty)
        X.append(x.swapaxes(0, 1))
        Y.append(y.swapaxes(0, 1))

    return np.array(X), np.array(Y)

In [15]:
x, y = create_dataset()

no_of_psoitives: 1
random_psoitives: [<pydub.audio_segment.AudioSegment object at 0x7f438d7cfeb0>]
no_of_negatives: 2
random_negetives: [<pydub.audio_segment.AudioSegment object at 0x7f438a7c75e0>, <pydub.audio_segment.AudioSegment object at 0x7f438a7c7dc0>]
no_of_psoitives: 2
random_psoitives: [<pydub.audio_segment.AudioSegment object at 0x7f4420f05c90>, <pydub.audio_segment.AudioSegment object at 0x7f4420f05c90>]
no_of_negatives: 2
random_negetives: [<pydub.audio_segment.AudioSegment object at 0x7f438a7c7eb0>, <pydub.audio_segment.AudioSegment object at 0x7f438a7c76a0>]
no_of_psoitives: 0
random_psoitives: []
no_of_negatives: 2
random_negetives: [<pydub.audio_segment.AudioSegment object at 0x7f438a7c7940>, <pydub.audio_segment.AudioSegment object at 0x7f438a7c7dc0>]
no_of_psoitives: 2
random_psoitives: [<pydub.audio_segment.AudioSegment object at 0x7f4420f078b0>, <pydub.audio_segment.AudioSegment object at 0x7f442375a3e0>]
no_of_negatives: 1
random_negetives: [<pydub.audio_segment.Au

In [16]:
print(x.shape)
print(y.shape)

(32, 5513, 101)
(32, 1375, 1)


In [17]:
# print(x.shape)
# print(y.shape)

In [18]:
def modelf(input_shape):
    X_input = tf.keras.Input(shape = input_shape)
    X = Conv1D(filters=196, kernel_size=15, strides=4)(X_input)
    # Batch normalization
    X = BatchNormalization()(X)
    # ReLu activation
    X = Activation('relu')(X)
    # dropout (use 0.8)
    X = Dropout(rate=0.8)(X)             
    # GRU (use 128 units and return the sequences)
    X = GRU(128, return_sequences=True)(X)
    # dropout (use 0.8)
    X = Dropout(rate=0.8)(X)
    # Batch normalization.
    X = BatchNormalization()(X)             
    # GRU (use 128 units and return the sequences)
    X = GRU(128, return_sequences=True)(X)
    # dropout (use 0.8)
    X = Dropout(rate=0.8)(X)       
    # Batch normalization
    X = BatchNormalization()(X) 
    # dropout (use 0.8)
    X = Dropout(rate=0.8)(X)
    X = TimeDistributed(Dense(1, activation='sigmoid'))(X)
    model = Model(inputs = X_input, outputs = X)
    
    return model  

In [19]:
n_fft = 200
model = modelf((None, (n_fft // 2) + 1))

In [20]:
model.summary()

In [21]:
model.layers[2].trainable = False
model.layers[7].trainable = False
model.layers[10].trainable = False

In [22]:
opt = tf.keras.optimizers.Adam()
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=["accuracy"])

In [None]:
model.fit(x, y, batch_size = 8, epochs=5)

Epoch 1/6
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 2s/step - accuracy: 0.5427 - loss: 1.4814
Epoch 2/6
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.6817 - loss: 0.9329
Epoch 3/6
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.7746 - loss: 0.6878
Epoch 4/6
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.8558 - loss: 0.5336
Epoch 5/6
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.8929 - loss: 0.5236
Epoch 6/6
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.9050 - loss: 0.5514


<keras.src.callbacks.history.History at 0x7f438229d780>

In [24]:
model.save(r'/media/shiva/ML/NLP/trigger_word_detection/model/44.1khz.keras')