In [4]:
from pathlib import Path

from pydub import AudioSegment
from pydub.utils import db_to_float
from pydub.utils import mediainfo

# Root folder that the scanning cells and split_audio() walk recursively.
input_path = Path('./data/audio_input')
# Root folder handed to split_audio() as the destination for exported chunks.
output_path = Path('./data/audio_output')
# Collects every distinct file suffix found under input_path.
unique_extensions = set()


In [5]:
# Walk the whole input tree (all subdirectories) and record the suffix of
# every regular file; directories are ignored.
unique_extensions.update(
    entry.suffix for entry in input_path.rglob('*') if entry.is_file()
)

print(unique_extensions)  # Show the distinct extensions that were found

{'.wav', '.mp3'}


In [6]:
durations = []

# Probe every audio file under input_path. The suffix test is case-insensitive
# so that e.g. '.WAV' files are measured too, matching the filter that
# split_audio() applies to the same tree.
for file in input_path.rglob('*'):
    if not (file.is_file() and file.suffix.lower() in {'.wav', '.mp3'}):
        continue
    info = mediainfo(str(file))
    try:
        durations.append(float(info['duration']))
    except (KeyError, TypeError, ValueError):
        # The probe returned no usable numeric duration for this file; skip it
        # instead of aborting the whole scan.
        print(f"Could not read duration of {file}")

if durations:
    print(f"Max length: {max(durations)} seconds")
    print(f"Min length: {min(durations)} seconds")
else:
    print("No audio files found.")

  m = re.match('([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$', token)
  m2 = re.match('([su]([0-9]{1,2})p?)( \(default\))?$', token)
  elif re.match('(flt)p?( \(default\))?$', token):
  elif re.match('(dbl)p?( \(default\))?$', token):


KeyboardInterrupt: 

In [3]:
def split_audio(input_folder, output_folder, segment_length=2000, min_amplitude_db=-30):
    """Split every .wav/.mp3 file under ``input_folder`` into fixed-length chunks.

    The directory layout below ``input_folder`` is mirrored under
    ``output_folder``. Each chunk is written as ``<stem>_<index><suffix>`` in
    the same format as its source file.

    Args:
        input_folder: Root folder that is searched recursively for audio files.
        output_folder: Root folder that receives the exported chunks.
        segment_length: Chunk length used as the slice step of the loaded
            audio (2000 corresponds to 2 seconds).
        min_amplitude_db: A chunk is exported only when its peak level
            (``max_dBFS``) is strictly above this value; otherwise it is
            skipped.

    Returns:
        None. Progress and per-file errors are reported with ``print``; a file
        that fails to process does not stop the remaining files.
    """
    input_folder_path = Path(input_folder)
    output_folder_path = Path(output_folder)

    for file in input_folder_path.rglob('*'):
        if not (file.is_file() and file.suffix.lower() in {'.wav', '.mp3'}):
            continue

        # The suffix without its leading dot doubles as the codec/format name.
        audio_format = file.suffix.lower()[1:]
        try:
            # Mirror the file's location relative to the input root.
            relative_path = file.relative_to(input_folder_path)
            output_subfolder = output_folder_path / relative_path.parent
            output_subfolder.mkdir(parents=True, exist_ok=True)

            audio = AudioSegment.from_file(str(file), format=audio_format)

            for i, chunk in enumerate(audio[::segment_length]):
                # Keep only chunks whose peak level is above the threshold.
                if chunk.max_dBFS > min_amplitude_db:
                    output_file = output_subfolder / f"{file.stem}_{i}{file.suffix}"
                    # export() hands back the open output file; close it right
                    # away so a long run does not accumulate open handles.
                    chunk.export(output_file, format=audio_format).close()
                    print(f"Exported: {output_file}")
                else:
                    print(f"Skipped chunk {i} from {relative_path} (below amplitude threshold)")

            print(f"Processed: {relative_path}")
        except Exception as e:
            # Best effort: report the failure and continue with the next file.
            print(f"Error processing {file}: {e}")
    print("Finished")


# Chunking parameters for this run, passed to split_audio() by keyword.
segment_length = 2000  # 2 seconds
min_amplitude_db = -30  # Minimum amplitude threshold in dB
split_audio(
    input_path,
    output_path,
    segment_length=segment_length,
    min_amplitude_db=min_amplitude_db,
)



