# **Data Preprocessing**




## 1. Python packages

In [1]:
# Load necessary Python packages:
import pandas as pd
import os
import shutil
import numpy as np
import tqdm
import librosa
import ntpath
import soundfile as sf
import glob

## 2. Splitting the data


In [None]:
# Split the data into the predefined train, validation, and test groups.
# Every file in data_dir that is listed in the csv file is copied to
# <target_base_dir>/<Subset>, where <Subset> is the capitalized value of the
# 'subset' column (e.g. 'validation' -> 'Validation').

# Directory to the complete dataset:
data_dir = ''

# Target directory for the splits:
target_base_dir = ''

# Load the csv file attached to the InsectSet66 dataset:
data_metadata = pd.read_csv('...../InsectSet66_Train_Val_Test_Annotation.csv')

# Select all audio file names in the InsectSet66 dataset:
files = os.listdir(data_dir)

# Build the file name -> subset lookup once instead of scanning the whole
# DataFrame for every file. keep='first' means the first matching row wins
# when a file name is listed more than once.
subset_by_file = (
    data_metadata.drop_duplicates(subset='file_name', keep='first')
    .set_index('file_name')['subset']
    .to_dict()
)

# Counters for the summary printed at the end:
copied_count = 0
skipped_count = 0

# Iterate through the filenames to copy them to their split directory:
for file_name in files:

    # Files that are not listed in the csv file are left untouched:
    subset = subset_by_file.get(file_name)
    if subset is None:
        skipped_count += 1
        continue

    # Define target directory based on the subset:
    target_dir = os.path.join(target_base_dir, subset.capitalize())

    # Make sure the target directory exists (no error when it already does):
    os.makedirs(target_dir, exist_ok=True)

    source_file_path = os.path.join(data_dir, file_name)  # Source file path.
    target_file_path = os.path.join(target_dir, file_name)  # Target file path.

    # Copy the file to the target directory (Consider moving instead of copying when dealing with limited storage capacity):
    shutil.copy(source_file_path, target_file_path)
    # shutil.move(source_file_path, target_file_path)
    copied_count += 1

# Report how many entries were handled so that unmatched files do not go unnoticed:
print(f'Copied {copied_count} files; skipped {skipped_count} entries without a metadata match.')


In [None]:
# Check if splitting was successful by comparing, per split, the number of
# rows in the csv file with the number of entries in the split directory.

# Load the csv file attached to the InsectSet66 dataset:
data_metadata = pd.read_csv('...../InsectSet66_Train_Val_Test_Annotation.csv')

# Directory of data splits:
data_dir = ''

# Names of the maps containing the split data:
maps = ['Train', 'Validation', 'Test']

# Number of rows per subset according to the csv file:
count_metadata = data_metadata['subset'].value_counts()

# One list per column; the subset labels in the csv file are lowercase,
# so the map names are lowercased before the lookup.
counts_dict = {
    'Category': list(maps),
    'Metadata_Count': [count_metadata.get(split.lower(), 0) for split in maps],
    'Actual_Count': [len(os.listdir(os.path.join(data_dir, split))) for split in maps],
}

# Create a DataFrame from the dictionary:
comparison_df = pd.DataFrame(counts_dict)

# Check if the values in the dataframe match:
print(comparison_df)


In [None]:
# Use this code to find missing files when the counts in the comparison
# dataframe do not match. Only files that are listed in the csv file but
# absent from the split directory are reported.

# Load the csv file attached to the InsectSet66 dataset:
data_metadata = pd.read_csv('...../InsectSet66_Train_Val_Test_Annotation.csv')

# Directory of data splits:
data_dir = ''

# Names of the maps containing the split data:
maps = ['Train', 'Validation', 'Test']

# File names found on disk, keyed by the lowercase split name:
directory_files = {}

for category in maps:
    folder_path = os.path.join(data_dir, category)

    # Keep regular files only; subdirectories are ignored:
    directory_files[category.lower()] = {
        entry
        for entry in os.listdir(folder_path)
        if os.path.isfile(os.path.join(folder_path, entry))
    }

