In [None]:
# Some installs
#!pip install pyannote.audio
#!pip install -q bitsandbytes==0.46.0

In [None]:
#Imports
import os
import sys
import requests
from IPython.display import Markdown, display, update_display
from openai import OpenAI
from huggingface_hub import login
from transformers import AutoTokenizer, AutoModelForCausalLM, TextStreamer, BitsAndBytesConfig, AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline, TextIteratorStreamer
import threading
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
import io
import gc
import gradio as gr

# Set-up on Colab
Setting up keys for OpenAI and HuggingFace Token

In [None]:
from google.colab import drive, userdata

# Mount Google Drive so the sample recording stored there can be read.
# (In the Gradio UI the user uploads a file instead.)
drive.mount("/content/drive")
audio_filename = "/content/drive/MyDrive/llms/seattle_extract.mp3"

# Credentials come from Colab's userdata store: OpenAI API key and Hugging Face token.
openai_api_key = userdata.get("OPENAI_API_KEY")
hf_token = userdata.get("HF_TOKEN")

# Report every credential that came back empty.
for _secret_name, _secret_value in (("OPENAI_API_KEY", openai_api_key), ("HF_TOKEN", hf_token)):
    if not _secret_value:
        print(f"{_secret_name} not found")

# Set-up Local
Setting up keys for OpenAI and HuggingFace Token 

In [None]:
from dotenv import load_dotenv

# Read the .env file; override=True is passed so its values replace ones already set.
load_dotenv(override=True)

# OpenAI API key and Hugging Face token, both taken from the process environment.
openai_api_key, hf_token = (os.getenv(var) for var in ("OPENAI_API_KEY", "HF_TOKEN"))

# Report every credential that is missing or empty.
for _secret_name, _secret_value in (("OPENAI_API_KEY", openai_api_key), ("HF_TOKEN", hf_token)):
    if not _secret_value:
        print(f"{_secret_name} not found")

In [None]:
# Only used for intermediate checks; in the Gradio UI the user supplies their own mp3 file.
file_path = "../tmp/seattle_extract.mp3"

# Resolve the relative path against the current working directory and use it as the
# default audio input for the test cells below.
audio_filename = abs_path = os.path.abspath(file_path)

# Constants

In [None]:
# Constants

# Transcription model used through the OpenAI API (frontier backend)
AUDIO_MODEL = "whisper-1"

# Transcription model run locally through Hugging Face transformers
AUDIO_MODEL_OPENSOURCE = "openai/whisper-small.en"

# Summarization model
#LLAMA = "meta-llama/Meta-Llama-3.1-8B-Instruct" #Too big for local
LLAMA = "meta-llama/Llama-3.2-1B-Instruct"

# Diarization model
PYANNOTE_DIARIZATION = "pyannote/speaker-diarization-3.1"

# Use the GPU when torch reports CUDA as available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

In [None]:
# Authenticate this session against the Hugging Face Hub with the token loaded during set-up.
login(token=hf_token)

# 1. Diarization
This step uses a gated model: before running it, accept the provider's terms for the model on the Hugging Face Hub.

In [None]:
# Diarization Pipeline
def diarizeAudio(audio_filename):
    """Run speaker diarization on an audio file.

    Loads the pyannote pipeline named by ``PYANNOTE_DIARIZATION`` (authenticated
    with ``hf_token``), moves it to ``device``, reads the audio with torchaudio
    and runs the pipeline on the in-memory waveform while showing a progress hook.

    Args:
        audio_filename: Path of the audio file to diarize.

    Returns:
        The object returned by the pipeline call; callers iterate it with
        ``itertracks(yield_label=True)``.

    Raises:
        RuntimeError: If the pipeline could not be loaded.
    """
    pipelineDiarize = Pipeline.from_pretrained(
        PYANNOTE_DIARIZATION,
        use_auth_token=hf_token)

    # NOTE(review): pyannote.audio 3.x is understood to return None (after printing a
    # hint) when the gated model cannot be downloaded; fail with a clear message instead
    # of an AttributeError on `.to(...)`.
    if pipelineDiarize is None:
        raise RuntimeError(
            f"Could not load diarization pipeline '{PYANNOTE_DIARIZATION}'. "
            "Check HF_TOKEN and that the model terms were accepted on the Hugging Face Hub."
        )

    waveform = None
    try:
        pipelineDiarize = pipelineDiarize.to(torch.device(device))
        waveform, sample_rate = torchaudio.load(audio_filename)

        # Output of the model
        with ProgressHook() as hook:
            return pipelineDiarize({"waveform": waveform, "sample_rate": sample_rate}, hook=hook)
    finally:
        # Always drop the pipeline and waveform, even when loading or inference fails,
        # so the memory they hold can be reclaimed before the next stage runs.
        pipelineDiarize = waveform = None
        gc.collect()
        torch.cuda.empty_cache()

