In [60]:
import numpy as np
import cv2
import dtcwt
import random


# Global Parameter

In [61]:
# Tunable parameters shared by the embedding and decoding cells below.
step = 5  # the rebinned luminance mask is multiplied by 1/step before np.ceil (embed_frame / decode_frame)
alpha = 5  # NOTE(review): not referenced anywhere in the visible notebook -- confirm it is still needed
lowpass_str = 5  # gain used by embed_frame when adding the watermark lowpass to the V-channel coefficients
highpass_str = 5  # gain used by embed_frame when adding the watermark highpasses to the U-channel coefficients
embed_channel = 1 # 1: u; 2: v  (NOTE(review): never read by the visible code; channels are hard-coded there)
code_length = 60  # number of bits in the pseudo-random payload string
bit_to_pixel = 2 # each bit will generate bit_to_pixel*bit_to_pixel pixels
wm_level = 4  # number of DTCWT levels used when decomposing the watermark image
key = 66666  # seed passed to generate_random_binary_string for the payload
random_placement_key = 4827  # seed passed to get_random_pos for the sub-band placement
coverimgpath = "frame_0000.png"  # cover image, loaded with cv2.imread
# coverimgpath = "/mnt/ssd1/H264_dirty_detect/video/image/life_0.png"


# Filename

In [62]:
# Calculate the watermark size that this cover image can afford.
image = cv2.imread(coverimgpath)
# cv2.imread returns None (it does not raise) when the file is missing or
# unreadable; fail here with a clear message instead of on `.shape` below.
if image is None:
    raise FileNotFoundError(f"Could not read cover image: {coverimgpath}")

