### 初始化 Initialization

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.utils import save_image
from torch.utils.data import DataLoader, Dataset
import fnmatch
import os
from PIL import Image


def to_img(x):
    """Rescale a batch from the [-1, 1] range to [0, 1] and shape it as images.

    Args:
        x: Tensor whose first dimension is the batch and whose last two
            dimensions are height and width. It is viewed as
            (batch, 3, height, width), so it must hold exactly 3 channels.

    Returns:
        A tensor of shape (batch, 3, height, width) with values clamped to
        [0, 1]. The input tensor is not modified.
    """
    batch, height, width = x.size(0), x.size(-2), x.size(-1)

    # Affine map -1 -> 0 and 1 -> 1; anything outside that range is clipped.
    rescaled = torch.clamp(0.5 * (x + 1), 0, 1)

    return rescaled.view(batch, 3, height, width)


# 可以使用高斯噪声，也可以使用均匀分布噪声，测试下来效果差不多
# Either Gaussian or uniform noise can be used; in testing they perform about the same
# Noise samplers keyed by a one-letter code:
# 'u' -> torch.rand (uniform on [0, 1)), 'n' -> torch.randn (standard normal).
dist = dict(u=torch.rand, n=torch.randn)

In [None]:

class PersonDataset(Dataset):
    """Dataset over the ``*.png`` and ``*.jpg`` files of a single directory.

    Each item is loaded as RGB, padded with grey to a square, resized to
    ``inp_dim`` x ``inp_dim`` and then passed through ``transform``.
    """

    def __init__(self, img_dir, transform, inp_dim=416) -> None:
        """Collect the image file names found directly inside ``img_dir``.

        Args:
            img_dir: Directory that holds the images (not searched recursively).
            transform: Callable applied to the padded and resized PIL image.
            inp_dim: Side length, in pixels, of the square image handed to
                ``transform``.
        """
        super().__init__()

        self.img_dir = img_dir
        # List the directory once, and sort the result: os.listdir returns
        # names in arbitrary order, so sorting keeps the index -> file mapping
        # reproducible between runs and machines.
        entries = os.listdir(self.img_dir)
        self.img_names = sorted(
            fnmatch.filter(entries, '*.png') + fnmatch.filter(entries, '*.jpg')
        )
        self.transform = transform

        self.inp_dim = inp_dim

    def __len__(self) -> int:
        """Return the number of image files that were found."""
        return len(self.img_names)

    def __getitem__(self, index: int):
        """Load, pad, resize and transform the image at position ``index``."""
        img_path = os.path.join(self.img_dir, self.img_names[index])
        image = Image.open(img_path).convert('RGB')
        image = self.pad_and_scale(image)
        image = self.transform(image)

        return image

    def pad_and_scale(self, img):
        """Pad ``img`` to a square with grey, then resize it to ``inp_dim``.

        Args:
            img: PIL image.

        Returns:
            PIL image of size ``inp_dim`` x ``inp_dim``. The original content
            is centred along its shorter axis on a (127, 127, 127) canvas.
        """
        w, h = img.size
        if w != h:
            side = max(w, h)
            canvas = Image.new('RGB', (side, side), color=(127, 127, 127))
            # The offset along the longer axis is 0; along the shorter axis it
            # is half of the size difference, rounded down.
            canvas.paste(img, ((side - w) // 2, (side - h) // 2))
            img = canvas

        resize = transforms.Resize((self.inp_dim, self.inp_dim))

        return resize(img)


# 定义不同尺度的自编码器
# Autoencoders for the different block sizes
class AutoEncoder8(nn.Module):
    """Convolutional autoencoder with three stride-2 stages in each direction.

    The shape comments assume a (batch, 3, 52, 52) input; for that size the
    decoder output has the same shape as the input. The final Tanh keeps the
    output in [-1, 1].
    """

    def __init__(self):
        super().__init__()

        down = [
            nn.Conv2d(3, 8, kernel_size=8, stride=2, padding=1),      # -> batch, 8, 24, 24
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=4, stride=2, padding=1),     # -> batch, 16, 12, 12
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=4, stride=2, padding=1),    # -> batch, 32, 6, 6
            nn.ReLU(),
        ]
        self.encoder = nn.Sequential(*down)

        up = [
            nn.ConvTranspose2d(32, 16, kernel_size=4, stride=2, padding=1),    # -> batch, 16, 12, 12
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, kernel_size=4, stride=2, padding=1),     # -> batch, 8, 24, 24
            nn.ReLU(),
            nn.ConvTranspose2d(8, 3, kernel_size=8, stride=2, padding=1),      # -> batch, 3, 52, 52
            nn.Tanh(),
        ]
        self.decoder = nn.Sequential(*up)

    def forward(self, x):
        """Encode ``x`` and decode it again, returning the reconstruction."""
        latent = self.encoder(x)
        return self.decoder(latent)


