In [None]:
import torch
from speechbox import ASRDiarizationPipeline # Only import this
# from datasets import load_dataset # Not needed if loading local files manually
from datasets import Dataset
import os
# Pipeline loads the pretrained pyannote speaker-diarization model used below.
from pyannote.audio import Pipeline

from dotenv import load_dotenv
# Load environment variables (including HF_TOKEN) from the local .env file.
load_dotenv()
HF_TOKEN = os.getenv('HF_TOKEN')


# Instantiate the pretrained diarization pipeline (Pipeline is imported at the
# top of this cell). The model is gated on the Hugging Face Hub, and
# pyannote.audio 3.x can signal a failed download by returning None instead of
# raising, so verify the result before calling it.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HF_TOKEN)
if pipeline is None:
    raise RuntimeError(
        "Could not load 'pyannote/speaker-diarization-3.1'. Make sure HF_TOKEN is set "
        "and that the model's user conditions were accepted on the Hugging Face Hub."
    )

# Run the pipeline on an audio file.
diarization = pipeline("transcribing_1.mp3")

# Dump the diarization output to disk using RTTM format.
with open("audio.rttm", "w") as rttm:
    diarization.write_rttm(rttm)



  std = sequences.std(dim=-1, correction=1)


In [1]:
import whisper
import torch
from pyannote.audio import Pipeline
from pyannote.core import Segment
import os 
from dotenv import load_dotenv
# Do not show warnings
import warnings
warnings.filterwarnings("ignore")
# Pull the variables defined in the local .env file into the process environment
load_dotenv()



# --- Configuration ---
AUDIO_FILE = "MP.wav"  # path of the recording to process
WHISPER_MODEL = "tiny"  # Whisper checkpoint name, e.g. 'tiny', 'base' or 'small'
PYANNOTE_PIPELINE = "pyannote/speaker-diarization-3.1"
HF_TOKEN = os.environ.get('HF_TOKEN')  # Hugging Face token from the environment (None when unset)

# --- Check for GPU ---
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(f"Using device: {DEVICE}")

# --- 1. Load Diarization Pipeline ---
print("Loading diarization pipeline...")
try:
    # HF_TOKEN is passed as the auth token for the gated model. When it is None,
    # authentication presumably relies on a prior `huggingface-cli login` -- TODO confirm.
    pipeline = Pipeline.from_pretrained(PYANNOTE_PIPELINE, use_auth_token=HF_TOKEN)
    # pyannote.audio 3.x can return None (instead of raising) when the download
    # is refused, so turn that into an explicit error here.
    if pipeline is None:
        raise RuntimeError(f"Pipeline.from_pretrained returned None for '{PYANNOTE_PIPELINE}'")
    pipeline.to(torch.device(DEVICE))
    print("Diarization pipeline loaded.")
except Exception as e:
    print(f"Error loading diarization pipeline: {e}")
    print("Please ensure you have accepted user conditions on Hugging Face Hub and have a valid token.")
    # Re-raise instead of calling exit(): inside a notebook exit() shuts the
    # kernel down, which discards the traceback and all in-memory state.
    raise

# --- 2. Perform Speaker Diarization ---
print("Performing speaker diarization...")
try:
    # num_speakers=None leaves the speaker count to pyannote; pass an integer
    # (e.g. num_speakers=2) when the number of speakers is known in advance.
    diarization = pipeline(AUDIO_FILE, num_speakers=None)
    print("Diarization complete.")

    # Flatten the pyannote annotation into plain dicts so the alignment step
    # can look speakers up by timestamp.
    speaker_segments = [
        {"start": turn.start, "end": turn.end, "speaker": speaker}
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]
    print(f"Found {len(speaker_segments)} speaker turns.")
    if not speaker_segments:
        print("Warning: No speaker segments found by pyannote.")

except Exception as e:
    print(f"Error during diarization: {e}")
    # Re-raise instead of calling exit(): inside a notebook exit() shuts the
    # kernel down, which discards the traceback and all in-memory state.
    raise

# --- 3. Load Whisper Model and Transcribe ---
print(f"Loading Whisper model '{WHISPER_MODEL}'...")
whisper_model = whisper.load_model(WHISPER_MODEL, device=DEVICE)
print("Whisper model loaded.")

