In [1]:
# !apt-get install -y libsndfile1

In [2]:
!pip install PySoundFile
!pip install scikit-maad
!pip install opencv-python-headless



In [None]:
# Core ML & GPU
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from torch.utils.data import Dataset
#import tensorflow_io as tfio
#import keras_cv
#from keras.callbacks import EarlyStopping

# scikit-learn maad for when u want to grrr
from maad import sound, features

# Huggingface Transformers
from transformers import AutoFeatureExtractor, TFViTModel #TFViTForImageClassification #TFViTModel #TFAutoModel #TFWhisperModel #, TFWhisperEncoder #TFWhisperModel
from transformers import AutoModelForAudioClassification
#from transformers.models.whisper.modeling_tf_whisper import TFWhisperEncoder
from huggingface_hub import login

# Dataloading
import numpy as np
import pandas as pd
import librosa
import librosa.display
import os
import glob #for finding TFRecord files

# Colab specific
from google.colab import userdata

from tqdm.auto import tqdm

# Utilities
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

In [None]:
# Mount Google Drive at /content/drive so the dataset archive stored under
# MyDrive can be read from this Colab runtime.

from google.colab import drive
drive.mount('/content/drive')
# Disabled alternatives kept for reference: copy data out of Drive directly,
# or unmount once the copy is finished.
#!cp -r "/content/drive/My Drive/tfrecords" /content/
# !cp -r "/content/drive/MyDrive/rfcx-species-audio-detection" /content/
#print("rfcx-species-audio-detection copied successfully.")
# drive.flush_and_unmount()

In [None]:
# Unzip the dataset archive from Drive into a directory on the runtime's local
# disk (UNZIP_DESTINATION); later cells glob the .flac files from there.

ZIP_FILE_PATH = "/content/drive/MyDrive/rfcx-species-audio-detection.zip"

UNZIP_DESTINATION = "/content/rfcx_local_data/"

print(f"Unzupping {ZIP_FILE_PATH} to {UNZIP_DESTINATION} . . . ")
os.makedirs(UNZIP_DESTINATION, exist_ok=True)

# unzip flags: -q quiet, -o overwrite existing files without prompting,
# -d target directory. Re-running the cell re-extracts the whole archive.
!unzip -q -o "{ZIP_FILE_PATH}" -d "{UNZIP_DESTINATION}"

print("I took the zip and I unzipped the zip so now the zip is unzipped and ready")

In [None]:
# !cp -r "/content/drive/MyDrive/rfcx-species-audio-detection" /content/

In [None]:
# MODEL DEFINITION CELL
# Loads the pretrained ViT feature extractor and TensorFlow base model that the
# BiodiversityModel class below wraps.

# Disabled alternative: Audio Spectrogram Transformer classes.
#from transformers import ASTFeatureExtractor
#from transformers import TFASTModel

# Hugging Face checkpoint id used for both the feature extractor and the backbone.
model_checkpoint = "google/vit-base-patch16-224-in21k"
#model_checkpoint = "MIT/ast-finetuned-audioset-10-10-0.4593"
# NOTE(review): num_species is not referenced anywhere else in the visible
# notebook -- confirm whether it is still needed.
num_species = 24

print("Loading ViT feature extractor and BASE model . . . ")
feature_extractor = AutoFeatureExtractor.from_pretrained(model_checkpoint)
# from_pt=True asks transformers to build the TF model from PyTorch weights.
base_model = TFViTModel.from_pretrained(model_checkpoint, from_pt=True)

# Disabled alternative: load the AST checkpoint instead of ViT.
#feature_extractor = ASTFeatureExtractor.from_pretrained(model_checkpoint)
#base_model = TFASTModel.from_pretrained(model_checkpoint, from_pt=True)

class BiodiversityModel(tf.keras.Model):
  """ViT backbone with a one-unit sigmoid head producing a score in (0, 1).

  The wrapped ViT is stored as `self.vit`; the head is a `Dense(1, sigmoid)`
  layer named 'biodiversity_score' stored as `self.regressor`.
  """

  def __init__(self, vit_base_model):
    """Keep a reference to the given ViT model and create the regression head."""
    super().__init__()
    self.vit = vit_base_model
    self.regressor = layers.Dense(1, activation='sigmoid', name='biodiversity_score')

  def call(self, inputs, training=None, explain=False):
    """Run the backbone and the head.

    Args:
      inputs: channels-last image batch; it is transposed to channels-first
        before being handed to the ViT as `pixel_values`.
      training: forwarded unchanged to the ViT call.
      explain: when true, additionally return the ViT's last hidden state.

    Returns:
      The head output, or the tuple (head output, last hidden state) when
      `explain` is true.
    """
    # (batch, H, W, C) -> (batch, C, H, W)
    channels_first = tf.transpose(inputs, perm=[0, 3, 1, 2])

    hidden = self.vit(
        pixel_values=channels_first,
        training=training
    ).last_hidden_state

    # Token 0 of the sequence feeds the regression head.
    score = self.regressor(hidden[:, 0, :])

    if not explain:
      return score
    return score, hidden

