In [None]:
# !pip install opencv-python
# !pip install moviepy
# !pip install pytesseract
#!pip install transformers==4.30.0
#!pip install transformers youtube_transcript_api pytesseract moviepy opencv-python-headless yt-dlp requests

You need to install FFmpeg and add it to your PATH: https://www.gyan.dev/ffmpeg/builds/
Download ffmpeg-git-full.7z, extract it, and add its bin folder to your system PATH environment variable.

You also need to install Tesseract OCR and add its install folder to your PATH in the same way: https://github.com/UB-Mannheim/tesseract/wiki

In [4]:
import os

import pytesseract

# Machine-specific Windows install locations; edit these if FFmpeg or
# Tesseract are installed somewhere else.
FFMPEG_BIN_DIR = r"C:\ffmpeg\bin"
TESSERACT_EXE = r"C:\Program Files\Tesseract-OCR\tesseract.exe"

# Add the FFmpeg directory to PATH only if it is not already listed, so that
# re-running this cell does not keep appending duplicate entries.
if FFMPEG_BIN_DIR not in os.environ["PATH"].split(os.pathsep):
    os.environ["PATH"] += os.pathsep + FFMPEG_BIN_DIR

# Now try running FFmpeg
os.system("ffmpeg -version")

# Point pytesseract at the Tesseract executable explicitly.
pytesseract.pytesseract.tesseract_cmd = TESSERACT_EXE



In [12]:
import os
import cv2
import json
import torch
import numpy as np
import yt_dlp
from PIL import Image
from transformers import ViTForImageClassification, ViTFeatureExtractor, BlipProcessor, BlipForConditionalGeneration
from pytesseract import image_to_string
from youtube_transcript_api import YouTubeTranscriptApi
from moviepy.video.io.VideoFileClip import VideoFileClip

# Hugging Face checkpoints: ViT is used to label frames, BLIP to caption them.
vit_model_name = "google/vit-base-patch16-224"
blip_model_name = "Salesforce/blip-image-captioning-base"

# ViT preprocessor and classification model, both from the same checkpoint.
feature_extractor = ViTFeatureExtractor.from_pretrained(vit_model_name)
vit_model = ViTForImageClassification.from_pretrained(vit_model_name)

# BLIP preprocessor and caption-generation model.
blip_processor = BlipProcessor.from_pretrained(blip_model_name)
blip_model = BlipForConditionalGeneration.from_pretrained(blip_model_name)

# True only when `ffmpeg -version` comes back with a zero status.
FFMPEG_INSTALLED = not os.system("ffmpeg -version")

# Make sure every working directory exists before anything is written to it.
for _dir_name in ("videos", "processed_frames", "output_data"):
    os.makedirs(_dir_name, exist_ok=True)

# Define label mapping
def get_label_name(label_index):
    """Translate a ViT class index into its label string.

    The lookup uses ``vit_model.config.id2label``; an index that is not in
    that mapping yields "unknown".
    """
    id_to_label = vit_model.config.id2label
    if label_index in id_to_label:
        return id_to_label[label_index]
    return "unknown"

def extract_video_id(url):
    """Extract the 11-character YouTube video ID from a URL.

    The ID is taken from the first place in ``url`` where ``v=`` or ``/`` is
    followed by exactly 11 ID characters (letters, digits, ``_`` or ``-``).

    Args:
        url: The video URL, e.g. a ``watch?v=...`` link.

    Returns:
        The video ID string, or None when ``url`` is not a string or no ID
        can be found in it.
    """
    import re

    if not isinstance(url, str):
        return None

    # The negative lookahead rejects runs longer than 11 characters, so an
    # ordinary long path segment is not truncated into a fake video ID.
    match = re.search(r"(?:v=|/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])", url)
    return match.group(1) if match else None

