In [None]:
import torch
from torch import nn, optim
from torchsummary import summary
from torchvision.utils import make_grid

import os
from shutil import rmtree
import imageio
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

In [None]:
def pil_to_np_array(pil_image):
    array = np.array(pil_image).transpose(2,0,1)
    return array.astype(np.float32) / 255.

def np_to_torch_array(np_array):
    return torch.from_numpy(np_array)[None, :]

def torch_to_np_array(torch_array):
    return torch_array.detach().cpu().numpy()[0]

def read_image(path, image_size = -1):
    pil_image = Image.open(path)
    return pil_image

def save_image(np_array, step):
    pil_image = Image.fromarray((np_array * 255.0).transpose(1, 2, 0).astype('uint8'), 'RGB')
    pil_image.save(f'progress/{str(step).zfill(len(str(TRAINING_STEPS)))}.png')

def get_image_grid(images, nrow = 3):
    torch_images = [torch.from_numpy(x) for x in images]
    grid = make_grid(torch_images, nrow)
    return grid.numpy()
    
def visualize_sample(*images_np, nrow = 3, size_factor = 10):
    c = max(x.shape[0] for x in images_np)
    images_np = [x if (x.shape[0] == c) else np.concatenate([x, x, x], axis=0) for x in images_np]
    grid = get_image_grid(images_np, nrow)
    plt.figure(figsize = (len(images_np) + size_factor, 12 + size_factor))
    plt.axis('off')
    plt.imshow(grid.transpose(1, 2, 0))
    plt.show()

In [None]:
DTYPE = torch.cuda.FloatTensor
INPUT_DEPTH = 32
LR = 0.01 
TRAINING_STEPS = 10000
SHOW_STEP = 50
REG_NOISE = 0.03
MAX_DIM = 128

In [None]:
original_image_path  = os.path.join('data/watermark-available/image1.png')
watermark_path = os.path.join('data/watermark-available/watermark.png')

original_image_pil = read_image(original_image_path)
original_image_pil = original_image_pil.convert('RGB')
original_image_pil = original_image_pil.resize((128, 128))

w, h = original_image_pil.size
aspect_ratio = w / h
if w > MAX_DIM and w > h:
    h = int((h / w) * MAX_DIM)
    w = MAX_DIM
elif h > MAX_DIM and h > w:
    w = int((w / h) * MAX_DIM)
    h = MAX_DIM

original_image_pil = original_image_pil.resize((w, h))

watermark_pil = read_image(watermark_path)
watermark_pil = watermark_pil.convert('RGB')
watermark_pil = watermark_pil.resize((original_image_pil.size[0], original_image_pil.size[1]))

original_image_np = pil_to_np_array(original_image_pil)
watermark_np = pil_to_np_array(watermark_pil)
# watermark_np[watermark_np == 0.0] = 1.0

watermarked_np = original_image_np * watermark_np
image_mask_var = np_to_torch_array(watermark_np).type(DTYPE)

visualize_sample(original_image_np, watermark_np, watermarked_np, nrow = 3, size_factor = 10)

In [None]:
class Conv2dBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride = 1, bias = False):
        super(Conv2dBlock, self).__init__()

        self.model = nn.Sequential(
            nn.ReflectionPad2d(int((kernel_size - 1) / 2)),
            nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding = 0, bias = bias),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(0.2)
        )

    def forward(self, x):
        return self.model(x)

class Concat(nn.Module):
    def __init__(self, dim, *args):
        super(Concat, self).__init__()
        self.dim = dim

        for idx, module in enumerate(args):
            self.add_module(str(idx), module)

    def forward(self, input):
        inputs = []
        for module in self._modules.values():
            inputs.append(module(input))

        inputs_shapes2 = [x.shape[2] for x in inputs]
        inputs_shapes3 = [x.shape[3] for x in inputs]        

        if np.all(np.array(inputs_shapes2) == min(inputs_shapes2)) and np.all(np.array(inputs_shapes3) == min(inputs_shapes3)):
            inputs_ = inputs
        else:
            target_shape2 = min(inputs_shapes2)
            target_shape3 = min(inputs_shapes3)

            inputs_ = []
            for inp in inputs: 
                diff2 = (inp.size(2) - target_shape2) // 2 
                diff3 = (inp.size(3) - target_shape3) // 2 
                inputs_.append(inp[:, :, diff2: diff2 + target_shape2, diff3:diff3 + target_shape3])

        return torch.cat(inputs_, dim=self.dim)

    def __len__(self):
        return len(self._modules)

