In [None]:
from matplotlib import pyplot as plt
import numpy as np
import cv2
import time, sys
from IPython.display import clear_output, display
from time import sleep
import random

# Classes

In [None]:
class ImageUtil:
    
    @staticmethod
    def show1(image, title='', qs=4):
        fig = plt.figure(figsize=(qs, qs))
        plt.title(title)
        plt.axis('off')
        (h, w, *c) = image.shape
        image_abs = np.abs(image)
        if(len(c) > 0):
            image_abs = cv2.cvtColor(image_abs, cv2.COLOR_BGR2RGB)
            plt.imshow(image_abs)
        else:
            plt.imshow(image_abs, cmap = 'gray')
            
    @staticmethod
    def show(images_arr, titles_arr=None, qs=4):
        
        images = np.array(images_arr)
        (n_y, n_x, *rest) = images.shape
        titles = np.full((n_y, n_x), '')  if titles_arr is None else np.array(titles_arr) 
        
        figsize = qs * n_x, qs * n_y
        fig = plt.figure(figsize=figsize)
        plt.axis('off')
        for yy in range(n_y):
            for xx in range(n_x):
                fig.add_subplot(n_y, n_x, yy * n_x + xx + 1)
                plt.title(titles[yy, xx])
                plt.axis('off')
                (h, w, *c) = images[yy, xx].shape
                img_abs = np.abs(images[yy, xx])
                if(len(c) > 0):
                    img_abs = cv2.cvtColor(img_abs, cv2.COLOR_BGR2RGB)
                    plt.imshow(img_abs)
                else:
                    plt.imshow(img_abs, cmap = 'gray')
                    
    @staticmethod
    def psnr(image_1, image_2):
        mse = np.mean((np.abs(image_1) - np.abs(image_2)) ** 2)
        if(mse == 0): return np.inf
        psnr = 10 * np.log10(255 * 255 / mse)
        return psnr
    
    @staticmethod
    def ber(logo_1, logo_2):
        logo_size, *_ = logo_1.shape
        return np.sum(np.abs(np.abs(logo_1) - np.abs(logo_2))) / (logo_size * logo_size)

In [None]:
class ProgressBar:
    
    bar_length = 20
    
    def __init__(self, title='Progress'):
        self.title = title
        progress = 0
        block = int(round(self.bar_length * progress))
        text = "{0}: [{1}] {2:.1f}%".format(self.title, "#" * block + "-" * (self.bar_length - block), progress * 100)
        self.disp = display(text, display_id=True)
        
    def update(self, progress):
        bar_length = 20
        if isinstance(progress, int):
            progress = float(progress)
        if not isinstance(progress, float):
            progress = 0
        if progress < 0:
            progress = 0
        if progress >= 1:
            progress = 1
        block = int(round(bar_length * progress))
        text = "{0}: [{1}] {2:.1f}%".format(self.title, "#" * block + "-" * (bar_length - block), progress * 100)
        self.disp.update(text)

In [None]:
class Image:
    
    def __init__(self, bgr_image, image_size):
        self.image_size = image_size
        self.image = cv2.resize(bgr_image, (image_size, image_size))
        
    def get_y(self):
        image_ycrcb = cv2.cvtColor(self.image, cv2.COLOR_BGR2YCrCb)
        return image_ycrcb[:, :, 0]
    
    def set_y(self, image_y):
        image_ycrcb = cv2.cvtColor(self.image, cv2.COLOR_BGR2YCrCb)
        image_y = np.abs(image_y).astype(np.uint8)
        image_ycrcb[:, :, 0] = image_y
        self.image = cv2.cvtColor(image_ycrcb, cv2.COLOR_YCrCb2BGR)
        
    @staticmethod
    def copy(instance):
        return Image(np.copy(instance.image), instance.image_size)
    
    @staticmethod
    def get_y_from(image):
        image_ycrcb = cv2.cvtColor(image, cv2.COLOR_BGR2YCrCb)
        return image_ycrcb[:, :, 0]

In [None]:
class Logo:
    
    def __init__(self, logo, logo_size, treshold=128):
        logo = cv2.resize(logo, (logo_size, logo_size))
        self.logo = (logo >= treshold) * 1
        self.logo_size = logo_size


