In [None]:
%pip install pyannote.audio

In [None]:
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook

import torchaudio

In [None]:
import os
from pathlib import Path

# Read the Hugging Face access token from ~/.keys/huggingface.key.
# The file is opened in a `with` block so the handle is closed right after reading.
key_path = os.path.join(Path.home(), ".keys", "huggingface.key")
with open(key_path, "r") as key_file:
    key = key_file.read().strip()

In [None]:
# Instantiate the 'pyannote/speaker-diarization' pipeline via `Pipeline.from_pretrained`,
# passing the token read from the key file as `use_auth_token`.
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization', use_auth_token=key)

In [None]:
# Recording to diarize.
audio_path = '/data/sbdp/PHOENIX/PROTECTED/CAMI/CAMI221/onsite_interview/processed/CAMI-CAMI221-onsiteInterview_audio_webcam-day002_9.mp3'

# Decode the audio up front and hand the pipeline an in-memory waveform
# together with its sample rate.
waveform, sample_rate = torchaudio.load(audio_path)
in_memory_audio = {"waveform": waveform, "sample_rate": sample_rate}

# Run the diarization with a progress hook; the speaker count is fixed at two.
with ProgressHook() as hook:
    diarization = pipeline(in_memory_audio, num_speakers=2, hook=hook)

In [None]:
# Total speaking time per diarized speaker label, keyed in order of first appearance.
duration_by_speaker = {}
for segment, _, label in diarization.itertracks(yield_label=True):
    duration_by_speaker[label] = duration_by_speaker.get(label, 0) + segment.duration

if not duration_by_speaker:
    raise ValueError("Diarization contains no speaker segments; cannot assign roles.")

# Parallel lists kept for the cells below, which index `speakers` by role index.
speakers = list(duration_by_speaker)
cumulative_durations = list(duration_by_speaker.values())

for speaker, total_duration in zip(speakers, cumulative_durations):
    print(f"{speaker} spoke for {total_duration} seconds.")

# interviewer speaks less than interviewee
interviewer_index = cumulative_durations.index(min(cumulative_durations))
interviewee_index = cumulative_durations.index(max(cumulative_durations))

# The two lookups above can only coincide when every speaker has exactly the
# same total. With more than one speaker, fall back to a different speaker so
# that both roles are not assigned to the same label.
if interviewee_index == interviewer_index and len(speakers) > 1:
    interviewee_index = next(
        candidate for candidate in range(len(speakers)) if candidate != interviewer_index
    )

print(f"Interviewer: {speakers[interviewer_index]}")

In [None]:
# Export the diarization as "<start_ms>,<end_ms>,<role>" lines, one per speaker turn.
# Turns whose label matches neither role are not written.
with open("diarization.txt", "w") as text_file:
    interviewer_label = speakers[interviewer_index]
    participant_label = speakers[interviewee_index]

    for turn, _, turn_label in diarization.itertracks(yield_label=True):
        # Turn boundaries are scaled by 1000 and truncated to whole milliseconds.
        start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)

        # Two independent checks: a label that holds both roles is written twice.
        if turn_label == interviewer_label:
            text_file.write(f"{start_ms},{end_ms},Interviewer\n")

        if turn_label == participant_label:
            text_file.write(f"{start_ms},{end_ms},Participant\n")

In [None]:
import whisper

In [None]:
# Load the Whisper model checkpoint named "large".
model = whisper.load_model("large")

In [None]:
# Audio file to transcribe: the same path that the diarization cell assigns to `audio_path`.
audio_path = '/data/sbdp/PHOENIX/PROTECTED/CAMI/CAMI221/onsite_interview/processed/CAMI-CAMI221-onsiteInterview_audio_webcam-day002_9.mp3'

In [None]:
# Transcribe the recording with the language set to English.
# `word_timestamps=True` is needed because a later cell reads each segment's
# 'words' list with per-word 'start'/'end' values.
result = model.transcribe(audio_path, verbose=True, word_timestamps=True, language="en")

In [None]:
# Show the raw transcription result as this cell's output.
result

In [None]:

# Print one line per transcribed segment: "<start> - <end>: <text>".
segments = result['segments']
for segment in segments:
    # Segment boundaries are truncated to whole seconds for display only.
    start, end, transcript = int(segment['start']), int(segment['end']), segment['text']
    print(f"{start} - {end}: {transcript}")

In [None]:
# Export the transcript word by word as "<start_ms>,<end_ms>,<word>" lines.
# Only the per-word entries of each segment are written; the segment-level
# start/end/text values are not used.
with open("transcript.txt", "w") as text_file:
    for segment in segments:
        for word_info in segment['words']:
            # Word boundaries are scaled by 1000 and truncated to whole milliseconds.
            start_ms = int(word_info['start'] * 1000)
            end_ms = int(word_info['end'] * 1000)
            word = word_info['word']
            text_file.write(f"{start_ms},{end_ms},{word}\n")

