## Test

In [1]:
import cv2
import os
import pywt
import numpy as np
from util.watermarking import Watermarking
from util.test import eval_metrics
from util.metrics import METRICS
import matplotlib.pyplot as plt

def test(T, K, W, data_path):
    results = {key:[] for key in METRICS.keys()}
    for i, image in enumerate(os.listdir(data_path)):
        if i == 128:
            break
        try:
            I = cv2.imread(data_path+"/"+image)
            result = eval_metrics(T, I, W, K)
            for key in METRICS.keys():
                results[key].append(result[key])
        except Exception as e:
            print(image)
            raise e
    return results

## ADD

In [2]:
"""Additive algorithm for Watermarking"""

class ADD(Watermarking):
    """Class for watermarking for ADD algorithm."""
    def __init__(self):
        self.type = "ADD"

    def enc(self, _i, _w, _k):
        """Encrypts an image _i with a watermark _w and a key _k."""
        _w = cv2.resize(_w, (_i.shape[1], _i.shape[0]))
        alpha = _k
        i_res = _i + alpha*_w
        i_res = np.clip(i_res,0,255)
        i_res = i_res.astype(np.uint8)
        return i_res

    def dec(self, _d, _i, _k):
        """Decrypts a watermark from an image _i with a key _k."""

        alpha = _k
        i_res = _i - _d
        i_res = i_res/alpha
        i_res = np.clip(i_res, 0, 255)
        return i_res


In [3]:
T = ADD()
K = 0.05
W = cv2.imread("watermark.jpg")

results_ADD = test(T, K, W, "100_Image_Dataset")
results_ADD_MEAN = {key:np.mean(results_ADD[key]) for key in METRICS.keys()}
results_ADD_MEAN

{'MSE': 2.6353496666666665,
 'PSNR': 43.925691310152395,
 'SSIM': 0.9953798325994483,
 'BER': 1.1873208888888889}

In [4]:
results_ADD = test(T, K, W, "COCO_Dataset")
results_ADD_MEAN = {key:np.mean(results_ADD[key]) for key in METRICS.keys()}
results_ADD_MEAN

{'MSE': 2.6048239872685186,
 'PSNR': 43.98414050474563,
 'SSIM': 0.9954223710717927,
 'BER': 1.1688138888888888}

## LSB

In [5]:
"""LSB algorithm for watermarking."""

import numpy as np
from util.watermarking import Watermarking
from util.test import encrypt_img
from util.test import eval_metrics

class LSB(Watermarking):
    """Abstract class for watermarking for LSB algorithm."""
    def __init__(self):
        self.type = "LSB"

    def enc(self, _i, _w, _k):
        """Encrypts an image _i with a watermark _w and a key _k."""
        i_flat = _i.flatten()
        w_bits = _w + bytes("$$$",encoding='utf8')
        cnt = 0
        for bit in w_bits:
            for i in range(8):
                bit_val = (bit >> i) & 1
                if bit_val == 1:
                    i_flat[cnt] |= 1
                else:
                    i_flat[cnt] &= 0
                cnt += 1
        i_res = np.resize(i_flat, _i.shape)
        return i_res

    def dec(self, _d, _i, _k):
        """Decrypts an image _d with a key _k."""
        d_flat = _d.flatten()
        cnt = 0
        recovered = bytearray()
        for _ in range(len(d_flat)):
            current_byte = 0
            for j in range(8):
                current_byte |= ((d_flat[cnt] & 1)<<j)
                cnt+=1
                if cnt >= len(d_flat):
                    break
            if cnt >= len(d_flat):
                break
            recovered += int(current_byte).to_bytes(1, 'big')
            if recovered[-3:] == b'$$$':
                break
        return bytes(recovered[:-3])

T = LSB()
K = 0.00
W = b'1101010100101010101001010'
results_LSB = test(T, K, W, "100_Image_Dataset")
results_LSB_MEAN = {key:np.mean(results_ADD[key]) for key in METRICS.keys()}
results_LSB_MEAN

{'MSE': 2.6048239872685186,
 'PSNR': 43.98414050474563,
 'SSIM': 0.9954223710717927,
 'BER': 1.1688138888888888}

In [6]:
results_LSB = test(T, K, W, "COCO_Dataset")
results_LSB_MEAN = {key:np.mean(results_ADD[key]) for key in METRICS.keys()}
results_LSB_MEAN

{'MSE': 2.6048239872685186,
 'PSNR': 43.98414050474563,
 'SSIM': 0.9954223710717927,
 'BER': 1.1688138888888888}

## DCT

In [8]:
"""DFT watermarking algorithm."""

quant = np.array([[16,11,10,16,24,40,51,61],      
                    [12,12,14,19,26,58,60,55],    
                    [14,13,16,24,40,57,69,56],
                    [14,17,22,29,51,87,80,62],
                    [18,22,37,56,68,109,103,77],
                    [24,35,55,64,81,104,113,92],
                    [49,64,78,87,103,121,120,101],
                    [72,92,95,98,112,100,103,99]])