In [None]:
class PHT:
    
    def __init__(self, image_size, max_momen, step = -1):
        self.image_size = image_size
        self.max_momen = max_momen
        if(step == -1):
            self.step = image_size
        else:
            self.step = step
        
    def pcet(self, image_bgr):

        pb = ProgressBar('Transform PCET')
        t_len = 2 * self.max_momen + 1

        im_vec = np.reshape(np.array(image_bgr), (self.image_size * self.image_size, 1))
        transformed = np.array([])

        # GENERATE MATRIX TRANSFORM

        vec_n = np.ones((t_len, t_len), dtype='float32')
        vec_m = np.ones((t_len, t_len), dtype='float32')
        for i in range(-self.max_momen, self.max_momen + 1):
            vec_n[i + self.max_momen, :] = i
            vec_m[:, i + self.max_momen] = i
        vec_n = vec_n.reshape((t_len * t_len, 1))
        vec_m = vec_m.reshape((t_len * t_len, 1))

        vec_i = np.ones((self.image_size, self.image_size), dtype='float32')
        vec_k = np.ones((self.image_size, self.image_size), dtype='float32')
        for i in range(self.image_size):
            vec_i[i, :] = i
            vec_k[:, i] = i
        vec_i = vec_i.reshape((1, self.image_size * self.image_size))
        vec_k = vec_k.reshape((1, self.image_size * self.image_size))

        vec_y = self.translate_index(vec_i)
        vec_x = self.translate_index(vec_k)
        vec_r = np.sqrt(vec_x * vec_x + vec_y * vec_y)
        vec_t = np.arctan2(vec_y, vec_x)

        # FREE UP SOME MEMORY - 1
        del vec_i
        del vec_k
        del vec_y
        del vec_x

        pb.update(0)
        full_size = t_len * t_len
        step_len = int(np.ceil(full_size / self.step))
        
        for row in range(self.step):
            start = row * step_len
            end = np.min([row * step_len + step_len, full_size])
            mat_r = np.repeat(vec_r, end - start, axis=0)
            mat_t = np.repeat(vec_t, end - start, axis=0)
            mat_n = np.repeat(vec_n[start:end, :], self.image_size * self.image_size, axis=1)
            mat_m = np.repeat(vec_m[start:end, :], self.image_size * self.image_size, axis=1)
            mat_c = (mat_r <= 1)
            mat_f = (np.abs(mat_n) + np.abs(mat_m) <= self.max_momen)

            multiplier = 4 / (np.pi * self.image_size * self.image_size)
            mat_w = mat_f * mat_c * multiplier * np.exp(-2 * np.pi * mat_n * mat_r * mat_r * 1j - mat_m * mat_t * 1j)
            trf_row = np.matmul(mat_w, im_vec)
            transformed = np.append(transformed, trf_row)

            # FREE UP SOME MEMORY - 2
            del mat_w
            del mat_n
            del mat_m
            del mat_r
            del mat_t
            del mat_c
            del mat_f

            pb.update((row + 1) / self.step)

        pb.update(1)

        transformed2 = np.reshape(transformed, (t_len, t_len))
        del transformed
        return transformed2
    
    def inverse_pcet(self, momen):

        pb = ProgressBar('Inverse PCET')
        t_len = 2 * self.max_momen + 1

        momen_vec = np.reshape(np.array(momen), (t_len * t_len, 1))
        inversed = np.array([])

        # GENERATE MATRIX INVERS TRANSFORM

        vec_n = np.ones((t_len, t_len), dtype='float32')
        vec_m = np.ones((t_len, t_len), dtype='float32')
        for i in range(-self.max_momen, self.max_momen + 1):
            vec_n[i + self.max_momen, :] = i
            vec_m[:, i + self.max_momen] = i
        vec_n = vec_n.reshape((1, t_len * t_len))
        vec_m = vec_m.reshape((1, t_len * t_len))

        vec_i = np.ones((self.image_size, self.image_size), dtype='float32')
        vec_k = np.ones((self.image_size, self.image_size), dtype='float32')
        for i in range(self.image_size):
            vec_i[i,:] = i
            vec_k[:,i] = i
        vec_i = vec_i.reshape((self.image_size * self.image_size, 1))
        vec_k = vec_k.reshape((self.image_size * self.image_size, 1))

        vec_y = self.translate_index(vec_i)
        vec_x = self.translate_index(vec_k)
        vec_r = np.sqrt(vec_x * vec_x + vec_y * vec_y)
        vec_t = np.arctan2(vec_y, vec_x)

        # FREE UP SOME MEMORY - 1
        del vec_i
        del vec_k
        del vec_y
        del vec_x

        pb.update(0)
        full_size = self.image_size * self.image_size
        step_len = int(np.ceil(full_size / self.step))
        
        for row in range(self.step):
            start = row * step_len
            end = np.min([row * step_len + step_len, full_size])
            mat_r = np.repeat(vec_r[start:end, :], t_len * t_len, axis=1)
            mat_t = np.repeat(vec_t[start:end, :], t_len * t_len, axis=1)
            mat_n = np.repeat(vec_n, end - start, axis=0)
            mat_m = np.repeat(vec_m, end - start, axis=0)
            mat_c = (mat_r <= 1)
            mat_f = (np.abs(mat_n) + np.abs(mat_m) <= self.max_momen)

            mat_v = mat_f * mat_c * np.exp(2 * np.pi * mat_n * mat_r * mat_r * 1j + mat_m * mat_t * 1j)
            inv_row = np.matmul(mat_v, momen_vec)
            inversed = np.append(inversed, inv_row)

            # FREE UP SOME MEMORY - 2
            del mat_v
            del mat_n
            del mat_m
            del mat_r
            del mat_t
            del mat_c
            del mat_f

            pb.update((row + 1) / self.step)

        pb.update(1)

        inversed2 = np.reshape(inversed, (self.image_size, self.image_size))
        del inversed
        return inversed2
    
    def translate_index(self, index):
        return (2 * index - self.image_size + 1) / self.image_size