Processed: Active/GH001 - Active - Day - 141022_0659_0751-24-0.wav
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___16_40_00-22-4-upsampled.wav
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___16_10_00-14-1-upsampled.wav
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___15_20_00-30-14-upsampled.wav
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___16_30_00-2-14-upsampled.wav
Processed: Active/GH001 - Active - Day - 141022_0659_0751-86-12.wav
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___17_00_00-10-25-upsampled.wav
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___15_00_00-2-28-upsampled.wav
Processed: Active/GH001 - Active - Day - 141022_0659_0751-82-0.wav
Processed: Active/CF003 - Active - Day - (221)-0-46.wav
Processed: Active/#DH001 - Active - Normal - 10000 Bees.mp3
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___16_20_00-10-2-upsampled.wav
Processed: Active/Hive1_12_06_2018_QueenBee_H1_audio___16_50_00-16-8-upsampled.wav
Processed: Activ

In [None]:
def reduce_noise_stft(audio_segment: AudioSegment, noise, noise_reduction_factor: float = 0.5):
    """Denoise ``audio_segment`` by spectral subtraction in the STFT domain.

    The noise spectrum is estimated from the first 1% of the STFT frames of
    the signal itself (at least one frame). ``noise_reduction_factor`` times
    that spectrum is subtracted from every frame's magnitude, with a floor of
    1% of the original magnitude. The phase is kept unchanged.

    Args:
        audio_segment: Source audio. Each channel is processed independently.
        noise: Currently unused; kept so existing callers that pass a noise
            clip keep working.
        noise_reduction_factor: Multiplier applied to the estimated noise
            magnitude before subtraction.

    Returns:
        A segment spawned from ``audio_segment`` holding peak-normalised
        16-bit samples with the same number of frames and channels as the
        input.
    """
    channels = max(1, int(audio_segment.channels))
    # The result is always int16, so the spawned segment must say so even when
    # the source uses a different sample width.
    int16_layout = {'sample_width': 2, 'frame_width': 2 * channels}

    samples = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
    n_frames = samples.size // channels
    if n_frames == 0:
        return audio_segment._spawn(b'', overrides=int16_layout)

    # De-interleave to shape (channels, frames) so each channel is transformed
    # on its own instead of as one mixed stream.
    samples = samples[:n_frames * channels].reshape(n_frames, channels).T

    # Normalize to [-1, 1]; a silent clip has no peak to divide by.
    peak = np.max(np.abs(samples))
    if peak > 0:
        samples = samples / peak

    # Define STFT parameters
    fs = audio_segment.frame_rate
    nperseg = 1024  # Larger value = better frequency resolution, less temporal resolution
    noverlap = nperseg // 2  # 50% overlap
    window = 'hann'

    # Zero-pad very short clips up to one full window so the forward and
    # inverse transforms use the same segment length.
    if n_frames < nperseg:
        samples = np.pad(samples, ((0, 0), (0, nperseg - n_frames)))

    # Forward STFT; the last axis of Zxx is time, the one before it frequency.
    _, _, Zxx = stft(samples, fs=fs, nperseg=nperseg, noverlap=noverlap, window=window)

    magnitude = np.abs(Zxx)
    phase = np.angle(Zxx)

    # Estimate the noise profile from the first 1% of frames, using at least
    # one frame so short clips do not average an empty slice (which gives NaN).
    n_noise_frames = max(1, int(0.01 * Zxx.shape[-1]))
    noise_profile = magnitude[..., :n_noise_frames].mean(axis=-1, keepdims=True)

    # Subtract the scaled noise profile, keeping a small floor to avoid
    # artifacts and complete silence.
    magnitude_denoised = magnitude - noise_reduction_factor * noise_profile
    magnitude_denoised = np.maximum(magnitude_denoised, magnitude * 0.01)

    # Reconstruct the complex spectrogram (preserve phase)
    Zxx_denoised = magnitude_denoised * np.exp(1j * phase)

    # Back to the time domain; drop the padding the transform added so the
    # output has exactly as many frames as the input.
    _, reconstructed = istft(Zxx_denoised, fs=fs, nperseg=nperseg, noverlap=noverlap, window=window)
    reconstructed = reconstructed[..., :n_frames]

    # Scale to the full int16 range (skipped for an all-zero result).
    out_peak = np.max(np.abs(reconstructed))
    if out_peak > 0:
        reconstructed = reconstructed / out_peak * 32767

    # Transposing to (frames, channels) makes tobytes() emit interleaved samples.
    pcm = reconstructed.T.astype(np.int16)
    return audio_segment._spawn(pcm.tobytes(), overrides=int16_layout)


