In [None]:
import os
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

from biodcase_eda.helpers import load_csvs, distribution_per_source
from soundbay.utils.metadata_processing import correct_call_times_with_duration, bg_from_non_overlap_calls, non_overlap_df

In [None]:
# Root folder that contains the `biodcase` directory.
# Set the BIODCASE_BASE_DIR environment variable to run this notebook on another
# machine; the original author's local path is kept as the fallback so existing
# setups keep working unchanged.
base_location = os.environ.get("BIODCASE_BASE_DIR", "/Users/shai/personal/deepvoice")
dataset_path = Path(base_location) / 'biodcase' / 'datasets'

In [None]:
# --- Audio preprocessing (defaults taken from the config) ---
SAMPLE_RATE = 250       # sample rate used for processing
DATA_SAMPLE_RATE = 250  # original sample rate of the audio files
N_FFT = 128             # number of FFT points
HOP_LENGTH = 16         # hop length between windows
N_MELS = 64             # number of mel frequency bins
F_MIN = 0               # minimum frequency
F_MAX = 125             # maximum frequency (numerically SAMPLE_RATE / 2)

# --- Dataset parameters ---
BATCH_SIZE = 64
NUM_WORKERS = 4
SEQ_LENGTH = 10         # length of each segment, in seconds
MARGIN_RATIO = 0.25     # ratio of margin around the annotation
SLICE_FLAG = False      # whether to slice the audio into smaller segments
LABEL_TYPE = 'single_label'
PATH_HIERARCHY = 1

# Annotation name -> numeric label. Ids start at 1; label 0 is what the
# background rows carry (see the `label == 0` filter in process_annotations_csvs).
NUMERIC_MAPPING = {
    'bma': 1,
    'bmb': 2,
    'bmd': 3,
    'bmz': 4,
    'bp20': 5,
    'bp20plus': 6,
    'bpd': 7,
}

In [None]:
def process_annotations_csvs(annotations_dir: Path, split: str) -> pd.DataFrame:
    """Build one annotation table (calls plus background rows) for a split.

    Args:
        annotations_dir: Directory with the split's annotation CSV files; passed
            straight to ``load_csvs``.
        split: Split name forwarded to ``load_csvs`` (this notebook uses
            ``'train'`` and ``'validation'``).

    Returns:
        The call annotations with a numeric ``label`` column (mapped from
        ``annotation`` through ``NUMERIC_MAPPING``) and ``filename`` rewritten to
        ``"<dataset>/<filename without .wav>"``, concatenated with the
        ``label == 0`` rows returned by ``bg_from_non_overlap_calls``. Rows are
        sorted by ``filename``. Because ``reset_index()`` is called without
        ``drop=True``, the pre-sort index is kept as an ``index`` column.
    """
    all_dfs = load_csvs(annotations_dir, split)

    # Combine all DataFrames.
    # regex=False: ".wav" has to be matched literally. Under regex matching (the
    # pandas default before 2.0) the "." is a wildcard, so a name containing
    # e.g. "_wav" would be altered as well.
    result_df = pd.concat(all_dfs, ignore_index=True).assign(
        label=lambda x: x['annotation'].map(NUMERIC_MAPPING),
        filename=lambda x: x.dataset + "/" + x.filename.str.replace(".wav", "", regex=False),
    )

    # Every call is given label 1 before the soundbay helpers run, and again on
    # the output of non_overlap_df.
    # NOTE(review): presumably this makes the helpers treat all call types as a
    # single class when deriving background segments - confirm in soundbay.
    single_class_calls = non_overlap_df(result_df.assign(label=1)).assign(label=1)
    bg_df = bg_from_non_overlap_calls(single_class_calls)

    # Concat calls and background (only the label == 0 rows of the helper output).
    df = pd.concat(
        [result_df, bg_df.query('label == 0')]
    ).sort_values('filename').reset_index()

    return df