# Compare the metadata with the directory content, split by split:
for category in maps:
    split_key = category.lower()

    # File names the csv file assigns to this split:
    in_split = data_metadata['subset'] == split_key
    expected_files = set(data_metadata.loc[in_split, 'file_name'])

    # Files in metadata but not in directory:
    missing_files = expected_files.difference(directory_files[split_key])

    if not missing_files:
        print(f'All files in {category} category are accounted for in the directory.')
        continue

    print(f'{len(missing_files)} Files in metadata but not in {category} directory:', missing_files)


In [None]:
# Check that none of the files got corrupted (size of 0 bytes) while moving
# them to different directories.

# Directory of data splits:
data_dir = ''

# Names of the maps containing the split data:
maps = ['Train', 'Validation', 'Test']

# Walk through all files, one split at a time:
for subset in maps:
    directory = os.path.join(data_dir, subset)

    # os.walk yields nothing for a missing directory, which would otherwise be
    # reported as "no corrupted files"; report the missing directory instead.
    if not os.path.isdir(directory):
        print('Directory not found:', directory)
        continue

    # Count per split, so a corrupted file in one split does not suppress the
    # success message of the following splits:
    count = 0

    for root, _dirs, filenames in os.walk(directory):
        for filename in filenames:

            # Get the path to the file:
            filepath = os.path.join(root, filename)

            # Check if the size of the file is 0 using os.path.getsize:
            if os.path.getsize(filepath) == 0:

                # Print the path to the file:
                count += 1
                print('0 KB file found:', filepath)

    if count == 0:
        print(f'No corrupted files in {subset}:)')


## 3. Standardizing the data

### 3.1 Custom functions

All custom functions are slightly modified versions of Faiss' (2023) preprocessing script.

https://github.com/mariusfaiss/InsectSet47-InsectSet66-Adaptive-Representations-of-Sound-for-Automatic-Insect-Recognition/blob/main/SplitAudioChunks.py


In [None]:
def preprocess_audio(wave_paths, sample_rate, sample_buffer, out_path):
    """
    Preprocess a list of audio files into fixed-size fragments.

    Audio files with at least sample_buffer samples are chunked into
    overlapping fixed size windows (chunk_long).
    Audio files shorter than sample_buffer are written twice: once padded
    with zeros (pad_short) and once looped (loop_short) to sample_buffer size.
    Files that contain no samples are skipped with a message.

    Args:
        wave_paths: list, filepaths to audio files to be preprocessed
        sample_rate: int, sample rate the audio files are loaded with
        sample_buffer: float, total sample buffer length. Calculated as window_size*sample_rate.
        out_path: str, output path for saving; the helpers prepend it to the
            file name as-is, so it should end with a path separator.
    """

    # The file imports the tqdm module, so the progress bar class is tqdm.tqdm.
    for filename in tqdm.tqdm(wave_paths):
        audio, _ = librosa.load(filename, sr=sample_rate)

        # Strip the directory and the extension, whatever its length:
        name = ntpath.splitext(ntpath.basename(filename))[0]

        # Take the sample count from the array itself; converting to a
        # duration and back can be off by a fraction of a sample.
        samples_total = audio.shape[-1]

        # An empty signal can neither be padded meaningfully nor looped
        # (loop_short divides by the number of samples):
        if samples_total == 0:
            print('Skipping empty audio file:', filename)
            continue

        if samples_total < sample_buffer:
            pad_short(audio, sample_rate, sample_buffer, samples_total, out_path, name)
            loop_short(audio, sample_rate, sample_buffer, samples_total, out_path, name)
        else:
            # Everything that is not short is chunked; this no longer depends
            # on a module-level chunk_length being defined.
            chunk_long(audio, sample_rate, sample_buffer, samples_total, out_path, name)