# Instantiate the wrapper around the pretrained backbone loaded above.
print('\nBuilding the final model with the custom ViT layer . . .')
model = BiodiversityModel(base_model)

# Stage 1 trains only the regression head; the ViT weights stay frozen until
# the fine-tuning section sets trainable back to True.
model.vit.trainable = False

# The optimizer string selects Adam with its default settings. The 'mae'
# metric reports the same quantity as the loss.
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.MeanAbsoluteError(),
    metrics=['mae']
)

print('\n Ayo son we got it, the ViT model is build correctly and im ready to pull it')

# A single forward pass on a dummy 224x224x3 batch is run before summary()
# so that the subclassed model has been called at least once.
dummy_input = tf.ones((1, 224, 224, 3))
model(dummy_input)
model.summary()

In [None]:
def calculate_adi(audio, sr, n_fft=2048, hop_length=512,
                  fmin=2000.0, fmax=13000.0, n_bands=10, db_threshold=-50.0):
  """Acoustic Diversity Index of a mono signal, scaled to the range [0, 1].

  The previous implementation delegated to scikit-maad but used keyword names
  those functions do not accept and unpacked the wrong number of return values,
  so the blanket `except` fired on every call and every score came back as 0.0.
  This version computes the index directly with numpy:

    1. Hann-windowed STFT magnitude (frames of `n_fft` samples, `hop_length` apart).
    2. Convert to dB relative to the loudest time/frequency cell.
    3. Split [fmin, min(fmax, sr / 2)] into `n_bands` equal-width bands and, for
       each band, take the fraction of cells louder than `db_threshold`.
    4. Normalise those fractions to sum to 1 and take their Shannon entropy,
       divided by log(n_bands) so that the maximum possible value is 1.

  Args:
    audio: 1-D array-like of samples.
    sr: sample rate in Hz.
    n_fft: STFT frame length in samples.
    hop_length: step between consecutive frames in samples.
    fmin: lower edge (Hz) of the analysed range.
    fmax: upper edge (Hz); clamped to the Nyquist frequency `sr / 2` because
      no spectral content exists above it.
    n_bands: number of equal-width frequency bands.
    db_threshold: a cell counts as "active" when it is louder than this many
      dB relative to the spectrogram maximum.

  Returns:
    np.float32 in [0, 1]. 0.0 is returned for empty or silent input, when no
    band contains an active cell, when the frequency range is empty, or when
    an unexpected error occurs (a warning is printed in that last case).
  """
  try:
    signal = np.asarray(audio, dtype=np.float64).ravel()
    band_top = min(float(fmax), sr / 2.0)
    if signal.size == 0 or n_bands < 2 or band_top <= fmin:
      return np.float32(0.0)

    # Guarantee at least one full frame.
    if signal.size < n_fft:
      signal = np.pad(signal, (0, n_fft - signal.size))

    # Frame the signal without padding: rows are frames, columns are samples.
    n_frames = 1 + (signal.size - n_fft) // hop_length
    frame_starts = hop_length * np.arange(n_frames)
    frames = signal[frame_starts[:, None] + np.arange(n_fft)[None, :]]

    amplitude = np.abs(np.fft.rfft(frames * np.hanning(n_fft), axis=1))
    freqs = np.fft.rfftfreq(n_fft, d=1.0 / sr)

    peak = amplitude.max()
    if not np.isfinite(peak) or peak <= 0.0:
      # Silent (or NaN-contaminated) input carries no diversity information.
      return np.float32(0.0)

    # Floor the ratio so exact zeros do not produce -inf.
    level_db = 20.0 * np.log10(np.maximum(amplitude / peak, 1e-12))

    band_edges = np.linspace(fmin, band_top, n_bands + 1)
    occupancy = np.zeros(n_bands)
    for band in range(n_bands):
      in_band = (freqs >= band_edges[band]) & (freqs < band_edges[band + 1])
      if in_band.any():
        occupancy[band] = np.mean(level_db[:, in_band] > db_threshold)

    total = occupancy.sum()
    if total <= 0.0:
      return np.float32(0.0)

    # Empty bands contribute 0 to the entropy (lim p*log(p) = 0), so drop them.
    shares = occupancy[occupancy > 0.0] / total
    entropy = -np.sum(shares * np.log(shares))

    return np.float32(entropy / np.log(n_bands))

  except Exception as e:
    print(f"Warning: Could not calculate ADI. Error {e}")
    return np.float32(0.0)