def download_youtube_video(url):
    """Fetch a YouTube video with yt-dlp and store it under ``videos/``.

    Returns ``videos/<video_id>.mp4`` when the download call completes, or
    None when no video ID can be extracted from ``url`` or the download
    raises an exception (the error is printed).
    """
    video_id = extract_video_id(url)
    if not video_id:
        print(f"Could not extract video ID from: {url}")
        return None

    target_file = f"videos/{video_id}.mp4"

    # Post-processing step asking for an mp4 result.
    mp4_convertor = {'key': 'FFmpegVideoConvertor', 'preferedformat': 'mp4'}
    download_options = dict(
        format='bestvideo+bestaudio/best',
        outtmpl=target_file,
        merge_output_format='mp4',
        postprocessors=[mp4_convertor],
    )

    with yt_dlp.YoutubeDL(download_options) as downloader:
        try:
            downloader.download([url])
        except Exception as err:
            print(f"Video download failed: {err}")
            return None
        else:
            return target_file

def get_youtube_transcript(video_url):
    """Download a video's transcript, write it to disk and return it as text.

    Every transcript entry becomes one line of the form ``<start>s: <text>``
    with the start time shown to two decimals. The text is saved to
    ``output_data/<video_id>_transcript.txt``.

    Returns the transcript text, or None when no video ID can be extracted
    or when fetching/saving raises an exception (the error is printed).
    """
    video_id = extract_video_id(video_url)
    if not video_id:
        return None

    transcript_path = f"output_data/{video_id}_transcript.txt"
    try:
        entries = YouTubeTranscriptApi.get_transcript(video_id)

        # One "<start>s: <text>" line per transcript entry.
        lines = []
        for entry in entries:
            lines.append(f"{entry['start']:.2f}s: {entry['text']}")
        transcript_text = "\n".join(lines)

        with open(transcript_path, "w", encoding="utf-8") as out_file:
            out_file.write(transcript_text)

        print(f"Transcript saved: {transcript_path}")
    except Exception as err:
        print(f"Error fetching transcript: {err}")
        return None

    return transcript_text

def extract_frames(video_path, output_folder, interval=5):
    """Save one JPEG frame from the video every ``interval`` seconds.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory that receives the frames; created if missing.
        interval: Seconds between sampled frames, starting at time 0.

    Returns:
        List with the paths of the frames that were actually written, in
        time order.
    """
    os.makedirs(output_folder, exist_ok=True)
    clip = VideoFileClip(video_path)

    frame_paths = []
    try:
        for timestamp in np.arange(0, clip.duration, interval):
            frame = clip.get_frame(timestamp)

            # Whole-second timestamps keep the frame_<seconds>.jpg naming;
            # fractional ones get millisecond precision so that sub-second
            # intervals do not overwrite each other's files.
            if float(timestamp).is_integer():
                stamp = int(timestamp)
            else:
                stamp = f"{timestamp:.3f}"
            frame_file = os.path.join(output_folder, f"frame_{stamp}.jpg")

            # The frame is converted from RGB to BGR channel order for OpenCV.
            # cv2.imwrite reports failure through its return value, so only
            # frames that really reached the disk are recorded.
            if cv2.imwrite(frame_file, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)):
                frame_paths.append(frame_file)
            else:
                print(f"Could not write frame: {frame_file}")
    finally:
        # Release the clip even when reading or writing a frame fails.
        clip.close()

    print(f"Extracted {len(frame_paths)} frames from video.")
    return frame_paths

def classify_image(image_path):
    """Return the label ViT predicts for the image stored at ``image_path``."""
    rgb_image = Image.open(image_path).convert("RGB")
    model_inputs = feature_extractor(images=rgb_image, return_tensors="pt")

    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        logits = vit_model(**model_inputs).logits

    best_index = torch.argmax(logits, dim=-1).item()
    return get_label_name(best_index)

def generate_caption(image_path):
    """Return a BLIP-generated caption for the image at ``image_path``."""
    rgb_image = Image.open(image_path).convert("RGB")
    blip_inputs = blip_processor(images=rgb_image, return_tensors="pt")

    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        generated_ids = blip_model.generate(**blip_inputs)

    # Decode the first generated sequence into plain text.
    first_sequence = generated_ids[0]
    return blip_processor.decode(first_sequence, skip_special_tokens=True)

