In [None]:
import os, re, json, shutil
from pathlib import Path
import subprocess
from PIL import Image, ImageDraw, ImageFont
import soundfile as sf
import numpy as np
import textwrap

os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
from kokoro import KPipeline
import torch

# import nltk
# nltk.download('punkt')
# nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize

In [2]:
# --- Rendering configuration -------------------------------------------------
SCREEN_SIZE = (720, 1280)  # (width, height) in pixels, passed to Image.new
FONT_SIZE = 45             # starting body font size; the header uses 1.2x this
MIN_FONT_SIZE = 24         # lower bound for the body auto-shrink

# --- Working directories ------------------------------------------------------
FRAME_DIR = Path("video-resource/frames")
AUDIO_DIR = Path("video-resource/audio")
OUTPUT_VIDEO_DIR = Path("video-resource/output")

FRAME_DIR.mkdir(parents=True, exist_ok=True)
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
# The output directory has to exist as well: ffmpeg does not create missing
# parent directories for its output file, so rendering fails on a fresh checkout.
OUTPUT_VIDEO_DIR.mkdir(parents=True, exist_ok=True)

# --- Fonts --------------------------------------------------------------------
FONT_DIR = Path("video-resource/fonts")
FONTS = {
    "bold": str(FONT_DIR / "BearSansUI-Bold.otf"),
    "italic": str(FONT_DIR / "BearSansUI-Italic.otf"),
    "regular": str(FONT_DIR / "BearSansUI-Regular.otf")
}

def load_fonts(font_paths, size_range):
    """Pre-load every font style at every size of an inclusive size range.

    Args:
        font_paths: Mapping of style name to font file path.
        size_range: Sequence whose first two items are the smallest and the
            largest font size to load (both inclusive).

    Returns:
        A nested dict ``{style: {size: font}}``. The "bold", "italic" and
        "regular" keys are always present (possibly empty); any additional
        style found in ``font_paths`` gets its own entry. A size whose font
        file cannot be opened maps to Pillow's built-in default font.
    """
    font_cache = {"bold": {}, "italic": {}, "regular": {}}
    for style, path in font_paths.items():
        # setdefault (rather than indexing) so that styles beyond the three
        # pre-seeded ones are accepted instead of raising KeyError.
        sizes = font_cache.setdefault(style, {})
        for size in range(size_range[0], size_range[1] + 1):
            try:
                sizes[size] = ImageFont.truetype(path, size)
            except IOError:
                # Missing/unreadable font file: keep going with the default font.
                sizes[size] = ImageFont.load_default()
    return font_cache
# Pre-load all styles from MIN_FONT_SIZE up to FONT_SIZE + 10. The headroom above
# FONT_SIZE covers the header font, which create_text_image requests at
# int(FONT_SIZE * 1.2).
FONT_CACHE = load_fonts(FONTS, (MIN_FONT_SIZE, FONT_SIZE + 10))

# Text-to-speech pipeline shared by every generate_audio_and_frame call.
# NOTE(review): lang_code="a" presumably selects American English - confirm against the Kokoro docs.
pipeline = KPipeline(lang_code="a", repo_id="hexgrad/Kokoro-82M")
# Global counter used to name part_XXXX.wav / frame_XXXX.png; incremented by
# generate_audio_and_frame after each successfully written pair.
frame_index = 0

  WeightNorm.apply(module, name, dim)


In [3]:
def split_text(text):
    """Break a block of text into narration-sized pieces.

    The text is processed line by line. A line that starts like a numbered
    list item ("1. ...") is kept whole; every other non-blank line is split
    into sentences with ``sent_tokenize``.

    Args:
        text: Raw text, possibly containing several lines. Falsy input
            (``None`` or an empty string) yields an empty list.

    Returns:
        List of stripped, non-empty strings in reading order.
    """
    if not text:
        return []

    pieces = []
    for raw_line in text.strip().split('\n'):
        stripped = raw_line.strip()
        if not stripped:
            continue
        if re.match(r"^\d+\.\s+", stripped):
            # Numbered list item: keep "N. ..." together instead of letting the
            # sentence tokenizer cut right after the number.
            pieces.append(stripped)
            continue
        pieces.extend(sent_tokenize(stripped))

    trimmed = (piece.strip() for piece in pieces)
    return [piece for piece in trimmed if piece]

