In [1]:
import torch
import numpy as np
import librosa
from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor
from typing import Dict, Any

import argparse
import sys
import os
import ffmpeg

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def transcribe_audio(file_path):
    print("Starting transcription...")

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

    #model_id = "/gpfs/project/mukha102/CrisperWhisper/nyra_model"
    model_id = "nyrahealth/CrisperWhisper"

    # Load model and processor with eager attention implementation
    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_id,
        torch_dtype = torch_dtype,
        low_cpu_mem_usage = True,
        use_safetensors = True,
        attn_implementation = "eager"
    )
    model.to(device)

    processor = AutoProcessor.from_pretrained(model_id)

    pipe = pipeline(
        "automatic-speech-recognition",
        model = model,
        tokenizer = processor.tokenizer,
        feature_extractor = processor.feature_extractor,
        chunk_length_s = 30,
        batch_size = 1,  # Reduced batch size
        return_timestamps = "word",
        torch_dtype = torch_dtype,
        device = device,
    )

    result = pipe(file_path)
    torch.cuda.empty_cache()  # Free up memory after each transcription
    return result

In [3]:
# === CELL 3: Run transcription interactively in Jupyter ===

# You can manually set your audio file path here:
file_path = r"C:\Users\pryce\PycharmProjects\LostInTranscription\data\audio\AAHP 005A Mattie Williams 1-16-2010.mp3"  # <-- Replace with your file path

# Optionally: Use an upload widget for convenience
# from IPython.display import display
# import ipywidgets as widgets
# uploader = widgets.FileUpload(accept='.mp3,.wav', multiple=False)
# display(uploader)

# --- Validation ---
if not os.path.exists(file_path):
    raise FileNotFoundError(f"The file '{file_path}' does not exist.")

# --- Run transcription ---
try:
    transcription = transcribe_audio(file_path)

    # Save the transcription output as a JSON file next to the audio
    output_path = file_path.rsplit(".", 1)[0] + "_transcription.json"
    with open(output_path, "w") as f:
        import json
        json.dump(transcription, f, indent=2)

    print(f"✅ Transcription complete! Saved to {output_path}")

    # Display a summary of the result directly in the notebook
    if isinstance(transcription, dict) and "text" in transcription:
        print("\n--- TRANSCRIPTION PREVIEW ---\n")
        print(transcription["text"][:1000])  # show first 1000 chars
    else:
        print("\nTranscription output structure:\n", transcription)

except Exception as e:
    print(f"❌ An error occurred while transcribing the audio: {e}")


Starting transcription...


`torch_dtype` is deprecated! Use `dtype` instead!
`torch_dtype` is deprecated! Use `dtype` instead!
Device set to use cuda:0


❌ An error occurred while transcribing the audio: ffmpeg was not found but is required to load audio files from filename


In [2]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if "cuda" in device else torch.float32
model_id = "nyrahealth/CrisperWhisper"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype = torch_dtype,
    low_cpu_mem_usage = True,
    use_safetensors = True,
)

torch.cuda.empty_cache()
model.to(device)
processor = AutoProcessor.from_pretrained(model_id)

asr_pipe = pipeline(
    task = "automatic-speech-recognition",
    model = model,
    tokenizer = processor.tokenizer,
    feature_extractor = processor.feature_extractor,
    chunk_length_s = 30,  # on model card
    batch_size = 16,  # on model card
    return_timestamps = "word",
    device = 0 if "cuda" in device else -1,
)

`torch_dtype` is deprecated! Use `dtype` instead!
Device set to use cuda:0


In [3]:
print(torch.cuda.is_available())
print(torch.__version__)

True
2.8.0+cu128


In [4]:
print(model.config.max_length)

448


In [5]:
# Load Audio with sample rate 16,000, standard for CrisperWhisper with librosa

def load_audio(path: str, target_sr: int = 16000):
    audio, sr = librosa.load(path, sr = None, mono = True)
    if sr != target_sr:
        audio = librosa.resample(audio, orig_sr = sr, target_sr = target_sr)
        sr = target_sr
    return audio, sr

In [6]:
# Create audio chunks of a specified length in seconds with some overlap

def chunk_audio(audio: "np.ndarray", sr: int, chunk_s: int, overlap_s: float = 0.5):
    step = int((chunk_s - overlap_s) * sr)
    chunk_len = int(chunk_s * sr)
    chunks = []
    start = 0
    while start < len(audio):
        end = min(start + chunk_len, len(audio))
        chunk = audio[start:end]

        # pad chunk with zeroes if it's shorter than chunk_len
        if end == len(audio):
            pad = [float(0) for i in range((start + chunk_len) - len(audio))]
            chunk = np.append(chunk, pad)
        chunks.append((chunk, start / sr))  # (samples, start_time_seconds)

        if end == len(audio):
            break
        start += step
    return chunks

