In [None]:
import cv2
import numpy as np
import random
import os
import glob

# Supported video extensions
video_extensions = ['.mp4', '.mov', '.mkv', '.avi', '.flv', '.webm', '.m4v', '.3gp', '.mpeg', '.ts']

input_video_folder = "./Videos/test.avi" 
output_visible_video_path = "./visible_watermarked/output_visible_watermarked.mp4"
output_invisible_video_path = "./invisible_watermarked/output_invisible_watermarked.mp4"
watermark_text = "CrowdGenAI"

def add_visible_watermarks(frame, text, alpha=0.3):
    h, w, _ = frame.shape
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    thickness = 2
    color = (255, 255, 255)

    watermark_bg = np.zeros_like(frame)
    positions = []
    for i in range(4):  
        y_pos = int(h * (0.1 + 0.2 * i))
        for j in range(3):  
            x_pos = int(w * (0.1 + 0.3 * j)) 
            positions.append((x_pos, y_pos))

    for pos in positions:
        cv2.putText(watermark_bg, text, pos, font, font_scale, color, thickness, cv2.LINE_AA)
    cv2.addWeighted(watermark_bg, alpha, frame, 1 - alpha, 0, frame)

    return frame

def add_invisible_watermark(frame, text):
    h, w, _ = frame.shape
    blank_image = np.zeros((h, w), dtype=np.uint8)
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 3
    thickness = 5

    positions = []
    for i in range(4): 
        y_pos = int(h * (0.1 + 0.2 * i)) 
        for j in range(3):
            x_pos = int(w * (0.1 + 0.3 * j)) 
            positions.append((x_pos, y_pos))

    for pos in positions:
        cv2.putText(blank_image, text, pos, font, font_scale, 255, thickness, cv2.LINE_AA)

    frame[:, :, 0] = np.bitwise_and(frame[:, :, 0], 0xFE) | (blank_image >> 7)

    return frame

def process_video(video_path, visible_out, invisible_out):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video file: {video_path}")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    visible_writer = cv2.VideoWriter(visible_out, fourcc, fps, (width, height))
    invisible_writer = cv2.VideoWriter(invisible_out, fourcc, fps, (width, height))

    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    watermark_frames = set([0, 1, frame_count - 2, frame_count - 1])
    if frame_count > 4:
        random_frames = random.sample(range(2, frame_count - 2), min(3, frame_count - 4))
        watermark_frames.update(random_frames)

    current_frame = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if current_frame in watermark_frames:
            visible_frame = add_visible_watermarks(frame.copy(), watermark_text)
            invisible_frame = add_invisible_watermark(frame.copy(), watermark_text)
        else:
            visible_frame = frame.copy()
            invisible_frame = frame.copy()

        visible_writer.write(visible_frame)
        invisible_writer.write(invisible_frame)

        current_frame += 1

    cap.release()
    visible_writer.release()
    invisible_writer.release()

def find_video_file(path, extensions):
    if os.path.isfile(path): 
        return path
    elif os.path.isdir(path): 
        video_files = []
        for ext in extensions:
            video_files.extend(glob.glob(os.path.join(path, f"*{ext}")))

        if not video_files:
            raise FileNotFoundError(f"No valid video files found in {path} with extensions: {extensions}")

        return video_files[0]
    else:
        raise FileNotFoundError(f"The path {path} is neither a file nor a directory.")

if __name__ == "__main__":
    try:
        video_file = find_video_file(input_video_folder, video_extensions)

        process_video(
            video_file,
            output_visible_video_path,
            output_invisible_video_path
        )

        print(f"Watermarking completed. Videos saved.\nProcessed video: {video_file}")
    except Exception as e:
        print(f"Error: {e}")


Watermarking completed. Videos saved.
Processed video: ./Videos/test.avi


In [44]:
import cv2
import numpy as np
import os
import glob
from skimage.metrics import structural_similarity as ssim

# Supported video extensions
video_extensions = ['.mp4', '.mov', '.mkv', '.avi', '.flv', '.webm', '.m4v', '.3gp', '.mpeg', '.ts']

def generate_watermark_mask(frame, text):
    h, w, _ = frame.shape
    blank_image = np.zeros_like(frame, dtype=np.uint8)
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    thickness = 2
    color = (255, 255, 255)
    positions = []
    for i in range(4):
        y_pos = int(h * (0.1 + 0.2 * i))
        for j in range(3):
            x_pos = int(w * (0.1 + 0.3 * j))
            positions.append((x_pos, y_pos))

    for pos in positions:
        cv2.putText(blank_image, text, pos, font, font_scale, color, thickness, cv2.LINE_AA)
    return blank_image

def detect_visible_watermark(frame, text, ssim_threshold=0.90):
    watermark_mask = generate_watermark_mask(frame, text)
    
    # Convert both to grayscale
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    mask_gray = cv2.cvtColor(watermark_mask, cv2.COLOR_BGR2GRAY)

    # Compute SSIM
    score, _ = ssim(frame_gray, mask_gray, full=True)
    return score > ssim_threshold


def check_visible_watermark_in_video(video_path, text, ssim_threshold=0.90, frame_sample_rate=10):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video file: {video_path}")

    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if frame_count == 0:
        return False
    
    sample_frames = range(0, frame_count, max(1, frame_count // frame_sample_rate))
    for frame_number in sample_frames:
      cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
      ret, frame = cap.read()
      if not ret:
          continue

      if detect_visible_watermark(frame, text, ssim_threshold=ssim_threshold):
          cap.release()
          return True
    cap.release()
    return False

def find_video_file(path, extensions):
    if os.path.isfile(path):
        return path
    elif os.path.isdir(path):
        video_files = []
        for ext in extensions:
            video_files.extend(glob.glob(os.path.join(path, f"*{ext}")))

        if not video_files:
            raise FileNotFoundError(f"No valid video files found in {path} with extensions: {extensions}")

        return video_files[0]
    else:
        raise FileNotFoundError(f"The path {path} is neither a file nor a directory.")


if __name__ == "__main__":
    input_video_path = "invisible_watermarked\output_invisible_watermarked.mp4"
    watermark_text = "CrowdGenAI"

    try:
        video_file = find_video_file(input_video_path, video_extensions)
        watermark_detected = check_visible_watermark_in_video(video_file, watermark_text)
        if watermark_detected:
            print(f"Watermark with text '{watermark_text}' detected in the video: {video_file}")
        else:
            print(f"Watermark with text '{watermark_text}' not detected in the video: {video_file}")
    except Exception as e:
        print(f"Error: {e}")

Watermark with text 'CrowdGenAI' detected in the video: invisible_watermarked\output_invisible_watermarked.mp4
