In [4]:
!pip install torch
!pip install torchvision
!pip install cv2-extras
!pip install xlrd
!pip install openpyxl
!pip install pytorch-ignite



In [5]:
import pandas as pd
import numpy as np
import yaml

In [6]:
class DataInfoLoader:
    def __init__(self,dataset_name,config):
        self.config=config
        self.dataset_name=dataset_name
        self.img_num=len(pd.read_excel(config[dataset_name]['gt_file_path'])['img_name'])
        self.IQA_results_path=config[dataset_name]['IQA_results_path']
    
    def get_qs_std(self):
        return pd.read_excel(self.config[self.dataset_name]['gt_file_path'])['acc_avg']

    def get_img_name(self):
        return pd.read_excel(self.config[self.dataset_name]['gt_file_path'])['img_name']
    
    def get_img_set(self):
        return pd.read_excel(self.config[self.dataset_name]['gt_file_path'])['img_set']

    def get_img_path(self):
        "Get the image path for all images"
        return [self.config[self.dataset_name]['root']+'/'+name for name in self.get_img_name()]

if __name__=='__main__':
    with open('./config.yaml') as f:
        config=yaml.load(f)
    dataset_name='SOC'
    dil=DataInfoLoader(dataset_name,config)
    img_name=dil.get_img_name()
    print(img_name[2])

  config=yaml.load(f)


img3.bmp


In [7]:
import os
import torch
from torch.utils.data import Dataset
from torchvision.transforms.functional import to_tensor
from PIL import Image
from scipy.signal import convolve2d
import numpy as np
import yaml
import math
from pathlib import Path
import cv2

In [8]:
def default_loader(path):
    return Image.open(path).convert('L')

def localNormalization(patch, P=3, Q=3, C=1):
    kernel = np.ones((P,Q)) / (P * Q)
    patch_mean = convolve2d(patch, kernel, boundary='symm', mode='same')
    patch_sm = convolve2d(np.square(patch), kernel, boundary='symm', mode='same')
    patch_std = np.sqrt(np.maximum(patch_sm - np.square(patch_mean), 0)) + C
    patch_ln = (patch - patch_mean)/patch_std
    return patch_ln

def patchSifting(im, patch_size=48, stride=48):
    img=np.array(im).copy()
    im1=localNormalization(img)
    im1=Image.fromarray(im1)
    ret1,im2= cv2.threshold(img,0,255,cv2.THRESH_BINARY+cv2.THRESH_OTSU)
    w, h = im1.size
    patches=()
    for i in range(0, h - stride, stride):
        for j in range(0, w - stride, stride):
            patch = im2[i:i+patch_size,j:j+patch_size]
            if judgeAllOnesOrAllZreos(patch)==False:
                patch=to_tensor(im1.crop((j,i,j+patch_size,i+patch_size)))
                patch=patch.float().unsqueeze(0)
                patches = patches + (patch,)
    return patches
def judgeAllOnesOrAllZreos(patch):
    flag1=True
    flag2=True
    for i in range(patch.shape[0]):
        for j in range(patch.shape[1]):
            if patch[i,j]==255:
                continue
            else:
                flag1=False

    for i in range(patch.shape[0]):
        for j in range(patch.shape[1]):
            if patch[i,j]==0:
                continue
            else:
                flag2=False
    return flag1 or flag2
class DIQADataset(Dataset):
    def __init__(self,dataset_name,config,data_index,status='train',loader=default_loader):
        self.loader = loader
        self.patch_size = 48
        self.stride = 48
        test_ratio = config['test_ratio']  
        train_ratio = config['train_ratio']
        dil=DataInfoLoader(dataset_name,config)
        img_name=dil.get_img_name()
        img_path=dil.get_img_path()
        qs_std=dil.get_qs_std()

        # print(f"{img_name} {img_path} {qs_std}")

        if status == 'train':
            self.index = data_index
            print("# Train Images: {}".format(len(self.index)))
        if status == 'test':
            self.index = data_index
            print("# Test Images: {}".format(len(self.index)))
        if status == 'val':
            self.index = data_index
            print("# Val Images: {}".format(len(self.index)))
        print('Index:')
        print(self.index)

        self.patches = ()
        self.label = []
        ps=1
        for idx in self.index:
            im=self.loader(img_path[idx])
            patches = patchSifting(im)
            print(f"{ps}")
            ps+=1
            if status == 'train':
                self.patches = self.patches + patches
                for i in range(len(patches)):
                    self.label.append(qs_std[idx])
            else:
                self.patches = self.patches + (torch.stack(patches),)  
                self.label.append(qs_std[idx])

    def __len__(self):
        return len(self.patches)

    def __getitem__(self, idx):
        return (self.patches[idx], torch.Tensor([self.label[idx]]))