# Try the STFT denoiser on a single recording: load the recording and a
# separate noise clip, denoise with a reduction factor of 0.1, and write the
# result to ./data/denoised.wav as WAV.
f = AudioSegment.from_file("./data/audio_input/missing_queen/CJ001 - Missing Queen - Day -  (100)-6-1.wav")
n = AudioSegment.from_file("./data/noise/noise_sample.wav")
denoised = reduce_noise_stft(f, n, noise_reduction_factor=0.1)
denoised.export('./data/denoised.wav', format="wav")

In [None]:
def reduce_noise(audio_segment, noise_sample_path=noise_sample):
    """Denoise ``audio_segment`` with noisereduce, using a reference noise clip.

    Args:
        audio_segment: Source audio; multi-channel input is averaged to mono.
        noise_sample_path: Path of the reference noise recording. It is loaded
            at the segment's frame rate so both signals share one sample rate.

    Returns:
        A mono AudioSegment with the same frame rate and sample width as the
        input.

    Raises:
        ValueError: If the segment's sample width is not 1, 2 or 4 bytes.
    """
    frame_rate = audio_segment.frame_rate
    sample_width = audio_segment.sample_width
    int_dtype = {1: np.int8, 2: np.int16, 4: np.int32}.get(sample_width)
    if int_dtype is None:
        raise ValueError(f"Unsupported sample width: {sample_width} bytes")
    # Magnitude of the most negative integer sample at this width.
    full_scale = float(1 << (8 * sample_width - 1))

    # Load the noise clip at the segment's rate; librosa returns floats in [-1, 1].
    noise_audio, _ = librosa.load(noise_sample_path, sr=frame_rate)

    samples = np.array(audio_segment.get_array_of_samples()).astype(np.float32)
    if audio_segment.channels > 1:
        # If stereo, average channels to mono
        samples = samples.reshape((-1, audio_segment.channels)).mean(axis=1)
    # Bring the signal onto the same [-1, 1] scale as the noise clip so the
    # noise statistics are comparable with the signal.
    samples /= full_scale

    # Reduce noise
    reduced_samples = noisereduce.reduce_noise(y=samples, y_noise=noise_audio, sr=frame_rate)

    # Convert back to integer PCM of the declared sample width. Writing the
    # float array's raw bytes would be misread as integer samples.
    scaled = np.asarray(reduced_samples, dtype=np.float64) * full_scale
    pcm = np.clip(np.round(scaled), -full_scale, full_scale - 1).astype(int_dtype)

    reduced_chunk = AudioSegment(
        pcm.tobytes(),
        frame_rate=frame_rate,
        sample_width=sample_width,
        channels=1)

    return reduced_chunk