def chunk_long_text(text, threshold=220, max_length=200):
    """Split an over-long piece of text into several display chunks.

    Text of at most ``threshold`` characters is returned untouched as a
    one-element list. Longer text is wrapped on whitespace to lines of at
    most ``max_length`` characters (words and hyphenated words are never
    broken). When that produces several chunks, continuation is marked with
    ellipses: every chunk but the first is prefixed with "... " and every
    chunk but the last is suffixed with "...".

    Args:
        text: The text to split.
        threshold: Length up to which the text is left as-is.
        max_length: Wrap width used for longer text.

    Returns:
        List of chunk strings.
    """
    if len(text) <= threshold:
        return [text]

    pieces = textwrap.wrap(
        text,
        width=max_length,
        break_long_words=False,
        break_on_hyphens=False,
    )
    if len(pieces) < 2:
        # Nothing to join up (e.g. one unbreakable word): no ellipses needed.
        return pieces

    last = len(pieces) - 1
    return [
        ("... " if position > 0 else "") + piece + ("..." if position < last else "")
        for position, piece in enumerate(pieces)
    ]

def wrap_text_by_pixels(draw, text, font, max_width):
    """Greedily wrap text so that each line fits a pixel width.

    Words are appended to the current line for as long as
    ``draw.textlength`` reports the line at or below ``max_width``. A single
    word wider than ``max_width`` is placed on its own line rather than split.

    Args:
        draw: Object exposing ``textlength(text, font=...)``.
        text: Text to wrap; runs of whitespace are collapsed.
        font: Font handed through to ``draw.textlength``.
        max_width: Maximum line width in the units ``textlength`` returns.

    Returns:
        List of lines; empty when ``text`` contains no words.
    """
    words = text.split()
    if not words:
        return []

    first_word, *remaining = words
    wrapped = []
    pending = first_word
    for word in remaining:
        candidate = pending + " " + word
        if draw.textlength(candidate, font=font) <= max_width:
            pending = candidate
            continue
        # Adding the word would overflow: close the line and start a new one.
        wrapped.append(pending)
        pending = word
    wrapped.append(pending)
    return wrapped

def pre_combine_audio(audio_dir, output_path, sample_rate=24000):
    """Concatenate all ``part_*.wav`` clips of a directory into one audio file.

    Clips are joined in sorted filename order. A clip whose sample rate
    differs from ``sample_rate`` is skipped with a warning, because it cannot
    be concatenated without resampling. Note that a skipped clip makes the
    combined audio shorter than the sum of the individual clip durations.

    Args:
        audio_dir: ``pathlib.Path`` of the directory holding the clips.
        output_path: Destination of the combined file.
        sample_rate: Sample rate every clip must have and that the combined
            file is written with.

    Returns:
        ``True`` when a combined file was written, ``False`` when there was
        nothing usable to combine (no clips, or every clip was skipped).
    """
    audio_files = sorted(audio_dir.glob("part_*.wav"))
    if not audio_files:
        return False

    segments = []
    for file_path in audio_files:
        data, file_rate = sf.read(file_path)
        if file_rate != sample_rate:
            print(f"Warning: skipping {file_path.name}: sample rate {file_rate} != {sample_rate}")
            continue
        segments.append(data)

    if not segments:
        # np.concatenate raises ValueError on an empty list; report "nothing
        # to combine" the same way as for an empty directory instead.
        return False

    sf.write(output_path, np.concatenate(segments), sample_rate)
    return True