class DCT(Watermarking):
    """Class for watermarking for DCT algorithm."""

    def __init__(self):
        self.type = "DCT"
        self.message = None
        self.bit_mess = None
        self.ori_col = 0
        self.ori_row = 0
        self.num_bits = 0

    def enc(self,img,secret_msg,_k):
        """Encrypts an image _i with a watermark _w and a key _k."""

        secret=secret_msg
        self.message = str(len(secret))+'*'+secret_msg
        self.bit_mess = self.to_bits()

        img = self.add_padd(img)
        row,col = img.shape[:2]
        self.ori_row, self.ori_col = row, col

        if(col/8)*(row/8)<len(secret):
            print("Error: Message too large to encode in image")
            return False

        row,col = img.shape[:2]
        b_img,g_img,r_img = cv2.split(img)
        new_channels = [b_img, g_img, r_img]
        b_img = np.float32(b_img)
        img_blocks = [np.round(b_img[j:j+8, i:i+8]-128)
                        for j in range(0, row, 8) for i in range(0, col, 8)]
        dct_blocks = [np.round(cv2.dct(img_Block)) for img_Block in img_blocks]
        quantized_dct = [np.round(dct_Block/quant) for dct_Block in dct_blocks]
        mess_index = 0
        letter_index = 0
        for quantized_block in quantized_dct:
            _dc = quantized_block[0][0]
            _dc = np.uint8(_dc)
            _dc = np.unpackbits(_dc)
            _dc[7] = self.bit_mess[mess_index][letter_index]
            _dc = np.packbits(_dc)
            _dc = np.float32(_dc)
            _dc= _dc-255
            quantized_block[0][0] = _dc
            letter_index = letter_index+1
            if letter_index == 8:
                letter_index = 0
                mess_index = mess_index + 1
                if mess_index == len(self.message):
                    break
        s_img_blocks = [quantized_block *quant+128 for quantized_block in quantized_dct]
        s_img=[]
        for chunk_row_blocks in self.chunks(s_img_blocks, col/8):
            for row_block_num in range(8):
                for block in chunk_row_blocks:
                    s_img.extend(block[row_block_num])
        s_img = np.array(s_img).reshape(row, col)
        s_img = np.uint8(s_img)
        f_img = cv2.merge((s_img,g_img,r_img))
        return f_img

    def dec(self,img, _i, _k):
        row,col = img.shape[:2]
        mess_size = None
        message_bits = []
        buff = 0
        b_img,g_img,r_img = cv2.split(img)
        b_img = np.float32(b_img)
        img_blocks = [b_img[j:j+8, i:i+8]-128 for (j,i) in itertools.product(range(0,row,8),
                                                                       range(0,col,8))]    
        quantized_dct = [img_Block/quant for img_Block in img_blocks]
        i=0
        for quantized_block in quantized_dct:
            _dc = quantized_block[0][0]
            _dc = np.uint8(_dc)
            _dc = np.unpackbits(_dc)
            if _dc[7] == 0:
                buff+= 1 << (7-i)
            i=1+i
            if i == 8:
                message_bits.append(chr(buff))
                buff = 0
                i =0
                if message_bits[-1] == '*' and mess_size is None:
                    try:
                        mess_size = int(''.join(message_bits[:-1]))

                    except: pass

            if len(message_bits) - len(str(mess_size)) - 1 == mess_size:
                return ''.join(message_bits)[len(str(mess_size))+1:]
        s_img_blocks = [quantized_block *quant+128 for quantized_block in quantized_dct]
        s_img=[]
        for chunk_row_blocks in self.chunks(s_img_blocks, col/8):
            for row_block_num in range(8):
                for block in chunk_row_blocks:
                    s_img.extend(block[row_block_num])
        s_img = np.array(s_img).reshape(row, col)
        s_img = np.uint8(s_img)
        s_img = cv2.merge((s_img,g_img,r_img))
        return ''

    def chunks(self, _l, _n):
        """Yield successive _n-sized chunks from l."""
        m = int(_n)
        for i in range(0, len(_l), m):
            yield _l[i:i + m]

    def to_bits(self):
        """Converts a string to a list of bits"""
        bits = []
        for char in self.message:
            binval = bin(ord(char))[2:].rjust(8,'0')
            bits.append(binval)
        self.num_bits = bin(len(bits))[2:].rjust(8,'0')
        return bits

    def add_padd(self, img):
        """Adds padding to an image to make it divisible by 8."""
        col = img.shape[1]
        row = img.shape[0]
        img = cv2.resize(img,(col+(8-col%8),row+(8-row%8)))
        return img


T = DCT()
K = 0.00
W = "Hello World"

results_DCT = test(T, K, W, "100_Image_Dataset")
results_DCT_MEAN = {key:np.mean(results_ADD[key]) for key in METRICS.keys()}
results_DCT_MEAN