In [None]:
def loop_short(audio, sample_rate, sample_buffer, samples_total, out_path, name):
    """
    Repeat a short audio file until it fills sample_buffer samples and save it.

    The result is written to '<out_path><name>_loop.wav'.

    Args:
        audio: array, audio waveform.
        sample_rate: int, sample rate of audio files.
        sample_buffer: float, total sample buffer length. Calculated as window_size*sample_rate.
        samples_total: int, total number of samples for calculating the amount of loops.
        out_path: str, output path for saving.
        name: str, name of the audio file.
    """
    # Number of copies needed to cover the buffer (rounded up):
    whole_copies = int(sample_buffer / samples_total)
    needs_partial_copy = sample_buffer % samples_total > 0
    copies = whole_copies + needs_partial_copy

    # Always keep at least one copy of the waveform, then join all copies at once:
    repeated = np.concatenate([audio] * max(copies, 1))

    # Cut the repeated signal down to exactly the buffer length:
    looped = repeated[: int(sample_buffer)]
    sf.write(f'{out_path+name}_loop.wav', looped, sample_rate)


In [None]:
def pad_short(audio, sample_rate, sample_buffer, samples_total, out_path, name):
    """
    Append zeros to a short audio file until it fills sample_buffer samples and save it.

    The result is written to '<out_path><name>_padded.wav'.

    Args:
        audio: array, audio waveform.
        sample_rate: int, sample rate of audio files.
        sample_buffer: float, total sample buffer length. Calculated as window_size*sample_rate.
        samples_total: int, total number of samples; determines how many zeros are appended.
        out_path: str, output path for saving.
        name: str, name of the audio file.
    """
    # Number of samples that are missing to reach the buffer length:
    n_missing = int(sample_buffer - samples_total)

    # Nothing is added in front, n_missing zeros are added at the end:
    padded = np.pad(audio, (0, n_missing))

    destination = f'{out_path+name}_padded.wav'
    sf.write(destination, padded, sample_rate)


In [None]:
def chunk_long(audio, sample_rate, sample_buffer, samples_total, out_path, name):
        """
        Chunk audio files into small overlapping fixed size windows.
        End chunks are wrapped: the remaining tail is completed with samples
        taken from the start of the file.

        Full windows are written to '<out_path><name>_chunk<counter>.wav' and
        wrapped end chunks to '<out_path><name>_wrap<counter>.wav'; both share
        one running counter that starts at 1.

        NOTE: besides its arguments, this function reads the module-level
        names overlap_samples and min_samples, which must be defined before it
        is called.

        Args:
            audio: array, audio waveform.
            sample_rate: int, sample rate of audio files.
            sample_buffer: float, total sample buffer length. Calculated as window_size*sample_rate.
            samples_total: int, total number of samples; presumably the length of audio (verify against caller).
            out_path: str, output path for saving.
            name: str, name of the audio file.
        """

        # Start index of the next window and running number of the written files:
        samples_wrote = 0
        counter = 1
        while samples_wrote < samples_total:
            # At least one full window is left: write it and move the start
            # forward by the window length minus the overlap.
            if (samples_total - samples_wrote) >= sample_buffer:
                chunk = audio[samples_wrote: int(samples_wrote + sample_buffer)]
                sf.write(f'{out_path+name}_chunk{counter}.wav', chunk, sample_rate)
                samples_wrote = int(samples_wrote + sample_buffer - overlap_samples)
                counter += 1

            # Wrap audio for end chunks:
            # This is a separate `if` (not `elif`), so it is evaluated with the
            # start index that may just have been advanced above.
            if (samples_total - samples_wrote) < sample_buffer:
                # Tails of at most min_samples are not written.
                if (samples_total - samples_wrote) > min_samples:
                    # Number of samples missing to fill a whole window:
                    wrap_length = int(sample_buffer - (samples_total - samples_wrote))
                    wrap = audio[0: int(wrap_length)]
                    # The slice end lies beyond the remaining samples, so this is the tail of the file:
                    chunk = audio[samples_wrote: int(samples_wrote + sample_buffer)]
                    wrapped_file = np.concatenate([chunk, wrap])
                    sf.write(f'{out_path+name}_wrap{counter}.wav', wrapped_file, sample_rate)
                    counter += 1
                # Advance in either case so that the loop makes progress:
                samples_wrote = int(samples_wrote + sample_buffer - overlap_samples)


### 3.2 Model 1

In [None]:
# Define the window length, window overlap, and minimum end-chunk length (in seconds):
sample_rate = 44100
chunk_length = 5
chunk_overlap = 2.5
min_length = 1.25