In [None]:
def _cached_font(style, size):
    """Return the pre-loaded font for (style, size), or Pillow's default font if that size is not cached."""
    font = FONT_CACHE[style].get(size)
    # Only build the fallback when it is actually needed.
    return font if font is not None else ImageFont.load_default()


def _draw_wrapped(draw, text, font, fill, x, y, max_width, line_spacing):
    """Draw `text` wrapped to `max_width` starting at (x, y) and return the y position below the last line."""
    for line in wrap_text_by_pixels(draw, text, font, max_width):
        draw.text((x, y), line, font=font, fill=fill)
        y += font.getbbox(line)[3] + line_spacing
    return y


def create_text_image(header=None, subtitle=None, body=None, is_quote=False, is_summary=False):
    """Render one video frame: optional header, optional subtitle, optional body.

    The header (bold, black) and subtitle (italic, gray) are stacked from the
    top. The body is vertically centred in the space that remains; its font
    starts at ``FONT_SIZE`` and shrinks in steps of 2, down to
    ``MIN_FONT_SIZE``, until the wrapped text fits. Quotes and summaries are
    set in italics with a light-gray bar to the left; summaries are also gray.

    Args:
        header: Text drawn at the top, or ``None``/empty for no header.
        subtitle: Text drawn below the header, or ``None``/empty.
        body: Main text, or ``None``/empty for a frame without body.
        is_quote: Style the body as a quote.
        is_summary: Style the body as a summary (ignored when ``is_quote``).

    Returns:
        A ``PIL.Image`` of size ``SCREEN_SIZE`` on a white background.
    """
    img = Image.new("RGB", SCREEN_SIZE, "white")
    draw = ImageDraw.Draw(img)
    padding = 80
    line_spacing = 10
    max_width = SCREEN_SIZE[0] - 2 * padding
    y_pos = padding

    if header:
        header_font = _cached_font("bold", int(FONT_SIZE * 1.2))
        y_pos = _draw_wrapped(draw, header, header_font, "black", padding, y_pos, max_width, line_spacing)
        y_pos += line_spacing  # extra gap below the header block

    if subtitle:
        subtitle_font = _cached_font("italic", FONT_SIZE)
        y_pos = _draw_wrapped(draw, subtitle, subtitle_font, "gray", padding, y_pos, max_width, line_spacing)
        y_pos += line_spacing  # extra gap below the subtitle block

    if not body:
        return img

    body_top_y = y_pos
    available_height = SCREEN_SIZE[1] - body_top_y - padding

    font_style = "regular"
    fill_color = "black"
    if is_quote:
        font_style = "italic"
    elif is_summary:
        font_style = "italic"
        fill_color = "gray"

    # Shrink the font until the wrapped body fits. The last attempt is always
    # made at exactly MIN_FONT_SIZE, even when the step of 2 would jump past it.
    font_size = FONT_SIZE
    while True:
        body_font = _cached_font(font_style, font_size)
        body_lines = wrap_text_by_pixels(draw, body, body_font, max_width)
        total_text_height = sum(body_font.getbbox(line)[3] + line_spacing for line in body_lines) - line_spacing
        if total_text_height <= available_height or font_size <= MIN_FONT_SIZE:
            break
        font_size = max(font_size - 2, MIN_FONT_SIZE)

    if not body_lines:
        # Whitespace-only body: nothing to draw, and the accent bar would get
        # an inverted (negative-height) rectangle.
        return img

    # Centre the body, but never start above the body area: when the text does
    # not fit even at the smallest size, the offset would be negative and the
    # body would be painted over the header/subtitle.
    body_start_y = body_top_y + max(0, (available_height - total_text_height) / 2)

    if is_quote or is_summary:
        bar_width = 4
        bar_padding = 20
        bar_x0 = padding - bar_padding - bar_width
        bar_x1 = padding - bar_padding
        draw.rectangle([(bar_x0, body_start_y), (bar_x1, body_start_y + total_text_height)], fill="lightgray")

    for line in body_lines:
        draw.text((padding, body_start_y), line, font=body_font, fill=fill_color)
        body_start_y += body_font.getbbox(line)[3] + line_spacing

    return img