if __name__=='__main__':
    pass  

In [9]:
import torch
from torch import nn
import torch.nn.functional as F
from torchvision.transforms import ToTensor,ToPILImage
from torch.autograd import Variable as V 
from PIL import Image
import cv2

In [10]:
class CNNDIQAnet(nn.Module):
    def __init__(self):
        super(CNNDIQAnet, self).__init__()
        self.conv1 = nn.Conv2d(1, 40, 5)
        self.pool1 = nn.MaxPool2d(4)
        self.conv2 = nn.Conv2d(40, 80, 5)
        self.fc1   = nn.Linear(160, 1024)
        self.fc2   = nn.Linear(1024, 1024)
        self.fc3 = nn.Linear(1024, 1)
    
    def forward(self, x):
        x=x.view(-1,x.size(-3),x.size(-2),x.size(-1))
        x  = self.conv1(x)
        x  = self.pool1(x)
        x  = self.conv2(x)
        x1 = F.max_pool2d(x, (x.size(-2), x.size(-1)))
        x2 = -F.max_pool2d(-x, (x.size(-2), x.size(-1)))
        x  = torch.cat((x1, x2), 1)  
        x  = x.squeeze(3).squeeze(2)
        x  = F.relu(self.fc1(x))
        x  = F.dropout(x)
        x  = F.relu(self.fc2(x))
        x  = self.fc3(x)
        return x

if __name__=='__main__':
    pass

In [11]:
from argparse import ArgumentParser
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import Adam

import numpy as np
from scipy import stats
import os, yaml

from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics.metric import Metric

In [12]:
class DIQAPerformance(Metric):
    def reset(self):
        self.label_pred = []
        self.label      = []

    def update(self, output):
        y_pred, y = output
        self.label.append(y)
        self.label_pred.append(torch.mean(y_pred))

    def compute(self):
        sq_std = np.reshape(np.asarray(self.label), (-1,))
        sq_pred = np.reshape(np.asarray(self.label_pred), (-1,))
        srocc = stats.spearmanr(sq_std, sq_pred)[0]
        plcc = stats.pearsonr(sq_std, sq_pred)[0]
        #plcc = stats.pearsonr(sq, q)[0]
        return srocc, plcc

In [13]:
from argparse import ArgumentParser
import torch
from torch.utils.data import DataLoader
from torch import nn
import torch.nn.functional as F
from torch.optim import Adam
import numpy as np
import yaml
from pathlib import Path
from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics.metric import Metric
import math

In [14]:
def ensure_dir(path):
    p=Path(path)
    if not p.exists():
        p.mkdir()

def loss_fn(y_pred, y):
    return F.l1_loss(y_pred, y) 

def get_data_loaders(dataset_name,config,train_batch_size):
    datainfo=DataInfoLoader(dataset_name,config) 
    img_num=datainfo.img_num
    index=np.arange(img_num)
    np.random.shuffle(index)
    
    train_index=index[0:math.floor(img_num*0.6)]
    val_index=index[math.floor(img_num*0.6):math.floor(img_num*0.8)]
    test_index=index[math.floor(img_num*0.8):]
    
    
    train_dataset = DIQADataset(dataset_name,config,train_index,status='train')
    train_loader = torch.utils.data.DataLoader(train_dataset,
                                              batch_size=train_batch_size,
                                              shuffle=True,
                                              num_workers=4)

    val_dataset = DIQADataset(dataset_name,config,val_index,status='val')
    val_loader = torch.utils.data.DataLoader(val_dataset)

    if config['test_ratio']:
        test_dataset = DIQADataset(dataset_name,config,test_index,status='test')
        test_loader = torch.utils.data.DataLoader(test_dataset)
        return train_loader, val_loader, test_loader
    return train_loader, val_loader


