## Required Library

In [1]:
import os
import csv
from torchvision.models import resnet50, densenet121, vit_b_16, ViT_B_16_Weights, swin_b, Swin_B_Weights
import torch.nn as nn
import torch as t
import torch.nn.functional as F
import warnings
from torchvision.transformsansforms import transforms as T
import numpy as np
from torch.utils import data
from PIL import Image
from torchnet import meter
from torch.utils.data import DataLoader
from tqdm import tqdm
import random
from torch.optim import Adam
import numpy as np
import matplotlib.pyplot as plt
from sklearn import metrics
from collections import defaultdict
import shutil
import visdom
import time

  from .autonotebook import tqdm as notebook_tqdm


## Basic Information Setting

In [30]:
class DefaultConfig(object):
    env = 'ResNet50-Flow-spatial'
    vis_port = 8097
    model = 'ResNet50'
    train_flow_data = 'train_flow_SCVD.csv'
    valid_flow_data = 'valid_flow_SCVD.csv'
    test_flow_data = 'test_flow_SCVD.csv'
    
    train_spatial_data = 'train_spatial_SCVD.csv'
    valid_spatial_data = 'valid_spatial_SCVD.csv'
    test_spatial_data = 'test_spatial_SCVD.csv'

    save_model = './checkpoint/'
    load_model_path = save_model + 'xx.pth'
    batch_size_0 = 128
    batch_size_1 = 128
    use_gpu = True
    num_workers = 4
    print_freq = 20
    result_file = save_model + 'xx.csv'
    max_epoch = 20
    lr = 0.0001
    lr_decay = 0.5
    weight_decay = 1e-4
    scheduler = None
    device = t.device('cuda:2') if use_gpu else t.device('cpu')
    loss_weight = t.Tensor([1, 1]).to(device)

    def _parse(self, kwargs):
        for k, v in kwargs.items():
            if not hasattr(self, k):
                warnings.warn("Warning: opt has not attribut %s" % k)
            setattr(self, k, v)

        print('user config:')
        for k, v in self.__class__.__dict__.items():
            if not k.startswith('_'):
                print(k, getattr(self, k))
                
opt = DefaultConfig()
criterion = nn.CrossEntropyLoss(weight = opt.loss_weight)

## CNN Model

In [3]:
class ResNet50_Spatial(nn.Module):
    def __init__(self):
        super(ResNet50_Spatial, self).__init__()
        self.model = resnet50(pretrained = True)
        self.model.fc = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(2048, 256)
            
    def forward(self, x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)
    
class ResNet50_Flow(nn.Module):
    def __init__(self):
        super(ResNet50_Flow, self).__init__()
        self.model = resnet50(pretrained = True)
        self.model.fc = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(2048, 256)
            
    def forward(self, x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)
                    
class ResNet50_Flow_Spatial(nn.Module):
    def __init__(self):
        super(ResNet50_Flow_Spatial, self).__init__()
        self.flow = ResNet50_Flow()
        self.spatial = ResNet50_Spatial()
        self.relu = nn.ReLU(inplace = True)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 2)
        self.fc4 = nn.Linear(256, 2)
        self.fc5 = nn.Linear(256, 2)
            
    def forward(self, x, y):
        x = self.flow(x)
        y = self.spatial(y)
        xy_cat = t.cat((x, y), 1)
        output = self.fc3(self.relu(self.fc2(xy_cat)))
        output1 = self.fc4(self.relu(x))
        output2 = self.fc5(self.relu(y))
        return output, output1, output2
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)

In [23]:
class DenseNet121_Spatial(nn.Module):
    def __init__(self):
        super(DenseNet121_Spatial, self).__init__()
        self.model = densenet121(pretrained = True)
        self.model.classifier = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(1024, 256)
            
    def forward(self, x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)
    
class DenseNet121_Flow(nn.Module):
    def __init__(self):
        super(DenseNet121_Flow, self).__init__()
        self.model = densenet121(pretrained = True)
        self.model.classifier = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(1024, 256)
            
    def forward(self, x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)
                    
