In [None]:
# Environment setup: refresh the apt package index, then install the ffmpeg and
# ImageMagick system packages.
!apt-get update
!apt-get install -y ffmpeg imagemagick
# Python dependencies used by the cells below (versions are not pinned here).
!pip install torch torchvision torchaudio transformers diffusers accelerate google-generativeai moviepy soundfile requests yt_dlp pytube datasets[audio] gtts

# Rewrite the ImageMagick 6 policy file in place: each sed command replaces the
# line that sets rights="none" for one coder (PDF, LABEL, TEXT, CAPTION) with a
# line granting rights="read|write".
# NOTE(review): presumably needed so moviepy's ImageMagick-backed text rendering
# is permitted — confirm it is still required.
!sed -i '/<policy domain="coder" rights="none" pattern="PDF" \/>/c\  <policy domain="coder" rights="read|write" pattern="PDF" \/>' /etc/ImageMagick-6/policy.xml
!sed -i '/<policy domain="coder" rights="none" pattern="LABEL" \/>/c\  <policy domain="coder" rights="read|write" pattern="LABEL" \/>' /etc/ImageMagick-6/policy.xml
!sed -i '/<policy domain="coder" rights="none" pattern="TEXT" \/>/c\  <policy domain="coder" rights="read|write" pattern="TEXT" \/>' /etc/ImageMagick-6/policy.xml
!sed -i '/<policy domain="coder" rights="none" pattern="CAPTION" \/>/c\  <policy domain="coder" rights="read|write" pattern="CAPTION" \/>' /etc/ImageMagick-6/policy.xml

In [None]:
import os
import google.generativeai as genai
import torch
from moviepy.video.fx import all as vfx
from transformers import pipeline
import soundfile as sf
import requests
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeVideoClip, TextClip, concatenate_videoclips
import tempfile
from IPython.display import Video, display
from PIL import Image, ImageDraw, ImageFont, ImageOps
import numpy as np
from datasets import load_dataset
from moviepy.video.fx.all import fadein, fadeout
from moviepy.video.VideoClip import ImageClip
import random
from datetime import datetime
import time
import urllib.parse
import json
import yt_dlp
from pytube import YouTube

In [None]:
def generate_story(topic, content_type, target_audience, word_count=100):
    """Request a short narration script from Gemini and echo it to stdout.

    Args:
        topic: Subject the script should explain.
        content_type: Kind of text requested (interpolated into the prompt).
        target_audience: Audience description (interpolated into the prompt).
        word_count: Word count requested from the model; the model's reply is
            not checked or adjusted against it.

    Returns:
        The text of the model's response.
    """
    prompt = (
        f"Create a concise {word_count}-word {content_type} about {topic} that a {target_audience} could understand. "
        f"Explain the concept in simple, everyday language without scientific jargon. "
        f"Focus on what it is, how it works, and why it matters in practical terms. Make it exactly {word_count} words "
        f"suitable for text-to-speech conversion. Make it engaging and easy to follow."
    )
    script_text = genai.GenerativeModel('gemini-2.0-flash').generate_content(prompt).text

    # Report the whitespace-delimited word count of what actually came back.
    actual_words = len(script_text.split())
    print("Generated Story:\n")
    print(script_text)
    print(f"\nWord count: {actual_words}")

    return script_text

In [None]:


def generate_speech(script_text, output_path="/content/story.wav"):
    """Synthesize narration for ``script_text`` and write it to ``output_path``.

    The primary path runs the ``microsoft/speecht5_tts`` text-to-speech
    pipeline with the speaker embedding stored at index 1223 of the
    ``validation`` split of ``Matthijs/cmu-arctic-xvectors`` and writes the
    result as WAV. If anything in that path fails, gTTS is tried as a
    fallback and saved to the same path.

    Text longer than 500 characters is shortened before synthesis. The cut is
    made at the last word boundary inside the limit so the synthesizer never
    receives a truncated word; a single over-long token with no whitespace is
    still hard-cut.

    Args:
        script_text: Text to speak.
        output_path: Destination file path for the audio.

    Returns:
        ``output_path`` once audio has been written.

    Raises:
        Exception: Re-raises the fallback's error when both engines fail.
    """
    try:
        print("Starting speech synthesis...")

        max_length = 500
        if len(script_text) > max_length:
            trimmed = script_text[:max_length]
            # The slice ends inside a word only when neither side of the cut
            # is whitespace; in that case drop the partial trailing word.
            cut_inside_word = (
                not trimmed[-1].isspace() and not script_text[max_length].isspace()
            )
            if cut_inside_word:
                pieces = trimmed.rsplit(None, 1)
                if len(pieces) == 2:
                    trimmed = pieces[0]
            script_text = trimmed.rstrip()
            print(f"Trimmed text to {max_length} characters for speech synthesis")

        synthesiser = pipeline("text-to-speech", model="microsoft/speecht5_tts")
        embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
        speaker_embedding = torch.tensor(embeddings_dataset[1223]["xvector"]).unsqueeze(0)

        print("Generating speech audio...")
        speech = synthesiser(script_text, forward_params={"speaker_embeddings": speaker_embedding})

        sf.write(output_path, speech["audio"], samplerate=speech["sampling_rate"], format='WAV')

        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            print("Audio generated successfully!")
            return output_path
        else:
            # Caught by the handler below, which then tries the fallback engine.
            raise RuntimeError("Audio file was not created properly")

    except Exception as e:
        print(f"Error generating speech: {e}")

        try:
            print("Trying fallback speech synthesis with gTTS...")
            from gtts import gTTS
            # The fallback speaks the (possibly trimmed) text from above.
            # NOTE(review): gTTS is expected to emit MP3 data while the file
            # keeps the caller's path/extension — confirm downstream readers
            # detect the format from content rather than the suffix.
            tts = gTTS(text=script_text, lang='en', slow=False)
            tts.save(output_path)
            return output_path
        except Exception as fallback_error:
            print(f"Fallback speech generation also failed: {fallback_error}")
            raise

In [None]:
def download_youtube_video(video_url, output_path, max_duration=60):
    """Download a YouTube video to ``output_path`` unless it is too long.

    Metadata is fetched first and the download only starts when the reported
    duration is at most ``max_duration * 3`` seconds, so over-long videos are
    rejected without transferring them.

    Args:
        video_url: URL of the video to fetch.
        output_path: File path yt-dlp should write the result to.
        max_duration: Base duration in seconds; videos longer than three times
            this value are skipped.

    Returns:
        ``output_path`` when a non-empty file exists after the download,
        otherwise ``None`` (too long, missing/empty file, or any error).
    """
    try:
        ydl_opts = {
            'format': 'bestvideo[height<=1080][ext=mp4]+bestaudio[ext=m4a]/best[height<=1080][ext=mp4]',
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
            # Callers may pass the path of an already-created (empty) temp
            # file; without this yt-dlp treats an existing target as
            # "already downloaded" and leaves it untouched.
            'overwrites': True,
        }
        duration_limit = max_duration * 3

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Probe metadata only, so the length check happens before any
            # media is transferred.
            info = ydl.extract_info(video_url, download=False)
            if not info:
                print("Could not read video metadata, skipping")
                return None

            # 'duration' can be present but None; treat that as unknown (0).
            duration = info.get('duration') or 0

            if duration > duration_limit:
                print(f"Video too long ({duration}s), skipping")
                return None

            ydl.download([video_url])

        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            return output_path

        print("Download appeared to succeed but file not found or empty")
        return None
    except Exception as e:
        print(f"Error downloading YouTube video: {e}")
        return None

