In [6]:
from pyannote.audio import Pipeline
from transformers import AutoProcessor, WhisperForConditionalGeneration
import pandas as pd
import torchaudio
import os
import torch
import openai
import os
import pandas as pd
import cv2
from moviepy.editor import VideoFileClip
from pydub import AudioSegment

import warnings
warnings.filterwarnings("ignore")

In [7]:

# Credentials live in local text files next to the notebook so that they
# never appear in the notebook source itself.
with open("openai_api_key.txt", "r") as f:
    openai_api_key = f.read().strip()

with open("huggingface_token.txt", "r") as f:
    huggingface_token = f.read().strip()

# Publish the OpenAI key both through the environment and on the client
# module (targets are assigned left to right: environment first).
os.environ["OPENAI_API_KEY"] = openai.api_key = openai_api_key

In [8]:
class DialogueExtractor:
    """Speaker-attributed transcription of a single audio clip.

    The clip is diarized with the pyannote ``speaker-diarization-3.1``
    pipeline; every diarized segment is then cut out of the waveform and
    transcribed with Whisper, either through the OpenAI API
    (``transcription_type='api'``) or with a locally loaded
    ``openai/whisper-large-v3`` checkpoint (``transcription_type='local'``).

    The pyannote download uses the module-level ``huggingface_token``; API
    transcription goes through the module-level ``openai`` client.
    """

    _TRANSCRIPTION_TYPES = ('api', 'local')

    def __init__(self, audio_clip_path, transcription_type='api', min_speakers=2):
        """Load the models and the audio clip.

        Args:
            audio_clip_path: Path of the audio file to diarize and transcribe.
            transcription_type: ``'api'`` (OpenAI hosted Whisper) or
                ``'local'`` (Whisper large-v3 loaded on ``self.device``).
            min_speakers: Lower bound on the number of speakers handed to the
                diarization pipeline.

        Raises:
            ValueError: If ``transcription_type`` is neither ``'api'`` nor
                ``'local'``.
        """
        # Validate first so that a typo fails before any model is loaded.
        if transcription_type not in self._TRANSCRIPTION_TYPES:
            raise ValueError("Invalid transcription type. Choose either 'api' or 'local'")

        self.audio_path = audio_clip_path
        self.min_speakers = min_speakers
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.speaker_diarization_model = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=huggingface_token
        ).to(self.device)

        if transcription_type == 'local':
            self.processor = AutoProcessor.from_pretrained("openai/whisper-large-v3")
            self.local_transcriber = WhisperForConditionalGeneration.from_pretrained("openai/whisper-large-v3").to(self.device)
        else:
            # API mode: _transcribe() dispatches on local_transcriber being None.
            self.processor = None
            self.local_transcriber = None

        self.waveform, self.sample_rate = torchaudio.load(self.audio_path)

    def _diarize(self):
        """Run speaker diarization on the loaded waveform.

        Returns:
            DataFrame with columns ``start``, ``end`` and ``speaker``: one row
            per (segment, speaker label) pair reported by the pipeline.
        """
        output = self.speaker_diarization_model(
            {'waveform': self.waveform, 'sample_rate': self.sample_rate},
            min_speakers=self.min_speakers,
        )

        # Collect all rows first and build the frame once, instead of growing
        # it row by row. Labels are sorted so that several speakers sharing
        # one identical segment always come out in the same order.
        rows = [
            (segment.start, segment.end, speaker)
            for segment in output.itersegments()
            for speaker in sorted(output.get_labels(segment), key=str)
        ]
        return pd.DataFrame(rows, columns=['start', 'end', 'speaker'])

    def _transcribe_api(self, clip_path):
        """Transcribe the audio file at ``clip_path`` with the OpenAI ``whisper-1`` model."""
        with open(clip_path, "rb") as audio_file:
            transcription = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                language="en"
            )

        return transcription.text

    def _transcribe_local(self, audio_path):
        """Transcribe the audio file at ``audio_path`` with the local Whisper model."""
        wav, sr = torchaudio.load(audio_path)

        # Use the rate of the file actually loaded, and resample when it does
        # not match the rate the processor's feature extractor is built for.
        target_sr = self.processor.feature_extractor.sampling_rate
        if sr != target_sr:
            wav = torchaudio.functional.resample(wav, sr, target_sr)

        mono = wav.mean(dim=0).numpy()  # average the channels down to one
        inputs = self.processor(mono, return_tensors="pt", sampling_rate=target_sr).to(self.device)
        seq = self.local_transcriber.generate(inputs=inputs.input_features)

        return self.processor.batch_decode(seq, skip_special_tokens=True)[0]

    def _transcribe(self, audio_path):
        """Transcribe ``audio_path`` with the backend selected at construction."""
        if self.local_transcriber is not None:
            return self._transcribe_local(audio_path)
        return self._transcribe_api(audio_path)

    def extract_dialogue(self, keep_temp_files=False):
        """Diarize the clip and transcribe every diarized segment.

        Each segment is written to ``temp_<row>.wav`` in the working
        directory, transcribed, and then deleted again.

        Args:
            keep_temp_files: Leave the per-segment wav files on disk (useful
                when debugging a transcription).

        Returns:
            The diarization DataFrame (``start``, ``end``, ``speaker``) with an
            added ``transcription`` column. Segments that contain no samples
            get an empty string instead of being sent for transcription.
        """
        diarization_df = self._diarize()
        transcription_df = diarization_df.copy()

        transcriptions = []
        for i, row in enumerate(diarization_df.itertuples(index=False)):
            # Clamp at 0: a negative index would silently slice from the end.
            first = max(0, int(row.start * self.sample_rate))
            last = max(first, int(row.end * self.sample_rate))
            wav = self.waveform[:, first:last].clone()

            if wav.shape[-1] == 0:
                transcriptions.append("")
                continue

            tmp_path = f"temp_{i}.wav"
            try:
                torchaudio.save(tmp_path, wav, self.sample_rate)
                transcriptions.append(self._transcribe(tmp_path))
            finally:
                # Clean up even when saving or transcription fails.
                if not keep_temp_files and os.path.exists(tmp_path):
                    os.remove(tmp_path)

        transcription_df['transcription'] = pd.Series(
            transcriptions, index=transcription_df.index, dtype=object
        )
        return transcription_df

