<a href="https://colab.research.google.com/github/devloperAnu/Sample_to_target/blob/main/updatedSTT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Install ffmpeg in Google Colab
!apt-get install -y ffmpeg

# Install pydub using pip
!pip install pydub # This line installs the necessary library

# Import libraries
import os
import numpy as np
import librosa
import soundfile as sf
from google.colab import files
from pydub import AudioSegment # Now this import should work
from pydub.exceptions import CouldntDecodeError

# Report whether the ffmpeg executable can be located
def check_ffmpeg():
    """
    Tell whether an ``ffmpeg`` executable can be resolved by ``shutil.which``.

    Returns:
        bool: False when the lookup yields None, True otherwise.
    """
    from shutil import which

    ffmpeg_location = which("ffmpeg")
    if ffmpeg_location is None:
        return False
    return True

# Verify ffmpeg installation
# Abort this cell with a RuntimeError as soon as check_ffmpeg() reports that
# ffmpeg cannot be found, instead of failing later during a conversion.
if not check_ffmpeg():
    raise RuntimeError("ffmpeg is not installed or not found in PATH. Please install ffmpeg.")

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [27]:
# Collect the two input recordings, one upload dialog per file
try:
    print("Please upload sample.mp3")
    uploaded = files.upload()
    if not uploaded:
        raise ValueError("No sample.mp3 uploaded!")
    # Only the first entry of the upload result is used
    sample_file = next(iter(uploaded))

    print("Please upload target.mp3")
    uploaded = files.upload()
    if not uploaded:
        raise ValueError("No target.mp3 uploaded!")
    target_file = next(iter(uploaded))

except Exception as e:
    # Report the problem, then let the original exception stop the cell
    print(f"Error during file upload: {str(e)}")
    raise

# Both names must end in .mp3 (case-insensitive); only the extension is checked here
if not all(name.lower().endswith('.mp3') for name in (sample_file, target_file)):
    raise ValueError("Uploaded files must be MP3 files!")

# File names used by the conversion, analysis and export cells
output_file, converted_wav = "output.mp3", "converted.wav"
sample_wav, target_wav = "sample.wav", "target.wav"

Please upload sample.mp3


Saving khansamp.mp3 to khansamp.mp3
Please upload target.mp3


Saving target.mp3 to target (4).mp3


In [28]:
import os
import shutil
import subprocess
import json
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError

# Report whether both the ffmpeg and ffprobe executables can be located
def check_ffmpeg():
    """
    Look for the ffmpeg and ffprobe executables.

    Each tool is looked up under its bare name as well as under /usr/bin and
    /usr/local/bin, using ``shutil.which`` for every candidate.

    Returns:
        bool: True only when at least one candidate resolves for each of the
        two tools, False otherwise.
    """
    candidates = {
        "ffmpeg": ("ffmpeg", "/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg"),
        "ffprobe": ("ffprobe", "/usr/bin/ffprobe", "/usr/local/bin/ffprobe"),
    }
    located = {
        tool: any(map(shutil.which, locations))
        for tool, locations in candidates.items()
    }
    return all(located.values())