In [None]:
class SkipEncoderDecoder(nn.Module):
    def __init__(self, input_depth, num_channels_down = [128] * 5, num_channels_up = [128] * 5, num_channels_skip = [128] * 5):
        super(SkipEncoderDecoder, self).__init__()

        self.model = nn.Sequential()
        model_tmp = self.model

        for i in range(len(num_channels_down)):

            deeper = nn.Sequential()
            skip = nn.Sequential()

            if num_channels_skip[i] != 0:
                model_tmp.add_module(str(len(model_tmp) + 1), Concat(1, skip, deeper))
            else:
                model_tmp.add_module(str(len(model_tmp) + 1), deeper)
            
            model_tmp.add_module(str(len(model_tmp) + 1), nn.BatchNorm2d(num_channels_skip[i] + (num_channels_up[i + 1] if i < (len(num_channels_down) - 1) else num_channels_down[i])))

            if num_channels_skip[i] != 0:
                skip.add_module(str(len(skip) + 1), Conv2dBlock(input_depth, num_channels_skip[i], 1, bias = False))
                
            deeper.add_module(str(len(deeper) + 1), Conv2dBlock(input_depth, num_channels_down[i], 3, 2, bias = False))
            deeper.add_module(str(len(deeper) + 1), Conv2dBlock(num_channels_down[i], num_channels_down[i], 3, bias = False))

            deeper_main = nn.Sequential()

            if i == len(num_channels_down) - 1:
                k = num_channels_down[i]
            else:
                deeper.add_module(str(len(deeper) + 1), deeper_main)
                k = num_channels_up[i + 1]

            deeper.add_module(str(len(deeper) + 1), nn.Upsample(scale_factor = 2, mode = 'nearest'))

            model_tmp.add_module(str(len(model_tmp) + 1), Conv2dBlock(num_channels_skip[i] + k, num_channels_up[i], 3, 1, bias = False))
            model_tmp.add_module(str(len(model_tmp) + 1), Conv2dBlock(num_channels_up[i], num_channels_up[i], 1, bias = False))

            input_depth = num_channels_down[i]
            model_tmp = deeper_main

        self.model.add_module(str(len(self.model) + 1), nn.Conv2d(num_channels_up[0], 3, 1, bias = True))
        self.model.add_module(str(len(self.model) + 1), nn.Sigmoid())
    
    def forward(self, x):
        return self.model(x)

def input_noise(input_depth, spatial_size, scale = 1.0/10):
    shape = [1, input_depth, spatial_size[0], spatial_size[1]]
    return torch.rand(*shape) * scale

In [None]:
generator = SkipEncoderDecoder(
    INPUT_DEPTH,
    num_channels_down = [128] * 5,
    num_channels_up = [128] * 5,
    num_channels_skip = [128] * 5
).type(DTYPE)
generator_input = input_noise(INPUT_DEPTH, watermarked_np.shape[1:]).type(DTYPE)
summary(generator, generator_input.shape[1:])

objective = torch.nn.MSELoss().type(DTYPE)
optimizer = optim.Adam(generator.parameters(), LR)

watermarked_var = np_to_torch_array(watermarked_np).type(DTYPE)
watermark_var = np_to_torch_array(watermark_np).type(DTYPE)

generator_input_saved = generator_input.detach().clone()
noise = generator_input.detach().clone()

In [None]:
if os.path.isdir('progress'):
    rmtree('progress')
os.mkdir('progress')

for step in range(TRAINING_STEPS):
    optimizer.zero_grad()
    generator_input = generator_input_saved

    if REG_NOISE > 0:
        generator_input = generator_input_saved + (noise.normal_() * REG_NOISE)
        
    output = generator(generator_input)
   
    loss = objective(output * watermark_var, watermarked_var)
    loss.backward()
        
    if step % SHOW_STEP == 0:
        output_image = torch_to_np_array(output)
        save_image(output_image, step)
        visualize_sample(output_image, nrow = 1, size_factor = 5)
        print(f'Step: {step} | Loss: {loss.item()}')
        
    optimizer.step()