class DenseNet121_Flow_Spatial(nn.Module):
    def __init__(self):
        super(DenseNet121_Flow_Spatial, self).__init__()
        self.flow = DenseNet121_Flow()
        self.spatial = DenseNet121_Spatial()
        self.relu = nn.ReLU(inplace = True)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 2)
        self.fc4 = nn.Linear(256, 2)
        self.fc5 = nn.Linear(256, 2)
            
    def forward(self, x, y):
        x = self.flow(x)
        y = self.spatial(y)
        xy_cat = t.cat((x, y), 1)
        output = self.fc3(self.relu(self.fc2(xy_cat)))
        output1 = self.fc4(self.relu(x))
        output2 = self.fc5(self.relu(y))
        return output, output1, output2
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)

In [None]:
class VIT_Spatial(nn.Module):
    def __init__(self):
        super(VIT_Spatial, self).__init__()
        self.model = vit_b_16(weights=ViT_B_16_Weights)
        self.model.heads = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(768, 256)
            
    def forward(self, x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)
    
class VIT_Flow(nn.Module):
    def __init__(self):
        super(VIT_Flow, self).__init__()
        self.model = vit_b_16(weights=ViT_B_16_Weights)
        self.model.heads = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(768, 256)
            
    def forward(self, x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)
                    
class ViTB16_Flow_Spatial(nn.Module):
    def __init__(self):
        super(ViTB16_Flow_Spatial, self).__init__()
        self.flow = VIT_Flow()
        self.spatial = VIT_Spatial()
        self.relu = nn.ReLU(inplace = True)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 2)
        self.fc4 = nn.Linear(256, 2)
        self.fc5 = nn.Linear(256, 2)
            
    def forward(self, x, y):
        x = self.flow(x)
        y = self.spatial(y)
        xy_cat = t.cat((x, y), 1)
        output = self.fc3(self.relu(self.fc2(xy_cat)))
        output1 = self.fc4(self.relu(x))
        output2 = self.fc5(self.relu(y))
        return output, output1, output2
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)

In [None]:
class SwinB_Flow(nn.Module):
    def __init__(self):
        super(SwinB_Flow, self).__init__()
        self.model = swin_b(weights = Swin_B_Weights.IMAGENET1K_V1)
        self.model.head = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(1024, 256)
        
    def forward(self,x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature

    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)    

class SwinB_Spatial(nn.Module):
    def __init__(self):
        super(SwinB_Spatial, self).__init__()
        self.model = swin_b(weights = Swin_B_Weights.IMAGENET1K_V1)
        self.model.head = nn.Sequential()
        for params in self.parameters():
            params.requires_grad = False
        self.fc1 = nn.Linear(1024, 256)
        
    def forward(self,x):
        x = self.model(x)
        feature = self.fc1(x)
        return feature

    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay) 
    
class SwinB_Flow_Spatial(nn.Module):
    def __init__(self):
        super(SwinB_Flow_Spatial, self).__init__()
        self.flow = SwinB_Flow()
        self.spatial = SwinB_Spatial()
        self.relu = nn.ReLU(inplace = True)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 2)
        self.fc4 = nn.Linear(256, 2)
        self.fc5 = nn.Linear(256, 2)
            
    def forward(self, x, y):
        x = self.flow(x)
        y = self.spatial(y)
        xy_cat = t.cat((x, y), 1)
        output = self.fc3(self.relu(self.fc2(xy_cat)))
        output1 = self.fc4(self.relu(x))
        output2 = self.fc5(self.relu(y))
        return output, output1, output2
    
    def get_optimizer(self, lr, weight_decay):
        
        return Adam(self.parameters(), lr = lr, weight_decay = weight_decay)


## Get Dataset

In [4]:
def get_dataset(csv_path, root):
    csv_path = os.path.join(root, csv_path)
    print (csv_path)
    train_img = []
    valid_img = []
    test_img = []
    if csv_path.split('.')[0].split('/')[-1].split('_')[0] == 'train':
        x = train_img
    elif csv_path.split('.')[0].split('/')[-1].split('_')[0]  == 'valid':
        x = valid_img
    else:
        x = test_img
    with open(csv_path,'r') as file_object:
        reader = csv.reader(file_object)
        for i, data in enumerate(reader):
            x.append(data[0])
    return x

## SCVD Dataset