In [None]:
class RIW:
    
    def __init__(self, image_size, logo_size, max_momen, q_step, batch = 10, seed = 1234):
        self.image_size = image_size
        self.logo_size = logo_size
        self.max_momen = max_momen
        self.q_step = q_step
        self.batch = batch
        self.seed = seed
        
    def generate_compensation(self, image, logo):
        
        # Error Handle
        image_size, *_ = image.shape
        if(image_size != self.image_size):
            raise Exception('Image size {} doesn\'t match to {}.'.format(image_size, self.image_size))
        logo_size, *_ = logo.shape
        if(logo_size != self.logo_size):
            raise Exception('Logo size {} doesn\'t match {}.'.format(logo_size, self.logo_size))
        
        # Transform Image
        transformer = PHT(self.image_size, self.max_momen, self.batch)
        momen = transformer.pcet(image)
        
        # Select random position to embed
        momen_logo = np.zeros((2 * self.max_momen + 1, 2 * self.max_momen + 1))
        momen_logo_filter = np.zeros((2 * self.max_momen + 1, 2 * self.max_momen + 1))
        logo_flatten = logo.flatten()
        logo_position = self.generate_random_position(len(logo_flatten), 0, 2 * self.max_momen + 1)
        for i in range(len(logo_flatten)):
            (n, m) = logo_position[i]
            momen_logo[n + self.max_momen, m + self.max_momen] = logo_flatten[i]
            momen_logo_filter[n + self.max_momen, m + self.max_momen] = 1
        
        momen_logo = momen_logo + np.rot90(np.rot90(momen_logo))
        momen_logo_filter = momen_logo_filter + np.rot90(np.rot90(momen_logo_filter))
        
        # Calculate momen modification
        momen_expected = 2 * self.q_step * np.round(np.abs(momen) / 2 / self.q_step)
        momen_expected = momen_expected + self.q_step / 2 * (momen_logo == 1)
        momen_expected = momen_expected - self.q_step / 2 * (momen_logo == 0)
        momen_expected = momen_expected + 2 * self.q_step * (momen_expected < 0)

        momen_diff = momen_expected - np.abs(momen)
        momen_diff = momen_diff / (np.abs(momen) + 0.000000001) * momen

        # Generating compensation image
        momen_compensation = momen_diff * momen_logo_filter
        img_compensation = transformer.inverse_pcet(momen_compensation)
        return img_compensation
        
    def embed(self, image, logo):
        self.img_compensation = self.generate_compensation(image, logo)

        # Modifying image
        img_y_watermarked = image + self.img_compensation
        return img_y_watermarked
        
    def extract(self, image):
        
        # Error Handle
        image_size, *_ = image.shape
        if(image_size != self.image_size):
            raise Exception('Image size {} doesn\'t match to {}.'.format(image_size, self.image_size))
        
        # Transform Image
        transformer = PHT(self.image_size, self.max_momen, self.batch)
        momen = transformer.pcet(image)
        
        # Calculate extracted bit
        momen_logo = ((np.abs(momen) - 2 * self.q_step * np.round(np.abs(momen) / 2 / self.q_step)) > 0) * 1
        momen_logo_filter = np.zeros((2 * self.max_momen + 1, 2 * self.max_momen + 1))
        
        # Extracting watermark
        logo_flatten = np.array([])
        logo_position = self.generate_random_position(self.logo_size * self.logo_size, 0, 2 * self.max_momen + 1)
        for i in range(self.logo_size * self.logo_size):
            (n, m) = logo_position[i]
            momen_logo_filter[n + self.max_momen, m + self.max_momen] = 1
            binary_value = momen_logo[n + self.max_momen, m + self.max_momen]
            logo_flatten = np.append(logo_flatten, binary_value)
        
        logo_extracted = logo_flatten.reshape((self.logo_size, self.logo_size))
        return logo_extracted
        
    def is_accurate(self, order, repetition):
        return (
            repetition % 4 != 0 and
            np.abs(order) + np.abs(repetition) <= self.max_momen and
            # repetition > 0 and
            order > 0
        )
    
    def generate_random_position(self, array_size, min_val, max_val):
        random.seed(10)
        arr = []
        for i in range(array_size):
            order, repetition  = random.randint(min_val, max_val), random.randint(min_val, max_val)
            trial = 0
            while(((order, repetition) in arr) or (not self.is_accurate(order, repetition))):
                trial = trial + 1
                if(trial > 5000):
                    print('Please use more momen')
                    return np.array([])
                    break
                order, repetition  = random.randint(min_val, max_val), random.randint(min_val, max_val)
            arr.append((order, repetition))
        return np.array(arr)

