In [1]:
import time
import torch.optim
import torch.nn.functional as F
import torch.nn as nn
from torchvision import models
from datasets.dataset import CUBDataset
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import time
from torchvision import transforms
from PIL import Image, ImageDraw
import numpy as np
from matplotlib import pyplot
from numpy import unravel_index
from collections import Counter

# declaration

In [2]:
N_parts = 5

In [3]:
trainset = CUBDataset()
trainloader = DataLoader(dataset=trainset, batch_size=10, shuffle=True)
testset = CUBDataset(is_test = True)
testloader = DataLoader(dataset=testset, batch_size=10, shuffle=True)

In [4]:
vgg19 = torch.hub.load('pytorch/vision:v0.9.0', 'vgg19', pretrained=True)

Using cache found in C:\Users\Vincent/.cache\torch\hub\pytorch_vision_v0.9.0


In [162]:
def attention_map(feature_channels, d):
    """
    i = n-th Part
    j = 1..c
    Mi(X) = sigmoid(∑dji[W∗X]j)
    W∗X = j-th feature channel
    dji = j-th weight vector
    """
    M = 0
    for i in range(feature_channels.size(0)):
        M += d[i] * feature_channels[i]
    torch.sigmoid(M)
    return M

In [184]:
def Loss_CNG(M, t):
    """
    Input are the coordinates of the position vector "t" and all attention maps of one image "M".
    Lcng(Mi) =Dis(Mi) +λDiv(Mi)
    The λ in Eqn. (7) and mrg in Eqn. (9) are empirically set to 2 and 0.02.
    """
    weight = 2
    margin = .02
    loss = list()
    
    def distance(mi, ix, iy, i, t):
        """Dis(Mi) =∑(x,y)∈Mi(mi(x, y)[||x−tx||2+||y−ty||2])"""
        return mi[iy,ix] * ((ix - t[i][0])** 2 + (iy - t[i][1])**2)

    def diversity(mi, ix, iy, mrg):
        """Div(Mi) =∑(x,y)∈Mi(mi(x, y)[max(k/=im)_k(x, y)−mrg])"""
        return mi[iy,ix] * (max([mk[iy,ix] for mk in M]) - mrg)
    
    for i, mi in enumerate(M):
        dis = 0
        div = 0
        for iy in range(mi.size(0)):
            for ix in range(mi.size(1)):
                dis += distance(mi, ix, iy, i, t)
                div += diversity(mi, ix, iy, margin)
        loss.append(dis + weight * div)

    return loss

In [185]:
class Net(nn.Module):
    """
    FC Layers which produce a weight vector d_i(X) from [d_1 .. d_c], where c is the length of feature channels.
    Takes as input convolutional features which gets represented as positional vectors t.
    """
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 512)
    
    def forward(self, t):
        """
        t = position vector
        d = weight vector
        """
        d = self.fc1(t)
        d = self.fc2(d)
        return d

# channel grouping

### Normalization

In [186]:
# TODO add to preprocessing
img , label = trainset[120]
img = transforms.ToTensor()(img)
img = img.permute(1,2,0)
img = transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])(img/255) 
img = img.unsqueeze(0)
print(img.shape)

torch.Size([1, 3, 448, 448])


### Calculate position vector "t"

In [187]:
channels = vgg19.features(img)

coordinates = list()
for channel in channels.detach().numpy()[0]:
    coordinates = coordinates + [unravel_index(channel.argmax(), channel.shape)]

t_flat = [point for coordinate in coordinates for point in coordinate]
t_flat = torch.FloatTensor(t_flat)

In [205]:
# Initiate N-times neural networks (Part FC's)
NNs = {N: Net() for N in range(N_parts)}

In [206]:
M = list()
for N in range(N_parts):
    
    # Calculate weight vector "d"
    d = NNs[N](t_flat)

    # Calculate attention map "M"
    mi = attention_map(channels[0], d)
    M.append(mi)

### Calculate Loss CNG of all Mi's "L_cng(M_i)"

In [207]:
M_loss_cng = Loss_CNG(M, coordinates)
M_loss_cng