# Function to probe audio file for diagnostics and format detection
def probe_audio(mp3_path: str, timeout: float = 10) -> dict:
    """
    Run ffprobe on an audio file and report its container format and audio codec.

    Only ffprobe is needed for probing, so this function looks for ffprobe
    itself (bare name, /usr/bin, /usr/local/bin) and runs the path it resolved.

    Args:
        mp3_path (str): Path to the audio file to inspect.
        timeout (float, optional): Seconds to wait for ffprobe before giving up.
            Defaults to 10.

    Returns:
        dict: On success ``{"format": ..., "codec": ..., "diagnostics": ...}``,
        where ``codec`` is the codec name of the first stream whose
        ``codec_type`` is ``"audio"`` (None when there is no such stream) and
        ``diagnostics`` is ffprobe's stderr, or its stdout when stderr is empty.
        On failure ``{"error": ..., "format": None, "codec": None}``; errors
        raised while running or parsing ffprobe are reported this way rather
        than propagated.
    """
    # Resolve ffprobe once and execute exactly the path that was found, so a
    # binary that only exists at an absolute location is the one that runs.
    ffprobe_candidates = ("ffprobe", "/usr/bin/ffprobe", "/usr/local/bin/ffprobe")
    ffprobe_bin = next((hit for hit in map(shutil.which, ffprobe_candidates) if hit), None)
    if ffprobe_bin is None:
        return {"error": "Cannot probe file: ffprobe not found.", "format": None, "codec": None}

    try:
        result = subprocess.run(
            [ffprobe_bin, "-i", mp3_path, "-show_streams", "-show_format", "-print_format", "json"],
            capture_output=True,
            text=True,
            check=False,  # a non-zero exit is turned into an error dict below
            timeout=timeout
        )
        diagnostics = result.stderr or result.stdout or "No diagnostic output available."
        if result.returncode != 0:
            return {"error": diagnostics, "format": None, "codec": None}

        # Parse ffprobe JSON output
        try:
            probe_data = json.loads(result.stdout)
        except json.JSONDecodeError:
            return {"error": "Failed to parse ffprobe output.", "format": None, "codec": None}

        format_name = probe_data.get("format", {}).get("format_name")
        audio_stream = next(
            (stream for stream in probe_data.get("streams", []) if stream.get("codec_type") == "audio"),
            None,
        )
        codec_name = audio_stream.get("codec_name") if audio_stream else None
        return {"format": format_name, "codec": codec_name, "diagnostics": diagnostics}
    except subprocess.TimeoutExpired:
        return {"error": f"Probing {mp3_path} timed out after {timeout} seconds.", "format": None, "codec": None}
    except Exception as e:
        return {"error": f"Failed to probe {mp3_path}: {str(e)}", "format": None, "codec": None}