# Calculate global variables (chunk_long reads overlap_samples and min_samples as globals):
sample_buffer = chunk_length * sample_rate         # Number of samples per chunk.
overlap_samples = chunk_overlap * sample_rate      # Overlap of chunks in samples.
min_samples = min_length * sample_rate             # Minimum end samples.

# Name of maps containing the splits:
# NOTE(review): the splitting cell names its folders with subset.capitalize(),
# i.e. 'Validation'; confirm that the input folder read below is really called 'Val'.
dsets = ['Train', 'Val', 'Test']

# Folder name for every subset label used in the csv file. The folders are
# created with the capitalized names from dsets, so the lowercase csv labels
# have to be mapped before searching for the written fragments.
subset_dirs = {'train': 'Train', 'validation': 'Val', 'val': 'Val', 'test': 'Test'}

# Directory to output standardized fragments:
outdir = f''

# Create a new directory for saving the standardized fragments
# (exist_ok=False: fail instead of mixing fragments of different runs):
os.makedirs(outdir, exist_ok=False)

# Iterate through the maps:
for ds in dsets:
    os.makedirs(f'{outdir}/{ds}', exist_ok=False)
    # The file imports the glob module, so the function is glob.glob:
    paths = glob.glob(f'/content/drive/MyDrive/Thesis/{ds}/*.wav')
    out_path = f'{outdir}/{ds}/'
    preprocess_audio(paths, sample_rate, sample_buffer, out_path)


# Load the csv file attached to the InsectSet66 dataset:
df = pd.read_csv('...../InsectSet66_Train_Val_Test_Annotation.csv')
df = df[['file_name', 'unique_file', 'path', 'label', 'subset']]

# File name endings written by pad_short, loop_short, and chunk_long. Matching
# on these endings instead of '<name>*.wav' prevents a recording from also
# collecting the fragments of another recording whose name starts with the same text.
fragment_suffixes = ('_padded.wav', '_loop.wav', '_chunk*.wav', '_wrap*.wav')

# Build one metadata row per written fragment; the row is a copy of the
# original metadata row with 'path' replaced by the fragment path.
records = []
for row in tqdm.tqdm(df.to_dict('records')):
    name = ntpath.basename(row['path'][:-4])
    subset = subset_dirs.get(row['subset'], row['subset'])

    # Escape glob special characters in the fixed part of the pattern:
    prefix = glob.escape(f'{outdir}/{subset}/{name}')

    # Sorted, so the order of the rows does not depend on the file system:
    chunks = sorted(
        chunk
        for suffix in fragment_suffixes
        for chunk in glob.glob(prefix + suffix)
    )
    records.extend({**row, 'path': chunk} for chunk in chunks)

pp_df = pd.DataFrame(records, columns=df.columns)
pp_df.to_csv(f'{outdir}/metadata.csv', index=False)

In [None]:
# Check that none of the files got corrupted (size of 0 bytes) while writing
# or moving them to different directories.

# Directory of data splits:
data_dir = ''

# Names of the maps containing the split data:
# NOTE(review): the standardization cell above writes to 'Train', 'Val' and
# 'Test'; if data_dir points at that output, 'Validation' will be reported as
# not found - confirm which directory is meant to be checked here.
maps = ['Train', 'Validation', 'Test']

# Walk through all files, one split at a time:
for subset in maps:
    directory = os.path.join(data_dir, subset)

    # os.walk yields nothing for a missing directory, which would otherwise be
    # reported as "no corrupted files"; report the missing directory instead.
    if not os.path.isdir(directory):
        print('Directory not found:', directory)
        continue

    # Count per split, so a corrupted file in one split does not suppress the
    # success message of the following splits:
    count = 0

    for root, _dirs, filenames in os.walk(directory):
        for filename in filenames:

            # Get the path to the file:
            filepath = os.path.join(root, filename)

            # Check if the size of the file is 0 using os.path.getsize:
            if os.path.getsize(filepath) == 0:

                # Print the path to the file:
                count += 1
                print('0 KB file found:', filepath)

    if count == 0:
        print(f'No corrupted files in {subset}:)')


### 3.3 Model 2

In [None]:
# Define the window length, window overlap, and minimum end-chunk length (in seconds):
sample_rate = 44100
chunk_length = 5
chunk_overlap = 3.75
min_length = 1.25