In [None]:
# DATALOADING AND PREPROCESSING

def load_and_process_from_path(file_path_tensor):
  """tf.data map function: audio file path -> (ViT input image, raw ADI label).

  The heavy lifting happens in Python (librosa, the Hugging Face feature
  extractor and `calculate_adi`), wrapped in `tf.py_function`. The static
  shapes are re-attached afterwards because `py_function` does not carry them.

  Returns:
    A (224, 224, 3) float32 tensor and a scalar float32 tensor.
  """

  def _process_audio_file(path_bytes):
    """Eager helper: load one file and build its image and ADI score."""
    audio_path = path_bytes.numpy().decode('utf-8')

    try:
      audio, sr = librosa.load(audio_path, sr=16000, duration=60)
    except Exception as e:
      # Unreadable file: fall back to 60 s of silence so the pipeline keeps going.
      print(f"Warning: Could not load {audio_path}. Error {e}")
      audio, sr = np.zeros(16000 * 60, dtype=np.float32), 16000

    # Log-mel spectrogram in dB relative to its own maximum.
    mel_power = librosa.feature.melspectrogram(y=audio, sr=16000, n_fft=2048, hop_length=512, n_mels=224)
    mel_db = np.nan_to_num(librosa.power_to_db(mel_power, ref=np.max))

    adi = calculate_adi(audio, sr)

    # Replicate the single channel three times, then resize to 224x224.
    mel_db_rgb = np.stack([mel_db] * 3, axis=-1)
    mel_db_rgb = tf.image.resize(mel_db_rgb, [224, 224]).numpy()

    extracted = feature_extractor(images=mel_db_rgb, return_tensors='np', do_rescale=False, do_resize=False)
    # The extractor output is squeezed to channels-first, then moved to channels-last.
    channels_last = np.moveaxis(np.squeeze(extracted['pixel_values'], axis=0), 0, -1)

    return channels_last.astype(np.float32), np.float32(adi)

  spectrogram_image, adi_label = tf.py_function(
      func=_process_audio_file, inp=[file_path_tensor], Tout=[tf.float32, tf.float32]
  )

  spectrogram_image.set_shape([224, 224, 3])
  adi_label.set_shape([])

  return spectrogram_image, adi_label

In [None]:
# Batch size shared by every tf.data pipeline in this notebook.
BATCH_SIZE = 16

# Directory (on the runtime's local disk) holding the training .flac files.
DRIVE_FLAC_PATH = "/content/rfcx_local_data/rfcx-species-audio-detection/train/"

print(f"Loading files from: {DRIVE_FLAC_PATH}")
all_flac_files = sorted(glob.glob(os.path.join(DRIVE_FLAC_PATH, "*.flac")))

# Deterministic 80/20 split over the sorted file names: the first 80% of the
# list is used for training, the remainder for validation.
split_point = int(len(all_flac_files) * 0.8)
train_files, val_files = all_flac_files[:split_point], all_flac_files[split_point:]


In [None]:
# ADI calibration pass
#
# Computes the raw ADI score of every training file plus a handful of
# non-rainforest "anchor" recordings, and records the min / max so labels can
# be rescaled to [0, 1] by normalize_label.

import soundfile as sf

# blank.flac is 60 s of silence at 16 kHz. It was generated once with:
#   sf.write(blank_path, np.zeros(16000*60, dtype=np.float32), 16000)

anchor_files = [
    "/content/drive/MyDrive/rfcx-species-audio-detection/blank.flac",
    "/content/drive/MyDrive/rfcx-species-audio-detection/anchor/airport_baggage.flac",
    "/content/drive/MyDrive/rfcx-species-audio-detection/anchor/bowling_alley.flac",
    "/content/drive/MyDrive/rfcx-species-audio-detection/anchor/bus.flac",
    "/content/drive/MyDrive/rfcx-species-audio-detection/anchor/laundromat.flac",
    "/content/drive/MyDrive/rfcx-species-audio-detection/anchor/subway.flac",
    "/content/drive/MyDrive/rfcx-species-audio-detection/anchor/urban_park_birds.flac",
    "/content/drive/MyDrive/rfcx-species-audio-detection/anchor/wind_stairwell.flac",
]

calibration_file_list = train_files + anchor_files

print(f"Calibrating adi scores across {len(calibration_file_list)} total files . . .")

cal_paths_ds = tf.data.Dataset.from_tensor_slices(calibration_file_list)
cal_ds_for_adi_only = cal_paths_ds.map(load_and_process_from_path, num_parallel_calls=tf.data.AUTOTUNE)