[tensor(-277369.0312, grad_fn=<AddBackward0>),
 tensor(1068108.5000, grad_fn=<AddBackward0>),
 tensor(63077.7266, grad_fn=<AddBackward0>),
 tensor(881210.5000, grad_fn=<AddBackward0>),
 tensor(-249701.1875, grad_fn=<AddBackward0>)]

In [None]:
learning_rate = 1e-3
input_channels = 1
output_features = 6
epoch = 1
save_model_name = 'models/pretrained.pth'

In [None]:
trainset = CUBDataset()
trainloader = DataLoader(dataset=trainset, batch_size=10, shuffle=True)
testset = CUBDataset(is_test = True)
testloader = DataLoader(dataset=testset, batch_size=10, shuffle=True)

In [None]:
vgg19 = torch.hub.load('pytorch/vision:v0.9.0', 'vgg19', pretrained=True)

# Standard Vgg19

In [None]:
img , label = trainset[120]
print(img.shape)
print(img.transpose(1,2,0).shape)
baseimage = Image.fromarray(img.transpose(1,2,0).astype(np.uint8))
baseimage


In [None]:
img = transforms.ToTensor()(img)
img = img.permute(1,2,0)
mean, std = img.mean([1,2]), img.std([1,2])
#print("Mean:", mean, "\nStd:", std)
#print("image shape:", img.shape)
#img = transforms.Normalize(mean, std)(img)
#/255 -> Vgg19 Normalization
img = img/255
img = transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])(img) 
img = img.unsqueeze(0)
print(img.shape)

In [None]:
vgg19(img)

In [None]:
output = nn.Softmax(dim=1)(vgg19(img))

In [None]:
with open("classes_vgg19.txt","r") as f:
    classes = f.read().split("\n")

In [None]:
max_o = 0
highscore = None
for i, o in enumerate(output[0]):
    if max_o < o:
        highscore = i
        max_o = o

print(highscore,classes[highscore])

In [None]:
channels = vgg19.features(img)
channels = channels.detach().numpy()

In [None]:
square = 5
ix = 1
for _ in range(square):
    for _ in range(square):
        # specify subplot and turn of axis
        ax = pyplot.subplot(square, square, ix)
        ax.set_xticks([])
        ax.set_yticks([])
        # plot filter channel in grayscale
        pyplot.imshow(channels[0, ix-1, :, :], cmap='gray')
        ix += 1
# show the figure
pyplot.show()


# Formula 1: determine t

In [None]:
t = list()
for channel in channels[0]:
    #print(np.max(channel))
    t.append(unravel_index(channel.argmax(), channel.shape))

In [None]:
x,y = zip(*t)

In [None]:
x_list = list()
y_list = list()
for x,y in t:
    x_list.append(x)
    y_list.append(y)
  

In [None]:
plt.scatter(x_list,y_list)

In [None]:
counter = Counter(t)

In [None]:
part_xy = counter.most_common(20)[1:]

In [None]:
part_xy

In [None]:
for xy,_ in part_xy:
    x,y = xy
    i = ImageDraw.Draw(baseimage).rectangle([(x-1)*32,(y-1)*32,x*32,y*32])
baseimage

In [None]:

i = ImageDraw.Draw(baseimage)
i.rectangle([1,1,32,32])

In [None]:
i = 0

In [None]:
baseimage

# Clustering

In [None]:
class Clustering(nn.Module):
    
    def __init__(self):
        super(Clustering, self).__init__()
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, 512)
        self.fc4 = nn.Linear(512, 512)
        

In [None]:
t_flatten = [i for xy in t for i in xy]

In [None]:
t_flatten= torch.Tensor(t_flatten)

In [None]:
t_flatten.shape

In [None]:
vgg19.fc1 = nn.Linear(in_features=1024, out_features=512, bias=True)
vgg19.fc2 = nn.Linear(in_features=512, out_features=512, bias=True)
#vgg19.fc(torch.Tensor(t).permute(1,0))
vgg19.fc2(vgg19.fc1(t_flatten))

In [None]:
vgg19.eval

