In [1]:
import os
import librosa
import numpy as np
from tqdm import tqdm
from pydub import AudioSegment, silence

In [14]:
# Constants
# All durations are integer milliseconds (pydub's native time unit), so they can
# be used directly as AudioSegment slice bounds and silence lengths.
TARGET_DURATION = 2500  # length of every exported chunk: 2.5 seconds in milliseconds
MINIMUM_AUDIO_DURATION = 1000  # minimum after silence removal (gun,glass-400 scream-300 neutral-1s)
SILENCE_THRESHOLD = -40  # dB  (gun,glass-35 scream-30)
MAX_SILENCE_DURATION = 200  # ms, passed to pydub as min_silence_len (500 noted as an alternative)
OVERLAP_RATIO = 0.25  # fraction of a chunk shared with the next chunk
SAMPLE_RATE = 22050  # Hz (22.05 kHz)
# NOTE(review): "proccessed" is misspelled, but the path is kept as-is because
# other code may already read from this location -- rename deliberately, not here.
OUTPUT_DIR = "../data/proccessed/processed_dataset"
INPUT_DIR = "../data/raw/dataset"

class_names = ['glass_break', 'gunshot', 'scream', 'neutral']

# Ensure one output directory exists per class
for class_name in class_names:
    os.makedirs(f"{OUTPUT_DIR}/{class_name}", exist_ok=True)

In [15]:
def preprocess_audio(file_path, output_path):
    """Strip silence from one audio file and export fixed-length WAV chunks.

    The file is loaded at SAMPLE_RATE, converted to 16-bit mono PCM, and every
    silent stretch found by pydub is removed. What remains is either cut into
    overlapping TARGET_DURATION-long chunks, or (when it is no longer than
    TARGET_DURATION) centred between two blocks of silence to reach
    TARGET_DURATION. Files shorter than MINIMUM_AUDIO_DURATION after silence
    removal are skipped and nothing is written for them.

    Args:
        file_path: Path of the audio file to read.
        output_path: Directory that receives "<basename>_chunk_<n>.wav" files.

    Returns:
        Tuple ``(num_chunks, original_duration_ms, trimmed_duration_ms)``;
        ``num_chunks`` is 0 when the file was skipped.
    """
    target_ms = int(TARGET_DURATION)  # integer ms so slicing/range arithmetic stays exact

    # Step 1: load the audio; librosa resamples to SAMPLE_RATE while loading
    samples, sr = librosa.load(file_path, sr=SAMPLE_RATE)
    original_duration = len(samples) / sr * 1000  # Original duration in ms

    # Step 2: convert to a 16-bit PCM AudioSegment for silence detection.
    # Clip first: float samples outside [-1, 1] would otherwise overflow the
    # int16 range when scaled.
    pcm = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
    audio = AudioSegment(
        pcm.tobytes(),
        frame_rate=SAMPLE_RATE,
        sample_width=2,
        channels=1
    )

    # Step 3: remove silence by concatenating only the non-silent ranges
    non_silent_ranges = silence.detect_nonsilent(
        audio, min_silence_len=MAX_SILENCE_DURATION, silence_thresh=SILENCE_THRESHOLD
    )
    # Start from an empty segment that already has the target frame rate, so the
    # result is at SAMPLE_RATE without relying on pydub's implicit rate syncing.
    trimmed_audio = AudioSegment.silent(duration=0, frame_rate=SAMPLE_RATE)
    for start, end in non_silent_ranges:
        trimmed_audio += audio[start:end]

    trimmed_duration = len(trimmed_audio)  # Duration in milliseconds

    # Skip files that are shorter than MINIMUM_AUDIO_DURATION after silence removal
    if trimmed_duration < MINIMUM_AUDIO_DURATION:
        return 0, original_duration, trimmed_duration

    # trimmed_audio is already mono 16-bit PCM at SAMPLE_RATE, so no further
    # resampling is needed before chunking.

    # Step 4: chunk or pad audio to TARGET_DURATION
    chunks = []
    if trimmed_duration > target_ms:
        step = max(1, int(target_ms * (1 - OVERLAP_RATIO)))
        # Only start a chunk where a full target_ms window still fits. Every
        # chunk is therefore full-length, and a final window that ends exactly
        # at the end of the audio is kept instead of being discarded.
        last_start = trimmed_duration - target_ms
        for start in range(0, last_start + 1, step):
            chunks.append(trimmed_audio[start:start + target_ms])
    else:
        # Audio is between MINIMUM_AUDIO_DURATION and TARGET_DURATION long:
        # centre it between two blocks of silence.
        pad_total = target_ms - trimmed_duration
        pad_front = pad_total // 2
        pad_back = pad_total - pad_front
        # Create the silence at SAMPLE_RATE (pydub's default is 11025 Hz) so
        # concatenation does not have to resample it.
        padding_start = AudioSegment.silent(duration=pad_front, frame_rate=SAMPLE_RATE)
        padding_end = AudioSegment.silent(duration=pad_back, frame_rate=SAMPLE_RATE)
        chunks.append(padding_start + trimmed_audio + padding_end)

    # Save chunks as "<basename>_chunk_<n>.wav", numbered from 1
    os.makedirs(output_path, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    for i, chunk in enumerate(chunks):
        chunk.export(
            os.path.join(output_path, f"{base_name}_chunk_{i + 1}.wav"),
            format="wav"
        )
    return len(chunks), original_duration, trimmed_duration

In [16]:
# Process all audio files in a directory
def process_dataset(input_dir, output_path, class_name):
    """Run preprocess_audio on every .wav/.mp3 file directly inside a directory.

    Args:
        input_dir: Directory containing the raw audio files of one class.
        output_path: Directory handed to preprocess_audio for the exported chunks.
        class_name: Label used only in the progress-bar description.

    Returns:
        dict with the number of ``files`` processed, the total ``chunks``
        reported by preprocess_audio, and how many files were ``skipped``
        (produced zero chunks).
    """
    # Match extensions case-insensitively so files such as "CLIP.WAV" are not
    # silently ignored, and sort for a reproducible processing order
    # (os.listdir returns entries in arbitrary order).
    audio_files = sorted(
        f for f in os.listdir(input_dir) if f.lower().endswith((".wav", ".mp3"))
    )

    summary = {"files": 0, "chunks": 0, "skipped": 0}

    # Use tqdm for progress bar
    for file_name in tqdm(audio_files, desc=f"Processing files in {class_name}", unit="file", ncols=100):
        file_path = os.path.join(input_dir, file_name)

        # Only the chunk count is aggregated; the durations are not needed here
        num_chunks, _orig_duration, _trimmed_duration = preprocess_audio(file_path, output_path)

        summary["files"] += 1
        summary["chunks"] += num_chunks
        if num_chunks == 0:
            summary["skipped"] += 1

    return summary

In [17]:
# Preprocess every class: read from INPUT_DIR/<class_name> and write the
# exported chunks to OUTPUT_DIR/<class_name>.
for class_name in class_names:
    input_path = f"{INPUT_DIR}/{class_name}"
    output_path = f"{OUTPUT_DIR}/{class_name}"
    process_dataset(input_path, output_path, class_name)

Processing files in glass_break: 100%|██████████████████████████| 335/335 [00:14<00:00, 23.18file/s]
Processing files in gunshot: 100%|████████████████████████████| 1794/1794 [00:45<00:00, 39.31file/s]
Processing files in scream: 100%|█████████████████████████████| 2232/2232 [01:08<00:00, 32.42file/s]
Processing files in neutral: 100%|████████████████████████████| 2366/2366 [01:43<00:00, 22.88file/s]