In [15]:
class Solver:
    def __init__(self):
        self.model=CNNDIQAnet()

    def run(self,dataset_name,train_batch_size,epochs,lr,weight_decay,model_name,config,trained_model_file,save_result_file,disable_gpu=False):
        if config['test_ratio']:
            train_loader, val_loader, test_loader = get_data_loaders(dataset_name,config,train_batch_size)
        else:
            train_loader, val_loader = get_data_loaders(dataset_name,config,train_batch_size)

        device = torch.device("cuda" if not disable_gpu and torch.cuda.is_available() else "cpu")
        self.model = self.model.to(device)
        
        optimizer = Adam(self.model.parameters(), lr=lr, weight_decay=weight_decay)
        global best_criterion
        best_criterion = -1 
        trainer = create_supervised_trainer(self.model, optimizer, loss_fn, device=device)
        evaluator = create_supervised_evaluator(self.model,metrics={'DIQA_performance': DIQAPerformance()},device=device)
        print('Hello World')

        @trainer.on(Events.EPOCH_COMPLETED)
        def log_validation_results(engine):
            evaluator.run(val_loader)
            metrics = evaluator.state.metrics
            SROCC, PLCC= metrics['DIQA_performance']
            print("Validation Results - Epoch: {} SROCC: {:.4f} PLCC: {:.4f} ".format(engine.state.epoch, SROCC, PLCC))
            global best_criterion
            global best_epoch
            if SROCC > best_criterion:
                best_criterion = SROCC
                best_epoch = engine.state.epoch
                torch.save(self.model.state_dict(), trained_model_file)

        @trainer.on(Events.EPOCH_COMPLETED)
        def log_testing_results(engine):
            if config["test_ratio"] > 0 and config['test_during_training']:
                evaluator.run(test_loader)
                metrics = evaluator.state.metrics
                SROCC,PLCC= metrics['DIQA_performance']
                SROCC,PLCC= metrics['DIQA_performance']
                print("Testing Results    - Epoch: {} SROCC: {:.4f} PLCC: {:.4f} ".format(engine.state.epoch, SROCC, PLCC))

        @trainer.on(Events.COMPLETED)
        def final_testing_results(engine):
            if config["test_ratio"] > 0:
                self.model.load_state_dict(torch.load(trained_model_file,map_location ='cpu'))
                evaluator.run(test_loader)
                metrics = evaluator.state.metrics
                SROCC,PLCC= metrics['DIQA_performance']
                global best_epoch
                print("Final Test Results - Epoch: {} SROCC: {:.4f} PLCC: {:.4f} ".format(best_epoch, SROCC, PLCC))
                np.save(save_result_file, (SROCC, PLCC))
        # kick everything off
        trainer.run(train_loader, max_epochs=epochs)

In [None]:
if __name__ == "__main__":
    
    dataset_name='SOC'
    exp_id='1'
    batch_size=128
    epochs=2
    lr=0.001
    weight_decay=0.0
    model='CNNDIQA'

    with open('./config.yaml') as f:
        config=yaml.load(f)
    print('exp id: ' + exp_id)
    print('model: ' + model)
    print(config)
    
    ensure_dir('checkpoints')
    trained_model_file = 'checkpoints/{}-{}-EXP{}-lr={}.pth'.format(model, dataset_name, exp_id, lr)
    ensure_dir('results')
    save_result_file = 'results/{}-{}-EXP{}-lr={}.npy'.format(model, dataset_name, exp_id, lr)

  
    
    solver=Solver()
    solver.run(dataset_name,batch_size, epochs, lr, weight_decay,model, 
               config,trained_model_file, save_result_file, disable_gpu='store_true')

  config=yaml.load(f)


exp id: 1
model: CNNDIQA
{'test_during_training': True, 'train_ratio': 0.6, 'val_ratio': 0.2, 'test_ratio': 0.2, 'SOC': {'root': 'C:\\Users\\anujp\\Downloads\\databaserelease2\\databaserelease2\\jp2k', 'gt_file_path': './data/gt_files/LIVE_jp2k.xlsx', 'IQA_results_path': './results/'}}
# Train Images: 136
Index:
[126 197 139 205 138 118 145   7  73 131   8 119  11 211 194 113  74  80
 226 216 155 172 225 125 201 150 163  46 112 224 182 210 220 191  83 140
  55 167  28 133 199 169 159  65  77 129 124  84 196 136 110 144 164 141
 189  52  76 109 183  29 209  38  13 179 212 215  71  68  72 203  79  32
  53  19  33  85  41  47 160  40  98  99  16 162 132  24  21 134 171  37
 170  90  36 130  30  49 166  51 219 180  14 218  48 122 174  88  93  82
  27 195   0  20 102  57  66 100 200 181 105 198 142  70  95 115 152 184
   5 187  94  34   1 177  50 157  92 117]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