def extract_clip_from_full(full_vid_path, clip_path, secs, duration, fps=5, audio_fps=16000):
    """Cut a sub-clip out of a video file and write it to ``clip_path``.

    Args:
        full_vid_path: Path of the source video.
        clip_path: Destination path of the extracted clip.
        secs: Start of the clip within the source video, in seconds.
        duration: Length of the clip, in seconds.
        fps: Video frame rate of the written clip.
        audio_fps: Audio sample rate of the written clip, in Hz.
    """
    print(f"Extracting clip from {full_vid_path} to {clip_path}")

    # The capture is only used as a quick "can this file be opened" probe;
    # release it right away instead of leaving the handle open.
    cap = cv2.VideoCapture(full_vid_path)
    try:
        print(f"Is opened?: {cap.isOpened()}")
    finally:
        cap.release()

    clip = VideoFileClip(full_vid_path)
    try:
        subclip = clip.subclip(secs, secs + duration)
        subclip.write_videofile(
            clip_path,
            fps=fps,
            codec='libx264',
            audio=True,
            audio_fps=audio_fps,
            audio_codec="pcm_s16le",
            verbose=False,
            logger=None,
        )
    finally:
        # Close the readers opened by VideoFileClip even if writing fails.
        clip.close()

    print(f"Video clip saved at {clip_path}")
    print()
    
def extract_audio(video_path, audio_path, sample_rate=16000):
    """Write the audio track of a video to ``audio_path`` as a wav file.

    The track is first dumped as-is, then re-read and re-exported in place
    at ``sample_rate``.

    Args:
        video_path: Path of the video to read.
        audio_path: Destination wav path (written, then overwritten in place).
        sample_rate: Frame rate of the final wav file, in Hz.

    Raises:
        ValueError: If the video has no audio track.
    """
    video = VideoFileClip(video_path)
    try:
        track = video.audio
        if track is None:
            raise ValueError(f"No audio track found in {video_path}")
        track.write_audiofile(audio_path, verbose=False, logger=None)
    finally:
        # Close the readers opened by VideoFileClip even if writing fails.
        video.close()

    resampled = AudioSegment.from_wav(audio_path).set_frame_rate(sample_rate)
    resampled.export(audio_path, format="wav")

In [9]:
# Source recording and the scratch files derived from it
full_vid_path = '../../full_videos/LFB1_full.mp4'
video_clip_path, audio_clip_path = 'clip.avi', 'clip.wav'

# Window to cut out of the full recording: start offset and length
start_secs, duration = 15, 20

extract_clip_from_full(
    full_vid_path=full_vid_path,
    clip_path=video_clip_path,
    secs=start_secs,
    duration=duration,
)
extract_audio(video_path=video_clip_path, audio_path=audio_clip_path)

Extracting clip from ../../full_videos/LFB1_full.mp4 to clip.avi
Is opened?: True
Video clip saved at clip.avi



In [10]:
# Diarize and transcribe the extracted clip using the API backend; the
# returned DataFrame is the cell's displayed result.
extractor = DialogueExtractor(
    audio_clip_path=audio_clip_path,
    transcription_type='api',
)
extractor.extract_dialogue()

Unnamed: 0,start,end,speaker,transcription
0,0.030969,1.009719,SPEAKER_02,their criteria.
1,1.009719,3.254094,SPEAKER_00,She symptomatically feels fine.
2,4.334094,6.089094,SPEAKER_02,What's the criteria now to come back?
3,7.219719,12.636594,SPEAKER_00,I think it's like a week if you're on the 7th ...
4,13.092219,13.547844,SPEAKER_01,OK.