In [5]:
def generate_audio_and_frame(text_to_speak, header=None, subtitle=None, is_quote=False, is_summary=False):
    """Synthesize one narration clip and render the frame shown while it plays.

    Writes ``part_<index>.wav`` to ``AUDIO_DIR`` and ``frame_<index>.png`` to
    ``FRAME_DIR``, then advances the global ``frame_index``. When the pipeline
    yields no audio, a warning is printed and nothing is written.

    Args:
        text_to_speak: Text handed to the TTS pipeline.
        header: Header text for the frame.
        subtitle: Subtitle text for the frame.
        is_quote: Render the body as a quote; the spoken "Start quote." /
            "End quote." markers are removed from the displayed text.
        is_summary: Render the body as a summary.
    """
    global frame_index
    audio_path = AUDIO_DIR / f"part_{frame_index:04d}.wav"

    # Keep only the segments that came back with audio, pairing each waveform
    # with the text the pipeline reports for it.
    spoken = [
        (segment_text, waveform)
        for segment_text, _, waveform in pipeline(text_to_speak, voice="af_heart", speed=1.0)
        if waveform is not None
    ]
    if not spoken:
        print(f"Warning: No audio generated for text: '{text_to_speak}'")
        return

    display_text = "".join(segment_text for segment_text, _ in spoken)
    if is_quote:
        for marker in ("Start quote.", "End quote."):
            display_text = display_text.replace(marker, "")
        display_text = display_text.strip()

    # A plain frame whose body would only repeat the header or subtitle is
    # rendered without a body.
    is_plain = not (is_quote or is_summary)
    if is_plain and display_text in (header, subtitle):
        display_text = None

    frame = create_text_image(
        header=header,
        subtitle=subtitle,
        body=display_text,
        is_quote=is_quote,
        is_summary=is_summary,
    )
    frame.save(FRAME_DIR / f"frame_{frame_index:04d}.png")

    waveform = torch.cat([segment_audio for _, segment_audio in spoken]).unsqueeze(0)
    # NOTE(review): 24000 is presumably the pipeline's output sample rate - confirm.
    sf.write(str(audio_path), waveform.squeeze().cpu().numpy(), 24000)
    frame_index += 1

In [6]:
def _iter_display_chunks(text):
    """Yield display-sized chunks of `text`: split into sentences first, then wrap over-long sentences."""
    for sentence in split_text(text):
        yield from chunk_long_text(sentence)


def _narrate_quote(paragraph, header):
    """Narrate a "<start quote>...<end quote>" paragraph, announcing its start and end in the audio."""
    quote_content = paragraph.replace("<start quote>", "").replace("<end quote>", "").strip()
    chunks = list(_iter_display_chunks(quote_content))
    if not chunks:
        return

    if len(chunks) == 1:
        generate_audio_and_frame(text_to_speak=f"Start quote. {chunks[0]} End quote.", header=header, is_quote=True)
        return

    # Several frames: announce the quote on the first and close it on the last.
    generate_audio_and_frame(text_to_speak=f"Start quote. {chunks[0]}", header=header, is_quote=True)
    for chunk in chunks[1:-1]:
        generate_audio_and_frame(text_to_speak=chunk, header=header, is_quote=True)
    generate_audio_and_frame(text_to_speak=f"{chunks[-1]} End quote.", header=header, is_quote=True)


