In [1]:
# Refresh the apt package index, then install the system-wide ffmpeg binary.
# NOTE(review): presumably needed by the audio/video packages installed in the next cell;
# confirm it is still required, since nothing in this notebook calls ffmpeg directly.
!apt-get update
!apt-get install -y ffmpeg

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connected to r2u.stat.illinois.edu (192                                                                                                    Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of config

In [2]:
# %pip (rather than !pip) installs into the environment of the running kernel.
# moviepy is held below 2.0 because the code below imports `moviepy.editor` and calls
# `subclip` / `set_audio`, all of which belong to the 1.x API.
# opencv-python is listed explicitly because the code imports `cv2`.
%pip install -q gradio torch diffusers transformers numpy imageio imageio-ffmpeg gtts pydub "moviepy<2" opencv-python



In [3]:
import torch
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
import numpy as np
import imageio
import os
from gtts import gTTS
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips
import tempfile
import gradio as gr
from PIL import Image
import cv2

# Hugging Face authentication.
# Never hard-code an access token in a notebook: anything saved here is readable by
# everyone the notebook is shared with. The token that used to be assigned on this line
# must be treated as leaked and revoked at https://huggingface.co/settings/tokens.
# Provide a token through the environment instead (HUGGING_FACE_HUB_TOKEN or HF_TOKEN,
# e.g. via the Colab secrets panel). Authentication is optional for public models.
if not (os.environ.get("HUGGING_FACE_HUB_TOKEN") or os.environ.get("HF_TOKEN")):
    print("No Hugging Face token found in the environment; continuing unauthenticated.")

class TextToVideoGenerator:
    """Turn a visual prompt and a voiceover script into a narrated video.

    The video is produced by a diffusers text-to-video pipeline in several short
    segments that are stitched together; the voiceover is synthesised with gTTS and
    muxed onto the video with moviepy.

    Attributes:
        pipe: The loaded diffusers pipeline.
        device: "cuda" when a GPU is available, otherwise "cpu".
        tts: The gTTS class, instantiated once per voiceover.
        last_frame: Last frame of the previously generated segment (or None); used to
            blend the start of the next segment. This is per-instance state, so one
            instance must not generate two videos at the same time.
        style_seed: Base random seed shared by all segments of the current video.
        voice_configs: Mapping of voice preset name -> gTTS ``lang`` / ``tld`` settings.
    """

    def __init__(self):
        """Load the text-to-video model and set up the voice presets."""
        print("Loading Text-to-Video model...")

        # Pick the device first: half precision is only meant for the GPU, so the CPU
        # path loads the same fp16 weight files but casts them to float32.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.pipe = DiffusionPipeline.from_pretrained(
            "damo-vilab/text-to-video-ms-1.7b",
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            variant="fp16"
        )

        if self.device == "cuda":
            self.pipe.to(self.device)
            # Optimize for speed with less memory usage
            self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(self.pipe.scheduler.config)
            self.pipe.enable_vae_slicing()
        print(f"Model loaded and running on {self.device}")

        # Initialize TTS engine (the class itself; an instance is built per request)
        print("Loading Text-to-Speech engine...")
        self.tts = gTTS

        # Continuity state shared between consecutive segments of one video
        self.last_frame = None
        self.style_seed = None

        # Voice presets. Each preset only selects a gTTS language and Google domain
        # (`tld`); the preset names are labels chosen by this notebook.
        self.voice_configs = {
            "Adult Male": {"lang": "en", "tld": "com"},
            "Adult Female": {"lang": "en", "tld": "co.uk"},
            "Child": {"lang": "en", "tld": "com.au"},
            "Elderly": {"lang": "en", "tld": "ca"},
            "Professional": {"lang": "en", "tld": "ie"}
        }

    @staticmethod
    def _remove_dir(path):
        """Best-effort removal of a temporary working directory."""
        import shutil
        shutil.rmtree(path, ignore_errors=True)

    @staticmethod
    def _extract_frames(result):
        """Pull the frame array of the first video out of a pipeline result.

        Supports results exposing either ``frames`` or ``videos``.

        Raises:
            ValueError: If the result exposes neither attribute.
        """
        if hasattr(result, 'frames'):
            frames = result.frames
            if isinstance(frames, list):
                frames = np.array(frames)
        elif hasattr(result, 'videos'):
            frames = result.videos
            if isinstance(frames, torch.Tensor):
                frames = frames.cpu().numpy()
            if frames.ndim == 5:
                frames = frames[0]
                # A leading axis of size 3 is taken to be the channel axis and moved last
                if frames.shape[0] == 3:
                    frames = np.transpose(frames, (1, 2, 3, 0))
        else:
            raise ValueError("Unexpected model output format")

        # Drop the batch axis if it is still present
        if frames.ndim == 5:
            frames = frames[0]
        return frames

    @staticmethod
    def _to_uint8(frames):
        """Convert floating-point frames (assumed to be in [0, 1]) to uint8.

        Any float dtype is converted, and values are clipped first so that slightly
        out-of-range samples cannot wrap around during the integer cast.
        """
        if np.issubdtype(frames.dtype, np.floating):
            frames = (np.clip(frames, 0.0, 1.0) * 255).astype(np.uint8)
        return frames

    def generate_segment(self, prompt, num_frames=16, fps=8, guidance_scale=9.0,
                         seed=None, continuation_strength=0.3,
                         progress=gr.Progress(), progress_start=0, progress_end=0.33):
        """Generate a single video segment from a text prompt with continuity from previous segment.

        Args:
            prompt: Text prompt for this segment.
            num_frames: Number of frames to request from the model.
            fps: Unused here; kept so callers can pass it uniformly.
            guidance_scale: Guidance scale forwarded to the pipeline.
            seed: Optional integer seed; when given, generation uses a seeded generator.
            continuation_strength: Weight of the previous segment's last frame in the
                first blended frame; fades out over the blended frames. 0 disables blending.
            progress: Callable taking ``(fraction, desc=...)`` used for progress updates.
            progress_start: Progress fraction reported when the segment starts.
            progress_end: Progress fraction reported when the segment is done.

        Returns:
            The frames of the segment as an array indexed by frame.

        Raises:
            ValueError: If the model output has an unknown format or contains no frames.
        """
        progress(progress_start, desc=f"Generating video segment for: {prompt[:30]}...")

        # Set seed for consistency
        generator = None
        if seed is not None:
            generator = torch.Generator(device=self.device).manual_seed(seed)

        # Generate video frames
        result = self.pipe(
            prompt=prompt,
            num_inference_steps=25,
            num_frames=num_frames,
            guidance_scale=guidance_scale,
            generator=generator
        )

        progress((progress_start + progress_end) / 2, desc="Processing video segment...")

        frames = self._to_uint8(self._extract_frames(result))
        if len(frames) == 0:
            raise ValueError("Model returned no frames")

        # Blend the first few frames with the previous segment's last frame so the cut
        # between segments is less abrupt. A strength of 0 would leave frames unchanged.
        if self.last_frame is not None and continuation_strength > 0:
            blend_frames = min(4, len(frames))
            for i in range(blend_frames):
                # Influence of the previous frame decreases linearly with i
                blend_ratio = continuation_strength * (1.0 - i / blend_frames)
                frames[i] = cv2.addWeighted(
                    self.last_frame, blend_ratio,
                    frames[i], 1.0 - blend_ratio,
                    0
                )

        # Save the last frame for next segment's continuity
        self.last_frame = frames[-1].copy()

        # Clear CUDA cache
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        progress(progress_end, desc=f"Segment completed")
        return frames

    def generate_video(self, prompt, num_segments=3, num_frames=16, fps=8, guidance_scale=9.0, progress=gr.Progress()):
        """Generate multiple continuous video segments and combine them.

        Args:
            prompt: Visual prompt; continuation segments reuse it with a prefix.
            num_segments: Number of segments to generate (at least 1).
            num_frames: Frames per segment.
            fps: Frame rate used for the segment files and the combined file.
            guidance_scale: Guidance scale forwarded to the pipeline.
            progress: Callable taking ``(fraction, desc=...)`` used for progress updates.

        Returns:
            Tuple ``(video_path, temp_dir)``: the combined silent video and the temporary
            directory that contains it. The caller owns the directory.

        Raises:
            ValueError: If ``num_segments`` is smaller than 1.
        """
        if num_segments < 1:
            raise ValueError("num_segments must be at least 1")

        progress(0, desc="Starting multi-segment video generation...")

        # Create temporary directory for files
        temp_dir = tempfile.mkdtemp()
        try:
            # Reset continuity variables
            self.last_frame = None

            # One base seed per video; segments derive their seeds from it
            self.style_seed = torch.randint(1, 100000, (1,)).item()

            transition_words = [
                "continuing, the same scene with",
                "following directly, the same view of",
                "next moment in the same scene,"
            ]

            # First segment uses the original prompt; later ones get a continuity prefix
            # (the last prefix is reused once the list is exhausted).
            segment_prompts = [prompt]
            for i in range(1, num_segments):
                transition = transition_words[min(i-1, len(transition_words)-1)]
                segment_prompts.append(f"{transition} {prompt}")

            # Generate each segment and write it to its own file
            segment_paths = []
            for i, segment_prompt in enumerate(segment_prompts):
                # Segment generation covers the first 70% of the progress bar
                progress_start = i * (0.7 / num_segments)
                progress_end = (i + 1) * (0.7 / num_segments)

                # Small seed increments: related, but not identical, noise per segment
                segment_seed = self.style_seed + (i * 5)

                frames = self.generate_segment(
                    segment_prompt,
                    num_frames=num_frames,
                    fps=fps,
                    guidance_scale=guidance_scale,
                    seed=segment_seed,
                    continuation_strength=0.3 if i > 0 else 0.0,  # Only apply continuity after first segment
                    progress=progress,
                    progress_start=progress_start,
                    progress_end=progress_end
                )

                segment_path = os.path.join(temp_dir, f"segment_{i}.mp4")
                imageio.mimsave(segment_path, frames, fps=fps)
                segment_paths.append(segment_path)

            # Combine segments using moviepy (plain concatenation, no cross-fade)
            progress(0.75, desc="Combining video segments...")
            video_path = os.path.join(temp_dir, "generated_video.mp4")
            clips = []
            combined_clip = None
            try:
                for path in segment_paths:
                    clips.append(VideoFileClip(path))
                combined_clip = concatenate_videoclips(clips, method="compose")
                combined_clip.write_videofile(video_path, codec="libx264", audio=False, fps=fps)
            finally:
                # Release file readers even when encoding fails
                if combined_clip is not None:
                    combined_clip.close()
                for clip in clips:
                    clip.close()
        except Exception:
            # Nothing usable was produced, so do not leave the working directory behind
            self._remove_dir(temp_dir)
            raise

        progress(0.8, desc="Combined video created")
        return video_path, temp_dir

    def generate_audio(self, text, voice_type, temp_dir):
        """Generate audio from text using gTTS with specified voice type.

        Args:
            text: Script to speak; must contain non-whitespace characters.
            voice_type: Key of ``self.voice_configs``; unknown keys fall back to "Adult Male".
            temp_dir: Directory in which ``voiceover.mp3`` is written.

        Returns:
            Path of the written MP3 file.

        Raises:
            ValueError: If ``text`` is empty or whitespace only.
        """
        if not text or not text.strip():
            raise ValueError("Voiceover script is empty")

        audio_path = os.path.join(temp_dir, "voiceover.mp3")

        # Get voice configuration
        voice_config = self.voice_configs.get(voice_type, self.voice_configs["Adult Male"])

        # Generate audio with the selected voice configuration
        self.tts(
            text=text,
            lang=voice_config["lang"],
            tld=voice_config["tld"],
            slow=voice_type == "Child"  # Slow down for child voice
        ).save(audio_path)

        return audio_path

    def combine_video_audio(self, video_path, audio_path, progress=gr.Progress()):
        """Combine video and audio files.

        When the audio is longer than the video, the video is looped and trimmed to the
        audio length; otherwise the video keeps its length and the audio simply ends early.

        Args:
            video_path: Path of the silent video.
            audio_path: Path of the voiceover audio.
            progress: Callable taking ``(fraction, desc=...)`` used for progress updates.

        Returns:
            Path of the muxed file, written next to ``video_path`` with a
            ``_with_audio`` suffix before the extension.

        Raises:
            ValueError: If the video has no duration.
        """
        progress(0.9, desc="Combining video and audio...")

        # Build the output name from the extension only, so other parts of the path are
        # never rewritten and the result can never equal the input path.
        base, ext = os.path.splitext(video_path)
        output_path = f"{base}_with_audio{ext or '.mp4'}"

        video_clip = VideoFileClip(video_path)
        audio_clip = None
        try:
            audio_clip = AudioFileClip(audio_path)

            if not video_clip.duration:
                raise ValueError("Video has no duration")

            if audio_clip.duration > video_clip.duration:
                # Loop the video often enough to cover the audio, then trim to its length
                repeat_times = int(np.ceil(audio_clip.duration / video_clip.duration))
                extended_video = concatenate_videoclips([video_clip] * repeat_times)
                final_video = extended_video.subclip(0, audio_clip.duration)
            else:
                # Audio already fits inside the video; no trimming is needed
                final_video = video_clip

            final_video = final_video.set_audio(audio_clip)

            # Keep the frame rate of the source video instead of forcing a fixed one;
            # 24 is only a fallback when the clip does not report a rate.
            output_fps = getattr(final_video, "fps", None) or 24
            final_video.write_videofile(output_path, codec="libx264", audio_codec="aac", fps=output_fps)
        finally:
            # Close clips to free resources, also when muxing fails
            if audio_clip is not None:
                audio_clip.close()
            video_clip.close()

        progress(1.0, desc="Video with voiceover created!")
        return output_path

    def process(self, prompt, script, voice_type, progress=gr.Progress()):
        """Process the entire pipeline from text to video with audio.

        Args:
            prompt: Visual prompt for the video.
            script: Voiceover text.
            voice_type: Voice preset name.
            progress: Callable taking ``(fraction, desc=...)`` used for progress updates.

        Returns:
            Path of the final video with voiceover.

        Raises:
            gr.Error: If an input is empty or any pipeline step fails. Failures are
                raised rather than returned because the return value feeds a video
                component, which would treat an error string as a file path.
        """
        # Check inputs before the expensive video generation starts
        if not prompt or not prompt.strip():
            raise gr.Error("Please enter a visual prompt.")
        if not script or not script.strip():
            raise gr.Error("Please enter a voiceover script.")

        temp_dir = None
        try:
            progress(0.1, desc="Starting process...")
            # Generate segments for a longer video
            video_path, temp_dir = self.generate_video(prompt, num_segments=3, progress=progress)

            # Generate audio with selected voice type
            progress(0.8, desc=f"Generating {voice_type} voiceover...")
            audio_path = self.generate_audio(script, voice_type, temp_dir)

            # Combine video and audio
            final_path = self.combine_video_audio(video_path, audio_path, progress=progress)

            return final_path
        except Exception as e:
            import traceback
            error_details = traceback.format_exc()
            print(f"Error: {str(e)}\n{error_details}")
            # The run failed, so its intermediate files are of no further use
            if temp_dir is not None:
                self._remove_dir(temp_dir)
            raise gr.Error(f"Error: {str(e)}") from e

