In [None]:
!pip install librosa soundfile webrtcvad pydub

In [None]:
import contextlib
import logging
import os
import wave
from datetime import datetime
from typing import List, Tuple

import librosa
import numpy as np
import soundfile as sf
import webrtcvad
from google.colab import drive
from pydub import AudioSegment

In [None]:
class AudioProcessingError(Exception):
    """Raised when a step of the audio processing pipeline fails."""

class AudioProcessor:
    """
    Cut the voiced parts out of an audio file and save them as WAV files.

    The pipeline is: load the file (downsampling it when its rate is above
    ``target_sr``), run WebRTC voice-activity detection over fixed 30 ms
    frames, keep the runs of speech that last at least ``min_segment_length``
    seconds (capped at ``max_segment_length`` seconds) and write every kept
    run to a sequentially numbered WAV file.
    """

    # Sample rates (Hz) the WebRTC VAD accepts; it rejects frames at any other rate.
    _VAD_SAMPLE_RATES = (8000, 16000, 32000, 48000)
    # Length of the frames handed to the VAD, in milliseconds.
    _FRAME_DURATION_MS = 30

    def __init__(self, target_sr: int = 16000, min_segment_length: int = 20, max_segment_length: int = 30):
        """
        Initialize the audio processor.

        Args:
            target_sr: Target sample rate in Hz
            min_segment_length: Minimum segment length in seconds
            max_segment_length: Maximum segment length in seconds

        Raises:
            AudioProcessingError: If invalid parameters are provided or the
                VAD / logger cannot be set up (the underlying error is chained)
        """
        try:
            if target_sr <= 0:
                raise ValueError("Target sample rate must be positive")
            if min_segment_length <= 0 or max_segment_length <= 0:
                raise ValueError("Segment lengths must be positive")
            if min_segment_length >= max_segment_length:
                raise ValueError("Minimum segment length must be less than maximum segment length")

            self.target_sr = target_sr
            self.min_segment_length = min_segment_length
            self.max_segment_length = max_segment_length
            self.vad = webrtcvad.Vad(3)  # Aggressiveness mode 3 (highest)

            # Set up logging (basicConfig is a no-op when the root logger is already configured)
            logging.basicConfig(level=logging.INFO,
                                format='%(asctime)s - %(levelname)s - %(message)s')
            self.logger = logging.getLogger(__name__)

        except Exception as e:
            raise AudioProcessingError(f"Failed to initialize AudioProcessor: {str(e)}") from e

    def mount_drive(self) -> None:
        """
        Mount Google Drive at /content/drive.

        Raises:
            AudioProcessingError: If drive mounting fails
        """
        try:
            drive.mount('/content/drive')
            self.logger.info("Google Drive mounted successfully")
        except Exception as e:
            raise AudioProcessingError(f"Failed to mount Google Drive: {str(e)}") from e

    def validate_file_path(self, file_path: str) -> None:
        """
        Validate that a path exists, is a regular file and is readable.

        Args:
            file_path: Path to the file

        Raises:
            AudioProcessingError: If any of the three checks fails
        """
        if not os.path.exists(file_path):
            raise AudioProcessingError(f"File not found: {file_path}")
        if not os.path.isfile(file_path):
            raise AudioProcessingError(f"Not a file: {file_path}")
        if not os.access(file_path, os.R_OK):
            raise AudioProcessingError(f"File not readable: {file_path}")

    def load_and_resample(self, file_path: str) -> Tuple[np.ndarray, int]:
        """
        Load an audio file and downsample it when its rate exceeds the target.

        Files whose native rate is at or below ``target_sr`` are returned at
        their native rate; they are never upsampled.

        Args:
            file_path: Path to the audio file

        Returns:
            Tuple of audio data and the sample rate of that data

        Raises:
            AudioProcessingError: If the file is missing, unreadable or empty,
                or if loading or resampling fails
        """
        try:
            self.validate_file_path(file_path)

            # sr=None keeps the file's native sample rate
            self.logger.info(f"Loading audio file: {file_path}")
            audio, sr = librosa.load(file_path, sr=None)

            if len(audio) == 0:
                raise AudioProcessingError("Empty audio file")

            # Resample if sample rate is higher than target
            if sr > self.target_sr:
                self.logger.info(f"Resampling from {sr}Hz to {self.target_sr}Hz")
                audio = librosa.resample(audio, orig_sr=sr, target_sr=self.target_sr)
                sr = self.target_sr

            return audio, sr

        except AudioProcessingError:
            # Already specific (missing file, empty audio); don't relabel it as "unexpected"
            raise
        except librosa.LibrosaError as e:
            raise AudioProcessingError(f"Failed to load or resample audio: {str(e)}") from e
        except Exception as e:
            raise AudioProcessingError(f"Unexpected error during audio loading: {str(e)}") from e

    @staticmethod
    def _speech_runs(flags):
        """Yield (first_frame, end_frame) for each maximal run of truthy flags; end is exclusive."""
        run_start = None
        for idx, voiced in enumerate(flags):
            if voiced and run_start is None:
                run_start = idx
            elif not voiced and run_start is not None:
                yield run_start, idx
                run_start = None
        if run_start is not None:
            # Speech lasted until the final frame: close the run at the end of the audio
            yield run_start, len(flags)

    def detect_voice_activity(self, audio: np.ndarray, sr: int) -> List[Tuple[float, float]]:
        """
        Detect segments with voice activity.

        The audio is cut into 30 ms frames (a trailing partial frame is
        dropped) and each frame is classified by the VAD. Consecutive speech
        frames form a run; runs shorter than ``min_segment_length`` are
        discarded and longer runs are truncated to ``max_segment_length``
        (the part beyond the cap is not returned).

        Args:
            audio: Audio data as float samples, scaled to 16-bit PCM here
            sr: Sample rate

        Returns:
            List of (start, end) tuples in seconds

        Raises:
            AudioProcessingError: If the audio holds no full frame or voice
                activity detection fails
        """
        try:
            if sr not in self._VAD_SAMPLE_RATES:
                self.logger.warning(
                    f"Sample rate {sr}Hz is not one of the rates WebRTC VAD accepts "
                    f"{self._VAD_SAMPLE_RATES}; frames will likely be rejected"
                )

            # Convert to 16-bit PCM; clip first so a full-scale +1.0 sample
            # becomes 32767 instead of overflowing the int16 range
            audio_pcm = np.clip(audio * 32768, -32768, 32767).astype(np.int16)

            # Parameters for VAD
            frame_duration = self._FRAME_DURATION_MS  # ms
            samples_per_frame = sr * frame_duration // 1000

            # Split audio into full frames only
            frames = []
            for offset in range(0, len(audio_pcm), samples_per_frame):
                frame = audio_pcm[offset:offset + samples_per_frame]
                if len(frame) == samples_per_frame:
                    frames.append(frame.tobytes())

            if not frames:
                raise AudioProcessingError("No valid frames found in audio")

            # Detect speech in frames; a frame the VAD rejects counts as non-speech
            is_speech = []
            for frame in frames:
                try:
                    is_speech.append(self.vad.is_speech(frame, sr))
                except Exception as e:
                    self.logger.warning(f"Failed to process frame, marking as non-speech: {str(e)}")
                    is_speech.append(False)

            # Turn runs of speech frames into (start, end) times in seconds
            segments = []
            for first, last in self._speech_runs(is_speech):
                duration = (last - first) * frame_duration / 1000
                if duration < self.min_segment_length:
                    continue
                start_time = first * frame_duration / 1000
                end_time = min(last * frame_duration / 1000,
                               start_time + self.max_segment_length)
                segments.append((start_time, end_time))

            if not segments:
                self.logger.warning("No voice activity segments detected")

            return segments

        except AudioProcessingError:
            raise
        except Exception as e:
            raise AudioProcessingError(f"Failed to detect voice activity: {str(e)}") from e

    def get_next_file_number(self, output_dir: str, filename_prefix: str) -> int:
        """
        Find the next available file number in the sequence.

        Only names of the exact form ``<filename_prefix>-<digits>.wav`` are
        counted; anything else in the directory is ignored.

        Args:
            output_dir: Output directory
            filename_prefix: Prefix for output filenames

        Returns:
            One more than the highest number found, or 0 when the directory
            does not exist or holds no matching file

        Raises:
            AudioProcessingError: If directory access fails
        """
        try:
            if not os.path.exists(output_dir):
                return 0

            head = filename_prefix + '-'
            tail = '.wav'
            highest = -1

            for filename in os.listdir(output_dir):
                if not (filename.startswith(head) and filename.endswith(tail)):
                    continue
                # Slice the middle out instead of str.replace, which would also
                # strip the prefix or '.wav' where they occur inside the name
                digits = filename[len(head):len(filename) - len(tail)]
                if digits.isdecimal():
                    highest = max(highest, int(digits))

            return highest + 1

        except Exception as e:
            raise AudioProcessingError(f"Failed to get next file number: {str(e)}") from e

    def save_segments(self, audio: np.ndarray, sr: int, segments: List[Tuple[float, float]],
                      output_dir: str, filename_prefix: str) -> List[str]:
        """
        Save audio segments to files with sequential naming.

        Files are named ``<filename_prefix>-NNN.wav``, continuing after the
        highest number already present in ``output_dir``. A segment that fails
        to save is logged and skipped; its number is reused by the next one.

        Args:
            audio: Audio data
            sr: Sample rate
            segments: List of (start, end) tuples in seconds
            output_dir: Output directory (created when missing)
            filename_prefix: Prefix for output filenames

        Returns:
            List of saved filenames

        Raises:
            AudioProcessingError: If the directory is not writable or no
                segment could be saved
        """
        try:
            os.makedirs(output_dir, exist_ok=True)

            if not os.access(output_dir, os.W_OK):
                raise AudioProcessingError(f"Output directory not writable: {output_dir}")

            # Get the starting file number
            current_number = self.get_next_file_number(output_dir, filename_prefix)

            saved_files = []
            for start, end in segments:
                try:
                    # Convert time to samples and extract the segment
                    segment = audio[int(start * sr):int(end * sr)]

                    # Generate filename with sequential numbering
                    filename = f"{filename_prefix}-{current_number:03d}.wav"
                    filepath = os.path.join(output_dir, filename)

                    # Save segment
                    sf.write(filepath, segment, sr)
                    saved_files.append(filename)
                    self.logger.info(f"Saved segment: {filename}")

                    # Advance the counter only after a successful write
                    current_number += 1

                except Exception as e:
                    self.logger.error(f"Failed to save segment {current_number}: {str(e)}")
                    continue

            if not saved_files:
                raise AudioProcessingError("No segments were successfully saved")

            return saved_files

        except AudioProcessingError:
            raise
        except Exception as e:
            raise AudioProcessingError(f"Failed to save segments: {str(e)}") from e

    def process_audio_file(self, input_file: str, output_dir: str, filename_prefix: str) -> Tuple[int, List[str]]:
        """
        Process a single audio file: load, detect speech, save the segments.

        Args:
            input_file: Path to input audio file
            output_dir: Output directory
            filename_prefix: Prefix for output filenames

        Returns:
            Tuple of the number of segments actually saved and the list of
            saved filenames; ``(0, [])`` when no voice segment was detected

        Raises:
            AudioProcessingError: If processing fails
        """
        try:
            # Load and resample audio
            self.logger.info("Starting audio processing")
            audio, sr = self.load_and_resample(input_file)

            # Detect voice activity segments
            self.logger.info("Detecting voice activity")
            segments = self.detect_voice_activity(audio, sr)

            if not segments:
                self.logger.warning("No voice segments detected in the audio file")
                return 0, []

            # Save segments and get list of saved files
            self.logger.info("Saving segments")
            saved_files = self.save_segments(audio, sr, segments, output_dir, filename_prefix)

            # Count what was written, not what was detected: some saves may have failed
            return len(saved_files), saved_files

        except AudioProcessingError:
            raise
        except Exception as e:
            raise AudioProcessingError(f"Failed to process audio file: {str(e)}") from e