# Zip the processed elements with their paths so each score keeps its filename.
cal_ds_zipped = tf.data.Dataset.zip((cal_ds_for_adi_only, cal_paths_ds))

print("Iterating and pairing adi scores with filenames . . .")
# A comprehension keeps the loop variables local. The previous for-loop bound
# the global name `features`, replacing the `maad.features` module imported at
# the top of the notebook.
adi_data = [
    (path.numpy().decode('utf-8'), raw_adi.numpy())
    for (_spectrogram, raw_adi), path in tqdm(cal_ds_zipped, total=len(calibration_file_list))
]

df_adi = pd.DataFrame(adi_data, columns=['filename', 'raw_adi'])

print("\n--- ADI Score Analysis ---")

pd.set_option('display.max_colwidth', None)

print(df_adi.sort_values(by='raw_adi', ascending=False).head(5))

adi_scores = df_adi['raw_adi'].values

min_adi = np.min(adi_scores)
max_adi = np.max(adi_scores)
# Fixed: the f-string previously referenced the undefined name `min_`.
print(f"Calibration done. Min ADI: {min_adi:.4f}, Max ADI: {max_adi:.4f}")

if max_adi == min_adi:
  # Every file produced the same score, so normalised labels would all be 0.
  print("Warning: all ADI scores are identical; check calculate_adi and the input files.")

def normalize_label(features, raw_adi_label):
  """Min-max scale a raw ADI label with the calibration range and clip to [0, 1].

  Reads the module-level `min_adi` / `max_adi` set by the calibration pass.
  The 1e-10 term keeps the division defined when both are equal. `features`
  is passed through untouched so the function can be used in `Dataset.map`.
  """
  shifted = raw_adi_label - min_adi
  scaled = shifted / (max_adi - min_adi + 1e-10)
  return features, tf.clip_by_value(scaled, 0.0, 1.0)

# The training set is the Amazon training files PLUS the anchor recordings
# (calibration_file_list). The log line previously claimed "Amazon files only".
print("Building training dataset (Amazon files + anchor files) . . .")

train_paths_ds = tf.data.Dataset.from_tensor_slices(calibration_file_list)

# cache() sits after the expensive audio processing so it runs once per file;
# shuffle() comes after cache() so every epoch sees a new order.
train_ds = (
    train_paths_ds.map(load_and_process_from_path, num_parallel_calls=tf.data.AUTOTUNE)
    .map(normalize_label, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(1024)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

print("Buuilding validation dataset . . .")
val_paths_ds = tf.data.Dataset.from_tensor_slices(val_files)
# No shuffle here: later cells rely on val_ds yielding files in val_files order.
val_ds = (
    val_paths_ds.map(load_and_process_from_path, num_parallel_calls=tf.data.AUTOTUNE)
    .map(normalize_label, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

print("\n--- Final Dataset Test ---")
# Loop variables are deliberately NOT called `features`: that global name is
# the `maad.features` module imported at the top of the notebook.
for sample_batch, sample_labels in train_ds.take(1):
  print("Data pipeline working with .flac files and normalized ADI (with Domain Adaptation)!")
  print("Features shape:", sample_batch.shape)
  print("Example Normalized ADI scores:", sample_labels.numpy()[:5])

In [None]:
# Histogram of the raw (un-normalised) ADI scores gathered in the calibration
# pass, with the mean and median marked.

mean_adi = np.mean(adi_scores)
median_adi = np.median(adi_scores)

print(f"Mean adi: {mean_adi:.4f}")
print(f"Median adi: {median_adi:.4f}")

fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(adi_scores, bins=100)
ax.set_title('Distribution of Raw adi Scores (Including Anchors)')
ax.set_xlabel('Raw adi Score')
ax.set_ylabel('Number of Files')
ax.axvline(mean_adi, color='red', linestyle='dashed', linewidth=2, label=f'Mean: {mean_adi:.4f}')
ax.axvline(median_adi, color='green', linestyle='dashed', linewidth=2, label=f'Median: {median_adi:.4f}')
ax.legend()
plt.show()

In [None]:
# TRAINING
#
# Stage 1 fits only the regression head (the ViT was frozen when the model was
# built). train_ds / val_ds are finite, batched datasets, so Keras is left to
# run one full pass per epoch. The previous explicit steps_per_epoch was
# derived from len(train_files), but train_ds also contains the anchor files;
# whenever the two batch counts differ Keras does not restart the iterator
# between epochs and training stops early with "ran out of data".

print("Starting initial training with the base model frozen . . .")
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=5,
)

print("Section 1 training complete!")

# FINE TUNING

print("\nUnfreezing the ViT base model for fine-tuning . . .")
model.vit.trainable = True

# patience must be smaller than EarlyStopping's, otherwise training stops at
# the same epoch the learning rate would first be reduced.
lr_scheduler = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=2,
    verbose=1,
    min_lr=1e-7
)

early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1
)

checkpoint_path = "/content/drive/MyDrive/best_model.keras"

# Named checkpoint_callback so it does not overwrite `model_checkpoint`, which
# holds the Hugging Face checkpoint id used in the model definition cell.
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_loss',
    save_best_only=True,
    verbose=1
)

