	
## Poultry Audio Classification with Deep Learning and Burn Layer Fusion

This notebook implements a deep learning-based approach for classifying poultry audio signals, inspired by the paper "Optimizing poultry audio signal classification with deep learning and burn layer fusion".

The model uses a custom Burn Layer to enhance robustness by injecting controlled random noise during training.



In [7]:
# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import librosa.display
import os
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, backend as K
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')

import random
# Seed NumPy, Python's `random`, and TensorFlow so runs are repeatable
np.random.seed(42)
random.seed(42)
tf.random.set_seed(42)

# Check if GPU is available and let TensorFlow grow GPU memory on demand.
# Memory growth is enabled on every visible GPU (not only the first one) so
# that all devices are configured the same way.
physical_devices = tf.config.list_physical_devices('GPU')
for device in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(device, True)
    except RuntimeError as e:
        # Memory growth must be set before the GPUs have been initialized;
        # re-running this cell in a live kernel would otherwise abort here.
        print(f"Could not enable memory growth on {device}: {e}")

## Custom Burn Layer Implementation

The Burn Layer is a key innovation from the paper that adds controlled random noise during training to improve model robustness.

In [8]:
class BurnLayer(layers.Layer):
    """
    Additive Gaussian-noise layer ("Burn Layer").

    When called with a truthy ``training`` flag, the layer returns
    ``inputs + burn_intensity * noise`` where ``noise`` is drawn from a
    standard normal distribution with the same shape and dtype as
    ``inputs``. Otherwise the inputs are returned unchanged.

    Args:
        burn_intensity: Scale factor applied to the standard-normal noise.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, burn_intensity=0.2, **kwargs):
        super().__init__(**kwargs)
        self.burn_intensity = burn_intensity

    def call(self, inputs, training=None):
        """Return the noisy inputs when training, the raw inputs otherwise."""
        if training:
            # Sample the noise in the input's dtype: tf.random.normal defaults
            # to float32, which cannot be added to float16/float64 tensors
            # (e.g. under a mixed-precision policy).
            noise = tf.random.normal(shape=tf.shape(inputs), dtype=inputs.dtype)
            return inputs + self.burn_intensity * noise

        return inputs

    def get_config(self):
        """Return the layer config, including ``burn_intensity``."""
        config = super().get_config()
        config.update({"burn_intensity": self.burn_intensity})
        return config

## Audio Feature Extraction and Preprocessing

In [9]:
def extract_features(audio_path, sr=44100, duration=2.0, n_mfcc=20):
    """
    Load an audio file and compute its spectral features.

    The signal is loaded with the requested sample rate and maximum
    duration, then zero-padded at the end if it is shorter than
    ``duration`` seconds.

    Args:
        audio_path: Path of the audio file to load.
        sr: Sample rate passed to ``librosa.load``.
        duration: Maximum number of seconds to load; also the padding target.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        A tuple ``(y, sr, mfccs, mel_spec_db, chroma, contrast)``. If any
        step raises, the error is printed and a tuple of six ``None``
        values is returned instead.
    """
    try:
        signal, sr = librosa.load(audio_path, sr=sr, duration=duration)

        # Zero-pad the tail so every clip has at least duration * sr samples
        missing_samples = int(duration * sr) - len(signal)
        if missing_samples > 0:
            signal = np.pad(signal, (0, missing_samples))

        mfccs = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc)
        chroma = librosa.feature.chroma_stft(y=signal, sr=sr)
        contrast = librosa.feature.spectral_contrast(y=signal, sr=sr)

        # Mel spectrogram converted to dB relative to its own maximum
        mel_power = librosa.feature.melspectrogram(y=signal, sr=sr)
        mel_spec_db = librosa.power_to_db(mel_power, ref=np.max)
    except Exception as e:
        print(f"Error extracting features from {audio_path}: {e}")
        return None, None, None, None, None, None
    else:
        return signal, sr, mfccs, mel_spec_db, chroma, contrast

def load_and_preprocess_data(data_path, sr=44100, duration=2.0, n_mfcc=20):
    """
    Load and preprocess all audio files in the dataset.

    Expects ``data_path`` to contain one sub-directory per class
    ('Healthy', 'Noise', 'Unhealthy'). Every ``.wav`` file (case-insensitive
    extension) is passed to ``extract_features``; files whose features could
    not be extracted are skipped.

    Args:
        data_path: Root directory holding the class sub-directories.
        sr: Sample rate forwarded to ``extract_features``.
        duration: Clip duration in seconds forwarded to ``extract_features``.
        n_mfcc: Number of MFCCs forwarded to ``extract_features``.

    Returns:
        A tuple ``(X_mel, y, file_paths, class_counts)`` where ``X_mel`` is
        the stacked mel spectrograms with a trailing channel axis of size 1,
        ``y`` holds the integer class index of each sample, ``file_paths``
        lists the loaded files in the same order, and ``class_counts`` maps
        each class name to its number of loaded samples.

    Raises:
        ValueError: If no sample could be loaded from ``data_path``.
    """
    classes = ['Healthy', 'Noise', 'Unhealthy']
    X_mel = []
    y = []
    file_paths = []
    class_counts = {}

    for label, category in enumerate(classes):
        path = os.path.join(data_path, category)
        print(f"Loading {category} samples...")
        count = 0

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order (and any seeded shuffle/split built on it) reproducible.
        for filename in sorted(os.listdir(path)):
            if not filename.lower().endswith('.wav'):
                continue

            file_path = os.path.join(path, filename)
            _, _, _, mel_spec_db, _, _ = extract_features(
                file_path, sr=sr, duration=duration, n_mfcc=n_mfcc
            )

            # extract_features signals failure with None values
            if mel_spec_db is not None:
                X_mel.append(mel_spec_db)
                y.append(label)
                file_paths.append(file_path)
                count += 1

        class_counts[category] = count
        print(f"  Loaded {count} samples for {category}")

    if not X_mel:
        # Without this guard the channel-axis reshape below would fail with
        # an opaque "tuple index out of range" error.
        raise ValueError(
            f"No audio samples could be loaded from {data_path} "
            f"(expected .wav files under {classes})"
        )

    # Add a trailing channel axis so the spectrograms fit a CNN input
    X_mel = np.expand_dims(np.array(X_mel), axis=-1)
    y = np.array(y)

    return X_mel, y, file_paths, class_counts

## Data Augmentation Function


In [10]:
def _mask_random_bands(spec, axis, max_width=5):
    """Overwrite 1-2 random bands of ``spec`` along ``axis`` with its minimum, in place."""
    # Clamp the upper bound so spectrograms with <= max_width bins/frames do
    # not make np.random.randint raise (it requires low < high).
    start_high = max(1, spec.shape[axis] - max_width)

    num_masks = np.random.randint(1, 3)
    for _ in range(num_masks):
        start = np.random.randint(0, start_high)
        width = np.random.randint(1, max_width)

        band = [slice(None)] * spec.ndim
        band[axis] = slice(start, start + width)
        spec[tuple(band)] = spec.min()


def augment_data(X_mel, y, augmentation_factor=2):
    """
    Perform data augmentation on mel spectrograms.

    The output contains every original sample followed by
    ``augmentation_factor - 1`` augmented copies of each sample. Each copy
    gets additive Gaussian noise, a random circular shift along axis 1, and,
    each with probability 0.5, frequency (axis 0) and time (axis 1) masking.
    Augmented copies are finally clipped to the range [-80, 0].

    Args:
        X_mel: Array of mel spectrograms; each sample is squeezed before
            augmentation and reshaped back to its original shape afterwards.
        y: Labels aligned with ``X_mel``.
        augmentation_factor: Total number of versions per sample, counting
            the original. Values <= 1 yield no augmented copies.

    Returns:
        A tuple ``(X_aug, y_aug)`` of NumPy arrays.
    """
    # First include all original samples
    X_aug = list(X_mel)
    y_aug = [y[i] for i in range(len(X_mel))]

    # Then create augmented versions
    for i, sample in enumerate(X_mel):
        mel_spec = sample.squeeze()

        for _ in range(augmentation_factor - 1):
            # Add random noise (the sum allocates a new array, so the
            # original spectrogram is never modified)
            noise_factor = np.random.uniform(0.005, 0.02)
            aug_mel_spec = mel_spec + np.random.normal(0, noise_factor, mel_spec.shape)

            # Shift in time (roll)
            shift_amount = np.random.randint(-10, 10)
            aug_mel_spec = np.roll(aug_mel_spec, shift_amount, axis=1)

            # Frequency masking (mask random frequency bands)
            if np.random.random() > 0.5:
                _mask_random_bands(aug_mel_spec, axis=0)

            # Time masking (mask random time segments)
            if np.random.random() > 0.5:
                _mask_random_bands(aug_mel_spec, axis=1)

            # Ensure values are valid
            aug_mel_spec = np.clip(aug_mel_spec, -80, 0)

            X_aug.append(aug_mel_spec.reshape(sample.shape))
            y_aug.append(y[i])

    return np.array(X_aug), np.array(y_aug)

## Visualization Function

In [11]:
def visualize_audio(file_path, sr=44100, duration=2.0):
    """
    Plot the waveform, MFCCs, mel spectrogram, and chromagram of an audio file.

    Features are obtained from ``extract_features``. If the file cannot be
    loaded, a message is printed and nothing is plotted.

    Args:
        file_path: Path of the audio file to visualize.
        sr: Sample rate forwarded to ``extract_features``.
        duration: Clip duration in seconds forwarded to ``extract_features``.
    """
    signal, sr, mfccs, mel_spec_db, chroma, _contrast = extract_features(
        file_path, sr=sr, duration=duration
    )

    if signal is None:
        print(f"Could not load audio file: {file_path}")
        return

    # One figure, four stacked panels
    plt.figure(figsize=(15, 12))

    # Panel 1: raw waveform
    plt.subplot(4, 1, 1)
    librosa.display.waveshow(signal, sr=sr)
    plt.title('Waveform')

    # Panels 2-4: (data, specshow axis options, colorbar options, title)
    spectrogram_panels = [
        (mfccs, {'x_axis': 'time'}, {'format': '%+2.0f dB'}, 'MFCCs'),
        (mel_spec_db, {'x_axis': 'time', 'y_axis': 'mel'},
         {'format': '%+2.0f dB'}, 'Mel Spectrogram'),
        (chroma, {'x_axis': 'time', 'y_axis': 'chroma'}, {}, 'Chromagram'),
    ]
    for row, (data, axis_opts, colorbar_opts, title) in enumerate(spectrogram_panels, start=2):
        plt.subplot(4, 1, row)
        librosa.display.specshow(data, sr=sr, **axis_opts)
        plt.colorbar(**colorbar_opts)
        plt.title(title)

    plt.tight_layout()
    plt.show()

def visualize_augmentation(X_mel, index, augmentation_factor=3):
    """
    Visualize original and augmented mel spectrograms.

    Generates ``augmentation_factor`` augmented previews of
    ``X_mel[index]`` (additive Gaussian noise, a random circular shift along
    axis 1, and, each with probability 0.5, one frequency mask and one time
    mask) and plots them next to the original in a two-column grid.

    Args:
        X_mel: Array of mel spectrograms.
        index: Index of the sample in ``X_mel`` to visualize.
        augmentation_factor: Number of augmented versions to draw.
    """
    # Get original mel spectrogram
    original_mel = X_mel[index].squeeze()

    # Create augmented versions
    augmented_mels = []
    for _ in range(augmentation_factor):
        # Add random noise (the sum allocates a new array, leaving the
        # original untouched)
        noise_factor = np.random.uniform(0.005, 0.02)
        aug_mel = original_mel + np.random.normal(0, noise_factor, original_mel.shape)

        # Shift in time (roll)
        shift_amount = np.random.randint(-10, 10)
        aug_mel = np.roll(aug_mel, shift_amount, axis=1)

        # Frequency masking; the bound is clamped so spectrograms with
        # 5 or fewer mel bins do not make randint raise (low >= high)
        if np.random.random() > 0.5:
            f0 = np.random.randint(0, max(1, aug_mel.shape[0] - 5))
            f_width = np.random.randint(1, 5)
            aug_mel[f0:f0+f_width, :] = aug_mel.min()

        # Time masking, with the same clamp for very short spectrograms
        if np.random.random() > 0.5:
            t0 = np.random.randint(0, max(1, aug_mel.shape[1] - 5))
            t_width = np.random.randint(1, 5)
            aug_mel[:, t0:t0+t_width] = aug_mel.min()

        augmented_mels.append(aug_mel)

    # Size the grid from the number of panels instead of a fixed 2x2 layout,
    # which overflowed for augmentation_factor > 3. Up to three augmented
    # versions this still yields the 2x2 grid and 15x10 figure.
    n_cols = 2
    n_panels = len(augmented_mels) + 1
    n_rows = max(2, -(-n_panels // n_cols))  # ceiling division
    plt.figure(figsize=(15, 5 * n_rows))

    # Original
    plt.subplot(n_rows, n_cols, 1)
    librosa.display.specshow(original_mel, x_axis='time', y_axis='mel')
    plt.colorbar(format='%+2.0f dB')
    plt.title('Original Mel Spectrogram')

    # Augmented versions
    for i, aug_mel in enumerate(augmented_mels):
        plt.subplot(n_rows, n_cols, i + 2)
        librosa.display.specshow(aug_mel, x_axis='time', y_axis='mel')
        plt.colorbar(format='%+2.0f dB')
        plt.title(f'Augmented Version {i+1}')

    plt.tight_layout()
    plt.show()

## Model Architecture
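This model implements the architecture described in the paper: three convolutional blocks followed by global average pooling and a dense layer, with one Burn Layer applied to the input and a second, lower-intensity Burn Layer placed just before the output layer.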

In [12]:
def build_burn_model(input_shape, num_classes=3):
    """
    Build and compile the Burn Layer CNN classifier.

    Layout: Burn Layer (0.2) on the input, three Conv-BatchNorm-ReLU-MaxPool
    blocks with 64, 128 and 256 filters, global average pooling, a 256-unit
    dense layer with ReLU and dropout (0.6), a second Burn Layer (0.1), and
    a softmax output.

    Args:
        input_shape: Shape of a single input sample (without the batch axis).
        num_classes: Number of output classes.

    Returns:
        A compiled Keras model using the Adamax optimizer, sparse
        categorical cross-entropy loss, and accuracy as metric.
    """
    def conv_block(tensor, filters):
        # Conv (L2-regularized) -> BatchNorm -> ReLU -> 2x2 max pooling
        tensor = layers.Conv2D(
            filters,
            (3, 3),
            padding='same',
            kernel_regularizer=tf.keras.regularizers.l2(0.001),
        )(tensor)
        tensor = layers.BatchNormalization()(tensor)
        tensor = layers.Activation('relu')(tensor)
        return layers.MaxPooling2D((2, 2))(tensor)

    inputs = layers.Input(shape=input_shape)

    # Noise is injected directly on the input first
    features = BurnLayer(burn_intensity=0.2)(inputs)

    # Three convolutional blocks with doubling filter counts
    for filters in (64, 128, 256):
        features = conv_block(features, filters)

    # Collapse the spatial axes, then the dense head
    features = layers.GlobalAveragePooling2D()(features)
    features = layers.Dense(256)(features)
    features = layers.Activation('relu')(features)
    features = layers.Dropout(0.6)(features)

    # Second Burn Layer with reduced intensity
    features = BurnLayer(burn_intensity=0.1)(features)

    outputs = layers.Dense(num_classes, activation='softmax')(features)
    model = models.Model(inputs=inputs, outputs=outputs)

    # Adamax optimizer, as used in the paper
    model.compile(
        optimizer=optimizers.Adamax(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    return model