This next cell will split given audio files of any length into 10 second segments, and save them to the specified output directory

In [None]:
import os
import math
import torch
import torchaudio

def split_all_audio_files(input_dir, output_dir, segment_length_sec=10):
    # Make sure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Loop over all files in the input directory
    for filename in os.listdir(input_dir):
        # Check if the file is a .wav file
        if filename.endswith('.wav') or filename.endswith('.WAV'):
            try:
                # Full path to the original audio file
                audio_path = os.path.join(input_dir, filename)
                # Load the audio file
                waveform, sample_rate = torchaudio.load(audio_path)

                # Calculate number of samples in segment_length_sec
                num_samples_segment = segment_length_sec * sample_rate

                # Calculate total number of segments
                total_segments = math.ceil(waveform.shape[1] / num_samples_segment)

                # Split waveform into segments and save each segment to a new .wav file
                for i in range(total_segments):
                    start = i * num_samples_segment
                    end = start + num_samples_segment
                    segment = waveform[:, start:end]
                    
                    # Prepare filename for the segment
                    segment_filename = f"{filename.rstrip('.wav')}_segment{i}.wav"
                    segment_path = os.path.join(output_dir, segment_filename)

                    # Save segment as a .wav file
                    segment = (segment * 32767).short()  # Convert to 16-bit PCM format
                    torchaudio.save(segment_path, segment, sample_rate)
            except Exception as e:
                  print(f"Error processing file {audio_path}: {str(e)}")

In [None]:
input_dir = '/content/NegativeSamples'
output_dir = '/content/SplitNegative'
split_all_audio_files(input_dir, output_dir, segment_length_sec=10)

**Optional**

zip and download split files to local. or use google drive

files stored on google colab do not persist. I will download these files to review the positives again as some segments will no longer have our call after splitting.

In [None]:
from google.colab import files

!zip -r /content/SplitNegative.zip /content/SplitNegative
#!zip -r /content/SplitPositive.zip /content/SplitPositive




zip error: Nothing to do! (try: zip -r /content/SplitNegative.zip . -i /content/SplitNegative)


In [None]:
files.download("/content/SplitNegative.zip")
#files.download("/content/SplitPositive.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

After reviewing the files, I placed them in a google drive

In [1]:
from google.colab import drive

drive.mount('/content/gdrive')
%cd gdrive/MyDrive
%cd Rana\ Draytonii # Location of data

Mounted at /content/gdrive
/content/gdrive/MyDrive
/content/gdrive/MyDrive/Rana Draytonii


In [None]:
%ls

