In [1]:
from silero_vad import load_silero_vad, read_audio, get_speech_timestamps
from pydub import AudioSegment
from pydub.utils import mediainfo

import math
from io import BytesIO
from typing import IO, List


def _split_audio(audio_file: IO[bytes], vad_model, sampling_rate, audio_id) -> List:
        """
        This is a helper function that takes an audio file and splits it into timestamps based on VAD

        Parameters:
            - audio_file (IO[bytes]) -> Audio file you want to split timestamps of
            - vad_model (silero-vad) -> Voice Activity Detection model to analyze the audio.

        Returns:
            - audio_segments (List) -> List of audio segments (as BytesIO buffers). 

        TODO: 
        1. You can try to ceil or floor the timestamps to nearest integer but there could be a problem in recognizing audio if it merges with some random sound or previous syllable. 
        2. Change from mono to stereo if stereo sound is provided
        3. Try changing the export format to mp3 and observe the results
        """

        def split_timestamps(timestamps):
            result = []

            def find_largest_gap(chunk):
                """Find the index of the largest gap between consecutive timestamps."""
                max_gap = 0
                split_index = None

                for i in range(1, len(chunk)):
                    gap = chunk[i]['start'] - chunk[i - 1]['end']
                    if gap > max_gap:
                        max_gap = gap
                        split_index = i

                return max_gap, split_index

            def process_chunk(chunk):
                """Process a single chunk and split it if its duration is > 29 seconds."""
                # Base condition: If chunk has one or fewer timestamps, add it directly
                if len(chunk) <= 1:
                    result.append(chunk)
                    return

                # Check duration of the chunk
                duration = chunk[-1]['end'] - chunk[0]['start']
                if duration <= 29:
                    result.append(chunk)  # If duration is <= 29, keep the chunk as is
                    return

                # Find the largest gap and split at that point
                max_gap, split_index = find_largest_gap(chunk)

                # If no valid split point is found, add the chunk as is
                if max_gap <= 0 or split_index is None:
                    result.append(chunk)
                    return

                # Split the chunk into two at the split_index
                left_chunk = chunk[:split_index]
                right_chunk = chunk[split_index:]

                # Process each sub-chunk recursively
                process_chunk(left_chunk)
                process_chunk(right_chunk)

            # Start processing the chunks
            process_chunk(timestamps)

            return result

        # 1. Read audio and prepare it for processing
        audio_bytes = audio_file.getvalue()

        # 2. Convert the audio into BytesIO object
        audio = BytesIO(audio_bytes)

        # 3. Read audio with it's sampling rate
        wav = read_audio(audio, sampling_rate=sampling_rate)

        # 4. Get speech timestamps from the vad model 
        speech_timestamps = get_speech_timestamps(
            wav, 
            vad_model, 
            sampling_rate=sampling_rate, 
            return_seconds=True
        )
     
        # 5. Extract processed timestamps
        processed_timestamps = split_timestamps(speech_timestamps)

        # TODO: You can try to ceil or floor the timestamps to nearest integer but there could be a problem in recognizing audio if it merges with some random sound or previous syllable. 
        cleaned_processed_timestamps = []
        for timestamps in processed_timestamps:
            cleaned_processed_timestamps.append(
                {
                    "start": float(timestamps[0]['start']), 
                    "end": float(timestamps[-1]['end'])
                }
            )
        
        # 6. Cleaning processed timestamps
        cleaned_processed_timestamps_2 = [cleaned_processed_timestamps[0]]

        for i in range(1, len(cleaned_processed_timestamps)):
            if (cleaned_processed_timestamps[i]['end']- cleaned_processed_timestamps[i]['start']) + (cleaned_processed_timestamps_2[-1]['end'] - cleaned_processed_timestamps_2[-1]['start']) < 29:
                cleaned_processed_timestamps_2[-1]['end'] = cleaned_processed_timestamps[i]['end']
            else:
                cleaned_processed_timestamps_2.append(cleaned_processed_timestamps[i])

        # Convert processed timestamps into audio segments
        audio_segments = []
        audio = AudioSegment.from_file(audio_file)
        for j, cleaned_processed_timestamp in enumerate(cleaned_processed_timestamps):
            start_ms = cleaned_processed_timestamp['start'] * 1000  # Convert seconds to milliseconds
            end_ms = cleaned_processed_timestamp['end'] * 1000    # Convert seconds to milliseconds
            segment = audio[start_ms:end_ms]

            buffer = BytesIO()
            segment.export(buffer, format="wav")
            saving_file_name = f"./data/audio_chunks/{audio_id}-{j+1}.wav"
            segment.export(saving_file_name, format="wav") # TODO: Try changing it to wav and observe the results
            buffer.seek(0)  # Reset buffer pointer
            audio_segments.append(buffer)

        return audio_segments

In [2]:
# import os
# from silero_vad import load_silero_vad, read_audio, get_speech_timestamps
# from pydub import AudioSegment
# from pydub.utils import mediainfo

# import math
# from io import BytesIO
# from typing import IO, List

# vad_model = load_silero_vad()

# def _split_audio(audio_file):
#     # 1. Read audio and prepare it for processing
#     audio_bytes = audio_file.getvalue()

#     # 2. Convert the audio into BytesIO object
#     audio = BytesIO(audio_bytes)

#     # 3. Read audio with it's sampling rate
#     wav = read_audio(audio, sampling_rate=8000)

#     # 4. Get speech timestamps from the vad model 
#     speech_timestamps = get_speech_timestamps(
#         wav, 
#         vad_model, 
#         sampling_rate=8000, 
#         return_seconds=True
#     )

#     for speech_timestamp in speech_timestamps:
#         duration = speech_timestamp['end'] - speech_timestamp['start']
#         print(duration)

In [3]:
# AUDIO_FILE_PATH = "./data/audio_recordings/1d70dc3e-73e6-419e-8270-a323efdbeb3f.mp3"

# with open(AUDIO_FILE_PATH, "rb") as f:
#     audio_f = f.read()

# audio_f = BytesIO(audio_f)

# _split_audio(audio_f)

In [4]:
import os

vad_model = load_silero_vad()

for root, _, files in os.walk("./data/audio_recordings/"):
    for file in files:
        audio_file_path = os.path.join(root, file)

        with open(audio_file_path, "rb") as f:
            audio = f.read()

        audio_id = audio_file_path.split("/")[-1][:-4]
        audio = BytesIO(audio)

        audio_segments = _split_audio(audio, vad_model, 8000, audio_id)

In [5]:
import os
from pydub.utils import mediainfo

# Specify the directory containing the audio files
folder_path = "./data/audio_chunks"

# Function to calculate the total audio duration in seconds
def get_total_audio_duration(folder_path):
    total_duration = 0

    # Iterate through files in the folder
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        
        # Check if it's an audio file (you can extend this with more audio formats)
        if filename.endswith(('.mp3', '.wav', '.flac', '.ogg', '.m4a')):  
            try:
                # Get audio duration info
                audio_info = mediainfo(file_path)
                duration = float(audio_info['duration'])
                total_duration += duration
            except Exception as e:
                print(f"Could not process {filename}: {e}")
    
    return total_duration

# Call the function
total_duration_seconds = get_total_audio_duration(folder_path)

# Convert seconds to hours, minutes, and seconds
hours = total_duration_seconds // 3600
minutes = (total_duration_seconds % 3600) // 60
seconds = total_duration_seconds % 60

print(f"Total audio duration: {int(hours)} hours, {int(minutes)} minutes, {int(seconds)} seconds")


Total audio duration: 6 hours, 44 minutes, 9 seconds
