In [1]:
import numpy as np
import cv2
from PIL import Image
import imagehash
from sklearn.decomposition import PCA
from scipy.fftpack import dct
import warnings

In [2]:
# Utility functions for video processing
def open_video(video_path):
    return cv2.VideoCapture(video_path)

def save_video(output_path, fps, frame_size, is_color=True):
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    return cv2.VideoWriter(output_path, fourcc, fps, frame_size, is_color)

def process_and_save_video(video_path, output_path, process_func, is_color=True):
    cap = open_video(video_path)
    out = save_video(output_path, cap.get(cv2.CAP_PROP_FPS),
                     (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))), is_color)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        processed_frame = process_func(frame)
        out.write(processed_frame)

    cap.release()
    out.release()

In [3]:
# Transformation and Filter functions
def apply_gaussian_blur(frame, kernel_size=(5, 5)):
    return cv2.GaussianBlur(frame, kernel_size, 0)

def apply_sharpening(frame):
    kernel = np.array([[0, -1, 0],
                       [-1, 5, -1],
                       [0, -1, 0]])
    return cv2.filter2D(frame, -1, kernel)

def apply_smoothing(frame, diameter=15, sigma_color=75, sigma_space=75):
    return cv2.bilateralFilter(frame, diameter, sigma_color, sigma_space)

def apply_edge_detection(frame):
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray_frame, 100, 200)
    return cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)

def apply_color_filtering(frame, lower_bound=(35, 50, 50), upper_bound=(85, 255, 255)):
    hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv_frame, lower_bound, upper_bound)
    return cv2.bitwise_and(frame, frame, mask=mask)

def apply_compression(frame, quality=30):
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    result, encimg = cv2.imencode('.jpg', frame, encode_param)
    return cv2.imdecode(encimg, 1)

def apply_rotation(video_path, output_path, angle=90):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Adjust frame dimensions for 90° or 270° rotation
    if angle in [90, 270]:
        new_width, new_height = height, width
    else:
        new_width, new_height = width, height

    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS), (new_width, new_height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if angle == 90:
            rotated_frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
        elif angle == 180:
            rotated_frame = cv2.rotate(frame, cv2.ROTATE_180)
        elif angle == 270:
            rotated_frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
        else:
            rotated_frame = frame

        out.write(rotated_frame)

    cap.release()
    out.release()

def apply_scaling(video_path, output_path, new_size=(320, 240)):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS), new_size)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        resized_frame = cv2.resize(frame, new_size)
        out.write(resized_frame)

    cap.release()
    out.release()

def apply_cropping(video_path, output_path, crop_rect=(50, 50, 200, 200)):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    crop_width = crop_rect[2]
    crop_height = crop_rect[3]

    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS), (crop_width, crop_height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        cropped_frame = frame[crop_rect[1]:crop_rect[1] + crop_rect[3], crop_rect[0]:crop_rect[0] + crop_rect[2]]
        out.write(cropped_frame)

    cap.release()
    out.release()

def loop_video(input_path, output_path, loop_count=3):
    cap = cv2.VideoCapture(input_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    frames = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)

    cap.release()

    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    for _ in range(loop_count):
        for frame in frames:
            out.write(frame)

    out.release()

def apply_translation(video_path, output_path, tx=30, ty=30):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS), (width, height))

    # Define translation matrix
    M = np.float32([[1, 0, tx], [0, 1, ty]])

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Apply the translation
        translated_frame = cv2.warpAffine(frame, M, (width, height))
        out.write(translated_frame)

    cap.release()
    out.release()

