In [1]:
import requests
import pandas as pd
import os
from pathlib import Path


# Root folder for the downloaded recordings; a relative path, so it is resolved
# against the notebook's current working directory.
SAVE_PATH = Path("../data/xeno-canto/")

In [2]:
def download_bird_recordings(bird_species_df, save_directory = SAVE_PATH):
    """Download Xeno-canto recordings for every species in ``bird_species_df``.

    For each row, the value of the ``'Latin name'`` column is used as the API
    query and as the name of the sub-folder of ``save_directory`` that receives
    the files. Recordings are stored as ``<save_directory>/<Latin name>/<id>.mp3``.
    Only recordings whose quality field ``q`` is ``'A'``, ``'B'``, ``'C'`` or
    ``'no score'`` are kept.

    A species whose folder already contains at least one entry is skipped, so an
    interrupted download of a species is NOT resumed on the next call.

    Args:
        bird_species_df: DataFrame with a ``'Latin name'`` column.
        save_directory: Base directory for the per-species folders.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    api_url = "https://www.xeno-canto.org/api/2/recordings"
    timeout_s = 60  # never hang forever on a stalled connection
    accepted_quality = ('A', 'B', 'C', 'no score')

    for _, row in bird_species_df.iterrows():
        species_name = row['Latin name']
        species_dir = os.path.join(save_directory, species_name)
        # Create the folder on demand so the emptiness check below cannot raise
        # FileNotFoundError when the folder-creation cell was not run first.
        os.makedirs(species_dir, exist_ok=True)
        if os.listdir(species_dir):
            print(f"Skipping downloading for species {species_name}...")
            continue
        page = 1
        numPages = 1
        while page < numPages + 1:
            try:
                # `params` lets requests URL-encode the species name.
                response = requests.get(
                    api_url,
                    params={"query": species_name, "page": page},
                    timeout=timeout_s,
                )
            except requests.RequestException as e:
                print(f"Failed to retrieve data for {species_name}: {e}")
                break
            if response.status_code != 200:
                print(f"Failed to retrieve data for {species_name}")
                break

            data = response.json()
            numPages = int(data['numPages'])
            if int(data['numRecordings']) <= 0:
                break

            for i, recording in enumerate(data['recordings']):
                if recording['q'] not in accepted_quality:
                    continue
                recording_id = recording['id']
                recording_url = recording['file']
                if not recording_url:
                    # NOTE(review): presumably some recordings expose no download
                    # URL - confirm against the API documentation.
                    print(f"No file URL for recording {recording_id} of {species_name}, skipping")
                    continue
                if recording_url.startswith('//'):
                    # Protocol-relative URL: requests needs an explicit scheme.
                    recording_url = 'https:' + recording_url
                try:
                    audio_response = requests.get(recording_url, timeout=timeout_s)
                    audio_response.raise_for_status()
                except requests.RequestException as e:
                    print(f"Failed to download recording {recording_id} for {species_name}: {e}")
                    continue
                # Only write the file once the download succeeded, so an error
                # page is never stored as an .mp3.
                file_path = os.path.join(species_dir, f"{recording_id}.mp3")
                with open(file_path, 'wb') as f:
                    f.write(audio_response.content)
                print(f"Downloaded recording {i+1} for {species_name}")
            page += 1

### Load names of species with `Priority` = 1

In [3]:
# Read the species table; later cells use its 'Priority' and 'Latin name' columns.
bird_species_df = pd.read_csv('../data/selected_species.csv', sep=',')

In [4]:
# Keep only the rows whose 'Priority' column equals 1.
bird_species_priority_df = bird_species_df.loc[bird_species_df['Priority'].eq(1)]

In [5]:
# One-column DataFrame holding the Latin names of the priority species.
species = bird_species_priority_df['Latin name'].to_frame()

In [None]:
# Display the selected Latin names (the cell's last expression is rendered).
species

### Create the folders where the data will be saved

In [7]:
def create_folders_for_bird_species(bird_species_df, base_directory=SAVE_PATH):
    """Ensure a folder named after each species exists under ``base_directory``.

    Args:
        bird_species_df: DataFrame with a ``'Latin name'`` column; each value
            becomes a folder name.
        base_directory: Directory in which the species folders are created.

    Returns:
        None. One line is printed per species, whether or not the folder
        already existed.
    """
    for latin_name in bird_species_df['Latin name']:
        target_dir = os.path.join(base_directory, latin_name)
        # exist_ok=True makes repeated runs harmless.
        os.makedirs(target_dir, exist_ok=True)
        print(f"Created folder for species: {latin_name} at {target_dir}")

In [None]:
# Create one folder per selected species under SAVE_PATH before downloading.
create_folders_for_bird_species(species)

### Download recordings

In [None]:
# Fetch the recordings; species whose folder already contains files are skipped.
download_bird_recordings(species)

### Split data into train/val/test sets and cut the recordings into chunks

In [8]:
import os
import shutil
from pydub import AudioSegment
from mutagen.mp3 import MP3
from pathlib import Path
    
def get_audio_duration(file_path):
    """Return the duration of an audio file in seconds, or 0 if it cannot be read.

    The length stored in the MP3 metadata (mutagen) is tried first because it
    avoids decoding the file. If that raises, or reports a falsy length, the
    file is decoded with pydub instead. When both approaches fail a message is
    printed and 0 is returned.
    """
    # Cheap path: read the length from the MP3 header.
    try:
        metadata_length = MP3(file_path).info.length
    except Exception as e:
        print(f"Mutagen failed for {file_path}: {e}")
    else:
        if metadata_length:
            return metadata_length

    # Fallback: decode the whole file.
    try:
        return AudioSegment.from_file(file_path).duration_seconds
    except Exception as e:
        print(f"Both mutagen and pydub failed for {file_path}: {e}")
        return 0

def split_audio(input_path, output_dir, max_duration=20, min_duration=5):
    """Cut one audio file into consecutive chunks and export them as MP3.

    The file is divided into pieces of ``max_duration`` seconds. A trailing
    piece is kept only if it lasts at least ``min_duration`` seconds; shorter
    leftovers are dropped. Chunks are written to
    ``<output_dir>/<input stem>_part<k>.mp3`` with ``k`` starting at 1.
    The input file itself is not modified or removed.

    Args:
        input_path: Path of the audio file to split.
        output_dir: Directory that receives the chunk files.
        max_duration: Chunk length in seconds (int or float, must be > 0).
        min_duration: Minimum length in seconds for the final chunk.

    Raises:
        ValueError: If ``max_duration`` is not positive.
        Exception: Whatever ``AudioSegment.from_file`` raises when the input
            cannot be decoded. Export errors are printed, not raised.
    """
    audio = AudioSegment.from_file(input_path)
    total_ms = len(audio)

    # Work in whole milliseconds: a float chunk size would make divmod return
    # floats and range() below would raise TypeError.
    max_duration_ms = int(round(max_duration * 1000))
    min_duration_ms = int(round(min_duration * 1000))
    if max_duration_ms <= 0:
        raise ValueError("max_duration must be positive")

    num_chunks, remainder = divmod(total_ms, max_duration_ms)

    # Keep the leftover only if it is long enough; `remainder > 0` avoids
    # exporting an empty chunk when min_duration is 0.
    if remainder > 0 and remainder >= min_duration_ms:
        num_chunks += 1

    stem = Path(input_path).stem
    for i in range(num_chunks):
        start_ms = i * max_duration_ms
        end_ms = min(start_ms + max_duration_ms, total_ms)

        chunk = audio[start_ms:end_ms]

        # Save the chunk
        output_path = os.path.join(output_dir, f"{stem}_part{i + 1}.mp3")
        try:
            chunk.export(output_path, format="mp3")
        except Exception as e:
            print(f'There was an error exporting {output_path}: {e}')

def organize_files(save_dir, split_ratios={'train': 0.8, 'val': 0.1, 'test': 0.1}, min_duration=5):
    """
    Organizes bird audio recordings into train, val, and test sets.

    Every sub-directory of ``save_dir`` other than ``train``/``val``/``test`` is
    treated as one species. Its ``.wav``/``.mp3`` files lasting at least
    ``min_duration`` seconds are assigned, longest first, to whichever split is
    currently furthest below its target share of the total duration, and moved
    to ``<save_dir>/<split>/<species>/``. The original species directory is
    then deleted together with everything left in it (too-short or unreadable
    recordings, non-audio files).

    Args:
        save_dir: Directory containing one folder per species.
        split_ratios: Target share of the total duration for each split; the
            values must sum to 1.0. The default dict is never mutated.
        min_duration: Recordings shorter than this many seconds are discarded.

    Raises:
        AssertionError: If the ratios do not sum to 1.0.
        ValueError: If none of train/val/test has a positive ratio.
    """
    import math  # local import keeps this definition self-contained

    # Float ratios rarely sum to exactly 1.0 (0.7 + 0.2 + 0.1 == 0.9999999999999999),
    # so compare with a tolerance instead of ==.
    assert math.isclose(sum(split_ratios.values()), 1.0, rel_tol=1e-9), 'Split ratios must sum to 1.0'

    split_dirs = {
        "train": os.path.join(save_dir, "train"),
        "val": os.path.join(save_dir, "val"),
        "test": os.path.join(save_dir, "test")
    }

    # Splits with a zero (or missing) ratio never receive files; skipping them
    # also avoids a ZeroDivisionError in the balancing key below.
    eligible_splits = [name for name in split_dirs if split_ratios.get(name, 0) > 0]
    if not eligible_splits:
        raise ValueError("split_ratios must give a positive ratio to at least one of: train, val, test")

    for split_dir in split_dirs.values():
        os.makedirs(split_dir, exist_ok=True)

    species_dirs = [
        d for d in os.listdir(save_dir)
        if os.path.isdir(os.path.join(save_dir, d)) and d not in split_dirs
    ]

    for species in species_dirs:
        species_path = os.path.join(save_dir, species)
        recordings = [
            os.path.join(species_path, f)
            for f in os.listdir(species_path)
            if f.endswith((".wav", ".mp3"))
        ]

        recordings_with_durations = [(rec, get_audio_duration(rec)) for rec in recordings]
        # Remove recordings shorter than min_duration (unreadable files report 0)
        recordings_with_durations = [(rec, dur) for rec, dur in recordings_with_durations if dur >= min_duration]

        # Longest first, so the big recordings are balanced before the small ones.
        recordings_with_durations.sort(key=lambda x: x[1], reverse=True)

        splits = {"train": [], "val": [], "test": []}
        split_durations = {k: 0 for k in splits}

        for recording, duration in recordings_with_durations:
            # Pick the split that is proportionally the emptiest.
            best_split = min(eligible_splits, key=lambda s: split_durations[s] / split_ratios[s])
            splits[best_split].append(recording)
            split_durations[best_split] += duration

        move_failures = 0
        for split, files in splits.items():
            split_species_dir = os.path.join(split_dirs[split], species)
            os.makedirs(split_species_dir, exist_ok=True)

            for file_path in files:
                try:
                    shutil.move(file_path, split_species_dir)
                except Exception as e:
                    move_failures += 1
                    print(f'Failed to move {file_path} to {split_species_dir}: {e}')

        print(f"Finished organizing recordings for {species}.")
        if move_failures:
            # Deleting the folder now would destroy the recordings that could
            # not be moved, so leave it in place for inspection.
            print(f'Keeping {species_path}: {move_failures} file(s) could not be moved.')
            continue
        try:
            shutil.rmtree(species_path)
        except Exception:
            print(f'Failed to remove directory, you should remove it manually: {species_path}')

def split_files(save_dir, max_duration=20, min_duration=5):
    """
    Splits audio files in train, val, and test sets into chunks.

    For every file in ``<save_dir>/<split>/<species>/`` the chunks produced by
    ``split_audio`` are written next to it, and the original file is then
    removed - also when splitting failed. A split directory that does not exist
    is skipped.

    Args:
        save_dir: Directory containing the ``train``/``val``/``test`` folders.
        max_duration: Chunk length in seconds, forwarded to ``split_audio``.
        min_duration: Minimum length in seconds of the last chunk, forwarded to
            ``split_audio``.
    """
    for split in ["train", "val", "test"]:
        split_dir = os.path.join(save_dir, split)
        if not os.path.isdir(split_dir):
            # Nothing to do for a split that was never created.
            print(f"Split directory {split_dir} does not exist, skipping.")
            continue
        for species in os.listdir(split_dir):
            species_dir = os.path.join(split_dir, species)
            if not os.path.isdir(species_dir):
                continue
            print(f"Starting splitting recordings for {species} in {split} set.")
            # os.listdir returns a snapshot, so chunks created below are not
            # picked up again within this call.
            for file_name in os.listdir(species_dir):
                full_path = os.path.join(species_dir, file_name)
                if not os.path.isfile(full_path):
                    continue
                try:
                    split_audio(full_path, species_dir, max_duration=max_duration, min_duration=min_duration)
                except Exception as e:
                    print(f'There was an error splitting: {full_path}, removing... ({e})')
                # The unsplit original is dropped in every case.
                try:
                    os.remove(full_path)
                except OSError as e:
                    print(f'Failed to remove {full_path}: {e}')

In [None]:
# Destructive step: recordings are moved into SAVE_PATH/{train,val,test}/<species>
# and the original per-species folders are deleted afterwards.
organize_files(SAVE_PATH)

In [None]:
# Destructive step: every file in the split folders is cut into chunks and the
# original, unsplit file is removed.
split_files(SAVE_PATH)

### Plot species counts

In [None]:
import os
import matplotlib.pyplot as plt

# Count the files per species, summed over the train/val/test folders.
# The loop variables deliberately avoid the name `species`: at module level that
# name holds the DataFrame of selected species, and overwriting it here would
# break later cells that call create_folders_for_bird_species(species).
species_file_counts = {}
for split_name in ["train", "val", "test"]:
    split_dir = os.path.join(SAVE_PATH, split_name)
    if not os.path.isdir(split_dir):
        continue
    for species_name in os.listdir(split_dir):
        species_dir = os.path.join(split_dir, species_name)
        if os.path.isdir(species_dir):
            species_file_counts[species_name] = species_file_counts.get(species_name, 0) + len(os.listdir(species_dir))

plt.figure(figsize=(10, 6))
plt.bar(list(species_file_counts.keys()), list(species_file_counts.values()))
# Rotate the species names so long labels do not overlap.
plt.xticks(rotation=45, ha='right')
plt.xlabel("Species")
plt.ylabel("Number of Files")
plt.title("Number of Files After Splitting by Species")
plt.tight_layout()
plt.show()

### Unsplit data

In [None]:
# Merge the split data back together in case something needs to be done on the whole dataset
import os
import shutil

# Recreate the per-species folders for the selected species.
create_folders_for_bird_species(species)

split_folders = ["train", "val", "test"]

for folder_name in split_folders:
    folder_path = os.path.join(SAVE_PATH, folder_name)

    if os.path.isdir(folder_path):
        for subfolder_name in os.listdir(folder_path):
            subfolder_path = os.path.join(folder_path, subfolder_name)

            if os.path.isdir(subfolder_path):
                # organize_files() processes every directory under SAVE_PATH, so a
                # species folder may exist here without being listed in `species`.
                # Make sure its destination exists before moving files into it.
                destination_dir = os.path.join(SAVE_PATH, subfolder_name)
                os.makedirs(destination_dir, exist_ok=True)

                for file_name in os.listdir(subfolder_path):
                    file_path = os.path.join(subfolder_path, file_name)
                    destination_path = os.path.join(destination_dir, file_name)

                    shutil.move(file_path, destination_path)

                # os.rmdir only succeeds on an empty directory, so nothing that
                # is still there can be deleted by accident.
                if not os.listdir(subfolder_path):
                    os.rmdir(subfolder_path)
                    print(f"Empty directory: {subfolder_path} has been deleted.")
            else:
                print(f"{subfolder_name} is not a directory, skipping.")
        if not os.listdir(folder_path):
            os.rmdir(folder_path)
            print(f"Empty split folder: {folder_path} has been deleted.")

### Delete corrupted files

In [None]:
import librosa

# Try to decode every .mp3/.wav file below SAVE_PATH; a file that cannot be
# loaded is reported, deleted and counted as corrupted.
files_corrupted_count = 0
for root, _, files in os.walk(SAVE_PATH):
    for file in files:
        if not file.endswith(('.mp3', '.wav')):
            continue
        audio_path = os.path.join(root, file)
        try:
            waveform, sr = librosa.load(audio_path, sr=16000)
        except Exception as e:
            print(f'Error loading {audio_path}: {e}')
            try:
                os.remove(audio_path)
            except OSError:
                # Catch only filesystem errors: a bare `except:` would also
                # swallow KeyboardInterrupt and make the loop impossible to stop.
                print(f'Failed to remove: {audio_path}')
            # Counted even when the removal failed: the file is still corrupted.
            files_corrupted_count += 1

print(files_corrupted_count)