In [None]:
def reduce_noise_deep_filter(audio_segment: AudioSegment, noise_reduction_factor: float = 1.0):
    """Denoise ``audio_segment`` with DeepFilterNet.

    The segment is written to a temporary WAV file, loaded at the model's
    sample rate, enhanced, and optionally blended with the original signal.

    Args:
        audio_segment: Source audio.
        noise_reduction_factor: Blend weight between the loaded signal (0.0)
            and the fully enhanced signal (1.0).

    Returns:
        A 16-bit AudioSegment at the model's sample rate, with one channel per
        row of the enhanced signal.
    """
    # Initialising the model is expensive, so keep it on the function object
    # and reuse it on later calls (e.g. when called once per file).
    cached = getattr(reduce_noise_deep_filter, "_df_cache", None)
    if cached is None:
        model, df_state, _ = init_df()
        cached = (model, df_state)
        reduce_noise_deep_filter._df_cache = cached
    model, df_state = cached

    # Reserve a temporary path and close the handle before anything else
    # writes to it.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
        temp_filename = temp_file.name

    try:
        # Exporting inside the try block guarantees the temporary file is
        # removed even when the export itself fails.
        audio_segment.export(temp_filename, format="wav").close()

        # Load audio using DeepFilterNet's load_audio function
        audio, _ = load_audio(temp_filename, sr=df_state.sr())

        # Enhance (denoise) the audio
        enhanced = enhance(model, df_state, audio)

        # Apply noise reduction factor
        if noise_reduction_factor != 1.0:
            enhanced = audio + noise_reduction_factor * (enhanced - audio)

        # Accept either a tensor-like object or a plain array.
        if hasattr(enhanced, "detach"):
            enhanced = enhanced.detach().cpu().numpy()
        # Treated as (channels, samples); a 1-D result becomes a single channel.
        enhanced = np.atleast_2d(np.asarray(enhanced, dtype=np.float32))
        channels = enhanced.shape[0]

        # Clip before the int16 conversion so overshoot does not wrap around.
        enhanced_int16 = (np.clip(enhanced, -1.0, 1.0) * 32767).astype(np.int16)

        # Transposing to (samples, channels) makes tobytes() emit interleaved
        # frames, which is the layout that matches the declared channel count.
        return AudioSegment(
            enhanced_int16.T.tobytes(),
            frame_rate=df_state.sr(),
            sample_width=2,
            channels=channels
        )
    finally:
        # Clean up the temporary file
        os.unlink(temp_filename)


# Single-file trial of the DeepFilterNet denoiser: load one recording.
input_file = "./data/audio_input/missing_queen/CJ001 - Missing Queen - Day -  (100)-6-1.wav"
audio = AudioSegment.from_file(input_file)

# Denoise with the function's default noise_reduction_factor.
denoised = reduce_noise_deep_filter(audio)

# Write the denoised audio to ./data/denoised_deep_filter.wav as WAV.
denoised.export('./data/denoised_deep_filter.wav', format="wav")

print("Denoising complete. Output saved as 'denoised_deep_filter.wav'")


In [None]:
def split_audio(input_folder, output_folder, segment_length=2000, min_amplitude_db=-30):
    """Denoise and split every .wav/.mp3 file under ``input_folder`` into chunks.

    Each file is denoised with ``reduce_noise_deep_filter`` (factor 0.5) and
    cut into ``segment_length`` slices. The directory layout below
    ``input_folder`` is mirrored under ``output_folder``; chunks are written
    as WAV files named ``<folder label>_<file number>_<chunk index>.wav``.

    Args:
        input_folder: Root folder that is searched recursively for audio files.
        output_folder: Root folder that receives the exported chunks.
        segment_length: Chunk length used as the slice step of the audio.
        min_amplitude_db: A chunk is exported only when its ``dBFS`` level is
            strictly above this value.

    Returns:
        None. Per-file errors are printed and do not stop the remaining files.
    """
    input_folder_path = Path(input_folder)
    output_folder_path = Path(output_folder)

    file_counter = 0
    # Sorting makes the file numbering reproducible between runs; plain
    # directory iteration order is not guaranteed.
    for file in sorted(input_folder_path.rglob('*')):
        if not (file.is_file() and file.suffix.lower() in {'.wav', '.mp3'}):
            continue

        file_counter += 1
        try:
            relative_path = file.relative_to(input_folder_path)

            output_subfolder = output_folder_path / relative_path.parent
            output_subfolder.mkdir(parents=True, exist_ok=True)

            audio = AudioSegment.from_file(str(file), format=file.suffix.lower()[1:])
            audio = reduce_noise_deep_filter(audio, noise_reduction_factor=0.5)

            # Flatten nested folders into one label so the file name never
            # contains a path separator; files at the root get "root".
            label = "_".join(relative_path.parent.parts) or "root"

            for i, chunk in enumerate(audio[::segment_length]):
                if chunk.dBFS > min_amplitude_db:
                    output_file = output_subfolder / f"{label}_{file_counter}_{i}.wav"
                    # The name ends in .wav, so always encode as WAV (the
                    # source may be mp3); close the handle export() returns.
                    chunk.export(output_file, format="wav").close()
        except Exception as e:
            # Best effort: report the failure and continue with the next file.
            print(f"Error processing {file}: {e}")