def extract_text(image_path):
    """Run Tesseract OCR on an image file and return the recognised text.

    Args:
        image_path: Path of the image to read.

    Returns:
        The text reported by Tesseract, or an empty string when the image
        cannot be read (a message is printed in that case).
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals a missing or unreadable file by returning None
        # rather than raising; passing None on would fail inside pytesseract.
        print(f"Could not read image for OCR: {image_path}")
        return ""

    # OpenCV loads pixels in BGR order, while pytesseract treats arrays as RGB.
    return image_to_string(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

def process_video(video_url):
    """Run the whole pipeline for one YouTube URL.

    Steps: download the video, fetch its transcript, sample frames, then
    label, caption and OCR every frame. The per-frame results are written to
    ``output_data/<video_id>_processed.json``.

    Returns the list of per-frame result dicts, or None when FFmpeg is not
    available or the video could not be downloaded.
    """
    # Nothing below works without FFmpeg, so bail out early.
    if not FFMPEG_INSTALLED:
        print("FFmpeg is not installed. Install it before running the script.")
        return None

    video_path = download_youtube_video(video_url)
    if not (video_path and os.path.exists(video_path)):
        print("Video download failed.")
        return None

    print(f"Downloaded video: {video_path}")

    # Called for its side effect of saving the transcript file; the returned
    # text is not used further in this function.
    get_youtube_transcript(video_url)

    # Sample frames into a per-video folder.
    video_id = extract_video_id(video_url)
    frames_dir = f"processed_frames/{video_id}"
    frame_paths = extract_frames(video_path, frames_dir)

    # One record per frame: classification label, caption and OCR text.
    processed_data = [
        {
            "frame": os.path.basename(path),
            "label": classify_image(path),
            "caption": generate_caption(path),
            "extracted_text": extract_text(path),
        }
        for path in frame_paths
    ]

    # Persist the records as JSON next to the transcript.
    json_path = f"output_data/{video_id}_processed.json"
    with open(json_path, "w", encoding="utf-8") as out_file:
        json.dump(processed_data, out_file, indent=4, ensure_ascii=False)

    print(f"Processed data saved: {json_path}")
    return processed_data

# Entry point: run the full pipeline on one sample video when this code is
# executed as the main module.
if __name__ == "__main__":
    video_url = "https://www.youtube.com/watch?v=Ilg3gGewQ5U"
    process_video(video_url)


  return torch.load(checkpoint_file, map_location="cpu")


[youtube] Extracting URL: https://www.youtube.com/watch?v=Ilg3gGewQ5U
[youtube] Ilg3gGewQ5U: Downloading webpage
[youtube] Ilg3gGewQ5U: Downloading tv client config
[youtube] Ilg3gGewQ5U: Downloading player 5ae7d525
[youtube] Ilg3gGewQ5U: Downloading tv player API JSON
[youtube] Ilg3gGewQ5U: Downloading ios player API JSON
[youtube] Ilg3gGewQ5U: Downloading m3u8 information
[info] Ilg3gGewQ5U: Downloading 1 format(s): 399+251-2
[download] Destination: videos\Ilg3gGewQ5U.f399.mp4
[download] 100% of   78.86MiB in 00:00:06 at 11.98MiB/s    
[download] Destination: videos\Ilg3gGewQ5U.f251-2.webm
[download] 100% of   12.95MiB in 00:00:01 at 10.87MiB/s    
[Merger] Merging formats into "videos\Ilg3gGewQ5U.mp4"
Deleting original file videos\Ilg3gGewQ5U.f399.mp4 (pass -k to keep)
Deleting original file videos\Ilg3gGewQ5U.f251-2.webm (pass -k to keep)
[VideoConvertor] Not converting media file "videos\Ilg3gGewQ5U.mp4"; already is in target format mp4
Downloaded video: videos/Ilg3gGewQ5U.mp4
Tra



Processed data saved: output_data/Ilg3gGewQ5U_processed.json
