In [1]:
from dj_notebook import activate

# Activate dj_notebook; the returned `plus` handle is what later cells use to
# query models (see `plus.Audio.objects` further down).
plus = activate()

Output()

In [40]:
import re
import json
import httpx

import subprocess

from pathlib import Path
from contextlib import chdir
from datetime import timedelta

## Get all mp3 URLs

First, we collect the mp3 URLs of the audio objects we want to transcribe and
create one directory per audio file. Each mp3 is downloaded into its directory
and split into 20-minute chunks.

In [67]:
class MP3Url:
    """Bundle an episode's mp3 URL and title with its local working directory.

    Attributes:
        base_dir: Directory under which every episode gets its own folder.
        url: Remote location of the mp3 file.
        title: Episode title; also returned by ``repr()``.
        prefix: Last ``/``-separated URL segment, cut at its first dot
            (``.../pp_56.mp3`` -> ``pp_56``).
        podcast_dir: ``base_dir / prefix``, the per-episode directory.
    """

    def __init__(self, base_dir, url, title):
        file_name = url.rsplit("/", 1)[-1]
        stem, _, _ = file_name.partition(".")

        self.base_dir = base_dir
        self.url = url
        self.title = title
        self.prefix = stem
        self.podcast_dir = base_dir / stem

    def __repr__(self):
        # Showing the title keeps printed lists of episodes readable.
        return self.title


def download(target_path, url):
    """Download ``url`` and store the response body at ``target_path``.

    The body is streamed to a temporary ``<name>.part`` file next to the
    target and only moved into place once the transfer finished, so a failed
    or interrupted download never leaves a file at ``target_path`` that
    callers could mistake for a complete one.

    Args:
        target_path: ``pathlib.Path`` of the file to create or overwrite.
        url: Address to fetch with an HTTP GET request.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
        httpx.HTTPError: For transport-level failures (timeouts, DNS, ...).
    """
    partial_path = target_path.with_name(f"{target_path.name}.part")
    try:
        with httpx.stream("GET", url) as response:
            # Without this check an error page would be saved as the file.
            response.raise_for_status()
            with partial_path.open("wb") as file:
                # Stream in chunks instead of buffering the whole body.
                for chunk in response.iter_bytes():
                    file.write(chunk)
        partial_path.replace(target_path)
    finally:
        # No-op after a successful replace; cleans up after a failure.
        partial_path.unlink(missing_ok=True)