print("Transcribing audio with word timestamps...")
try:
    # Pass only the option we actually want to set. Expanding a full
    # DecodingOptions instance with **vars(...) would also forward its default
    # temperature=0.0, overriding transcribe()'s own temperature fallback
    # schedule. fp16 is only usable when the model runs on CUDA.
    result = whisper_model.transcribe(
        AUDIO_FILE,
        word_timestamps=True,
        fp16=(DEVICE == "cuda"),
    )
    print("Transcription complete.")
except Exception as e:
    print(f"Error during transcription: {e}")
    # Re-raise instead of calling exit(): inside a notebook exit() shuts the
    # kernel down, which discards the traceback and all in-memory state.
    raise

# --- 4. Align Transcription with Diarization ---
print("Aligning transcription with speaker segments...")

# Function to find the speaker for a given timestamp
def get_speaker_for_timestamp(timestamp, segments):
    """Return the speaker label of the first segment that contains ``timestamp``.

    Args:
        timestamp: Point in time to look up (same unit as the segment bounds).
        segments: Iterable of dicts with "start", "end" and "speaker" keys.

    Returns:
        The "speaker" value of the first segment, in iteration order, for which
        ``start <= timestamp < end`` holds; "UNKNOWN_SPEAKER" when no segment
        contains the timestamp (e.g. it falls in a gap or ``segments`` is empty).
    """
    # Lazily scan the segments; the interval is closed on the left and open on
    # the right, so a timestamp equal to a segment's end does not match it.
    candidates = (
        entry["speaker"]
        for entry in segments
        if entry["start"] <= timestamp < entry["end"]
    )
    return next(candidates, "UNKNOWN_SPEAKER")

aligned_transcript = []
# Whisper may return several segments, each carrying its own list of timed words.
if 'segments' not in result:
    print("Warning: No 'segments' key found in Whisper result. Check Whisper output structure.")
else:
    for whisper_segment in result['segments']:
        if 'words' not in whisper_segment:
            print("Warning: Segment found with no 'words' key. Check Whisper output structure.")
            continue

        for entry in whisper_segment['words']:
            begin = entry['start']
            finish = entry['end']
            token = entry['word']

            # The speaker is looked up at the temporal midpoint of the word.
            midpoint = begin + (finish - begin) / 2
            label = get_speaker_for_timestamp(midpoint, speaker_segments)

            aligned_transcript.append(
                {"start": begin, "end": finish, "word": token, "speaker": label}
            )


print("Alignment complete.")

# --- 5. Format and Print Output ---
print("\n--- Speaker-Aligned Transcript ---")

if aligned_transcript:
    # Merge consecutive words from the same speaker into turns.
    # Each turn is [speaker, start of first word, end of last word, joined text].
    turns = []
    for item in aligned_transcript:
        if turns and item['speaker'] == turns[-1][0]:
            turns[-1][2] = item['end']
            turns[-1][3] += item['word']
        else:
            turns.append([item['speaker'], item['start'], item['end'], item['word']])

    # One output line per turn, in chronological order.
    for who, began, ended, spoken in turns:
        print(f"[{began:.2f}s - {ended:.2f}s] {who}: {spoken.strip()}")
else:
    print("No words found to align.")

print("\n--- End of Transcript ---")

Using device: cuda
Loading diarization pipeline...


INFO:speechbrain.utils.quirks:Applied quirks (see `speechbrain.utils.quirks`): [allow_tf32, disable_jit_profiling]
INFO:speechbrain.utils.quirks:Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []


Diarization pipeline loaded.
Performing speaker diarization...
Diarization complete.
Found 294 speaker turns.
Loading Whisper model 'tiny'...
Whisper model loaded.
Transcribing audio with word timestamps...
Transcription complete.
Aligning transcription with speaker segments...
Alignment complete.

--- Speaker-Aligned Transcript ---
[0.00s - 17.54s] SPEAKER_00: Okay, so let's jump right in. Today, we're diving into something that I think is really like on everyone's mind these days. How do we wrap our heads around these really advanced AI systems, especially the ones we keep hearing about the large language models or LLMs?
[17.62s - 18.42s] SPEAKER_01: Right, right.
[18.52s - 20.46s] SPEAKER_00: It can feel so, so technical.
[20.66s - 21.12s] SPEAKER_01: Absolutely.
[21.70s - 23.56s] SPEAKER_00: But you know, what if we could take a shortcut?
[24.30s - 25.16s] SPEAKER_01: Interesting. What if
[25.16s - 26.00s] SPEAKER_00: we thought about it
[26.00s - 27.14s] UNKNOWN_SPEAKER: like,
[27