# Stage 1 used Adam's default rate (1e-3). The unfrozen backbone is updated
# with a rate two orders of magnitude smaller, as the log line promises.
learning_rate = 1e-5

# Changing `trainable` only takes effect after compile() is called again.
print(f"Re-compiliing model with a much lower learning rate {learning_rate} . . . ")
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    loss=tf.keras.losses.MeanAbsoluteError(),
    metrics=['mae']
)


print("\nStarting fine-tuning . . .")
# initial_epoch continues the epoch numbering from stage 1, so with epochs=25
# fine-tuning runs for at most (25 - number of stage-1 epochs) epochs.
fine_tune_history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=25,
    initial_epoch=history.epoch[-1] + 1,
    callbacks=[early_stopping, lr_scheduler, checkpoint_callback]
)

print("Section 2 fine-tuning was another fat dub")

In [None]:
# VISUALIZE MEL AUDIO SPECTROGRAM FOR SINGLE FLAC TO CONFIRM NO ISSUES
#
# Other files that have been inspected with this cell (point
# path_to_audio_file at one of them to reproduce):
#   /content/drive/MyDrive/rfcx-species-audio-detection/train/
#       00d442df7.flac, 0072f0839.flac, 011f25080.flac, 716dccfed.flac
#       288e5d13f.flac  (glitched audio, was the highest score back when ACI was used)
#   /content/drive/MyDrive/rfcx-species-audio-detection/anchor/
#       airport_baggage.flac, bowling_alley.flac, bus.flac, laundromat.flac,
#       subway.flac, urban_park_birds.flac, wind_stairwell.flac
#   /content/drive/MyDrive/rfcx-species-audio-detection/blank.flac  (silence)

# LOW SCORE
path_to_audio_file = "/content/rfcx_local_data/rfcx-species-audio-detection/train/e48bf871c.flac"

try:
  # duration=60 matches the training pipeline, so the score shown here is
  # computed from the same audio span as the label the model was trained on.
  audio_np, sr = librosa.load(path_to_audio_file, sr=16000, duration=60)
  print("Successfully loaded audio file")
except Exception as e:
  print(f"Error loading file: {e}")
  audio_np = np.zeros(16000 * 60)
  sr = 16000

mel_spectrogram = librosa.feature.melspectrogram(
    y=audio_np, sr=16000, n_fft=2048, hop_length=512, n_mels=224
)

log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)
log_mel_spectrogram = np.nan_to_num(log_mel_spectrogram)

# Fixed: this used to call `calculate_adi_manual`, which is not defined in the
# notebook. `calculate_adi` is the function the training labels come from.
raw_adi_score = calculate_adi(audio_np, sr)

# Same min-max scaling and clipping as normalize_label.
normalized_score = (raw_adi_score - min_adi) / (max_adi - min_adi + 1e-10)
normalized_score = np.clip(normalized_score, 0, 1)

fig, ax = plt.subplots(figsize=(12, 5))
spectrogram_mesh = librosa.display.specshow(
    log_mel_spectrogram,
    sr=16000,
    x_axis='time',
    y_axis='mel',
    fmax=8000,
    ax=ax
)

fig.colorbar(spectrogram_mesh, ax=ax, format='%+2.0f dB')
ax.set_title(f'Mel Spectrogram (Biodiversity Score: {normalized_score:.2f})')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Frequency (Hz)')
fig.tight_layout()
plt.show()

In [None]:
# Evaluate the trained model on the held-out test directory. Labels are the
# ADI scores computed from each file's audio and scaled by normalize_label.
TEST_FLAC_PATH = "/content/rfcx_local_data/rfcx-species-audio-detection/test/"
test_files = sorted(glob.glob(os.path.join(TEST_FLAC_PATH, "*.flac")))

test_paths_ds = tf.data.Dataset.from_tensor_slices(test_files)

# Same preprocessing chain as the validation set (no shuffling).
test_ds = test_paths_ds.map(load_and_process_from_path, num_parallel_calls=tf.data.AUTOTUNE)
test_ds = test_ds.map(normalize_label, num_parallel_calls=tf.data.AUTOTUNE)
test_ds = test_ds.cache().batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# evaluate() returns the loss followed by the compiled metrics.
results = model.evaluate(test_ds)
test_loss, test_mae = results[0], results[1]