# Calculate global variables (chunk_long reads overlap_samples and min_samples as globals):
sample_buffer = chunk_length * sample_rate         # Number of samples per chunk.
overlap_samples = chunk_overlap * sample_rate      # Overlap of chunks in samples.
min_samples = min_length * sample_rate             # Minimum end samples.

# Name of maps containing the splits:
# NOTE(review): the splitting cell names its folders with subset.capitalize(),
# i.e. 'Validation'; confirm that the input folder read below is really called 'Val'.
dsets = ['Train', 'Val', 'Test']

# Folder name for every subset label used in the csv file. The folders are
# created with the capitalized names from dsets, so the lowercase csv labels
# have to be mapped before searching for the written fragments.
subset_dirs = {'train': 'Train', 'validation': 'Val', 'val': 'Val', 'test': 'Test'}

# Directory to output standardized fragments:
outdir = f''

# Create a new directory for saving the standardized fragments
# (exist_ok=False: fail instead of mixing fragments of different runs):
os.makedirs(outdir, exist_ok=False)

# Iterate through the maps:
for ds in dsets:
    os.makedirs(f'{outdir}/{ds}', exist_ok=False)
    # The file imports the glob module, so the function is glob.glob:
    paths = glob.glob(f'/content/drive/MyDrive/Thesis/{ds}/*.wav')
    out_path = f'{outdir}/{ds}/'
    preprocess_audio(paths, sample_rate, sample_buffer, out_path)


# Load the csv file attached to the InsectSet66 dataset:
df = pd.read_csv('...../InsectSet66_Train_Val_Test_Annotation.csv')
df = df[['file_name', 'unique_file', 'path', 'label', 'subset']]

# File name endings written by pad_short, loop_short, and chunk_long. Matching
# on these endings instead of '<name>*.wav' prevents a recording from also
# collecting the fragments of another recording whose name starts with the same text.
fragment_suffixes = ('_padded.wav', '_loop.wav', '_chunk*.wav', '_wrap*.wav')

# Build one metadata row per written fragment; the row is a copy of the
# original metadata row with 'path' replaced by the fragment path.
records = []
for row in tqdm.tqdm(df.to_dict('records')):
    name = ntpath.basename(row['path'][:-4])
    subset = subset_dirs.get(row['subset'], row['subset'])

    # Escape glob special characters in the fixed part of the pattern:
    prefix = glob.escape(f'{outdir}/{subset}/{name}')

    # Sorted, so the order of the rows does not depend on the file system:
    chunks = sorted(
        chunk
        for suffix in fragment_suffixes
        for chunk in glob.glob(prefix + suffix)
    )
    records.extend({**row, 'path': chunk} for chunk in chunks)

pp_df = pd.DataFrame(records, columns=df.columns)
pp_df.to_csv(f'{outdir}/metadata.csv', index=False)

In [None]:
# Check that none of the files got corrupted (size of 0 bytes) while writing
# or moving them to different directories.

# Directory of data splits:
data_dir = ''

# Names of the maps containing the split data:
# NOTE(review): the standardization cell above writes to 'Train', 'Val' and
# 'Test'; if data_dir points at that output, 'Validation' will be reported as
# not found - confirm which directory is meant to be checked here.
maps = ['Train', 'Validation', 'Test']

# Walk through all files, one split at a time:
for subset in maps:
    directory = os.path.join(data_dir, subset)

    # os.walk yields nothing for a missing directory, which would otherwise be
    # reported as "no corrupted files"; report the missing directory instead.
    if not os.path.isdir(directory):
        print('Directory not found:', directory)
        continue

    # Count per split, so a corrupted file in one split does not suppress the
    # success message of the following splits:
    count = 0

    for root, _dirs, filenames in os.walk(directory):
        for filename in filenames:

            # Get the path to the file:
            filepath = os.path.join(root, filename)

            # Check if the size of the file is 0 using os.path.getsize:
            if os.path.getsize(filepath) == 0:

                # Print the path to the file:
                count += 1
                print('0 KB file found:', filepath)

    if count == 0:
        print(f'No corrupted files in {subset}:)')