# Decode an uploaded audio file and write it back out as WAV
def audio_to_wav(audio_path: str, wav_path: str) -> None:
    """
    Convert an audio file (MP3, AAC, etc.) to WAV format.

    The input is validated, probed (the probe result is only printed), and then
    decoded with pydub. If pydub raises CouldntDecodeError, a second attempt is
    made with extra parameters, and if that also raises CouldntDecodeError a
    third attempt is made with a shorter parameter list.

    Args:
        audio_path (str): Path to the input audio file.
        wav_path (str): Path to save the output WAV file.

    Raises:
        ValueError: If audio_path is empty, not a string, or names an empty file.
        FileNotFoundError: If the input file or the output directory does not exist.
        PermissionError: If the output directory is not writable.
        RuntimeError: If ffmpeg/ffprobe are not found or every conversion attempt fails.
    """

    def _decode_and_export(**decode_options):
        # Decode audio_path with pydub and write the result to wav_path
        AudioSegment.from_file(audio_path, **decode_options).export(wav_path, format="wav")

    # --- input file checks ---
    if not (audio_path and isinstance(audio_path, str)):
        raise ValueError(f"Invalid audio path: {audio_path}")
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Input file not found: {audio_path}")
    if os.path.getsize(audio_path) == 0:
        raise ValueError(f"Input file is empty: {audio_path}")

    # --- output location checks ---
    output_dir = os.path.dirname(wav_path)
    if not output_dir:
        # A bare file name means the current directory
        output_dir = '.'
    if not os.path.exists(output_dir):
        raise FileNotFoundError(f"Output directory does not exist: {output_dir}")
    if not os.access(output_dir, os.W_OK):
        raise PermissionError(f"No write permission for output directory: {output_dir}")

    if os.path.exists(wav_path):
        print(f"Warning: Overwriting existing file: {wav_path}")

    # --- tool availability ---
    if not check_ffmpeg():
        raise RuntimeError("ffmpeg or ffprobe not installed or not found in PATH. Please install ffmpeg.")

    # --- probe: informational only, a probe error does not stop the conversion ---
    probe_result = probe_audio(audio_path)
    if not probe_result.get("error"):
        print(f"Detected format for {audio_path}: {probe_result['format']}, codec: {probe_result['codec']}")
    else:
        print(f"Probe error for {audio_path}: {probe_result['error']}")

    # The extension is advisory; decoding is attempted regardless
    known_suffixes = ('.mp3', '.mpeg', '.m4a', '.aac', '.3gp', '.mp4')
    if not audio_path.lower().endswith(known_suffixes):
        print(f"Warning: {audio_path} has a non-standard audio extension, attempting conversion anyway.")

    try:
        # Attempt 1: let pydub/ffmpeg detect the format
        _decode_and_export()
        print(f"Successfully converted {audio_path} to {wav_path}")
    except CouldntDecodeError as e:
        print(f"Error decoding {audio_path}: {str(e)}")
        diagnostics = probe_result.get("diagnostics", "No diagnostics available.")
        print(f"Diagnostics: {diagnostics}")
        print(f"Retrying with metadata stripping and relaxed parameters...")
        try:
            # Attempt 2: pass the longer parameter list
            _decode_and_export(
                parameters=["-analyzeduration", "5000000", "-probesize", "5000000", "-map_metadata", "-1", "-vn"]
            )
            print(f"Successfully converted {audio_path} to {wav_path} on retry")
        except CouldntDecodeError as retry_e:
            print(f"Retry failed, attempting minimal processing...")
            try:
                # Attempt 3: pass the shorter parameter list
                _decode_and_export(parameters=["-vn", "-ignore_unknown"])
                print(f"Successfully converted {audio_path} to {wav_path} with minimal processing")
            except Exception as final_e:
                raise RuntimeError(f"Failed to decode {audio_path} after all attempts: {str(final_e)}\nDiagnostics: {diagnostics}")
        except Exception as retry_e:
            # Any non-decode failure during attempt 2 ends the conversion
            raise RuntimeError(f"Retry failed for {audio_path}: {str(retry_e)}\nDiagnostics: {diagnostics}")
    except PermissionError as e:
        raise RuntimeError(f"Permission error during conversion of {audio_path}: {str(e)}")
    except Exception as e:
        raise RuntimeError(f"Failed to convert {audio_path} to WAV: {str(e)}")

# Run both uploaded files through the WAV conversion
try:
    # Confirm the names set by the upload cell exist and point at real files
    for var_name in ("sample_file", "target_file"):
        try:
            var_value = globals()[var_name]
        except KeyError:
            raise ValueError(f"{var_name} is not defined. Ensure Cell 2 (uploads) was executed correctly.")
        if not (var_value and isinstance(var_value, str)):
            raise ValueError(f"{var_name} is not defined or invalid. Ensure files were uploaded correctly.")
        if not os.path.exists(var_value):
            raise FileNotFoundError(f"File not found: {var_value}")
        print(f"Verified file: {var_value} (Size: {os.path.getsize(var_value)} bytes)")

    # Convert each file on its own so a failure in one is recorded
    # and the other is still attempted
    errors = []

    try:
        audio_to_wav(sample_file, sample_wav)
    except Exception as e:
        errors.append(f"Error converting {sample_file}: {str(e)}")

    try:
        audio_to_wav(target_file, target_wav)
    except Exception as e:
        errors.append(f"Error converting {target_file}: {str(e)}")

    # Summarise the outcome; any recorded failure stops the cell
    if not errors:
        print("All audio files converted successfully.")
    else:
        print("Conversion errors occurred:")
        print(*errors, sep="\n")
        raise RuntimeError("One or more conversions failed. See details above.")

except Exception as e:
    print(f"Error during conversion setup: {str(e)}")
    raise

