# Analysis of Bioacoustic Data

This notebook provides tools for analyzing data using a custom classifier (developed with `agile_modeling.ipynb`).

In [0]:
#@title Imports. { vertical-output: true }

import collections
from etils import epath
from ml_collections import config_dict
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from chirp.inference import colab_utils
colab_utils.initialize(use_tf_gpu=True, disable_warnings=True)

from chirp.inference import interface
from chirp.inference import tf_examples
from chirp.inference.search import bootstrap
from chirp.inference.search import search
from chirp.inference.search import display
from chirp.inference.classify import classify


In [0]:
#@title Basic Configuration. { vertical-output: true }

# Name of the embedding model; usually 'perch' or 'birdnet'.
model_choice = 'perch'  #@param
# Root directory under which all project files live.
working_dir = '/tmp/agile'  #@param

# Every project location is derived from the single working directory.
project_dir = epath.Path(working_dir)
embeddings_path = project_dir / 'embeddings'
labeled_data_path = project_dir / 'labeled'
custom_classifier_path = project_dir / 'custom_classifier'
# Glob pattern for the embedding files inside the embeddings directory.
embeddings_glob = embeddings_path / 'embeddings-*'


In [0]:
#@title Load Existing Project State and Models. { vertical-output: true }

# Loads the embedding configuration, the project state, and the custom
# classifier. The cells below read `bootstrap_config`, `project_state`,
# `model` and `class_list`, so all of them must be defined here.

embedding_config_path = embeddings_path / 'config.json'
if not embedding_config_path.exists():
  # Fail fast with a clear message. Silently skipping this step would leave
  # `bootstrap_config` / `project_state` undefined (or stale from a previous
  # run in the same kernel), and later cells would fail far from the cause.
  raise FileNotFoundError(
      f'No embedding config found at {embedding_config_path}. '
      'Compute embeddings for this project before running this notebook.')

# Get relevant info from the embedding configuration.
bootstrap_config = bootstrap.BootstrapConfig.load_from_embedding_config(
    embeddings_path=embeddings_path,
    annotated_path=labeled_data_path)
project_state = bootstrap.BootstrapState(bootstrap_config)

# Load the custom classifier head and expose its model and class list.
cfg = config_dict.ConfigDict({
    'model_path': custom_classifier_path,
    'logits_key': 'custom',
})
loaded_model = interface.LogitsOutputHead.from_config(cfg)
model = loaded_model.logits_model
class_list = loaded_model.class_list
print('Loaded custom model with classes: ')
print('\t' + '\n\t'.join(class_list.classes))

In [0]:
#@title Write classifier inference CSV. { vertical-output: true }

output_filepath = '/tmp/inference.csv'  #@param

# Detection thresholds. A default of None disables thresholding entirely, so
# every logit is written; this can lead to very large CSV files.
default_threshold = 0.0  #@param
class_thresholds = None
if default_threshold is not None:
  # Classes without an explicit entry fall back to the default threshold.
  class_thresholds = collections.defaultdict(lambda: default_threshold)
  # Per-class threshold overrides go here.
  class_thresholds['my_class'] = 1.0

# Detections for these classes are not written.
exclude_classes = ['unknown']  #@param

# When non-empty, only scores for these classes are written.
# An empty list means the setting is ignored.
include_classes = []  #@param

# Build the embeddings dataset from the project's embedding files.
embeddings_ds = tf_examples.create_embeddings_dataset(
    embeddings_path, file_glob='embeddings-*')

# Collect the arguments first so the call itself stays short.
inference_kwargs = dict(
    embeddings_ds=embeddings_ds,
    model=model,
    labels=class_list.classes,
    output_filepath=output_filepath,
    threshold=class_thresholds,
    embedding_hop_size_s=bootstrap_config.embedding_hop_size_s,
    include_classes=include_classes,
    exclude_classes=exclude_classes,
)
classify.write_inference_csv(**inference_kwargs)

## Call Density Estimation

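The cells below estimate the call density of a target class by validating random samples drawn from logarithmic-quantile bins of the classifier scores. For background, see 'All Thresholds Barred': https://arxiv.org/abs/2402.15360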

In [0]:
#@title Validation and Call Density. { vertical-output: true }
# Validation examples are drawn at random from logarithmic-quantile bins of
# the classifier scores.

target_class = 'my_class'  #@param

num_bins = 4  #@param
samples_per_bin = 50  #@param
# With the quantile bounds used below, the highest bin holds
# 2**-(num_bins - 1) of the data, so request many more results than
# samples_per_bin to leave the small bins enough candidates.
top_k = 2**(num_bins + 1) * samples_per_bin

embeddings_ds = tf_examples.create_embeddings_dataset(
    embeddings_path, file_glob='embeddings-*')
target_index = class_list.classes.index(target_class)
search_output = search.classifer_search_embeddings_parallel(
    embeddings_classifier=model,
    target_index=target_index,
    random_sample=True,
    top_k=top_k,
    hop_size_s=bootstrap_config.embedding_hop_size_s,
    embeddings_dataset=embeddings_ds,
)
results, all_logits = search_output

# Pick samples_per_bin examples from each quantile.
def get_quantile_bounds(n_bins):
  """Returns the quantile levels that bound the logarithmic bins.

  The result starts at 0.0 and ends at 1.0. Each interior bound halves the
  remaining upper tail (0.5, 0.75, 0.875, ...), giving n_bins + 1 levels
  for n_bins >= 1.

  Args:
    n_bins: Number of bins to create.

  Returns:
    A float numpy array of increasing quantile levels.
  """
  interior = 1.0 - 0.5 ** np.arange(1, n_bins)
  return np.concatenate(([0.0], interior, [1.0]))

bounds = get_quantile_bounds(num_bins)
q_bounds = np.quantile(all_logits, bounds)

# Assign every search result to its score bin.
binned = [[] for _ in range(num_bins)]
for r in results.search_results:
  # The first bound strictly above the score, minus one, is the bin index.
  # A score equal to the largest bound matches no bound and yields -1, which
  # indexes the last (highest) bin.
  bin_index = np.argmax(r.score < q_bounds) - 1
  binned[bin_index].append(r)

# Draw up to samples_per_bin results from each bin WITHOUT replacement, so the
# same example is never shown (and labeled) more than once. Sampling indices
# rather than the objects also keeps numpy from coercing the results.
sampled_bins = []
for bin_index, candidates in enumerate(binned):
  if len(candidates) < samples_per_bin:
    print(f'Warning: bin {bin_index} has only {len(candidates)} candidates '
          f'(wanted {samples_per_bin}).')
  chosen = np.random.permutation(len(candidates))[:samples_per_bin]
  sampled_bins.append([candidates[i] for i in chosen])
binned = sampled_bins

# Mix the bins together so the display order does not reveal the score bin.
combined = []
for b in binned:
  combined.extend(b)
np.random.shuffle(combined)

samples_per_page = 10
# Use a plain int page count rather than the numpy float np.ceil returns.
num_pages = int(np.ceil(len(combined) / samples_per_page))
page_state = display.PageState(num_pages)

display.display_paged_results(
    search.TopKSearchResults(combined, len(combined)),
    page_state, samples_per_page,
    embedding_sample_rate=project_state.embedding_model.sample_rate,
    source_map=project_state.source_map,
    exclusive_labels=True,
    checkbox_labels=[target_class, f'not {target_class}', 'unsure'],
)

In [0]:
#@title Collate results and write validation log. { vertical-output: true }

# Gathers the labels entered in the validation widgets and appends them to
# the project's validation log. `scores` and `is_pos` are also read by the
# call density estimation cell.

validation_log_filepath = epath.Path(working_dir) / 'validation.csv'

# Numeric code written to the log for each checkbox label. The target class
# is listed last so it wins if its name collides with another label.
label_codes = {
    'unsure': 0,
    f'not {target_class}': -1,
    target_class: 1,
}

filenames = []
timestamp_offsets = []
scores = []
is_pos = []

for r in combined:
  if not r.label_widgets:
    continue
  value = r.label_widgets[0].value
  # Skip unlabeled results (None) and any unexpected value, so that all four
  # lists always stay the same length.
  if value not in label_codes:
    continue
  filenames.append(r.filename)
  scores.append(r.score)
  timestamp_offsets.append(r.timestamp_offset)
  is_pos.append(label_codes[value])

# Record the class with each row: the log is appended to, so rows from
# different target classes would otherwise be indistinguishable.
label = [target_class for _ in range(len(filenames))]
log = pd.DataFrame({
    'filenames': filenames,
    'timestamp_offsets': timestamp_offsets,
    'scores': scores,
    'is_pos': is_pos,
    'label': label})
# Write to the validation log, not the inference CSV, and emit the header
# only when the file is first created so appends do not repeat it.
write_header = not validation_log_filepath.exists()
log.to_csv(str(validation_log_filepath), mode='a', header=write_header)

In [0]:
#@title Estimate Model Quality and Call Density. { vertical-output: true }

# Import the submodule explicitly; a bare `import scipy` is not guaranteed to
# make `scipy.stats` available as an attribute.
import scipy.stats

# Collect validated labels by bin. 'unsure' labels (0) are counted in neither.
bin_pos = [0] * num_bins
bin_neg = [0] * num_bins
for score, pos in zip(scores, is_pos):
  # Same binning rule as used when the validation samples were drawn: the
  # first bound above the score, minus one (-1 wraps to the highest bin).
  bin_index = np.argmax(score < q_bounds) - 1
  if pos == 1:
    bin_pos[bin_index] += 1
  elif pos == -1:
    bin_neg[bin_index] += 1

# Create one beta distribution per bin over its positive rate, P(+|b).
prior = 0.1
betas = [scipy.stats.beta(p + prior, n + prior)
         for p, n in zip(bin_pos, bin_neg)]
# MLE positive rate in each bin; the epsilon avoids dividing by zero for
# bins with no validated labels.
mle_b = np.array([bin_pos[b] / (bin_pos[b] + bin_neg[b] + 1e-6)
                  for b in range(num_bins)])
# Probability of each bin, P(b).
p_b = np.array([2**-k for k in range(1, num_bins)] + [2**(-num_bins + 1)])

# MLE total call density.
q_mle = np.dot(mle_b, p_b)

# Sample the call density: draw P(+|b) for every bin in one vectorized call
# each (rows are samples, columns are bins), then weight by P(b).
num_beta_samples = 10_000
pos_rate_samples = np.stack(
    [beta.rvs(size=num_beta_samples) for beta in betas], axis=1)
q_betas = pos_rate_samples @ p_b

# Plot call density estimate.
plt.figure(figsize=(10, 5))
# plt.hist returns (bar heights, bin edges, patches); only heights are needed.
hist_heights, _, _ = plt.hist(q_betas, density=True, bins=25, alpha=0.25)
peak = np.max(hist_heights)
plt.plot([q_mle, q_mle], [0.0, peak], 'k:', alpha=0.75,
         label='q_mle')

# Mark the 5% and 95% quantiles of the sampled densities.
low, high = np.quantile(q_betas, [0.05, 0.95])
plt.plot([low, low], [0.0, peak], 'g', alpha=0.75, label='low conf')
plt.plot([high, high], [0.0, peak], 'g', alpha=0.75, label='high conf')

plt.xlim(0.0, 1.0)
plt.xlabel('Call Rate (q)')
plt.ylabel('P(q)')
plt.title(f'Call Density Estimation ({target_class})')
plt.legend()
plt.show()

print(f'MLE Call Density: {q_mle:.4f}')
print(f'(Low/MLE/High) Call Density Estimate: ({low:5.4f} / {q_mle:5.4f} / {high:5.4f})')