In [None]:
def search_youtube_videos(query, count=5):
    """Search YouTube through yt-dlp and return up to ``count`` video records.

    The search term is ``"<query> animation explanation"`` and twice as many
    results as needed are requested so that unusable entries can be skipped.

    Args:
        query: Topic to search for.
        count: Maximum number of records to return; non-positive values yield
            an empty list without performing a search.

    Returns:
        A list of dicts with keys ``url``, ``title`` and ``source`` (always
        ``'youtube_ytdlp'``). Errors are printed and produce whatever was
        collected so far (usually an empty list).
    """
    videos = []

    if count <= 0:
        return videos

    try:
        search_term = f"{query} animation explanation"

        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
            'force_generic_extractor': True,
            'ignoreerrors': True,
            'format': 'best',
            'max_downloads': count*2
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            search_url = f"ytsearch{count*2}:{search_term}"
            search_results = ydl.extract_info(search_url, download=False)

        # With 'ignoreerrors' enabled a failed search can come back as None
        # rather than raising, so normalise before looking for entries.
        entries = (search_results or {}).get('entries') or []

        for entry in entries:
            # Individual entries may be None or incomplete; skip those.
            if not (entry and 'id' in entry and 'title' in entry):
                continue

            videos.append({
                'url': f"https://www.youtube.com/watch?v={entry['id']}",
                'title': entry.get('title', ''),
                'source': 'youtube_ytdlp'
            })
            if len(videos) >= count:
                break
    except Exception as e:
        print(f"YouTube search (yt-dlp) error: {e}")

    return videos[:count]

In [None]:
def get_pexels_videos(query, count=5):
    """Query the Pexels video search API for landscape clips about ``query``.

    For every returned video the widest file that is at least 1280 px wide is
    selected; videos without such a file are ignored.

    Args:
        query: Search text sent to Pexels.
        count: Maximum number of records to return; non-positive values yield
            an empty list without contacting the API.

    Returns:
        A list of dicts with keys ``url``, ``title`` (the uploader's name, or
        ``'Pexels Video'``) and ``source`` (always ``'pexels'``). Any request
        or parsing error is printed and results in an empty list.
    """
    if count <= 0:
        return []

    pexels_url = "https://api.pexels.com/videos/search"

    # SECURITY NOTE(review): an API key is embedded in source. The
    # PEXELS_API_KEY environment variable now takes precedence; the literal is
    # kept only so existing runs keep working. Rotate the key and delete the
    # fallback once every environment sets the variable.
    api_key = os.environ.get("PEXELS_API_KEY") or "n91nJ2kq1eBohPcE20DCs6SVqpdcUXvrEDZJpmpd4OojUDhZFabQqLcN"
    pexels_headers = {"Authorization": api_key}

    # Over-fetch so entries without a suitable file can be skipped; the API
    # documents 80 as the largest accepted page size.
    per_page = min(count * 2, 80)
    pexels_params = {"query": query, "per_page": per_page, "orientation": "landscape"}

    try:
        # A timeout keeps a stalled connection from hanging the notebook.
        response = requests.get(
            pexels_url,
            headers=pexels_headers,
            params=pexels_params,
            timeout=30,
        )
        response.raise_for_status()
        videos = response.json().get('videos', [])

        video_links = []
        for video in videos:
            if 'video_files' not in video:
                continue

            best_quality = max(
                (f for f in video['video_files'] if f.get('width', 0) >= 1280),
                key=lambda x: x.get('width', 0),
                default=None
            )
            if not best_quality:
                continue

            video_links.append({
                'url': best_quality['link'],
                'title': video.get('user', {}).get('name', 'Pexels Video'),
                'source': 'pexels'
            })
            if len(video_links) >= count:
                break

        return video_links[:count]
    except Exception as e:
        print(f"Pexels API error: {e}")
        return []

In [None]:
def get_videos(query, count=5):
    """Collect up to ``count`` video records, preferring Pexels over YouTube.

    Pexels is queried first. Only when it returns fewer than ``count`` records
    is YouTube searched, and then only for the number still missing.

    Args:
        query: Topic to search for.
        count: Number of records wanted.

    Returns:
        A list of video record dicts; it may hold fewer than ``count`` items
        when both sources together fall short.
    """
    print(f"Searching for videos related to '{query}'...")

    pexels_videos = get_pexels_videos(query, count)

    if len(pexels_videos) < count:
        # Pexels fell short: top up from YouTube with just the shortfall.
        youtube_videos = search_youtube_videos(query, count - len(pexels_videos))
        combined_videos = pexels_videos + youtube_videos

        if len(combined_videos) < count:
            print(f"Could only find {len(combined_videos)} videos")
            return combined_videos

        print(f"Found videos from multiple sources: {len(pexels_videos)} from Pexels and {len(youtube_videos)} from YouTube")
        return combined_videos[:count]

    print(f"Found {len(pexels_videos)} Pexels videos")
    return pexels_videos