def process_article(json_path):
    """Turn an article JSON file into narration clips and frames.

    The JSON object is read for these keys: "title", "subtitle" and
    "sections". Each section is read for "title", "summary" and "content" (a
    list of paragraph strings). A paragraph starting with "<start quote>" is
    narrated as a quote. Missing or ``null`` values are treated as empty.

    Narration order: article title, subtitle, then for each section its title
    (unless it equals the article title, case-insensitively), its summary
    wrapped in spoken intro/outro lines, and its paragraphs.

    Args:
        json_path: Path of the article JSON file (UTF-8).
    """
    with open(json_path, "r", encoding="utf-8") as f:
        article = json.load(f)

    # `or` rather than a .get default: a key that is present but null would
    # otherwise come back as None and break .lower() / iteration below.
    main_title = article.get("title") or ""
    subtitle = article.get("subtitle") or ""
    sections = article.get("sections") or []

    for chunk in _iter_display_chunks(main_title):
        generate_audio_and_frame(text_to_speak=chunk, header=chunk)

    for chunk in _iter_display_chunks(subtitle):
        generate_audio_and_frame(text_to_speak=chunk, header=main_title, subtitle=chunk)

    for section in sections:
        section_title = section.get("title") or ""
        summary_text = section.get("summary")
        has_own_title = bool(section_title) and section_title.lower() != main_title.lower()
        header = section_title if has_own_title else main_title

        if has_own_title:
            for chunk in _iter_display_chunks(section_title):
                generate_audio_and_frame(text_to_speak=chunk, header=chunk)

        if summary_text:
            # Without a section title the original sentence would be read out
            # as "Here is the summary of :".
            if section_title:
                summary_intro = f"Here is the summary of {section_title}:"
            else:
                summary_intro = "Here is the summary:"
            for chunk in chunk_long_text(summary_intro):
                generate_audio_and_frame(text_to_speak=chunk, header=header)

            for chunk in _iter_display_chunks(summary_text):
                generate_audio_and_frame(text_to_speak=chunk, header=header, is_summary=True)

            for chunk in chunk_long_text("End of Summary. Now reading the main article:"):
                generate_audio_and_frame(text_to_speak=chunk, header=header)

        for para in section.get("content") or []:
            if not isinstance(para, str):
                # Only text paragraphs can be narrated.
                print(f"Warning: skipping non-text paragraph: {para!r}")
                continue
            if para.startswith("<start quote>"):
                _narrate_quote(para, header)
            else:
                for chunk in _iter_display_chunks(para):
                    generate_audio_and_frame(text_to_speak=chunk, header=header)

In [7]:
def _concat_quote(path):
    """Escape a path for use inside single quotes in an ffmpeg concat list file."""
    return str(path).replace("'", "'\\''")


def render_video_with_ffmpeg(title, master_audio_path):
    """Render the frames and the combined audio into ``<title>.mp4``.

    Every ``part_*.wav`` in ``AUDIO_DIR`` that has a matching
    ``frame_<index>.png`` in ``FRAME_DIR`` contributes one slide shown for the
    clip's duration. The slide list is written to ``FRAME_DIR/filelist.txt``
    for ffmpeg's concat demuxer, and ``master_audio_path`` is muxed in as the
    audio track. Success or failure is reported on stdout.

    Args:
        title: Output file name without extension.
        master_audio_path: Path of the combined audio file.
    """
    slides = []
    for audio_file in sorted(AUDIO_DIR.glob("part_*.wav")):
        idx = audio_file.stem.split("_")[1]
        image_file = FRAME_DIR / f"frame_{idx}.png"
        if not image_file.exists():
            continue
        try:
            duration = sf.info(str(audio_file)).duration
        except Exception as exc:
            # Best effort: an unreadable clip is left out, but say so.
            print(f"Warning: could not read duration of {audio_file.name}: {exc}")
            continue
        if duration < 0.01:
            continue
        slides.append((image_file.resolve(), duration))

    if not slides:
        print("❌ No frame/audio pairs found; nothing to render.")
        return

    filelist_path = FRAME_DIR / "filelist.txt"
    with open(filelist_path, "w", encoding="utf-8") as f:
        for image_path, duration in slides:
            f.write(f"file '{_concat_quote(image_path)}'\n")
            f.write(f"duration {duration}\n")
        # The concat demuxer ignores the duration of the final entry unless the
        # file is listed once more; without this the last slide is cut short
        # and "-shortest" truncates the end of the audio with it.
        f.write(f"file '{_concat_quote(slides[-1][0])}'\n")

    # ffmpeg does not create missing output directories.
    OUTPUT_VIDEO_DIR.mkdir(parents=True, exist_ok=True)
    output_path = OUTPUT_VIDEO_DIR / f"{title}.mp4"

    command = [
        "ffmpeg",
        "-f", "concat",
        "-safe", "0",
        "-i", str(filelist_path),
        "-i", str(master_audio_path),
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        "-c:a", "copy",
        # "-c:a", "aac",
        # "-b:a", "320k",
        "-shortest",
        "-y",
        str(output_path)
    ]

    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
        print(f"\n✅ FFMPEG rendering successful. Video saved to {output_path}")
    except subprocess.CalledProcessError as e:
        print("❌ FFMPEG rendering failed.")
        print("FFMPEG stderr:", e.stderr)
    except FileNotFoundError:
        print("❌ FFMPEG not found. Please ensure FFMPEG is installed and in your system's PATH.")

