In [None]:
import os
import time
import json
from datetime import datetime
import warnings
import numpy as np
from tqdm import tqdm
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.python.keras import optimizers
from tensorflow.python.keras import backend as K
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import math_ops
import matplotlib.pylab as plt
import librosa

In [None]:
# Configuration defaults
DEFAULT_CONFIG = {
    'general': {
        'dataset_directory': 'data/audio_samples',
    },
    'preprocessing': {
        'compute_embeddings': True,
        'files_to_PCM': True,
    },
    'training': {
        'epochs': 30,
        'batch_size': 32,
        'initial_lr': 0.001,
        'use_validation': True,
        'use_multiprocessing': True,
        'mode': 'normal',
        'ckpt_freq': 5
    },
    'model': {
        'num_classes': None,
        'input_shape': (96, 64, 1),  # Mel spectrogram dimensions
    }
}

class Paths:
    """Helper class to manage paths"""
    def __init__(self, base_dir='.'):
        self.base_dir = base_dir
        self.timestamp = None
        self.CONF = None

    def get_dataset_dir(self):
        return os.path.join(self.base_dir, 'data', 'dataset')

    def get_models_dir(self):
        return os.path.join(self.base_dir, 'models')

    def get_timestamped_dir(self):
        return os.path.join(self.base_dir, 'runs', self.timestamp)

    def get_checkpoints_dir(self):
        return os.path.join(self.get_timestamped_dir(), 'checkpoints')

    def get_stats_dir(self):
        return os.path.join(self.get_timestamped_dir(), 'stats')

    def get_embeddings_dir(self):
        return os.path.join(self.get_timestamped_dir(), 'embeddings')

    def get_ts_splits_dir(self):
        return os.path.join(self.get_timestamped_dir(), 'splits')


In [None]:
class customAdam(optimizers.Adam):
    """Custom Adam optimizer with learning rate multiplier"""
    def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0., amsgrad=False,
                 lr_mult=0.1, excluded_vars=[], **kwargs):
        super().__init__(lr=lr, beta_1=beta_1, beta_2=beta_2, epsilon=epsilon, decay=decay, amsgrad=amsgrad, **kwargs)
        with K.name_scope(self.__class__.__name__):
            self.lr_mult = lr_mult
            self.excluded_vars = excluded_vars

    def get_updates(self, loss, params):
        grads = self.get_gradients(loss, params)
        self.updates = [state_ops.assign_add(self.iterations, 1)]

        lr = self.lr
        if self.initial_decay > 0:
            lr = lr * (1. / (1. + self.decay * math_ops.cast(self.iterations, K.dtype(self.decay))))

        t = math_ops.cast(self.iterations, K.floatx()) + 1
        lr_t = lr * (K.sqrt(1. - math_ops.pow(self.beta_2, t)) / (1. - math_ops.pow(self.beta_1, t)))

        ms = [K.zeros(K.int_shape(p), dtype=K.dtype(p)) for p in params]
        vs = [K.zeros(K.int_shape(p), dtype=K.dtype(p)) for p in params]

        if self.amsgrad:
            vhats = [K.zeros(K.int_shape(p), dtype=K.dtype(p)) for p in params]
        else:
            vhats = [K.zeros(1) for _ in params]

        self.weights = [self.iterations] + ms + vs + vhats

        for p, g, m, v, vhat in zip(params, grads, ms, vs, vhats):
            multiplied_lr_t = lr_t * self.lr_mult if p.name not in self.excluded_vars else lr_t

            m_t = (self.beta_1 * m) + (1. - self.beta_1) * g
            v_t = (self.beta_2 * v) + (1. - self.beta_2) * math_ops.square(g)

            if self.amsgrad:
                vhat_t = math_ops.maximum(vhat, v_t)
                p_t = p - multiplied_lr_t * m_t / (K.sqrt(vhat_t) + self.epsilon)
                self.updates.append(state_ops.assign(vhat, vhat_t))
            else:
                p_t = p - multiplied_lr_t * m_t / (K.sqrt(v_t) + self.epsilon)

            self.updates.append(state_ops.assign(m, m_t))
            self.updates.append(state_ops.assign(v, v_t))
            new_p = p_t

            if getattr(p, 'constraint', None) is not None:
                new_p = p.constraint(new_p)

            self.updates.append(state_ops.assign(p, new_p))
        return self.updates


In [None]:
class DataSequence(tf.keras.utils.Sequence):
    """Data generator for training and validation"""
    def __init__(self, X, y, batch_size, num_classes):
        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.num_classes = num_classes

    def __len__(self):
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, idx):
        batch_x = self.X[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]

        # Load audio files and convert to spectrograms
        X = np.array([self.load_audio(file_path) for file_path in batch_x])
        y = tf.keras.utils.to_categorical(batch_y, num_classes=self.num_classes)

        return X, y

    def load_audio(self, file_path):
        if file_path.endswith('.npy'):
            return np.load(file_path)
        else:
            # Load and preprocess audio file
            audio, sr = librosa.load(file_path, sr=None)
            mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr)
            mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
            return mel_spec_db