In [None]:
class AntiTranslation:

    @staticmethod
    def kp2pt(kp):
        (px, py) = kp.pt
        return (int(px), int(py))

    @classmethod
    def get_pairs(cls, keypointmathces, keypoints1, keypoints2):
        pairs = []
        for mt in keypointmathces:
            p1 = keypoints1[mt.queryIdx]
            p2 = keypoints2[mt.trainIdx]
            pairs.append((cls.kp2pt(p1), cls.kp2pt(p2)))
        return pairs

    @staticmethod
    def calc_dist(pa, pb):
        (pax, pay) = pa
        (pbx, pby) = pb
        dx = pax - pbx
        dy = pay - pby
        return np.sqrt(dx * dx + dy * dy)

    @staticmethod
    def add_tupple(t1, t2):
        (t1x, t1y) = t1
        (t2x, t2y) = t2
        return (t1x + t2x, t1y + t2y)

    @classmethod
    def calc_error(cls, keypointmathces, keypoints1, keypoints2, center1, center2, s):
        pairs = cls.get_pairs(keypointmathces, keypoints1, keypoints2)
        sum = 0
        for (p1, p2) in pairs:
            d1 = cls.calc_dist(p1, center1)
            d2 = s * cls.calc_dist(p2, center2)
            sum += abs(d1 - d2)
        return sum / len(pairs)
    
    @classmethod
    def optimize_center(cls, keypointmathces, keypoints1, keypoints2, c1, c2, n_step=2000):
        dx = 0
        dy = 0
        s = 1
        d_step = 0.1
        s_step = 0.01
        N_STEP = n_step

        err = 10000

        pb = ProgressBar('Calculate Center')
        for i in range(N_STEP):
            
            
            update_variable = []
            for xx in [-d_step, 0, d_step]:
                for yy in [-d_step, 0, d_step]:
                    for ss in [-s_step, 0, s_step]:
                        update_variable.append((0, dx + xx, dy + yy, s + ss))

            for j in range(len(update_variable)):
                (_, updated_x, updated_y, updated_s) = update_variable[j]
                tmp = cls.calc_error(keypointmathces[:], keypoints1, keypoints2, (c1, c1), (c2 + updated_x, c2 + updated_y), updated_s)
                update_variable[j] = (tmp, update_variable[j][1], update_variable[j][2], update_variable[j][3])

            update_variable = sorted(update_variable, key=lambda tup: tup[0])

            err = update_variable[0][0]
            dx = update_variable[0][1]
            dy = update_variable[0][2]
            s = update_variable[0][3]

            pb.update(i / N_STEP)

        pb.update(1)
        return (int(np.round(-dx)), int(np.round(-dy)), s)
    
    @classmethod
    def calculate_center(cls, original, attacked, n_step=2000):
    
        # Initiate ORB detector
        orb = cv2.ORB_create()

        # find the keypoints and descriptors with ORB
        kp_1, des_1 = orb.detectAndCompute(original,None)
        kp_2, des_2 = orb.detectAndCompute(attacked,None)

        # create BFMatcher object
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

        # Match descriptors.
        matches = bf.match(des_1, des_2)

        # Sort them in the order of their distance.
        matches = sorted(matches, key = lambda x:x.distance)

        (n1, *_) = original.shape
        (n2, *_) = attacked.shape

        return cls.optimize_center(matches[:50], kp_1, kp_2, (n1 - 1) / 2, (n2 - 1) / 2, n_step)