def add_files_length(df: pd.DataFrame, annotations_dir: Path) -> pd.DataFrame:
    """Run ``correct_call_times_with_duration`` on ``df`` against the split's audio folder.

    The audio folder is the ``audio`` directory that sits next to
    ``annotations_dir`` (i.e. ``annotations_dir.parent / "audio"``).

    Background: there are 5 files with wrong annotations, e.g.
        row    dataset             filename                                     start_datetime
        10293  elephantisland2014  elephantisland2014/2014-10-05T02-00-00_000   2014-10-05 03:01:37.027000+00:00
    """
    audio_dir = annotations_dir.parent / "audio"
    return correct_call_times_with_duration(df, audio_files_path=str(audio_dir))

In [None]:
# Build the annotation table of each split from "<dataset_path>/<split>/annotations",
# then stack both splits into a single frame.
df_train, df_val = (
    process_annotations_csvs(dataset_path / split_name / "annotations", split_name)
    for split_name in ("train", "validation")
)
df = pd.concat([df_train, df_val])

In [None]:
# Peek at the first rows, ordered by file and then by call start time
df.sort_values(by=['filename', 'begin_time']).head()

In [None]:
# Per-source overview of the combined annotations, produced by the project helper
# (see biodcase_eda.helpers.distribution_per_source for what exactly it reports).
distribution_per_source(df)


In [None]:

def plot_distribution(df, group_by, title, figsize=(20, 5)):
    """Plot one histogram (with KDE) of ``call_length`` per value of ``group_by``.

    Args:
        df: DataFrame with a ``call_length`` column and the ``group_by`` column.
        group_by: Column whose distinct values each get their own subplot.
        title: Figure-level title.
        figsize: Size of the whole figure, forwarded to ``plt.subplots``.

    Returns:
        None. The figure is displayed with ``plt.show()``.

    Note:
        Applies a matplotlib style and a seaborn palette globally, so figures
        drawn afterwards are affected too.
    """
    # The 'seaborn' style was renamed 'seaborn-v0_8' in matplotlib 3.6 and the
    # old name was later removed, so pick whichever this installation provides.
    for style_name in ('seaborn-v0_8', 'seaborn'):
        if style_name in plt.style.available:
            plt.style.use(style_name)
            break
    sns.set_palette("husl")

    # Materialise the groups once: the layout is sized from the groups that are
    # actually plotted (groupby drops NaN keys by default).
    groups = list(df.groupby(group_by))
    n_groups = len(groups)
    if n_groups == 0:
        print(f"No groups found for '{group_by}' - nothing to plot.")
        return

    n_cols = min(n_groups, 4)  # Max 4 columns
    n_rows = (n_groups + n_cols - 1) // n_cols  # Ceiling division

    # squeeze=False always yields a 2-D array of axes, so flattening works for
    # a single subplot as well as for a full grid.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
    axes = axes.flatten()
    fig.suptitle(title, fontsize=16, y=1.02)

    # Plot histograms
    for ax, (group, data) in zip(axes, groups):
        sns.histplot(data=data['call_length'], ax=ax, bins=30, kde=True)
        ax.set_title(group)
        ax.set_xlabel('Call Length (seconds)')
        ax.set_ylabel('Count')
        longest = data['call_length'].max()
        # Skip the explicit limit when it would be NaN or degenerate.
        if pd.notna(longest) and longest > 0:
            ax.set_xlim(0, longest * 1.1)

    # Remove empty subplots left over in the last row
    for unused_ax in axes[n_groups:]:
        fig.delaxes(unused_ax)

    plt.tight_layout()
    plt.show()

# Call-length histograms, one panel per recording dataset
plot_distribution(
    df,
    group_by='dataset',
    title='Call Length Distribution by Dataset',
    figsize=(20, 10),
)
# Call-length histograms, one panel per annotation type
plot_distribution(
    df,
    group_by='annotation',
    title='Call Length Distribution by Annotation',
    figsize=(20, 5),
)

## Train and validation analysis
Per-dataset frequency analysis, followed by a train vs. validation comparison.

In [None]:
# Keep only real calls: label 0 marks the background rows.
is_call = df['label'].ne(0)
train_df_only_calls = df[df['source'].eq("train") & is_call]
val_df_only_calls = df[df['source'].eq("validation") & is_call]