def main():
    """
    Run the segmentation pipeline once and print a summary.

    Builds an AudioProcessor with the default settings, mounts Google Drive,
    processes the placeholder input path below and lists the files that were
    saved. Failures are printed and logged instead of being propagated.
    """
    try:
        processor = AudioProcessor(target_sr=16000, min_segment_length=20, max_segment_length=30)

        # Drive has to be mounted before the paths below can resolve
        processor.mount_drive()

        # Placeholder locations: edit these before running
        input_file = '/content/drive/MyDrive/path/to/your/audio.mp3'  # Update this
        output_dir = '/content/drive/MyDrive/path/to/output'  # Update this
        # Output files are named training-000.wav, training-001.wav, ...
        filename_prefix = 'training'

        num_segments, saved_files = processor.process_audio_file(input_file, output_dir, filename_prefix)

        print("\nProcessing complete!")
        print(f"Created {num_segments} segments.")
        print("\nSaved files:")
        for saved_name in saved_files:
            print(f"- {saved_name}")

    except AudioProcessingError as err:
        # Expected pipeline failure: report it without a traceback
        print(f"\nError during audio processing: {str(err)}")
        logging.error(f"Audio processing failed: {str(err)}")
    except Exception as err:
        # Anything else is a bug or environment problem: keep the traceback in the log
        print(f"\nUnexpected error: {str(err)}")
        logging.error(f"Unexpected error: {str(err)}", exc_info=True)

# Entry point: run the pipeline only when this module is executed directly.
if __name__ == "__main__":
    main()