In [None]:
#Test
#diarization = diarizeAudio(audio_filename)

# Quick Test of the diarize audio
#buffer = io.StringIO()
#diarization.write_rttm(buffer)
#rttm_text = buffer.getvalue()

#buffer.close()
#print(rttm_text)

# 2. Transcribing
## 2.1 Using HuggingFace

In [None]:
def transcribeHF(audio_filename, model = AUDIO_MODEL_OPENSOURCE):
    """Transcribe an audio file with a speech-to-text model loaded via transformers.

    Args:
        audio_filename: Path of the audio file to transcribe.
        model: Hugging Face model id to load for both the model and its processor.

    Returns:
        The raw output of the automatic-speech-recognition pipeline, requested with
        ``return_timestamps=True`` (downstream code reads its ``"chunks"`` list).
    """
    # Half precision only when CUDA is available; full precision otherwise.
    dtype = torch.float32
    if torch.cuda.is_available():
        dtype = torch.float16

    speech_model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model,
        torch_dtype=dtype,
        low_cpu_mem_usage=True,
        use_safetensors=True,
    )
    speech_model.to(device)

    speech_processor = AutoProcessor.from_pretrained(model)

    asr_pipeline = pipeline(
        "automatic-speech-recognition",
        model=speech_model,
        tokenizer=speech_processor.tokenizer,
        feature_extractor=speech_processor.feature_extractor,
        torch_dtype=dtype,
        device=device,
    )

    result = asr_pipeline(audio_filename, return_timestamps=True)

    # Drop the pipeline, processor and model, then ask Python and CUDA to release memory.
    del asr_pipeline, speech_processor, speech_model
    gc.collect()
    torch.cuda.empty_cache()

    return result

In [None]:
#Test
#transcription = transcribeHF(audio_filename, AUDIO_MODEL_OPENSOURCE)

In [None]:
# Stitching chunks together
def stichSegments(transcription):
    """Merge timestamped transcription chunks into continuous segments.

    Args:
        transcription: Mapping with a ``"chunks"`` list; every chunk has a
            ``"timestamp"`` pair ``(start, end)`` and a ``"text"`` string.

    Returns:
        A list of ``{"start", "end", "text"}`` dicts. Chunks with a missing
        timestamp, empty text or a non-positive duration are dropped. When a
        chunk starts before the previous one ended, its timestamps are treated
        as restarted and shifted by the previous end. Chunks that touch or
        overlap the open segment are appended to it.
    """
    stitched = []
    shift = 0.0       # amount added to the raw timestamps of the current chunk
    last_end = 0.0    # shifted end time of the most recently accepted chunk
    open_seg = None   # segment still being extended

    for piece in transcription["chunks"]:
        stamp = piece["timestamp"]
        words = piece["text"].strip()

        # Ignore chunks that lack a usable timestamp or carry no text
        if not (stamp[0] is not None and stamp[1] is not None and words):
            continue

        seg_start = stamp[0] + shift
        seg_end = stamp[1] + shift

        # Ignore empty or inverted intervals
        if seg_end <= seg_start:
            continue

        # A start earlier than the previous end means the timestamps restarted:
        # continue the timeline from where the previous chunk ended
        if seg_start < last_end:
            shift = last_end
            seg_start, seg_end = stamp[0] + shift, stamp[1] + shift

        last_end = seg_end

        # Extend the open segment when this chunk touches or overlaps it
        if open_seg is not None and seg_start <= open_seg["end"]:
            open_seg["end"] = seg_end
            open_seg["text"] += " " + words
            continue

        # Otherwise close the open segment (if any) and start a new one
        if open_seg is not None:
            stitched.append(dict(open_seg))
        open_seg = {"start": seg_start, "end": seg_end, "text": words}

    # Flush the segment that was still open when the chunks ran out
    if open_seg is not None:
        stitched.append(dict(open_seg))

    return stitched