def apply_zoom(video_path, output_path, zoom_factor=1.5):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS), (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Calculate center crop dimensions
        center_x, center_y = width // 2, height // 2
        crop_width, crop_height = int(width / zoom_factor), int(height / zoom_factor)
        crop_x1 = max(0, center_x - crop_width // 2)
        crop_x2 = min(width, center_x + crop_width // 2)
        crop_y1 = max(0, center_y - crop_height // 2)
        crop_y2 = min(height, center_y + crop_height // 2)

        # Crop and resize to original size
        cropped_frame = frame[crop_y1:crop_y2, crop_x1:crop_x2]
        zoomed_frame = cv2.resize(cropped_frame, (width, height))

        out.write(zoomed_frame)

    cap.release()
    out.release()

def apply_affine_transformation(video_path, output_path):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, cap.get(cv2.CAP_PROP_FPS), (width, height))

    # Define affine transformation matrix
    pts1 = np.float32([[50, 50], [200, 50], [50, 200]])
    pts2 = np.float32([[10, 100], [200, 50], [100, 250]])
    M = cv2.getAffineTransform(pts1, pts2)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Apply the affine transformation
        affine_transformed_frame = cv2.warpAffine(frame, M, (width, height))
        out.write(affine_transformed_frame)

    cap.release()
    out.release()

def apply_perspective_transformation(video_path, output_path):
    cap = open_video(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = save_video(output_path, cap.get(cv2.CAP_PROP_FPS), (width, height))

    # Define points for perspective transformation
    pts1 = np.float32([[0, 0], [width - 1, 0], [0, height - 1], [width - 1, height - 1]])
    pts2 = np.float32([[0, 0], [width - 100, 50], [50, height - 50], [width - 50, height - 100]])
    M = cv2.getPerspectiveTransform(pts1, pts2)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Apply perspective transformation
        perspective_frame = cv2.warpPerspective(frame, M, (width, height))
        out.write(perspective_frame)

    cap.release()
    out.release()

def apply_frame_dropping(input_path, output_path, drop_rate):
    cap = cv2.VideoCapture(input_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = cap.get(cv2.CAP_PROP_FPS) / drop_rate
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % drop_rate == 0:
            out.write(frame)
        frame_count += 1

    cap.release()
    out.release()

def apply_frame_rate_conversion(input_path, output_path, new_fps):
    cap = cv2.VideoCapture(input_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, new_fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)

    cap.release()
    out.release()

def apply_speed_manipulation(input_path, output_path, speed_factor):
    cap = cv2.VideoCapture(input_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = cap.get(cv2.CAP_PROP_FPS) * speed_factor
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)

    cap.release()
    out.release()

In [4]:
# Hashing functions
def phash(frame):
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return str(imagehash.phash(pil_image))

def whash(frame):
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return str(imagehash.whash(pil_image))

def ahash(frame):
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return str(imagehash.average_hash(pil_image))

def dhash(frame):
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return str(imagehash.dhash(pil_image))

def kl_hash(frame, hash_size=8):
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized_frame = cv2.resize(gray_frame, (64, 64), interpolation=cv2.INTER_LANCZOS4)
    pixels = resized_frame.astype(float)

    pca = PCA(n_components=hash_size)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        pca_transformed = pca.fit_transform(pixels)

    if np.isnan(pca_transformed).any():
        pca_transformed = np.nan_to_num(pca_transformed)

    dct_transformed = dct(dct(pca_transformed, axis=0), axis=1)
    dct_low_freq = dct_transformed[:hash_size, :hash_size]
    dct_median_val = np.median(dct_low_freq)
    dct_hash_array = (dct_low_freq > dct_median_val).astype(int)
    dct_hash_string = ''.join(str(x) for x in dct_hash_array.flatten())
    dct_hash_hex = hex(int(dct_hash_string, 2))[2:].zfill(hash_size * hash_size // 4)

    return dct_hash_hex

# Function to process and compute hashes for video frames
def process_video_and_compute_hashes(video_path, hash_func):
    cap = open_video(video_path)
    video_hashes = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_hash = hash_func(frame)
        video_hashes.append(frame_hash)

    cap.release()
    return video_hashes

# Similarity calculation functions
def hex_to_bin(hex_str):
    return bin(int(hex_str, 16))[2:].zfill(256)

def hamming_distance(bin_str1, bin_str2):
    return sum(c1 != c2 for c1, c2 in zip(bin_str1, bin_str2))

def similarity_score(original_hashes, edited_hashes):
    scores = []
    for original, edited in zip(original_hashes, edited_hashes):
        bin_original = hex_to_bin(original)
        bin_edited = hex_to_bin(edited)
        distance = hamming_distance(bin_original, bin_edited)
        score = 1 - (distance / len(bin_original))
        scores.append(score)
    return scores

def compute_similarity_scores(original_hashes, modified_hashes):
    if not original_hashes or not modified_hashes:
        print("Error: One or both sets of hashes are empty.")
        return [], 0.0

    if len(original_hashes) != len(modified_hashes):
        print("Warning: The number of hashes in the original and modified videos do not match.")
        return [], 0.0

    scores = similarity_score(original_hashes, modified_hashes)
    average_score = sum(scores) / len(scores) if scores else 0.0
    return scores, average_score

In [5]:
# Example usage for computing different hashes and transformations
video_path = r"C:\Users\prias\Downloads\THPH\CGI Animated Short Film_ _Watermelon A Cautionary Tale_ by Kefei Li & Connie Qin He _ CGMeetup.mp4"
transformations = {
    'Blurred': (apply_gaussian_blur, 'blurred_video.mp4'),
    'Sharpened': (apply_sharpening, 'sharpened_video.mp4'),
    'Smoothed': (apply_smoothing, 'smoothed_video.mp4'),
    'Edges': (apply_edge_detection, 'edges_video.mp4'),
    'Color Filtered': (apply_color_filtering, 'color_filtered_video.mp4'),
    'Compressed': (lambda frame: apply_compression(frame, quality=30), 'compressed_video.mp4'),
    'Rotated': (lambda video, out: apply_rotation(video, out, angle=90), 'rotated_video_90.mp4'),
    'Resized': (lambda video, out: apply_scaling(video, out, new_size=(320, 240)), 'resized_video.mp4'),
    'Cropped': (lambda video, out: apply_cropping(video, out, crop_rect=(50, 50, 200, 200)), 'cropped_video.mp4'),
    'Flipped Horizontal': (lambda frame: cv2.flip(frame, 1), 'flipped_video_horizontal.mp4'),
    'Flipped Vertical': (lambda frame: cv2.flip(frame, 0), 'flipped_video_vertical.mp4'),
    'Translated': (lambda video, out: apply_translation(video, out, tx=30, ty=30), 'translated_video.mp4'),
    'Zoomed': (lambda video, out: apply_zoom(video, out, zoom_factor=1.5), 'zoomed_video.mp4'),
    'Affine Transformed': (apply_affine_transformation, 'affine_transformed_video.mp4'),
    'Perspective Transformed': (apply_perspective_transformation, 'perspective_transformed_video.mp4'),
    'Frame Dropped': (lambda video, out: apply_frame_dropping(video, out, drop_rate=2), 'frame_dropped_video.mp4'),
    'Frame Rate Converted': (lambda video, out: apply_frame_rate_conversion(video, out, new_fps=15), 'frame_rate_converted_video.mp4'),
    'Slowed Down': (lambda video, out: apply_speed_manipulation(video, out, speed_factor=0.5), 'slowed_down_video.mp4'),
    'Looped': (lambda video, out: loop_video(video, out, loop_count=3), 'looped_video.mp4')
}

In [6]:
# Compute the hashes for the original video
original_hashes = {
    'pHash': process_video_and_compute_hashes(video_path, phash),
    'wHash': process_video_and_compute_hashes(video_path, whash),
    'aHash': process_video_and_compute_hashes(video_path, ahash),
    'dHash': process_video_and_compute_hashes(video_path, dhash),
    'KL Hash': process_video_and_compute_hashes(video_path, kl_hash)
}

In [7]:
# Apply transformations and compute the hashes for each transformation
for transformation, (func, output_path) in transformations.items():
    print(f"Applying {transformation} transformation...")
    if transformation in ['Rotated', 'Resized', 'Cropped', 'Translated', 'Zoomed', 'Affine Transformed', 'Perspective Transformed', 'Frame Dropped', 'Frame Rate Converted', 'Slowed Down', 'Looped']:
        # These need specific handling as they are video level transformations
        func(video_path, output_path)
    else:
        # Frame-level transformations
        process_and_save_video(video_path, output_path, func)

    # Compute hashes for the transformed video
    modified_hashes = {
        'pHash': process_video_and_compute_hashes(output_path, phash),
        'wHash': process_video_and_compute_hashes(output_path, whash),
        'aHash': process_video_and_compute_hashes(output_path, ahash),
        'dHash': process_video_and_compute_hashes(output_path, dhash),
        'KL Hash': process_video_and_compute_hashes(output_path, kl_hash)
    }

    # Compare and compute similarity scores
    for hash_type in original_hashes.keys():
        print(f"Comparing original and {transformation} {hash_type} hashes:")
        scores, avg_score = compute_similarity_scores(original_hashes[hash_type], modified_hashes[hash_type])
        if scores:
            #print(f"{transformation} {hash_type} Similarity Scores:", scores)
            print(f"Average {transformation} {hash_type} Similarity:", avg_score)
        else:
            print(f"Could not compute similarity scores for {transformation} using {hash_type}.")

Applying Blurred transformation...
Comparing original and Blurred pHash hashes:
Average Blurred pHash Similarity: 0.998849852071006
Comparing original and Blurred wHash hashes:
Average Blurred wHash Similarity: 0.9972670118343195
Comparing original and Blurred aHash hashes:
Average Blurred aHash Similarity: 0.999060650887574
Comparing original and Blurred dHash hashes:
Average Blurred dHash Similarity: 0.9984291789940828
Comparing original and Blurred KL Hash hashes:
Average Blurred KL Hash Similarity: 0.9712629437869823
Applying Sharpened transformation...
Comparing original and Sharpened pHash hashes:
Average Sharpened pHash Similarity: 0.9976793639053254
Comparing original and Sharpened wHash hashes:
Average Sharpened wHash Similarity: 0.9961621671597634
Comparing original and Sharpened aHash hashes:
Average Sharpened aHash Similarity: 0.998542899408284
Comparing original and Sharpened dHash hashes:
Average Sharpened dHash Similarity: 0.9974519230769231
Comparing original and Sharpe

Average Perspective Transformed dHash Similarity: 0.9067464866863906
Comparing original and Perspective Transformed KL Hash hashes:
Average Perspective Transformed KL Hash Similarity: 0.8805306952662721
Applying Frame Dropped transformation...
Comparing original and Frame Dropped pHash hashes:
Could not compute similarity scores for Frame Dropped using pHash.
Comparing original and Frame Dropped wHash hashes:
Could not compute similarity scores for Frame Dropped using wHash.
Comparing original and Frame Dropped aHash hashes:
Could not compute similarity scores for Frame Dropped using aHash.
Comparing original and Frame Dropped dHash hashes:
Could not compute similarity scores for Frame Dropped using dHash.
Comparing original and Frame Dropped KL Hash hashes:
Could not compute similarity scores for Frame Dropped using KL Hash.
Applying Frame Rate Converted transformation...
Comparing original and Frame Rate Converted pHash hashes:
Average Frame Rate Converted pHash Similarity: 0.9987925