In [12]:
class SCVD_Dataset(data.Dataset):
    def __init__(self, img1, img2, train, test):
        super(SCVD_Dataset, self).__init__()
        
        self.train = train
        self.test = test
        self.img1 = img1
        self.img2 = img2
    
        normalize = T.Normalize(mean = [0.485, 0.456, 0.406],
                                std = [0.229, 0.224, 0.225])

        if self.train: 
            self.transforms = T.Compose([
                T.RandomHorizontalFlip(p = 0.5),  
                T.RandomVerticalFlip(p = 0.5), 
                T.ToTensor(),
                normalize
            ])
        else:
            self.transforms = T.Compose([
                T.ToTensor(),
                normalize
            ])

    def __getitem__(self, index):
        
        img_path1 = self.img1[index]
        root = img_path1
        if img_path1.split('/')[3] == 'transparency_print':
            label =  0
        else:
            label =  1 if img_path1.split('.')[-2].split('/')[-1].split('_')[1] == 'NONE' else 0    
        
        pic1 = Image.open(img_path1)
        data1 = self.transforms(pic1)
        
        img_path2 = self.img2[index]    
        pic2 = Image.open(img_path2)
        data2 = self.transforms(pic2)
        
        return data1, data2, label, root

    def __len__(self):
        return len(self.img1)

## Loss

In [6]:
class CMFL(nn.Module):
	"""
	Cross Modal Focal Loss
	"""

	def __init__(self, alpha = 1, gamma = 2, binary = False, multiplier = 2, sg = False):
		super(CMFL, self).__init__()
		self.alpha = alpha
		self.gamma = gamma
		self.binary = binary
		self.multiplier =multiplier
		self.sg = sg

	def forward(self, inputs_a, inputs_b, targets):
        
		bce_loss_a = criterion(inputs_a, targets)
		bce_loss_b = criterion(inputs_b, targets)

		pt_a = t.exp(-bce_loss_a)
		pt_b = t.exp(-bce_loss_b)

		eps = 0.000000001

		if self.sg:
			d_pt_a = pt_a.detach()
			d_pt_b = pt_b.detach()
			wt_a = ((d_pt_b + eps)*(self.multiplier * pt_a * d_pt_b))/(pt_a + d_pt_b + eps)
			wt_b = ((d_pt_a + eps)*(self.multiplier * d_pt_a * pt_b))/(d_pt_a + pt_b + eps)
		else:
			wt_a = ((pt_b + eps)*(self.multiplier * pt_a * pt_b))/(pt_a + pt_b + eps)
			wt_b = ((pt_a + eps)*(self.multiplier * pt_a * pt_b))/(pt_a + pt_b + eps)

		if self.binary:
			wt_a = wt_a * (1 - targets)
			wt_b = wt_b * (1 - targets)

		f_loss_a = self.alpha * (1 - wt_a)**self.gamma * bce_loss_a
		f_loss_b = self.alpha * (1 - wt_b)**self.gamma * bce_loss_b

		loss= 0.5 * t.mean(f_loss_a) + 0.5 * t.mean(f_loss_b) 
		
		return loss

## Create CSV For AUC EER

In [7]:
def write_csv(results, file_name):

    with open(file_name, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['id', 'score','root'])
        writer.writerows(results)

## Visualizer

In [8]:
class Visualizer(object):
    def __init__(self, env='default', **kwargs):
        self.vis = visdom.Visdom(env=env,use_incoming_socket=False, **kwargs)

        self.index = {}
        self.log_text = ''

    def reinit(self, env='default', **kwargs):

        self.vis = visdom.Visdom(env=env, **kwargs)
        return self

    def plot_many(self, d):

        for k, v in d.items():
            self.plot(k, v)

    def img_many(self, d):
        for k, v in d.items():
            self.img(k, v)

    def plot(self, name, y, **kwargs):

        x = self.index.get(name, 0)
        self.vis.line(Y=np.array([y]), X=np.array([x]),
                      win=name,
                      opts=dict(title=name),
                      update=None if x == 0 else 'append',
                      **kwargs
                      )
        self.index[name] = x + 1

    def img(self, name, img_, **kwargs):

        self.vis.images(img_.cpu().numpy(),
                        win=name,
                        opts=dict(title=name),
                        **kwargs
                        )

    def log(self, info, win='log_text'):
        self.log_text += ('[{time}] {info} <br>'.format(
            time=time.strftime('%m%d_%H%M%S'),
            info=info))
        self.vis.text(self.log_text, win)

    def __getattr__(self, name):
        return getattr(self.vis, name)

## Train And Valid

