In [11]:
from manual_influencer import constants, script_generator

#mjson = script_generator.generate_script(text=constants.TEXT_TO_SCENES_PROMPT)

In [12]:
import json
#data = json.loads(mjson)

In [13]:
from google import genai
from google.genai import types
import base64

def generate_scene_image_description(prompt):
    """Turn a scene description into an English image description via Gemini.

    Sends ``prompt`` as the single user message to ``gemini-2.0-flash-001`` on
    Vertex AI (project taken from ``constants.GCP_PROJECT_ID``), together with
    a system instruction asking for a black-and-white, rough, cartoonish sketch
    description and no other text. The answer is streamed and echoed to stdout
    while it arrives.

    Args:
        prompt: Scene description text to be described as an image.

    Returns:
        The concatenated text of all streamed chunks; an empty string if no
        chunk carried any text.
    """
    client = genai.Client(
        vertexai=True,
        project=constants.GCP_PROJECT_ID,
        location="global",
    )

    si_text1 = """make an english description of an image matching this scene description. black and white, rough sketch, cartoonish style. describe the objects rather than using technical terms. only answer with the description. no other text."""

    model = "gemini-2.0-flash-001"
    contents = [
        types.Content(
            role="user",
            parts=[types.Part.from_text(text=prompt)],
        ),
    ]

    # All four harm categories are switched off, exactly as before.
    harm_categories = (
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_HARASSMENT",
    )
    generate_content_config = types.GenerateContentConfig(
        temperature=1,
        top_p=0.95,
        max_output_tokens=8192,
        response_modalities=["TEXT"],
        safety_settings=[
            types.SafetySetting(category=category, threshold="OFF")
            for category in harm_categories
        ],
        system_instruction=[types.Part.from_text(text=si_text1)],
    )

    pieces = []
    for chunk in client.models.generate_content_stream(
        model=model,
        contents=contents,
        config=generate_content_config,
    ):
        text = chunk.text
        # A chunk without text parts reports ``text`` as None; concatenating it
        # would raise TypeError and printing it would emit the word "None".
        if not text:
            continue
        print(text, end="")
        pieces.append(text)

    return "".join(pieces)
    

# generate_scene_image_description("<scene description>")

In [14]:
from vertexai.preview.vision_models import ImageGenerationModel
import vertexai

#pmt = "A close-up, roughly sketched, black and white cartoon drawing of a powerline adapter, suggesting a simple, perhaps technical illustration."
def generate_image(prompt, filename):
    """Generate one 9:16 image with Imagen 3 and save it to ``filename``.

    Initialises Vertex AI for ``constants.GCP_PROJECT_ID`` in ``us-central1``,
    requests a single watermarked image from ``imagen-3.0-generate-002`` and
    writes it to disk. The prompt is echoed to stdout first.

    Args:
        prompt: Text prompt for the image model.
        filename: Destination path of the image file; missing parent
            directories are created.

    Raises:
        RuntimeError: If the model response contains no image.
    """
    import os  # local import: this cell runs before the cell that imports os

    print("Prompt: " + prompt)
    vertexai.init(project=constants.GCP_PROJECT_ID, location="us-central1")
    generation_model = ImageGenerationModel.from_pretrained("imagen-3.0-generate-002")

    images = generation_model.generate_images(
        prompt=prompt,
        number_of_images=1,
        aspect_ratio="9:16",
        negative_prompt="",
        person_generation="",
        safety_filter_level="",
        add_watermark=True,
    )

    # An empty response (presumably every candidate was filtered out) used to
    # surface as a bare IndexError with no hint about the cause.
    try:
        image = images[0]
    except IndexError as exc:
        raise RuntimeError(
            "image generation returned no images for prompt: " + prompt
        ) from exc

    target_dir = os.path.dirname(filename)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    image.save(filename)
    
# generate_image("A close-up, roughly sketched, black and white cartoon drawing of a powerline adapter, suggesting a simple, perhaps technical illustration", "data/visual/example.png")

In [15]:
import subprocess
import json
import wave
import contextlib
from mutagen.mp3 import MP3

def get_audio_duration(filepath):
    """Return the playback length of an audio file in seconds.

    Files whose name ends in ``.wav`` (case-insensitive) are read with the
    standard-library ``wave`` module, which handles PCM WAV files; every other
    file is parsed as MP3 through mutagen, as before. The duration is also
    printed to stdout.

    Args:
        filepath: Path to the audio file (``str`` or path-like).

    Returns:
        Duration in seconds as a float.
    """
    if str(filepath).lower().endswith(".wav"):
        # The MP3 parser cannot read WAV data, so WAV input gets its own path.
        with wave.open(str(filepath), "rb") as wav_file:
            duration = wav_file.getnframes() / float(wav_file.getframerate())
    else:
        duration = MP3(filepath).info.length
    print(duration)
    return duration

In [16]:
import cv2
import os
import subprocess
import imageio_ffmpeg
import ffmpeg
import math
import gc
import time

