In [1]:
# Fix download with resume capability and checks
import os
import tarfile
import requests

libri_dir = "/content/librispeech"
os.makedirs(libri_dir, exist_ok=True)

def download_file(url, destination):
    """Download file with resume capability"""
    print(f"📥 Downloading from {url}...")

    # Head request to get file size
    response = requests.head(url)
    file_size = int(response.headers.get('content-length', 0))

    # Check if file already exists
    if os.path.exists(destination):
        current_size = os.path.getsize(destination)
        if current_size == file_size:
            print(f"✅ File already downloaded: {destination}")
            return True
        else:
            print(f"🔄 Resuming download... ({current_size}/{file_size} bytes)")

    # Download with progress
    headers = {}
    if os.path.exists(destination):
        current_size = os.path.getsize(destination)
        headers = {'Range': f'bytes={current_size}-'}

    response = requests.get(url, headers=headers, stream=True)
    mode = 'ab' if headers else 'wb'

    with open(destination, mode) as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    print(f"✅ Download completed: {destination}")
    return True

# Download LibriSpeech
url = "https://www.openslr.org/resources/12/train-clean-100.tar.gz"
file_path = f"{libri_dir}/train-clean-100.tar.gz"

# Download the file
download_file(url, file_path)

# Verify file size
file_size = os.path.getsize(file_path)
print(f"📦 File size: {file_size} bytes")

# Extract
print("🔄 Extracting...")
try:
    with tarfile.open(file_path, 'r:gz') as tar:
        tar.extractall(libri_dir)
    print("✅ Extraction successful!")
except Exception as e:
    print(f"❌ Extraction failed: {e}")
    print("🔄 Trying alternative extraction method...")
    !tar -xzf {file_path} -C {libri_dir}

print(f"📁 Dataset ready at: {libri_dir}")

📥 Downloading from https://www.openslr.org/resources/12/train-clean-100.tar.gz...
✅ File already downloaded: /content/librispeech/train-clean-100.tar.gz
📦 File size: 6387309499 bytes
🔄 Extracting...


  tar.extractall(libri_dir)


✅ Extraction successful!
📁 Dataset ready at: /content/librispeech


In [28]:
# First, let's explore the dataset structure
import os

libri_path = "/content/librispeech/LibriSpeech"

def explore_dataset(path):
    print("📁 Dataset Structure:")
    for root, dirs, files in os.walk(path):
        level = root.replace(path, '').count(os.sep)
        indent = ' ' * 2 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 2 * (level + 1)
        for file in files[:5]:  # Show first 5 files
            if file.endswith('.flac') or file.endswith('.txt'):
                print(f"{subindent}{file}")
        if len(files) > 5:
            print(f"{subindent}... and {len(files) - 5} more files")

explore_dataset(libri_path)

# Check audio files
print("\n🎵 Checking audio files...")
audio_files = []
for root, dirs, files in os.walk(libri_path):
    for file in files:
        if file.endswith('.flac'):
            audio_files.append(os.path.join(root, file))

print(f"Total audio files found: {len(audio_files)}")
if audio_files:
    print(f"Sample file: {audio_files[0]}")

📁 Dataset Structure:
LibriSpeech/
  train-clean-100/
    5456/
      58161/
        5456-58161-0006.flac
        5456-58161-0019.flac
        5456-58161-0016.flac
        5456-58161-0011.flac
        5456-58161-0007.flac
        ... and 25 more files
      24741/
        5456-24741-0004.flac
        5456-24741-0022.flac
        5456-24741-0019.flac
        5456-24741-0012.flac
        5456-24741-0010.flac
        ... and 21 more files
      62043/
        5456-62043-0017.flac
        5456-62043-0002.flac
        5456-62043-0023.flac
        5456-62043-0013.flac
        5456-62043-0032.flac
        ... and 35 more files
      62014/
        5456-62014.trans.txt
        5456-62014-0004.flac
        5456-62014-0013.flac
        5456-62014-0006.flac
        5456-62014-0005.flac
        ... and 15 more files
    7148/
      59157/
        7148-59157-0040.flac
        7148-59157-0021.flac
        7148-59157.trans.txt
        7148-59157-0014.flac
        7148-59157-0006.flac
        ... and 3

In [33]:
# Install required dependencies
!pip install -q transformers datasets torch torchaudio librosa soundfile evaluate jiwer gradio

