In [4]:
import librosa
import os
import numpy as np
import json
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout

# MAEC - Multilabel Audio Event Classification

# Function to chunk audio
def chunk_audio(audio_path, chunk_duration=2.0, sr=22050, allow_partial=True):
    y, sr = librosa.load(audio_path, sr=sr)
    chunk_length = int(chunk_duration * sr)
    if allow_partial:
        chunks = [y[i:i + chunk_length] for i in range(0, len(y), chunk_length)]
    else:
        chunks = [y[i:i + chunk_length] for i in range(0, len(y), chunk_length) if len(y[i:i + chunk_length]) == chunk_length]
    return chunks


# Function to convert chunks to spectrograms
def chunks_to_spectrograms(chunks, sr=22050):
    spectrograms = []
    for chunk in chunks:
        S = librosa.feature.melspectrogram(y=chunk, sr=sr, n_mels=128)
        S_DB = librosa.power_to_db(S, ref=np.max)

        # Make sure it’s exactly (128, 128)
        if S_DB.shape[1] < 128:
            pad = 128 - S_DB.shape[1]
            S_DB = np.pad(S_DB, ((0, 0), (0, pad)), mode='constant')
        elif S_DB.shape[1] > 128:
            S_DB = S_DB[:, :128]

        spectrograms.append(S_DB)
    return np.array(spectrograms)


# Function to create the model
def create_model(input_shape, num_types, num_sub_categories):
    inputs = Input(shape=input_shape)
    x = Conv2D(32, (3, 3), activation='relu')(inputs)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(64, (3, 3), activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(128, (3, 3), activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
    x = Flatten()(x)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)

    type_output = Dense(num_types, activation='sigmoid', name='type_output')(x)
    sub_category_output = Dense(num_sub_categories, activation='sigmoid', name='sub_category_output')(x)

    model = Model(inputs=inputs, outputs=[type_output, sub_category_output])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics={'type_output': 'accuracy', 'sub_category_output': 'accuracy'})
    return model

# Example labels for each chunk
# Load real labels from training_data.json (generated from your audio folders)
with open(r"C:\Users\User\Downloads\Animal Sounds 5\Animal Sounds\training_data.json", "r") as f:
    training_data = json.load(f)

# Extract one label per file (for now we treat each file as a "chunk")
# Use MultiLabelBinarizer for one-hot encoding
mlb_types = MultiLabelBinarizer()
mlb_sub_categories = MultiLabelBinarizer()

spectrograms = []
y_types_aligned = []
y_sub_categories_aligned = []

for item in training_data:
    audio_path = os.path.join(r"C:\Users\User\Downloads\Animal Sounds 5\Animal Sounds", item["file"])

    try:
        chunks = chunk_audio(audio_path)
        specs = chunks_to_spectrograms(chunks)

        for spec in specs:
            padded = np.pad(spec, ((0, 0), (0, max(128 - spec.shape[1], 0))), mode='constant')
            spectrograms.append(padded)

            y_types_aligned.append([item["type"]])
            y_sub_categories_aligned.append([item["sub_category"]])

    except Exception as e:
        print(f"⚠️ Skipped {audio_path}: {e}")

# Convert to arrays and encode labels
spectrograms = np.array(spectrograms)
spectrograms = np.expand_dims(spectrograms, axis=-1)

y_types = mlb_types.fit_transform(y_types_aligned)
y_sub_categories = mlb_sub_categories.fit_transform(y_sub_categories_aligned)


# Split the data
X_train, X_val, y_train_types, y_val_types, y_train_sub_categories, y_val_sub_categories = train_test_split(
    spectrograms, y_types, y_sub_categories, test_size=0.2, random_state=42
)

# Create and train the model
input_shape = (128, 128, 1)  # adjust according to your spectrogram dimensions
num_types = len(mlb_types.classes_)
num_sub_categories = len(mlb_sub_categories.classes_)
model = create_model(input_shape, num_types, num_sub_categories)

# Train the model
history = model.fit(
    X_train, 
    {'type_output': y_train_types, 'sub_category_output': y_train_sub_categories},
    epochs=20, 
    batch_size=32,
    validation_data=(X_val, {'type_output': y_val_types, 'sub_category_output': y_val_sub_categories})
)

# Predict on validation data
type_predictions, sub_category_predictions = model.predict(X_val)

threshold = 0.3  # Lower the threshold to capture more predictions

def interpret_predictions(type_preds, sub_category_preds, type_labels, sub_category_labels, threshold=0.3):
    results = []
    for t_pred, s_pred in zip(type_preds, sub_category_preds):
        chunk_results = []
        for i, type_prob in enumerate(t_pred):
            if type_prob > threshold:
                chunk_results.append({"label": type_labels[i], "type": "type", "confidence": f"{type_prob:.2f}"})
        for i, sub_category_prob in enumerate(s_pred):
            if sub_category_prob > threshold:
                chunk_results.append({"label": sub_category_labels[i], "type": "sub_category", "confidence": f"{sub_category_prob:.2f}"})
        results.append(chunk_results)
    return results

def process_audio_file(audio_path, model, mlb_types, mlb_sub_categories, chunk_duration=2.0, sr=22050, threshold=0.3):
    chunks = chunk_audio(audio_path, chunk_duration, sr, allow_partial=True)
    if not chunks:
        print(f"❌ No valid chunks found in {audio_path}")
        return None

    spectrograms = chunks_to_spectrograms(chunks, sr)
    if spectrograms.size == 0:
        print(f"❌ No spectrograms generated from {audio_path}")
        return None

    spectrograms = np.array([np.pad(s, ((0, 0), (0, max(128 - s.shape[1], 0))), mode='constant') for s in spectrograms])
    spectrograms = np.expand_dims(spectrograms, axis=-1)  # Add channel dimension for CNN

    type_predictions, sub_category_predictions = model.predict(spectrograms)

    results = interpret_predictions(type_predictions, sub_category_predictions, mlb_types.classes_, mlb_sub_categories.classes_, threshold)

    formatted_results = []
    for i, chunk_results in enumerate(results):
        chunk_formatted = []
        for result in chunk_results:
            chunk_formatted.append(result)
        formatted_results.append({"chunk": i + 1, "results": chunk_formatted})

    return json.dumps(formatted_results, indent=4)


# Example usage
audio_path = r'C:\Users\User\Downloads\Animal Sounds 5\Animal Sounds\Tasmanian Devil.wav'
json_output = process_audio_file(audio_path, model, mlb_types, mlb_sub_categories)
print(json_output)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
[
    {
        "chunk": 1,
        "results": [
            {
                "label": "tasmanian",
                "type": "type",
                "confidence": "0.49"
            }
        ]
    },
    {
        "chunk": 2,
        "results": [
            {
                "label": "tasmanian",
                "type": "type",
                "confidence": "0.50"
            }
        ]
    },
    {
        "chunk": 3,
        "results": []
    },
    {
        "chunk": 4,
        "results": [
            {
                "label": "Devil",
                "type": "sub_category",
                "confidence": "0.38"
            }
        ]
    },
    {
        "chunk": 5,
        "results": [
            {
                "label": "tasmanian",
            