In [None]:
from biodcase_eda.helpers import boxplot_freq_per_label, segment_frequency_overlap
def analysis_per_source(df_: pd.DataFrame, source: str) -> None:
    """Print a header for ``source``, then run the per-label frequency helpers on ``df_``.

    Calls ``boxplot_freq_per_label(df_)`` and prints whatever
    ``segment_frequency_overlap(df_)`` returns.
    """
    print('\n\n SOURCE:', source)
    boxplot_freq_per_label(df_)
    print(segment_frequency_overlap(df_))
for source_name, calls_df in (('train', train_df_only_calls), ('validation', val_df_only_calls)):
    analysis_per_source(calls_df, source_name)

In [None]:
## train vs val
from biodcase_eda.helpers import compare_dataset_sources, barplot_labels_per_dataset, plot_frequency_distributions, plot_call_length
def compare_train_val(train_df, val_df):
    """Run every train-vs-validation comparison helper on the two frames, in order."""
    comparison_steps = (
        compare_dataset_sources,
        barplot_labels_per_dataset,
        plot_frequency_distributions,
        plot_call_length,
    )
    for step in comparison_steps:
        step(train_df, val_df)

# Compare the call-only subsets of the two splits
compare_train_val(train_df=train_df_only_calls, val_df=val_df_only_calls)

## Save the final CSV

In [None]:
# Write one combined annotations file per split, inside that split's folder
for split_name in ("train", "validation"):
    print('SOURCE:', split_name)
    split_rows = df.loc[df['source'] == split_name]
    split_rows.to_csv(dataset_path / split_name / 'all_annotations.csv', index=False)

## Load the WAV files and view some spectrograms (assumes the final CSV was saved as "all_annotations.csv")

In [None]:
from soundbay.data import ClassifierDataset

# Create the dataset with the minimal required parameters.
# All tunables come from the config cell so that this cell cannot drift away
# from it (label type and path hierarchy used to be repeated here as literals).
dataset = ClassifierDataset(
    data_path=str(dataset_path / "train/audio"),
    metadata_path=str(dataset_path / "train/all_annotations.csv"),
    # Processing chain: spectrogram -> amplitude to dB -> peak normalisation
    preprocessors={
        'spectrogram': {
            '_target_': 'torchaudio.transforms.Spectrogram',
            'n_fft': N_FFT,
            'hop_length': HOP_LENGTH
        },
        'amplitude_2_db': {
            '_target_': 'torchaudio.transforms.AmplitudeToDB'
        },
        'peak_norm': {
            '_target_': 'soundbay.data.PeakNormalize'
        }
    },
    seq_length=SEQ_LENGTH,
    data_sample_rate=DATA_SAMPLE_RATE,
    sample_rate=SAMPLE_RATE,
    # NOTE(review): "val" mode is used on the train split - presumably to get
    # deterministic, un-augmented samples for inspection; confirm in soundbay.
    mode="val",
    slice_flag=SLICE_FLAG,
    margin_ratio=MARGIN_RATIO,
    augmentations=None,
    augmentations_p=0.0,
    label_type=LABEL_TYPE,
    path_hierarchy=PATH_HIERARCHY
)

In [None]:
# Load the saved train annotations back from disk
annotations_path = dataset_path / "train" / "all_annotations.csv"
annotations_df = pd.read_csv(annotations_path)

# One row per distinct (dataset, label) pair, ordered by dataset
unique_combinations = (
    annotations_df.loc[:, ['dataset', 'label']]
    .drop_duplicates()
    .sort_values(by='dataset')
)
unique_combinations.shape

In [None]:
import librosa.display
import matplotlib.pyplot as plt

def load_audio(file_path, begin_sample, duraion, sr=SAMPLE_RATE):
    """Load part of an audio file with ``librosa.load`` and return only the samples.

    Args:
        file_path: Path of the audio file.
        begin_sample: Forwarded as librosa's ``offset``. NOTE(review): librosa
            documents ``offset`` in seconds, not samples, despite this
            parameter's name - confirm what callers pass.
        duraion: Forwarded as librosa's ``duration`` (seconds per librosa's docs).
        sr: Target sample rate handed to librosa.

    Returns:
        The audio array returned by librosa; the sample rate it reports is dropped.
    """
    samples, _ = librosa.load(
        file_path,
        sr=sr,
        offset=begin_sample,
        duration=duraion,
    )
    return samples