def _remove_if_present(path):
    """Delete ``path``; a file that is already gone is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def generate_video(image_folder = 'data/images', audio_file = './data/synthesis.wav', output_file = './data/out.mp4'):
    """Build a slideshow video from a folder of PNGs and mux it with an audio track.

    Every ``.png`` in ``image_folder`` (taken in sorted name order) is shown for
    the same number of frames at 30 fps, so that all images together cover the
    duration reported by ``get_audio_duration(audio_file)``. The frames are
    written to a temporary ``./video.avi``, re-encoded to H.264 as
    ``./video.mp4`` and finally combined with the audio into ``output_file``.
    Both temporary files are removed afterwards, also when a step fails.

    Args:
        image_folder: Directory containing the ``.png`` frames.
        audio_file: Audio track to put under the slideshow.
        output_file: Path of the resulting video; overwritten if it exists.

    Raises:
        ValueError: If the folder has no ``.png`` files or an image cannot be read.
        subprocess.CalledProcessError: If one of the ffmpeg runs fails.
    """
    fps = 30
    raw_video = './video.avi'
    encoded_video = './video.mp4'

    # Remove leftovers one by one: a single try block around all removals
    # stopped at the first missing file and left the others in place.
    for stale in ('./finished_video.mp4', raw_video, encoded_video):
        try:
            os.remove(stale)
        except OSError:
            pass

    # os.listdir returns names in arbitrary order; sort for a stable sequence.
    image_names = sorted(name for name in os.listdir(image_folder) if name.endswith(".png"))
    if not image_names:
        raise ValueError(f"no .png images found in {image_folder!r}")

    first_frame = cv2.imread(os.path.join(image_folder, image_names[0]))
    if first_frame is None:
        raise ValueError(f"could not read image {image_names[0]!r} in {image_folder!r}")
    height, width = first_frame.shape[:2]

    audio_duration = get_audio_duration(audio_file)
    vid_duration_frames = audio_duration * fps
    # Rounded up, so the video is never shorter than the audio; the final
    # ffmpeg pass trims the excess with -shortest.
    frame_count_per_image = int(math.ceil(vid_duration_frames / len(image_names)))

    try:
        video = cv2.VideoWriter(raw_video, 0, fps, (width, height))
        try:
            for name in image_names:
                # Decode each image once instead of once per written frame.
                frame = cv2.imread(os.path.join(image_folder, name))
                if frame is None:
                    raise ValueError(f"could not read image {name!r} in {image_folder!r}")
                if frame.shape[:2] != (height, width):
                    # The writer expects the frame size it was opened with.
                    frame = cv2.resize(frame, (width, height))
                for _ in range(frame_count_per_image):
                    video.write(frame)
        finally:
            video.release()

        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
        subprocess.run([
            ffmpeg_exe,
            '-y',               # overwrite output if exists
            '-i', raw_video,    # input video
            '-c:v', 'libx264',  # video codec
            '-c:a', 'aac',      # audio codec
            '-shortest',        # finish when the shorter stream ends
            encoded_video       # output file
        ], check=True)

        subprocess.run([
            ffmpeg_exe,
            '-y',
            '-i', encoded_video,
            '-i', audio_file,
            '-c:v', 'copy',
            '-c:a', 'aac',
            '-shortest',
            output_file
        ], check=True)
    finally:
        # NOTE(review): the collect + short sleep presumably give the OS time
        # to release file handles before the temp files are deleted -- confirm.
        gc.collect()
        time.sleep(0.1)
        _remove_if_present(raw_video)
        _remove_if_present(encoded_video)

# generate_video(image_folder="./data/visual", audio_file='./data/audio/output.mp3', output_file = '0.mp4')

In [17]:
import ffmpeg
import imageio_ffmpeg
import os

def concat_videos(video_paths, output_path='concatenated.mp4'):
    """Join several video files into one using ffmpeg's concat demuxer.

    The inputs are listed, as absolute paths and in the given order, in a
    temporary ``concat_list.txt`` in the working directory. ffmpeg then copies
    the streams into ``output_path`` without re-encoding. The list file is
    removed afterwards, also when ffmpeg fails.

    Args:
        video_paths: Iterable of paths (``str`` or path-like) to concatenate.
        output_path: Destination file; overwritten if it already exists.

    Raises:
        ValueError: If ``video_paths`` is empty.
        subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    absolute_paths = [os.path.abspath(path) for path in video_paths]
    if not absolute_paths:
        raise ValueError("concat_videos needs at least one input video")

    list_file = 'concat_list.txt'
    try:
        # Create the concat input text file (UTF-8 so non-ASCII paths survive).
        with open(list_file, 'w', encoding='utf-8') as f:
            for path in absolute_paths:
                # Inside a single-quoted entry a quote is written as '\''.
                escaped = path.replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

        # Run ffmpeg concat
        subprocess.run([
            ffmpeg_exe,
            '-y',            # without this an existing output is not replaced
            '-f', 'concat',
            '-safe', '0',    # allow absolute paths in the list file
            '-i', list_file,
            '-c', 'copy',
            output_path
        ], check=True)
    finally:
        try:
            os.remove(list_file)
        except FileNotFoundError:
            pass