print("\n--- Test Results ---")
print(f"Test Loss (MAE): {test_loss:.4f}")
print(f"Test Metric (MAE): {test_mae:.4f}")

In [None]:
from sklearn.metrics import r2_score, mean_squared_error

# Compare model predictions with the normalised ADI labels on the validation
# set: per-file table, summary metrics and a predicted-vs-actual scatter plot.

print("Running full prediction loop to get labels and predictions . . .")

y_pred_normalized = np.squeeze(model.predict(val_ds))

# val_ds is not shuffled, so a second pass yields labels in the same order as
# the predictions above and as val_files. The loop variable is not called
# `features`, which would rebind the `maad.features` module imported at the top.
y_true_normalized = []
for _batch_inputs, batch_labels in val_ds.as_numpy_iterator():
  y_true_normalized.extend(batch_labels)
y_true_normalized = np.array(y_true_normalized)

# Guard the row-wise pairing of filenames, labels and predictions.
assert len(val_files) == len(y_true_normalized) == len(y_pred_normalized), \
    "validation files, labels and predictions are out of sync"

df_results = pd.DataFrame({
    'filename': val_files,
    'true_score': y_true_normalized,
    'pred_score': y_pred_normalized
})

print("\n--- Files with big boy scores:")
print(df_results.sort_values(by='pred_score', ascending=False).head())

print("\n--- Files with teeny tiny scores:")
print(df_results.sort_values(by='pred_score', ascending=False).tail())

print("\n--- Final Model Performance Metrics ---")

r2 = r2_score(y_true_normalized, y_pred_normalized)
print(f"R-Squared (R2): {r2:.4f}")

rmse = np.sqrt(mean_squared_error(y_true_normalized, y_pred_normalized))
print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")
print(f"Mean Absolute Error (MAE): {np.mean(np.abs(y_true_normalized - y_pred_normalized)):.4f}")

print("\nGenerating 'Predicted vs. Actual' Plot . . . ")

plt.figure(figsize=(10, 8))
plt.scatter(y_true_normalized, y_pred_normalized, alpha=0.3)

# Points on the diagonal are perfect predictions.
perfect_line = np.linspace(0, 1, 100)
plt.plot(perfect_line, perfect_line, color='red', linestyle='--', linewidth=2, label='Perfect Prediction')

plt.xlabel('True Normalized ADI (0-1 Scale)', fontsize=14)
plt.ylabel('Predicted Normalized ADI (0-1 Scale)', fontsize=14)
plt.title('Predicted vs. Actual Biodiversity Score', fontsize=18)
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Residuals (true minus predicted) against the predicted score; a good fit
# scatters evenly around the zero line.
print("\nGenerating 'Residuals' plot . . . ")

residuals = y_true_normalized - y_pred_normalized

fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(y_pred_normalized, residuals, alpha=0.3)
ax.axhline(y=0, color='red', linestyle='--', linewidth=2, label='Zero Error')

ax.set_xlabel('Predicted Score', fontsize=14)
ax.set_ylabel('Residual (Error)', fontsize=14)
ax.set_title('Residuals Plot', fontsize=18)
ax.legend()
ax.grid(True)
plt.show()

In [None]:
import tensorflow.keras.backend as K
import cv2