Verified file: khansamp.mp3 (Size: 17537381 bytes)
Verified file: target (4).mp3 (Size: 13834168 bytes)
Detected format for khansamp.mp3: mov,mp4,m4a,3gp,3g2,mj2, codec: aac
Successfully converted khansamp.mp3 to sample.wav
Detected format for target (4).mp3: mp3, codec: mp3
Successfully converted target (4).mp3 to target.wav
All audio files converted successfully.


In [29]:
# Estimate one representative pitch value for a WAV file
def analyze_pitch(file_path: str, sr: int = None) -> dict:
    """
    Estimate a single pitch value (in Hz) for an audio file via librosa.piptrack.

    For every frame, the piptrack pitch at the bin with the largest magnitude is
    taken. Frames whose largest magnitude is not positive are dropped and the
    remaining values are averaged. When nothing remains the pitch is 0 and a
    warning is printed.

    Args:
        file_path (str): Path to the input audio file; must end in .wav.
        sr (int, optional): Sampling rate passed to librosa.load. If None, the
            file's native sampling rate is used.

    Returns:
        dict: ``{"pitch": <mean pitch or 0>, "sr": <sampling rate returned by librosa.load>}``.

    Raises:
        FileNotFoundError: If the input file does not exist.
        ValueError: If the path does not end in .wav (case-insensitive).
        RuntimeError: If loading or analysing the audio fails.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Audio file not found: {file_path}")
    if not file_path.lower().endswith('.wav'):
        raise ValueError(f"Input file must be a WAV file: {file_path}")

    try:
        samples, sample_rate = librosa.load(file_path, sr=sr)
        pitches, magnitudes = librosa.piptrack(y=samples, sr=sample_rate)

        pitch = 0
        if np.any(magnitudes > 0):
            # Per frame: index of the strongest bin, then its pitch and magnitude
            frame_ids = np.arange(pitches.shape[1])
            strongest_bins = np.argmax(magnitudes, axis=0)
            peak_magnitudes = magnitudes[strongest_bins, frame_ids]
            valid_pitches = pitches[strongest_bins, frame_ids][peak_magnitudes > 0]
            if valid_pitches.size > 0:
                pitch = np.mean(valid_pitches)

        if pitch == 0:
            print(f"Warning: No valid pitch detected in {file_path}")
        return {"pitch": pitch, "sr": sample_rate}
    except Exception as e:
        raise RuntimeError(f"Failed to analyze pitch for {file_path}: {str(e)}")

# Analyze sample and target audio
# Each analyze_pitch() result is a dict with "pitch" and "sr" keys; the two
# dicts are kept in sample_features / target_features and printed here.
try:
    sample_features = analyze_pitch(sample_wav)
    print(f"Sample audio pitch: {sample_features['pitch']:.2f} Hz (Sampling rate: {sample_features['sr']} Hz)")
    target_features = analyze_pitch(target_wav)
    print(f"Target audio pitch: {target_features['pitch']:.2f} Hz (Sampling rate: {target_features['sr']} Hz)")
except Exception as e:
    # Report the failure, then re-raise so the cell stops with the original error
    print(f"Error during pitch analysis: {str(e)}")
    raise

Sample audio pitch: 590.55 Hz (Sampling rate: 44100 Hz)
Target audio pitch: 422.12 Hz (Sampling rate: 44100 Hz)


In [30]:
# Shift the target recording by the semitone distance between the two pitch estimates
try:
    sample_pitch = sample_features['pitch']
    target_pitch = target_features['pitch']

    # The semitone distance is only defined when both estimates are positive
    if not (target_pitch > 0 and sample_pitch > 0):
        print("Warning: Cannot compute pitch shift (invalid pitch detected)")
        n_steps = 0
    else:
        n_steps = 12 * np.log2(sample_pitch / target_pitch)
        print(f"Pitch shift required: {n_steps:.2f} semitones")

    # Clamp to 24 semitones in either direction to avoid artifacts
    if abs(n_steps) > 24:
        print(f"Warning: Pitch shift ({n_steps:.2f} semitones) is too large; limiting to ±24 semitones")
        n_steps = min(max(n_steps, -24), 24)

    # Reload the target at the sampling rate found during analysis, then shift it
    y_target, sr_target = librosa.load(target_wav, sr=target_features['sr'])
    y_shifted = librosa.effects.pitch_shift(y=y_target, sr=sr_target, n_steps=n_steps)

    # Write the shifted signal to the intermediate WAV file
    sf.write(converted_wav, y_shifted, sr_target)
    print(f"Pitch-shifted audio saved to {converted_wav}")

except Exception as e:
    print(f"Error during pitch shifting: {str(e)}")
    raise

Pitch shift required: 5.81 semitones
Pitch-shifted audio saved to converted.wav


In [31]:
# Encode a WAV file as MP3
def wav_to_mp3(wav_path: str, mp3_path: str) -> None:
    """
    Read a WAV file with pydub and export it as MP3.

    Args:
        wav_path (str): Path to the input file; must exist and end in .wav.
        mp3_path (str): Path for the output file; must end in .mp3.

    Raises:
        FileNotFoundError: If the input WAV file or the output directory does not exist.
        ValueError: If the input path does not end in .wav or the output path
            does not end in .mp3 (both checks are case-insensitive).
        PermissionError: If the output directory is not writable.
        RuntimeError: If reading the WAV or exporting the MP3 fails.
    """
    # --- input / output name checks ---
    if not os.path.exists(wav_path):
        raise FileNotFoundError(f"Input WAV file not found: {wav_path}")
    if not wav_path.lower().endswith('.wav'):
        raise ValueError(f"Input file must be a WAV file: {wav_path}")
    if not mp3_path.lower().endswith('.mp3'):
        raise ValueError(f"Output file must be an MP3 file: {mp3_path}")

    # --- output location checks ---
    output_dir = os.path.dirname(mp3_path)
    if not output_dir:
        # A bare file name means the current directory
        output_dir = '.'
    if not os.path.exists(output_dir):
        raise FileNotFoundError(f"Output directory does not exist: {output_dir}")
    if not os.access(output_dir, os.W_OK):
        raise PermissionError(f"No write permission for output directory: {output_dir}")

    if os.path.exists(mp3_path):
        print(f"Warning: Overwriting existing file: {mp3_path}")

    try:
        AudioSegment.from_wav(wav_path).export(mp3_path, format="mp3")
        print(f"Successfully converted {wav_path} to {mp3_path}")
    except Exception as e:
        raise RuntimeError(f"Failed to convert {wav_path} to MP3: {str(e)}")

# Produce the final MP3, then remove the intermediate WAV files
try:
    wav_to_mp3(converted_wav, output_file)

    # Cleanup is best-effort: a missing or undeletable file only produces a warning
    for temp_file in (sample_wav, target_wav, converted_wav):
        if not os.path.exists(temp_file):
            print(f"Warning: Temporary file not found: {temp_file}")
            continue
        try:
            os.remove(temp_file)
            print(f"Deleted temporary file: {temp_file}")
        except Exception as e:
            print(f"Warning: Failed to delete {temp_file}: {str(e)}")

except Exception as e:
    print(f"Error during WAV-to-MP3 conversion or cleanup: {str(e)}")
    raise

Successfully converted converted.wav to output.mp3
Deleted temporary file: sample.wav
Deleted temporary file: target.wav
Deleted temporary file: converted.wav


In [32]:
# Download the output MP3
# Failures in this cell are reported with print() only; nothing is re-raised.
try:
    # Fail early with a clear message when the MP3 is not on disk
    if not os.path.exists(output_file):
        raise FileNotFoundError(f"Output file not found: {output_file}")

    print(f"Downloading {output_file}...")
    files.download(output_file)
    print(f"Successfully initiated download of {output_file}")

except FileNotFoundError as e:
    # Covers the explicit check above and any FileNotFoundError from files.download
    print(f"Error: {str(e)}")
except Exception as e:
    print(f"Error during download: {str(e)}")

Downloading output.mp3...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Successfully initiated download of output.mp3