In [None]:
def _load_caption_font(font_size):
    """Return the first candidate TrueType font that loads, else PIL's default font."""
    font_paths = [
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
        "Arial.ttf",
        "Roboto-Regular.ttf"
    ]

    for path in font_paths:
        try:
            return ImageFont.truetype(path, font_size)
        except (OSError, ValueError, ImportError):
            # Missing/unreadable font file, invalid size, or no FreeType
            # support: move on to the next candidate.
            continue

    return ImageFont.load_default()


def _caption_text_width(font, text):
    """Measure the rendered width of ``text``, supporting old and new Pillow font APIs."""
    if hasattr(font, 'getlength'):
        return font.getlength(text)
    return font.getsize(text)[0]


def _caption_text_height(font, text):
    """Measure the rendered height of ``text``, supporting old and new Pillow font APIs."""
    if hasattr(font, 'getbbox'):
        bbox = font.getbbox(text)
        return bbox[3] - bbox[1]
    return font.getsize(text)[1]


def _wrap_caption_words(text, font, max_width):
    """Greedily pack the words of ``text`` into lines no wider than ``max_width``."""
    lines = []
    current_line = []

    for word in text.split():
        candidate = ' '.join(current_line + [word])
        if _caption_text_width(font, candidate) <= max_width:
            current_line.append(word)
        else:
            # A single word wider than max_width still gets its own line.
            if current_line:
                lines.append(' '.join(current_line))
            current_line = [word]

    if current_line:
        lines.append(' '.join(current_line))

    return lines