def generate_grad_cam_heatmap(img_array, model):
  """Build a gradient-based saliency map over the ViT patch grid.

  A functional Keras model is assembled from `model.vit` and `model.regressor`
  that outputs both the score and the ViT's last hidden state. The gradient of
  the score with respect to that hidden state is taken, the first token is
  dropped, and the remaining per-token gradients are averaged over the hidden
  dimension, reshaped to 14x14, rectified, scaled and resized to the input's
  height and width.

  Note that only gradients are used (they are not multiplied by the
  activations), and the reshape to (14, 14) requires exactly 196 remaining
  token gradients in total, i.e. a single image per call.

  Args:
    img_array: array indexed as (batch, height, width, channels); the
      functional model is declared for (224, 224, 3) inputs.
    model: object exposing `.vit` and `.regressor` (see BiodiversityModel).

  Returns:
    uint8 array of shape (img_array.shape[1], img_array.shape[2]); all zeros
    if the gradient could not be computed.
  """

  # 1. Create new functional API model for explainability
  vit_base_model = model.vit
  regressor_head = model.regressor


  input_layer = tf.keras.Input(shape=(224, 224, 3))

  # Channels-last -> channels-first, mirroring BiodiversityModel.call.
  transposed_inputs = tf.transpose(input_layer, perm=[0, 3, 1, 2])

  vit_outputs = vit_base_model(pixel_values=transposed_inputs, training=False)

  last_hidden_state = vit_outputs.last_hidden_state
  cls_token_output = last_hidden_state[:, 0, :]

  final_score = regressor_head(cls_token_output)

  # Exposing the hidden state as a second output makes it available for tape.gradient.
  cam_model = tf.keras.Model(inputs=input_layer, outputs=[final_score, last_hidden_state])

  img_tensor = tf.convert_to_tensor(img_array)

  with tf.GradientTape() as tape:
    final_score_pred, last_hidden_state_output = cam_model(img_tensor, training=False)

  # NOTE(review): both tensors are outputs of the same forward pass; whether the
  # tape can connect them is handled by the `grads is None` fallback below.
  grads = tape.gradient(final_score_pred, last_hidden_state_output)

  if grads is None:
    print("ERROR: Gradient is None. The functional model graph failed.")
    return np.zeros((img_array.shape[1], img_array.shape[2]), dtype=np.uint8)

  # Drop token 0 (the one that feeds the regressor) and keep the remaining tokens.
  patch_grads = grads[:, 1:, :]
  # Average over the hidden dimension -> one value per remaining token.
  heatmap = tf.reduce_mean(patch_grads, axis=-1)
  heatmap = tf.reshape(heatmap, (14, 14))

  # Keep positive values only and divide by the maximum; epsilon avoids 0/0.
  heatmap = tf.maximum(heatmap, 0) / (tf.math.reduce_max(heatmap) + K.epsilon())
  heatmap = heatmap.numpy()
  # cv2.resize takes (width, height).
  heatmap = cv2.resize(heatmap, (img_array.shape[2], img_array.shape[1]))
  heatmap = (heatmap * 255).astype(np.uint8)

  return heatmap

In [None]:
# cell for backend prediction function

import cv2
import io
import base64


def get_prediction_and_heatmap(audio_file_path, model, feature_extractor, min_adi, max_adi):
  """Score one audio file and render its saliency overlay as a base64 PNG.

  Args:
    audio_file_path: path to an audio file readable by librosa.
    model: trained model exposing `.predict`, `.vit` and `.regressor`.
    feature_extractor: Hugging Face image feature extractor used in training.
    min_adi: lower bound of the calibration range for raw ADI scores.
    max_adi: upper bound of the calibration range for raw ADI scores.

  Returns:
    dict with
      "biodiversity_score": model prediction as a Python float,
      "true_score": normalised ADI computed from the audio, as a Python float,
      "explainability_map_b64": PNG of the spectrogram with the heatmap
        overlaid, base64-encoded as a str.

  Raises:
    Whatever `librosa.load` raises when the file cannot be read.
  """

  # 1. preprocess audio file (same parameters as the training pipeline)
  audio, sr = librosa.load(audio_file_path, sr=16000, duration=60)

  # Reference label for display. Fixed: this used to call the undefined
  # `calculate_adi_manual`; `calculate_adi` is what the training labels use.
  raw_score = calculate_adi(audio, sr)
  normalized_score = (raw_score - min_adi) / (max_adi - min_adi + 1e-10)
  normalized_score = np.clip(normalized_score, 0, 1)

  # Log-mel spectrogram, replicated to three channels and resized to 224x224.
  mel_spec = librosa.feature.melspectrogram(y=audio, sr=16000, n_fft=2048, hop_length=512, n_mels=224)
  log_mel_spec = np.nan_to_num(librosa.power_to_db(mel_spec, ref=np.max))
  log_mel_spec_rgb = np.stack((log_mel_spec,) * 3, axis=-1)

  resized_spec = tf.image.resize(log_mel_spec_rgb, [224, 224]).numpy()
  inputs = feature_extractor(images=resized_spec, return_tensors='np', do_rescale=False, do_resize=False)

  # Channels-first extractor output -> (1, 224, 224, 3) batch for the model.
  img_array_for_cam = np.moveaxis(np.squeeze(inputs['pixel_values'], axis=0), 0, -1)
  img_array_for_cam = np.expand_dims(img_array_for_cam, axis=0)

  # 2. get model prediction
  predicted_score = model.predict(img_array_for_cam)[0][0]

  # 3. generate heatmap
  heatmap = generate_grad_cam_heatmap(img_array_for_cam, model)

  # 4. create and encode overlay image
  fig, ax = plt.subplots()

  librosa.display.specshow(log_mel_spec, sr=sr, hop_length=512, x_axis='time', y_axis='mel', fmax=8000, ax=ax)

  # Stretch the heatmap to the spectrogram's shape and draw it with specshow
  # too, so both layers share the same time / mel coordinates. A bare
  # ax.imshow() works in pixel coordinates with the origin at the top, which
  # left the overlay misaligned and vertically flipped.
  heatmap_resized = cv2.resize(heatmap, (log_mel_spec.shape[1], log_mel_spec.shape[0]))
  librosa.display.specshow(
      heatmap_resized, sr=sr, hop_length=512, x_axis='time', y_axis='mel', fmax=8000,
      ax=ax, cmap='jet', alpha=0.5
  )

  ax.set_title(f"Predicted Score: {predicted_score:.2f} (True Score: {normalized_score:.2f})")

  # Save this specific figure to an in-memory PNG, then release it.
  buf = io.BytesIO()
  fig.savefig(buf, format='png', bbox_inches='tight')
  plt.close(fig)

  # Fixed: encode the PNG bytes first, then decode the base64 bytes to str.
  # The old code tried to decode the raw PNG with the misspelt codec 'uft-8'.
  image_b64 = base64.b64encode(buf.getvalue()).decode('utf-8')

  return {
      "biodiversity_score": float(predicted_score),
      "true_score": float(normalized_score),
      "explainability_map_b64": image_b64
      # also add plotly distribution plot
  }