In [None]:
# Function to Transcribe with HF and return in format of OpenAI
def hfToOpenAISegments(audio_filename, model = AUDIO_MODEL_OPENSOURCE):
    """Transcribe with the Hugging Face backend and return stitched segments.

    Args:
        audio_filename: Path of the audio file to transcribe.
        model: Hugging Face model id forwarded to ``transcribeHF``.

    Returns:
        The list of ``{"start", "end", "text"}`` dicts produced by ``stichSegments``.
    """
    return stichSegments(transcribeHF(audio_filename, model))
    

## 2.2 Using Frontier Model

In [None]:
# OpenAI API client used by the transcription helper below; the key comes from the set-up cells.
openai = OpenAI(api_key=openai_api_key)

In [None]:
def transcribeOpenAI(audio_filename, model= AUDIO_MODEL):
    """Transcribe an audio file through the OpenAI transcription endpoint.

    Args:
        audio_filename: Path of the audio file to upload.
        model: Name of the OpenAI transcription model.

    Returns:
        The response object of ``openai.audio.transcriptions.create`` requested
        with ``response_format="verbose_json"``.
    """
    # The context manager closes the file even if the request fails.
    with open(audio_filename, "rb") as audio_file:
        return openai.audio.transcriptions.create(
            model=model,
            file=audio_file,
            response_format="verbose_json",
        )

In [None]:
#Test
#transcriptionOAI = transcribeOpenAI(audio_filename, AUDIO_MODEL)

In [None]:
def openAISegments(audio_filename, model):
    """Transcribe with the OpenAI backend and return only the segment list.

    Args:
        audio_filename: Path of the audio file to transcribe.
        model: Name of the OpenAI transcription model.

    Returns:
        The ``"segments"`` entry of the dumped transcription response.
    """
    response = transcribeOpenAI(audio_filename, model)
    payload = response.model_dump()
    return payload["segments"]
    

In [None]:
ALLOWED_BACKEND = ["openai", "huggingface"]


def _dominantSpeaker(seg_start, seg_end, speaker_segments, min_ratio=0.1):
    """Return the speaker whose turns overlap [seg_start, seg_end] the most.

    Overlap is summed per speaker. ``"Unknown"`` is returned when no speaker
    overlaps the segment by more than ``min_ratio`` of the segment's duration.
    """
    overlap_by_speaker = {}
    for spk_seg in speaker_segments:
        overlap = min(seg_end, spk_seg["end"]) - max(seg_start, spk_seg["start"])
        if overlap > 0:
            label = spk_seg["speaker"]
            overlap_by_speaker[label] = overlap_by_speaker.get(label, 0.0) + overlap

    if not overlap_by_speaker:
        return "Unknown"

    # max() keeps the first speaker on ties, i.e. the one seen earliest.
    speaker, best_overlap = max(overlap_by_speaker.items(), key=lambda item: item[1])
    if best_overlap > min_ratio * (seg_end - seg_start):
        return speaker
    return "Unknown"


def diarizedTranscription(audio_filename, model, backend="openai"):
    """Transcribe an audio file and label every segment with a speaker.

    Args:
        audio_filename: Path of the audio file.
        model: Transcription model name understood by the chosen backend.
        backend: One of ``ALLOWED_BACKEND``.

    Returns:
        A list of ``{"speaker", "start", "end", "text"}`` dicts, one per
        transcription segment, in transcription order.

    Raises:
        ValueError: If ``backend`` is not one of ``ALLOWED_BACKEND``.
    """
    # Fail fast: an unknown backend used to fall through to the HuggingFace branch.
    if backend not in ALLOWED_BACKEND:
        raise ValueError(f"Unsupported backend {backend!r}; choose one of {ALLOWED_BACKEND}")

    print("Transcribing Now")
    gr.Info("Transcribing pipeline started")
    # Transcribing
    if backend == "openai":
        segments = openAISegments(audio_filename, model)  # for OpenAI
    else:
        segments = hfToOpenAISegments(audio_filename, model)  # for HF

    print("Diarizing Now")
    gr.Info("Diarizing pipeline started")
    # Diarizing
    diarization = diarizeAudio(audio_filename)

    # Parse pyannote segments
    speaker_segments = [
        {"start": turn.start, "end": turn.end, "speaker": speaker}
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]

    final_transcript = []
    for seg in segments:
        seg_start = seg["start"]
        seg_end = seg["end"]
        final_transcript.append({
            # Speaker covering most of the segment (more than 10% of it), else "Unknown"
            "speaker": _dominantSpeaker(seg_start, seg_end, speaker_segments),
            "start": seg_start,
            "end": seg_end,
            "text": seg["text"].strip()
        })
    return final_transcript
    