In [None]:
class Part(nn.Module):

    def __init__(self):
        super(Part, self).__init__()
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, 256)

    def forward(self, x):
        conv_matrix = torch.clone(x)
        conv_matrix = conv_matrix.reshape(conv_matrix.size(0), 512, 1, 784) #512 = patterns; 784 = 28x28 pattern w x h
        conv_matrix = conv_matrix.transpose(1, 3)
        x = F.avg_pool2d(x, kernel_size=28, stride=28)
        x = x.view(x.size(0), -1)
        x = torch.tanh(self.fc1(x))
        x = self.fc2(x)
        x = torch.sigmoid(x).unsqueeze(1).unsqueeze(1)
        x = F.interpolate(x, (1, 784), mode='bilinear', align_corners=True)
        x = x.squeeze(1).squeeze(1).unsqueeze(2).unsqueeze(3)
        x = x * conv_matrix
        x = F.avg_pool2d(x, kernel_size=(1, 512), stride=512)
        x = x * 0.1
        x = F.softmax(x, dim=1)
        x = torch.exp(x)
        x = x + 1
        x = torch.log(x)
        x = x * 4
        x = x.squeeze(2).squeeze(2)
        return x.reshape(x.size(0), 28, 28)

In [None]:
class Loss(nn.Module):

    def __init__(self):
        super(Loss, self).__init__()

    def forward(self, tensor):
        loss_sum = torch.zeros(1).cuda()
        indexes = Loss.get_max_index(tensor)
        for i in range(len(indexes)):
            max_x, max_y = indexes[i]
            for j in range(tensor.size(1)):
                for k in range(tensor.size(2)):
                    loss_sum += ((max_x - j) * (max_x - j) + (max_y - k) * (max_y - k)) * tensor[i, j, k]
        return loss_sum

    @staticmethod
    def get_max_index(tensor):
        shape = tensor.shape
        indexes = []
        for i in range(shape[0]):
            mx = tensor[i, 0, 0]
            x, y = 0, 0
            for j in range(shape[1]):
                for k in range(shape[2]):
                    if tensor[i, j, k] > mx:
                        mx = tensor[i, j, k]
                        x, y = j, k
            indexes.append([x, y])
        return indexes

In [None]:
part = Part()

In [None]:
loss_fn = Loss()

In [None]:
optimizer = torch.optim.Adam(part.parameters(), lr = learning_rate)

In [None]:
img , label = trainset[55]
Image.fromarray(img.transpose(1,2,0).astype(np.uint8))

In [None]:
img = transforms.ToTensor()(img)
img = img.permute(1,2,0)
print(img.shape)
mean, std = img.mean([1,2]), img.std([1,2])
print("Mean:", mean, "\nStd:", std)
img = img/255
img = transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])(img)
img = img.unsqueeze(0)
print("image shape:", img.shape)

In [None]:
channels = vgg19.features[0:36](img)

In [None]:
print("channel shape:", channels.shape)
output = part(channels)
print("output shape:", output.shape)
optimizer.zero_grad()
loss = loss_fn(output)
loss.backward()
optimizer.step()
print("hi")

In [None]:
part_model = torch.load(save_model_name)

In [None]:
output = part_model(channels)
x = output.permute(1,2,0).detach().numpy()
print(x.shape)
print(type(x))
print(np.stack((x,x,x),axis=2).squeeze(-1).shape)
Image.fromarray(np.stack((x,x,x),axis=2).squeeze(-1).astype(np.uint8))


In [None]:
epoch = 10
for epoch_number in range(epoch):
    running_loss, count, acc = 0., 0, 0.
    for batch, label in trainloader:
        for img in batch:
            t = time.time()
            #print(img.shape)
            img = img/255
            img = transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])(img)
            img = img.unsqueeze(0)
            #print("image shape:", img.shape)
            channels = vgg19.features[0:36](img)
            #print("channel shape:", channels.shape)
            output = part(channels)
            #print("output shape:", output.shape)
            optimizer.zero_grad()
            loss = loss_fn(output)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            count += img.size(0)
            #print(time.time() - t)
        print(epoch_number, count, running_loss, Loss.get_max_index(output))

           


In [None]:
torch.save(part, save_model_name)