import torch
import numpy as np
import librosa
from datasets import Dataset
from transformers import (
    WhisperFeatureExtractor, WhisperTokenizer, WhisperProcessor,
    WhisperForConditionalGeneration, Seq2SeqTrainingArguments,
    Seq2SeqTrainer
)
from dataclasses import dataclass
from typing import Any, Dict, List, Union
import evaluate
import gradio as gr
import os

print("✅ All dependencies installed!")
print(f"🎯 CUDA available: {torch.cuda.is_available()}")

# Prepare the dataset
def prepare_librispeech_dataset(data_path, num_samples=500):
    """Prepare LibriSpeech dataset for training"""
    print("📁 Preparing LibriSpeech dataset...")

    audio_paths = []
    texts = []

    # Walk through all directories
    for speaker_dir in os.listdir(data_path):
        speaker_path = os.path.join(data_path, speaker_dir)
        if not os.path.isdir(speaker_path):
            continue

        for chapter_dir in os.listdir(speaker_path):
            chapter_path = os.path.join(speaker_path, chapter_dir)
            if not os.path.isdir(chapter_path):
                continue

            # Look for .trans.txt file
            trans_file = os.path.join(chapter_path, f"{speaker_dir}-{chapter_dir}.trans.txt")

            if os.path.exists(trans_file):
                with open(trans_file, 'r') as f:
                    for line in f:
                        if len(audio_paths) >= num_samples:
                            break
                        line = line.strip()
                        if line:
                            parts = line.split(' ', 1)
                            if len(parts) == 2:
                                audio_id, text = parts
                                audio_file = os.path.join(chapter_path, f"{audio_id}.flac")
                                if os.path.exists(audio_file):
                                    audio_paths.append(audio_file)
                                    texts.append(text)
            if len(audio_paths) >= num_samples:
                break
        if len(audio_paths) >= num_samples:
            break

    print(f"✅ Found {len(audio_paths)} audio-text pairs")
    return audio_paths, texts

# Load data
audio_paths, texts = prepare_librispeech_dataset("/content/librispeech/LibriSpeech/train-clean-100", 500)

# Create dataset
dataset = Dataset.from_dict({
    'audio_path': audio_paths,
    'text': texts
})

print("Sample data:")
for i in range(min(3, len(dataset))):
    print(f"Text: {dataset[i]['text'][:80]}...")
    print("---")

# Load Whisper model and processor
print("🧠 Loading Whisper model...")
model_name = "openai/whisper-small"
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name)
tokenizer = WhisperTokenizer.from_pretrained(model_name, language="english", task="transcribe")
processor = WhisperProcessor.from_pretrained(model_name, language="english", task="transcribe")
model = WhisperForConditionalGeneration.from_pretrained(model_name)

# Configure model
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []
model.generation_config.forced_decoder_ids = processor.get_decoder_prompt_ids(language="english", task="transcribe")

print("✅ Model loaded!")

# Preprocess function
def prepare_dataset(batch):
    """Preprocess audio and text"""
    audio_path = batch["audio_path"]
    text = batch["text"]

    try:
        # Load audio file
        audio_array, sampling_rate = librosa.load(audio_path, sr=16000)

        # Compute input features
        input_features = feature_extractor(
            audio_array,
            sampling_rate=16000
        ).input_features[0]

        # Encode target text
        labels = tokenizer(text).input_ids

        return {
            "input_features": input_features,
            "labels": labels
        }
    except Exception as e:
        print(f"Error processing {audio_path}: {e}")
        return None

# Preprocess dataset
print("🔄 Preprocessing dataset...")
processed_data = []
for i in range(len(dataset)):
    result = prepare_dataset(dataset[i])
    if result is not None:
        processed_data.append(result)

print(f"✅ Processed {len(processed_data)} samples")

# Create new dataset from processed data
dataset = Dataset.from_list(processed_data)

# Split dataset
dataset_split = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = dataset_split["train"]
eval_dataset = dataset_split["test"]

print(f"✅ Train samples: {len(train_dataset)}")
print(f"✅ Eval samples: {len(eval_dataset)}")

# Verify sample
print("\nVerifying sample structure:")
sample = train_dataset[0]
print(f"input_features type: {type(sample['input_features'])}")
print(f"input_features shape: {np.array(sample['input_features']).shape}")
print(f"labels type: {type(sample['labels'])}")
print(f"labels length: {len(sample['labels'])}")

# Custom Data Collator for Whisper
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Split inputs and labels
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        # Pad input features
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # Pad labels
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # Replace padding with -100 to ignore loss
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # If bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

# Setup training
print("⚙️ Setting up training...")

# Metrics
wer_metric = evaluate.load("wer")

