In [4]:
PATH_TO_INPUT_DATA = "/Users/simonmyhre/workdir/gitdir/sqml/projects/sm_multiclass_masters_project/pull_data/cache/datav3"

import pandas as pd
import os
import numpy as np
import librosa
import soundfile as sf

In [5]:
# The mix data lives in a "mixdata" folder next to the input data folder.
# Normalising first drops any trailing slash, so the result is always a
# sibling of the input folder rather than a child of it.
path_to_mix_data = os.path.join(
    os.path.dirname(os.path.normpath(PATH_TO_INPUT_DATA)), "mixdata"
)
print(path_to_mix_data)

/Users/simonmyhre/workdir/gitdir/sqml/projects/sm_multiclass_masters_project/pull_data/cache/mixdata


In [6]:
def get_wav_chunk(
    wav: np.ndarray, start: int, end: int, sample_rate: int, wav_length: int
):
    """
    Cut the window [start, end) out of a waveform.

    Args:
    - wav: The waveform samples.
    - start: Window start, multiplied by sample_rate to get a sample index.
    - end: Window end, multiplied by sample_rate to get a sample index.
    - sample_rate: Number of samples per unit of start/end.
    - wav_length: Length of the waveform in samples (the caller in this
      notebook passes len(wav)).

    Returns:
    - The slice of wav between the two sample indices.

    Raises:
    - AssertionError: if the window starts before sample 0, ends before it
      starts, or ends past wav_length.
    """
    start_sample = int(start * sample_rate)
    end_sample = int(end * sample_rate)

    # A negative start would silently wrap around to the end of the array
    assert 0 <= start_sample <= end_sample, "Window start must be non-negative and not after its end"
    # Compare in samples: wav_length is a sample count, while end is not
    assert end_sample <= wav_length, "Trying to create window which exceeds the wav's lenght"

    return wav[start_sample:end_sample]

# Placeholder data emulating one chunk which comes through the pipeline
wav, sr = librosa.load(PATH_TO_INPUT_DATA + "/wavs/a51414a8e2b19b43ad4b9ad69265b94e.wav", sr=16000)

# sf.write does not create missing folders, so make sure the output folder exists
os.makedirs("cache", exist_ok=True)
sf.write("cache/entire_wav_before_mix.wav", wav, sr)

# Use the sample rate returned by the loader instead of repeating the literal,
# so the window stays correct if the load rate above is changed
wav_chunk = get_wav_chunk(wav, 0, 1, sr, len(wav))
print(wav_chunk)
# Save the chunk to a file (amplified by a factor of 10)
sf.write("cache/wav_before_mix.wav", wav_chunk*10, sr)

[ 4.0757841e-05  5.4431752e-05  3.6259109e-05 ... -5.6102755e-05
 -7.4076983e-05 -3.1088901e-05]


In [7]:
def normalize_audio_energy(audio, target_energy=1.0):
    """
    Scale audio so that the sum of its squared samples is approximately target_energy.

    Args:
    - audio: The samples to scale.
    - target_energy: The desired sum of squared samples after scaling.

    Returns:
    - The scaled samples.
    """
    energy = np.square(audio).sum()
    # The small constant in the denominator keeps the division defined for all-zero input
    gain = np.sqrt(target_energy / (energy + 1e-10))
    return audio * gain

In [9]:
# Energy-normalize the placeholder chunk and save it for listening
wav_chunk_normalized = normalize_audio_energy(audio=wav_chunk)
sf.write(
    file="cache/wav_before_mix_normalized.wav",
    data=wav_chunk_normalized,
    samplerate=sr,
)

In [12]:
# Collect the names of the files available for mixing and report how many there are;
# the chunk-reading function defined below picks from this list
mix_wav_dir = path_to_mix_data + "/wavs"
data_in_mixdata = os.listdir(mix_wav_dir)
print(len(data_in_mixdata))







