# Transcription App Notebook

This notebook demonstrates how to process a meeting video (or audio) file to:
- Extract the audio from the video.
- Transcribe the audio using an ASR model (e.g., Whisper).
- Perform speaker detection and segmentation with Silero VAD.
- Cluster the speech segments to assign speaker labels.
- Optionally assign conversation roles (e.g., Interviewer/Interviewee) using simple heuristics.

Below, you'll find code cells for each major step in the transcription process along with detailed explanations.


## Role Assignment

Using a simple heuristic (such as counting question marks), we assign conversation roles (e.g., "Interviewer" vs. "Interviewee") to each transcript segment. This step can be further improved using more advanced NLP methods.


In [3]:
import os
import sys
import argparse


In [4]:
def assign_roles(segments):
    """
    Assign roles to speakers using a simple heuristic.
    In this example, the speaker with the highest number of question marks is designated as "Interviewer".
    """
    speaker_texts = {}
    for seg in segments:
        speaker = seg.get('speaker', 'Unknown')
        text = seg.get('text', '')
        speaker_texts[speaker] = speaker_texts.get(speaker, "") + " " + text
    question_counts = {speaker: text.count('?') for speaker, text in speaker_texts.items()}
    interviewer = max(question_counts, key=question_counts.get) if question_counts else None
    
    for seg in segments:
        speaker = seg.get('speaker', 'Unknown')
        if speaker == interviewer and question_counts.get(interviewer, 0) > 0:
            seg['role'] = "Interviewer"
        else:
            seg['role'] = "Interviewee"
    return segments


In [5]:
def save_formatted_transcript(segments, output_file):
    """
    Save the transcript in the format: [role] speaker: text
    Returns a list of transcript lines (each as a dictionary).
    """
    transcript_lines = []
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            current_role = None
            current_speaker = None
            current_text = ""
            for seg in segments:
                role = seg.get('role', 'Unknown')
                speaker = seg.get('speaker', 'Unknown')
                text = seg.get('text', '').strip()
                if not text:
                    continue
                if (role, speaker) != (current_role, current_speaker) and current_text:
                    line = {"role": current_role, "speaker": current_speaker, "text": current_text}
                    transcript_lines.append(line)
                    f.write(f"[{current_role}] {current_speaker}: {current_text}\n")
                    current_text = ""
                current_role = role
                current_speaker = speaker
                if current_text:
                    current_text += " " + text
                else:
                    current_text = text
            if current_text:
                line = {"role": current_role, "speaker": current_speaker, "text": current_text}
                transcript_lines.append(line)
                f.write(f"[{current_role}] {current_speaker}: {current_text}\n")
        print(f"Role-based transcript saved to {output_file}")
        return transcript_lines
    except Exception as e:
        print(f"Error saving transcript: {e}")
        return []


## Speaker Detection and Clustering

Here we load the Silero VAD model to detect speech segments within the audio. We then compute simple embeddings for each segment and use Agglomerative Clustering to differentiate between speakers. Each segment is assigned a speaker label (e.g., Speaker 1, Speaker 2).


In [6]:
import torch
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score


In [7]:
def load_silero_vad():
    """
    Load the Silero VAD model and helper functions.
    """
    try:
        print("Loading Silero VAD model...")
        model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad',
                                      model='silero_vad',
                                      force_reload=True,
                                      onnx=False)
        (get_speech_timestamps, _, read_audio, _, _) = utils
        return model, get_speech_timestamps, read_audio
    except Exception as e:
        print(f"Error loading Silero VAD: {e}")
        return None, None, None



In [8]:

def get_speech_embeddings(audio_path, vad_model, get_speech_timestamps, read_audio):
    """
    Detect speech segments and compute simple embeddings.
    Returns an array of embeddings and a list of segment dictionaries.
    """
    try:
        wav = read_audio(audio_path, sampling_rate=16000)
        speech_timestamps = get_speech_timestamps(wav, vad_model, sampling_rate=16000, 
                                                    min_speech_duration_ms=500, 
                                                    max_speech_duration_s=float('inf'),
                                                    min_silence_duration_ms=500)
        print(f"Detected {len(speech_timestamps)} speech segments")
        embeddings = []
        segments = []
        
        def get_speech_segment(start_sample, end_sample):
            return wav[start_sample:end_sample]
        
        for i, segment in enumerate(speech_timestamps):
            start_sample = segment['start']
            end_sample = segment['end']
            speech_segment = get_speech_segment(start_sample, end_sample)
            if len(speech_segment) < 1600:
                continue
            # Use clone().detach() to avoid warnings.
            segment_tensor = torch.tensor(speech_segment).clone().detach().float()
            spec = torch.stft(
                segment_tensor,
                n_fft=512,
                hop_length=160,
                win_length=400,
                window=torch.hann_window(400),
                return_complex=True
            )
            spec = torch.abs(spec)
            freq_bands = [(0, 10), (10, 20), (20, 50), (50, 100), (100, 256)]
            feature_vector = []
            for low, high in freq_bands:
                band_energy = torch.mean(spec[low:high, :]).item()
                feature_vector.append(band_energy)
            zero_crossings = torch.sum(torch.abs(torch.sign(segment_tensor[1:]) - torch.sign(segment_tensor[:-1]))).item() / 2
            zero_crossing_rate = zero_crossings / len(segment_tensor)
            feature_vector.append(zero_crossing_rate)
            energy = torch.mean(torch.abs(segment_tensor)).item()
            feature_vector.append(energy)
            start_time = start_sample / 16000
            end_time = end_sample / 16000
            embeddings.append(feature_vector)
            segments.append({
                'start': start_time,
                'end': end_time,
                'length': end_time - start_time
            })
        return np.array(embeddings), segments
    except Exception as e:
        print(f"Error extracting speech embeddings: {e}")
        return None, None


In [9]:
def cluster_speakers(embeddings, segments, min_speakers=2, max_speakers=5):
    """
    Cluster speech segments to assign speaker labels.
    """
    if len(embeddings) == 0:
        print("No speech segments detected")
        return []
    
    embeddings = (embeddings - np.mean(embeddings, axis=0)) / (np.std(embeddings, axis=0) + 1e-8)
    best_score = -1
    best_labels = None
    best_n_clusters = min_speakers
    max_speakers = min(max_speakers, len(embeddings))
    
    for n_clusters in range(min_speakers, max_speakers + 1):
        if n_clusters >= len(embeddings):
            continue
        
        # Remove affinity parameter when using ward linkage.
        clustering = AgglomerativeClustering(n_clusters=n_clusters, linkage='ward')
        labels = clustering.fit_predict(embeddings)
        if n_clusters == 1 or len(set(labels)) <= 1:
            continue
        score = silhouette_score(embeddings, labels)
        print(f"Clusters: {n_clusters}, Silhouette Score: {score:.3f}")
        if score > best_score:
            best_score = score
            best_labels = labels
            best_n_clusters = n_clusters
    
    for i, segment in enumerate(segments):
        if i < len(best_labels):
            segment['speaker'] = f"Speaker {best_labels[i] + 1}"
    print(f"Selected {best_n_clusters} speakers with silhouette score: {best_score:.3f}")
    return segments

In [10]:
def assign_transcript_to_speakers(whisper_segments, vad_segments):
    """
    Align transcript segments from Whisper with VAD segments based on time overlap.
    """
    for w_segment in whisper_segments:
        w_start = w_segment['start']
        w_end = w_segment['end']
        best_overlap = 0
        best_speaker = "Unknown"
        for v_segment in vad_segments:
            v_start = v_segment['start']
            v_end = v_segment['end']
            overlap_start = max(w_start, v_start)
            overlap_end = min(w_end, v_end)
            if overlap_end > overlap_start:
                overlap_duration = overlap_end - overlap_start
                if overlap_duration > best_overlap:
                    best_overlap = overlap_duration
                    if 'speaker' in v_segment:
                        best_speaker = v_segment['speaker']
        w_segment['speaker'] = best_speaker
    return whisper_segments


## Audio Extraction from Video

In this section, we extract the audio track from a given video file using the `pydub` library. The audio is converted to mono and resampled to 16kHz to ensure compatibility with the speech models.


In [11]:
import os
from pydub import AudioSegment
import whisper


In [12]:
def convert_video_to_audio(video_path, audio_path):
    """
    Extract audio from a video file and save as a WAV file.
    Converts to mono and 16kHz sample rate.
    """
    try:
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")
        video = AudioSegment.from_file(video_path)
        video = video.set_channels(1)
        video = video.set_frame_rate(16000)
        video.export(audio_path, format="wav")
        print(f"Audio extracted successfully to {audio_path}")
        return True
    except Exception as e:
        print(f"Error converting video to audio: {e}")
        return False

## Transcription with Whisper

This section loads the Whisper ASR model and transcribes the extracted audio. The output includes the full transcript along with time-stamped segments.