In [None]:
# Grad-CAM style overlay for a single training file.

import cv2

# Audio / spectrogram parameters; they match the training pipeline.
SR = 16000
HOP_LENGTH = 512
FMAX = 8000

# Other files previously inspected with this cell:
#   /content/drive/MyDrive/rfcx-species-audio-detection/train/00d442df7.flac
#   /content/drive/MyDrive/rfcx-species-audio-detection/train/288e5d13f.flac
#   /content/rfcx_local_data/rfcx-species-audio-detection/train/e48bf871c.flac  (good effort score)

#high score
path_to_audio_file = "/content/rfcx_local_data/rfcx-species-audio-detection/train/eb62e367c.flac"


print(f"Loading and processing: {path_to_audio_file}")

try:
  audio, sr = librosa.load(path_to_audio_file, sr=SR, duration=60)
except Exception as e:
  print(f"Warning: Could not load {path_to_audio_file}. Error {e}")
  audio = np.zeros(SR * 60, dtype=np.float32)
  # Fixed: sr was left unset on this path although it is used for plotting below.
  sr = SR

mel_spectrogram = librosa.feature.melspectrogram(y=audio, sr=SR, n_fft=2048, hop_length=HOP_LENGTH, n_mels=224)
log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)
log_mel_spectrogram = np.nan_to_num(log_mel_spectrogram)

# Three identical channels, resized to the 224x224 input the model was built for.
spectrogram_rgb = np.stack((log_mel_spectrogram,) * 3, axis=-1)
resized_spectrogram_rgb = tf.image.resize(spectrogram_rgb, [224, 224]).numpy()

inputs = feature_extractor(images=resized_spectrogram_rgb, return_tensors='np', do_rescale=False, do_resize=False)
pixel_values = np.squeeze(inputs['pixel_values'], axis=0)
pixel_values = np.moveaxis(pixel_values, 0, -1)

img_array_for_cam = np.expand_dims(pixel_values, axis=0)

print("Generating Grad-CAM heatmap . . . ")
heatmap = generate_grad_cam_heatmap(
    img_array_for_cam,
    model
)

print("Plotting results . . .")

predicted_score = model.predict(img_array_for_cam)[0][0]

fig, ax = plt.subplots(figsize=(14, 5))

spectrogram_mesh = librosa.display.specshow(
    log_mel_spectrogram,
    sr=sr,
    hop_length=HOP_LENGTH,
    x_axis='time',
    y_axis='mel',
    fmax=FMAX,
    ax=ax,
    cmap='magma',
)

heatmap_resized = cv2.resize(heatmap, (log_mel_spectrogram.shape[1], log_mel_spectrogram.shape[0]))

# Draw the heatmap with specshow as well, so it shares the spectrogram's time
# and mel coordinates. The earlier ax.imshow(..., extent=...) call placed row 0
# (the lowest mel band) at the top of the axes and spaced rows linearly in Hz,
# so the overlay was flipped and warped relative to the spectrogram.
librosa.display.specshow(
    heatmap_resized,
    sr=sr,
    hop_length=HOP_LENGTH,
    x_axis='time',
    y_axis='mel',
    fmax=FMAX,
    ax=ax,
    cmap='jet',
    alpha=0.9,
)

ax.set_title(f"Grad-CAM Heatmap (Predicted Score: {predicted_score:.2f})")
# The colorbar describes the dB scale of the spectrogram layer, not the heatmap.
fig.colorbar(spectrogram_mesh, ax=ax, format="%+2.0f dB")
fig.tight_layout()
plt.show()