class AutoEncoder16(nn.Module):
    """Convolutional autoencoder built from 2x2 kernels with stride 2.

    The shape comments assume a (batch, 3, 26, 26) input; for that size the
    decoder output has the same shape as the input. The final Tanh keeps the
    output in [-1, 1].
    """

    def __init__(self):
        super().__init__()

        down = [
            nn.Conv2d(3, 8, kernel_size=2, stride=2, padding=1),      # -> batch, 8, 14, 14
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=2, stride=2, padding=1),     # -> batch, 16, 8, 8
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=2, stride=2, padding=1),    # -> batch, 32, 5, 5
            nn.ReLU(),
        ]
        self.encoder = nn.Sequential(*down)

        up = [
            nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2, padding=1),    # -> batch, 16, 8, 8
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, kernel_size=2, stride=2, padding=1),     # -> batch, 8, 14, 14
            nn.ReLU(),
            nn.ConvTranspose2d(8, 3, kernel_size=2, stride=2, padding=1),      # -> batch, 3, 26, 26
            nn.Tanh(),
        ]
        self.decoder = nn.Sequential(*up)

    def forward(self, x):
        """Encode ``x`` and decode it again, returning the reconstruction."""
        latent = self.encoder(x)
        return self.decoder(latent)


class AutoEncoder32(nn.Module):
    """Convolutional autoencoder whose outermost layers use stride 1.

    The shape comments assume a (batch, 3, 13, 13) input; for that size the
    decoder output has the same shape as the input. The final Tanh keeps the
    output in [-1, 1].
    """

    def __init__(self):
        super().__init__()

        down = [
            nn.Conv2d(3, 8, kernel_size=2, stride=1, padding=1),      # -> batch, 8, 14, 14
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=2, stride=2, padding=1),     # -> batch, 16, 8, 8
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=2, stride=2, padding=1),    # -> batch, 32, 5, 5
            nn.ReLU(),
        ]
        self.encoder = nn.Sequential(*down)

        up = [
            nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2, padding=1),    # -> batch, 16, 8, 8
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, kernel_size=2, stride=2, padding=1),     # -> batch, 8, 14, 14
            nn.ReLU(),
            nn.ConvTranspose2d(8, 3, kernel_size=2, stride=1, padding=1),      # -> batch, 3, 13, 13
            nn.Tanh(),
        ]
        self.decoder = nn.Sequential(*up)

    def forward(self, x):
        """Encode ``x`` and decode it again, returning the reconstruction."""
        latent = self.encoder(x)
        return self.decoder(latent)


### 训练相关函数 Some functions for training