def create_caption(text, duration, video_width, video_height, position='bottom'):
    """Render ``text`` as a caption overlay clip sized for a given video frame.

    The text is word-wrapped to 80% of the video width, drawn in white with a
    one-pixel dark shadow on a semi-transparent black panel (rounded corners
    when the installed Pillow supports them), and wrapped in an ``ImageClip``
    that fades in and out over 0.5 seconds.

    Args:
        text: Caption text.
        duration: Clip duration in seconds.
        video_width: Width of the target video in pixels.
        video_height: Height of the target video in pixels; font size,
            padding and line spacing are derived from it.
        position: ``'bottom'`` or ``'top'`` places the panel 5% of the video
            height from that edge; any other value centres it vertically.

    Returns:
        The positioned caption clip (horizontally centred).
    """
    max_width = int(video_width * 0.8)
    padding = int(video_height * 0.02)
    line_spacing = int(video_height * 0.015)

    font = _load_caption_font(int(video_height * 0.035))

    lines = _wrap_caption_words(text, font, max_width)
    line_heights = [_caption_text_height(font, line) for line in lines]

    # No spacing is added after the last line; guard against an empty caption.
    text_height = sum(line_heights) + max(len(lines) - 1, 0) * line_spacing

    bg_width = max_width + 2 * padding
    bg_height = text_height + 2 * padding

    bg = Image.new('RGBA', (bg_width, bg_height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(bg)

    try:
        corner_radius = min(20, bg_height // 4)
        draw.rounded_rectangle(
            [(0, 0), (bg_width, bg_height)],
            radius=corner_radius,
            fill=(0, 0, 0, 160))
    except AttributeError:
        # Older Pillow without rounded_rectangle: fall back to square corners.
        draw.rectangle(
            [(0, 0), (bg_width, bg_height)],
            fill=(0, 0, 0, 160))

    y_offset = padding
    for line, line_height in zip(lines, line_heights):
        x_offset = (bg_width - _caption_text_width(font, line)) / 2

        # Shadow first, offset by one pixel, then the main text on top.
        draw.text(
            (x_offset + 1, y_offset + 1),
            line,
            font=font,
            fill=(0, 0, 0, 200)
        )
        draw.text(
            (x_offset, y_offset),
            line,
            font=font,
            fill=(255, 255, 255, 240)
        )

        y_offset += line_height + line_spacing

    caption_clip = ImageClip(np.array(bg), duration=duration)

    if position == 'bottom':
        y_pos = video_height - bg_height - int(video_height * 0.05)
    elif position == 'top':
        y_pos = int(video_height * 0.05)
    else:
        y_pos = (video_height - bg_height) // 2

    caption_clip = caption_clip.set_position(('center', y_pos))
    caption_clip = caption_clip.fadein(0.5).fadeout(0.5)

    return caption_clip

In [None]:
def _split_script_evenly(script_text, num_parts):
    """Split ``script_text`` into exactly ``num_parts`` word groups without dropping any word."""
    words = script_text.split()
    base_size, remainder = divmod(len(words), num_parts)

    parts = []
    start = 0
    for index in range(num_parts):
        # The first `remainder` parts take one extra word each.
        size = base_size + (1 if index < remainder else 0)
        parts.append(' '.join(words[start:start + size]))
        start += size
    return parts


def make_video(audio_path, video_data, script_text, topic, output_path="/content/final_video.mp4"):
    """Assemble captioned video segments under a narration track and encode them.

    The narration duration is divided equally between the entries of
    ``video_data``. Each source is downloaded to a temporary file, resized to
    1080 px height, cut (from a random start) or slowed down to fill its slot,
    given 0.5 s cross-fades and, when its share of the script is non-empty,
    overlaid with a caption alternating between bottom and top. Sources that
    fail to download or load are skipped. The surviving segments are
    concatenated, the narration is attached and the result is written with
    libx264/aac at 24 fps.

    The script is distributed across the clips so that every word appears in
    some caption. Temporary downloads and opened media readers are released
    when the function exits, whether it succeeds or fails.

    Args:
        audio_path: Path of the narration audio file.
        video_data: List of dicts with at least ``url`` and ``source`` keys.
        script_text: Narration text used for the captions.
        topic: Unused by this function; kept for interface compatibility.
        output_path: Destination of the encoded video.

    Returns:
        ``output_path`` after the file has been written.

    Raises:
        ValueError: If ``video_data`` is empty or no source yields a usable clip.
        Exception: Any other failure is printed with its traceback and re-raised.
    """
    audio = None
    opened_clips = []   # source readers to close once encoding is finished
    temp_files = []     # downloaded sources to delete on exit

    try:
        num_videos = len(video_data)
        if num_videos == 0:
            raise ValueError("No video sources were provided")

        audio = AudioFileClip(audio_path)
        total_duration = audio.duration
        clip_duration = total_duration / num_videos

        text_parts = _split_script_evenly(script_text, num_videos)

        print(f"Creating video with {num_videos} clips and audio duration of {total_duration:.1f} seconds")

        content_clips = []

        for i, video_info in enumerate(video_data):
            try:
                url = video_info['url']
                is_youtube = 'youtube.com' in url or 'youtu.be' in url

                # Reserve a unique path and close the handle straight away.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as placeholder:
                    video_file = placeholder.name
                temp_files.append(video_file)
                print(f"Processing video {i+1}/{num_videos} from {video_info['source']}")

                if is_youtube:
                    # yt-dlp may treat a pre-existing file at the output path
                    # as already downloaded, so hand it a path that is free.
                    os.remove(video_file)
                    success = download_youtube_video(url, video_file)
                    if not success:
                        print(f"Failed to download YouTube video {i+1}, skipping")
                        continue
                else:
                    try:
                        with requests.get(url, stream=True, timeout=60) as response:
                            response.raise_for_status()
                            with open(video_file, 'wb') as f:
                                for chunk in response.iter_content(chunk_size=8192):
                                    f.write(chunk)
                    except Exception as download_error:
                        print(f"Failed to download video {i+1}: {download_error}")
                        continue

                    if not os.path.exists(video_file) or os.path.getsize(video_file) == 0:
                        print(f"Failed to download video {i+1}, skipping")
                        continue

                try:
                    clip = VideoFileClip(video_file)
                except Exception as clip_error:
                    print(f"Error loading video {i+1}: {clip_error}")
                    continue
                opened_clips.append(clip)

                if not hasattr(clip, 'duration') or clip.duration < 1:
                    print(f"Invalid video {i+1}, skipping")
                    continue

                if clip.size[1] != 1080:
                    clip = clip.resize(height=1080)

                if clip.duration <= clip_duration:
                    segment = clip
                else:
                    # Pick a random window, keeping one second clear of the end.
                    max_start = max(0, clip.duration - clip_duration - 1)
                    start_time = random.uniform(0, max_start)
                    end_time = min(clip.duration, start_time + clip_duration)
                    segment = clip.subclip(start_time, end_time)

                if segment.duration > clip_duration:
                    segment = segment.subclip(0, clip_duration)
                elif segment.duration < clip_duration:
                    # A factor below 1 slows playback so the short source
                    # stretches to fill its slot.
                    speed_factor = segment.duration / clip_duration
                    segment = segment.fx(vfx.speedx, speed_factor)

                segment = segment.crossfadein(0.5).crossfadeout(0.5)

                if text_parts[i].strip():
                    caption = create_caption(
                        text_parts[i],
                        duration=segment.duration,
                        video_width=segment.w,
                        video_height=segment.h,
                        position='bottom' if i % 2 == 0 else 'top'
                    )

                    final_segment = CompositeVideoClip([segment, caption])
                else:
                    final_segment = segment

                content_clips.append(final_segment)

            except Exception as e:
                print(f"Error processing video {i+1}: {e}")
                continue

        if not content_clips:
            raise ValueError("No valid video clips were created")

        final_video = concatenate_videoclips(content_clips, method="compose")

        final_video = final_video.set_audio(audio)

        final_video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            fps=24,
            bitrate="8000k",
            threads=2,
            preset='medium',
            ffmpeg_params=['-crf', '18', '-pix_fmt', 'yuv420p']
        )

        print(f"Video saved to {output_path}")
        return output_path

    except Exception as e:
        print(f"Error creating video: {e}")
        import traceback
        traceback.print_exc()
        raise

    finally:
        # Best-effort cleanup: a failure here must not mask the real outcome.
        for source_clip in opened_clips:
            try:
                source_clip.close()
            except Exception as close_error:
                print(f"Warning: could not close a video clip: {close_error}")

        if audio is not None:
            try:
                audio.close()
            except Exception as close_error:
                print(f"Warning: could not close the audio clip: {close_error}")

        for temp_path in temp_files:
            try:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            except OSError as remove_error:
                print(f"Warning: could not remove temporary file {temp_path}: {remove_error}")

In [None]:
def create_educational_video(topic, content_type, target_audience, word_count=100):
    """Run the whole pipeline: script, narration, footage search, final render.

    Args:
        topic: Subject of the video; also used as the footage search query.
        content_type: Kind of text to request from the script generator.
        target_audience: Audience description passed to the script generator.
        word_count: Requested script length in words.

    Returns:
        The path returned by ``make_video``, or ``None`` when any stage raised
        (the error and its traceback are printed instead of propagating).
    """
    try:
        script_text = generate_story(topic, content_type , target_audience, word_count)
        audio_path = generate_speech(script_text)
        video_data = get_videos(topic, count=5)

        # List the footage that was found, with truncated titles and URLs.
        print("\nVideo data found:")
        for i, video in enumerate(video_data, 1):
            print(f"{i}: {video['source']} - {video['title'][:40]}... ({video['url'][:60]}...)")

        output_path = make_video(audio_path, video_data, script_text, topic)

        if not os.path.exists(output_path):
            print("\nFailed to create video.")
        else:
            # Show the result inline before reporting success.
            display(Video(output_path, embed=True, html_attributes="controls autoplay"))
            print("\nVideo created successfully!")

        return output_path

    except Exception as e:
        print(f"Error in video creation process: {e}")
        import traceback
        traceback.print_exc()
        return None

# Entry point: when this code runs as the main module, build one sample video
# (topic "digestion", aimed at a 12 year old, 100-word script).
if __name__ == "__main__":
    create_educational_video("digestion", "educational content", "12 year old", 100)