In [24]:
def train(**kwargs):
    
    
    img_flow_train = get_dataset(opt.train_flow_data, root = './')
    img_flow_valid = get_dataset(opt.valid_flow_data, root = './')
    
    img_spatial_train = get_dataset(opt.train_spatial_data, root = './')
    img_spatial_valid = get_dataset(opt.valid_spatial_data, root = './')
    
    c = list(zip(img_flow_train, img_spatial_train))
    random.shuffle(c)
    img_flow_train, img_spatial_train = zip(*c)
    
    opt._parse(kwargs)
    vis = Visualizer(opt.env, port = opt.vis_port)
    model = ResNet50_Flow_Spatial()
    
    model.to(opt.device)

    print('The number of training data: {}'.format(len(img_flow_train)))
    print('The number of Validing data: {}'.format(len(img_flow_valid)))
    
    train_data = SCVD_Dataset(img_flow_train, img_spatial_train, train = False, test = False)
    train_loader = DataLoader(train_data, batch_size = opt.batch_size_0, shuffle = False, num_workers = opt.num_workers)
    
    val_data = SCVD_Dataset(img_flow_valid, img_spatial_valid, train = False,  test = True)
    val_loader = DataLoader(val_data, batch_size = opt.batch_size_1, shuffle = False, num_workers = opt.num_workers)

    criterion1 = nn.CrossEntropyLoss(weight = opt.loss_weight)
    criterion2 = CMFL(alpha = 1, gamma = 3, binary= False, multiplier = 2)
    
    lr = opt.lr
   
    optimizer = model.get_optimizer(lr, opt.weight_decay)
   
    loss_meter = meter.AverageValueMeter()
    confusion_matrix = meter.ConfusionMeter(2)
    previous_loss = 1e10
    best_acc = 0

    for epoch in range(opt.max_epoch):

        loss_meter.reset()
        confusion_matrix.reset()
        for ii, (data1, data2, label, root) in enumerate(tqdm(train_loader)):
            
            flow = data1.to(opt.device)
            spatial = data2.to(opt.device)
            
            target = label.to(opt.device)
            score, score1, score2 = model(flow, spatial)
        
            loss1 = criterion1(score, target)
            loss2 = criterion2(score1, score2, target)
            loss = 0.5 * loss1 + 0.5 * loss2
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            loss_meter.add(loss.item())
            confusion_matrix.add(score.detach(), target.detach())  
                
        cm_accuracy = confusion_matrix.value()
        train_accuracy = (cm_accuracy[0][0] + cm_accuracy[1][1]) / (cm_accuracy.sum())
        print("epoch: {epoch},  training accuracy: {accuracy}".format(epoch = epoch, accuracy = train_accuracy))

        print("epoch: {epoch},  loss: {loss}".format(epoch = epoch,  loss = loss_meter.value()[0]))

        cm, acc = val(model, val_loader)
        print("-epoch: {epoch},  Valid accuracy: {acc}".format(epoch = epoch,acc = acc))

        model_name = opt.save_model + 'ResNet50-Flow-Spatial-{epoch}-{acc}.pth'.format(epoch = epoch, acc = acc)
        t.save(model.state_dict(), opt.save_model + 'ResNet50-Flow-Spatial-{epoch}-{acc}.pth'.format(epoch = epoch, acc = acc))
        if acc >= best_acc:
            best_acc = acc
            best_model_name = model_name
            
        # vis.plot('val_acc', acc)
        if loss_meter.value()[0] > previous_loss:          
            lr = lr * opt.lr_decay
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr
        previous_loss = loss_meter.value()[0]

    print(best_model_name)      

@t.no_grad()
def val(model, dataloader):
    model.eval()
    confusion_matrix = meter.ConfusionMeter(2)
    for ii, (val_input, val_input1, label, root) in enumerate(tqdm(dataloader)):
        val_in = val_input.to(opt.device)
        val_in1 = val_input1.to(opt.device)
        score, score1, score2 = model(val_in, val_in1)
        confusion_matrix.add(score.detach(), label.type(t.LongTensor))
        
    model.train()
    cm_value = confusion_matrix.value()
    accuracy = (cm_value[0][0] + cm_value[1][1]) / (cm_value.sum())

    return confusion_matrix, accuracy 

## Test

