In [2]:
class Config:
    """Single place for every tunable constant used by the notebook."""

    # --- Audio preprocessing ---
    SAMPLING_RATE = 16_000  # sample rate every clip is loaded at
    DURATION = 3.0  # fixed clip length, in seconds
    SAMPLES_PER_TRACK = int(DURATION * SAMPLING_RATE)  # samples in one fixed-length clip

    # --- MFCC extraction ---
    N_MFCC = 13  # cepstral coefficients per frame
    N_FFT = 2048  # STFT frame / FFT length, in samples
    HOP_LENGTH = 512  # STFT frame step, in samples
    N_MELS = 40  # mel filterbank bands

    # --- Augmentation and data splitting ---
    NOISE_FACTOR = 0.1
    VALID_SPLIT = 0.15  # fraction of examples held out for validation
    SHUFFLE_SEED = 43
    BATCH_SIZE = 32

    # --- Model and training ---
    NUM_CLASSES = 4  # Update based on your speaker count
    LEARNING_RATE = 1e-3
    EPOCHS = 50

# Module-level configuration instance; the training cell passes it to
# train_model(), which overwrites NUM_CLASSES on it.
config = Config()
# Sanity check: class-level constants are readable through the instance.
print(config.N_MFCC)

13


In [None]:
import tensorflow as tf
import numpy as np
import librosa

def load_and_preprocess_audio(file_path, config):
    """Load an audio file and return a fixed-length float32 waveform tensor.

    Steps: decode with librosa (resampled to ``config.SAMPLING_RATE``, mono),
    peak-normalize, trim leading/trailing silence, normalize the volume, then
    truncate or zero-pad to exactly ``config.SAMPLES_PER_TRACK`` samples.

    Args:
        file_path: Location of the audio file; passed straight to
            ``librosa.load``.
            NOTE(review): this presumably has to be a real path (str/PathLike)
            evaluated eagerly, not a symbolic tensor inside ``Dataset.map``.
        config: Object exposing ``SAMPLING_RATE`` and ``SAMPLES_PER_TRACK``.

    Returns:
        A 1-D ``tf.float32`` tensor of length ``config.SAMPLES_PER_TRACK``.
        If any step raises, the error is printed and an all-zero tensor of the
        same length is returned as a fallback.
    """
    try:
        # Decode with librosa instead of tf.io.read_file.
        audio, _ = librosa.load(file_path, sr=config.SAMPLING_RATE, mono=True)

        # Peak-normalize. A silent (or empty) clip has a peak of 0; dividing by
        # it would turn the whole clip into NaN, which a "sum != 0" check does
        # not catch, so such clips are left as they are.
        peak = np.max(np.abs(audio)) if len(audio) else 0.0
        if peak > 0:
            audio = audio / peak

        # Remove silence
        audio = remove_silence(audio)

        # Normalize volume
        audio = normalize_volume(audio)

        # Ensure consistent length
        if len(audio) > config.SAMPLES_PER_TRACK:
            audio = audio[:config.SAMPLES_PER_TRACK]
        else:
            # Pad with zeros if audio is too short
            padding = config.SAMPLES_PER_TRACK - len(audio)
            audio = np.pad(audio, (0, padding))

        # Convert to tensor
        return tf.convert_to_tensor(audio, dtype=tf.float32)

    except Exception as e:
        print(f"Error processing file {file_path}: {str(e)}")
        # Return a zero tensor of the correct shape as fallback
        return tf.zeros(config.SAMPLES_PER_TRACK, dtype=tf.float32)

def normalize_volume(audio, target_db=-20.0):
    """Scale ``audio`` so its mean per-sample level matches ``target_db``.

    The level is the mean of ``librosa.amplitude_to_db(|audio|)`` over all
    samples; the signal is multiplied by the linear gain that moves this mean
    to ``target_db``.

    Args:
        audio: Waveform samples (array-like accepted by ``np.abs``).
        target_db: Desired mean level in decibels.

    Returns:
        The rescaled waveform. This is best-effort: if the computation raises,
        the input is returned unchanged.
    """
    try:
        level_db = librosa.amplitude_to_db(np.abs(audio))
        gain_db = target_db - np.mean(level_db)
        return audio * np.power(10, gain_db / 20.0)
    except Exception:
        # Only ordinary errors are swallowed; a bare ``except:`` would also
        # trap KeyboardInterrupt / SystemExit and make the cell uninterruptible.
        return audio