def plot_spectrogram(spectrogram, sr=SAMPLE_RATE, hop_length=HOP_LENGTH, title="Mel Spectrogram", 
                     start_time=None, end_time=None, low_freq=None, high_freq=None, linewidth=2):
    """Plot a spectrogram with librosa and optionally outline an annotation box.

    Args:
        spectrogram: Array handed to ``librosa.display.specshow``
            (assumed 2-D, frequency x time - TODO confirm against the dataset output).
        sr: Sample rate used by librosa to scale the axes.
        hop_length: Hop length used by librosa to scale the time axis.
        title: Title of the figure.
        start_time, end_time: Horizontal extent of the box, in time-axis units.
        low_freq, high_freq: Vertical extent of the box, in Hz-axis units.
        linewidth: Stroke width of the box outline.

    The box is drawn only when all four bounds are given. It spans exactly
    ``[start_time, end_time] x [low_freq, high_freq]``: ``linewidth`` only
    controls how thick the outline is and no longer shifts the box in data
    coordinates.
    """
    import matplotlib.patches as patches

    plt.figure(figsize=(10, 4))

    # Display the spectrogram
    img = librosa.display.specshow(spectrogram,
                                   sr=sr,
                                   hop_length=hop_length,
                                   x_axis='time',
                                   y_axis='hz',
                                   )

    # Add bounding box if all parameters are provided
    if all(bound is not None for bound in (start_time, end_time, low_freq, high_freq)):
        rect = patches.Rectangle(
            (start_time, low_freq),   # (x, y) of the lower-left corner
            end_time - start_time,    # width
            high_freq - low_freq,     # height
            linewidth=linewidth,
            edgecolor='r',
            facecolor='none'
        )
        plt.gca().add_patch(rect)

    # Tie the colorbar to the spectrogram image explicitly instead of relying
    # on pyplot's "current mappable".
    plt.colorbar(img, format='%+2.0f dB')
    plt.title(title)
    plt.tight_layout()
    plt.show()


In [None]:
import torch
metadata = dataset.metadata.copy()
MAX_COMBINATIONS = 20  # upper bound on how many (dataset, label) pairs get a figure

# Plot one randomly sampled spectrogram for each (dataset, label) combination.
# The sample is not seeded, so re-running the cell shows different examples.
for _, row in unique_combinations.iloc[:MAX_COMBINATIONS].iterrows():
    dataset_name = row['dataset']
    label = row['label']

    # Filter annotations for this combination (labels are compared as strings)
    filtered_annotations = metadata[
        (metadata['dataset'] == dataset_name) &
        (metadata['label'].astype(str) == str(label))
    ]
    if filtered_annotations.empty:
        # .sample(1) raises on an empty frame; say so instead of hiding it.
        print(f"\nNo samples found for dataset: {dataset_name}, label: {label} - skipping")
        continue

    print(f"\nPlotting spectrograms for dataset: {dataset_name}, label: {label}")

    for i in filtered_annotations.sample(1).index:
        # Best effort: a failing sample is reported and the loop moves on.
        try:
            # NOTE(review): `i` is an index label of `metadata`; using it to index
            # `dataset` assumes metadata has a default 0..n-1 index - confirm.
            audio_processed, sample_label, audio_raw, _ = dataset[i]
            audio_processed = audio_processed.squeeze(0).detach().numpy()

            # Time range and frequency band of the annotation
            data = metadata.loc[i]
            begin_time = data['begin_time']
            end_time = data['end_time']
            min_freq = data['low_frequency']
            max_freq = data['high_frequency']

            # Plot the spectrogram (plot_spectrogram shows the figure itself)
            plot_spectrogram(audio_processed,
                             sr=SAMPLE_RATE,
                             hop_length=HOP_LENGTH,
                             title=f'Dataset: {dataset_name}, Sample {i}\nLabel: {sample_label}\nTime: {begin_time:.2f}-{end_time:.2f}s, Duration: {end_time-begin_time:.2f}, min freq: {min_freq}, max freq: {max_freq}',
                             start_time=0,
                             end_time=end_time - begin_time,
                             low_freq=min_freq,
                             high_freq=max_freq
                             )
        except Exception as e:
            print(f"Error processing sample {i}: {str(e)}")