In [None]:
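# Uncomment to install the third-party dependencies into the kernel's environment.
# `import whisper` is provided by the `openai-whisper` package, and the
# `moviepy.editor` module imported below only exists in moviepy 1.x.
# %pip install "moviepy<2" pydub openai-whisper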

In [None]:
import whisper
import pathlib
import json
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
from pydub.utils import make_chunks
from helpers import utils
import random

In [None]:
# Parent of the current working directory at the time this cell runs.
NBS_DIR = pathlib.Path.cwd().resolve().parent
# The project base directory is the same location as NBS_DIR.
BASE_DIR = NBS_DIR
# Root folder that the cells below search for podcast.json files.
DATASET_DIR = BASE_DIR.joinpath("dataset")

In [None]:
# Name of the Whisper checkpoint to load (see the options in the trailing comment).
whisper_model = "tiny" # tiny, base, small, medium, large 
# Loaded once; the functions defined in later cells read this global `model`.
model = whisper.load_model(whisper_model) 

In [None]:
def make_audio_chunks(auto_path, chunk_dir=None):
    """Split an audio (or .mp4 video) file into 30-second WAV chunks.

    If ``auto_path`` ends in ``.mp4`` its audio track is first written to
    ``audio.mp3`` next to the video, and that file is chunked instead.
    Existing ``*.wav`` files in the chunk directory are deleted before the
    new chunks are exported as ``NN-chunk.wav`` (index zero-padded to at
    least two digits).

    Args:
        auto_path: Path (str or ``pathlib.Path``) of the media file.
        chunk_dir: Directory that receives the chunks, as a str or
            ``pathlib.Path``. Anything else (including ``None``) falls back
            to a ``chunk`` folder next to the audio file.

    Returns:
        int: Number of ``*.wav`` files in the chunk directory afterwards.

    Raises:
        ValueError: If an ``.mp4`` input has no audio track.
    """
    path = pathlib.Path(auto_path).resolve()
    if path.suffix == '.mp4':
        clip = VideoFileClip(str(path))
        try:
            audio = clip.audio
            if audio is None:
                raise ValueError(f"{path} has no audio track")
            path = path.parent / 'audio.mp3'
            audio.write_audiofile(str(path))
        finally:
            # Release the reader resources held by the clip even if the
            # export fails.
            clip.close()
    myaudio = AudioSegment.from_file(str(path))
    chunks = make_chunks(myaudio, 30 * 1000) # 30 seconds -> 30_000 ms

    # Honour string paths as well as Path objects; previously a str was
    # silently replaced by the default directory.
    if isinstance(chunk_dir, (str, pathlib.PurePath)):
        chunk_dir = pathlib.Path(chunk_dir)
    else:
        chunk_dir = path.parent / "chunk"
    chunk_dir.mkdir(parents=True, exist_ok=True)

    # Remove chunks from a previous run so stale files are never mixed in.
    for current_chunk_path in chunk_dir.glob("*.wav"):
        current_chunk_path.unlink()

    # Export all of the individual chunks as wav files.
    for i, chunk in enumerate(chunks):
        chunk_name = chunk_dir / f"{i:02d}-chunk.wav"
        chunk.export(str(chunk_name), format="wav")
    return sum(1 for _ in chunk_dir.glob('*.wav'))

In [None]:
def detect_language(chunk_path):
    """Predict the spoken language of one audio chunk with the global Whisper ``model``.

    The audio is loaded, padded or trimmed by ``whisper.pad_or_trim``, turned
    into a log-Mel spectrogram and passed to ``model.detect_language``.

    Args:
        chunk_path: Path (str or ``pathlib.Path``) of the audio file.

    Returns:
        The key of the language-probability mapping with the highest
        probability.
    """
    audio = whisper.load_audio(str(chunk_path))
    audio = whisper.pad_or_trim(audio)

    # Take the number of Mel bins from the loaded model itself instead of
    # comparing the configured model name to "large": the required value
    # depends on the checkpoint, not on the name string.
    mel = whisper.log_mel_spectrogram(audio, n_mels=model.dims.n_mels).to(model.device)

    # detect the spoken language
    _, probs = model.detect_language(mel)
    detected = max(probs, key=probs.get)
    print(f"Detected language: {detected}")
    return detected

In [None]:
def download_and_chunk_audio_files(max_podcasts=10):
    """Download, chunk and language-tag the podcasts found under ``DATASET_DIR``.

    For each ``podcast.json`` (at most ``max_podcasts`` of them) the function:

    1. reads ``episodeUrl`` from the JSON and downloads the file into the
       JSON's directory unless a file with the expected name already exists;
    2. splits the audio into chunks in a ``chunk`` sub-directory via
       ``make_audio_chunks``;
    3. runs ``detect_language`` on one randomly chosen chunk;
    4. writes the result (``"unknown"`` when nothing could be detected) to
       ``pred-language.txt`` in the same directory.

    Chunking and language-detection failures are reported and do not stop
    the loop.

    Args:
        max_podcasts: Maximum number of ``podcast.json`` files to process.
    """
    for path in list(DATASET_DIR.glob("**/**/podcast.json"))[:max_podcasts]:
        podcast_detail_dir = path.parent
        podcast_data = json.loads(path.read_text())
        episode_url = podcast_data.get('episodeUrl')
        if not episode_url:
            print('Skipping', path, '(no episodeUrl)')
            continue
        episode_url = utils.convert_encoded_url(episode_url)
        fname = utils.get_fname(episode_url)
        if fname is None:
            continue
        fpath = podcast_detail_dir / fname
        if not fpath.exists():
            print("Downloading", fname)
            # NOTE(review): fpath is not recomputed from the returned name —
            # confirm download_file saves under the name get_fname produced.
            fname = utils.download_file(episode_url, destination_path=podcast_detail_dir)
            print('Download complete')
        print('Chunking', fname)
        chunk_dir = podcast_detail_dir / "chunk"
        total_chunks = 0
        try:
            total_chunks = make_audio_chunks(fpath, chunk_dir=chunk_dir)
        except Exception as exc:
            # Best effort: report the failure and carry on with the next step.
            print('Chunking failed:', repr(exc))
        print('Chunking done with', total_chunks, 'total chunks')
        print('Extracting audio language')
        lang = "unknown"
        # Computed unconditionally so the write below always targets THIS
        # podcast's directory, even when there are no chunks.
        lang_path = podcast_detail_dir / "pred-language.txt"
        chunk_list = list(chunk_dir.glob("*.wav"))
        if chunk_list:
            random_chunk = random.choice(chunk_list)
            print('Random sample for language detection', pathlib.Path(random_chunk).name)
            try:
                lang = detect_language(random_chunk)
            except Exception as exc:
                print('Language detection failed:', repr(exc))
        lang_path.write_text(lang)
        print('Predicted audio language is', lang)
        print()

# Run the download / chunk / language-detection pass with the default max_podcasts limit.
download_and_chunk_audio_files()

In [None]:
def transcribe_chunks(transcribe_all = True, max_podcasts=10):
    """Transcribe the WAV chunks of each podcast with the global Whisper ``model``.

    Only the first ``max_podcasts`` ``podcast.json`` files found under
    ``DATASET_DIR`` are considered. A podcast is skipped when it has no
    ``pred-language.txt``, or when ``transcribe_all`` is false and a
    ``transcript.txt`` already exists. Each ``NN-chunk.wav`` is transcribed
    in numeric order and the result is dumped as JSON to ``NN-chunk.json``
    next to it.

    Args:
        transcribe_all: When true, re-transcribe podcasts that already have
            a ``transcript.txt``.
        max_podcasts: Maximum number of ``podcast.json`` files to look at.
    """
    def chunk_index(wav_file):
        # "07-chunk" -> 7
        prefix, *_ = wav_file.stem.split('-')
        return int(prefix)

    metadata_files = list(DATASET_DIR.glob("**/**/podcast.json"))
    for metadata_file in metadata_files[:max_podcasts]:
        episode_dir = metadata_file.parent
        if not (episode_dir / "pred-language.txt").exists():
            continue
        already_transcribed = (episode_dir / 'transcript.txt').exists()
        if already_transcribed and not transcribe_all:
            continue

        wav_files = list((episode_dir / "chunk").glob("*.wav"))
        wav_files.sort(key=chunk_index)

        print('Transcribing', episode_dir.name)
        for wav_file in wav_files:
            wav_file = pathlib.Path(wav_file).resolve()
            print('Working on', wav_file.relative_to(episode_dir))
            transcription = model.transcribe(str(wav_file))
            output_file = wav_file.parent / f"{wav_file.stem}.json"
            output_file.write_text(json.dumps(transcription))
        print()

# transcribe_all=False skips podcasts that already have a transcript.txt.
transcribe_chunks(transcribe_all=False)

In [None]:
def chunk_transcriptions_to_full_transcript(dataset_dir=None):
    """Merge per-chunk transcription JSON files into one ``transcript.txt`` per podcast.

    For every ``podcast.json`` under the dataset root whose directory also
    contains ``pred-language.txt``, the ``chunk/NN-*.json`` files are read in
    numeric order of their ``NN`` prefix. Each string found under the
    ``"text"`` key is appended to the transcript wrapped in single spaces.
    ``transcript.txt`` is written only when the merged text is non-empty.

    Files that cannot be read, are not valid JSON, do not contain a JSON
    object, or whose name does not start with an integer prefix are skipped.

    Args:
        dataset_dir: Root directory to search (str or ``pathlib.Path``).
            Defaults to the notebook-level ``DATASET_DIR``.
    """
    root = DATASET_DIR if dataset_dir is None else pathlib.Path(dataset_dir)
    # Materialise the listing first: transcript files are written into the
    # same tree while looping.
    for path in list(root.glob("**/**/podcast.json")):
        podcast_detail_dir = path.parent
        lang_path = podcast_detail_dir / "pred-language.txt"
        if not lang_path.exists():
            continue
        chunk_dir = podcast_detail_dir / "chunk"

        indexed_results = []
        for result_path in chunk_dir.glob("*.json"):
            try:
                index = int(result_path.stem.split('-')[0])
            except ValueError:
                # Not one of the "NN-chunk.json" files; ignore it.
                continue
            indexed_results.append((index, result_path))
        indexed_results.sort(key=lambda pair: pair[0])

        pieces = []
        for _, result_path in indexed_results:
            try:
                result_data = json.loads(result_path.read_text())
            except (OSError, ValueError):
                # Unreadable or malformed chunk result (JSONDecodeError and
                # UnicodeDecodeError are both ValueError subclasses).
                continue
            if not isinstance(result_data, dict):
                continue
            text = result_data.get('text')
            if isinstance(text, str):
                pieces.append(f" {text} ")
        transcript = "".join(pieces)

        transcript_path = podcast_detail_dir / 'transcript.txt'
        if transcript != "":
            transcript_path.write_text(transcript)
        print(transcript_path, 'done')

# Merge the per-chunk JSON transcriptions into a transcript.txt per podcast.
chunk_transcriptions_to_full_transcript()