In [8]:
# Start from empty working directories and a fresh counter. pre_combine_audio and
# render_video_with_ffmpeg pick up every part_*.wav / frame_*.png they find, so
# leftovers from an earlier run of this cell would end up in the new video.
for work_dir in (FRAME_DIR, AUDIO_DIR):
    shutil.rmtree(work_dir, ignore_errors=True)
    work_dir.mkdir(parents=True, exist_ok=True)
frame_index = 0

process_article("processed-json-with-summary/Money Stuff - A Drug-Trial Stock Sale.json")
master_audio_file = AUDIO_DIR / "master_audio.wav"


✅ FFMPEG rendering successful. Video saved to video-resource/output/Money Stuff - A Drug-Trial Stock Sale.mp4


In [9]:
# Join the per-frame clips into one audio track; render only when that produced a file
# (pre_combine_audio returns False when it found no part_*.wav clips).
if pre_combine_audio(AUDIO_DIR, master_audio_file):
    render_video_with_ffmpeg("Money Stuff - A Drug-Trial Stock Sale", master_audio_file)


✅ FFMPEG rendering successful. Video saved to video-resource/output/Money Stuff - A Drug-Trial Stock Sale.mp4


In [None]:
directory_path = Path('processed-json-with-summary/')

if directory_path.is_dir():
    # Only *.json files, in a stable order: iterdir() would also hand stray
    # files (e.g. .DS_Store) or sub-directories to json.load.
    for entry in sorted(directory_path.glob("*.json")):
        # Each article gets empty working directories and a counter starting at
        # 0. Otherwise leftovers from earlier cells leak into the first video,
        # and the ever-growing index eventually outgrows the 4-digit file names
        # that the sorted() globs rely on for ordering.
        for work_dir in (FRAME_DIR, AUDIO_DIR):
            shutil.rmtree(work_dir, ignore_errors=True)
            work_dir.mkdir(parents=True, exist_ok=True)
        frame_index = 0

        try:
            process_article(entry)
            master_audio_file = AUDIO_DIR / "master_audio.wav"
            if pre_combine_audio(AUDIO_DIR, master_audio_file):
                render_video_with_ffmpeg(entry.stem, master_audio_file)
        finally:
            # Clean up even when an article fails, so a re-run starts clean.
            shutil.rmtree(AUDIO_DIR, ignore_errors=True)
            shutil.rmtree(FRAME_DIR, ignore_errors=True)

Updates:
- Consider adding images from the main article.
- Find a solution to the long pauses between the chunks that long sentences are broken into.
- Incorporate `multiprocessing` to accelerate processing.