def remove_silence(audio, threshold=0.01, frame_length=2048, hop_length=512):
    """Trim leading and trailing samples whose magnitude is <= ``threshold``.

    Only the edges are trimmed: everything between the first and the last
    sample above the threshold is kept, including quiet stretches in between.

    Args:
        audio: Waveform samples.
        threshold: Absolute amplitude above which a sample counts as sound.
        frame_length: Unused; kept so existing callers keep working.
        hop_length: Unused; kept so existing callers keep working.

    Returns:
        The trimmed waveform, or the input unchanged when no sample exceeds
        the threshold or when the computation raises.
    """
    try:
        loud_indices = np.nonzero(np.abs(audio) > threshold)[0]
        if len(loud_indices) == 0:
            return audio  # Entirely silent: nothing sensible to trim to
        first, last = loud_indices[0], loud_indices[-1]
        return audio[first:last + 1]
    except Exception:
        # Best-effort, but do not trap KeyboardInterrupt / SystemExit the way
        # a bare ``except:`` would.
        return audio

: 

In [None]:
import tensorflow as tf
import os
from pathlib import Path

def load_audio(file_path, config):
    """Decode one audio file into a fixed-length float32 tensor.

    The file is read with librosa at ``config.SAMPLING_RATE`` and then cut or
    zero-padded at the end to exactly ``config.SAMPLES_PER_TRACK`` samples.

    Returns:
        The waveform as a ``tf.float32`` tensor, or ``None`` when anything
        goes wrong (the error is printed).
    """
    try:
        waveform, _ = librosa.load(str(file_path), sr=config.SAMPLING_RATE)

        # Force every clip to the same number of samples.
        target_len = config.SAMPLES_PER_TRACK
        shortfall = target_len - len(waveform)
        if shortfall < 0:
            waveform = waveform[:target_len]
        else:
            # Too short (or already exact): append zeros at the end.
            waveform = np.pad(waveform, (0, shortfall))

        return tf.convert_to_tensor(waveform, dtype=tf.float32)
    except Exception as e:
        print(f"Error processing file {file_path}: {str(e)}")
        return None

def _load_noise_dataset(noise_dir, config):
    """Eagerly load every ``*.wav`` under ``noise_dir`` into a cached dataset.

    Returns ``None`` when the directory is missing/empty or no clip is usable.
    """
    if not noise_dir or not os.path.exists(noise_dir):
        return None

    noise_clips = []
    for noise_path in sorted(Path(noise_dir).glob('*.wav')):
        # librosa cannot run on the symbolic tensors seen inside Dataset.map,
        # so the files are decoded here, in plain Python, on real paths.
        clip = load_and_preprocess_audio(str(noise_path), config)
        # Skip the all-zero fallback returned on a failed load, and any clip
        # containing NaN/inf (NaN fails both comparisons).
        energy = float(tf.reduce_sum(tf.abs(clip)))
        if 0.0 < energy < float('inf'):
            noise_clips.append(clip)

    if not noise_clips:
        return None
    return tf.data.Dataset.from_tensor_slices(tf.stack(noise_clips)).cache()