In [None]:
class Attacker:

    # Rotation
    @staticmethod
    def rotate(image, angle):
        image_center = tuple(np.array(image.shape[1::-1]) / 2)
        rot_mat = cv2.getRotationMatrix2D(image_center, angle, 1.0)
        result = cv2.warpAffine(image, rot_mat, image.shape[1::-1], flags=cv2.INTER_LINEAR)
        return result

    # Resize
    @staticmethod
    def resize(image, size):
        result = cv2.resize(image, (size, size))
        return result

    # Translation
    @staticmethod
    def translate(image, px, py):
        (h, w, c) = image.shape
        dx = int(px * w)
        dy = int(py * h)
        result = np.zeros((h, w, c)).astype(np.uint8)
        if(dx > 0):
            result[:,dx:w,:] = image[:,0:w-dx,:]
        else:
            result[:,0:w+dx,:] = image[:,-dx:w,:]
        result2 = np.zeros((h, w, c)).astype(np.uint8)
        if(dy > 0):
            result2[dy:h,:,:] = result[0:h-dy,:,:]
        else:
            result2[0:h+dy,:,:] = result[-dy:h,:,:]
        return result2

    # Translation
    @staticmethod
    def translate_by_pixel(image, dx, dy):
        (h, w, c) = image.shape
        result = np.zeros((h, w, c)).astype(np.uint8)
        if(dx > 0):
            if(dx > w): dx = w
            result[:,dx:w,:] = image[:,0:w-dx,:]
        else:
            if(dx < -w): dx = -w
            result[:,0:w+dx,:] = image[:,-dx:w,:]
        result2 = np.zeros((h, w, c)).astype(np.uint8)
        if(dy > 0):
            if(dy > h): dy = h
            result2[dy:h,:,:] = result[0:h-dy,:,:]
        else:
            if(dy < -h): dy = -h
            result2[0:h+dy,:,:] = result[-dy:h,:,:]
        return result2

    # Crop
    @staticmethod
    def crop(image, tp, rp, bp, lp):
        (h, w, c) = image.shape
        dt = int(h*tp)
        db = int(h*bp)
        dl = int(w*lp)
        dr = int(w*rp)
        result = np.copy(image)
        result[0:dt,:,:] = np.zeros((dt, w, c)).astype(np.uint8)
        result[h-db:h,:,:] = np.zeros((db, w, c)).astype(np.uint8)
        result[:,0:dl,:] = np.zeros((h, dl, c)).astype(np.uint8)
        result[:,w-dr:w,:] = np.zeros((h, dr, c)).astype(np.uint8)
        return result

    # Blur
    @staticmethod
    def filter_average(image, kernel_size):
        result = cv2.blur(image, (kernel_size, kernel_size))
        return result

    # Median Filter
    @staticmethod
    def filter_median(image, kernel_size):
        result = cv2.medianBlur(image, kernel_size)
        return result

    # Gaussian Filter
    @staticmethod
    def filter_gauss(image, kernel_size):
        result = cv2.GaussianBlur(image, (kernel_size, kernel_size), 0)
        return result

    # Noise
    @staticmethod
    def noise_salt_pepper(image, n_p):
        result = np.copy(image)
        for i, row in enumerate(image):
            for j, val in enumerate(row):
                if(np.random.randint(low=0, high=100) < n_p * 100 / 2):
                    result[i,j] = 0
        for i, row in enumerate(image):
            for j, val in enumerate(row):
                if(np.random.randint(low=0, high=100) < n_p * 100 / 2):
                    result[i,j] = 255
        return result

    # JPEG Compression
    @staticmethod
    def jpeg_compress(image, Q=95):
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), Q]
        result, encimg = cv2.imencode('.jpg', image, encode_param)
        decimg = cv2.imdecode(encimg, 1)
        return decimg

In [None]:
np.full((3, 2), '')