[0m[01;34mRana1[0m/  [01;34mRana3[0m/  [01;34mRana4[0m/  [01;34mRana5[0m/  [01;34mResampledAudio[0m/  [01;34mSplitNegative[0m/  [01;34mSplitPositive[0m/


**Resample to correct frequency, apply labels, and convert to tensors**



**Before Running:**


1.   specify path to positive and negative samples
2.   create folder and specify path for resampled audio



**Prepare JSON files for datasets:**

This script creates a JSON file for each of the train, validation, and test sets, each named train_data.json, val_data.json, and test_data.json, respectively.

Each entry in the JSON file will contain the path to the audio file, and the corresponding label.

Also, you might want to adjust the path of the wav field to match the environment where the model will be trained.

In [2]:
import os
import torch
import torchaudio
import pandas as pd
from sklearn.model_selection import train_test_split
import json
import csv

# Create labels.csv
labels = {
    'index': [0, 1],  # Modify the index values as per your labels
    'mid': ['/m/positive', '/m/negative'],  # Modify the MID values as per your labels
    'display_name': ['Positive', 'Negative']  # Modify the display names as per your labels
}

df = pd.DataFrame(labels)
df.to_csv('labels.csv', index=False)

# Changes sampling frequency of audio file to 16kHz required by the AST model
def resampler(audio_path, save_dir):
    # load your audio file
    waveform, sample_rate = torchaudio.load(audio_path)

    # define resampler
    resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)

    # resample the waveform
    waveform_resampled = resampler(waveform)
  
    # create new filename
    base_filename = os.path.basename(audio_path)
    new_filename = os.path.splitext(base_filename)[0] + '_resampled.wav'
    new_path = os.path.join(save_dir, new_filename)

    # save the resampled audio
    torchaudio.save(new_path, waveform_resampled, sample_rate=16000)

    return new_path


# Function to convert audio to Mel spectrogram (No longer used)
def audio_to_mel_spectrogram(wav_name, mel_bins, target_length=1024):
    waveform, sr = torchaudio.load(wav_name)
    assert sr == 16000, 'input audio sampling rate must be 16kHz'

    fbank = torchaudio.compliance.kaldi.fbank(
        waveform, htk_compat=True, sample_frequency=sr, use_energy=False,
        window_type='hanning', num_mel_bins=mel_bins, dither=0.0, frame_shift=10)

    n_frames = fbank.shape[0]

    p = target_length - n_frames
    if p > 0:
        m = torch.nn.ZeroPad2d((0, 0, 0, p))
        fbank = m(fbank)
    elif p < 0:
        fbank = fbank[0:target_length, :]

    fbank = (fbank - (-4.2677393)) / (4.5689974 * 2)
    return fbank

# Function to create an index dictionary file per model specifications
def make_index_dict(label_csv):
    index_lookup = {}
    with open(label_csv, 'r') as f:
        csv_reader = csv.DictReader(f)
        for row in csv_reader:
            index_lookup[row['display_name']] = row['mid']
    return index_lookup

import json

# Function to create .json file with filenames and labels per model specifications
def create_data_json(dataset, labels, filename, index_dict):
    data = []
    for wav_path, label in zip(dataset, labels):
        entry = {
            "wav": wav_path,
            "labels": index_dict[label],  
        }
        data.append(entry)
    with open(filename, 'w') as f:
        json.dump({"data": data}, f, indent=4)



# Define the path where your positive and negative .wav files are stored
positive_audio_path = 'SplitPositive'
negative_audio_path = 'SplitNegative'
# Define a directory to save resampled audio files (16kHz)
resampled_audio_dir = 'ResampledAudio'

# Define the target length for your spectrograms (only used for mel spectrogram)
target_length = 1000
mel_bins = 128  # Number of bins in Mel spectrogram

# Define labels
positive_label = 1
negative_label = 0

# Prepare dataset
dataset = []
numeric_labels = []  # For train_test_split and torch.Tensor
string_labels = []  # For JSON file

# Process positive samples
for filename in os.listdir(positive_audio_path):
    if filename.endswith('.wav') or filename.endswith('.WAV'):
        filepath = os.path.join(positive_audio_path, filename)
        filepath = resampler(filepath, resampled_audio_dir)  # Resample and get new file path
        dataset.append(filepath)  # Save filepath instead of spectrogram
        numeric_labels.append(positive_label)
        string_labels.append('Positive')

# Process negative samples
for filename in os.listdir(negative_audio_path):
    if filename.endswith('.wav') or filename.endswith('.WAV'):
        filepath = os.path.join(negative_audio_path, filename)
        filepath = resampler(filepath, resampled_audio_dir)  # Resample and get new file path
        dataset.append(filepath)  # Save filepath instead of spectrogram
        numeric_labels.append(negative_label)
        string_labels.append('Negative')

numeric_labels = torch.Tensor(numeric_labels)

# train_test_split
dataset_trainval, dataset_test, labels_trainval, labels_test, string_labels_trainval, string_labels_test = train_test_split(dataset, numeric_labels, string_labels, test_size=0.15, random_state=42, stratify=numeric_labels)
dataset_train, dataset_val, labels_train, labels_val, string_labels_train, string_labels_val = train_test_split(dataset_trainval, labels_trainval, string_labels_trainval, test_size=0.15, random_state=42, stratify=labels_trainval)

index_dict = make_index_dict('labels.csv')

# Check if the keys you'll use exist in the dictionary (testing)
expected_keys = ['Positive', 'Negative']
for key in expected_keys:
    if key not in index_dict:
        print(f"Key '{key}' not found in index_dict")
    else:
        print(f"Key '{key}' found in index_dict. Corresponding value is {index_dict[key]}")
print("index_dict:", index_dict)


create_data_json(dataset_train, string_labels_train, 'train_data.json', index_dict)
create_data_json(dataset_val, string_labels_val, 'val_data.json', index_dict)
create_data_json(dataset_test, string_labels_test, 'test_data.json', index_dict)




Key 'Positive' found in index_dict. Corresponding value is /m/positive
Key 'Negative' found in index_dict. Corresponding value is /m/negative
index_dict: {'Positive': '/m/positive', 'Negative': '/m/negative'}


**Data Augmentation:**

This will "Stretch" the data by applying augmentation that should change the audio files in ways that keep the integrity of the frog call but change it in ways that allows them to act as more new data to train on

In [None]:
import torchaudio.transforms as T

class AudioAugmentation:
    def __init__(self, noise_std=0.01, time_shift_seconds=0.5, volume_var=0.1):
        self.noise_std = noise_std
        self.time_shift_seconds = time_shift_seconds
        self.volume_var = volume_var

    def __call__(self, waveform, sample_rate):
        # Apply time shift
        time_shift_samples = int(self.time_shift_seconds * sample_rate)
        waveform = T.TimeShift(time_shift_samples)(waveform)

        # Apply volume variation
        volume_scale = 1 + self.volume_var * (torch.rand(1) - 0.5)
        waveform = volume_scale * waveform

        # Apply additive noise
        noise = torch.normal(0, self.noise_std, waveform.shape)
        waveform = waveform + noise
        
        return waveform.clamp_(-1, 1)  # Ensure waveform values are in [-1, 1]

augment = AudioAugmentation()

# Apply augmentation to each item in the training set
dataset_train_augmented = []
labels_train_augmented = []

for (waveform, label) in zip(dataset_train, labels_train):
    waveform_augmented = augment(waveform, 16000)
    dataset_train_augmented.append(waveform_augmented)
    labels_train_augmented.append(label)  # The label remains the same after augmentation

# Convert to PyTorch tensors
dataset_train_augmented = torch.stack(dataset_train_augmented)
labels_train_augmented = torch.Tensor(labels_train_augmented)

AttributeError: ignored