def create_dataset(data_dir, noise_dir, config, augment=True):
    """Build a ``tf.data`` pipeline of (MFCC features, speaker label) pairs.

    Every sub-directory of ``data_dir`` is treated as one speaker; its
    ``*.wav`` files are loaded with ``load_audio`` and labelled with the
    speaker's index.

    Args:
        data_dir: Directory containing one sub-directory per speaker.
        noise_dir: Optional directory with ``*.wav`` background-noise clips.
        config: Configuration object forwarded to the loading, augmentation
            and feature-extraction helpers.
        augment: When true, ``apply_augmentation`` is applied to each example.

    Returns:
        ``(dataset, num_speakers)`` where ``dataset`` yields
        ``(mfcc, label)`` and ``num_speakers`` is the number of speaker
        directories found.

    Raises:
        ValueError: If no audio file could be loaded.
    """
    data_dir = Path(data_dir)
    # os.listdir returns entries in arbitrary order; sorting keeps the
    # speaker -> label mapping identical across runs and machines.
    speaker_dirs = sorted(d for d in os.listdir(data_dir) if (data_dir / d).is_dir())

    # Load speaker audio files
    audio_data = []
    labels = []
    for label, speaker in enumerate(speaker_dirs):
        for audio_file in sorted((data_dir / speaker).glob('*.wav')):
            audio = load_audio(audio_file, config)
            if audio is not None:
                audio_data.append(audio)
                labels.append(label)

    if not audio_data:
        raise ValueError(f"No readable .wav files found under {data_dir}")

    dataset = tf.data.Dataset.from_tensor_slices(
        (tf.stack(audio_data), tf.convert_to_tensor(labels))
    )

    # The waveforms are already decoded at this point. The previous extra
    # map() handed them to the path-based load_and_preprocess_audio, which
    # failed on every element and replaced it with zeros, so the filter below
    # then removed the whole dataset. That step is gone.

    # Drop clips that are completely silent (all samples zero).
    dataset = dataset.filter(
        lambda x, y: tf.not_equal(tf.reduce_sum(tf.abs(x)), 0.0)
    )

    if augment:
        noise_dataset = _load_noise_dataset(noise_dir, config)

        # Apply augmentation
        dataset = dataset.map(
            lambda x, y: (apply_augmentation(x, config, noise_dataset), y),
            num_parallel_calls=tf.data.AUTOTUNE
        )

    # Extract MFCC features
    dataset = dataset.map(
        lambda x, y: (extract_mfcc(x, config), y),
        num_parallel_calls=tf.data.AUTOTUNE
    )

    return dataset, len(speaker_dirs)

In [None]:
import tensorflow as tf

def add_background_noise(audio, noise_dataset, snr_db=10, shuffle_buffer=1024):
    """Mix a randomly chosen noise clip into ``audio`` at a target SNR.

    Args:
        audio: Waveform tensor.
        noise_dataset: ``tf.data.Dataset`` of noise waveforms, or ``None`` to
            leave ``audio`` untouched.
            NOTE(review): clips are assumed to have the same length as
            ``audio`` (they are added element-wise) - confirm with the caller.
        snr_db: Desired signal-to-noise ratio in decibels.
        shuffle_buffer: Shuffle buffer used to pick the noise clip. It must be
            larger than 1 for the choice to be random, and at least the number
            of noise clips for a uniform choice.

    Returns:
        The noisy waveform rescaled so its peak magnitude is 1, or ``audio``
        itself when ``noise_dataset`` is ``None``.
    """
    if noise_dataset is None:
        return audio

    # A shuffle buffer of size 1 never reorders anything, so it always yielded
    # the first noise clip; a real buffer makes the pick random.
    noise = next(iter(noise_dataset.shuffle(shuffle_buffer)))

    # Calculate signal and noise power
    signal_power = tf.reduce_mean(tf.square(audio))
    noise_power = tf.reduce_mean(tf.square(noise))

    # Noise scaling factor for the target SNR. divide_no_nan returns 0 for a
    # zero denominator, so a silent noise clip contributes nothing instead of
    # producing inf * 0 = NaN.
    snr = tf.pow(10.0, snr_db / 10.0)
    scale = tf.sqrt(tf.math.divide_no_nan(signal_power, noise_power * snr))

    # Add scaled noise to audio
    noisy_audio = audio + scale * noise

    # Peak-normalize; an all-zero result stays all-zero rather than NaN.
    return tf.math.divide_no_nan(noisy_audio, tf.reduce_max(tf.abs(noisy_audio)))