#The function below is only needed if one needs to consolidate the segments
def consolidateTranscriptions(transcript):
    """Merge consecutive entries spoken by the same speaker.

    Args:
        transcript: List of ``{"speaker", "start", "end", "text"}`` dicts.

    Returns:
        A new list in which each run of consecutive entries with the same
        speaker is collapsed into one entry: ``start`` comes from the first
        entry of the run, ``end`` from the last, and the texts are joined with
        single spaces. The input list and its dicts are left untouched.
    """
    consolidated_transcript = []
    for entry in transcript:
        if consolidated_transcript and consolidated_transcript[-1]["speaker"] == entry["speaker"]:
            # Same speaker as the previous block: extend end time and append text
            last_entry = consolidated_transcript[-1]
            last_entry["end"] = entry["end"]
            last_entry["text"] += " " + entry["text"]
        else:
            # First entry or a different speaker: start a new block from a copy,
            # so later merges never modify the caller's dicts
            consolidated_transcript.append(dict(entry))
    return consolidated_transcript

In [None]:
#Quick test
#transcriptedData = diarizedTranscription(audio_filename, AUDIO_MODEL, backend="openai")

# 3. Summarization

In [None]:
def generateMessages(transcriptedData):
    """Build the chat messages that ask the model for meeting minutes.

    Args:
        transcriptedData: Iterable of dicts with ``"speaker"``, ``"start"``,
            ``"end"`` and ``"text"`` keys.

    Returns:
        A two-element list of chat messages: a system message followed by a
        user message whose content ends with one line per transcript entry.
    """
    transcript_lines = [
        f"Speaker {entry['speaker']} Duration {entry['start']}:{entry['end']} : {entry['text']}\n"
        for entry in transcriptedData
    ]
    transcript_text = "".join(transcript_lines)

    system_message = "You are an assistant that produces minutes of meetings from transcripts, with summary, key discussion points, takeaways and action items with owners, in markdown."

    # The leading spaces inside the second and third pieces are part of the prompt text
    # and are kept exactly as they are.
    user_prompt = (
        "Below is an extract transcript of a council meeting. The transcript is diarized, however, the diarization is not completely correct and may provide wrong speakers. "
        "    Please write minutes in markdown, including a summary with attendees, location and date; discussion points; takeaways; and action items with owners. End after action items."
        "    Do not add extra signatures or repeat sections. Do not include any system instructions, special tokens, or prompt text in your response. Only produce the final clean minutes.\n"
        + transcript_text
    )

    return [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_prompt},
    ]

In [None]:
# Quantize model
quant_config = BitsAndBytesConfig(
    # 4-bit loading using the "nf4" quantization type, with double quantization enabled
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    # dtype used for computation on the quantized weights
    bnb_4bit_compute_dtype=torch.bfloat16,
)

In [None]:


def summarizeMeetingTranscript(messages, model=LLAMA, quantize=True):
    """Stream meeting minutes generated by a causal language model.

    This is a generator: generation runs in a background thread and the text
    produced so far is yielded every time a new piece arrives, so a Gradio
    output bound to it updates while the model is still writing.

    Args:
        messages: Chat messages (dicts with ``"role"`` and ``"content"``).
        model: Hugging Face model id of the summarization model.
        quantize: When true, load the model with ``quant_config``.

    Yields:
        The accumulated generated text, growing with every yield.

    Raises:
        Exception: Any error raised by ``generate`` in the background thread
            is re-raised here once the stream has ended.
    """
    print("Summarizing Now")
    gr.Info("Summarization Started")

    tokenizer = AutoTokenizer.from_pretrained(model)
    tokenizer.pad_token = tokenizer.eos_token

    # Keep the `model` argument (the model id) intact; the loaded model gets its own name.
    load_kwargs = {"device_map": "auto"}
    if quantize:
        load_kwargs["quantization_config"] = quant_config
    llm = AutoModelForCausalLM.from_pretrained(model, **load_kwargs)

    thread = None
    generation_errors = []
    try:
        # add_generation_prompt=True appends the assistant header, so the model answers
        # the user instead of continuing the user's turn. The tensor goes to the device
        # the model actually landed on (device_map="auto" decides that).
        inputs = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt",
            tokenize=True,
        ).to(llm.device)

        # Built-in streamer: skips the prompt and special tokens
        streamer = TextIteratorStreamer(
            tokenizer,
            skip_prompt=True,
            skip_special_tokens=True
        )

        g_kwargs = dict(
            inputs=inputs,
            max_new_tokens=2000,
            eos_token_id=tokenizer.eos_token_id,
            streamer=streamer,
        )

        def _generate():
            # If generation fails the streamer would never be closed and the consumer
            # loop below would block forever: record the error and end the stream.
            try:
                llm.generate(**g_kwargs)
            except Exception as exc:
                generation_errors.append(exc)
                streamer.end()

        # Run generation in a background thread
        thread = threading.Thread(target=_generate)
        thread.start()

        # Accumulate pieces and yield the growing document to Gradio
        full_text = ""
        for new_token in streamer:
            full_text += new_token
            yield full_text

        if generation_errors:
            raise generation_errors[0]
    finally:
        # Runs on success, on error and when the consumer closes the generator early.
        if thread is not None:
            thread.join()
        llm = tokenizer = inputs = streamer = g_kwargs = None
        gc.collect()
        torch.cuda.empty_cache()


