In [2]:
pip install openai-whisper

Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/800.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.5/800.5 kB[0m [31m24.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper)
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting triton>=2.0.0 (from openai-whisper)
  Downloading triton-3.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.3 kB)
Downloading triton-3.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (209.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m209.5/209.5 MB[0m [31m7.3 MB/s[0m eta [36m0:0

In [3]:
pip install pyannote.audio

Collecting pyannote.audio
  Downloading pyannote.audio-3.3.2-py2.py3-none-any.whl.metadata (11 kB)
Collecting asteroid-filterbanks>=0.4 (from pyannote.audio)
  Downloading asteroid_filterbanks-0.4.0-py3-none-any.whl.metadata (3.3 kB)
Collecting lightning>=2.0.1 (from pyannote.audio)
  Downloading lightning-2.5.0.post0-py3-none-any.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.4/40.4 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting omegaconf<3.0,>=2.1 (from pyannote.audio)
  Downloading omegaconf-2.3.0-py3-none-any.whl.metadata (3.9 kB)
Collecting pyannote.core>=5.0.0 (from pyannote.audio)
  Downloading pyannote.core-5.0.0-py3-none-any.whl.metadata (1.4 kB)
Collecting pyannote.database>=5.0.1 (from pyannote.audio)
  Downloading pyannote.database-5.1.0-py3-none-any.whl.metadata (1.2 kB)
Collecting pyannote.metrics>=3.2 (from pyannote.audio)
  Downloading pyannote.metrics-3.2.1-py3-none-any.whl.metadata (1.3 kB)
Collecting pyannote.p

In [1]:
import whisper
from transformers import pipeline
from datetime import datetime
import os
import shutil
from pyannote.audio import Pipeline
import torch
import wave
import contextlib

class CallProcessor:
    def __init__(self, base_dir="call_records", auth_token=None):
        self.base_dir = base_dir
        self.audio_dir = os.path.join(base_dir, "audio")
        self.summary_dir = os.path.join(base_dir, "summaries")

        # Initialize Whisper for transcription
        self.transcriber = whisper.load_model("base")
        # Initialize summarization pipeline
        self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
        # Initialize speaker diarization pipeline
        if auth_token:
            self.diarization = Pipeline.from_pretrained(
                "pyannote/speaker-diarization@2.1",
                use_auth_token=auth_token
            )
            if torch.cuda.is_available():
                self.diarization = self.diarization.to(torch.device("cuda"))
        else:
            print("Warning: No auth token provided for speaker diarization")
            self.diarization = None

        # Create directories if they don't exist
        for directory in [self.audio_dir, self.summary_dir]:
            if not os.path.exists(directory):
                os.makedirs(directory)

    def get_audio_duration(self, audio_path):
        """Get duration of audio file in seconds"""
        with contextlib.closing(wave.open(audio_path, 'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            duration = frames / float(rate)
            return duration

    def diarize_audio(self, audio_path):
        """
        Perform speaker diarization on audio file
        Returns list of segments with speaker labels and timestamps
        """
        if not self.diarization:
            print("Speaker diarization not available - missing auth token")
            return None

        try:
            # Run diarization
            diarization = self.diarization(audio_path)

            # Convert to list of segments
            segments = []
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                segments.append({
                    'speaker': speaker,
                    'start': turn.start,
                    'end': turn.end
                })
            return segments
        except Exception as e:
            print(f"Error during diarization: {e}")
            return None

    def transcribe_with_speakers(self, audio_path, diarization_segments):
        """
        Transcribe audio with speaker labels
        """
        try:
            # Get full transcription
            result = self.transcriber.transcribe(audio_path)
            full_text = result["text"]

            if not diarization_segments:
                return full_text

            # Break transcription into segments based on timestamps
            segments = result["segments"]

            # Match transcription segments with speaker labels
            transcription_with_speakers = []
            for segment in segments:
                # Find matching speaker segment
                segment_mid_time = (segment['start'] + segment['end']) / 2
                matching_speaker = None

                for speaker_segment in diarization_segments:
                    if (speaker_segment['start'] <= segment_mid_time and
                        speaker_segment['end'] >= segment_mid_time):
                        matching_speaker = speaker_segment['speaker']
                        break

                # Format segment with speaker label
                speaker_label = matching_speaker if matching_speaker else "Unknown Speaker"
                transcription_with_speakers.append(
                    f"[{speaker_label}]: {segment['text']}"
                )

            return "\n".join(transcription_with_speakers)

        except Exception as e:
            print(f"Error during transcription: {e}")
            return None

    def save_audio(self, audio_file_path):
        """
        Save a copy of the audio file with timestamp
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_extension = os.path.splitext(audio_file_path)[1]
        new_filename = f"call_{timestamp}{file_extension}"
        new_path = os.path.join(self.audio_dir, new_filename)

        try:
            shutil.copy2(audio_file_path, new_path)
            return new_path, timestamp
        except Exception as e:
            print(f"Error saving audio file: {e}")
            return None, None

    def generate_summary(self, text):
        """
        Generate a summary of the transcribed text
        """
        try:
            max_chunk_length = 1024
            chunks = [text[i:i + max_chunk_length] for i in range(0, len(text), max_chunk_length)]

            summaries = []
            for chunk in chunks:
                summary = self.summarizer(chunk, max_length=130, min_length=30, do_sample=False)
                summaries.append(summary[0]['summary_text'])

            return " ".join(summaries)
        except Exception as e:
            print(f"Error during summarization: {e}")
            return None

    def format_email(self, transcription, summary, audio_path, speaker_count=None, call_metadata=None):
        """
        Format the email with transcription and summary
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")

        email_body = f"""
Subject: Call Summary Report - {timestamp}

AUDIO FILE LOCATION:
{audio_path}

{"NUMBER OF SPEAKERS DETECTED: " + str(speaker_count) if speaker_count else ""}

SUMMARY:
{summary}

FULL TRANSCRIPTION (WITH SPEAKERS):
{transcription}

{'CALL METADATA:' if call_metadata else ''}
{call_metadata if call_metadata else ''}
"""
        return email_body

    def save_summary(self, email_content, timestamp):
        """
        Save the email content to a text file
        """
        filename = f"call_summary_{timestamp}.txt"
        filepath = os.path.join(self.summary_dir, filename)

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(email_content)
            print(f"Summary saved to: {filepath}")
            return filepath
        except Exception as e:
            print(f"Error saving summary: {e}")
            return None

def process_call(audio_file_path, auth_token, base_dir="call_records"):
    """
    Main function to process a call recording
    """
    processor = CallProcessor(base_dir, auth_token)

    # Step 1: Save audio file
    print("Saving audio file...")
    saved_audio_path, timestamp = processor.save_audio(audio_file_path)
    if not saved_audio_path:
        return False

    # Step 2: Perform speaker diarization
    print("Detecting speakers...")
    diarization_segments = processor.diarize_audio(saved_audio_path)
    speaker_count = len(set(segment['speaker'] for segment in diarization_segments)) if diarization_segments else None

    # Step 3: Transcribe with speaker labels
    print("Transcribing audio...")
    transcription = processor.transcribe_with_speakers(saved_audio_path, diarization_segments)
    if not transcription:
        return False

    # Step 4: Summarize
    print("Generating summary...")
    summary = processor.generate_summary(transcription)
    if not summary:
        return False

    # Step 5: Format email and save
    print("Formatting and saving summary...")
    email_content = processor.format_email(
        transcription=transcription,
        summary=summary,
        audio_path=saved_audio_path,
        speaker_count=speaker_count,
        call_metadata={
            "Audio File": os.path.basename(saved_audio_path),
            "Date": datetime.now().strftime("%Y-%m-%d %H:%M"),
            "Duration": f"{processor.get_audio_duration(saved_audio_path):.2f} seconds",
            "Number of Speakers": speaker_count if speaker_count else "Unknown"
        }
    )

    # Save summary
    saved_summary = processor.save_summary(email_content, timestamp)
    return saved_summary is not None

In [2]:
auth_token = "EXAMPLETOKEN"
audio_file = "/content/phonecall.wav"
success = process_call(audio_file, auth_token)

100%|███████████████████████████████████████| 139M/139M [00:02<00:00, 54.5MiB/s]
  checkpoint = torch.load(fp, map_location=device)
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.58k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Device set to use cuda:0


config.yaml:   0%|          | 0.00/500 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/17.7M [00:00<?, ?B/s]

config.yaml:   0%|          | 0.00/318 [00:00<?, ?B/s]

INFO:pytorch_lightning.utilities.migration.utils:Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.5.0.post0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../root/.cache/torch/pyannote/models--pyannote--segmentation/snapshots/c4c8ceafcbb3a7a280c2d357aee9fbc9b0be7f9b/pytorch_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.5.1+cu121. Bad things might happen unless you revert torch to 1.x.


hyperparams.yaml:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

  wrapped_fwd = torch.cuda.amp.custom_fwd(fwd, cast_inputs=cast_inputs)


embedding_model.ckpt:   0%|          | 0.00/83.3M [00:00<?, ?B/s]

mean_var_norm_emb.ckpt:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

classifier.ckpt:   0%|          | 0.00/5.53M [00:00<?, ?B/s]

label_encoder.txt:   0%|          | 0.00/129k [00:00<?, ?B/s]

  state_dict = torch.load(path, map_location=device)
  stats = torch.load(path, map_location=device)


Saving audio file...
Detecting speakers...


It can be re-enabled by calling
   >>> import torch
   >>> torch.backends.cuda.matmul.allow_tf32 = True
   >>> torch.backends.cudnn.allow_tf32 = True
See https://github.com/pyannote/pyannote-audio/issues/1370 for more details.



Transcribing audio...
Generating summary...
Formatting and saving summary...
Summary saved to: call_records/summaries/call_summary_20250105_214955.txt
