In [None]:
import os
import sys
import torch
from IPython.display import Audio

# Remember to activate the virtual environment first (source venv/bin/activate), then, where required,
# install the basic requirements:
#    pip install torch torchaudio transformers datasets accelerate
# I think torch bundles a bunch of native CUDA support for NVIDIA cards, which I have not had for years

############## # For Integrated GPUs
# Active configuration: run on the CPU with 32-bit floats.
device = "cpu"
torch_dtype = torch.float32
##############

############## # For NVIDIA GPU
# Alternative configuration: uncomment the two lines below (and comment out the
# pair above) to use the first CUDA device with 16-bit floats when CUDA is
# available, falling back to CPU / 32-bit floats otherwise.
#device = "cuda:0" if torch.cuda.is_available() else "cpu"
#torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
#############


In [None]:
from datasets import load_dataset
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

In [None]:
def adjust_pauses_for_hf_pipeline_output(pipeline_output, split_threshold=0.12):
    """
    Adjust pause timings by distributing pauses up to the threshold evenly between adjacent words.

    For each pair of consecutive chunks the gap between the end of the first
    and the start of the second is measured. If the gap is positive, up to
    ``split_threshold`` seconds of it is split in half: one half extends the
    end of the first chunk, the other half moves the start of the second
    chunk earlier. Any part of the gap beyond the threshold stays a pause.

    Args:
        pipeline_output: Dictionary with a "chunks" list. Every chunk is a
            dictionary holding a "timestamp" ``(start, end)`` pair; all other
            keys are carried over unchanged.
        split_threshold: Maximum amount of each pause (same unit as the
            timestamps) that is distributed between the two neighbours.

    Returns:
        A new dictionary with the same keys as ``pipeline_output`` whose
        "chunks" entry holds the adjusted chunks. The input is left
        untouched, so calling the function again on the same input gives the
        same result.
    """
    # Copy each chunk dict: a plain list copy would still share (and mutate)
    # the caller's chunk dictionaries.
    adjusted_chunks = [dict(chunk) for chunk in pipeline_output["chunks"]]

    for current_chunk, next_chunk in zip(adjusted_chunks, adjusted_chunks[1:]):
        current_start, current_end = current_chunk["timestamp"]
        next_start, next_end = next_chunk["timestamp"]

        # A missing boundary means the pause cannot be measured; leave the pair as is.
        if current_end is None or next_start is None:
            continue

        pause_duration = next_start - current_end
        if pause_duration <= 0:
            continue

        # Each neighbour receives half of the pause, capped at half the threshold.
        distribute = min(pause_duration, split_threshold) / 2

        current_chunk["timestamp"] = (current_start, current_end + distribute)
        next_chunk["timestamp"] = (next_start - distribute, next_end)

    adjusted_output = dict(pipeline_output)
    adjusted_output["chunks"] = adjusted_chunks

    return adjusted_output




In [None]:
## Request access to the model at nyrahealth/CrisperWhisper

In [None]:
#from huggingface_hub import login
#login()

In [None]:
# initialise model
# Loads the checkpoint identified by `model_id` with the dtype chosen in the
# configuration cell above.
model_id = "nyrahealth/CrisperWhisper"
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, 
    torch_dtype=torch_dtype, 
    low_cpu_mem_usage=True,
    use_safetensors=True,
    # NOTE(review): eager attention is presumably needed so attention weights
    # are available for word-level timestamps — confirm before changing.
    attn_implementation="eager"
)


In [None]:

# Move the model to the device selected in the configuration cell ("cpu" unless changed there).
model.to(device)

In [None]:

# The processor bundles the tokenizer and the audio feature extractor for the checkpoint.
processor = AutoProcessor.from_pretrained(model_id)

# Build the speech-recognition pipeline around the already-loaded model.
# The language hint is passed through `generate_kwargs`: `model_kwargs` is meant
# for loading a model from an id, and `model` here is already instantiated, so a
# language placed there would not reach generation.
pipe = pipeline(
    "automatic-speech-recognition", # Type of task
    model=model,                    # The loaded CrisperWhisper model
    tokenizer=processor.tokenizer,  # Converts text to tokens
    feature_extractor=processor.feature_extractor, # Processes audio input
    chunk_length_s=30,              # Process audio in 30-second chunks
    batch_size=16,                  # Number of chunks processed at once
    return_timestamps='word',       # Get word-level timestamps
    torch_dtype=torch_dtype,        # Precision (float16 for GPU, float32 for CPU)
    device=device,                  # Computing device (GPU/CPU)
    generate_kwargs={"language": "en"},  # Decode as English
)



In [None]:
# Audio file transcript test
# Every location is derived from the single base directory `audio_path`, so the
# transcript folder and the test file cannot drift apart when the base changes.
audio_path = '/home/diego/CRIISP-WP6/data/'
# The trailing '' keeps the trailing separator: '<audio_path>transcripts/'
transcript_path = os.path.join(audio_path, 'transcripts', '')
audio_file = os.path.join(audio_path, 'audiotest.wav')
# Last expression of the cell: renders an audio player for the test file.
Audio(audio_file)

In [None]:
# Run the speech-recognition pipeline on the test file; the result feeds the pause adjustment in the next cell.
hf_pipeline_output = pipe(audio_file)