In [None]:
#Test
#messages = generateMessages(transcriptedData)
#MOMS = summarizeMeetingTranscript(messages, LLAMA, True)

# 4. Gradio UI

![Gradio Based MOM](../images/MOM.png)

In [None]:
# Transcription models each backend accepts. Every value is a list so that the
# membership test below is an exact match (a bare string would match substrings).
BACKEND_TO_MODEL_SUPPORT = {"openai": ["whisper-1"], "huggingface": ["openai/whisper-small.en"]}
def generateMOM(audio_filepath, summarizationModel, transcriptionModel, backend, quantize):
    """Produce minutes of meeting for an uploaded audio file, streamed as text.

    Generator used as the Gradio click handler: validates the inputs, runs the
    diarized transcription, builds the prompt and then yields the summary as
    it is generated.

    Args:
        audio_filepath: Path of the uploaded audio file.
        summarizationModel: Hugging Face model id of the summarization model.
        transcriptionModel: Transcription model name.
        backend: Transcription backend, a key of ``BACKEND_TO_MODEL_SUPPORT``.
        quantize: Whether to load the summarization model quantized.

    Yields:
        The accumulated minutes text produced by ``summarizeMeetingTranscript``.

    Raises:
        gr.Error: If no audio file was provided, the backend is unknown, or the
            transcription model is not supported by the backend.
    """
    if not audio_filepath:
        raise gr.Error("Please upload an audio file first.")

    supported_models = BACKEND_TO_MODEL_SUPPORT.get(backend)
    if supported_models is None:
        raise gr.Error("Given backend is not supported.")
    if transcriptionModel not in supported_models:
        raise gr.Error("Given model is not supported by the backend.")

    diarizedTrascriptedData = diarizedTranscription(audio_filepath, transcriptionModel, backend=backend)

    messages = generateMessages(diarizedTrascriptedData)

    yield from summarizeMeetingTranscript(messages, summarizationModel, quantize)

In [None]:
# Gradio layout: a narrow settings column on the left, the generated minutes on the right,
# and a button below that starts the pipeline.
with gr.Blocks() as ui:
    with gr.Row():
        with gr.Column(scale=1):
            # Model and backend selection; the defaults pair the "openai" backend with "whisper-1"
            modelSummarization = gr.Dropdown(choices=["meta-llama/Llama-3.2-1B-Instruct"], label="Summarization Model", value="meta-llama/Llama-3.2-1B-Instruct")
            quantizationSummarizeModel = gr.Checkbox(label="Do you want to quantize the Summarization model?", value=True)
            modelTranscription = gr.Dropdown(choices=["openai/whisper-small.en", "whisper-1"], label="Transcription Model", value="whisper-1")
            backend = gr.Dropdown(choices=["openai", "huggingface"], label="Which backend to use for transcription", value="openai")
            # Read-only field that only displays the diarization model name
            modelDiarization = gr.Text(label="Diarization Model Used", value="pyannote/speaker-diarization-3.1", interactive=False)
            audioIn = gr.Audio(label="Upload MP3", type="filepath")#upload mp3
        with gr.Column(scale=3):
            # Output area for the generated minutes
            meetingMinutes = gr.Markdown(label="Meeting of Minutes", height=800)

    submit = gr.Button("Generate Minutes")
    # The inputs list follows generateMOM's parameter order:
    # audio, summarization model, transcription model, backend, quantize
    submit.click(fn=generateMOM, inputs=[audioIn, modelSummarization, modelTranscription, backend, quantizationSummarizeModel], outputs=meetingMinutes)
             

In [None]:
# Start the Gradio app defined in the previous cell
ui.launch()