In [None]:
import pdb 

# Create Dataset

In [None]:
from __future__ import print_function, division
import os
import torch
import torch.nn.functional as F
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from PIL import Image
from sklearn import preprocessing

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

plt.ion()   # interactive mode

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


### Unzip file into directory

In [None]:
import zipfile
with zipfile.ZipFile("/content/gdrive/MyDrive/Aesthetic/style_image_test.zip","r") as zip_ref: 
  zip_ref.extractall("/content/data/") 

In [None]:
import zipfile
with zipfile.ZipFile("/content/gdrive/MyDrive/Aesthetic/style_image_train.zip","r") as zip_ref:
  zip_ref.extractall("/content/data/") 

### Define our dataset class and transforms. 
Note these transforms are just the sample taken from the pytorch VGG documentation

Dataset class code adapted from https://pytorch.org/tutorials/recipes/recipes/custom_dataset_transforms_loader.html

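Each dataset item is a dict with the keys `id`, `image`, `attributes` and `label`. `attributes` is a numpy array derived from the annotation columns, which the readme describes as follows (note that `AVADataset1` below replaces the two semantic-tag columns with 66 one-hot columns appended after the challenge ID, so the positions inside `attributes` differ from this raw layout):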

Columns 0 - 9: Counts of aesthetics ratings on a scale of 1-10. Column 0 
has counts of ratings of 1 and column 9 has counts of ratings of 10.

Columns 10 - 11: Semantic tag IDs. There are 66 IDs ranging from 1 to 66.
The file tags.txt contains the textual tag corresponding to the numerical
id. Each image has between 0 and 2 tags. Images with less than 2 tags have
a "0" in place of the missing tag(s).

Column 12: Challenge ID. The file challenges.txt contains the name of 
the challenge corresponding to each ID.

Column 13-26: Style ID one hot encoded (optional)

In [None]:
# Image pipeline passed as `transform` to both the train and the test dataset below.
# NOTE(review): RandomHorizontalFlip is therefore also applied at evaluation time — confirm this is intended.
# NOTE(review): the crop is 227 px while MobileNetV2 below defaults to input_size=224 — confirm the mismatch is deliberate.
preprocess = transforms.Compose([
    transforms.Resize(256),
    
    transforms.CenterCrop(227),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    # per-channel mean/std applied after ToTensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def pil_loader(path):
  """Open the image file at `path` and return it converted to an RGB PIL image."""
  # Both handles are closed on exit; the conversion happens while they are still open.
  with open(path, 'rb') as handle, Image.open(handle) as picture:
    rgb_picture = picture.convert('RGB')
  return rgb_picture

In [None]:
class AVADataset1(Dataset):
    """AVA dataset yielding an image together with its ratings and attributes.

    After construction ``self.ava_frame`` holds, in column order:
    ``idx_old``, ``image_id``, ``1_count`` .. ``10_count``, ``challenge_id``,
    the 66 one-hot columns ``semantic_id_1`` .. ``semantic_id_66`` and, when a
    style file was supplied, the style columns.
    """

    def __init__(self, image_root_dir, ava_filepath, image_id_filepath, image_style_filepath=None, transform=None):
        """
        Args:
            image_root_dir (string): Directory with all the images, stored as
                ``<image_id>.jpg``.
            ava_filepath (string): Path to the space-separated annotation file.
            image_id_filepath (string): Path to the file listing the image ids
                to keep, one per row.
            image_style_filepath (string, optional): Path to a space-separated
                style file that is joined row-by-row onto the image id file.
                A single-column file is one-hot encoded.
            transform (callable, optional): Optional transform to be applied
                to the loaded image.
        """
        self.root_dir = image_root_dir
        self.transform = transform

        # Read in data into pandas data frames
        ava_frame = pd.read_csv(
            ava_filepath,
            sep=' ',
            header=None,
            names=["idx_old", "image_id", "1_count", "2_count", "3_count", "4_count", "5_count", "6_count", "7_count", "8_count", "9_count", "10_count", "semantic_1_id", "semantic_2_id", "challenge_id"]
            )

        imageid_frame = pd.read_csv(image_id_filepath, sep=' ', header=None, names=['id'])

        # Remove examples without an attribute tag (where tag == 0). The index
        # is rebuilt so that it has no gaps after the removal.
        ava_frame = ava_frame[ava_frame.semantic_1_id != 0].reset_index(drop=True)

        # One hot encode the first semantic tag and drop the raw tag columns.
        # The one-hot frame reuses ava_frame's index: pd.concat(axis=1) aligns
        # on index labels, so a mismatching index would pair tags with the
        # wrong rows and introduce NaN rows.
        lb = preprocessing.LabelBinarizer()
        lb.fit(list(range(1, 67)))
        one_hot = pd.DataFrame(
            lb.transform(ava_frame.semantic_1_id),
            columns=["semantic_id_" + str(x) for x in range(1, 67)],
            index=ava_frame.index,
        )
        ava_frame = pd.concat([ava_frame, one_hot], axis=1)
        ava_frame = ava_frame.drop(columns=['semantic_1_id', 'semantic_2_id'])

        # if a style image file is given, add that data as well
        if image_style_filepath is not None:
            image_style_frame = pd.read_csv(image_style_filepath, sep=' ', header=None)
            # if the style only has one value we need to one hot encode it
            if len(image_style_frame.columns) == 1:
                image_style_frame = pd.get_dummies(image_style_frame[0], prefix='style_')
            imageid_frame = image_style_frame.join(imageid_frame, rsuffix='_id', lsuffix='_style')

        # keep only the annotated images that are listed in the id file
        ava_frame = ava_frame.merge(imageid_frame, left_on="image_id", right_on='id', how='inner')

        # remove last column which is the duplicate id column
        ava_frame.pop(ava_frame.columns[-1])

        self.ava_frame = ava_frame

    def __len__(self):
        """Number of images kept after filtering and merging."""
        return len(self.ava_frame)

    def __getitem__(self, idx):
        """Return a dict with the keys ``id``, ``image``, ``attributes`` and ``label``.

        ``attributes`` is every column after ``image_id`` as floats; ``label``
        is the 10 rating counts normalised to sum to 1.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()
        img_id = int(self.ava_frame.iloc[idx, 1])
        img_name = os.path.join(self.root_dir, f"{img_id}.jpg")

        image = pil_loader(img_name)
        if self.transform:
            image = self.transform(image)

        attributes = np.array(self.ava_frame.iloc[idx, 2:]).astype('float')

        # columns 2..11 are the counts of ratings 1..10
        label = np.array(self.ava_frame.iloc[idx, 2:12].astype('float'))
        label = label / label.sum()

        sample = {"id": img_id, 'image': image, 'attributes': attributes, 'label': label}

        return sample

In [None]:
dataset_train = AVADataset1(ava_filepath="/content/gdrive/MyDrive/Aesthetic/style_image_train/AVA.txt", 
                     image_id_filepath="/content/gdrive/MyDrive/Aesthetic/style_image_train/train.jpgl",
                     image_root_dir="/content/gdrive/MyDrive/Aesthetic/style_image_train/images", 
                     #target_path="/content/gdrive/MyDrive/Aesthetic/target_train.csv",
                     transform=preprocess)

In [None]:
dataset_test = AVADataset1(ava_filepath="/content/gdrive/MyDrive/Aesthetic/style_image_train/AVA.txt", 
                     image_id_filepath="/content/gdrive/MyDrive/Aesthetic/style_image_test/test.jpgl", 
                     image_root_dir="/content/gdrive/MyDrive/Aesthetic/style_image_test",
                     #target_path="/content/gdrive/MyDrive/Aesthetic/target_test.csv", 
                     transform=preprocess)

In [None]:
r=dataset_train[0]['image']
r.shape

torch.Size([3, 224, 224])

### Samples

## MobileNet implementation 

In [None]:
!pip install common

Collecting common
  Downloading https://files.pythonhosted.org/packages/eb/b2/c900168d36abd28b1b08a81387835eff8b574bc6c2e9fefb5c4a38135d94/common-0.1.2.tar.gz
Building wheels for collected packages: common
  Building wheel for common (setup.py) ... [?25l[?25hdone
  Created wheel for common: filename=common-0.1.2-cp37-none-any.whl size=3734 sha256=19c46678e2e5d6d7b863c329935a57a498ae6a1c8b7f3af4bc38dc9ae48aecdb
  Stored in directory: /root/.cache/pip/wheels/2e/8f/ec/9ac55fd8f7923ddf23619c89b42dbbcfc71db6ee41ad5e7b5e
Successfully built common
Installing collected packages: common
Successfully installed common-0.1.2


In [None]:
import math
import os

import torch
import torch.nn as nn





def conv_bn(inp, oup, stride):
    """Build a 3x3 convolution (padding 1, no bias) followed by BatchNorm and ReLU."""
    layers = [
        nn.Conv2d(inp, oup, kernel_size=3, stride=stride, padding=1, bias=False),
        nn.BatchNorm2d(oup),
        nn.ReLU(inplace=True),
    ]
    return nn.Sequential(*layers)


def conv_1x1_bn(inp, oup):
    """Build a 1x1 convolution (stride 1, no bias) followed by BatchNorm and ReLU."""
    pointwise = nn.Conv2d(inp, oup, kernel_size=1, stride=1, padding=0, bias=False)
    norm = nn.BatchNorm2d(oup)
    activation = nn.ReLU(inplace=True)
    return nn.Sequential(pointwise, norm, activation)


class InvertedResidual(nn.Module):
    """Expand (1x1) -> depthwise (3x3) -> project (1x1) block with an optional skip.

    The skip connection is used only when the stride is 1 and the input and
    output channel counts are equal.
    """

    def __init__(self, inp, oup, stride, expand_ratio):
        super().__init__()
        assert stride in [1, 2]
        self.stride = stride
        self.use_res_connect = stride == 1 and inp == oup

        hidden = inp * expand_ratio
        expand = [
            nn.Conv2d(inp, hidden, 1, 1, 0, bias=False),
            nn.BatchNorm2d(hidden),
            nn.ReLU6(inplace=True),
        ]
        # groups == channels makes the 3x3 convolution depthwise
        depthwise = [
            nn.Conv2d(hidden, hidden, 3, stride, 1, groups=hidden, bias=False),
            nn.BatchNorm2d(hidden),
            nn.ReLU6(inplace=True),
        ]
        # linear projection: no activation after the final BatchNorm
        project = [
            nn.Conv2d(hidden, oup, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup),
        ]
        self.conv = nn.Sequential(*expand, *depthwise, *project)

    def forward(self, x):
        out = self.conv(x)
        return x + out if self.use_res_connect else out


class MobileNetV2(nn.Module):
    """MobileNetV2 feature extractor followed by a dropout + linear classifier.

    Args:
        n_class: number of classifier outputs.
        input_size: input resolution; must be divisible by 32. The final
            average pool uses a kernel of ``input_size // 32``.
        width_mult: multiplier applied to the channel counts.
    """

    def __init__(self, n_class=1000, input_size=224, width_mult=1.):
        super().__init__()
        # one row per stage: expansion t, output channels c, repeats n, first stride s
        self.interverted_residual_setting = [
            [1, 16, 1, 1],
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]

        assert input_size % 32 == 0
        in_channels = int(32 * width_mult)
        # the last channel count is only scaled up, never down
        self.last_channel = int(1280 * width_mult) if width_mult > 1.0 else 1280

        # stem
        layers = [conv_bn(3, in_channels, 2)]
        # inverted residual stages: only the first block of a stage may stride
        for t, c, n, s in self.interverted_residual_setting:
            out_channels = int(c * width_mult)
            for repeat in range(n):
                block_stride = s if repeat == 0 else 1
                layers.append(InvertedResidual(in_channels, out_channels, block_stride, t))
                in_channels = out_channels
        # head
        layers.append(conv_1x1_bn(in_channels, self.last_channel))
        layers.append(nn.AvgPool2d(input_size // 32))
        self.features = nn.Sequential(*layers)

        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(self.last_channel, n_class),
        )

        self._initialize_weights()

    def forward(self, x):
        pooled = self.features(x)
        flat = pooled.view(-1, self.last_channel)
        return self.classifier(flat)

    def _initialize_weights(self):
        """Re-initialise every Conv2d, BatchNorm2d and Linear module in place."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                fan_out = module.kernel_size[0] * module.kernel_size[1] * module.out_channels
                module.weight.data.normal_(0, math.sqrt(2. / fan_out))
                if module.bias is not None:
                    module.bias.data.zero_()
            elif isinstance(module, nn.BatchNorm2d):
                module.weight.data.fill_(1)
                module.bias.data.zero_()
            elif isinstance(module, nn.Linear):
                module.weight.data.normal_(0, 0.01)
                module.bias.data.zero_()


def mobile_net_v2(pretrained=True):
    """Create a default MobileNetV2; when `pretrained`, load weights from the fixed Drive checkpoint."""
    model = MobileNetV2()
    if not pretrained:
        return model

    def _keep_storage(storage, location):
        # leave every storage as deserialised, whatever device it was saved from
        return storage

    path_to_model = '/content/gdrive/MyDrive/Aesthetic/mobilenetv2.pth.tar'
    weights = torch.load(path_to_model, map_location=_keep_storage)
    model.load_state_dict(weights)
    return model

NIMA model, based on https://arxiv.org/abs/1709.05424

In [None]:
class NIMA(nn.Module):
    """MobileNetV2 backbone with two heads on the flattened features.

    ``Hl`` outputs a softmax distribution over 10 values; ``att`` outputs 66
    ReLU-activated values. ``forward`` returns ``(Hl output, att output)``.
    """

    def __init__(self, pretrained_base_model=True):
        super().__init__()
        backbone = mobile_net_v2(pretrained=pretrained_base_model)
        # keep every child of the backbone except the last one (its classifier)
        self.base_model = nn.Sequential(*list(backbone.children())[:-1])

        # rating head: distribution over the 10 score bins
        self.Hl = nn.Sequential(
            nn.ReLU(inplace=False),
            nn.Dropout(p=0.75),
            nn.Linear(1280, 10),
            nn.Softmax(dim=1),
        )
        # attribute head
        # NOTE(review): the trailing ReLU clamps these outputs to >= 0 before
        # they reach the attribute criterion — confirm this is intended.
        self.att = nn.Sequential(
            nn.ReLU(inplace=False),
            nn.Dropout(p=0.75),
            nn.Linear(1280, 66),
            nn.ReLU(inplace=False),
        )

    def forward(self, x):
        features = self.base_model(x)
        flat = features.view(features.size(0), -1)
        return self.Hl(flat), self.att(flat)


## Run

In [None]:
def train(train_loader, model, device, criterion_hl, criterion_attr, optimizer, epoch):
  """Run one training pass over `train_loader`.

  Args:
    train_loader: iterable of batches; each batch is a dict with the keys
      "image", "label" (normalised 10-bin rating distribution) and
      "attributes".
    model: module returning `(score_distribution, tag_outputs)`.
    device: device the inputs and targets are moved to.
    criterion_hl: loss comparing the predicted and target rating distributions.
    criterion_attr: loss comparing the tag outputs with the tag class index.
    optimizer: optimizer stepped once per batch.
    epoch: current epoch number (not used inside the function).
  """
  # Layout of batch["attributes"] as produced by AVADataset1 in this notebook:
  # 0-9 rating counts, 10 challenge id, 11-76 one-hot of the 66 semantic tags
  # (the two raw tag columns are dropped by the dataset), then optional style
  # columns. The tag one-hot therefore starts at 11, not 13.
  tag_start, tag_stop = 11, 77

  model.train()
  for i_batch, batch in enumerate(train_loader):
    output = model(batch["image"].to(device))

    # target rating distribution, matched to the prediction's dtype
    score_target = torch.as_tensor(batch["label"]).to(device=device, dtype=output[0].dtype)

    # semantic tag target: class index (0-based) of the one-hot encoded tag
    tag_one_hot = torch.as_tensor(batch["attributes"])[:, tag_start:tag_stop]
    attr_target = torch.argmax(tag_one_hot, dim=1).to(device)

    # total loss is the unweighted sum of both heads' losses
    loss_hl = criterion_hl(output[0], score_target)
    loss_attr = criterion_attr(output[1], attr_target)
    loss = loss_hl + loss_attr
    if i_batch % 50 == 0:
      print(f"loss: {loss},losshl:{loss_hl}")

    # backprop
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

def test(test_loader, model, device):
  model.eval()
  scores_arr = np.arange(1,11) #array for calculating average scores
  total_loss_style = 0.
  correct = 0.
  tscores=[]
  ret=[]
  with torch.no_grad():
    for i_batch, batch in enumerate(test_loader):
      avg_scores = (batch["attributes"][:,:10] * scores_arr).sum(axis=1) / batch["attributes"][:,:10].sum(axis=1)

      # from average image score calculate binary label high or low
      score_target = torch.tensor(np.where(avg_scores > 5, 1, 0), device=device)
      # calculate average score for each image
     
      #print(score_target)
      # from average image score calculate binary label high or low
    
  
      output = model(batch["image"].to(device))
     
          
      
      prob=output[0].cpu().detach().numpy()
      #print("prob=",prob)
      scores=prob*scores_arr
      #print("scores=",scores)
      avg_s=np.sum(scores,axis=1)
      #print("avg=",avg_s)
      score_pred = torch.tensor(np.where(avg_s > 5, 1, 0), device=device)
      tscores.append(torch.argmax(output[1],dim=1))

      #print(outputr,t)
      
      
      
      
      
      correct = correct + score_pred.eq(score_target.view_as(score_pred)).sum().item()


  print(f"avg acc: {correct}/{len(test_loader.dataset)}")
  return tscores,ret
  

def adjust_learning_rate(optimizer, epoch):
    """Set every param group's learning rate to 0.01 decayed by 10x per 30 epochs.

    The rate is written to all groups, so per-group rates are overwritten.
    """
    decay_steps = epoch // 30
    lr = 0.01 * (0.1 ** decay_steps)
    for group in optimizer.param_groups:
        group['lr'] = lr

In [None]:
score_target=pd.read_csv("/content/gdrive/MyDrive/Aesthetic/target_train.csv")

In [None]:
# Inverse-frequency weights: each sample is weighted by 1 / (size of its class),
# using the binary labels stored in column '0' of `score_target`.
label_counts = score_target['0'].value_counts()
class_sample_count = [label_counts[0], label_counts[1]]
weight = [1. / csc for csc in class_sample_count]
samples_weight = np.array([weight[t] for t in score_target['0']])

# the double conversion is now assigned back to the variable the sampler uses
samples_weight = torch.from_numpy(samples_weight).double()
sampler = torch.utils.data.WeightedRandomSampler(samples_weight, len(samples_weight))

In [None]:
score_target=pd.read_csv("/content/gdrive/MyDrive/Aesthetic/target_scores.csv")

In [None]:
# Samples whose average score lies strictly between 4.5 and 6.5 are drawn with
# 1/7 of the weight of all other samples.
MID_RANGE_WEIGHT = 1 / 7.0
OTHER_WEIGHT = 1.0

samples_weight = [
  MID_RANGE_WEIGHT if 4.5 < score < 6.5 else OTHER_WEIGHT
  for score in score_target['avg_score']
]

# the double conversion is now assigned back to the variable the sampler uses
samples_weight = torch.FloatTensor(samples_weight).double()
sampler = torch.utils.data.WeightedRandomSampler(samples_weight, len(samples_weight))

In [None]:
class EMDLoss(nn.Module):
    """Earth Mover's Distance (EMD) loss from *Neural Image Assessment*.

    Compares the cumulative sums of `pred` and `target` along the last
    dimension.

    Args:
        r: exponent of the distance (the per-sample value is the r-th root of
            the mean r-th power of the CDF differences).
        reduction: ``'mean'`` (default) averages over samples, ``'sum'`` adds
            them up, ``None`` or ``'none'`` returns the per-sample values.

    Raises:
        ValueError: if `reduction` is not one of the supported values.
    """

    _REDUCTIONS = (None, 'none', 'mean', 'sum')

    def __init__(self, r=2, reduction='mean'):
        super(EMDLoss, self).__init__()
        if reduction not in self._REDUCTIONS:
            raise ValueError(f"Unsupported reduction: {reduction!r}")
        self.r = r
        self.reduction = reduction

    def forward(self, pred, target):
        cdf_pred = torch.cumsum(pred, -1)
        cdf_target = torch.cumsum(target, -1)

        samplewise_emd = (
            torch.mean(torch.abs(cdf_pred - cdf_target) ** self.r, dim=-1) ** (1 / self.r)
        )
        if self.reduction is None or self.reduction == 'none':
            return samplewise_emd
        if self.reduction == 'mean':
            return torch.mean(samplewise_emd)
        if self.reduction == 'sum':
            return torch.sum(samplewise_emd)
        # reached only if `reduction` was changed after construction
        raise ValueError(f"Unsupported reduction: {self.reduction!r}")

In [None]:
# NOTE(review): in notebook order `model` is first assigned in the next cell, so this
# cell only works with leftover kernel state — confirm it is still needed.
model.eval()

In [None]:
no_cuda = False

use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
train_loader = DataLoader(dataset_train, batch_size=16, sampler=sampler,
                         num_workers=4)
test_loader = DataLoader(dataset_test, batch_size=128,
                        shuffle=False, num_workers=4)
model = NIMA().to(device)

criterion_attr = nn.CrossEntropyLoss()
criterion_hl = EMDLoss()

# single learning rate shared by the backbone and both heads
learning_rate = 3e-5
parameters = [
   {"params": model.base_model.parameters()},
   {"params": model.Hl.parameters(), "lr": learning_rate},
   {"params": model.att.parameters(), "lr": learning_rate},
]
optimizer = torch.optim.Adam(parameters, lr=learning_rate)

epochs = 50            # number of passes actually run by the loop below
checkpoint_every = 5   # save the weights every N epochs

for epoch in range(epochs):
  print(epoch)
  train(train_loader, model, device, criterion_hl, criterion_attr, optimizer, epoch)
  if (epoch + 1) % checkpoint_every == 0:
    # the file name carries the 0-based epoch index (4, 9, 14, ...)
    model_name = "/content/gdrive/MyDrive/Aesthetic/modelsampleattnew" + str(epoch) + ".pt"
    torch.save(model.state_dict(), model_name)

  # test() returns (tscores, ret); unpack so `tscores` is the list of per-batch
  # predictions rather than the whole tuple
  tscores, ret = test(test_loader, model, device)

In [None]:
with open('/content/gdrive/MyDrive/Aesthetic/attSGD.txt', 'w') as f:
    for item in tscores:
        f.write("%s\n" % item)


In [None]:
import pickle
 
with open('/content/gdrive/MyDrive/Aesthetic/attSGD.pkl', 'wb') as f:
   pickle.dump(tscores, f)
 

In [None]:
torch.save(model.state_dict(), "/content/gdrive/MyDrive/Aesthetic/model.pt")

In [None]:
no_cuda = False
test_loader = DataLoader(dataset_test, batch_size=128,
                        shuffle=True, num_workers=4)
use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

model = NIMA().to(device)
# map_location lets a checkpoint saved on GPU load on a CPU-only runtime as well
checkpoint_path = '/content/gdrive/MyDrive/Aesthetic/modelsampleattnew29.pt'
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
tscores, ret = test(test_loader, model, device)
#output = model((dataset_test[420]['image']).to(device))

In [None]:
use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

model = NIMA().to(device)
# map_location lets a checkpoint saved on GPU load on a CPU-only runtime as well
checkpoint_path = '/content/gdrive/MyDrive/Aesthetic/modelsampleattnew39.pt'
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
# inference mode: without this the heads' dropout (p=0.75) stays active
model.eval()

# NOTE: load_image / preprocess_image are defined in the GradCam section further
# down; that cell must have been run before this one.
img = load_image("/content/gdrive/MyDrive/Aesthetic/style_image_test/820923.jpg")
# named `input_tensor` so the builtin input(), used by choose_tlayer, is not shadowed
input_tensor = preprocess_image(img).to(device)

output = model(input_tensor)

Bounding box calculation

In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-


import cv2
try:
    import google.colab
    from google.colab.patches import cv2_imshow
except:
    from cv2 import imshow as cv2_imshow
import matplotlib.pyplot as plt
import os
import numpy as np

class BBoxerwGradCAM():
    """Turn a Grad-CAM heatmap into bounding boxes / contour polygons.

    On construction the heatmap and the image at `image_path` are resized to
    `resize_scale_list`, the heatmap is colour-mapped, and boxes are extracted
    by thresholding it. The `show_*` methods that save write PNG files named
    ``<i>.png`` into sub-folders of `dir_path`.

    Args:
        learner: stored on the instance; not used by this class itself.
        heatmap: 2-D heatmap array.
        image_path: path of the image the heatmap belongs to.
        resize_scale_list: two-element size passed to ``cv2.resize``.
        bbox_scale_list: four factors applied to x, y, w, h of every box.
        dir_path: output directory for the saved overlays (default: ``'.'``).
        i: index used as the output file name (default: ``0``).

    Attributes:
        bboxes: list of ``[[x, y, w, h], [contour], grey_img, contours]``.
    """

    # Grey-level thresholds tried in order; the first one that yields a box wins.
    THRESHOLDS = (127, 80)
    # Contours with this many points or fewer are ignored.
    MIN_CONTOUR_POINTS = 20

    def __init__(self,learner,heatmap,image_path,resize_scale_list,bbox_scale_list,dir_path='.',i=0):
        self.learner = learner
        self.heatmap = heatmap
        self.image_path = image_path
        self.resize_list = resize_scale_list
        self.scale_list = bbox_scale_list
        self.dir_path = dir_path
        self.i = i

        self.og_img, self.smooth_heatmap = self.heatmap_smoothing()
        self.bboxes = self.form_bboxes()

    def heatmap_smoothing(self):
        """Resize image and heatmap and return ``(image, colour-mapped heatmap)``.

        Raises:
            FileNotFoundError: if the image at `image_path` cannot be read.
        """
        og_img = cv2.imread(self.image_path)
        if og_img is None:
            raise FileNotFoundError(f"Could not read image: {self.image_path}")
        target_size = (self.resize_list[0], self.resize_list[1])
        heatmap = cv2.resize(self.heatmap, target_size)
        og_img = cv2.resize(og_img, target_size)

        # text between 'style_image_test' and the first '.' of the path
        index = self.image_path.find('style_image_test')
        index2 = self.image_path.find('.')
        self.path = self.image_path[index+16:index2]

        # The minimum pixel value is mapped to 0 and the maximum to 155, with
        # linear scaling in between. These values were chosen by trial and
        # error with COLORMAP_JET to get the best saturation for contours.
        heatmapshow = cv2.normalize(heatmap, None, alpha=0, beta=155, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)
        heatmapshow = cv2.applyColorMap(heatmapshow, cv2.COLORMAP_JET)

        return og_img, heatmapshow

    def _write_png(self, subdir, image):
        """Write `image` to ``<dir_path>/<subdir>/<i>.png``, creating the folder."""
        out_dir = self.dir_path + '/' + subdir
        os.makedirs(out_dir, exist_ok=True)
        cv2.imwrite(out_dir + '/' + str(self.i) + ".png", image)

    def show_smoothheatmap(self):
        """Save the colour-mapped heatmap under ``shmap/``."""
        self._write_png('shmap', self.smooth_heatmap)

    def show_bboxrectangle(self):
        """Draw every box onto the stored image (in place) and save it under ``bbox/``."""
        for bbox_coords, _, _, _ in self.bboxes:
            cv2.rectangle(self.og_img,
                          (bbox_coords[0], bbox_coords[1]),
                          (bbox_coords[0]+bbox_coords[2], bbox_coords[1]+bbox_coords[3]),
                          (0,0,0), 3)
        self._write_png('bbox', self.og_img)

    def show_contouredheatmap(self):
        """Display the grey heatmap with the contours of the winning threshold filled."""
        if self.bboxes:
            # every entry carries the grey image and the full contour set
            grey_img, contours = self.bboxes[0][2], list(self.bboxes[0][3])
        else:
            grey_img = cv2.cvtColor(self.smooth_heatmap, cv2.COLOR_BGR2GRAY)
            contours = []
        img_col = cv2.merge([grey_img, grey_img, grey_img]) # 3-channel copy to draw in colour
        if contours:
            cv2.fillPoly(img_col, contours, [36,255,12])
        cv2_imshow(img_col)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    def show_bboxpolygon(self):
        """Draw every contour polygon onto the stored image (in place) and save it under ``poly/``."""
        for _, poly_coords, _, _ in self.bboxes:
            cv2.polylines(self.og_img, poly_coords, True, (0,0,0), 2)
        self._write_png('poly', self.og_img)

    def form_bboxes(self):
        """Threshold the heatmap and return one entry per sufficiently large contour.

        Returns an empty list when no threshold produces a usable contour.
        """
        # the grey image does not depend on the threshold, so convert it once
        grey_img = cv2.cvtColor(self.smooth_heatmap, cv2.COLOR_BGR2GRAY)
        boxes = []
        for thr in self.THRESHOLDS:
            _, thresh = cv2.threshold(grey_img, thr, 255, cv2.THRESH_BINARY)
            # [-2:] copes with both the 2-value and the 3-value return of findContours
            contours, _ = cv2.findContours(thresh, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)[-2:]

            boxes = []
            for cnt in contours:
                if len(cnt) <= self.MIN_CONTOUR_POINTS:
                    continue
                # x, y is the top left corner; w, h are width and height
                x, y, w, h = cv2.boundingRect(cnt)
                # rescale the box with the user supplied factors
                x = int(x*self.scale_list[0])
                y = int(y*self.scale_list[1])
                w = int(w*self.scale_list[2])
                h = int(h*self.scale_list[3])
                boxes.append([[x, y, w, h], [cnt], grey_img, contours])

            if boxes:
                break

        return boxes

    def get_bboxes(self):
        """Return ``(list of [x, y, w, h], list of polygon coordinate lists)``."""
        bbox_coords = [entry[0] for entry in self.bboxes]
        poly_coords = [entry[1] for entry in self.bboxes]
        return bbox_coords, poly_coords


In [None]:
image_resizing_scale = [227,227]
bbox_scaling = [1,1,1,1] 

In [None]:


bbox = BBoxerwGradCAM(model,
                      gcam,
                      "/content/gdrive/MyDrive/Aesthetic/style_image_test/952233.jpg",
                      image_resizing_scale,
                      bbox_scaling)

## GradCam

Implementation taken from https://github.com/da2so/GradCAM_PyTorch.

In [None]:
import zipfile
with zipfile.ZipFile("/content/gdrive/MyDrive/Aesthetic/GradCAM_PyTorch.zip","r") as zip_ref:
  zip_ref.extractall("/") 

In [None]:

class NoIndexError(Exception):
    """Raised by choose_tlayer when the selected index is not allowed."""

    def __init__(self, message):
        super().__init__(message)
        self.message = message

class NoSuchNameError(Exception):
    """Raised by choose_tlayer when the selected name is not allowed."""

    def __init__(self, message):
        super().__init__(message)
        self.message = message

In [None]:
import os
import cv2
import sys
import numpy as np
from matplotlib import pyplot as plt

from torch.nn import functional as F
import torch.nn as nn
import torch
from torch.autograd import Variable
from torchvision import models
import torchvision


#from exceptions import NoSuchNameError , NoIndexError

def load_model(model_name):
    """Return a model in eval mode, moved to the GPU when one is available.

    Args:
        model_name: either a path containing '.pt' (a saved state_dict, which
            is loaded into a fresh NIMA, or a whole pickled model object) or
            the name of a constructor in ``torchvision.models``.

    If `model_name` is neither, a message is printed and the interpreter exits
    via ``sys.exit()``.
    """
    # saved model (.pt)
    if '.pt' in model_name:
        # read the file once, onto the CPU; it is moved to the GPU further down
        loaded = torch.load(model_name, map_location='cpu')
        # A state_dict is an OrderedDict. torch.typename() reports it as
        # 'collections.OrderedDict', so check the type directly instead of
        # comparing against the bare class name.
        if isinstance(loaded, dict):
            model = NIMA()
            model.load_state_dict(loaded)
        else:
            model = loaded

    # pretrained model (ImageNet)
    elif hasattr(models, model_name):
        model = getattr(models, model_name)(pretrained=True)
    else:
        print('Choose an available pre-trained model')
        sys.exit()

    model.eval()
    if cuda_available():
        model.cuda()

    return model

def cuda_available():
    """Return True when torch reports a usable CUDA device."""
    return torch.cuda.is_available()

def load_image(path):
    """Read the image at `path` as 3-channel colour, resize to 227x227, scale to [0, 1].

    Returns:
        float32 array of shape (227, 227, 3) in OpenCV's channel order.

    Raises:
        FileNotFoundError: if the file is missing or cannot be decoded.
    """
    img = cv2.imread(path, 1)
    # cv2.imread signals failure by returning None rather than raising
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    img = cv2.resize(img, (227, 227))
    img = np.float32(img) / 255

    return img

def preprocess_image(img):
    """Normalise an H x W x 3 image and return it as a 1 x 3 x H x W tensor.

    The channel order is reversed, each channel is shifted by its mean and
    divided by its std, and the result is placed on the GPU when available.
    The input array is left untouched.
    """
    channel_means = [0.485, 0.456, 0.406]
    channel_stds = [0.229, 0.224, 0.225]

    # work on a copy, viewed with the last axis reversed
    flipped = img.copy()[:, :, ::-1]
    for channel, (mean, std) in enumerate(zip(channel_means, channel_stds)):
        flipped[:, :, channel] = flipped[:, :, channel] - mean
        flipped[:, :, channel] = flipped[:, :, channel] / std

    # HWC -> CHW, made contiguous so torch can wrap it
    chw = np.ascontiguousarray(np.transpose(flipped, (2, 0, 1)))

    tensor = torch.from_numpy(chw)
    if cuda_available():
        tensor = tensor.cuda()

    # add the batch dimension in place
    tensor.unsqueeze_(0)
    return Variable(tensor, requires_grad=False)

def save(mask, img, img_path,i):
    """Overlay `mask` as a JET heatmap on `img` and write it as ``<i>.png``.

    Args:
        mask: 2-D activation map.
        img: float image the heatmap is added to.
        img_path: path of the source image; the text between
            'style_image_test' and the first '.' selects the output folder.
        i: index used as the output file name.

    Returns:
        The directory the PNG was written to.
    """
    # Min-max normalise to [0, 1]. Dividing by the peak of the *shifted* mask
    # keeps 255 * mask inside the uint8 range even when the minimum is not 0,
    # and a constant mask becomes all zeros instead of dividing by zero.
    shifted = mask - np.min(mask)
    peak = np.max(shifted)
    mask = shifted / peak if peak > 0 else np.zeros_like(shifted)

    heatmap = cv2.applyColorMap(np.uint8(255 * mask), cv2.COLORMAP_JET)

    heatmap = np.float32(heatmap) / 255
    gradcam = 1.0 * heatmap + img
    gradcam = gradcam / np.max(gradcam)

    # e.g. '/content/gdrive/MyDrive/Aesthetic/style_image_test/149846.jpg' -> '/149846'
    index = img_path.find('style_image_test')
    index2 = img_path.find('.')
    path = img_path[index+16:index2]

    dir_path = "/content/gdrive/MyDrive/Aesthetic/content/GradCAM_PyTorch/resultnew/Attmap"+path
    os.makedirs(dir_path, exist_ok=True)

    gradcam_path = dir_path + '/' + str(i) + ".png"

    cv2.imwrite(gradcam_path, np.uint8(255 * gradcam))
    return dir_path
    
def isInt_str(v):
    """Return True when str(v), stripped, looks like an integer.

    An optional sign and a trailing '.0...' are accepted; text containing
    '..' is tested as-is.
    """
    text = str(v).strip()
    if text == '0':
        return True
    if '..' in text:
        candidate = text
    else:
        # drop leading signs, then trailing zeros, then a trailing dot
        candidate = text.lstrip('-+').rstrip('0').rstrip('.')
    return candidate.isdigit()

def choose_tlayer(model_obj):
    """Interactively walk the module tree of `model_obj` and return the chosen layer.

    At each level the children are printed and the user types a number or a
    name. Choosing a container (a type whose name contains one of the entries
    in `module_list`) descends into it; choosing anything else returns it.

    Raises:
        NoIndexError: if the typed number is not a valid index.
        NoSuchNameError: if the typed name is not a child at the current level.
    """
    module_list = ['Sequential','Bottleneck','container','Block','densenet']
    sel_module = False
    name_module = None
    while True:
        # rebuilt at every level so names from a parent level cannot be selected
        name_to_num = {}
        for num, (name, child) in enumerate(model_obj.named_children()):
            kind = 'Module' if any(x in torch.typename(child) for x in module_list) else 'Layer'
            print(f'[ Number: {num},  Name: {name} ] -> {kind}: {child}\n')
            name_to_num[name] = num

        print('<<      You sholud not select [classifier module], [fc layer] !!      >>')
        if sel_module == False:
            a = input('Choose "Number" or "Name" of a module containing a target layer or a target layer: ')
        else:
            a = input(f'Choose "Number" or "Name" of a module containing a target layer or a target layer in {name_module} module: ')

        print('\n'*3)
        m_val = list(model_obj._modules.values())
        m_key = list(model_obj._modules.keys())
        try:
            if isInt_str(a):
                # isInt_str also accepts forms such as '1.0', which int() rejects
                index = int(float(a))
            else:
                # looked up inside the try so an unknown name is reported properly
                index = name_to_num[a]
            chosen = m_val[index]
            chosen_name = m_key[index]
        except (IndexError, ValueError):
            raise NoIndexError('Selected index (number) is not allowed.')
        except KeyError:
            raise NoSuchNameError('Selected name is not allowed.')

        if any(x in torch.typename(chosen) for x in module_list):
            # container: descend and ask again
            model_obj = chosen
            name_module = chosen_name
            sel_module = True
        else:
            return chosen




In [None]:
import numpy as np
import sys 
import cv2

import torch 
from torch.nn import functional as F
from torch.autograd import Variable



class GradCAM():
    """Grad-CAM heat-map generator bound to one image path and one model.

    Construction resolves the target layer; calling the instance runs one
    forward/backward pass, builds the heat map, saves it through ``save`` and
    renders bounding boxes through ``BBoxerwGradCAM``.

    Helpers used but not defined here (expected elsewhere in the notebook):
    ``load_image``, ``preprocess_image``, ``save``, ``BBoxerwGradCAM``,
    ``choose_tlayer``, ``NoIndexError``.

    Args:
        path: image path handed to ``load_image``; stored as ``img_path``.
        model_path: despite its name this is the model object itself (it is
            called directly and its children are inspected).
        select_t_layer: ``False`` picks the target layer automatically;
            anything else asks the user through ``choose_tlayer``.
        class_index: index to explain; ``None`` means "use the arg-max of the
            model's first output" on every call.
    """

    # Substrings of torch.typename(...) that mark a child as a container to
    # descend into during the automatic target-layer search.
    _CONTAINER_MARKERS = ['Sequential','Bottleneck','container','Block','densenet']

    # Child index the automatic search alternates with 0 once it is two levels
    # deep. NOTE(review): presumably the last feature block of the backbone
    # this notebook uses -- confirm before using another architecture.
    _DEEP_CHILD_INDEX = 18

    def __init__(self, path, model_path, select_t_layer, class_index = None):
        self.img_path = path
        self.class_index = class_index
        self.select_t_layer = select_t_layer

        # Filled by the hooks during __call__ under the key 'value'.
        self.gradients = dict()
        self.activations = dict()

        self.model = model_path

        if self.select_t_layer == False:
            self.t_layer = self._default_target_layer()
        else:
            # get a target layer from user's input
            self.t_layer = choose_tlayer(self.model)

    def _backward_hook(self, module, grad_input, grad_output):
        """Keep the gradient flowing into the target layer's output."""
        self.gradients['value'] = grad_output[0]
        return None

    def _forward_hook(self, module, input, output):
        """Keep the target layer's output (the feature map)."""
        self.activations['value'] = output
        return None

    def _default_target_layer(self):
        """Pick the target layer without user interaction.

        Starting at the model, the search repeatedly takes one child by
        position and descends while that child is a container (type name
        contains one of ``_CONTAINER_MARKERS``). The positions tried are
        0, 0, 18, 0, 18, ... ; the first non-container child is returned.

        Raises:
            NoIndexError: the required position does not exist at some level.
        """
        model_obj = self.model
        sel_module = -1   # position to use once depth > 0
        depth = -1        # counts how many times child 0 was a container
        while True:
            a = sel_module if depth > 0 else 0
            m_val = list(model_obj._modules.values())
            try:
                candidate = m_val[a]
            except IndexError:
                raise NoIndexError('Selected index (number) is not allowed.')

            if not any(x in torch.typename(candidate) for x in self._CONTAINER_MARKERS):
                return candidate

            model_obj = candidate
            if a == 0 and depth >= 0:
                sel_module = self._DEEP_CHILD_INDEX
                depth += 1
            elif a == 0:
                depth += 1
            elif a == self._DEEP_CHILD_INDEX:
                sel_module = 0

    def __call__(self):
        """Compute, save and visualise the Grad-CAM map for ``self.img_path``.

        Side effects: sets ``self.img`` and ``self.input``, overwrites
        ``self.gradients`` / ``self.activations``, zeroes the model's
        gradients, calls ``save`` and the three ``BBoxerwGradCAM.show_*``
        methods. ``self.class_index`` is left untouched, so a caller-supplied
        index stays in effect for later calls.

        Returns:
            None.
        """
        self.img = load_image(self.img_path)

        #numpy to tensor and normalize
        self.input = preprocess_image(self.img)

        # Hooks live only for the duration of this call. Registering them in
        # __init__ without removal stacked one more pair on the shared layer
        # for every GradCAM instance created on the same model.
        # NOTE(review): register_backward_hook is the legacy API; only
        # grad_output is read here. Consider register_full_backward_hook after
        # checking it against in-place ops in the model.
        forward_handle = self.t_layer.register_forward_hook(self._forward_hook)
        backward_handle = self.t_layer.register_backward_hook(self._backward_hook)
        try:
            # The model is indexed as output[0] / output[1]; output[0] selects
            # the class while output[1] is what gets differentiated.
            # NOTE(review): confirm this asymmetry is intended.
            output = self.model(self.input)

            class_index = self.class_index
            if class_index is None:
                # get class index of highest prob among result probabilities
                class_index = np.argmax(output[0].cpu().data.numpy())

            one_hot = np.zeros((1, output[1].size()[-1]), dtype = np.float32)
            one_hot[0][class_index] = 1
            # Follow the output's device instead of assuming CUDA is in use.
            one_hot = torch.from_numpy(one_hot).to(output[1].device)
            score = torch.sum(one_hot * output[1])

            self.model.zero_grad()
            score.backward(retain_graph = True)
        finally:
            forward_handle.remove()
            backward_handle.remove()

        gradients = self.gradients['value']
        activations = self.activations['value']

        # Average the gradients over the two trailing dims to get one weight
        # per channel; the reshape assumes a batch of exactly one image.
        weights = torch.mean(torch.mean(gradients, dim = 2), dim=2)
        weights = weights.reshape(weights.shape[1], 1, 1)
        activationMap = torch.squeeze(activations[0])

        #Get gradcam
        gradcam = F.relu((weights*activationMap).sum(0))
        gradcam = cv2.resize(gradcam.data.cpu().numpy(), (227,227))
        dir_path = save(gradcam, self.img, self.img_path, 1)

        image_resizing_scale = [227,227]
        box_scaling = [1,1,1,1]

        # Use this instance's model rather than a notebook-level global.
        bbox = BBoxerwGradCAM(self.model,
                      gradcam,
                      self.img_path,
                      image_resizing_scale,
                      box_scaling,
                      dir_path,
                      1)
        bbox.show_smoothheatmap()
        bbox.show_bboxrectangle()
        bbox.show_bboxpolygon()



In [None]:
# Build the model on the best available device, load the trained weights and
# prepare a GradCAM object for a single test image.
no_cuda = False

use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
model = NIMA().to(device)
# map_location keeps the CPU fallback working when the checkpoint was saved
# from a GPU session.
model.load_state_dict(torch.load('/content/gdrive/MyDrive/Aesthetic/modelsampleattnew29.pt', map_location=device))
# Inference only: switch off training-time behaviour (dropout, batch-norm
# batch statistics) if NIMA contains such layers. Gradients still flow.
model.eval()
grad=GradCAM(path="/content/gdrive/MyDrive/Aesthetic/style_image_test/952233.jpg",model_path=model,select_t_layer=False)

In [None]:
# Run Grad-CAM over every item of dataset_test, showing a progress bar.
from progressbar import ProgressBar
pbar=ProgressBar()
no_cuda = False

use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
model = NIMA().to(device)
# map_location keeps the CPU fallback working when the checkpoint was saved
# from a GPU session.
model.load_state_dict(torch.load('/content/gdrive/MyDrive/Aesthetic/modelsampleattnew29.pt', map_location=device))
# Inference only: switch off training-time behaviour (dropout, batch-norm
# batch statistics) if NIMA contains such layers. Gradients still flow.
model.eval()
# One GradCAM object is reused for all images: building a new one per image
# repeats the target-layer search and registers another pair of hooks on the
# same layer each time. Only the image path changes between iterations.
grad=GradCAM(path=None,model_path=model,select_t_layer=False)
for i in pbar(dataset_test):
  img_name='/content/gdrive/MyDrive/Aesthetic/style_image_test/'+str(i['id'])+'.jpg'
  grad.img_path=img_name
  grad()
  


  