In [22]:
import mimetypes
from pathlib import Path
from urllib.parse import urlsplit

import feedparser

import utils

class Episode:
    """ Class that holds episode information

    Attributes:
        podcast: Title of the podcast the episode belongs to.
        title: Title of the episode, read from the feed entry.
        url: href of the entry's 'audio/mpeg' link, or None when the entry
            has no such link (download() then raises ValueError).
        episode_index: Number used to name the episode folder.
        file_paths: Dict of Path objects keyed by 'audio_path',
            'transcript_path', 'summary_path' and 'highlights_path'.
    """
    # MIME type an episode link must declare (and its URL must look like)
    AUDIO_MIME_TYPE = 'audio/mpeg'

    def __init__(self, podcast_title, episode_feed, episode_index=0):
        """ Reads the episode metadata and prepares its folder on disk

        Args:
            podcast_title: Title of the parent podcast; names the podcast folder.
            episode_feed: Feed entry exposing `title` and a list of dict-like
                `links`.
            episode_index: Number appended to the episode folder name.
                Defaults to 0, i.e. the folder "episode_0".
        """
        self.podcast = podcast_title
        self.episode_index = episode_index
        # Episode Information
        self.title = episode_feed.title
        # Always define the attribute so a missing audio link is reported by
        # download() with a clear message instead of an AttributeError.
        self.url = None
        for link in getattr(episode_feed, 'links', []):
            # .get(): a link without a declared type must not raise KeyError.
            # No break on purpose: as before, the last matching link wins.
            if link.get('type') == self.AUDIO_MIME_TYPE:
                self.url = link.get('href')
        # Save files paths
        self.file_paths = self.setup_paths()

    def setup_paths(self):
        """ Sets up the file paths for the episode

        Creates podcasts/<podcast folder>/episode_<index>/ relative to the
        current working directory if it does not exist yet.

        Returns:
            Dict mapping 'audio_path', 'transcript_path', 'summary_path' and
            'highlights_path' to Path objects inside the episode folder.
        """
        # Files' names, keyed by the entry they get in the returned dict
        file_names = {
            'audio_path': "audio.mp3",
            'transcript_path': "transcript.txt",
            'summary_path': "summary.txt",
            'highlights_path': "highlights.txt",
        }
        # Content folder / podcast folder / episode folder
        podcast_folder_name = utils.to_filename(self.podcast)
        episode_directory = (Path("podcasts") / podcast_folder_name
                             / f"episode_{self.episode_index}")
        # parents=True also creates the content and podcast folders
        episode_directory.mkdir(parents=True, exist_ok=True)
        # Determine audio, transcript, summary and highlights paths
        return {key: episode_directory / name for key, name in file_names.items()}

    @staticmethod
    def _guess_url_type(url):
        """ Guesses the MIME type of a URL, ignoring its query and fragment """
        # A trailing "?key=..." or "#..." would otherwise hide the ".mp3"
        # extension on Python versions whose guess_type does not strip it.
        bare_url = urlsplit(url)._replace(query='', fragment='').geturl()
        return mimetypes.guess_type(bare_url)[0]

    def download(self):
        """ Downloads the episode audio

        Raises:
            ValueError: If the episode has no audio URL, or if the URL does
                not look like an 'audio/mpeg' file.
        """
        if not self.url:
            raise ValueError("Episode has no audio/mpeg link to download")
        # Check file type before downloading
        if self._guess_url_type(self.url) != self.AUDIO_MIME_TYPE:
            raise ValueError("Invalid audio file type")
        utils.download_audio(self.url, audio_path=self.file_paths['audio_path'])