def time_shift(audio, shift_max=0.1):
    """Roll ``audio`` along axis 0 by a random number of samples.

    The offset is drawn uniformly from ``[-m, m]`` where ``m`` is
    ``shift_max`` times the length of axis 0, truncated to an integer.
    ``tf.roll`` wraps around, so samples pushed past one end re-enter at the
    other.
    """
    num_samples = tf.shape(audio)[0]

    # Largest offset in whole samples; the product is computed in float32 and
    # then cast back to int32.
    max_offset = tf.cast(tf.cast(num_samples, tf.float32) * shift_max, tf.int32)

    # The upper bound of uniform() is exclusive, hence the +1.
    offset = tf.random.uniform(
        [], minval=-max_offset, maxval=max_offset + 1, dtype=tf.int32
    )

    return tf.roll(audio, offset, axis=0)

def apply_augmentation(audio, config, noise_dataset=None):
    """Randomly augment one waveform.

    Two independent coin flips decide, in this order, whether background
    noise is mixed in (SNR 10 dB) and whether the clip is time-shifted.
    ``config`` is accepted for interface symmetry but is not read here.
    """
    augmented = audio

    # First draw: background noise.
    add_noise = tf.random.uniform([]) > 0.5
    if add_noise:
        augmented = add_background_noise(augmented, noise_dataset, snr_db=10)

    # Second draw: random time shift.
    do_shift = tf.random.uniform([]) > 0.5
    if do_shift:
        augmented = time_shift(augmented)

    return augmented

In [None]:
import tensorflow as tf

def extract_mfcc(audio, config):
    """Extract MFCC features from an audio signal.

    Pipeline: STFT -> magnitude -> mel filterbank (``config.N_MELS`` bands)
    -> log -> DCT-II over all mel bands -> keep the first ``config.N_MFCC``
    coefficients -> append a channel axis.

    Args:
        audio: Waveform tensor; framing is done by ``tf.signal.stft``.
            NOTE(review): presumably a 1-D float32 clip - confirm with caller.
        config: Object exposing ``N_FFT``, ``HOP_LENGTH``, ``N_MELS``,
            ``SAMPLING_RATE`` and ``N_MFCC``.

    Returns:
        Tensor of MFCCs whose last two axes are ``(N_MFCC, 1)``.
    """
    # Convert to spectrogram
    stfts = tf.signal.stft(
        audio,
        frame_length=config.N_FFT,
        frame_step=config.HOP_LENGTH,
        fft_length=config.N_FFT
    )

    # Convert to magnitude spectrogram
    spectrograms = tf.abs(stfts)

    # Convert to mel spectrograms
    num_spectrogram_bins = tf.shape(spectrograms)[-1]
    mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
        num_mel_bins=config.N_MELS,
        num_spectrogram_bins=num_spectrogram_bins,
        sample_rate=config.SAMPLING_RATE,
        lower_edge_hertz=0,
        upper_edge_hertz=config.SAMPLING_RATE/2
    )

    mel_spectrograms = tf.tensordot(spectrograms, mel_weight_matrix, 1)

    # Convert to log mel spectrograms (offset avoids log(0))
    log_mel_spectrograms = tf.math.log(mel_spectrograms + 1e-6)

    # Take the DCT over ALL mel bands, then keep the lowest N_MFCC
    # coefficients. Passing n=N_MFCC to tf.signal.dct instead truncates the
    # *input* to its first N_MFCC mel bands, silently discarding the rest of
    # the filterbank.
    mfccs = tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrograms)
    mfccs = mfccs[..., :config.N_MFCC]

    # Add channel dimension for CNN
    mfccs = tf.expand_dims(mfccs, -1)

    return mfccs

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

