# Video Killed The Radio Star

Notebook by David Marx ([@DigThatData](https://twitter.com/digthatdata))

Shared under MIT license

## What is this?

Point this notebook at a YouTube URL and it'll make a music video for you.

## How this animation technique works

For each text prompt you provide, the notebook will...

1. Generate an image based on that text prompt
2. Use the generated image as the `init_image` and recombine it with the text prompt to generate variations of the first image. This produces a sequence of extremely similar images based on the original text prompt.
3. Cycle through this image sequence repeatedly to produce a sequence long enough to fill the segment.
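
The three steps above can be sketched as follows. This is a minimal illustration, not the notebook's implementation: `fake_generate` and `build_sequence` are hypothetical names, and the generator returns text labels instead of real images so the example runs without an API key. The `strength` argument mirrors the `1 - image_consistency` value that the notebook passes as `start_schedule`.

```python
from itertools import cycle, islice


def fake_generate(prompt, init_image=None, strength=1.0):
    """Hypothetical stand-in for an image generator; returns a label, not an image."""
    if init_image is None:
        return f"img({prompt})"
    return f"var({init_image}, strength={strength:.2f})"


def build_sequence(prompt, n_variations, image_consistency, n_frames, generate=fake_generate):
    # Step 1: generate an initial image from the text prompt alone.
    init_image = generate(prompt)

    # Step 2: reuse that image as the init image to get near-duplicates.
    # Higher image_consistency means less change is allowed per variation.
    variations = [
        generate(prompt, init_image=init_image, strength=1 - image_consistency)
        for _ in range(n_variations - 1)
    ]
    images = [init_image] + variations

    # Step 3: cycle through the short sequence until the segment is filled.
    return list(islice(cycle(images), n_frames))


frames = build_sequence("a red fox", n_variations=3, image_consistency=0.85, n_frames=7)
```

With three images and seven frames, the sequence wraps around twice, so frames 0, 3, and 6 all show the initial image.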

The technique demonstrated in this notebook was inspired by a [video](https://www.youtube.com/watch?v=WJaxFbdjm8c) created by Ben Gillin.


In [None]:
#pip install vktrs
!pip install -e .

In [None]:
# @markdown # Check GPU Status

from vktrs.utils import gpu_info

gpu_info()

In [None]:
# @title # 1. 🔑 Provide your API Key
# @markdown Running this cell will prompt you to enter your API Key below. 

# @markdown To get your API key, visit https://beta.dreamstudio.ai/membership

# @markdown ---

# @markdown A note on security best practices: **don't publish your API key.**

# @markdown We're using a form field designed for sensitive data like passwords.
# @markdown This notebook does not save your API key in the notebook itself,
# @markdown but instead loads your API Key into the colab environment. This way,
# @markdown you can make changes to this notebook and share it without concern
# @markdown that you might accidentally share your API Key. 
# @markdown 

use_stability_api = True # @param {type:'boolean'}

if use_stability_api:
    import os, getpass
    os.environ['STABILITY_KEY'] = getpass.getpass('Enter your API Key')


In [None]:
# @title # 4. Animation parameters

from omegaconf import OmegaConf
storyboard = OmegaConf.create()

storyboard.params = dict(

     video_url = 'https://www.youtube.com/watch?v=WJaxFbdjm8c' # @param {type:'string'}
    , theme_prompt = "extremely detailed, painted by ralph steadman and radiohead, beautiful, wow" # @param {type:'string'}

    , n_variations=1 # @param {type:'integer'}
    , image_consistency=0.85 # @param {type:"slider", min:0, max:1, step:0.01}
    , fps = 12 # @param {type:"slider", min:4, max:60, step:1}

    , output_filename = 'output.mp4' # @param {type:'string'}
    , add_caption = True # @param {type:'boolean'}
    , display_frames_as_we_get_them = True # @param {type:'boolean'}

    , optimal_ordering = True # @param {type:'boolean'}
    , whisper_seg = True # @param {type:'boolean'}
    , max_video_duration_in_seconds = 300 # @param {type:'integer'}

    , max_variations_per_opt_pass=15
    , use_stability_api = use_stability_api

)

storyboard.params.max_frames = storyboard.params.fps * storyboard.params.max_video_duration_in_seconds


print(f"Max total frames: {storyboard.params.max_frames}")
#print(f"Max API requests: {int(max_frames/repeat)}")

if storyboard.params.optimal_ordering:

    opt_batch_size = storyboard.params.n_variations
    while opt_batch_size > storyboard.params.max_variations_per_opt_pass:
        opt_batch_size //= 2  # integer division keeps the batch size an int
    print(f"Frames per re-ordering batch: {opt_batch_size}")
    storyboard.params.opt_batch_size = opt_batch_size


# open for writing: read mode fails if storyboard.yaml doesn't exist yet
with open('storyboard.yaml', 'w') as fp:
    OmegaConf.save(config=storyboard, f=fp)

In [None]:
%%capture

# to do: these installations will be handled by installing vktrs from pypi
### installations and definitions

#!pip install yt-dlp
#!pip install python-tsp
#!pip install webvtt-py # only need this if srv2 isn't available

#https://github.com/explosion/tokenizations
#!pip install pytokenizations


import datetime as dt

import json
import os
from pathlib import Path
import re
import string
from subprocess import Popen, PIPE
import textwrap
import time
import warnings

import tokenizations
import webvtt

from IPython.display import display
import numpy as np


from itertools import chain, cycle

from tqdm.autonotebook import tqdm

from vktrs.youtube import (
    YoutubeHelper,
    parse_timestamp,
    vtt_to_token_timestamps,
    srv2_to_token_timestamps,
)

In [None]:
%%capture

# Download subtitles and audio from youtube

video_url = storyboard.params.video_url
helper = YoutubeHelper(video_url)

# to do: check if user provided an audio filepath before attempting to download from youtube

input_audio = helper.info['requested_downloads'][-1]['filepath']
!ffmpeg -y -i "{input_audio}" -acodec libmp3lame audio.mp3

# to do: write audio and subtitle paths/meta to storyboard

subtitle_format = helper.info['requested_subtitles']['en']['ext']
subtitle_fpath = helper.info['requested_subtitles']['en']['filepath']


if subtitle_format == 'srv2':
    with open(subtitle_fpath, 'r') as f:
        srv2_xml = f.read() 
    token_start_times = srv2_to_token_timestamps(srv2_xml)

elif subtitle_format == 'vtt':
    captions = webvtt.read(subtitle_fpath)
    token_start_times = vtt_to_token_timestamps(captions)

# If unable to download supported subtitles, force use whisper
else:
    storyboard.params.whisper_seg = True

whisper_seg = storyboard.params.whisper_seg

In [None]:
### transcribe and segment speech using whisper

if whisper_seg:
    !pip install git+https://github.com/openai/whisper
    from vktrs.asr import whisper_lyrics

    prompt_starts = whisper_lyrics(audio_fpath="audio.mp3")

    #storyboard.prompt_starts = prompt_starts
    # to do: deal with these td objects
    #with open('storyboard.yaml') as fp:
    #    OmegaConf.save(config=storyboard, f=fp.name)

In [None]:
### This cell computes how many frames are needed for each segment
### based on the start times for each prompt

import datetime as dt
fps = storyboard.params.fps

ifps = dt.timedelta(seconds=1/fps)

# estimate video end
video_duration = dt.timedelta(seconds=helper.info['duration'])

# dummy prompt for last scene duration
prompt_starts.append({'td':video_duration})

# make sure we respect the duration of the previous phrase
frame_start=dt.timedelta(seconds=0)
prompt_starts[0]['anim_start']=frame_start
for i, rec in enumerate(prompt_starts[1:], start=1):
  rec_prev = prompt_starts[i-1]
  k=0
  while rec_prev['anim_start'] + k*ifps < rec['td']:
    k+=1
  k-=1
  rec_prev['frames'] = k
  rec_prev['anim_duration'] = k*ifps
  frame_start+=k*ifps
  rec['anim_start']=frame_start

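# record the actual (audio) duration of each phrase, i.e. the gap between
# consecutive prompt start times. start at index 1 so that index 0 doesn't
# wrap around to the dummy record at the end of the list
# to do: push end time into a timedelta and consider it... somewhere near here
for i, rec1 in enumerate(prompt_starts[1:], start=1):
    rec0 = prompt_starts[i-1]
    rec0['duration'] = rec1['td'] - rec0['td']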

# drop the dummy frame
prompt_starts = prompt_starts[:-1]

# to do: given a 0 duration prompt, assume its duration is captured in the next prompt 
#        and guesstimate a corrected prompt start time and duration 



In [None]:
### checkpoint the processing work we've done to this point

import copy

prompt_starts_copy = copy.deepcopy(prompt_starts)

for rec in prompt_starts_copy:
    for k,v in list(rec.items()):
        if isinstance(v, dt.timedelta):
            rec[k] = v.total_seconds()

        # flush image objects if they're there, they anger omegaconf
        if k in ('frame0','variations','images', 'images_raw'):
            rec.pop(k)

storyboard.prompt_starts = prompt_starts_copy

# to do: deal with these td objects
with open('storyboard.yaml', 'w') as fp:
    OmegaConf.save(config=storyboard, f=fp)

In [None]:
# generate animation

from omegaconf import OmegaConf

storyboard = OmegaConf.load('storyboard.yaml')
prompt_starts = storyboard.prompt_starts

# force use api for now
storyboard.params.use_stability_api = True

import copy
import datetime as dt
import string
import random

import PIL
from pathlib import Path

from vktrs.api import (
    get_image_for_prompt
)
from vktrs.tsp import (
    tsp_permute_frames,
    batched_tsp_permute_frames,
)

from vktrs.utils import (
    add_caption2image,
    save_frame,
)

from vktrs.utils import remove_punctuation

if storyboard.params.use_stability_api:
    from vktrs.api import get_image_for_prompt
else:
    raise NotImplementedError(
        'Image generation with this notebook currently depends on the stability api. '
        'Support for inference using the huggingface diffusers library (i.e. no api required) '
        'will be added soon.'
    )


def get_variations_w_init(prompt, init_image, **kwargs):
    return list(get_image_for_prompt(prompt=prompt, init_image=init_image, **kwargs))

def get_close_variations_from_prompt(prompt, n_variations=2, image_consistency=.7):
    """
    prompt: a text prompt
    n_variations: total number of images to return
    image_consistency: float in [0,1], controls similarity between images generated by the prompt.
                        you can think of this as controlling how much "visual vibration" there will be.
                        - 0=regenerate each image independently
                        - 1=images are nearly identical
    """
    images = list(get_image_for_prompt(prompt))
    for _ in range(n_variations - 1):
        img = get_variations_w_init(prompt, images[0], start_schedule=(1-image_consistency))[0]
        images.append(img)
    return images


frames = []
theme_prompt = storyboard.params.theme_prompt
optimal_ordering = storyboard.params.optimal_ordering
add_caption = False # storyboard.params.add_caption
display_frames_as_we_get_them = storyboard.params.display_frames_as_we_get_them
max_frames = storyboard.params.max_frames
image_consistency = storyboard.params.image_consistency
n_variations = storyboard.params.n_variations
max_variations_per_opt_pass = storyboard.params.max_variations_per_opt_pass


# to do: move this up to run params
proj_name = 'test'

print("Ensuring each prompt has an associated image")
for idx, rec in enumerate(prompt_starts):
    print(
        f"[{rec['anim_start']} | {rec['ts']}] [{rec['duration']} | {rec['anim_duration']}] - {rec['frames']} - {rec['prompt']}"
    )
    lyric = rec['prompt']
    prompt = f"{lyric}, {theme_prompt}"
    if rec.get('frame0_fpath') is None:
        init_image = list(get_image_for_prompt(prompt))[0]
        rec['frame0_fpath'] = save_frame(
            init_image,
            idx,
            root_path=Path('./frames') / proj_name,
            name=proj_name, ## to do.... uh... i dunno
            )

        if display_frames_as_we_get_them:
            print(lyric)
            display(init_image)




In [None]:

# update config

prompt_starts_copy = copy.deepcopy(prompt_starts)

for rec in prompt_starts_copy:
    for k,v in list(rec.items()):
        if isinstance(v, dt.timedelta):
            rec[k] = v.total_seconds()
        # flush images for now
        if k in ('frame0','variations','images', 'images_raw'):
            rec.pop(k)

storyboard.prompt_starts = prompt_starts_copy

# to do: deal with these td objects
with open('storyboard.yaml', 'w') as fp:
    OmegaConf.save(config=storyboard, f=fp)

In [None]:
from omegaconf import OmegaConf
from PIL import Image

from itertools import cycle

# reload config
storyboard = OmegaConf.load('storyboard.yaml')
prompt_starts = OmegaConf.to_container(storyboard.prompt_starts, resolve=True)

# load init_images and generate variations as needed
# to do: use SDK args to request multiple images in single request...
print("Fetching variations")
for idx, rec in enumerate(prompt_starts):
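    images = []

    # rebuild the prompt for this record; otherwise `lyric` and `prompt`
    # would be stale values left over from the previous cell's loop
    lyric = rec['prompt']
    prompt = f"{lyric}, {theme_prompt}"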

    init_image = Image.open(rec['frame0_fpath'])
    n_variations = rec.get('variations', storyboard.params.n_variations)
    for _ in range(n_variations - 1):
      img = get_variations_w_init(prompt, init_image, start_schedule=(1-image_consistency))[0]
      images.append(img)
    rec['variations'] = images
    images = [init_image] + images

    rec['variations_fpaths'] = [
        save_frame(
            img,
            idx,
            root_path=Path('./frames') / proj_name,
            name=proj_name, ## to do.... uh... i dunno
        ) for j, img in enumerate(rec['variations'])
    ]

    # to do: persist the ordering in the storyboard
    if optimal_ordering:
        images = batched_tsp_permute_frames(
            images,
            max_variations_per_opt_pass
        )
    rec['images'] = rec['images_raw'] = images

    if add_caption:
        rec['images'] = [add_caption2image(im, lyric) for im in rec['images']]
    
    rec['images_fpaths'] = [
        save_frame(
            img,
            idx,
            root_path=Path('./frames') / proj_name,
            name=proj_name, ## to do.... uh... i dunno
        ) for j, img in enumerate(rec['images'])
    ]

    if display_frames_as_we_get_them:
        print(lyric)
        for im in images:
            display(im)

    #images *= repeat
    sequence = []
    frame_factory = cycle(images)
    while len(sequence) < rec['frames']:
        sequence.append(next(frame_factory))
    frames.extend(sequence)
    if len(frames) >= max_frames:
        break

In [None]:
# @title # 6. 🎥 Compile your video!

from subprocess import Popen, PIPE

from omegaconf import OmegaConf
from tqdm.autonotebook import tqdm

# reload config
storyboard = OmegaConf.load('storyboard.yaml')
fps = storyboard.params.fps

#input_audio = '/content/Ai Generated Music Video - Deltron 3030 - Virus [WJaxFbdjm8c].webm'

# to do: add this to dmarx/fine ...which really needs a better name. as usual.

output_filename = storyboard.params.output_filename
input_audio = 'audio.mp3'

# to do: read frames and variations back into memory. This should be the last cell that gets run, so we need to 
# update state wrt any user interventions in the storyboard object

#from tqdm import tqdm # to do: auto

cmd_in = ['ffmpeg', '-y', '-f', 'image2pipe', '-vcodec', 'png', '-r', str(fps), '-i', '-']
cmd_out = ['-vcodec', 'libx264', '-r', str(fps), '-pix_fmt', 'yuv420p', '-crf', '1', '-preset', 'veryslow', '-shortest', output_filename]

if input_audio:
  cmd_in += ['-i', str(input_audio), '-acodec', 'libmp3lame']

cmd = cmd_in + cmd_out

p = Popen(cmd, stdin=PIPE)
#for im in tqdm(chain(frames)):
for im in tqdm(frames):
  im.save(p.stdin, 'PNG')
p.stdin.close()

print("Encoding video...")
p.wait()
print("Video complete.")
print(f"Video saved to: {output_filename}")

In [None]:
!tar -czvf output.tar.gz "{output_filename}"

# to do

## on-disk storyboard

* [ ] animation specified by a config file. assumed to start empty but doesn't need to. whether or not to perform a step is determined by whether or not the config file is populated
* [ ] after url is downloaded, subtitles and audio files get logged in config
* [x] after prompts are parsed, prompts and start times go into config
* [x] frame counts and animation start times
* [x] variations to compute per scene
* [x] init_images, generated as needed. locations logged in config

* **recommended intervention point**

* [ ] variations generated, saved to disk, locations saved to storyboard

* **final intervention point** presenting the storyboard to the user somehow for approval would be nice

* [ ] video compiled, saved to disk, and output filename saved to storyboard
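
The "run a step only if the config isn't populated" idea from the checklist above might look roughly like this. It is a sketch under assumptions: a plain `dict` stands in for the OmegaConf storyboard, and `run_step` and `expensive_step` are hypothetical names that don't exist in `vktrs`.

```python
def run_step(storyboard, key, compute):
    """Populate storyboard[key] via compute() only if it is missing or empty.

    Returns True if the step ran, False if it was skipped.
    """
    if not storyboard.get(key):
        storyboard[key] = compute()
        return True
    return False


# A plain dict stands in for the storyboard config here (assumption).
storyboard = {"params": {"fps": 12}}
calls = []


def expensive_step():
    # Placeholder for real work such as transcription or image generation.
    calls.append(1)
    return [{"prompt": "hello", "frames": 24}]


ran_first = run_step(storyboard, "prompt_starts", expensive_step)
ran_again = run_step(storyboard, "prompt_starts", expensive_step)
```

The second call is skipped because `prompt_starts` is already populated. Under this design, a user who edits or deletes an entry in the saved storyboard would trigger only the affected step on the next run.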


## workflow changes
* generate single prompts first and then circle back to variations after. Give user an opportunity to re-generate prompts before committing to the variation generation
  - add some sort of 'storyboard' experience
* separately save image outputs and images with text overlaid
* facilitate "resume" operation for generating (more) variations from a particular prompt 
* if possible try to come up with a way to sync visual vibration rate with audio beat
* wrap subtitle2prompts utility
* add mechanism for auto-extracting lyrics based on scene duration and target tokens per prompt
* think about how to port this into deforum
* optionally encode segments separately
* Permit user to specify non-English base language
  - still download and use en captions for translated prompts?
  - use base language captions for prompt alignment
  - use user-specified lyrics in base language to paste over images
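
For the beat-sync idea in the list above, one possible starting point is to pick the number of variations so that one pass through the image cycle lasts one beat. This is only a sketch: `frames_per_beat` and `variations_for_beat` are hypothetical helpers, and the tempo (`bpm`) is assumed to be known in advance rather than detected from the audio.

```python
def frames_per_beat(fps, bpm):
    """Number of video frames that elapse during one beat."""
    return fps * 60 / bpm


def variations_for_beat(fps, bpm):
    """Hypothetical heuristic: one full image cycle per beat, at least one image."""
    return max(1, round(frames_per_beat(fps, bpm)))


fpb = frames_per_beat(fps=12, bpm=120)
n = variations_for_beat(fps=12, bpm=120)
```

At 12 fps and 120 bpm, a beat spans 6 frames, so `n_variations = 6` would restart the cycle on each beat. Tempos that don't divide evenly into the frame rate would drift and need a smarter scheme.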