In [7]:
# Format pipeline outputs into readable verbatim transcript with word level timestamps

# walk through in debug after the pipeline is implemented to eliminate unnecessary functionality
def format_verbatim_output(hf_out: Dict[str, Any]):
    # hf_out expected to include 'text' and 'chunks' where each chunk has 'timestamps' at word level
    lines = []
    for chunk in hf_out.get("chunks", []):
        chunk_start = chunk.get("timestamp", (None, None))[0]
        words = chunk.get("words") or chunk.get("timestamps") or []
        for w in words:
            # try to support both shapes
            if isinstance(w, dict):
                word_text = w.get("word") or w.get("text")
                ts = w.get("timestamp") or w.get("times") or w.get("start_end")
                if isinstance(ts, (list, tuple)) and len(ts) >= 2:  # if the pipeline stores words with timestamp tuples
                    start_ts = ts[0]
                else:
                    start_ts = None
            elif isinstance(w, (list, tuple)) and len(w) >= 3:  # [word, start, end]
                word_text = w[0]
                start_ts = w[1]
            else:  # fallback
                word_text = str(w)
                start_ts = None
            if start_ts is None:
                line = f"{word_text}"
            else:
                line = f"[{start_ts:.3f}] {word_text}"
            lines.append(line)
    return "\n".join(lines)

In [8]:
def transcribe_audio(
    audio_path: str,
    asr_pipeline,
    chunk_length_s: int = 30,
    overlap_s: float = 0.5,
) -> str:
    audio, sr = load_audio(audio_path, target_sr = 16000)
    audio_chunks = chunk_audio(audio, sr, chunk_s = chunk_length_s, overlap_s = overlap_s)

    print("Chunks:", len(audio_chunks))
    print("Chunk Size:", chunk_length_s * sr)
    for chunk, start in audio_chunks:
        if chunk.size != chunk_length_s * sr:
            print("Bad Size:", chunk.size)

    full_chunks = []
    for chunk, start in audio_chunks:
        sample = {"array": chunk.astype("float32"), "sampling_rate": sr}
        hf_out = asr_pipeline(sample)
        print(hf_out)

        # shift timestamps by chunk_start
        if "chunks" in hf_out:
            for c in hf_out["chunks"]:
                if "words" in c:
                    for w in c["words"]:
                        ts = w.get("timestamp") or w.get("times") or w.get("start_end")
                        if isinstance(ts, (list, tuple)) and len(ts) >= 2:
                            # shift both start and end timestamps
                            w_ts0 = ts[0] + start
                            w_ts1 = ts[1] + start
                            w["timestamp"] = (w_ts0, w_ts1)
        hf_out["_chunk_start"] = start
        full_chunks.append(hf_out)

    combined = {"chunks": []}
    for out in full_chunks:
        if "chunks" in out:
            combined["chunks"].extend(out["chunks"])
        else:
            if "timestamps" in out:
                combined["chunks"].append ({
                        "timestamp": (out.get("_chunk_start", 0), None),
                        "words": out["timestamps"],
                    })
            else:
                combined["chunks"].append ({
                    "timestamp": (out.get("_chunk_start", 0), None),
                    "words": [{"word": out.get("text", "").strip()
                    }]}
                )

    transcript = format_verbatim_output(combined)
    return transcript

In [9]:
audio_file = r"C:\Users\pryce\PycharmProjects\LostInTranscription\data\audio\AAHP 005A Mattie Williams 1-16-2010.mp3"
output_txt = r"C:\Users\pryce\PycharmProjects\LostInTranscription\data\WER0\001_test.txt"

print("Transcribing:", audio_file)
transcript_text = transcribe_audio(audio_file, asr_pipe, chunk_length_s = 30, overlap_s = 0.5)

# display first 50 lines
print("---- Transcript preview ----")
print("\n".join(transcript_text.splitlines()[:50]))
print("... (truncated, first 50 lines)")

# save
with open(output_txt, "w", encoding = "utf-8") as f:
    f.write(transcript_text)

print(f"Saved full transcript to: {output_txt}")

Transcribing: C:\Users\pryce\PycharmProjects\LostInTranscription\data\audio\AAHP 001Rosa Williams 4-28-2009.mp3
Chunks: 86
Chunk Size: 480000


Using custom `forced_decoder_ids` from the (generation) config. This is deprecated in favor of the `task` and `language` flags/config options.
Transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English. This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`. See https://github.com/huggingface/transformers/pull/28687 for more details.


RuntimeError: The size of tensor a (2) must match the size of tensor b (0) at non-singleton dimension 1