In [13]:
def transcribe_audio(audio_path, model_name="base"):
    """
    Transcribe the audio using Whisper.
    Returns the transcription result dictionary.
    """
    try:
        print(f"Loading Whisper model: {model_name}")
        model = whisper.load_model(model_name)
        print("Transcribing audio...")
        result = model.transcribe(audio_path, verbose=True)
        return result
    except Exception as e:
        print(f"Error transcribing audio: {e}")
        return None


In [14]:
def parse_transcript_file(transcript_file):
    """
    Parse the transcript file and create a list of dictionaries for insertion.
    Each line in the transcript is assumed to be in the format:
      [Role] Speaker Label: Transcript text
    The function extracts:
      - role: text within the square brackets
      - speaker_label: text between the square bracket and the colon
      - transcript: text after the colon.
    Since no meeting_id, start_time, or end_time information is available, these will be set to None.
    """
    transcript_lines = []
    with open(transcript_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Find the role in square brackets
            if line.startswith('['):
                try:
                    role_end = line.index(']')
                    role = line[1:role_end]
                except ValueError:
                    role = "Unknown"
            else:
                role = "Unknown"
            
            # Split at the colon
            if ':' in line:
                header, transcript_text = line.split(':', 1)
            else:
                header, transcript_text = line, ""
            
            # Remove the role part from header to get the speaker label
            speaker_label = header.replace(f'[{role}]', '').strip()
            
            transcript_lines.append({
                "meeting_id": None,
                "speaker_label": f"{role} - {speaker_label}",
                "transcript": transcript_text.strip(),
                "start_time": None,
                "end_time": None
            })
    return transcript_lines


## Database Functions

In [15]:
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import os
from dotenv import load_dotenv
load_dotenv()

True

In [16]:
DB_URL = os.getenv("DB_URL", "postgresql+psycopg2://your_db_user:your_db_password@localhost/your_db_name")
Base = declarative_base()

  Base = declarative_base()


In [17]:
class Transcript(Base):
    __tablename__ = 'transcripts'
    id = Column(Integer, primary_key=True)
    meeting_id = Column(String(50), nullable=True)  # You can provide a meeting ID if available.
    speaker_label = Column(String(100))
    transcript = Column(Text)
    start_time = Column(Float, nullable=True)  # e.g., seconds into the video.
    end_time = Column(Float, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

def insert_transcript_lines_sqlalchemy(db_url = DB_URL, transcript_lines=[]):
    """
    Insert transcript lines into the PostgreSQL database using SQLAlchemy.
    
    Each transcript_line in transcript_lines should be a dictionary with keys:
      - meeting_id (optional)
      - speaker_label (e.g., "Interviewer - Speaker 1")
      - transcript (the text content)
      - start_time (float)
      - end_time (float)
    """
    engine = create_engine(db_url)
    Base.metadata.create_all(engine)  # Create table if it doesn't exist.
    Session = sessionmaker(bind=engine)
    session = Session()
    
    try:
        for line in transcript_lines:
            transcript_entry = Transcript(
                meeting_id=line.get('meeting_id'),
                speaker_label=line.get('speaker_label'),
                transcript=line.get('transcript'),
                start_time=line.get('start_time'),
                end_time=line.get('end_time')
            )
            session.add(transcript_entry)
        session.commit()
        print("Transcript lines inserted successfully using SQLAlchemy!")
    except Exception as e:
        session.rollback()
        print(f"Error inserting transcript lines: {e}")
    finally:
        session.close()


In [18]:
def parse_transcript_file(transcript_file):
    """
    Parse the transcript file and create a list of dictionaries for insertion.
    Each line in the transcript is assumed to be in the format:
      [Role] Speaker Label: Transcript text
    The function extracts:
      - role: text within the square brackets
      - speaker_label: text between the square bracket and the colon
      - transcript: text after the colon.
    Since no meeting_id, start_time, or end_time information is available, these will be set to None.
    """
    transcript_lines = []
    with open(transcript_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Find the role in square brackets
            if line.startswith('['):
                try:
                    role_end = line.index(']')
                    role = line[1:role_end]
                except ValueError:
                    role = "Unknown"
            else:
                role = "Unknown"
            
            # Split at the colon
            if ':' in line:
                header, transcript_text = line.split(':', 1)
            else:
                header, transcript_text = line, ""
            
            # Remove the role part from header to get the speaker label
            speaker_label = header.replace(f'[{role}]', '').strip()
            
            transcript_lines.append({
                "meeting_id": None,
                "speaker_label": f"{role} - {speaker_label}",
                "transcript": transcript_text.strip(),
                "start_time": None,
                "end_time": None
            })
    return transcript_lines


## Saving the Transcript

Finally, the processed transcript—with speaker and role labels—is saved to a text file. Each line in the output file is formatted as:


In [20]:
def process_video(video_path, model="base", output="role_transcript.txt", 
                  min_speakers=2, max_speakers=2,
                  db_url="postgresql+psycopg2://user:1234@192.168.10.132:5432/amas"):
    # Create temporary directory for audio extraction
    os.makedirs("temp", exist_ok=True)
    audio_path = os.path.join("temp", os.path.basename(video_path) + ".wav")
    
    # Step 1: Extract audio from video
    if not convert_video_to_audio(video_path, audio_path):
        print("Video conversion failed.")
        return
    
    # Step 2: Transcribe audio using Whisper
    transcription = transcribe_audio(audio_path, model)
    if transcription is None:
        print("Transcription failed.")
        return
    
    # Step 3: Load Silero VAD
    vad_model, get_speech_timestamps, read_audio = load_silero_vad()
    if vad_model is None:
        print("Silero VAD failed to load. Exiting.")
        return
    
    # Step 4: Get speech embeddings and segments using VAD
    embeddings, vad_segments = get_speech_embeddings(audio_path, vad_model, get_speech_timestamps, read_audio)
    if embeddings is None or len(embeddings) == 0:
        print("No speech segments detected. Exiting.")
        return
    
    # Step 5: Cluster segments to assign speaker labels
    vad_segments = cluster_speakers(embeddings, vad_segments,
                                    min_speakers=min_speakers,
                                    max_speakers=max_speakers)
    
    # Step 6: Assign Whisper transcript segments to speakers based on time overlap
    segments_with_speakers = assign_transcript_to_speakers(transcription["segments"], vad_segments)
    
    # Step 7: Assign roles using a simple heuristic (e.g., based on question counts)
    segments_with_roles = assign_roles(segments_with_speakers)
    
    # Step 8: Save the formatted transcript and retrieve transcript lines
    transcript_lines = save_formatted_transcript(segments_with_roles, output)
    
    # Clean up temporary audio file
    os.remove(audio_path)
    print("\nProcessing completed successfully!")
    
    transcript_lines = parse_transcript_file(output)
    
    # Step 9: Insert transcript lines into PostgreSQL using SQLAlchemy
    insert_transcript_lines_sqlalchemy(db_url, transcript_lines)


In [21]:
process_video(r"C:\Users\HARSH DADIYA\Downloads\sample_audio_for_transcript.mp3", model="base")

Audio extracted successfully to temp\sample_audio_for_transcript.mp3.wav
Loading Whisper model: base
Transcribing audio...




Detecting language using up to the first 30 seconds. Use `--language` to specify the language
Detected language: English
[00:00.000 --> 00:03.000]  Yes, Gauri, tell me something about you.
[00:04.000 --> 00:07.000]  Thank you for allowing me to introduce myself.
[00:08.000 --> 00:09.000]  My name is Gauri Swamali.
[00:09.000 --> 00:11.000]  I am from Rajasthan.
[00:11.000 --> 00:16.000]  I am doing computer science engineering from Githa Anjali Institute of Technical Studies.
[00:16.000 --> 00:23.000]  I secured 9.2 CGPA in 10 standard, 73.4 CG percentage and 12 standard.
[00:23.000 --> 00:27.000]  And currently in engineering, my aggregates code is 77 percentage.
[00:28.000 --> 00:32.000]  Computer science is omnipresent, that is, it is present in every field.
[00:32.000 --> 00:39.000]  And hence, I have invested these last 3.5 almost years in developing my software skills.
[00:39.000 --> 00:46.000]  My technical skills include the programming in Python, Java, C, C++ databases.
[00:46

Downloading: "https://github.com/snakers4/silero-vad/zipball/master" to C:\Users\HARSH DADIYA/.cache\torch\hub\master.zip


Detected 141 speech segments


  segment_tensor = torch.tensor(speech_segment).clone().detach().float()


Clusters: 2, Silhouette Score: 0.305
Selected 2 speakers with silhouette score: 0.305
Role-based transcript saved to role_transcript.txt

Processing completed successfully!


OperationalError: (psycopg2.OperationalError) connection to server at "192.168.10.132", port 5432 failed: Connection timed out (0x0000274C/10060)
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/20/e3q8)