# Five successive ceil-halvings of each dimension, i.e. ceil(dim / 32).
# NOTE: the names are swapped with respect to the usual convention: wm_w is
# derived from image.shape[0] (rows) and wm_h from image.shape[1] (columns).
# The later generate_image(..., wm_h, wm_w, ...) call relies on exactly this.
wm_w = -(-image.shape[0] // 32)
wm_h = -(-image.shape[1] // 32)
# Round both sizes up to the next even number.
if wm_w % 2 == 1:
    wm_w += 1
if wm_h % 2 == 1:
    wm_h += 1

print(f'{wm_w}  {wm_h}')

# wm_w = 11
# wm_h = 11

34  60


# Generate Key and corresponding wm

In [63]:
def generate_random_binary_string(length, random_seed=None):
    """Return a string of `length` characters, each drawn from '1' / '0'.

    When `random_seed` is not None the module-level `random` generator is
    re-seeded with it first, which makes the result reproducible (and, as a
    side effect, resets the global generator state). With `random_seed=None`
    the generator is used in whatever state it currently has.
    """
    if random_seed is not None:
        random.seed(random_seed)

    bits = []
    for _ in range(length):
        bits.append(random.choice('10'))
    return ''.join(bits)

def generate_image(random_binary_string, width, height, n, output_filename):
    """Render a bit string as a black/white block image and save it with OpenCV.

    The image is `height` x `width` pixels with 3 channels (uint8). It is tiled
    with n x n blocks, row by row and left to right. Block number k is coloured
    from character k of `random_binary_string`, cycling back to the start of the
    string when it is exhausted: '0' gives a black block, any other character a
    white one. Pixels not covered by a whole block stay black.

    Raises:
        ValueError: if `n` is not positive (the block loop could never advance)
            or if `random_binary_string` is empty.
        OSError: if cv2.imwrite reports that the file could not be written.
    """
    if n <= 0:
        raise ValueError("n must be a positive block size, got {}".format(n))
    if not random_binary_string:
        raise ValueError("random_binary_string must not be empty")

    # Start from an all-black canvas; only the white blocks need painting.
    image = np.zeros((height, width, 3), dtype=np.uint8)

    blocks_per_row = width // n
    block_rows = height // n
    n_bits = len(random_binary_string)

    bit_index = 0
    for block_row in range(block_rows):
        top = block_row * n
        for block_col in range(blocks_per_row):
            if random_binary_string[bit_index % n_bits] != '0':
                left = block_col * n
                image[top:top + n, left:left + n] = 255
            bit_index += 1

    # cv2.imwrite signals failure (e.g. missing output directory) by returning
    # False; surface that instead of letting a later imread return None.
    if not cv2.imwrite(output_filename, image):
        raise OSError("cv2.imwrite could not write {}".format(output_filename))

def get_random_pos(big_matrix_shape, small_matrices, seed=None):
    """Pick random, mutually non-overlapping slots for a list of 2-D blocks.

    The module-level `random` generator is re-seeded with `seed` on every call,
    so the same seed, grid shape and block shapes always give the same layout.
    Only the `.shape` of each entry in `small_matrices` is used.

    Returns:
        A list with one (row_start, row_stop, col_start, col_stop) tuple per
        block, in the order the blocks were given.

    Raises:
        ValueError: if some block cannot be placed without overlapping an
            earlier one.
    """
    random.seed(seed)

    # True wherever an earlier block has already been placed.
    taken = np.zeros(big_matrix_shape, dtype=bool)
    placements = []

    for block in small_matrices:
        blk_h, blk_w = block.shape
        last_top = big_matrix_shape[0] - blk_h
        last_left = big_matrix_shape[1] - blk_w

        # Enumerate free top-left corners in row-major order; the order matters
        # because random.choice indexes into this list.
        candidates = []
        for top in range(last_top + 1):
            for left in range(last_left + 1):
                if not taken[top:top + blk_h, left:left + blk_w].any():
                    candidates.append((top, left))

        if len(candidates) == 0:
            raise ValueError("No space left for the matrix of shape {}".format(block.shape))

        top, left = random.choice(candidates)
        bottom, right = top + blk_h, left + blk_w

        placements.append((top, bottom, left, right))
        taken[top:bottom, left:right] = True

    return placements

def place_random_pos(big_matrix, small_matrices, random_pos):
    """Copy each block into `big_matrix` at its precomputed slot.

    `random_pos[k]` is a (row_start, row_stop, col_start, col_stop) tuple, as
    produced by get_random_pos, giving the slice that receives
    `small_matrices[k]`.

    `big_matrix` is modified in place and also returned.
    """
    for idx, small_matrix in enumerate(small_matrices):
        top, bottom, left, right = random_pos[idx]
        big_matrix[top:bottom, left:right] = small_matrix

    return big_matrix


In [64]:
# Draw the code_length-bit payload, seeded with `key` so it is reproducible.
random_binary_string = generate_random_binary_string(code_length, key)
# generate_image takes (bits, width, height, ...). wm_h was computed from the
# cover image's column count and wm_w from its row count, so passing them in
# this order is what makes width/height come out right.
# The "image" directory must already exist for the file to be written.
generate_image(random_binary_string, wm_h, wm_w, bit_to_pixel, "image/wm.png")
# print(random_binary_string)


# utility function

In [74]:
def rebin(a, shape):
    """Downsample the 2-D array `a` to (shape[0], shape[1]) by block averaging.

    Only the first two entries of `shape` are read, so a 3-D shape tuple may be
    passed. `a` is zero-padded at the bottom/right where needed so that each
    dimension becomes a whole multiple of the target size; the padded zeros
    take part in the averages of the last row/column of blocks.

    Padding rules:
      * an odd row count always receives one extra zero row (the historical
        behaviour, kept so existing results do not change);
      * if the row count is still not a multiple of shape[0], or the column
        count is not a multiple of shape[1], further zero rows/columns are
        appended. Previously these inputs failed inside reshape.
    """
    rows, cols = a.shape
    target_rows, target_cols = shape[0], shape[1]

    extra_rows = rows % 2
    extra_rows += -(rows + extra_rows) % target_rows
    extra_cols = -cols % target_cols

    # vstack/hstack with float zeros, as before, so the dtype promotion of the
    # padded array is unchanged.
    if extra_rows:
        a = np.vstack((a, np.zeros((extra_rows, a.shape[1]))))
    if extra_cols:
        a = np.hstack((a, np.zeros((a.shape[0], extra_cols))))

    # (target_rows, row_factor, target_cols, col_factor): average over both
    # factor axes to collapse every block to one value.
    sh = target_rows, a.shape[0] // target_rows, target_cols, a.shape[1] // target_cols
    return a.reshape(sh).mean(-1).mean(1)

def generate_staggered_array(rows, cols, output_filename):
    """Write a rows x cols checkerboard (values 0 and 255) to `output_filename`.

    Pixel (r, c) is 255 when r + c is odd and 0 otherwise.
    """
    indices = np.arange(rows)[:, np.newaxis] + np.arange(cols)
    # np.arange yields the platform's default integer type; cast to uint8,
    # the depth cv2.imwrite expects for ordinary 8-bit images.
    staggered_array = ((indices % 2) * 255).astype(np.uint8)
    cv2.imwrite(output_filename, staggered_array)

    # return staggered_array

def recover_string_from_image(n, original_length, gray_image):
    """Read a repeated bit string back from a 2-D image made of n x n blocks.

    Blocks are scanned row by row, left to right, and grouped into consecutive
    passes of `original_length` blocks each. A block is read as "0" when it
    contains at least one pixel below 20 and its mean is below 127.5, otherwise
    as "1". A final incomplete pass is padded with "5", which counts for
    neither value. The result is the per-position majority vote over all
    passes; a tie (including the case where no block fits in the image) is
    broken with np.random.choice, so it is not deterministic.

    Returns:
        A str of length `original_length` ("" when `original_length` <= 0).

    Raises:
        ValueError: if `n` is not positive.
        IndexError: if the image is at least `n` rows tall but narrower than
            one block.
    """
    if n <= 0:
        raise ValueError("n must be a positive block size, got {}".format(n))
    if original_length <= 0:
        # Nothing to recover; the scan below would otherwise never advance.
        return ""

    height, width = gray_image.shape[0], gray_image.shape[1]
    if height >= n > width:
        raise IndexError("gray_image is narrower than one {0}x{0} block".format(n))

    row, col = 0, 0
    detected_strings = []

    while row + n <= height:
        bits = []
        while len(bits) < original_length and row + n <= height:
            block = gray_image[row:row + n, col:col + n]
            is_zero = np.any(block < 20) and np.mean(block) < 127.5
            bits.append("0" if is_zero else "1")

            # Advance one block; wrap to the next block row at the right edge.
            col += n
            if col + n > width:
                col = 0
                row += n

        # Pad an incomplete last pass so every pass can be indexed uniformly.
        bits.extend("5" * (original_length - len(bits)))
        detected_strings.append("".join(bits))

    final_bits = []
    for i in range(original_length):
        ones = sum(1 for s in detected_strings if s[i] == "1")
        zeros = sum(1 for s in detected_strings if s[i] == "0")

        if ones > zeros:
            final_bits.append("1")
        elif zeros > ones:
            final_bits.append("0")
        else:
            final_bits.append(str(np.random.choice(["0", "1"])))

    return "".join(final_bits)

def embed_frame(frame, wm_coeffs):
    """Embed a DTCWT-decomposed watermark into the chroma channels of a frame.

    Args:
        frame: BGR image array (it is converted with cv2.COLOR_BGR2YUV).
        wm_coeffs: watermark decomposition exposing `.highpasses` (one 3-D
            array per level, orientation on the last axis) and a 2-D
            `.lowpass`, e.g. the result of
            dtcwt.Transform2d().forward(wm, nlevels=wm_level).

    What is embedded where:
        * Watermark highpasses: for each of the 6 orientations, every level
          0..wm_level-1 is written 4 times at seeded random, non-overlapping
          positions of an empty level-3-sized canvas, which is then weighted by
          a mask derived from the Y channel and by `highpass_str` and added to
          the U channel's level-3 highpass.
        * Watermark lowpass: written at offset
          (2 * level-1 rows, 2 * level-1 cols) of an empty canvas, weighted by
          the Y-derived mask and `lowpass_str`, and added to orientations 0..3
          of the V channel's level-3 highpass.

    Reads the notebook globals step, wm_level, random_placement_key,
    highpass_str and lowpass_str.

    Returns:
        The watermarked image as a uint8 BGR array.
    """
    redundancy = 4  # number of copies of every watermark level
    box_kernel = np.array([[1 / 4, 1 / 4], [1 / 4, 1 / 4]])

    def _texture_mask(y_band, target_shape):
        # Smooth |Y level-2 coefficients|, average down to the level-3 grid,
        # quantise with `step`, then scale by 1 / max(12, largest value).
        smoothed = cv2.filter2D(np.abs(y_band), -1, box_kernel)
        mask = np.ceil(rebin(smoothed, target_shape) * (1 / step))
        mask *= 1.0 / max(12.0, np.amax(mask))
        return mask

    def _repeated_bands(orientation):
        # Every level's sub-band for one orientation, `redundancy` times each.
        return [
            wm_coeffs.highpasses[lv][:, :, orientation]
            for lv in range(wm_level)
            for _ in range(redundancy)
        ]

    img = cv2.cvtColor(frame.astype(np.float32), cv2.COLOR_BGR2YUV)

    # 3-level DTCWT of each channel: Y drives the masks, U and V carry the mark.
    y_transform = dtcwt.Transform2d()
    y_coeffs = y_transform.forward(img[:, :, 0], nlevels=3)

    u_transform = dtcwt.Transform2d()
    u_coeffs = u_transform.forward(img[:, :, 1], nlevels=3)

    v_transform = dtcwt.Transform2d()
    v_coeffs = v_transform.forward(img[:, :, 2], nlevels=3)

    # --- watermark highpasses -> U channel, level-3 highpass ---------------
    shape3 = y_coeffs.highpasses[2][:, :, 0].shape
    masks3 = [_texture_mask(y_coeffs.highpasses[1][:, :, i], shape3) for i in range(6)]

    # get_random_pos only looks at the block shapes, which are the same for
    # every orientation, so one layout (from orientation 0) serves all six.
    random_positions = get_random_pos(masks3[0].shape, _repeated_bands(0), random_placement_key)

    for i in range(6):
        canvas = np.zeros(masks3[i].shape, dtype=np.complex128)
        canvas = place_random_pos(canvas, _repeated_bands(i), random_positions)
        u_coeffs.highpasses[2][:, :, i] += highpass_str * (masks3[i] * canvas)

    # --- watermark lowpass -> V channel, level-3 highpass (orientations 0-3)
    wm_lowpass = wm_coeffs.lowpass
    lp_h, lp_w = wm_lowpass.shape
    # Offset of the lowpass patch: twice the watermark's level-1 highpass size.
    lv1_h, lv1_w = wm_coeffs.highpasses[0].shape[:2]

    for i in range(4):
        lowpass_mask = _texture_mask(y_coeffs.highpasses[1][:, :, i], v_coeffs.highpasses[2].shape)
        canvas = np.zeros(lowpass_mask.shape)
        canvas[2 * lv1_h:2 * lv1_h + lp_h, 2 * lv1_w:2 * lv1_w + lp_w] = wm_lowpass
        v_coeffs.highpasses[2][:, :, i] += lowpass_str * (lowpass_mask * canvas)

    # Back to the pixel domain, each channel through its own transform object.
    img[:, :, 1] = u_transform.inverse(u_coeffs)
    img[:, :, 2] = v_transform.inverse(v_coeffs)

    img = cv2.cvtColor(img, cv2.COLOR_YUV2BGR)
    img = np.clip(img, a_min=0, a_max=255)
    img = np.around(img).astype(np.uint8)

    return img

def decode_frame(wmed_img):
    """Extract the watermark image and payload string from a watermarked frame.

    Args:
        wmed_img: BGR image array (it is converted with cv2.COLOR_BGR2YUV).

    Steps:
        1. 3-level DTCWT of the Y, U and V channels.
        2. Rebuild the Y-derived masks and invert them (zero entries are
           replaced by 0.01 so the inverse stays finite).
        3. Highpasses: undo mask and `highpass_str` on the U channel's level-3
           highpass, then, per watermark level and orientation, add up the 4
           redundant copies found at the seeded random positions. The copies
           are summed, not averaged.
        4. Lowpass: undo the lowpass mask and `lowpass_str` on orientations
           0..3 of the V channel's level-3 highpass and add up the patch that
           starts at twice the watermark's level-1 size.
        5. Inverse DTCWT of the rebuilt pyramid and bit recovery with
           recover_string_from_image.

    Reads the notebook globals step, wm_level, random_placement_key,
    highpass_str, lowpass_str, bit_to_pixel and code_length.

    Returns:
        (recovered_string, wm): the recovered bit string and the reconstructed
        watermark image.
    """
    redundancy = 4  # number of copies of every watermark level
    box_kernel = np.array([[1 / 4, 1 / 4], [1 / 4, 1 / 4]])

    def _quantised_texture(y_band, target_shape):
        # Smooth |Y level-2 coefficients|, average down to the level-3 grid and
        # quantise with `step` (not yet normalised).
        smoothed = cv2.filter2D(np.abs(y_band), -1, box_kernel)
        return np.ceil(rebin(smoothed, target_shape) * (1.0 / step))

    wmed_img = cv2.cvtColor(wmed_img.astype(np.float32), cv2.COLOR_BGR2YUV)

    u_transform = dtcwt.Transform2d()
    u_coeffs = u_transform.forward(wmed_img[:, :, 1], nlevels=3)

    y_transform = dtcwt.Transform2d()
    y_coeffs = y_transform.forward(wmed_img[:, :, 0], nlevels=3)

    v_transform = dtcwt.Transform2d()
    v_coeffs = v_transform.forward(wmed_img[:, :, 2], nlevels=3)

    # --- inverse masks for the highpass path -------------------------------
    shape3 = y_coeffs.highpasses[2][:, :, 0].shape
    inv_masks3 = []
    for i in range(6):
        mask = _quantised_texture(y_coeffs.highpasses[1][:, :, i], shape3)
        mask[mask == 0] = 0.01  # here zeros are replaced before normalising
        mask *= 1.0 / max(12.0, np.amax(mask))
        inv_masks3.append(1.0 / mask)

    # --- empty containers shaped like the watermark's highpass pyramid -----
    # The watermark's level-1 highpass is three ceil-halvings of the cover's
    # level-3 grid; every further level halves again.
    h, w = u_coeffs.highpasses[2].shape[:2]
    for _ in range(3):
        h = (h + 1) // 2
        w = (w + 1) // 2

    my_highpass = []
    for _ in range(wm_level):
        my_highpass.append(np.zeros((h, w, 6), dtype=np.complex128))
        h = (h + 1) // 2
        w = (w + 1) // 2

    # Same seed and block shapes as in embed_frame, hence the same layout.
    layout_bands = [
        my_highpass[lv][:, :, 0]
        for lv in range(wm_level)
        for _ in range(redundancy)
    ]
    random_positions = get_random_pos(inv_masks3[0].shape, layout_bands, random_placement_key)

    # --- highpass extraction from the U channel ----------------------------
    for i in range(6):
        coeff = u_coeffs.highpasses[2][:, :, i] * inv_masks3[i] / highpass_str

        for lv in range(wm_level):
            total = None
            for copy in range(redundancy):
                top, bottom, left, right = random_positions[lv * redundancy + copy]
                piece = coeff[top:bottom, left:right]
                total = piece if total is None else total + piece
            my_highpass[lv][:, :, i] = total

    highpasses = tuple(my_highpass)

    # --- lowpass extraction from the V channel -----------------------------
    inv_lowpass_masks = []
    for i in range(4):
        mask = _quantised_texture(y_coeffs.highpasses[1][:, :, i], v_coeffs.highpasses[2].shape)
        mask *= 1.0 / max(12.0, np.amax(mask))
        mask[mask == 0] = 0.01  # here zeros are replaced after normalising
        inv_lowpass_masks.append(1.0 / mask)

    first_h, first_w = my_highpass[0].shape[:2]
    last_h, last_w = my_highpass[-1].shape[:2]
    row0, col0 = 2 * first_h, 2 * first_w

    lowpass = np.zeros((last_h * 2, last_w * 2), dtype=np.complex128)
    for i in range(4):
        # Undo what embed_frame applied to this band: the lowpass mask and
        # lowpass_str. (This used to reuse the highpass inverse mask and
        # highpass_str, leaving inv_lowpass_masks unused and decoding with the
        # wrong gain whenever the two strengths differ.)
        coeff = v_coeffs.highpasses[2][:, :, i] * inv_lowpass_masks[i] / lowpass_str
        lowpass[:, :] += coeff[row0:row0 + 2 * last_h, col0:col0 + 2 * last_w]

    lowpass = lowpass.real.astype(np.float32)

    t = dtcwt.Transform2d()
    wm = t.inverse(dtcwt.Pyramid(lowpass, highpasses))

    recovered_string = recover_string_from_image(bit_to_pixel, code_length, wm)
    return recovered_string, wm


# Encode a watermark.png into a cover image

In [66]:
# Cover frame to be watermarked; embed_frame treats it as BGR.
coverimgpath = "frame_0000.png"
img = cv2.imread(coverimgpath)

# Load the watermark image written by generate_image and reduce it to one channel.
wm = cv2.imread("image/wm.png")
wm = cv2.cvtColor(wm, cv2.COLOR_BGR2GRAY)
# Decompose the watermark into wm_level DTCWT levels; embed_frame consumes the
# resulting .highpasses / .lowpass.
wm_transform = dtcwt.Transform2d()
wm_coeffs = wm_transform.forward(wm, nlevels=wm_level)


# Embed and save. cv2.imwrite is the cell's last expression, so its boolean
# result is what the notebook displays.
wm_img = embed_frame(img, wm_coeffs)
cv2.imwrite(f"image/wm_img.png", wm_img)


True

# Decode the watermark for wm_img.png

In [75]:
# Read the watermarked frame back from disk and decode it.
wm_image_path = "image/wm_img.png"
wmed_img = cv2.imread(wm_image_path)
# decode_frame returns (recovered bit string, reconstructed watermark image).
recovered_string, recover_wm = decode_frame(wmed_img)

# print(recover_wm.shape)
# print(recovered_string)
# NOTE(review): recover_wm is the raw inverse-DTCWT output and is not cast to
# uint8 before saving -- confirm that cv2.imwrite's handling is the intended one.
cv2.imwrite(f'image/wm_extract.png', recover_wm)

True

# Recover key from wm_extract.png

In [14]:
# def count_differences(original_string, recovered_string):
#     # Ensure both strings are of the same length
#     if len(original_string) != len(recovered_string):
#         raise ValueError("Both strings must have the same length.")

#     # Find the indices of differences
#     diff_indices = [i for i, (orig, rec) in enumerate(zip(original_string, recovered_string)) if orig != rec]
    
#     # Print the differing indices
#     if diff_indices:
#         print(f"Differences found at indices: {', '.join(map(str, diff_indices))}")
#         for idx in diff_indices:
#             print(f'{original_string[idx] }', end=' ')
#         print()
#     else:
#         print("No differences found.")
    
#     return len(diff_indices)

# generate_image(recovered_string, wm_h, wm_w, bit_to_pixel, "recovery_wm.png")
# print(f'differ length: {count_differences(random_binary_string, recovered_string)}')

In [15]:
# print(recovered_string)
# print(random_binary_string)
# print(recovered_string[19])
# print(random_binary_string[19])

In [16]:
# test_wm_path = "/mnt/ssd1/H264_dirty_detect/Experiment/wm/speed_bag_300_wm_k2486_cl60_wl4_crf1.png"

# test_wm = cv2.imread(test_wm_path)
# test_wm = cv2.cvtColor(test_wm, cv2.COLOR_BGR2GRAY)

# test_recovered_string = recover_string_from_image(bit_to_pixel, code_length, test_wm)
# generate_image(test_recovered_string, wm_h, wm_w, bit_to_pixel, "recovery_wm.png")
# print(f'differ length: {count_differences(random_binary_string, test_recovered_string)}')

# PSNR SSIM

In [17]:
# !ffmpeg -i frame_0000.png -i wm_img.png -filter_complex "psnr" -f null - 

In [18]:
# ! ffmpeg-quality-metrics video/life_300_wm.mp4 ../video/life_300.mp4 --metrics psnr ssim > result/life_300_wm_2d.json
# ! ffmpeg-quality-metrics video/park_joy_300_wm.mp4 ../video/park_joy_300.mp4 --metrics psnr ssim > result/park_joy_300_wm_2d.json
# ! ffmpeg-quality-metrics video/pedestrian_area_300_wm.mp4 ../video/pedestrian_area_300.mp4 --metrics psnr ssim > result/pedestrian_area_300_wm_2d.json
# ! ffmpeg-quality-metrics video/speed_bag_300_wm.mp4 ../video/speed_bag_300.mp4 --metrics psnr ssim > result/speed_bag_300_wm_2d.json


# ! ffmpeg-quality-metrics ../video/life_300_wm_k2486_cl60_wl4.mp4 ../video/life_300.mp4 --metrics psnr ssim > result/life_300_wm_k2486_cl60_wl4.json
# ! ffmpeg-quality-metrics ../video/park_joy_300_wm_k2486_cl60_wl4.mp4 ../video/park_joy_300.mp4 --metrics psnr ssim > result/park_joy_300_wm_k2486_cl60_wl4.json
# ! ffmpeg-quality-metrics ../video/pedestrian_area_300_wm_k2486_cl60_wl4.mp4 ../video/pedestrian_area_300.mp4 --metrics psnr ssim > result/pedestrian_area_300_wm_k2486_cl60_wl4.json
# ! ffmpeg-quality-metrics ../video/speed_bag_300_wm_k2486_cl60_wl4.mp4 ../video/speed_bag_300.mp4 --metrics psnr ssim > result/speed_bag_300_wm_k2486_cl60_wl4.json


# Some tests

In [19]:
# import multiprocessing

# def worker_function(x, y):
#     return x + y

# if __name__ == '__main__':
#     pool = multiprocessing.Pool(processes=4)
#     input_values = [(10, 10), (20, 20), (30, 30), (40, 40), (50, 50), (60, 60)]
#     results = pool.starmap(worker_function, input_values)
    
#     # Results are in the same order as input_values
#     print(results)
    
#     pool.close()
#     pool.join()

In [20]:
# import numpy as np

# # Define your binary strings
# str1 = '10101011111'
# str2 = '10101011011'

# # Convert binary strings to numpy arrays of integers
# arr1 = np.array([int(bit) for bit in str1])
# arr2 = np.array([int(bit) for bit in str2])

# # Calculate the means of the arrays
# mean1 = np.mean(arr1)
# mean2 = np.mean(arr2)

# # Calculate the numerator and denominator
# numerator = np.sum((arr1 - mean1) * (arr2 - mean2))
# denominator = np.sqrt(np.sum((arr1 - mean1) ** 2) * np.sum((arr2 - mean2) ** 2))

# # Calculate the normalized correlation coefficient
# corr_coefficient = numerator / denominator

# print(f"Normalized Correlation Coefficient: {corr_coefficient:.4f}")

In [21]:
# import numpy as np

# def normalized_cross_correlation_zero_lag(A_str, B_str):
#     # Convert strings to numpy arrays
#     A = np.array([int(i) for i in A_str])
#     B = np.array([int(j) for j in B_str])
    
#     # Compute mean values
#     mean_A = A.mean()
#     mean_B = B.mean()

#     numerator = np.sum((A - mean_A) * (B - mean_B))
#     denominator = np.sqrt(np.sum((A - mean_A)**2) * np.sum((B - mean_B)**2))
    
#     if denominator == 0:
#         ncc = 0
#     else:
#         ncc = numerator / denominator

#     return ncc

# # Example usage
# A = "101011"
# B = "101111"


# print(normalized_cross_correlation_zero_lag(A, B))

In [29]:
import numpy as np
import random

def place_matrices(big_matrix, small_matrices, seed=None):
    """Write each 2-D block into `big_matrix` at a random free location.

    The module-level `random` generator is re-seeded with `seed` first, so a
    given seed reproduces the same layout. Blocks are placed in the order
    given and never overlap one another. `big_matrix` is modified in place and
    also returned.

    Raises:
        ValueError: if no free location is left for one of the blocks.
    """
    random.seed(seed)

    # Cells already covered by a previously placed block.
    used = np.zeros_like(big_matrix, dtype=bool)
    n_rows, n_cols = big_matrix.shape[0], big_matrix.shape[1]

    for tile in small_matrices:
        tile_h, tile_w = tile.shape

        # Free top-left corners, collected in row-major order (the order is
        # what random.choice indexes into).
        free_spots = []
        for r in range(n_rows - tile_h + 1):
            for c in range(n_cols - tile_w + 1):
                if used[r:r + tile_h, c:c + tile_w].any():
                    continue
                free_spots.append((r, c))

        if not free_spots:
            raise ValueError("No space left for the matrix of shape {}".format(tile.shape))

        r, c = random.choice(free_spots)
        window = (slice(r, r + tile_h), slice(c, c + tile_w))

        big_matrix[window] = tile
        used[window] = True

    return big_matrix

# Example
# Example: drop three blocks (filled with 1, 2 and 3) into an empty 10x10 grid.
# place_matrices writes into big_matrix in place and returns it, so `result`
# and `big_matrix` refer to the same array afterwards.
big_matrix = np.zeros((10, 10))
small_matrices = [np.ones((2, 2)), np.ones((3, 3)) * 2, np.ones((2, 3)) * 3]

result = place_matrices(big_matrix, small_matrices, seed=42)
print(result)


[[0. 2. 2. 2. 0. 0. 0. 0. 0. 0.]
 [0. 2. 2. 2. 0. 1. 1. 0. 0. 0.]
 [0. 2. 2. 2. 0. 1. 1. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 3. 3. 3. 0. 0. 0.]
 [0. 0. 0. 0. 3. 3. 3. 0. 0. 0.]]