def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # Replace -100 with pad token
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # Decode predictions and labels
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    # Compute WER
    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

# Data collator
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

# Training arguments
training_args = Seq2SeqTrainingArguments(
    output_dir="/content/whisper-librispeech-finetuned",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    learning_rate=1e-5,
    warmup_steps=50,
    max_steps=200,
    gradient_checkpointing=False,  # Disabled to avoid backward graph issues
    fp16=torch.cuda.is_available(),
    eval_strategy="steps",
    eval_steps=50,
    save_strategy="steps",
    save_steps=50,
    logging_steps=25,
    predict_with_generate=True,
    generation_max_length=225,
    load_best_model_at_end=False,
    push_to_hub=False,
    report_to=["tensorboard"],
    remove_unused_columns=False,
)

print("✅ Training arguments set!")

# Create trainer
print("🚀 Creating trainer...")
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    processing_class=processor.feature_extractor,  # Updated parameter name
)

# Start training
print("🎯 Starting training...")
training_result = trainer.train()

print("✅ Fine-tuning completed!")
print(f"Final training loss: {training_result.training_loss:.4f}")

# Save model
print("💾 Saving model...")
trainer.save_model("/content/whisper-librispeech-finetuned")
processor.save_pretrained("/content/whisper-librispeech-finetuned")

# Test model
print("🧪 Testing model...")
fine_tuned_model = WhisperForConditionalGeneration.from_pretrained("/content/whisper-librispeech-finetuned")
fine_tuned_processor = WhisperProcessor.from_pretrained("/content/whisper-librispeech-finetuned")

# Move model to GPU if available
if torch.cuda.is_available():
    fine_tuned_model = fine_tuned_model.to("cuda")

print("Model loaded and ready!")

# Gradio app
def transcribe_audio(audio_file):
    if audio_file is None:
        return "Please upload or record audio"

    try:
        audio_array, sampling_rate = librosa.load(audio_file, sr=16000)
        inputs = fine_tuned_processor(
            audio_array,
            sampling_rate=16000,
            return_tensors="pt"
        )

        # Move to GPU if available
        if torch.cuda.is_available():
            inputs = inputs.to("cuda")

        with torch.no_grad():
            predicted_ids = fine_tuned_model.generate(inputs.input_features)

        transcription = fine_tuned_processor.batch_decode(
            predicted_ids,
            skip_special_tokens=True
        )[0]
        return transcription

    except Exception as e:
        return f"Error: {str(e)}"

print("🎨 Launching Gradio app...")
iface = gr.Interface(
    fn=transcribe_audio,
    inputs=gr.Audio(sources=["upload", "microphone"], type="filepath"),
    outputs=gr.Textbox(label="Transcription"),
    title="🎤 Fine-tuned Whisper Speech Recognition",
    description="Upload audio or record to get transcription from fine-tuned Whisper model"
)

iface.launch(share=True)

✅ All dependencies installed!
🎯 CUDA available: True
📁 Preparing LibriSpeech dataset...
✅ Found 500 audio-text pairs
Sample data:
Text: THE DIAMOND WEDDING BY EDMUND CLARENCE STEDMAN O LOVE LOVE LOVE WHAT TIMES WERE ...
---
Text: YOU MARRIED PSYCHE UNDER THE ROSE WITH ONLY THE GRASS FOR BEDDING HEART TO HEART...
---
Text: SO HAVE WE READ IN CLASSIC OVID HOW HERO WATCHED FOR HER BELOVED IMPASSIONED YOU...
---
🧠 Loading Whisper model...
✅ Model loaded!
🔄 Preprocessing dataset...
✅ Processed 500 samples
✅ Train samples: 400
✅ Eval samples: 100

Verifying sample structure:
input_features type: <class 'list'>
input_features shape: (80, 3000)
labels type: <class 'list'>
labels length: 70
⚙️ Setting up training...
✅ Training arguments set!
🚀 Creating trainer...
🎯 Starting training...


You're using a WhisperTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Step,Training Loss,Validation Loss,Wer
50,0.6452,0.442556,0.143227
100,0.2608,0.319572,0.101102
150,0.1229,0.264208,0.10499
200,0.0676,0.24465,0.103694


Using custom `forced_decoder_ids` from the (generation) config. This is deprecated in favor of the `task` and `language` flags/config options.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


✅ Fine-tuning completed!
Final training loss: 0.3511
💾 Saving model...
🧪 Testing model...
Model loaded and ready!
🎨 Launching Gradio app...
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://84e46913734b5a1ae6.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