In [None]:
def ms_to_srt(ms: float) -> str:
    """Format a millisecond offset as an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        ms: Offset in milliseconds. Any fractional part is discarded.

    Returns:
        The timestamp with hours, minutes and seconds zero-padded to at least
        two digits and milliseconds zero-padded to three digits.
    """
    # Work in whole milliseconds and peel off one unit at a time.
    whole_ms = int(ms // 1)
    total_seconds, mil = divmod(whole_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{mil:03d}"


In [None]:
from typing import Optional, List


class SubtitleElement:
    """One subtitle cue: a time span in milliseconds, its text and its speaker."""

    def __init__(
        self,
        start_ms: int,
        end_ms: int,
        text: str,
        speaker: str,
        index: Optional[int] = None,
    ) -> None:
        """Store the cue fields; `text` is kept with surrounding whitespace removed.

        Args:
            start_ms: Cue start in milliseconds.
            end_ms: Cue end in milliseconds.
            text: Cue text.
            speaker: Speaker name shown in brackets when the cue is rendered.
            index: Cue number; defaults to None.
        """
        self.start_ms, self.end_ms = start_ms, end_ms
        self.speaker = speaker
        self.text = text.strip()
        self.index = index

    def __str__(self) -> str:
        """Render the cue as four lines: index, time range, bracketed speaker, text."""
        return f"""{self.index}\n{ms_to_srt(self.start_ms)} --> {ms_to_srt(self.end_ms)}\n[{self.speaker}]\n{self.text.strip()}\n"""

    def __repr__(self) -> str:
        """Use the rendered cue as the repr as well."""
        return str(self)


class Subtitles:
    """Ordered list of subtitle cues that renders to SRT-style text.

    Cues are numbered from 0 in the order they are added.
    """

    def __init__(self) -> None:
        # Number that the next element passed to `add_element` will receive.
        self.index = 0
        self.elements: List["SubtitleElement"] = []

    def add_element(self, element: "SubtitleElement") -> None:
        """Append `element`, overwriting its `index` with the next free number."""
        element.index = self.index
        self.index += 1
        self.elements.append(element)

    def join_adjacent_elements(
        self, max_words_per_line: int = 7, stop_characters: str = ".?!"
    ) -> None:
        """Merge consecutive cues of the same speaker into longer lines, in place.

        A cue absorbs the cue that follows it when both have the same speaker,
        its text does not end with a stop character, and it holds fewer than
        `max_words_per_line` space-separated words. The merged cue keeps its
        own start time and takes the end time of the absorbed cue. Afterwards
        the remaining cues are renumbered consecutively from 0 so the rendered
        output has no gaps in its cue numbers.

        Args:
            max_words_per_line: A cue stops absorbing once it has this many words.
            stop_characters: Each character in this string ends a sentence and
                therefore blocks merging when it is the last character of a cue.
        """
        stop_suffixes = tuple(stop_characters)

        idx = 0
        while idx < len(self.elements) - 1:
            element = self.elements[idx]
            next_element = self.elements[idx + 1]
            current_text = element.text.strip()

            if (
                element.speaker == next_element.speaker
                # `endswith` is safe on empty text, unlike indexing text[-1].
                and not current_text.endswith(stop_suffixes)
                and len(current_text.split(" ")) < max_words_per_line
            ):
                # Skip empty pieces so an empty cue does not leave a stray space.
                pieces = (current_text, next_element.text.strip())
                element.text = " ".join(piece for piece in pieces if piece)
                element.end_ms = next_element.end_ms
                # Delete by position; stay on `idx` so the merged cue can absorb more.
                del self.elements[idx + 1]
            else:
                idx += 1

        # Close the numbering gaps left by the removed cues.
        for new_index, element in enumerate(self.elements):
            element.index = new_index
        self.index = len(self.elements)

    def __str__(self) -> str:
        """Concatenate the rendered cues, each followed by a newline."""
        return "".join(str(element) + "\n" for element in self.elements)

    def __repr__(self) -> str:
        """Use the rendered subtitle text as the repr as well."""
        return self.__str__()

    def to_file(self, path: str) -> None:
        """Write the rendered subtitles to `path`, replacing any existing file."""
        with open(path, "w") as text_file:
            text_file.write(str(self))

In [None]:
# Assign a speaker role to every transcribed word and collect the words as
# subtitle elements.
#
# Inputs (written by earlier cells):
#   transcript.txt  - "<start_ms>,<end_ms>,<word>" per line
#   diarization.txt - "<start_ms>,<end_ms>,<role>" per line
#
# NOTE: `transcript` and `diarization` are rebound here to lists of text lines,
# replacing whatever earlier cells stored under those names.
with open("transcript.txt", "r") as transcript_file:
    transcript = transcript_file.read().strip()
with open("diarization.txt", "r") as diarization_file:
    diarization = diarization_file.read().strip()

# Drop blank lines so an empty file yields no entries instead of a parse error.
transcript = [line for line in transcript.split("\n") if line]
diarization = [line for line in diarization.split("\n") if line]

# Parse the diarization turns once; they do not change from word to word.
diarization_turns = []
for diarization_line in diarization:
    diarization_start, diarization_end, speaker = diarization_line.split(",")
    diarization_turns.append((int(diarization_start), int(diarization_end), speaker))

subtitles = Subtitles()

for transcript_line in transcript:
    transcript_parts = transcript_line.split(",")
    transcript_start = int(transcript_parts[0])
    transcript_end = int(transcript_parts[1])
    # The word itself may contain commas, so re-join everything after the timestamps.
    transcript_text = ",".join(transcript_parts[2:])

    # Milliseconds of this word's time span covered by each role.
    cumulative_durations = {"Interviewer": 0, "Participant": 0}

    for diarization_start, diarization_end, speaker in diarization_turns:
        if diarization_start > transcript_end:
            # Assumes the turns are listed in chronological order, so no later
            # turn can overlap this word.
            break
        if diarization_end < transcript_start:
            continue

        # Length of the intersection of the word span and the speaker turn.
        overlap = min(diarization_end, transcript_end) - max(
            diarization_start, transcript_start
        )
        cumulative_durations[speaker] += overlap

    # On a tie (including no overlap at all) `max` returns the first key, "Interviewer".
    primary_speaker = max(cumulative_durations, key=cumulative_durations.get)
    print(f"{primary_speaker}: {transcript_text}")

    subtitle_element = SubtitleElement(
        start_ms=transcript_start,
        end_ms=transcript_end,
        text=transcript_text,
        speaker=primary_speaker,
    )
    subtitles.add_element(subtitle_element)

In [None]:
# Merge consecutive same-speaker word cues into longer lines, then write the
# rendered subtitles to "subtitles.srt".
subtitles.join_adjacent_elements()
subtitles.to_file("subtitles.srt")

In [None]:
# Write one SRT-style entry per transcribed word directly to "subtitles.srt",
# labelling each word with the role whose diarization turns overlap it most.
#
# NOTE: this writes to the same "subtitles.srt" path as `subtitles.to_file(...)`
# in the previous cell and replaces that file. Each entry here also carries the
# per-role overlap dict as an extra line.
#
# NOTE: `transcript` and `diarization` are rebound here to lists of text lines,
# replacing whatever earlier cells stored under those names.
with open("transcript.txt", "r") as transcript_file:
    transcript = transcript_file.read().strip()
with open("diarization.txt", "r") as diarization_file:
    diarization = diarization_file.read().strip()

# Drop blank lines so an empty file yields no entries instead of a parse error.
transcript = [line for line in transcript.split("\n") if line]
diarization = [line for line in diarization.split("\n") if line]

# Parse the diarization turns once; they do not change from word to word.
diarization_turns = []
for diarization_line in diarization:
    diarization_start, diarization_end, speaker = diarization_line.split(",")
    diarization_turns.append((int(diarization_start), int(diarization_end), speaker))

transcript_idx = 0
with open("subtitles.srt", "w") as srt_file:
    for transcript_line in transcript:
        transcript_parts = transcript_line.split(",")
        transcript_start = int(transcript_parts[0])
        transcript_end = int(transcript_parts[1])
        # The word itself may contain commas, so re-join everything after the timestamps.
        transcript_text = ",".join(transcript_parts[2:])

        # Milliseconds of this word's time span covered by each role.
        cumulative_durations = {"Interviewer": 0, "Participant": 0}

        for diarization_start, diarization_end, speaker in diarization_turns:
            if diarization_start > transcript_end:
                # Assumes the turns are listed in chronological order, so no
                # later turn can overlap this word.
                break
            if diarization_end < transcript_start:
                continue

            # Length of the intersection of the word span and the speaker turn.
            overlap = min(diarization_end, transcript_end) - max(
                diarization_start, transcript_start
            )
            cumulative_durations[speaker] += overlap

        # On a tie (including no overlap at all) `max` returns the first key, "Interviewer".
        primary_speaker = max(cumulative_durations, key=cumulative_durations.get)
        print(f"{primary_speaker}: {transcript_text}")

        # convert from ms to srt format (00:00:00,000)
        start = ms_to_srt(transcript_start)
        end = ms_to_srt(transcript_end)

        srt_file.write(f"{transcript_idx}\n")
        srt_file.write(f"{start} --> {end}\n")
        srt_file.write(f"{primary_speaker}\n")
        srt_file.write(f"{cumulative_durations}\n")
        srt_file.write(f"{transcript_text.strip()}\n\n")

        transcript_idx += 1

    