In [None]:
# Distribute short pauses between neighbouring words (default split_threshold of 0.12).
transcription_data = adjust_pauses_for_hf_pipeline_output(hf_pipeline_output)

In [None]:
# Show the pause-adjusted transcription dictionary.
print(transcription_data)

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import librosa
import numpy as np

def plot_interactive_waveform(audio_path, transcription_data):
    """
    Show the audio waveform with the transcribed words aligned underneath.

    The figure has two rows sharing the x axis (time in seconds): row 1 is
    the waveform, row 2 holds one shaded rectangle plus a text label per
    transcription chunk.

    Args:
        audio_path: Path of the audio file, loaded with ``librosa.load``.
        transcription_data: Dictionary with a "chunks" list; each chunk has
            a "text" entry and a "timestamp" ``(start, end)`` pair.

    Returns:
        None. The figure is displayed with ``fig.show()``.
    """
    # Load the audio file
    samples, sample_rate = librosa.load(audio_path)

    # Time (in seconds) of every sample, and the total duration
    times = np.arange(len(samples)) / sample_rate
    duration = len(samples) / sample_rate

    # Create figure
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True)

    # Add waveform
    fig.add_trace(
        go.Scatter(x=times, y=samples, name='Waveform'),
        row=1, col=1
    )

    # Add word segments
    for chunk in transcription_data['chunks']:
        start_time, end_time = chunk['timestamp']
        text = chunk['text']

        # Without a start time the word cannot be placed on the time axis.
        if start_time is None:
            continue
        # An open-ended chunk is drawn up to the end of the audio.
        if end_time is None:
            end_time = duration

        # Add segment highlight.
        # Row 2 contains no trace, and add_vrect skips empty subplots by
        # default, so the rectangles must be forced onto it explicitly.
        fig.add_vrect(
            x0=start_time,
            x1=end_time,
            fillcolor="rgba(0,0,255,0.1)",
            layer="below",
            line_width=0,
            row=2, col=1,
            exclude_empty_subplots=False
        )

        # Add text annotation
        fig.add_annotation(
            x=start_time,
            y=0,
            text=text,
            showarrow=False,
            textangle=45,
            row=2, col=1
        )

    # Label the axes so the figure can be read on its own
    fig.update_yaxes(title_text='Amplitude', row=1, col=1)
    fig.update_xaxes(title_text='Time (s)', row=2, col=1)

    fig.update_layout(
        title='Interactive Waveform with Word-Level Annotations',
        height=800,
        showlegend=False
    )

    fig.show()

# Usage:
# plot_interactive_waveform('your_audio_file.wav', transcription_data)


In [None]:
# Plot the test file's waveform together with the pause-adjusted word chunks.
plot_interactive_waveform(audio_file, transcription_data)

In [None]:
import os
import json
from datetime import datetime

def save_transcription(transcription_data, transcript_path, file_identifier=None, format_type="text_only"):
    """
    Save transcription data to the specified path

    The file is named ``{file_identifier}_{format_type}`` with a ``.json``
    extension for the "json" format and ``.txt`` otherwise.

    Args:
        transcription_data: The CrisperWhisper output dictionary. "text_only"
            reads its "text" entry, "timestamped" reads its "chunks" list
            (each chunk has "text" and a "timestamp" ``(start, end)`` pair),
            "json" dumps the whole dictionary.
        transcript_path: Directory path to save transcripts (created if missing)
        file_identifier: Identifier for the file (audio filename or timestamp);
            defaults to the current time formatted as ``YYYYMMDD_HHMMSS``
        format_type: "text_only", "timestamped", or "json"

    Returns:
        Path of the file that was written.

    Raises:
        ValueError: If ``format_type`` is not one of the supported values.
            Nothing is created on disk in that case.
    """
    # Reject unknown formats before touching the filesystem
    if format_type not in ("text_only", "timestamped", "json"):
        raise ValueError("format_type must be 'text_only', 'timestamped', or 'json'")

    # Create directory if it doesn't exist
    os.makedirs(transcript_path, exist_ok=True)

    # Generate identifier if not provided
    if file_identifier is None:
        file_identifier = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Create full output path with appropriate extension
    extension = ".json" if format_type == "json" else ".txt"
    filename = f"{file_identifier}_{format_type}{extension}"
    output_file_path = os.path.join(transcript_path, filename)

    with open(output_file_path, 'w', encoding='utf-8') as f:
        if format_type == "text_only":
            f.write(transcription_data["text"])

        elif format_type == "timestamped":
            for chunk in transcription_data["chunks"]:
                start, end = chunk["timestamp"]
                # A missing boundary is written as a placeholder instead of crashing
                start_str = f"{start:.2f}" if start is not None else "START"
                end_str = f"{end:.2f}" if end is not None else "END"
                f.write(f"[{start_str}-{end_str}] {chunk['text']}\n")

        else:  # "json"
            json.dump(transcription_data, f, indent=2)

    return output_file_path




In [None]:
# Name the transcript after the audio file (e.g. ".../audiotest.wav" -> "audiotest")
# and write it to the transcript directory configured alongside the audio paths.
file_identifier = os.path.splitext(os.path.basename(audio_file))[0]
output_file = save_transcription(transcription_data,
                                transcript_path,
                                file_identifier=file_identifier,
                                format_type="json") # format_type: "text_only", "timestamped", or "json"
print(f"Saved transcript to: {output_file}")