In [27]:
@t.no_grad()
def test(**kwargs):
    opt._parse(kwargs)
    
    model = DenseNet121_Flow_Spatial()
    
    model.load_state_dict(t.load(opt.load_model_path, map_location='cpu'))
    
    
    img_flow_test = get_dataset(opt.test_flow_data, root = './')
    img_spatial_test = get_dataset(opt.test_spatial_data, root = './')
    
    
    print('The number of Testing data: {}'.format(len(img_flow_test)))
    test_data = SCVD_Dataset(img_flow_test, img_spatial_test, train = False, test = True)
    test_dataloader = DataLoader(test_data, batch_size = opt.batch_size_1, shuffle = False, num_workers = opt.num_workers)

    model.to(opt.device)
    results = []
    confusion_matrix = meter.ConfusionMeter(2)
    model.eval()
    
    
    for data1, data2, label, root in tqdm(test_dataloader):
        input1 = data1.to(opt.device)
        input2 = data2.to(opt.device)
        score, score1, score2 = model(input1, input2)

        output = F.softmax(score, dim=1)[:, 1].detach().tolist()
        
        confusion_matrix.add(score.detach(), label.type(t.LongTensor))
        


        batch_results = [(label_.item(), output_ ,root_ ) for label_, output_ , root_ in zip(label, output , root)]
        results += batch_results

       
    write_csv(results, opt.result_file)
    cm_value = confusion_matrix.value()
    accuracy = (cm_value[0][0] + cm_value[1][1]) / (cm_value.sum())
    print("Test accuracy: {}".format(accuracy))


## Model Train and Test

In [None]:
train()

In [None]:
test()

## Compute AUC EER

In [17]:
import numpy as np
import matplotlib.pyplot as plt  
import os
from collections import defaultdict
import csv
import shutil

def EER(FPR,TPR,threshold):
    """ Returns equal error rate (EER) and the corresponding threshold. """
    FNR = 1-TPR
    abs_diffs = np.abs(FPR - FNR)
    min_index = np.argmin(abs_diffs)
    eer = np.mean((FPR[min_index], FNR[min_index]))
    return eer, threshold[min_index]


def ComputeMetric(y_true, y_score, pos_label=1, isPlot=False, model_name='estimator', fig_path='.'):

    AUC = metrics.roc_auc_score(y_true, y_score)
    
    FPR, TPR, threshold = metrics.roc_curve(y_true, y_score, pos_label=pos_label)
    eer, best_threshold = EER(FPR, TPR, threshold)
    
    # if isPlot:
    #     display = metrics.RocCurveDisplay(fpr=FPR, tpr=TPR, roc_auc=AUC,
    #                                       estimator_name=model_name)
    #     display.plot()
    #     if not fig_path is None:
    #         plt.savefig(os.path.join(fig_path, model_name+'.png'))
    #     plt.show()

    return AUC, eer, best_threshold


def evaluation(csv_path, root='./', image = True):
    csv_path = os.path.join(root, csv_path)
    print (csv_path)
    img_dict = defaultdict(dict)
    model_name = csv_path.split('/')[-1].split('.')[0]
    with open(csv_path,'r') as file_object:
        reader = csv.reader(file_object)
        for i, data in enumerate(reader):
            if i == 0:
                continue
            label = int(data[0])
            score = float(data[1])

            if image == True:   
                img_name = '_'.join(data[2].split('_')[:-1]) 
            else:
                img_name = data[2]                                   

            if not img_name in img_dict.keys():
                img_dict[img_name] = {'label': label, 'num': 1, 'score':[score]}
            else:
                img_dict[img_name]['num'] += 1
                img_dict[img_name]['score'] += [score]

        print (len(img_dict))

        y_ture = np.array([])
        y_score = np.array([])

        false=[]
        for k, v in img_dict.items():

            score_averagy = sum(v['score']) / v['num']
            y_ture = np.append(y_ture, v['label']) 
            y_score = np.append(y_score, score_averagy)

        auc, eer, best_thresh = ComputeMetric(y_ture, y_score, isPlot=True,\
                                            model_name=model_name)
        print ('len : ', y_ture.shape)
        print ('model_name: ', model_name, ' auc: ', auc, ' eer: ', eer, \
            ' best_thresh: ', best_thresh)

In [None]:
csv_path = opt.result_file
evaluation(csv_path, root='./', image = True)