# Extract clips from the Xeno-canto snapshot (including negatives)

env opso12 (Python 3.11.14)\
Back in the bioacoustics-cookbook you can find a version of this notebook that works with a lat/long filter for negatives.\
 The notebook below has been modified (with Santiago's help) to extract negative clips based on specific target species eBird codes. You can also create a target (positive) dataset in this notebook, but if you have already hand-annotated your positives, just ignore the target_df at the end.

In [None]:
from pathlib import Path
from tqdm.autonotebook import tqdm
import pandas as pd
import datasets
from pathlib import Path
from opensoundscape import annotations

In [None]:

# Do not change this path: pointing the cache somewhere else makes
# `datasets` download the whole dataset again.
cache_dir = "/media/kiwi/datasets/annotated/xeno_canto_snapshot/data_cache"

# Load the "XCL" configuration of BirdSet, reusing the local cache.
ds = datasets.load_dataset("DBD-research-group/BirdSet", "XCL", cache_dir=cache_dir)

# Everything below works on the "train" split.
t = ds["train"]

print("The number of recordings in the dataset is", len(t))

## Change the first line depending on your target species

In [None]:
import io

target = "Virginia Rail"  # Replace with your target species name

# Clements taxonomy available at: https://www.birds.cornell.edu/clementschecklist/wp-content/uploads/2024/10/Clements-v2024-October-2024-rev.xlsx
ebird_path = "/media/auk/projects/srg/Kitzes_projects/ECOO53_LaSelva_Loca/ebird_codes.txt"

# Decode the file one line at a time so that rows which are not valid UTF-8
# can be dropped without aborting the whole load. Dropped rows are counted
# and reported instead of disappearing silently.
good_lines = []
skipped_lines = 0
with open(ebird_path, "rb") as f:
    for line in f:
        try:
            decoded = line.decode("utf-8")
        except UnicodeDecodeError:
            skipped_lines += 1
        else:
            good_lines.append(decoded)

if skipped_lines:
    print(f"Skipped {skipped_lines} line(s) of {ebird_path} that were not valid UTF-8")

# The remaining lines are parsed as a tab-separated table.
ebird_df = pd.read_csv(io.StringIO("".join(good_lines)), sep="\t")

# Rows whose "English name" matches the target species exactly.
target_row = ebird_df[ebird_df["English name"] == target]
if target_row.empty:
    # An empty match usually means a spelling/capitalisation mismatch, or that
    # the species' row was one of the skipped lines.
    print(f"WARNING: no row with English name {target!r} found in {ebird_path}")

# Last expression of the cell: shown as the cell output in the notebook.
target_row

## Subset snapshot to recordings with your target species

In [None]:
# Make a new dataset object with just your target species recordings.

# Fail with a readable message when the taxonomy lookup found nothing,
# instead of an IndexError from indexing an empty selection.
if target_row.empty:
    raise ValueError(
        "target_row is empty: the target species name was not found in the "
        "eBird taxonomy table, so there is no species code to filter on"
    )
target_ebird_code = str(target_row["species_code"].iloc[0])

# The dataset's "ebird_code" feature converts between string codes and the
# integer labels stored per recording (via str2int).
label_feature = t.features["ebird_code"]
farewell_int = label_feature.str2int(target_ebird_code)

# Pass only the label column to the predicate: the comparison needs nothing
# else, so the other columns of each row are not handed to the function.
filtered = t.filter(
    lambda code: code == farewell_int,
    input_columns=["ebird_code"],
)

In [None]:
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import Audio, display

# Inspect one recording of the target species (index 1 of the filtered set).
example = filtered[1]

# Read the audio file at its native sampling rate (sr=None: no resampling).
audio_path = example["filepath"]
y, sr = librosa.load(audio_path, sr=None)

# Linear-frequency magnitude spectrogram, in dB relative to its maximum.
S = librosa.stft(y, n_fft=1024, hop_length=256)
S_db = librosa.amplitude_to_db(np.abs(S), ref=np.max)

# Recording duration and the time stamp of every spectrogram frame.
duration = librosa.get_duration(y=y, sr=sr)
times = librosa.frames_to_time(np.arange(S_db.shape[1]), sr=sr, hop_length=256, n_fft=1024)

fig, ax = plt.subplots(figsize=(12, 4))
img = librosa.display.specshow(S_db, sr=sr, hop_length=256, x_axis='time', y_axis='hz', ax=ax)
plt.colorbar(img, ax=ax, format="%+2.0f dB")
ax.set_title("Spectrogram with detected events and peaks")

# Outline each detected event [start, end] with a box that spans from 0 Hz
# up to half the sampling rate.
box_height = sr // 2
for event_start, event_end in example.get("detected_events", []):
    event_box = plt.Rectangle(
        (event_start, 0),
        event_end - event_start,
        box_height,
        color='lime',
        linewidth=2,
        fill=False,
    )
    ax.add_patch(event_box)

# Mark each peak with a dashed vertical line.
for peak_time in example.get("peaks", []):
    ax.axvline(peak_time, color='yellow', linestyle='--', linewidth=2)

plt.tight_layout()
plt.show()

# Raw values that were drawn above.
print(example["detected_events"])

print(example["peaks"])

# Audio player for the same recording.
display(Audio(example['filepath']))

## Parameters you need to set before calling the audio files

In [None]:
# ---------------------------------------------------------------------------
# TARGET SPECIES
# ---------------------------------------------------------------------------
# Duration of clips to extract (presumably seconds, per the note below).
clip_duration = 1.5  # vira beg is .5 seconds

# Maximum number of events to extract per recording (based on peaks).
max_events = 20

# ---------------------------------------------------------------------------
# NEGATIVE SPECIES
# ---------------------------------------------------------------------------
# Maximum number of events to extract per recording (based on peaks).
neg_max_events = 15

# eBird codes of the species used as negatives; replace with your own list.
target_negatives = [
    "sora",
    "kinrai4",
    "y00475",
    "comgal1",
    "pibgre",
    "sonspa",
    "swaspa",
    "wilfly",
    "sancra",
    "rewbla",
    "marwre",
    "yelwar1",
    "grycat",
    "eawpew",
    "easkin",
    "reevir1",
    "bkcchi",
    "easpho",
    "killde",
    "reshaw",
    "rusbla",
    "comgra",
    "cedwax",
    "comyel",
    "sposan",
    "ribgul",
    "horgre",
    "amhgul1",
    "blujay",
    "rethaw",
]

In [None]:
# Accumulators for the extracted clip records (one dict per clip).
# NOTE: `target` held the species-name string earlier in the notebook; from
# here on it is a list, so re-run the taxonomy cell before reusing the name.
target, negatives = [], []

print("Converting target_negatives to integers...")

# Map every negative-species eBird code to its integer label through the
# dataset's label feature.
target_negatives_int = list(map(label_feature.str2int, target_negatives))

print(f"Original target negatives: {target_negatives}")
print(f"Converted to integers: {target_negatives_int}")


In [None]:

def _clip_records(recording, max_clips, fallback_duration):
    """Return one record dict per detected event of a single recording.

    Args:
        recording: one dataset row; its "filepath", "ebird_code", "quality"
            and "detected_events" entries are read.
        max_clips: keep at most this many detected events (the first ones).
        fallback_duration: when the recording has no detected events, a
            single placeholder event [0, fallback_duration] is used instead.

    Returns:
        A list of dicts with keys "file", "start_time", "annotation" and
        "quality". Only the start of each event is stored.
    """
    events = recording["detected_events"]
    if len(events) < 1:
        # No detections: fall back to one clip at the start of the file.
        events = [[0, fallback_duration]]
    else:
        events = events[:max_clips]

    return [
        {
            "file": recording["filepath"],
            "start_time": start,
            "annotation": recording["ebird_code"],
            "quality": recording["quality"],
        }
        for start, _end in events
    ]


# Build the lookup once; set membership avoids scanning the list for every row.
negative_codes = set(target_negatives_int)

for file in tqdm(t):
    current_code = file["ebird_code"]

    if current_code == farewell_int:
        # Recording of the target species.
        target.extend(_clip_records(file, max_events, clip_duration))
    elif current_code in negative_codes:
        # Recording of one of the chosen negative species. The cap is
        # neg_max_events itself, so values below 3 are honoured as well.
        negatives.extend(_clip_records(file, neg_max_events, clip_duration))

print(f"Total target examples collected: {len(target)}")
print(f"Total negative examples collected: {len(negatives)}")

In [None]:
# Naming the columns explicitly keeps them present even when no clips were
# collected, so an empty `target` or `negatives` list gives an empty table
# instead of a KeyError on "annotation" below.
record_columns = ["file", "start_time", "annotation", "quality"]
target_df = pd.DataFrame(target, columns=record_columns)
negative_df = pd.DataFrame(negatives, columns=record_columns)

# Convert the integer annotation to a list holding one annotation per row;
# this is the format used by annotations.categorical_to_multi_hot.
target_df["annotation_list"] = [[code] for code in target_df["annotation"]]
negative_df["annotation_list"] = [[code] for code in negative_df["annotation"]]

In [None]:
# Print the column names, then show the first rows of the negatives table.
print(list(negative_df.columns))
display(negative_df.head())

In [None]:
# First five rows of the target (positive) clip table.
target_df.head(n=5)

In [None]:
# Write the negative clip table next to the pickled multi-hot labels.
# The earlier path literal was scrambled ("/home/brg226/pro" had ended up in
# the middle of the string), which produced a relative path into a directory
# that does not match the training_data folder used elsewhere in the notebook.
negative_df.to_csv(
    "/home/brg226/projects/blra/training_data/negative_samples.csv",
    index=False,
)

In [None]:
# Build the multi-hot label table used for classifier training.

# Sparse multi-hot matrix plus the list of classes that label its columns.
multihot_labels_sparse, classes = annotations.categorical_to_multi_hot(
    negative_df["annotation_list"],
    sparse=True,
)

# Translate the class ids back to eBird code strings for the column names.
multilabel_feature = t.info.features["ebird_code_multilabel"].feature
ebird_classes = [multilabel_feature.int2str(class_id) for class_id in classes]

# One row per clip, indexed by (file, start_time).
clip_index = pd.MultiIndex.from_frame(negative_df[["file", "start_time"]])
labels = pd.DataFrame.sparse.from_spmatrix(
    multihot_labels_sparse,
    index=clip_index,
    columns=ebird_classes,
)

# Save the label table as a pickle file.
labels.to_pickle("/home/brg226/projects/blra/training_data/negative_multihot_labels.pkl")



In [None]:
# First five rows of the multi-hot label table.
labels.head(n=5)