{'MSE': 2.6048239872685186,
 'PSNR': 43.98414050474563,
 'SSIM': 0.9954223710717927,
 'BER': 1.1688138888888888}

In [9]:
results_DCT = test(T, K, W, "COCO_Dataset")
results_DCT_MEAN = {key:np.mean(results_DCT[key]) for key in METRICS.keys()}
results_DCT_MEAN

{'MSE': 34.402668848152196,
 'PSNR': 32.7950684662244,
 'SSIM': 0.7237992823996434,
 'BER': 26.936990077475762}

## DWT

In [None]:
"""DWT watermarking algorithm."""

class DWT(Watermarking):
    """Class for watermarking using DWT algorithm."""

    def __init__(self):
        self.type = "DWT"
        self.message = None
        self.bit_mess = None
        self.ori_col = 0
        self.ori_row = 0
        self.num_bits = 0

    def enc(self, img, secret_msg, _k):
        """Encrypts an image with a watermark."""

        img = self.add_padd(img)
        b_img, g_img, r_img = cv2.split(img)

        image = b_img

        ori_row, ori_col = image.shape
        self.message = str(len(secret_msg))+'*'+secret_msg
        self.bit_mess = self.to_bits()
        binary_watermark = self.bit_mess

        coeffs = pywt.dwt2(image, "haar")
        _ll, (_ch, _cv, _cd) = coeffs
        _ll = np.array(_ll, dtype=np.int32)

        binary_watermark_index = 0
        for i in range(_ll.shape[0]):
            for j in range(_ll.shape[1]):
                if binary_watermark_index >= len(binary_watermark):
                    break
                binary_pixel = np.binary_repr(_ll[i, j], width=8)
                modified_pixel = int(binary_pixel[:-1] + binary_watermark[binary_watermark_index], 2)
                _ll[i, j] = modified_pixel
                binary_watermark_index += 1
            if binary_watermark_index >= len(binary_watermark):
                break

        modified_coeffs = (_ll, (_ch, _cv, _cd))
        watermarked_image = pywt.idwt2(modified_coeffs, "haar")

        watermarked_image = cv2.resize(watermarked_image, (ori_col, ori_row))
        r_img = cv2.resize(r_img, (ori_col, ori_row))
        g_img = cv2.resize(g_img, (ori_col, ori_row))
        ret_image = np.zeros((img.shape[0], img.shape[1], 3), dtype=np.uint8)
        ret_image[:, :, 0] = watermarked_image
        ret_image[:, :, 1] = g_img
        ret_image[:, :, 2] = r_img
        return ret_image

    def dec(self,img, _i, _k):
        """Extracts the watermark from an image."""

        coeffs = pywt.dwt2(img, "haar")
        _ll, (_, _, _) = coeffs

        binary_watermark = ""
        binary_watermark_index = 0
        for i in range(_ll.shape[0]):
            for j in range(_ll.shape[1]):
                if binary_watermark_index >= len(binary_watermark):
                    break

                binary_pixel = np.binary_repr(_ll[i, j], width=8)
                binary_watermark += binary_pixel[-1]
                binary_watermark_index += 1
            if binary_watermark_index >= len(binary_watermark):
                break

        watermark = "".join(
            chr(int(binary_watermark[i:i+8], 2)) for i in range(0, len(binary_watermark), 8))

        return watermark

    def chunks(self, _l, _n):
        """Yield successive _n-sized chunks from l."""
        m = int(_n)
        for i in range(0, len(_l), m):
            yield _l[i:i + m]

    def to_bits(self):
        """Converts a string to a list of bits"""
        bits = []
        for char in self.message:
            binval = bin(ord(char))[2:].rjust(8,'0')
            bits.append(binval)
        self.num_bits = bin(len(bits))[2:].rjust(8,'0')
        return bits

    def add_padd(self, img):
        """Adds padding to an image to make it divisible by 8."""
        col = img.shape[1]
        row = img.shape[0]
        img = cv2.resize(img,(col+(8-col%8),row+(8-row%8)))
        return img

T = DWT()
K = 0.00
W = "Hello World"

results_DWT = test(T, K, W, "100_Image_Dataset")
results_DWT_MEAN = {key:np.mean(results_ADD[key]) for key in METRICS.keys()}
results_DWT_MEAN

  binary_pixel = np.binary_repr(_ll[i, j], width=8)


{'MSE': 45.750982404517146,
 'PSNR': 31.912928826068537,
 'SSIM': 0.9934247791222252,
 'BER': 29.80561128971036}

In [None]:
results_DWT = test(T, K, W, "COCO_Dataset")
results_DWT_MEAN = {key:np.mean(results_DCT[key]) for key in METRICS.keys()}
results_DWT_MEAN

  binary_pixel = np.binary_repr(_ll[i, j], width=8)


{'MSE': 34.21752877419776,
 'PSNR': 32.82326044341667,
 'SSIM': 0.7318678398968141,
 'BER': 12.727806454680646}