4

In [1]:
from PIL import Image
import torch
import numpy as np
from torch import nn
import torch.nn.functional as F
from torchvision.transforms.functional import to_tensor
from torch.autograd import Variable as V 
from scipy.signal import convolve2d
import cv2

In [4]:
class CNNDIQAnet(nn.Module):
    def __init__(self):
        super(CNNDIQAnet, self).__init__()
        self.conv1 = nn.Conv2d(1, 40, 5)
        self.pool1 = nn.MaxPool2d(4)
        self.conv2 = nn.Conv2d(40, 80, 5)
        self.fc1   = nn.Linear(160, 1024)
        self.fc2   = nn.Linear(1024, 1024)
        self.fc3 = nn.Linear(1024, 1)
    
    def forward(self, x):
        x=x.view(-1,x.size(-3),x.size(-2),x.size(-1))
        x  = self.conv1(x)
        x  = self.pool1(x)
        x  = self.conv2(x)
        x1 = F.max_pool2d(x, (x.size(-2), x.size(-1)))
        x2 = -F.max_pool2d(-x, (x.size(-2), x.size(-1)))
        x  = torch.cat((x1, x2), 1)  
        x  = x.squeeze(3).squeeze(2)
        x  = F.relu(self.fc1(x))
        x  = F.dropout(x)
        x  = F.relu(self.fc2(x))
        x  = self.fc3(x)
        return x

def localNormalization(patch, P=3, Q=3, C=1):
    kernel = np.ones((P,Q)) / (P * Q)
    patch_mean = convolve2d(patch, kernel, boundary='symm', mode='same')
    patch_sm = convolve2d(np.square(patch), kernel, boundary='symm', mode='same')
    patch_std = np.sqrt(np.maximum(patch_sm - np.square(patch_mean), 0)) + C
    patch_ln = (patch - patch_mean)/patch_std
    return patch_ln

def judgeAllOnesOrAllZreos(patch):
    flag1=True
    flag2=True
    for i in range(patch.shape[0]):
        for j in range(patch.shape[1]):
            if patch[i,j]==255:
                continue
            else:
                flag1=False

    for i in range(patch.shape[0]):
        for j in range(patch.shape[1]):
            if patch[i,j]==0:
                continue
            else:
                flag2=False
    return flag1 or flag2

def patchSifting(im, patch_size=48, stride=48):
    img=np.array(im).copy()
    im1=localNormalization(img)
    im1=Image.fromarray(im1)
    ret1,im2= cv2.threshold(img,0,255,cv2.THRESH_BINARY+cv2.THRESH_OTSU)
    w, h = im1.size
    patches=()
    for i in range(0, h - stride, stride):
        for j in range(0, w - stride, stride):
            patch = im2[i:i+patch_size,j:j+patch_size]
            if judgeAllOnesOrAllZreos(patch)==False:
                patch=to_tensor(im1.crop((j,i,j+patch_size,i+patch_size)))
                patch=patch.float().unsqueeze(0)
                patches = patches + (patch,)
    return patches

class Solver:
    def __init__(self):
        #pre-trained model path
        self.model_path='./checkpoints/CNNDIQA-SOC-EXP916-lr=0.0001.pth'
        #Initialize the model
        self.model=CNNDIQAnet()

    def quality_assessment(self,img_path):
        #load the pre-trained model
        self.model.load_state_dict(torch.load(self.model_path,map_location='cpu'))
        im=Image.open(img_path).convert('L')
        im=torch.stack(patchSifting(im))
        #im=to_tensor(im).unsqueeze(0)
        # if torch.cuda.is_available():
        #     model=model.cuda()
        #     im=im.cuda()
        qs=self.model(im)
        qs=qs.data.squeeze(0).cpu().numpy()[:,0].mean()
        return qs
if __name__=='__main__':
    #Example
    img_path='C:\\Users\\anujp\\Downloads\\databaserelease2\\databaserelease2\\jp2k\\img1.bmp'
    solver=Solver()
    qs=solver.quality_assessment(img_path)
    print('quality score is:{}'.format(qs))

quality score is:0.8130279183387756


In [6]:
import scipy 
from scipy import stats