# Build the Gradio front end
def create_ui():
    """Construct the Blocks app that wraps a TextToVideoGenerator.

    Layout, top to bottom: a header row with title and instructions, a row with the
    inputs on the left and the video player on the right, and a row of tips. The
    button is wired to the generator's ``process`` method.

    Returns:
        The assembled (not yet launched) Blocks app.
    """
    # The generator comes first: the dropdown is populated from its voice presets.
    video_generator = TextToVideoGenerator()
    voice_names = list(video_generator.voice_configs)

    # Static Markdown texts, kept together so the layout code below stays compact.
    banner_md = """
                    # Automatic Script to Motion Video Generator
                    Transform your written scripts into dynamic motion videos with AI-generated visuals and voiceovers.
                    """
    steps_md = """
                    1. Enter a visual prompt describing what you want to see in the video
                    2. Enter your script text for the voiceover
                    3. Select the voice type for your voiceover
                    4. Click "Generate" and wait for the magic to happen!
                    """
    tips_md = """
                    ### Tips for better results:
                    - Provide detailed visual prompts for more precise video generation
                    - Keep scripts concise and clear for better voiceover quality
                    - Using descriptive prompts with specific themes will help maintain visual consistency
                    - For best results, describe a continuous scene rather than multiple different scenes
                    - Different voice types can enhance the mood of your video
                    """

    with gr.Blocks(title="Script to Motion Video Generator") as demo:
        # Header: title and usage instructions
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(banner_md)
                gr.Markdown("## How to use:")
                gr.Markdown(steps_md)

        with gr.Row(equal_height=True):
            # Input side
            with gr.Column(scale=1):
                prompt_box = gr.Textbox(
                    label="Visual Prompt",
                    placeholder="A beautiful sunset over mountains with clouds...",
                    lines=4,
                    elem_id="prompt_box",
                )

                with gr.Row():
                    with gr.Column(scale=3):
                        script_box = gr.Textbox(
                            label="Voiceover Script",
                            placeholder="Enter the script text here that will be converted to speech...",
                            lines=8,
                            elem_id="script_box",
                        )

                    with gr.Column(scale=1):
                        voice_picker = gr.Dropdown(
                            choices=voice_names,
                            value="Adult Male",
                            label="Voice Type",
                            elem_id="voice_dropdown",
                        )

                run_button = gr.Button("Generate Video", variant="primary", size="lg")

            # Output side; the height roughly matches the two text boxes combined
            with gr.Column(scale=1):
                result_video = gr.Video(
                    label="Generated Video",
                    height=470,
                    elem_id="video_display",
                )

        # Footer: usage tips
        with gr.Row():
            with gr.Column():
                gr.Markdown(tips_md)

        # Clicking the button runs the whole pipeline and shows the resulting file
        run_button.click(
            fn=video_generator.process,
            inputs=[prompt_box, script_box, voice_picker],
            outputs=result_video,
        )

    return demo

# Run either the smoke test (`--test`) or the UI
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "--test":
        class MockProgress:
            """Console stand-in for the progress callback: prints each update."""

            def __call__(self, value, desc=""):
                print(f"Progress: {value*100:.0f}% - {desc}")

        generator = TextToVideoGenerator()
        prompt = "A beautiful sunset over mountains"
        script = "This is a test of our text to video generation system with voiceover capabilities."
        voice_type = "Adult Male"
        result = generator.process(prompt, script, voice_type, MockProgress())

        # Only claim success when the returned value really is an existing file
        if isinstance(result, str) and os.path.isfile(result):
            print(f"Test completed successfully. Output video: {result}")
        else:
            print(f"Test failed: {result}")
            sys.exit(1)
    else:
        app = create_ui()
        # share=True publishes a temporary public URL for the app
        app.launch(share=True)

Loading Text-to-Video model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



Loading pipeline components...:   0%|          | 0/5 [00:00<?, ?it/s]

Model loaded and running on cuda
Loading Text-to-Speech engine...
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://99df299b6d13fbbfea.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
