In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, models
from torch import optim
from PIL import Image
import cv2
import os
import numpy as np
import os
import pickle
import matplotlib.pyplot as plt
import random 
import glob

In [None]:
# Colab-only: mount the user's Google Drive at /content/gdrive.
# NOTE(review): none of the visible cells read from or write to the mount —
# confirm whether it is still needed (e.g. for persisting saved_model/).
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
%%bash
# Start from a clean slate: remove the whole working image directory
# (no error if it does not exist yet, thanks to -f).
rm -rf images

In [None]:
%%bash
# Download Flickr8k, flatten it into images/dataset/<n>.jpg, fetch the
# watermark image and create the working directories used by the notebook.
# `mkdir -p` keeps the cell re-runnable when directories already exist.
mkdir -p images/dataset
wget -O images/Dataset.zip https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip
# Extract into a directory whose name does not differ from images/dataset only
# by letter case, so cleanup below can never remove the renamed dataset on a
# case-insensitive filesystem. -q avoids printing one line per extracted file.
unzip -q -d images/unzipped/ images/Dataset.zip
# Rename the images to consecutive numbers (1.jpg, 2.jpg, ...) in natural order.
ls -v images/unzipped/Flicker8k_Dataset/ | cat -n | while read -r n f; do mv -n "images/unzipped/Flicker8k_Dataset/$f" "images/dataset/$n.jpg"; done
# The URL must be quoted: an unquoted '&' would send wget to the background
# (so later cells could run before wm.png exists) and drop the query parameter.
wget -O images/wm.png "https://drive.google.com/uc?id=1FqvgBV3ZMsRgsxbyTbJtzhwwD2f6wW2A&export=download"
mkdir -p images/watermarked images/original images/test images/output
# Remove the archive and the extraction directory once the images are moved.
rm -rf images/unzipped images/Dataset.zip
mkdir -p saved_model