In [18]:
from pydub import AudioSegment
from pydub.effects import speedup

def speedup_speech(input_file='./data/audio/output_raw.mp3', output_file='./data/audio/output.mp3', speed=1.4):
    """Write a faster-playing MP3 copy of an audio file.

    Loads ``input_file`` with pydub, applies pydub's ``speedup`` effect with
    ``playback_speed=speed`` and exports the result to ``output_file`` in MP3
    format.
    """
    source_segment = AudioSegment.from_file(input_file)
    speedup(source_segment, playback_speed=speed).export(output_file, format="mp3")

In [19]:
from manual_influencer.tts import synthesize_speech
import shutil
from pathlib import Path
import os
import time

# One image is generated per `seconds_per_image` seconds of narration, plus one extra.
seconds_per_image = 4
# Pause after every image request.
# NOTE(review): presumably a guard against API rate limits -- confirm.
image_request_pause_s = 16
visual_folder = './data/visual'
raw_audio_path = './data/audio/output_raw.mp3'
audio_path = './data/audio/output.mp3'


def _remove_quietly(path):
    """Best-effort delete: a missing or locked file is silently skipped."""
    try:
        os.remove(path)
    except OSError:
        pass


def _clear_folder(folder):
    """Delete every regular file directly inside ``folder``; subfolders are kept."""
    for entry in os.listdir(folder):
        entry_path = os.path.join(folder, entry)
        if os.path.isfile(entry_path):
            os.remove(entry_path)


# Start with an empty ./vids so only videos from this run remain.
for mp4_file in Path('./vids/').glob('*.mp4'):
    _remove_quietly(mp4_file)

for k in range(1, 2):
    try:
        for folder in ("vids", "data", "data/visual", "data/audio"):
            os.makedirs(folder, exist_ok=True)

        mjson = script_generator.generate_script(text=constants.TEXT_TO_SCENES_PROMPT)
        data = json.loads(mjson)

        _clear_folder(visual_folder)

        # Removes every .mp4 in the working directory (this also covers
        # ./concatenated.mp4 and the per-scene clips of a previous run).
        for mp4_file in Path('.').glob('*.mp4'):
            _remove_quietly(mp4_file)

        # Clips are tracked in scene order. Collecting them afterwards with
        # glob('*.mp4') gave no order guarantee, and '10.mp4' sorts before '2.mp4'.
        scene_clips = []

        for idx, scene in enumerate(data['scenes']):
            print(scene)
            visual = "A roughly sketched, cartoonish black and white image with no text; "
            visual += generate_scene_image_description(scene['visual'])
            synthesize_speech(text=scene['audio'], output_file=raw_audio_path)
            speedup_speech(input_file=raw_audio_path, output_file=audio_path, speed=1.2)
            time_in_s = get_audio_duration(audio_path)
            for i in range(1, int(time_in_s / seconds_per_image + 2)):
                # Zero-padded index keeps the file names in numeric order when sorted.
                generate_image(visual, f"{visual_folder}/images_{i:03d}.png")
                time.sleep(image_request_pause_s)
            clip_path = f"{idx}.mp4"
            generate_video(image_folder=visual_folder, audio_file=audio_path, output_file=clip_path)
            scene_clips.append(clip_path)
            # The next scene must not reuse this scene's images.
            _clear_folder(visual_folder)

        print('Creating vid: ' + str(k))
        concat_videos(scene_clips, output_path='./vids/vid_' + str(k) + '.mp4')
    except Exception as e:
        # Deliberately best-effort: report the failure and move on to the next video.
        print("error:", e)

{
  "scenes": [
    {
      "visual": "Ein Mann versucht mit einem überdimensionalen Gummihammer ein rohes Ei zu zerbrechen.",
      "audio": "Ich hab's gleich...Moment...Nur noch ein kleiner Schubs."
    },
    {
      "visual": "Das Ei ist unversehrt. Der Mann holt einen Winkelschleifer aus dem Werkzeugkoffer.",
      "audio": "Das Ei ist wohl doch härter als gedacht. Dann muss wohl die Schleifkraft ran."
    },
    {
      "visual": "Nahaufnahme des Winkelschleifers, der mit voller Wucht auf das Ei trifft. Eigelb spritzt.",
      "audio": "Oha! Das ging schneller als gedacht. Vielleicht doch etwas übertrieben."
    },
    {
      "visual": "Der Mann steht bedröppelt da, Eigelb bedeckt sein Gesicht und den Arbeitsbereich.",
      "audio": "Naja, wenigstens ist das Ei jetzt auf. Nächstes Mal probier ich es vielleicht doch lieber mit einem Messer."
    }
  ]
}{'visual': 'Ein Mann versucht mit einem überdimensionalen Gummihammer ein rohes Ei zu zerbrechen.', 'audio': "Ich hab's gleich..

In [20]:
from pathlib import Path

# Scratch cell: resets the clip list. The disabled lines below would collect
# every .mp4 in the working directory and pass them to concat_videos.
ml = []

#for mp4_file in Path('.').glob('*.mp4'):
#    ml.append(mp4_file)
    
#concat_videos(ml)