class Podcast:
    """ Class that holds the podcast feed and its metadata

    Attributes:
        feed: Result of feedparser.parse for the given URL.
        title: Title of the podcast.
        subtitle: Subtitle of the podcast, or '' when the feed has none.
        episode: Episode built from the first entry of the feed.
    """
    def __init__(self, url):
        """ Parses the feed at `url` and wraps its first entry in an Episode

        Args:
            url: Feed location handed to feedparser.parse.

        Raises:
            ValueError: If the parsed feed has no title or no entries.
        """
        self.feed = feedparser.parse(url)
        channel = self.feed.feed
        # feedparser.parse does not raise on an unreachable or malformed feed;
        # it returns a (mostly) empty result, so validate before reading it.
        if 'title' not in channel or not self.feed.entries:
            reason = self.feed.get('bozo_exception',
                                   "feed has no title or no episodes")
            raise ValueError(f"Could not load a podcast from {url!r}: {reason}")
        # Podcast Information
        self.title = channel.title
        # The subtitle is optional; do not fail on feeds without one.
        self.subtitle = channel.get('subtitle', '')
        # Last Episode (the first entry of the feed)
        self.episode = Episode(self.title, self.feed.entries[0])


In [None]:
import torch
import whisperx
import gc

class AudioTranscriber:
    """ Transcribes audio with WhisperX, optionally aligning and diarizing it """

    def __init__(self, trans_model="large-v2", batch_size=16, device='cuda',
                 compute_type="float16", hf_token=None):
        """ Stores the settings used by transcribe_audio

        Args:
            trans_model: Model name passed to whisperx.load_model.
            batch_size: Batch size passed to the model's transcribe call.
            device: Device every model is loaded on.
            compute_type: Compute type passed to whisperx.load_model.
            hf_token: Hugging Face access token handed to the diarization
                pipeline. Defaults to the HF_TOKEN environment variable
                (None when it is not set).
        """
        # Imported locally so this cell does not rely on another cell's imports.
        import os

        self.device = device
        self.batch_size = batch_size
        self.compute_type = compute_type
        self.trans_model = trans_model
        # Credentials must come from the caller or the environment; never
        # commit a token to the notebook.
        if hf_token is None:
            hf_token = os.environ.get("HF_TOKEN")
        self.HF_TOKEN = hf_token

    @staticmethod
    def _free_gpu_memory():
        """ Runs the garbage collector and releases cached CUDA memory """
        gc.collect()
        torch.cuda.empty_cache()

    def transcribe_audio(self, audio_path, align=True, diarize=True):
        """ Transcribes an audio file and optionally aligns and diarizes it

        Args:
            audio_path: Path of the audio file, passed to whisperx.load_audio.
            align: If True, align the transcription with the audio.
            diarize: If True, assign speakers to the aligned transcription.
                Only applied when `align` is also True.

        Returns:
            The output of the last step that ran: the transcription, the
            alignment, or the speaker assignment.
        """
        audio = whisperx.load_audio(audio_path)
        print("Transcribing audio with Whisper...")
        trans_model = whisperx.load_model(self.trans_model, device=self.device,
                                          compute_type=self.compute_type)
        result = trans_model.transcribe(audio, batch_size=self.batch_size)
        # Drop the reference first; collecting while the model is still
        # referenced cannot free it.
        del trans_model
        self._free_gpu_memory()

        if align:
            print("Aligning the transcription with the audio...")
            align_model, metadata = whisperx.load_align_model(
                language_code=result['language'], device=self.device)
            result = whisperx.align(result["segments"], align_model, metadata,
                                    audio, self.device,
                                    return_char_alignments=False)
            del align_model
            self._free_gpu_memory()

            if diarize:
                print("Diarizing...")
                diarize_model = whisperx.DiarizationPipeline(
                    use_auth_token=self.HF_TOKEN, device=self.device)
                diarize_segments = diarize_model(audio)
                result = whisperx.assign_word_speakers(diarize_segments, result)
                del diarize_model
                self._free_gpu_memory()

        return result

In [23]:
# Load the TED Talks audio feed, then fetch the audio of the episode it holds.
podcast = Podcast(
    "http://feeds.feedburner.com/TEDTalks_audio"
)
latest_episode = podcast.episode
latest_episode.download()