def _read_random_mix_file(chunk_size: int, sample_rate: int = 16000):
    """
    Takes a random audio file from the mixdata folder and returns a chunk of the audio file equal to the chunk_size.

    The candidate file names come from the module-level data_in_mixdata list and
    are loaded from path_to_mix_data + "/wavs/". Every candidate is tried at most
    once, in random order, so the search always terminates.

    Args:
    - chunk_size: The length of the audio chunk to return, in samples.
    - sample_rate: The sample rate the file is loaded at.

    Returns:
    - The audio chunk.

    Raises:
    - RuntimeError: If no file could be loaded that holds at least chunk_size samples.
    """

    for random_file in np.random.permutation(data_in_mixdata):
        try:
            mix_wav, _ = librosa.load(
                path_to_mix_data + "/wavs/" + str(random_file), sr=sample_rate
            )
        except Exception as e:
            # Unreadable file: report it and move on to the next candidate
            print(f"Error loading file: {e}")
            continue

        # Too short to hold a full chunk, try the next candidate
        if chunk_size > len(mix_wav):
            continue

        # randint's upper bound is exclusive; the +1 keeps the last valid start
        # reachable and makes a file of exactly chunk_size samples usable
        start = np.random.randint(0, len(mix_wav) - chunk_size + 1)
        return mix_wav[start:start + chunk_size]

    raise RuntimeError(
        f"No loadable file with at least {chunk_size} samples found in "
        f"{path_to_mix_data}/wavs"
    )

def apply_mix(
        wav_chunk,
        sample_rate: int = 16000,
        mix_ratio_from: float = 0.1,
        mix_ratio_to: float = 0.03,
        primary_gain: float = 10.0,
    ):
    """
    Mixes wav_chunk with a randomly selected audio chunk.

    The secondary chunk and its energy-normalized version are also written to
    "cache/mixing_wav.wav" and "cache/mixing_wav_normalized.wav".

    Args:
    - wav_chunk: The primary audio chunk to mix.
    - sample_rate: The sample rate of the audio chunks.
    - mix_ratio_from: One bound of the range the mixing ratio is drawn from.
    - mix_ratio_to: The other bound of that range. The two bounds may be given
      in either order.
    - primary_gain: Factor applied to wav_chunk before the secondary chunk is added.

    Returns:
    - The mixed audio chunk: wav_chunk * primary_gain + secondary_chunk * ratio.

    Raises:
    - ValueError: If the secondary chunk does not have the same length as wav_chunk.
    - AssertionError: If the drawn mixing ratio is outside [0, 1].
    """

    secondary_chunk = _read_random_mix_file(
        chunk_size=len(wav_chunk), sample_rate=sample_rate
    )

    if len(secondary_chunk) != len(wav_chunk):
        raise ValueError(
            "The secondary chunk and wav_chunk must have the same length"
        )

    # Write with the rate the chunk was actually loaded at, not a notebook global
    sf.write("cache/mixing_wav.wav", secondary_chunk, sample_rate)
    sf.write("cache/mixing_wav_normalized.wav", normalize_audio_energy(secondary_chunk), sample_rate)

    # np.random.uniform expects low <= high; the defaults are given high-to-low
    ratio_low, ratio_high = sorted((mix_ratio_from, mix_ratio_to))
    mix_ratio = np.random.uniform(ratio_low, ratio_high)

    assert 0 <= mix_ratio <= 1, "The mixing ratio must be between 0 and 1"

    # Mixing the chunks
    mixed_chunk = (wav_chunk * primary_gain) + (secondary_chunk * mix_ratio)
    return mixed_chunk

# Mix the placeholder chunk once, then save the raw and the energy-normalized result
new_wav_chunk = apply_mix(wav_chunk, sample_rate=sr)
sf.write(file="cache/wav_after_mix.wav", data=new_wav_chunk, samplerate=sr)
sf.write(
    file="cache/wav_after_mix_normalized.wav",
    data=normalize_audio_energy(new_wav_chunk),
    samplerate=sr,
)

161


In [14]:
# Test the augmenter
PATH_TO_SKYLINE = "/Users/simonmyhre/workdir/gitdir/skyline"
import sys

# Guard the append so re-running this cell does not add duplicate entries to sys.path
if PATH_TO_SKYLINE not in sys.path:
    sys.path.append(PATH_TO_SKYLINE)
from cirrus.datamaker.augmenter.augmenter import Augmenter


In [16]:
# Run the "mix_1" augmentation on the placeholder chunk and save the result for listening
augmenter = Augmenter(path_to_input_data=PATH_TO_INPUT_DATA)
wav_augmented = augmenter.augment_file(wav_chunk, sr, "mix_1")
sf.write(file="cache/wav_augmented.wav", data=wav_augmented, samplerate=sr)