In [None]:
# 用于将噪声进行随机变换并加入到图像中
# Apply a random transformation to the noise and paste it onto the image
def noise_applier(img, noise):
    """Paste a randomly scaled and shifted noise patch onto every image.

    The noise is zero-padded to the spatial size of ``img``, warped with a
    per-sample affine transform (random magnification between 1.5x and 4x on
    each axis, plus an optional half-unit shift in normalized coordinates),
    and copied over ``img`` wherever the warped result is non-zero.

    All work happens on ``img.device``, so the function runs on CPU as well
    as on GPU.

    Args:
        img: Image batch of shape (batch, channels, height, width).
        noise: Noise batch with the same batch size and channel count as
            ``img`` and a spatial size no larger than that of ``img``.

    Returns:
        A new tensor shaped like ``img``; ``img`` itself is not modified.
    """
    device = img.device
    batch = img.size(0)

    # Keep the noise strictly inside (0, 1): an exact 0 after warping can then
    # only come from padding, which is what the final mask relies on.
    noise = torch.clamp(noise, 0.000001, 0.999999).to(device)

    pad_h = (img.size(-2) - noise.size(-2)) / 2
    pad_w = (img.size(-1) - noise.size(-1)) / 2

    # Padding order is (left, right, top, bottom); the +0.5 gives the extra
    # pixel to the left/top side when the size difference is odd.
    mypad = nn.ConstantPad2d((int(pad_w + 0.5), int(pad_w), int(pad_h + 0.5), int(pad_h)), 0)
    noise = mypad(noise)

    # Random parameters are drawn on the CPU (as before) and then moved, so a
    # seeded run consumes the CPU generator in the same order on any device.
    scale_w = torch.empty(batch, dtype=torch.float32).uniform_(1.5, 4).to(device)
    scale_h = torch.empty(batch, dtype=torch.float32).uniform_(1.5, 4).to(device)
    rand_map = torch.rand((2, batch)).to(device)
    # Each shift is -0.5, 0 or +0.5, chosen with roughly equal probability.
    tx = ((rand_map[0, :] > 0.66).float() - (rand_map[0, :] < 0.33).float()) * 0.5
    ty = ((rand_map[1, :] > 0.66).float() - (rand_map[1, :] < 0.33).float()) * 0.5

    # theta maps output coordinates to input coordinates, so dividing by the
    # scale magnifies the patch in the output.
    theta = torch.zeros(batch, 2, 3, device=device)
    theta[:, 0, 0] = 1 / scale_w
    theta[:, 0, 2] = tx / scale_w
    theta[:, 1, 1] = 1 / scale_h
    theta[:, 1, 2] = ty / scale_h

    # align_corners=False is the library default; passing it explicitly keeps
    # the result unchanged and avoids the default-behaviour warning.
    grid = nn.functional.affine_grid(theta, noise.shape, align_corners=False)
    noise_t = nn.functional.grid_sample(noise, grid, align_corners=False)

    img = torch.where((noise_t == 0), img, noise_t)

    return img

In [None]:

def ae_trainer(model, train_loader, epochs, learning_rate, box_length, patch_distribution, pretrain=None, CUDA=True):
    """Train an autoencoder, mixing reconstruction steps with noise steps.

    Batches are processed in a repeating pattern of three reconstruction
    steps followed by one noise step; the pattern carries over between epochs:

    * reconstruction step: MSE between the model output and the input batch;
    * noise step: a noise patch is pasted onto the batch with
      ``noise_applier`` and the loss is the MSE between the per-sample
      variance of the model output and 1.

    Every fifth epoch the last batch and its output are saved as images under
    ``./ae_train_process/``. After training, the weights are written back to
    ``pretrain`` when it was given, otherwise to
    ``./ae_weights/<patch_distribution>_<box_length>.pth``. Both output
    directories are created if they are missing.

    Args:
        model: Autoencoder to train. It is not moved to a device here, so it
            must already live on the device the batches end up on.
        train_loader: Iterable that yields image batches.
        epochs: Number of passes over ``train_loader``.
        learning_rate: Learning rate for the Adam optimizer.
        box_length: Block side length; the noise patch has side
            ``box_length // 4``.
        patch_distribution: Key into ``dist`` selecting the noise sampler
            ('u' or 'n').
        pretrain: Optional path of a state dict to load before training and
            to overwrite afterwards.
        CUDA: If true, each batch is moved to the GPU.
    """
    if pretrain is not None:
        model.load_state_dict(torch.load(pretrain))
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    noise_switch = 0    # reconstruction steps taken since the last noise step

    # Stay None if the loader never yields a batch, so the snapshot is skipped
    # instead of failing on an undefined name.
    img = None
    output = None

    for epoch in range(epochs):

        total_loss = 0.0

        for img in train_loader:
            if CUDA:
                img = img.cuda()

            if noise_switch < 3:
                output = model(img)
                loss = criterion(output, img)
                noise_switch += 1
            else:
                # Create the noise on the same device as the batch, so that
                # CUDA=False is honoured here as well.
                patch_side = box_length // 4
                noise = dist[patch_distribution]((img.size(0), 3, patch_side, patch_side)).to(img.device)
                img = noise_applier(img, noise)
                output = model(img)
                var_output = torch.var(output, dim=(1, 2, 3))
                target_var = torch.ones_like(var_output)
                loss = criterion(var_output, target_var)
                noise_switch = 0

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # .item() accumulates a plain float rather than a tensor.
            total_loss += loss.item()

        print('epoch [{}/{}], loss:{:.4f}'
            .format(epoch, epochs-1, total_loss))

        if epoch % 5 == 0 and img is not None and output is not None:
            os.makedirs("./ae_train_process", exist_ok=True)
            pic = to_img(img.cpu().data)
            save_image(pic, "./ae_train_process/{}_ori.png".format(epoch))
            pic = to_img(output.cpu().data)
            save_image(pic, "./ae_train_process/{}_rec.png".format(epoch))

    if pretrain is not None:
        torch.save(model.state_dict(), pretrain)
    else:
        os.makedirs("./ae_weights", exist_ok=True)
        torch.save(model.state_dict(), "./ae_weights/%s_%d.pth" % (patch_distribution, box_length))


### 训练 Training

In [None]:
# Train AutoEncoder32 on 13x13 crops with uniform-noise patches.
learning_rate = 0.002
epochs = 2  # 2 only smoke-tests the code; 150 works better
box_length = 13

# Augmentation: random box-sized crop, colour jitter, random flips, then
# conversion to a tensor normalized with mean 0.5 and std 0.5 per channel.
transform = transforms.Compose([
    #transforms.Resize(size=400),
    transforms.RandomCrop(size=box_length),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomVerticalFlip(),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

img_dir = "Dataset/back_dhd_coco/"
batch_size = 200
dataset_train = PersonDataset(img_dir=img_dir, transform=transform)
train_loader = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True)

model = AutoEncoder32().cuda()

ae_trainer(
    model=model,
    train_loader=train_loader,
    epochs=epochs,
    learning_rate=learning_rate,
    box_length=box_length,
    patch_distribution='u',
    pretrain=None,
    CUDA=True,
)

epoch [0/1], loss:0.3016
epoch [1/1], loss:0.3588


In [None]:
# Train AutoEncoder32 on 13x13 crops with Gaussian-noise patches.
learning_rate = 0.005
epochs = 2  # 2 only smoke-tests the code; 150 works better
box_length = 13

# Augmentation: random box-sized crop, colour jitter, random flips, then
# conversion to a tensor normalized with mean 0.5 and std 0.5 per channel.
transform = transforms.Compose([
    transforms.RandomCrop(size=box_length),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomVerticalFlip(),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

img_dir = "Dataset/back_dhd_coco/"
batch_size = 200
dataset_train = PersonDataset(img_dir=img_dir, transform=transform)
train_loader = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True)

model = AutoEncoder32().cuda()

ae_trainer(
    model=model,
    train_loader=train_loader,
    epochs=epochs,
    learning_rate=learning_rate,
    box_length=box_length,
    patch_distribution='n',
    CUDA=True,
)

epoch [0/1], loss:0.2460
epoch [1/1], loss:0.2822