def build_model(input_shape, num_classes):
    """Assemble the (uncompiled) CNN used for speaker recognition.

    Architecture: three Conv2D(3x3, relu, same) -> BatchNorm -> MaxPool(2x2)
    -> Dropout(0.25) blocks with 32, 64 and 128 filters, followed by
    Flatten -> Dense(256, relu) -> BatchNorm -> Dropout(0.5) and a softmax
    layer with ``num_classes`` units.

    Args:
        input_shape: Shape of one input example, without the batch axis.
        num_classes: Number of output classes.

    Returns:
        A ``Sequential`` model.
    """
    model = models.Sequential()

    # Convolutional feature extractor
    for block_index, filters in enumerate((32, 64, 128)):
        conv_options = {'activation': 'relu', 'padding': 'same'}
        if block_index == 0:
            # Only the very first layer declares the input shape.
            conv_options['input_shape'] = input_shape
        model.add(layers.Conv2D(filters, (3, 3), **conv_options))
        model.add(layers.BatchNormalization())
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Dropout(0.25))

    # Classifier head
    model.add(layers.Flatten())
    model.add(layers.Dense(256, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(num_classes, activation='softmax'))

    return model

In [None]:
import tensorflow as tf
from pathlib import Path

def train_model(train_dir, noise_dir, config):
    """Train the speaker recognition model.

    Builds the dataset, splits it into training and validation parts
    according to ``config.VALID_SPLIT``, builds and compiles the CNN, and fits
    it with early stopping, checkpointing and learning-rate reduction.

    Args:
        train_dir: Directory with one sub-directory of ``*.wav`` per speaker.
        noise_dir: Directory with background-noise clips (may be missing).
        config: Configuration object. ``NUM_CLASSES`` is overwritten with the
            number of speakers found.

    Returns:
        ``(model, history)`` - the trained model and the object returned by
        ``model.fit``.

    Raises:
        ValueError: If the dataset contains no examples.
    """
    # Create datasets
    dataset, num_speakers = create_dataset(train_dir, noise_dir, config, augment=True)
    config.NUM_CLASSES = num_speakers

    # Count examples without holding every feature tensor in memory at once.
    dataset_size = sum(1 for _ in dataset)
    if dataset_size == 0:
        raise ValueError(f"No usable training examples were produced from {train_dir}")
    val_size = int(dataset_size * config.VALID_SPLIT)
    train_size = dataset_size - val_size

    # Shuffle ONCE with a fixed order. By default shuffle() draws a new order
    # on every iteration, so take()/skip() would select different examples
    # each epoch and validation examples would leak into training.
    dataset = dataset.shuffle(
        buffer_size=dataset_size,
        seed=config.SHUFFLE_SEED,
        reshuffle_each_iteration=False
    )
    # The training part is still reshuffled every epoch, but only within
    # itself.
    train_ds = dataset.take(train_size).shuffle(train_size).batch(config.BATCH_SIZE)
    val_ds = dataset.skip(train_size).batch(config.BATCH_SIZE)
    # NOTE(review): augmentation is applied inside create_dataset, before the
    # split, so validation examples are augmented too.

    # Prefetch for performance
    train_ds = train_ds.prefetch(tf.data.AUTOTUNE)
    val_ds = val_ds.prefetch(tf.data.AUTOTUNE)

    # Get input shape from first batch (drop the batch axis)
    first_features, _ = next(iter(train_ds.take(1)))
    input_shape = first_features.shape[1:]

    # Build and compile model
    model = build_model(input_shape, config.NUM_CLASSES)
    # compile_model is not defined in the cells shown here; it is expected to
    # be defined elsewhere in the notebook.
    model = compile_model(model, config.LEARNING_RATE)

    # Create callbacks
    callbacks = [
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        ),
        tf.keras.callbacks.ModelCheckpoint(
            filepath='best_model.keras',
            monitor='val_accuracy',
            save_best_only=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-6
        )
    ]

    # Train model
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=config.EPOCHS,
        callbacks=callbacks
    )

    return model, history

if __name__ == "__main__":
    # Dataset locations
    base_dir = Path("/kaggle/input/dataset-text-dependent/datasets")
    train_dir, noise_dir = base_dir / "train", base_dir / "noise"

    # Uses the module-level `config` created in the first cell.
    model, history = train_model(train_dir, noise_dir, config)

    # Persist the final weights
    model.save("final_model.keras")

    # Plot the training curves: one panel per metric, training vs validation
    import matplotlib.pyplot as plt

    curves = history.history
    panels = (
        ('loss', 'val_loss', 'Training Loss', 'Validation Loss', 'Model Loss', 'Loss'),
        ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy', 'Model Accuracy', 'Accuracy'),
    )

    plt.figure(figsize=(10, 4))
    for position, (train_key, val_key, train_label, val_label, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(curves[train_key], label=train_label)
        plt.plot(curves[val_key], label=val_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.show()