def split_mp3_into_chunks(mp3_path, base_name, segment_seconds=1200):
    """Split an mp3 into fixed-length chunks using ffmpeg's segment muxer.

    The chunks are written into the directory of ``mp3_path`` and are named
    ``{base_name}_out_000.mp3``, ``{base_name}_out_001.mp3``, ...

    Args:
        mp3_path: ``pathlib.Path`` of the source mp3 file.
        base_name: Prefix for the generated chunk file names.
        segment_seconds: Value passed to ffmpeg's ``-segment_time`` option
            (default 1200).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero
            status; ffmpeg's diagnostics are available on the exception's
            ``stderr`` attribute.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    print("mp3 dir: ", mp3_path.parent)
    subprocess.run(
        [
            "ffmpeg", "-i", mp3_path.name, "-f", "segment",
            "-segment_time", str(segment_seconds), "-c", "copy",
            f"{base_name}_out_%03d.mp3",
        ],
        # Run inside the episode directory without touching the notebook's
        # own working directory.
        cwd=mp3_path.parent,
        stdout=subprocess.DEVNULL,
        # Keep the notebook output quiet but retain the diagnostics so a
        # failure is not silently ignored.
        stderr=subprocess.PIPE,
        text=True,
        check=True,
    )


def prepare_for_whisper(mp3_url):
    """Ensure an episode is downloaded and split into chunks.

    Both steps are skipped when their result already exists, so the function
    can be re-run without repeating the work.

    Args:
        mp3_url: Object providing ``podcast_dir``, ``prefix`` and ``url``.

    Returns:
        The ``mp3_url`` that was passed in.
    """
    podcast_dir = mp3_url.podcast_dir
    # parents=True: on a first run the shared base directory is missing too.
    podcast_dir.mkdir(parents=True, exist_ok=True)

    episode_path = podcast_dir / f"{mp3_url.prefix}.mp3"
    if not episode_path.exists():
        download(episode_path, mp3_url.url)

    # Look for the chunk files themselves rather than counting every mp3 in
    # the directory, so unrelated mp3 files cannot suppress the split.
    if not any(podcast_dir.glob(f"{mp3_url.prefix}_out_*.mp3")):
        split_mp3_into_chunks(episode_path, mp3_url.prefix)
    return mp3_url

In [68]:
# Preview the mp3 URLs of the episodes selected for transcription
# (positions 3..7 when ordered newest first).
selected_audios = plus.Audio.objects.all().order_by("-created")[3:8]
for episode in selected_audios:
    print(episode.mp3.url)

https://d2mmy4gxasde9x.cloudfront.net/cast_audio/pp_56.mp3
https://d2mmy4gxasde9x.cloudfront.net/cast_audio/pp_55.mp3
https://d2mmy4gxasde9x.cloudfront.net/cast_audio/pp_54.mp3
https://d2mmy4gxasde9x.cloudfront.net/cast_audio/pp_53.mp3
https://d2mmy4gxasde9x.cloudfront.net/cast_audio/pp_52.mp3


In [69]:
# All episode directories live below ./audio.
base_audio_dir = Path.cwd() / "audio"

# Same selection as in the preview cell: positions 3..7, newest first.
mp3_urls = [
    MP3Url(base_audio_dir, audio.mp3.url, audio.title)
    for audio in plus.Audio.objects.all().order_by("-created")[3:8]
]
# Alternative: process one hand-picked episode instead of the selection.
# mp3_urls = [MP3Url(base_audio_dir, "https://d2mmy4gxasde9x.cloudfront.net/cast_audio/pp_60.mp3", "Python 3.13")]

# Download and split every selected episode (skipped when already done).
prepared_urls = [prepare_for_whisper(mp3_url) for mp3_url in mp3_urls]
print(prepared_urls)

[DevOps Redux, Freelancing, Typescript und Typisierung, PyTest, Kubernetes]


## Generate the Transcripts Using MacWhisper

Manual Step 🫠.

- Open File
- Select all chunks
- Use groq whisperv3
- Export to DOTe and VTT

## Combine DOTe files

In [70]:
def parse_timecode(timecode):
    """Turn an ``H:M:S,ms`` timecode string into a ``timedelta``.

    Only the beginning of the string has to match the pattern; the digits
    after the comma are interpreted as a millisecond count.

    Raises:
        ValueError: If the string does not start with a valid timecode.
    """
    parsed = re.match(r"(\d+):(\d+):(\d+),(\d+)", timecode)
    if parsed is None:
        raise ValueError(f"Invalid timecode format: {timecode}")
    hours, minutes, seconds, millis = (int(group) for group in parsed.groups())
    return timedelta(
        hours=hours, minutes=minutes, seconds=seconds, milliseconds=millis
    )

def format_timecode(delta):
    """Render a ``timedelta`` as ``HH:MM:SS,mmm``.

    Hours are not wrapped at 24, and anything below one millisecond is
    truncated.
    """
    whole_seconds = int(delta.total_seconds())
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    millis = delta.microseconds // 1000
    return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"

def process_files(file_list):
    """Concatenate several DOTe transcript files into one continuous transcript.

    Each file is a JSON document with a ``"lines"`` list whose entries carry
    ``"startTime"`` and ``"endTime"`` timecodes relative to the start of that
    file. Every entry is copied and its two timecodes are shifted by the
    accumulated offset of all preceding files.

    NOTE(review): the offset grows by the ``endTime`` of the last line of each
    file, not by the audio length of the chunk. If a chunk ends with audio
    that produced no transcript line, later chunks are presumably shifted
    slightly early - confirm whether that matters for the use case.

    Args:
        file_list: Paths of the DOTe files, in playback order.

    Returns:
        ``{"lines": [...]}`` with the shifted copies of all entries.

    Raises:
        KeyError: If a file has no ``"lines"`` key or an entry lacks a
            timecode field.
        ValueError: If a timecode cannot be parsed.
    """
    combined_lines = []
    offset = timedelta()

    for filename in file_list:
        # JSON is UTF-8; do not depend on the platform's default encoding
        # when reading transcripts that may contain non-ASCII text.
        with open(filename, "r", encoding="utf-8") as f:
            data = json.load(f)

        chunk_lines = data["lines"]
        last_end_time = None
        for line in chunk_lines:
            start_time = parse_timecode(line["startTime"])
            last_end_time = parse_timecode(line["endTime"])

            # Copy so the parsed input is left untouched.
            new_line = dict(line)
            new_line["startTime"] = format_timecode(start_time + offset)
            new_line["endTime"] = format_timecode(last_end_time + offset)
            combined_lines.append(new_line)

        # Files without lines leave the offset unchanged.
        if last_end_time is not None:
            offset += last_end_time

    return {"lines": combined_lines}


def combine_dote_files(mp3_url):
    """Merge an episode's per-chunk DOTe transcripts into one JSON file.

    Reads every ``{prefix}_out_*.dote`` file in the episode directory (in
    sorted order) and writes the merged transcript to ``{prefix}_dote.json``
    in the same directory.

    Returns:
        Tuple ``(podcast_dir, prefix, combined_dote_path)``.
    """
    episode_dir = mp3_url.podcast_dir
    prefix = mp3_url.prefix

    chunk_transcripts = list(episode_dir.glob(f"{prefix}_out_*.dote"))
    chunk_transcripts.sort()
    merged = process_files(chunk_transcripts)

    combined_dote_path = episode_dir / f"{prefix}_dote.json"
    with combined_dote_path.open("w") as out:
        out.write(json.dumps(merged))
    return episode_dir, prefix, combined_dote_path

In [71]:
# One (podcast_dir, prefix, combined_dote_path) tuple per prepared episode.
dote_paths = [combine_dote_files(mp3_url) for mp3_url in prepared_urls]

## Transform DOTe to Podlove Format

In [72]:
def time_to_ms(time_str):
    """Convert an ``H:M:S,ms`` timecode string to a millisecond count.

    Raises:
        ValueError: If the string does not split into exactly three
            ``:``-separated parts with a single ``,`` in the last one, or a
            part is not an integer.
    """
    hours, minutes, seconds_and_ms = time_str.split(':')
    seconds, millis = seconds_and_ms.split(',')
    whole_seconds = (int(hours) * 60 + int(minutes)) * 60 + int(seconds)
    return whole_seconds * 1000 + int(millis)

def convert_dote_to_podlove(input_file):
    """Load a DOTe JSON file and re-shape it into the Podlove transcript layout.

    Every DOTe line becomes one dict with the keys ``start``, ``start_ms``,
    ``end``, ``end_ms``, ``speaker``, ``voice`` and ``text``. A missing
    ``"lines"`` key is treated as an empty transcript.

    Returns:
        ``{"transcripts": [...]}``.
    """
    with open(input_file, 'r') as infile:
        dote = json.load(infile)

    def to_podlove(entry):
        # Convert both timecodes first, then assemble the entry.
        start_ms = time_to_ms(entry["startTime"])
        end_ms = time_to_ms(entry["endTime"])
        return {
            # The timecode strings are kept, with '.' instead of ','.
            "start": entry["startTime"].replace(',', '.'),
            "start_ms": start_ms,
            "end": entry["endTime"].replace(',', '.'),
            "end_ms": end_ms,
            "speaker": entry["speakerDesignation"],
            "voice": "",  # assuming no voice data is available
            "text": entry["text"]
        }

    return {"transcripts": [to_podlove(line) for line in dote.get("lines", [])]}


def transform_dote_to_podlove(podcast_dir, podcast_prefix, dote_path):
    """Convert one combined DOTe file and store it as Podlove JSON.

    The result is written to ``{podcast_prefix}_whisper3_podlove.json``
    inside ``podcast_dir``. Nothing is returned.
    """
    payload = convert_dote_to_podlove(dote_path)
    target = podcast_dir / f"{podcast_prefix}_whisper3_podlove.json"
    with open(target, 'w') as out:
        out.write(json.dumps(payload))

In [73]:
# Write a Podlove transcript next to every combined DOTe file.
for episode_dir, episode_prefix, combined_dote in dote_paths:
    transform_dote_to_podlove(episode_dir, episode_prefix, combined_dote)

## Combine VTT files

In [74]:
import webvtt


def parse_vtt_timecode(timecode):
    """Turn an ``H:MM:SS.mmm`` VTT timecode string into a ``timedelta``.

    Only the beginning of the string has to match the pattern.

    Raises:
        ValueError: If the string does not start with a valid timecode.
    """
    parsed = re.match(r"(\d+):(\d{2}):(\d{2})\.(\d{3})", timecode)
    if parsed is None:
        raise ValueError(f"Invalid timecode format: {timecode}")
    hours, minutes, seconds, millis = (int(group) for group in parsed.groups())
    return timedelta(
        hours=hours, minutes=minutes, seconds=seconds, milliseconds=millis
    )


def format_vtt_timecode(delta):
    """Render a ``timedelta`` as a VTT timecode ``HH:MM:SS.mmm``.

    Hours are not wrapped at 24, and anything below one millisecond is
    truncated.
    """
    whole_seconds = int(delta.total_seconds())
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    millis = delta.microseconds // 1000
    return f"{hours:02}:{minutes:02}:{seconds:02}.{millis:03}"


def process_vtt_files(vtt_files):
    """Concatenate several VTT files into one ``webvtt.WebVTT`` object.

    The captions of each file are shifted by the end time of the last caption
    seen so far and appended to the combined document. The caption objects
    read from the files are modified in place and reused.

    NOTE(review): the offset is taken from the last caption's end time, not
    from the audio length of the chunk. If a chunk ends with audio that has
    no caption, later chunks are presumably shifted slightly early - confirm
    whether that matters for the use case.

    Args:
        vtt_files: Paths of the VTT files, in playback order.

    Returns:
        The combined ``webvtt.WebVTT`` instance.

    Raises:
        ValueError: If a caption timecode cannot be parsed.
    """
    offset = timedelta()
    combined_vtt = webvtt.WebVTT()
    for path in vtt_files:
        # Start from the current offset so a file without captions leaves it
        # unchanged instead of hitting an unbound variable.
        chunk_end = offset
        for caption in webvtt.read(path):
            caption.start = format_vtt_timecode(
                parse_vtt_timecode(caption.start) + offset
            )
            chunk_end = parse_vtt_timecode(caption.end) + offset
            caption.end = format_vtt_timecode(chunk_end)
            combined_vtt.captions.append(caption)
        offset = chunk_end
    return combined_vtt


def combine_vtt_files(mp3_url):
    """Merge an episode's per-chunk VTT files into one VTT file.

    Reads every ``{prefix}_out_*.vtt`` file in the episode directory (in
    sorted order) and writes the merged captions to ``{prefix}_combined.vtt``
    in the same directory.

    Returns:
        Tuple ``(podcast_dir, prefix, combined_vtt_path)``.
    """
    episode_dir = mp3_url.podcast_dir
    prefix = mp3_url.prefix

    chunk_files = list(episode_dir.glob(f"{prefix}_out_*.vtt"))
    chunk_files.sort()
    merged = process_vtt_files(chunk_files)

    combined_vtt_path = episode_dir / f"{prefix}_combined.vtt"
    with combined_vtt_path.open("w") as out:
        merged.write(out)
    return episode_dir, prefix, combined_vtt_path

In [75]:
# One (podcast_dir, prefix, combined_vtt_path) tuple per prepared episode.
vtt_paths = [combine_vtt_files(mp3_url) for mp3_url in prepared_urls]

## Combine Text Files

In [76]:
def combine_txt_files(mp3_url):
    """Join an episode's per-chunk text transcripts into one text file.

    Reads every ``{prefix}_out_*.txt`` file in the episode directory (in
    sorted order), joins their contents with a single space and writes the
    result to ``{prefix}_combined.txt`` in the same directory.

    Files are read and written as UTF-8 so that non-ASCII transcript text
    does not depend on the platform's default encoding.

    Args:
        mp3_url: Object providing ``podcast_dir`` (a ``pathlib.Path``) and
            ``prefix``.

    Returns:
        Tuple ``(podcast_dir, prefix, combined_txt_path)``.
    """
    podcast_dir = mp3_url.podcast_dir
    txt_files = sorted(podcast_dir.glob(f"{mp3_url.prefix}_out_*.txt"))
    contents = [path.read_text(encoding="utf-8") for path in txt_files]

    combined_txt_path = podcast_dir / f"{mp3_url.prefix}_combined.txt"
    combined_txt_path.write_text(" ".join(contents), encoding="utf-8")
    return podcast_dir, mp3_url.prefix, combined_txt_path

In [77]:
# Merge the per-chunk plain-text transcripts of every prepared episode.
# (This cell previously repeated the VTT step, so combine_txt_files never ran.)
txt_paths = []
for mp3_url in prepared_urls:
    txt_paths.append(combine_txt_files(mp3_url))