In [1]:
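# One-time setup: uncomment the lines below to install the notebook's dependencies
# into the kernel's environment.
# %pip install pytube opencv-python transformers
# %pip install moviepy
# %pip install lightning
# %pip install pytorch-lightning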

In [1]:
from pytube import YouTube
import os
def download_video(url, path='./videos'):
    """Download a YouTube video and return the local path of the saved file.

    Args:
        url: URL of the YouTube video to fetch.
        path: Directory the file is downloaded into.

    Returns:
        ``path`` joined with the selected stream's default filename.

    Raises:
        ValueError: If no downloadable stream is available for ``url``.
    """
    yt = YouTube(url)
    # Select the highest-resolution stream.
    ys = yt.streams.get_highest_resolution()
    if ys is None:
        # Without this guard the failure surfaces as an opaque AttributeError
        # on the ``download`` call below.
        raise ValueError(f"No downloadable stream found for {url}")
    # Download the video.
    ys.download(path)
    print(f"다운로드 완료: {ys.default_filename}")
    return os.path.join(path, ys.default_filename)

In [2]:
# Source video on YouTube; download_video saves it under its default directory
# and hands back the local file path.
youtube_url = 'https://www.youtube.com/watch?v=AkKgXCO6mBA&ab_channel=Popcorn%26CokeReview'
video_path = download_video(youtube_url)

다운로드 완료: 와 진짜 말도 안되는 미쳐버린 상상력으로 만들어낸 띵작 영화 [결말포함].mp4


In [3]:
# Show the local path returned by download_video as the cell output.
video_path

'./videos/와 진짜 말도 안되는 미쳐버린 상상력으로 만들어낸 띵작 영화 [결말포함].mp4'

In [4]:
import cv2
from transformers import ViTFeatureExtractor

def extract_video_features(extractor, video_file, sample_every=1):
    """Decode a video and run its (optionally subsampled) frames through ``extractor``.

    Args:
        extractor: Callable invoked as ``extractor(images=frames, return_tensors="pt")``;
            its result is indexed with ``"pixel_values"``.
        video_file: Path of the video to decode with OpenCV.
        sample_every: Keep one frame out of every ``sample_every`` decoded frames.
            The default of 1 keeps every frame.

    Returns:
        The ``"pixel_values"`` entry of the extractor output.

    Raises:
        ValueError: If ``sample_every`` is smaller than 1, or if no frame could be
            read from ``video_file``.
    """
    if sample_every < 1:
        raise ValueError(f"sample_every must be >= 1, got {sample_every}")

    vc = cv2.VideoCapture(video_file)
    frames = []
    try:
        frame_index = 0
        while vc.isOpened():
            success, frame = vc.read()
            if not success:
                break
            # Honour the sampling stride instead of keeping every decoded frame.
            if frame_index % sample_every == 0:
                frames.append(frame)
            frame_index += 1
    finally:
        # Release the capture even if decoding raises part-way through.
        vc.release()

    if not frames:
        raise ValueError(f"No frames could be read from {video_file}")

    # NOTE(review): frames are passed on exactly as OpenCV returned them; OpenCV
    # usually yields BGR channel order - confirm whether the extractor/model
    # expects RGB and convert here if so.
    features = extractor(images=frames, return_tensors="pt")
    return features["pixel_values"]

In [5]:
from torch.utils.data import DataLoader, Dataset
class VideoFramesDataset(Dataset):
    """Dataset serving the frames of a single video by index.

    Every frame is decoded once in ``__init__`` and kept in ``self.frames``, so
    memory use grows with the length of the video.

    Args:
        video_path: Path of the video to decode with OpenCV.
        transform: Optional callable applied to a frame each time it is fetched.

    Raises:
        IOError: If OpenCV cannot open ``video_path``.
    """

    def __init__(self, video_path, transform=None):
        self.video_path = video_path
        self.transform = transform
        self.frames = self._load_frames()

    def _load_frames(self):
        """Read all frames of ``self.video_path`` into a list and return it."""
        cap = cv2.VideoCapture(self.video_path)
        try:
            if not cap.isOpened():
                # Fail loudly: an unreadable path would otherwise yield a silently
                # empty dataset that only breaks much further downstream.
                raise IOError(f"Could not open video: {self.video_path}")
            frames = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
        finally:
            # Always give the capture handle back, even when decoding fails.
            cap.release()
        return frames

    def __len__(self):
        """Number of decoded frames."""
        return len(self.frames)

    def __getitem__(self, idx):
        """Return frame ``idx``, passed through ``transform`` when one was given."""
        frame = self.frames[idx]
        # Compare against None so a falsy-but-valid callable is still applied.
        if self.transform is not None:
            frame = self.transform(frame)
        return frame


In [6]:
import torch
from torch import nn
from transformers import ViTModel
from pytorch_lightning import LightningModule

from collections import defaultdict

import numpy as np
import torch
import torch.nn as nn
from torch import optim
# from torchmetrics import F1
from transformers import ViTModel


class _BinaryF1Stats:
    """Running TP/FP/TN/FN counts for binary predictions with an F1 readout.

    Plain-Python object (not an ``nn.Module``), so it adds nothing to the model's
    ``state_dict``. It exposes the methods the step/epoch hooks of
    ``SummaryModel`` call on their metric: ``update``, ``compute``, ``reset`` and
    ``_get_final_stats``.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all accumulated counts."""
        self.tp = self.fp = self.tn = self.fn = 0

    def update(self, preds, target):
        """Accumulate counts from two same-shaped tensors of 0/1 values."""
        preds = preds.bool()
        target = target.bool()
        self.tp += int((preds & target).sum())
        self.fp += int((preds & ~target).sum())
        self.tn += int((~preds & ~target).sum())
        self.fn += int((~preds & target).sum())

    def compute(self):
        """Return F1 = 2TP / (2TP + FP + FN) as a tensor (0.0 when undefined)."""
        denom = 2 * self.tp + self.fp + self.fn
        return torch.tensor(2 * self.tp / denom if denom else 0.0)

    def _get_final_stats(self):
        """Return the accumulated ``(tp, fp, tn, fn)`` counts as tensors."""
        return tuple(torch.tensor(v) for v in (self.tp, self.fp, self.tn, self.fn))


class SummaryModel(LightningModule):
    """ViT backbone followed by a linear head that emits one logit per input image.

    Args:
        hidden_dim: Input width of the linear scoring head.
        individual_logs: Stored on the instance as-is; not used inside this class.
    """

    def __init__(self, hidden_dim=768, individual_logs=None):
        super().__init__()
        self.vit = ViTModel.from_pretrained("google/vit-base-patch16-224-in21k")
        self.scorer = nn.Linear(hidden_dim, 1)
        # The head outputs raw logits; the sigmoid is folded into the loss.
        self.loss = nn.BCEWithLogitsLoss()
        # The step/epoch hooks below need these metric objects to exist.
        self.train_f1 = _BinaryF1Stats()
        self.val_f1 = _BinaryF1Stats()
        self.test_f1 = _BinaryF1Stats()
        self.individual_logs = individual_logs
        # video name -> list of (tp, fp, fn) tuples recorded after each batch.
        self.tta_logs = defaultdict(list)

    def forward(self, x):
        """Return the scoring head's logits for the ViT pooled output of ``x``."""
        x = self.vit(x).pooler_output
        x = self.scorer(x)
        return x

    def run_batch(self, batch, batch_idx, metric, training=False):
        """Compute the loss for one batch and update ``metric``.

        Args:
            batch: ``(video_name, image_features, labels)``; the first element of
                ``video_name`` is used and the leading dimension of the other two
                is squeezed away.
            batch_idx: Index of the batch (unused here).
            metric: Object providing ``update(preds, target)`` and
                ``_get_final_stats()``.
            training: Unused; kept for interface compatibility.

        Returns:
            The BCE-with-logits loss, or a zero tensor if computing the loss or
            the metric raised.
        """
        video_name, image_features, labels = batch
        video_name = video_name[0]
        image_features = image_features.squeeze(0)
        labels = labels.squeeze(0)

        # Score - aggregated labels: sum over dim 0, capped at 1.
        score = torch.sum(labels, dim=0)
        score = torch.min(
            score,
            torch.ones(
                score.shape[0],
            ).to(score.device),
        )
        out = self(image_features).squeeze(1)
        try:
            # Cast the target as well so both BCE operands share a dtype.
            loss = self.loss(out.double(), score.double())
            preds = (torch.sigmoid(out) > 0.7).int()
            metric.update(preds, score.int())
            tp, fp, tn, fn = metric._get_final_stats()
            self.tta_logs[video_name].append((tp.item(), fp.item(), fn.item()))
        except Exception as e:
            # Best-effort: report the problem and contribute a zero loss. It has
            # to be a tensor - a bare int is not a valid training_step result.
            print(e)
            loss = torch.zeros(
                (), dtype=torch.double, device=out.device, requires_grad=True
            )
        return loss

    def training_step(self, batch, batch_idx):
        """Run one training batch and log ``train_loss``."""
        loss = self.run_batch(batch, batch_idx, self.train_f1, training=True)
        self.log("train_loss", loss)
        return loss

    def on_train_epoch_end(self):
        """Log the epoch's training F1 and reset the counter."""
        self.log("train_f1", self.train_f1.compute())
        self.train_f1.reset()

    def validation_step(self, batch, batch_idx):
        """Run one validation batch and log ``val_loss``."""
        loss = self.run_batch(batch, batch_idx, self.val_f1)
        self.log("val_loss", loss)
        return loss

    def on_validation_epoch_end(self):
        """Log the epoch's validation F1 and reset the counter."""
        self.log("val_f1", self.val_f1.compute())
        self.val_f1.reset()

    def test_step(self, batch, batch_idx):
        """Run one test batch and log ``test_loss``."""
        loss = self.run_batch(batch, batch_idx, self.test_f1)
        self.log("test_loss", loss)
        return loss

    def on_test_epoch_end(self):
        """Log and print the test F1 with its confusion counts, then reset."""
        f1 = self.test_f1.compute()
        self.log("test_f1", f1)
        tp, fp, tn, fn = self.test_f1._get_final_stats()
        print(f"\nTest f1: {f1}, TP: {tp}, FP: {fp}, TN: {tn}, fn: {fn}")
        self.test_f1.reset()

    def configure_optimizers(self):
        """Optimize all parameters with AdamW at a learning rate of 1e-4."""
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-4)
        return optimizer



In [7]:
def load_model(ckpt_path, device='cpu'):
    """Restore a SummaryModel from a checkpoint, ready for inference.

    Args:
        ckpt_path: Path of the checkpoint file to load.
        device: Device the restored model is moved to.

    Returns:
        The restored model, moved to ``device`` and switched to eval mode.
    """
    restored = SummaryModel.load_from_checkpoint(ckpt_path)
    restored = restored.to(device)
    restored.eval()
    return restored

In [8]:
import os
# Use the Homebrew ffmpeg binary only on machines where it actually exists, so
# the notebook does not point IMAGEIO_FFMPEG_EXE at a missing file elsewhere.
# This has to run before the moviepy import that follows.
_HOMEBREW_FFMPEG = "/opt/homebrew/bin/ffmpeg"
if os.path.exists(_HOMEBREW_FFMPEG):
    os.environ["IMAGEIO_FFMPEG_EXE"] = _HOMEBREW_FFMPEG
from moviepy.editor import VideoFileClip, concatenate_videoclips
from transformers import ViTFeatureExtractor
from torchvision.transforms import Compose, Resize, ToTensor
import cv2
def _merge_intervals(intervals):
    """Merge overlapping or touching (start, end) pairs into sorted, disjoint spans."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return [tuple(span) for span in merged]


def summarize_video(video_path, model, extractor, threshold=0.7,
                    summary_path="summary_video.mp4", batch_size=16, padding=1.0):
    """Score every frame of a video and write the high-scoring parts to a new file.

    Each frame whose sigmoid score exceeds ``threshold`` contributes a window of
    ``padding`` seconds on either side of it; overlapping windows are merged so no
    footage is repeated, and the resulting spans are concatenated and written
    to ``summary_path``.

    Args:
        video_path: Path of the source video.
        model: Module mapping extractor output to one logit per frame.
        extractor: Callable invoked as ``extractor(images=frames, return_tensors="pt")``;
            its ``"pixel_values"`` entry is fed to ``model``.
        threshold: Minimum sigmoid score for a frame to be selected.
        summary_path: Where the summary video is written.
        batch_size: Number of frames scored per forward pass.
        padding: Seconds of footage kept before and after each selected frame.

    Returns:
        None. When no frame exceeds ``threshold`` nothing is written.
    """
    # Put the inputs on whatever device the model already lives on.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # Frames are handed to the extractor untouched; it is the single place where
    # preprocessing happens. collate_fn=list keeps each batch a list of frames.
    # NOTE(review): frames come straight from OpenCV, which usually yields BGR
    # channel order - confirm whether the extractor/model expects RGB.
    dataset = VideoFramesDataset(video_path)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=list)

    selected = []
    with torch.no_grad():
        for batch_idx, frames in enumerate(dataloader):
            pixel_values = extractor(images=frames, return_tensors="pt")["pixel_values"].to(device)
            out = model(pixel_values).squeeze(1)
            hits = (torch.sigmoid(out) > threshold).nonzero(as_tuple=True)[0]
            # Positions are batch-local; shift them by the batch's first frame index.
            offset = batch_idx * batch_size
            selected.extend(offset + i for i in hits.tolist())

    if not selected:
        print(f'No frames scored above {threshold}; no summary video written')
        return

    # Build the summary video.
    clip = VideoFileClip(video_path)
    try:
        spans = _merge_intervals(
            (max(idx / clip.fps - padding, 0), min(idx / clip.fps + padding, clip.duration))
            for idx in selected
        )
        final_clip = concatenate_videoclips([clip.subclip(start, end) for start, end in spans])
        final_clip.write_videofile(summary_path, codec="libx264", audio_codec="aac")
    finally:
        # Release the reader held by the source clip.
        clip.close()
    print(f'Summary video saved to {summary_path}')

In [None]:
# Checkpoint that load_model restores the summarization model from.
model_path = 'model/summary.ckpt'

# Load the model and the feature extractor.
device = 'cpu'  # or 'cuda' when running on a GPU
model = load_model(model_path, device=device)
extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224-in21k")

# Summarize the video and save the result; `video_path` comes from the download
# cell earlier in the notebook.
summarize_video(video_path, model, extractor)

Lightning automatically upgraded your loaded checkpoint from v1.4.9 to v2.2.1. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint model/summary.ckpt`