In [None]:
def gen_watermark(dataset_path, watermark_img_path, output_path, samples, og_img_path=None, test=False):
  """Paste a randomly sized/placed/faded watermark onto a subset of images.

  Args:
    dataset_path: glob pattern selecting the source images.
    watermark_img_path: path of the watermark image.
    output_path: directory that receives the watermarked images.
    samples: number of images to process.
    og_img_path: optional directory; when given, the un-watermarked image is
      also saved there under the same file name.
    test: when False the first ``samples`` matches are used, when True the
      last ``samples`` matches, so the two splits do not overlap as long as
      the dataset holds at least train + test samples.
  """
  # glob returns files in arbitrary order; sort so the splits are reproducible.
  og_img = sorted(glob.glob(dataset_path))

  print('Adding watermarks')

  if samples <= 0:
    # Guard: og_img[-0:] would select the whole list instead of nothing.
    images = []
  elif test:
    images = og_img[-samples:]
  else:
    images = og_img[:samples]

  if output_path:
    os.makedirs(output_path, exist_ok=True)
  if og_img_path:
    os.makedirs(og_img_path, exist_ok=True)

  # Load the watermark once. convert('RGBA') guarantees four bands (an opaque
  # alpha channel is added when the file has none), so split()[3] is valid.
  with Image.open(watermark_img_path) as wm_file:
    base_watermark = wm_file.convert('RGBA')
  base_w, base_h = base_watermark.size

  for img_path in images:
    # Force 3-channel RGB so pasting and JPEG saving behave uniformly.
    with Image.open(img_path) as src:
      img = src.convert('RGB')
    img_w, img_h = img.size
    filename = os.path.basename(img_path)

    if og_img_path:
      # Save the clean (un-watermarked) counterpart
      img.save(os.path.join(og_img_path, filename))

    WM_SIZE = random.choice([1.1, 1.2, 1.3])
    # Resize the watermark in proportion to the image width
    basewidth = max(1, int(img_w // WM_SIZE))
    hsize = max(1, int(base_h * (basewidth / float(base_w))))
    if hsize > img_h:
      # A tall watermark would not fit vertically (and would make the random
      # placement below fail); shrink it to the image height instead.
      hsize = img_h
      basewidth = max(1, int(base_w * (img_h / float(base_h))))
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
    watermark = base_watermark.resize((basewidth, hsize), Image.LANCZOS)

    watermark_w, watermark_h = watermark.size

    # Randomly select transparency (percent of the watermark's own alpha)
    TRANSPARENCY = random.randint(40, 100)

    # Randomly assign watermark coordinates (top-left corner)
    water_w, water_h = (random.randint(0, img_w - watermark_w), random.randint(0, img_h - watermark_h))
    paste_mask = watermark.split()[3].point(lambda a: int(a * TRANSPARENCY / 100.))
    img.paste(watermark, (water_w, water_h), mask=paste_mask)

    # Save watermarked image
    img.save(os.path.join(output_path, filename))

  print('Watermark added to all the images')

In [None]:
%%bash
# Empty the generated-image directories (the directories themselves are kept).
for d in watermarked original test; do
  rm -rf images/"$d"/*
done

In [None]:
# --- Paths -------------------------------------------------------------------
dataset_path = 'images/dataset/*.*'          # glob pattern for the source images
watermark_img_path = 'images/wm.png'         # watermark to paste
watermarked_img_path = 'images/watermarked/' # training inputs
test_path = 'images/test/'                   # watermarked test inputs
og_img_path = 'images/original/'             # training targets (clean images)
gen_path = 'images/output/'                  # generated results

# --- Split sizes -------------------------------------------------------------
train_samples = 100
test_samples = 20

# Training set: watermarked images plus their clean counterparts.
print("=== Train set ===")
gen_watermark(
    dataset_path,
    watermark_img_path,
    watermarked_img_path,
    train_samples,
    og_img_path=og_img_path,
    test=False,
)

# Test set: watermarked images only (no clean copies are written).
print("\n\n=== Test set ===")
gen_watermark(
    dataset_path,
    watermark_img_path,
    test_path,
    test_samples,
    test=True,
)

=== Train set ===
Adding watermarks
Watermark added to all the images


=== Test set ===
Adding watermarks
Watermark added to all the images


In [None]:
def conv(in_ch, out_ch, kernel_size=4, stride=2, padding=1, instance_norm=True):
    """Build a bias-free Conv2d block, optionally followed by InstanceNorm2d.

    With the defaults (kernel 4, stride 2, padding 1) the block halves the
    spatial resolution. Returns an ``nn.Sequential``.
    """
    modules = [nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding, bias=False)]
    if instance_norm:
        modules.append(nn.InstanceNorm2d(out_ch))
    return nn.Sequential(*modules)

# upsampling -> convolution -> InstanceNorm (optional)
def upsampling(in_ch, out_ch, kernel_size=3, stride=1, padding=1, instance_norm=True):
    """Build a 2x bilinear upsample -> bias-free Conv2d (-> InstanceNorm2d) block.

    With the defaults the convolution keeps the upsampled resolution, so the
    block doubles height and width. Returns an ``nn.Sequential``.
    """
    modules = [
        nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
        nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding, bias=False),
    ]
    if instance_norm:
        modules.append(nn.InstanceNorm2d(out_ch))
    return nn.Sequential(*modules)

In [None]:
# Architecture based on https://arxiv.org/pdf/1905.12845.pdf

class Discriminator(nn.Module):
    """Convolutional discriminator over a 6-channel input.

    The input is two 3-channel images concatenated on the channel axis. Two
    stride-2 blocks reduce the resolution to 1/4, two stride-1 3x3 blocks keep
    it, and the output is a 1-channel sigmoid score map.
    """

    def __init__(self, conv_dim=64):
        super().__init__()
        self.conv_dim = conv_dim
        # Stride-2 stages (defaults of `conv`): 6 -> d (with norm), d -> 2d
        self.conv1 = conv(6, conv_dim)
        self.conv2 = conv(conv_dim, conv_dim * 2, instance_norm=False)
        # Stride-1 3x3 stage: 2d -> 4d, resolution unchanged
        self.conv3 = conv(conv_dim * 2, conv_dim * 4, 3, 1, 1, False)
        # Norms for conv2 / conv3 are applied after the activation in forward()
        self.norm1 = nn.InstanceNorm2d(conv_dim * 2)
        self.norm2 = nn.InstanceNorm2d(conv_dim * 4)
        # Final 3x3 convolution down to a single-channel score map
        self.conv4 = conv(conv_dim * 4, 1, 3, 1, 1, False)
        self.norm3 = nn.InstanceNorm2d(1)

    def forward(self, xx):
        """Return the per-location sigmoid scores for the 6-channel input `xx`."""
        feat = self.conv1(xx)  # note: no activation after the first block
        feat = self.norm1(F.leaky_relu(self.conv2(feat), 0.2))
        feat = self.norm2(F.leaky_relu(self.conv3(feat), 0.2))
        feat = self.norm3(self.conv4(feat))
        return torch.sigmoid(feat)

In [None]:
class Generator(nn.Module):
    """Encoder-decoder generator with three skip connections.

    Four stride-2 convolutions reduce the input to 1/16 resolution; four
    upsampling blocks restore it. Before each of the last three upsampling
    blocks the matching encoder feature map is concatenated on the channel
    axis, which is why their input widths are doubled.
    """

    def __init__(self, conv_dim=64):
        super().__init__()
        self.conv_dim = conv_dim
        self.norm1 = nn.InstanceNorm2d(3)

        # Encoder: 3 -> d -> 2d -> 4d -> 4d, halving the resolution each time
        self.conv1 = conv(3, conv_dim, instance_norm=False)
        self.conv2 = conv(conv_dim, conv_dim * 2, instance_norm=True)
        self.conv3 = conv(conv_dim * 2, conv_dim * 4, instance_norm=True)
        self.conv4 = conv(conv_dim * 4, conv_dim * 4, instance_norm=False)

        # Decoder: each block doubles the resolution
        self.upsample1 = upsampling(conv_dim * 4, conv_dim * 4)  # 4d -> 4d, then cat -> 8d
        self.upsample2 = upsampling(conv_dim * 8, conv_dim * 2)  # 8d -> 2d, then cat -> 4d
        self.upsample3 = upsampling(conv_dim * 4, conv_dim)      # 4d -> d,  then cat -> 2d
        self.upsample4 = upsampling(conv_dim * 2, 3, instance_norm=False)  # 2d -> 3

    def forward(self, xx):
        """Map a 3-channel image batch to a 3-channel output of the same size."""
        # Encoder; the tensors kept for the skip connections are named skip*
        skip1 = F.leaky_relu(self.conv1(self.norm1(xx)), 0.2)
        skip2 = self.conv2(skip1)
        skip3 = self.conv3(F.leaky_relu(skip2, 0.2))
        decoded = F.relu(self.conv4(F.leaky_relu(skip3, 0.2)))

        # Decoder: upsample, prepend the encoder features, apply ReLU
        stages = (
            (self.upsample1, skip3),
            (self.upsample2, skip2),
            (self.upsample3, skip1),
        )
        for up_block, skip in stages:
            decoded = F.relu(torch.cat((skip, up_block(decoded)), 1))

        return self.upsample4(decoded)

### Loss functions

In [None]:
# Weights of the L1 and perceptual terms in the generator loss
alpha = 10
beta = 1e-4

# Use pretrained VGG-16 feature extractor for generator
vgg = models.vgg16(pretrained=True).features
# The extractor is only used to compute a loss and is never optimised: put it
# in eval mode and freeze its weights so backward passes do not accumulate
# (never-zeroed) gradients in them. Gradients still flow to the input images.
vgg.eval()
for vgg_param in vgg.parameters():
  vgg_param.requires_grad_(False)
if torch.cuda.is_available():
  vgg = vgg.cuda()

def L1_loss(og_img, pred_img):
    """Sum of absolute differences divided by the product of dims 1-3 of `pred_img`.

    The batch dimension is not part of the divisor, so the value scales with
    the batch size.
    """
    shape = pred_img.size()
    per_sample_elems = shape[1] * shape[2] * shape[3]
    abs_error = torch.abs(pred_img - og_img)
    return torch.sum(abs_error) / per_sample_elems

def perceptual_loss(og_img, pred_img, model):
    """Squared feature distance between the two images under `model`.

    Both images are passed through the child modules of `model` in order,
    stopping after the child named "8" (relu2_2 in torchvision's VGG-16
    feature stack). The summed squared difference of the resulting features is
    divided by the product of dims 1-3 of the feature map.
    """
    target_feat, pred_feat = og_img, pred_img
    for child_name, child in model.named_children():
        target_feat = child(target_feat)
        pred_feat = child(pred_feat)
        if child_name == "8":
            break

    feat_shape = target_feat.size()
    normaliser = feat_shape[1] * feat_shape[2] * feat_shape[3]
    squared_diff = torch.pow(pred_feat - target_feat, 2)
    return torch.sum(squared_diff) / normaliser

# Define generator loss
def G_loss(og_img, pred_img, watermark_img, D, G, model):
    """Generator loss: adversarial term + alpha * L1 + beta * perceptual.

    `D` scores the (watermarked, predicted) pair; `model` is the feature
    extractor handed to `perceptual_loss`. `G` is accepted but not used.
    """
    fake_pair = torch.cat((watermark_img, pred_img), 1)
    adversarial = -torch.log(torch.mean(D(fake_pair)))
    reconstruction = alpha * L1_loss(og_img, pred_img)
    perceptual = beta * perceptual_loss(og_img, pred_img, model)
    return adversarial + reconstruction + perceptual

# Define discriminator loss
def D_loss(og_img, pred_img, watermark_img, D, G):
    """Discriminator loss: -(log mean D(real pair) + log(1 - mean D(fake pair))).

    The real pair is (watermarked, original) and the fake pair is
    (watermarked, predicted), each concatenated on the channel axis. `G` is
    accepted but not used.
    """
    p0 = D(torch.cat((watermark_img, og_img), 1))
    # The discriminator update must not backpropagate into the generator:
    # detach the prediction so no gradients are computed for G here.
    p1 = D(torch.cat((watermark_img, pred_img.detach()), 1))
    D_loss = - (torch.log(torch.mean(p0)) + torch.log(1.-torch.mean(p1)))
    return D_loss

In [None]:
# Adam hyper-parameters shared by both optimizers
lr, beta1, beta2 = 1e-4, 0.5, 0.999

# Fresh, randomly initialised networks
G, D = Generator(), Discriminator()

# To resume from a checkpoint written by the training cell instead, use:
# G=torch.load("saved_model/g749_model.pkl")
# D=torch.load("saved_model/d749_model.pkl")

# Move the networks to the GPU before the optimizers capture their parameters
if torch.cuda.is_available():
    G, D = G.cuda(), D.cuda()

d_optimizer = optim.Adam(D.parameters(), lr=lr, betas=[beta1, beta2])
g_optimizer = optim.Adam(G.parameters(), lr=lr, betas=[beta1, beta2])

In [None]:
def show_img(original_img, watermarked_img, pred_img, save_path=None):
    """Display the original, watermarked and generated images and save the last.

    Each tensor is expected in the layout produced by `get_tensor`, i.e. its
    last three dimensions are (3, W, H) with an optional leading batch
    dimension of size 1. The sizes are read from the tensors themselves, so the
    function no longer depends on notebook-level `w` / `h` variables.

    Args:
        original_img: clean image tensor.
        watermarked_img: watermarked image tensor.
        pred_img: generator output; it is min-max rescaled before display.
        save_path: file to save the generated-image figure to. Defaults to
            ``generated.png`` inside the notebook-level `gen_path` directory.
    """
    def _to_hwc(tensor):
        # Drop the batch dimension and reorder (3, W, H) -> (H, W, 3)
        arr = tensor.detach().cpu().numpy()
        return np.transpose(arr.reshape(arr.shape[-3:]), [2, 1, 0])

    print("Original image")
    plt.imshow(_to_hwc(original_img))
    plt.show()

    print("Watermarked image")
    plt.imshow(_to_hwc(watermarked_img))
    plt.show()

    print("Generated image")
    value_range = pred_img.max() - pred_img.min()
    if value_range > 0:  # a constant image cannot be rescaled (division by zero)
        pred_img = (pred_img - pred_img.min()) / value_range
    plt.imshow(_to_hwc(pred_img))

    # Save after drawing and before show(): saving earlier writes an empty
    # figure, and a bare directory path yields a file literally named ".png".
    if save_path is None:
        save_path = os.path.join(gen_path, "generated.png")
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    plt.savefig(save_path)
    plt.show()

In [None]:
def get_tensor(img, h, w):
  """Convert an image array into a (1, 3, W', H') float32 tensor scaled by 1/255.

  W' and H' are `w` and `h` rounded down to multiples of 16 (the generator
  halves the resolution four times). The image is resized with bicubic
  interpolation and its axes are reordered from (H', W', C) to (C, W', H').
  """
  target_w = (int)(w - w % 16)
  target_h = (int)(h - h % 16)

  resized = cv2.resize(img, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
  batched = np.transpose(resized, (2, 1, 0)).reshape(1, 3, target_w, target_h)

  return torch.tensor(batched / 255, dtype=torch.float32)

### Training

In [None]:
epoch = 500
count = 0


def _rescale_unit(t):
    """Min-max rescale `t` so that its values span [0, 1]."""
    lowest = t.min()
    return (t - lowest) / (t.max() - lowest)


for i in range(epoch):
    for name in os.listdir(watermarked_img_path):

        # Clean target image; h / w are kept as notebook-level names on purpose
        no_watermark_img = np.array(Image.open(og_img_path + "/" + name))
        h, w = no_watermark_img.shape[0], no_watermark_img.shape[1]
        no_watermark_img = get_tensor(no_watermark_img, h, w)

        # Watermarked input with the same file name
        watermark_img = np.array(Image.open(watermarked_img_path + "/" + name))
        h, w = watermark_img.shape[0], watermark_img.shape[1]
        watermark_img = get_tensor(watermark_img, h, w)

        if torch.cuda.is_available():
            no_watermark_img, watermark_img = no_watermark_img.cuda(), watermark_img.cuda()

        # --- Generator step ---
        g_optimizer.zero_grad()
        pred_img = _rescale_unit(G(watermark_img))
        g_loss = G_loss(no_watermark_img, pred_img, watermark_img, D, G, vgg)
        g_loss.backward()
        g_optimizer.step()

        # --- Discriminator step (prediction recomputed with the updated G) ---
        d_optimizer.zero_grad()
        pred_img = _rescale_unit(G(watermark_img))
        d_loss = D_loss(no_watermark_img, pred_img, watermark_img, D, G)
        d_loss.backward()
        d_optimizer.step()

        # Log every 500 iterations, preview every 501 iterations
        if not count % 500:
            print('Train Epoch: {:3} \t D Loss: {:F} \t G Loss: {:F}'.format(
                i, d_loss.item(), g_loss.item()))

        if not count % 501:
            show_img(no_watermark_img, watermark_img, pred_img)

        count += 1

    # Checkpoint every 250 epochs; the file tag is i+250 (499, 749, ...)
    if not (i + 1) % 250:
        checkpoint_tag = i + 250
        torch.save(D, 'saved_model/d{}_model.pkl'.format(checkpoint_tag))
        torch.save(G, 'saved_model/g{}_model.pkl'.format(checkpoint_tag))

### Test

In [None]:
def get_img(img, w, h, model=None):
    """Run the generator on an (H', W', 3) image array and rescale the result.

    Args:
        img: image array whose shape is (h - h % 16, w - w % 16, 3), i.e.
            already resized by the caller.
        w: original image width (rounded down to a multiple of 16 here).
        h: original image height (rounded down to a multiple of 16 here).
        model: generator to run; defaults to the notebook-level `G`.

    Returns:
        The generator output, min-max rescaled to [0, 1], with no autograd
        graph attached.
    """
    if model is None:
        model = G

    # (H', W', C) -> (1, C, W', H'), scaled by 1/255
    img = np.transpose(img, (2, 1, 0))
    img = img.reshape(1, 3, (w-w%16), (h-h%16))
    img = torch.tensor(img/255, dtype=torch.float32)

    if torch.cuda.is_available():
        img = img.cuda()

    # Inference only: skip building the autograd graph to save memory
    with torch.no_grad():
        out = model(img)
        out = (out-out.min())/(out.max()-out.min())

    return out

def test(G, path):
    """Show a test image and the generator's output for it.

    Args:
        G: generator model. NOTE: `get_img` is called without a model
            argument, so which generator actually runs is decided inside
            `get_img`; this parameter is kept for interface compatibility.
        path: path of the (watermarked) image to process.
    """
    print("Test image")
    # Force three channels so grayscale / RGBA files fit the 3-channel network
    img = np.array(Image.open(path).convert("RGB"))
    h = img.shape[0]
    w = img.shape[1]
    # Round both sides down to multiples of 16, as required by get_img
    img = cv2.resize(img, ((int)(w-w%16), (int)(h-h%16)), interpolation = cv2.INTER_CUBIC)

    plt.imshow(img)
    plt.show()

    print("Generated image")
    img = get_img(img, w, h)
    # (1, 3, W', H') -> (H', W', 3) for display
    plt.imshow(np.transpose(img.cpu().detach().numpy().reshape(3,(w-w%16), (h-h%16)), [2,1,0]))
    plt.show()

# List the directory once, in a stable order, and never index past its end
# (the directory may hold fewer than `test_samples` files).
for test_name in sorted(os.listdir(test_path))[:test_samples]:
    test(G, os.path.join(test_path, test_name))