<a href="https://colab.research.google.com/github/Tirth-chokshi/Admin-Dashboard/blob/master/notebooks/colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved. This source code is licensed under the license found in the LICENSE file in the root directory of this source tree.

# Video Seal Inference

[[`arXiv`](https://arxiv.org/abs/2412.09492)]
[[`Colab`](https://colab.research.google.com/github/facebookresearch/videoseal/blob/main/notebooks/colab.ipynb)]
[[`Demo`](https://aidemos.meta.com/videoseal)]

## Installation

Clone the repository and move into it.

In [1]:
# Fetch the Video Seal sources, then make the checkout the working directory
# so that the relative paths used by later cells (requirements.txt,
# ./assets/videos/...) resolve inside it.
!git clone https://github.com/facebookresearch/videoseal.git
%cd videoseal

Cloning into 'videoseal'...
remote: Enumerating objects: 554, done.[K
remote: Counting objects: 100% (150/150), done.[K
remote: Compressing objects: 100% (69/69), done.[K
remote: Total 554 (delta 106), reused 87 (delta 81), pack-reused 404 (from 2)[K
Receiving objects: 100% (554/554), 26.00 MiB | 18.81 MiB/s, done.
Resolving deltas: 100% (288/288), done.
/content/videoseal


Install dependencies

In [None]:
!pip install -r requirements.txt

Collecting omegaconf (from -r requirements.txt (line 3))
  Downloading omegaconf-2.3.0-py3-none-any.whl.metadata (3.9 kB)
Collecting lpips (from -r requirements.txt (line 5))
  Downloading lpips-0.1.4-py3-none-any.whl.metadata (10 kB)
Collecting timm==0.9.16 (from -r requirements.txt (line 6))
  Downloading timm-0.9.16-py3-none-any.whl.metadata (38 kB)
Collecting pre-commit (from -r requirements.txt (line 7))
  Downloading pre_commit-4.0.1-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting PyWavelets (from -r requirements.txt (line 11))
  Downloading pywavelets-1.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Collecting av (from -r requirements.txt (line 12))
  Downloading av-14.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.5 kB)
Collecting pyav (from -r requirements.txt (line 13))
  Downloading pyav-14.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.1 kB)
Collecting decord (from -r requirements.tx

## Imports and loading

In [None]:
# Make sure the kernel is inside the cloned checkout before importing the
# `videoseal` package. /content/videoseal is the directory reported by the
# clone cell above; adjust the path when running outside Colab.
%cd /content/videoseal

/content/videoseal


In [None]:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import logging
logging.getLogger("matplotlib.image").setLevel(logging.ERROR)
from IPython.display import HTML, display

import pandas as pd
from tqdm import tqdm
import numpy as np
import ffmpeg
import os
import cv2
import subprocess

import torch

from videoseal.evals.metrics import bit_accuracy
from videoseal.models import Videoseal
from videoseal.utils.cfg import setup_model_from_model_card


def get_video_info(input_path):
    """Read basic stream properties of a video file through OpenCV.

    Args:
        input_path: Path of the video file to inspect.

    Returns:
        A dict with the keys ``"width"`` and ``"height"`` (ints), ``"fps"``
        (as reported by OpenCV), ``"codec"`` (the FOURCC code decoded into a
        4-character string) and ``"num_frames"`` (int).

    Raises:
        OSError: If OpenCV cannot open ``input_path``. Without this check an
            unreadable file would silently yield zero-sized dimensions.
    """
    video = cv2.VideoCapture(input_path)
    try:
        if not video.isOpened():
            raise OSError(f"Could not open video file: {input_path}")

        # Get video properties
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = video.get(cv2.CAP_PROP_FPS)
        codec = int(video.get(cv2.CAP_PROP_FOURCC))
        num_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Always close the video file, even when a property lookup fails
        video.release()

    # The FOURCC code packs four characters, lowest byte first
    codec_str = "".join(chr((codec >> 8 * i) & 0xFF) for i in range(4))

    return {
        "width": width,
        "height": height,
        "fps": fps,
        "codec": codec_str,
        "num_frames": num_frames
    }

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Load the model

The videoseal library provides pretrained models for embedding and extracting watermarks.

In [None]:
# Build the VideoSeal model from its model card, switch it to evaluation mode
# and move it to the selected device in one chain
model = setup_model_from_model_card("videoseal").eval().to(device)
model.compile()

# Step size used for embedding: a bigger value is faster but loses a bit of robustness.
model.step_size = 8

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Model loaded successfully from /root/.cache/huggingface/hub/models--facebook--video_seal/snapshots/8037ef59ba2b2ec8fb8b55298ff37b8ccddd078d/checkpoint.pth with message: <All keys matched successfully>


## Embedding

Embedding hides the watermark (a random binary message) inside the video.

In [None]:
def embed_video_clip(
    model: Videoseal,
    clip: np.ndarray,
    msgs: torch.Tensor
) -> np.ndarray:
    """Watermark one clip of frames and return it as 8-bit frames.

    Args:
        model: Model whose ``embed`` method returns a dict holding the
            watermarked frames under the key ``"imgs_w"``.
        clip: Frames laid out as (frames, height, width, channels) with
            values on the 0-255 scale.
        msgs: Message handed through to ``model.embed``.

    Returns:
        ``uint8`` array in the same (frames, height, width, channels) layout
        as ``clip``.
    """
    # Channels-first float frames scaled to [0, 1]
    clip_tensor = torch.tensor(clip, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0
    outputs = model.embed(clip_tensor, msgs=msgs, is_video=True, lowres_attenuation=True)
    # .numpy() needs a CPU tensor that is detached from autograd
    processed_clip = outputs["imgs_w"].detach().cpu()
    # Round instead of truncating (truncation can shift a pixel down by one
    # through float error) and clamp so that out-of-range values saturate
    # rather than wrap around in the uint8 conversion.
    processed_clip = (processed_clip * 255.0).round().clamp(0, 255).byte()
    return processed_clip.permute(0, 2, 3, 1).numpy()

def embed_video(
    model: Videoseal,
    input_path: str,
    output_path: str,
    chunk_size: int,
    crf: int = 23
) -> torch.Tensor:
    """Watermark a whole video with one random message.

    The input is decoded by ffmpeg into raw RGB frames, watermarked
    ``chunk_size`` frames at a time with ``embed_video_clip`` and re-encoded
    with libx264. The message is also written as text next to the output
    video (same path, ``.txt`` extension).

    Args:
        model: Model providing ``get_random_msg`` and used for embedding.
        input_path: Path of the video to watermark.
        output_path: Path of the watermarked video (overwritten if present).
        chunk_size: Number of frames watermarked per call; must be >= 1.
        crf: Constant rate factor handed to the libx264 encoder.

    Returns:
        The message returned by ``model.get_random_msg()``.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        RuntimeError: If one of the ffmpeg processes exits with an error.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # Read video dimensions
    video_info = get_video_info(input_path)
    width = int(video_info['width'])
    height = int(video_info['height'])
    fps = float(video_info['fps'])
    num_frames = int(video_info['num_frames'])
    frame_dims = '{}x{}'.format(width, height)

    # Create a random message and store it next to the output video.
    # splitext keeps this correct for any extension; a plain
    # replace(".mp4", ".txt") would leave other extensions untouched and
    # make the message file overwrite the video itself.
    msgs = model.get_random_msg()
    msg_path = os.path.splitext(output_path)[0] + ".txt"
    with open(msg_path, "w") as f:
        f.write("".join([str(msg.item()) for msg in msgs[0]]))

    # stderr is piped but only read after the processes finish, so restrict
    # ffmpeg to error messages: continuous progress logging could otherwise
    # fill the pipe buffer and stall ffmpeg on long videos.
    decoder = (
        ffmpeg
        .input(input_path)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24', s=frame_dims, r=fps)
        .global_args('-loglevel', 'error')
        .run_async(pipe_stdout=True, pipe_stderr=True)
    )
    encoder = (
        ffmpeg
        .input('pipe:', format='rawvideo', pix_fmt='rgb24', s=frame_dims, r=fps)
        .output(output_path, vcodec='libx264', pix_fmt='yuv420p', r=fps, crf=crf)
        .overwrite_output()
        .global_args('-loglevel', 'error')
        .run_async(pipe_stdin=True, pipe_stderr=True)
    )

    # Process the video
    frame_size = width * height * 3
    chunk = np.zeros((chunk_size, height, width, 3), dtype=np.uint8)
    filled = 0  # number of frames currently buffered in `chunk`
    try:
        with tqdm(total=num_frames, desc="Watermark embedding") as pbar:
            while True:
                in_bytes = decoder.stdout.read(frame_size)
                # Stop at end of stream; an incomplete trailing frame is dropped
                if not in_bytes or len(in_bytes) < frame_size:
                    break
                chunk[filled] = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])
                filled += 1
                pbar.update(1)
                if filled == chunk_size:
                    encoder.stdin.write(embed_video_clip(model, chunk, msgs).tobytes())
                    filled = 0
        # Watermark the frames left over when the video length is not a
        # multiple of chunk_size; otherwise the output would be cut short.
        if filled:
            encoder.stdin.write(embed_video_clip(model, chunk[:filled], msgs).tobytes())
    finally:
        decoder.stdout.close()
        encoder.stdin.close()
        decoder.wait()
        encoder.wait()

    # Surface ffmpeg failures instead of silently leaving a broken output
    for role, proc in (("decoder", decoder), ("encoder", encoder)):
        details = proc.stderr.read().decode(errors="replace").strip()
        proc.stderr.close()
        if proc.returncode != 0:
            raise RuntimeError(
                f"ffmpeg {role} exited with code {proc.returncode}: {details}"
            )

    return msgs

You are free to upload any video and change the `video_path`.

You can find the watermarked output video in the `outputs` folder.

In [None]:
# Source video to watermark
video_path = "./assets/videos/1.mp4"

# The watermarked copy keeps the input's file name inside ./outputs
output_dir = "./outputs"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, os.path.basename(video_path))

# Hide a random message in the video, 16 frames per chunk, and keep the
# message so the extraction step can be scored against it
msgs_ori = embed_video(
    model,
    input_path=video_path,
    output_path=output_path,
    chunk_size=16,
)
print(f"\nSaved watermarked video to {output_path}")

Watermark embedding: 100%|██████████| 256/256 [03:18<00:00,  1.29it/s]

Saved watermarked video to ./outputs/1.mp4





## Extraction

Load the video output from the embedding process and extract the watermark.

In [None]:
def detect_video_clip(
    model: Videoseal,
    clip: np.ndarray
) -> torch.Tensor:
    """Run the detector on one clip and return its per-frame message outputs.

    Args:
        model: Model whose ``detect`` method returns a dict with a ``"preds"``
            tensor.
        clip: Frames laid out as (frames, height, width, channels) with
            values on the 0-255 scale.

    Returns:
        ``preds`` without its first column, which may be used for detection
        rather than for the message.
    """
    frames = torch.tensor(clip, dtype=torch.float32)
    # Channels-first layout, scaled down by 255
    frames = frames.permute(0, 3, 1, 2).div(255.0)
    preds = model.detect(frames, is_video=True)["preds"]
    return preds[:, 1:]

def detect_video(
    model: Videoseal,
    input_path: str,
    num_frames_for_extraction: int,
    chunk_size: int
) -> torch.Tensor:
    """Extract the watermark message from the first frames of a video.

    The video is decoded by ffmpeg into raw RGB frames; at most
    ``num_frames_for_extraction`` frames are passed to ``detect_video_clip``
    in groups of ``chunk_size`` and the per-frame outputs are averaged.

    Args:
        model: Model used for detection.
        input_path: Path of the (watermarked) video.
        num_frames_for_extraction: Maximum number of frames to analyse.
        chunk_size: Number of frames analysed per call; must be >= 1.

    Returns:
        The predictions averaged over all analysed frames.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        RuntimeError: If no frame could be analysed.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # Read video dimensions
    video_info = get_video_info(input_path)
    width = int(video_info['width'])
    height = int(video_info['height'])
    num_frames = int(video_info['num_frames'])

    # Only the first num_frames_for_extraction frames are read, so size the
    # progress bar accordingly (fall back to the limit if the count is unknown)
    if num_frames > 0:
        frames_expected = min(num_frames, num_frames_for_extraction)
    else:
        frames_expected = num_frames_for_extraction

    # stderr is piped but never read, so restrict ffmpeg to error messages:
    # continuous progress logging could fill the pipe buffer and stall ffmpeg.
    process1 = (
        ffmpeg
        .input(input_path)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .global_args('-loglevel', 'error')
        .run_async(pipe_stdout=True, pipe_stderr=True)
    )

    # Process the video
    frame_size = width * height * 3
    chunk = np.zeros((chunk_size, height, width, 3), dtype=np.uint8)
    frame_count = 0
    filled = 0  # number of frames currently buffered in `chunk`
    soft_msgs = []
    try:
        with tqdm(total=frames_expected, desc="Watermark extraction") as pbar:
            while frame_count < num_frames_for_extraction:
                in_bytes = process1.stdout.read(frame_size)
                # Stop at end of stream; an incomplete trailing frame is dropped
                if not in_bytes or len(in_bytes) < frame_size:
                    break
                chunk[filled] = np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3])
                filled += 1
                frame_count += 1
                pbar.update(1)
                if filled == chunk_size:
                    soft_msgs.append(detect_video_clip(model, chunk))
                    filled = 0
        # Analyse the frames left over when the number of frames read is not
        # a multiple of chunk_size instead of discarding them.
        if filled:
            soft_msgs.append(detect_video_clip(model, chunk[:filled]))
    finally:
        process1.stdout.close()
        process1.wait()
        process1.stderr.close()

    if not soft_msgs:
        raise RuntimeError(f"No frames could be read from {input_path}")

    soft_msgs = torch.cat(soft_msgs, dim=0)
    soft_msgs = soft_msgs.mean(dim=0)  # Average the predictions across all frames
    return soft_msgs

Watermark extraction:  12%|█▎        | 32/256 [00:12<01:30,  2.48it/s]


Binary message extracted with 100.0% bit accuracy





In [None]:
# Read the message back from the first 32 frames of the watermarked video,
# 16 frames per chunk
num_frames_for_extraction = 32
soft_msgs = detect_video(
    model,
    input_path=output_path,
    num_frames_for_extraction=num_frames_for_extraction,
    chunk_size=16,
)

# Score the extracted message against the one that was embedded
bit_acc = 100 * bit_accuracy(soft_msgs, msgs_ori).item()
print(f"\nBinary message extracted with {bit_acc:.1f}% bit accuracy")

## Run other baselines

To download other checkpoints, you can run the following command:

In [None]:
!pip install huggingface_hub
!huggingface-cli download tangtianzhong/img-wm-torchscript --cache-dir .cache
!mkdir ckpts
!cp .cache/models--tangtianzhong--img-wm-torchscript/snapshots/845dc751783db2a03a4b14ea600b0a4a9aba89aa/*.pt ckpts/
!rm -rf .cache

Fetching 11 files:   0%|                                 | 0/11 [00:00<?, ?it/s]Downloading '.gitattributes' to '.cache/models--tangtianzhong--img-wm-torchscript/blobs/a6344aac8c09253b3b630fb776ae94478aa0275b.incomplete'
Downloading 'cin_nsm_decoder.pt' to '.cache/models--tangtianzhong--img-wm-torchscript/blobs/60cbc933a974f0258d3cfa947404c0e92027a29c3769edba34d90718301f9e8d.incomplete'
Downloading 'trustmark_decoder_q.pt' to '.cache/models--tangtianzhong--img-wm-torchscript/blobs/c1e22e4a12c095e6a8f59c0b4fe35f2c09a6a071a62548b1fd6aae37f6fd85ef.incomplete'
Downloading 'hidden_encoder_48b.pt' to '.cache/models--tangtianzhong--img-wm-torchscript/blobs/6a7c78241837a455db3d160134fa25e60ec225b548b5d6aab69a30cd3f7b19c3.incomplete'
Downloading 'mbrs_256_m256_decoder.pt' to '.cache/models--tangtianzhong--img-wm-torchscript/blobs/9835526c99330b84f4910ab79f64d6da398ac2eaed9731f14b8e8707d775a70d.incomplete'
Downloading 'cin_nsm_encoder.pt' to '.cache/models--tangtianzhong--img-wm-torchscript/blob

In [None]:
from videoseal.utils.cfg import setup_model_from_checkpoint

# Replace `model` with the TrustMark baseline: evaluation mode, on the
# selected device, then compiled
model = setup_model_from_checkpoint("baseline/trustmark").eval().to(device)
model.compile()

# Embedding settings
model.step_size = 4    # propagate the wm to 4 next frame/img
model.chunk_size = 32  # embed 32 frames/imgs at a time
# Optional imperceptibility/robustness trade-off:
# model.blender.scaling_w *= 1.5

### Embedding

In [None]:
# Source video to watermark with the baseline model
video_path = "./assets/videos/1.mp4"

# The watermarked copy keeps the input's file name inside ./outputs
output_dir = "./outputs"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, os.path.basename(video_path))

# Hide a random message in the video, 16 frames per chunk, and keep the
# message so the extraction step can be scored against it
msgs_ori = embed_video(
    model,
    input_path=video_path,
    output_path=output_path,
    chunk_size=16,
)
print(f"\nSaved watermarked video to {output_path}")

### Extraction

In [None]:
# Read the message back from the first 32 frames of the watermarked video,
# 16 frames per chunk
num_frames_for_extraction = 32
soft_msgs = detect_video(
    model,
    input_path=output_path,
    num_frames_for_extraction=num_frames_for_extraction,
    chunk_size=16,
)

# Score the extracted message against the one that was embedded
bit_acc = 100 * bit_accuracy(soft_msgs, msgs_ori).item()
print(f"\nBinary message extracted with {bit_acc:.1f}% bit accuracy")