In [2]:
from PIL import Image, ImageChops, ImageEnhance
from torchvision import transforms
import os
import random
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import torch.utils.data as data
import torch.nn as nn
import torch.optim as optim
from tqdm.notebook import tqdm, trange
import torch
import time
import torchvision.models as models

In [3]:
def convert_to_ela_image(path, quality):
    """Build an Error Level Analysis (ELA) image for the file at ``path``.

    The image is converted to RGB, re-encoded as JPEG at ``quality``, and the
    per-pixel absolute difference between the two versions is brightened so
    that the largest channel difference is scaled towards 255.

    Args:
        path: Location of the source image (anything ``Image.open`` accepts).
        quality: JPEG quality used for the re-encoding pass.

    Returns:
        The brightness-scaled RGB difference image (a PIL image).
    """
    import io  # local import: only this function needs an in-memory buffer

    # Decode inside a context manager so the source file handle is released.
    with Image.open(path) as source:
        image = source.convert('RGB')

    # Re-encode in memory rather than through a fixed-name file in the working
    # directory: no leftover 'temp_file.jpg', no clash between concurrent runs,
    # and no requirement that the working directory be writable.
    resaved_buffer = io.BytesIO()
    image.save(resaved_buffer, 'JPEG', quality=quality)
    resaved_buffer.seek(0)
    with Image.open(resaved_buffer) as resaved:
        ela_image = ImageChops.difference(image, resaved)

    # getextrema() yields one (min, max) pair per band; take the largest max.
    # A zero difference (identical images) falls back to 1 to avoid dividing by zero.
    max_diff = max(band_max for _, band_max in ela_image.getextrema()) or 1
    scale = 255.0 / max_diff

    return ImageEnhance.Brightness(ela_image).enhance(scale)

In [4]:
def prepare_image(image_path, image_size):
    """Turn one image file into a model-ready ELA tensor.

    The ELA image is computed at JPEG quality 85, resized to ``image_size``
    and passed through ``transforms.ToTensor()``.

    Args:
        image_path: Path of the image to process.
        image_size: Target size handed to ``resize``.

    Returns:
        The result of ``transforms.ToTensor()`` applied to the resized ELA image.
    """
    to_tensor = transforms.ToTensor()
    ela = convert_to_ela_image(image_path, 85)
    resized = ela.resize(image_size)
    return to_tensor(resized)

In [6]:
# Start the in-memory evaluation set: X collects ELA tensors, Y the matching labels.
X = [] # ELA converted images
Y = [] # label: 0 for fake, 1 for real

# Target size handed to prepare_image for every file loaded below.
image_size = (224, 224) 
# Every file found under this folder is labelled 1 (real).
path = 'CV-Test/Au'
for dirname, _, filenames in os.walk(path):
    for filename in filenames:
        # NOTE(review): there is no extension filter, so any non-image file in the
        # tree is also passed to prepare_image — confirm the folder holds only images.
        full_path = os.path.join(dirname, filename)
        X.append(prepare_image(full_path, image_size))
        Y.append(1)  

# Sanity check: both lists must have the same length.
print(len(X), len(Y))

100 100


In [7]:
# Append the second class to the X / Y lists started in the previous cell.
image_size = (224, 224) 
# Every file found under this folder is labelled 0 (fake).
path = 'CV-Test/Tp'
for dirname, _, filenames in os.walk(path):
    for filename in filenames:
        full_path = os.path.join(dirname, filename)
        X.append(prepare_image(full_path, image_size))
        Y.append(0)

# NOTE(review): this cell appends to existing lists, so running it twice duplicates
# the fake samples; re-run the previous cell first to reset X and Y.
print(len(X), len(Y))

200 200


In [8]:
# Shuffle images and labels together ten times, seeding each pass with its index so
# the final order is reproducible for a given starting order.
# NOTE(review): X and Y are overwritten in place, so re-running this cell reshuffles
# the already-shuffled lists.
for i in range(10):
    X, Y = shuffle(X, Y, random_state=i)

In [41]:
# Pair each tensor with its label and serve the pairs in batches of up to 256.
# No shuffle argument is passed, so batches follow the current order of X / Y.
test_iterator = data.DataLoader([[X[i], Y[i]] for i in range(len(Y))],
                                 batch_size=256)

In [55]:
def calculate_accuracy(y_pred, y):
    """Compute accuracy and confusion counts for one batch.

    The predicted class is the argmax over dimension 1 of ``y_pred``. Class 0
    (fake) is treated as the positive class: ``tp`` counts samples predicted 0
    with label 0, ``tn`` counts samples predicted 1 with label 1.

    Args:
        y_pred: Score tensor with classes along dimension 1.
        y: Label tensor with one entry per row of ``y_pred``.

    Returns:
        Tuple ``(predict_real, predict_fake, tp, fp, tn, fn, acc)``; the six
        counts are Python ints and ``acc`` is a tensor holding the fraction of
        correct predictions in the batch.
    """
    top_pred = y_pred.argmax(1, keepdim=True)
    labels = y.view_as(top_pred)
    acc = top_pred.eq(labels).sum().float() / y.shape[0]

    # Boolean masks replace the per-sample Python loop.
    says_real = top_pred == 1
    says_fake = top_pred == 0
    is_real = labels == 1
    is_fake = labels == 0

    predict_real = int(says_real.sum())
    predict_fake = int(says_fake.sum())
    tn = int((says_real & is_real).sum())
    fn = int((says_real & is_fake).sum())
    tp = int((says_fake & is_fake).sum())
    fp = int((says_fake & is_real).sum())

    return predict_real, predict_fake, tp, fp, tn, fn, acc

In [56]:
def evaluate(model, iterator, criterion, device):
    """Run ``model`` over ``iterator`` in eval mode and aggregate the metrics.

    Confusion counts are summed over all batches. Previously only the counts
    of the final batch were returned, which was correct only when the whole
    dataset fitted in a single batch.

    Args:
        model: Module whose forward pass returns ``(predictions, features)``.
        iterator: Iterable of ``(x, y)`` batches.
        criterion: Loss callable applied to ``(predictions, y)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(predict_real, predict_fake, tp, fp, tn, fn, loss, acc)``.
        The six counts are totals over all batches; ``loss`` and ``acc`` are
        the means of the per-batch loss and per-batch accuracy.

    Raises:
        ValueError: If ``iterator`` yields no batches.
    """
    epoch_loss = 0.0
    epoch_acc = 0.0
    n_batches = 0
    # Running totals, in order: predict_real, predict_fake, tp, fp, tn, fn.
    totals = [0, 0, 0, 0, 0, 0]

    model.eval()

    with torch.no_grad():
        for (x, y) in tqdm(iterator, desc="Evaluating", leave=False):

            x = x.to(device)
            y = y.to(device)

            y_pred, _ = model(x)

            loss = criterion(y_pred, y)

            *batch_counts, acc = calculate_accuracy(y_pred, y)
            totals = [total + count for total, count in zip(totals, batch_counts)]

            epoch_loss += loss.item()
            epoch_acc += acc.item()
            # Counting batches here avoids needing len(iterator).
            n_batches += 1

    if n_batches == 0:
        raise ValueError('evaluate() received an iterator with no batches')

    predict_real, predict_fake, tp, fp, tn, fn = totals
    return predict_real, predict_fake, tp, fp, tn, fn, epoch_loss / n_batches, epoch_acc / n_batches

In [57]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Cross-entropy loss, placed on the same device as the model inputs.
criterion = nn.CrossEntropyLoss().to(device)

# VGG10

In [58]:
class VGG(nn.Module):
    """VGG-style classifier: conv feature stack, 7x7 adaptive pooling, MLP head.

    This variant's head takes ``256 * 7 * 7`` inputs, so ``features`` must end
    with 256 channels. The head uses LeakyReLU activations and dropout of 0.5.

    Args:
        features: Module producing the convolutional feature maps.
        output_dim: Number of output scores.
    """

    def __init__(self, features, output_dim):
        super().__init__()

        self.features = features
        self.avgpool = nn.AdaptiveAvgPool2d(7)

        flat_dim = 256 * 7 * 7
        hidden_dim = 4096
        head = [
            nn.Linear(flat_dim, hidden_dim),
            nn.LeakyReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LeakyReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return ``(scores, h)`` where ``h`` is the flattened pooled feature vector."""
        pooled = self.avgpool(self.features(x))
        h = pooled.view(pooled.shape[0], -1)
        return self.classifier(h), h

In [59]:
def get_vgg_layers(config, batch_norm):
    """Translate a VGG layer spec into an ``nn.Sequential`` feature extractor.

    Args:
        config: Sequence whose entries are either ``'M'`` (2x2 max-pool) or an
            int (output channels of a 3x3, padding-1 convolution).
        batch_norm: When true, insert ``BatchNorm2d`` after every convolution.

    Returns:
        ``nn.Sequential`` of the generated layers. The first convolution takes
        3 input channels; every convolution is followed by an in-place
        LeakyReLU.
    """
    modules = []
    prev_channels = 3
    for entry in config:
        assert entry == 'M' or isinstance(entry, int)
        if entry == 'M':
            modules.append(nn.MaxPool2d(kernel_size=2))
            continue

        modules.append(nn.Conv2d(prev_channels, entry, kernel_size=3, padding=1))
        if batch_norm:
            modules.append(nn.BatchNorm2d(entry))
        modules.append(nn.LeakyReLU(inplace=True))
        prev_channels = entry

    return nn.Sequential(*modules)

In [60]:
# NOTE(review): despite the "vgg16" names, this spec stops after the third conv stage
# (7 convolutions ending at 256 channels), matching the 256 * 7 * 7 classifier input
# of the VGG class defined above.
vgg16_config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M']
vgg16_layers = get_vgg_layers(vgg16_config, batch_norm=True)

# Two output scores: one per label (0 = fake, 1 = real).
OUTPUT_DIM = 2
model = VGG(vgg16_layers, OUTPUT_DIM)

In [61]:
# NOTE(review): nothing in the visible notebook reads pretrained_model when evaluating —
# `model` gets its weights from a checkpoint file instead — so this load looks unnecessary.
pretrained_model = models.vgg16_bn(pretrained=True)

In [62]:
# Swap the last classifier layer of the pretrained network for an OUTPUT_DIM-wide one.
IN_FEATURES = pretrained_model.classifier[-1].in_features

final_fc = nn.Linear(IN_FEATURES, OUTPUT_DIM)

pretrained_model.classifier[-1] = final_fc

# Keep only the first 24 feature modules.
pretrained_model.features = pretrained_model.features[:24]
# NOTE(review): this discards the classifier, including the final_fc installed just
# above, so the replacement has no lasting effect — confirm which was intended.
pretrained_model.classifier = None

In [65]:
# map_location lets a checkpoint that was saved from a GPU load on a CPU-only host,
# which matters because `device` itself falls back to CPU when CUDA is unavailable.
model.load_state_dict(torch.load('VGG10/VGG10-copymove-best.pt', map_location=device))
model = model.to(device)
predict_real, predict_fake, tp, fp, tn, fn, test_loss, test_acc = evaluate(model, test_iterator, criterion, device)

# The positive class is "fake" (label 0). Each ratio falls back to 0.0 when its
# denominator is zero (e.g. nothing predicted fake) instead of raising ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

print('The number of predicted real images: {}'.format(predict_real))
print('The number of predicted fake images: {}'.format(predict_fake))
print('The number of correctly predicted real images: {}'.format(tn))
print('The number of correctly predicted fake images: {}'.format(tp))
print('The number of predicted fake images that should be real: {}'.format(fp))
print('The number of predicted real images that should be fake: {}'.format(fn))
print('precision: {}'.format(precision))
print('recall: {}'.format(recall))
print('f1 score: {}'.format(f1))
print('test loss: {}'.format(test_loss))
print('test accuracy: {}'.format(test_acc))

Evaluating:   0%|          | 0/1 [00:00<?, ?it/s]

The number of predicted real images: 89
The number of predicted fake images: 111
The number of correctly predicted real images: 85
The number of correctly predicted fake images: 96
The number of predicted fake images that should be real: 15
The number of predicted real images that should be fake: 4
precision: 0.8648648648648649
recall: 0.96
f1 score: 0.9099526066350712
test loss: 0.2938739061355591
test accuracy: 0.9049999713897705


# VGG16

In [66]:
class VGG(nn.Module):
    """VGG-style classifier: conv feature stack, 7x7 adaptive pooling, MLP head.

    This variant's head takes ``512 * 7 * 7`` inputs, so ``features`` must end
    with 512 channels. The head uses ReLU activations and dropout of 0.5.

    Args:
        features: Module producing the convolutional feature maps.
        output_dim: Number of output scores.
    """

    def __init__(self, features, output_dim):
        super().__init__()

        self.features = features
        self.avgpool = nn.AdaptiveAvgPool2d(7)

        flat_dim = 512 * 7 * 7
        hidden_dim = 4096
        self.classifier = nn.Sequential(*[
            nn.Linear(flat_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, output_dim),
        ])

    def forward(self, x):
        """Return ``(scores, h)`` where ``h`` is the flattened pooled feature vector."""
        feature_maps = self.features(x)
        pooled = self.avgpool(feature_maps)
        h = pooled.view(pooled.shape[0], -1)
        scores = self.classifier(h)
        return scores, h

In [67]:
def get_vgg_layers(config, batch_norm):
    """Translate a VGG layer spec into an ``nn.Sequential`` feature extractor.

    Args:
        config: Sequence whose entries are either ``'M'`` (2x2 max-pool) or an
            int (output channels of a 3x3, padding-1 convolution).
        batch_norm: When true, insert ``BatchNorm2d`` after every convolution.

    Returns:
        ``nn.Sequential`` of the generated layers. The first convolution takes
        3 input channels; every convolution is followed by an in-place ReLU.
    """
    stack = []
    channels_in = 3

    for spec in config:
        assert spec == 'M' or isinstance(spec, int)
        if spec != 'M':
            stack.append(nn.Conv2d(channels_in, spec, kernel_size=3, padding=1))
            if batch_norm:
                stack.append(nn.BatchNorm2d(spec))
            stack.append(nn.ReLU(inplace=True))
            channels_in = spec
        else:
            stack.append(nn.MaxPool2d(kernel_size=2))

    return nn.Sequential(*stack)

In [68]:
# Five conv stages (13 convolutions in total), each stage closed by a max-pool ('M').
# This overwrites the shorter spec assigned to the same names earlier in the notebook.
vgg16_config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512,
                'M', 512, 512, 512, 'M']
vgg16_layers = get_vgg_layers(vgg16_config, batch_norm=True)

In [70]:
# Two output scores: one per label (0 = fake, 1 = real).
OUTPUT_DIM = 2

# `model` is rebound here; the VGG instance built earlier in the notebook is replaced.
model = VGG(vgg16_layers, OUTPUT_DIM)
# NOTE(review): nothing in the visible notebook reads pretrained_model when evaluating —
# `model` gets its weights from a checkpoint file — so this load looks unnecessary.
pretrained_model = models.vgg16_bn(pretrained=True)

In [71]:
# Replace the last classifier layer of the pretrained network with an
# OUTPUT_DIM-wide linear layer that keeps the same input width.
IN_FEATURES = pretrained_model.classifier[-1].in_features

final_fc = nn.Linear(IN_FEATURES, OUTPUT_DIM)

pretrained_model.classifier[-1] = final_fc

In [72]:
# map_location lets a checkpoint that was saved from a GPU load on a CPU-only host,
# which matters because `device` itself falls back to CPU when CUDA is unavailable.
model.load_state_dict(torch.load('VGG16/VGG16-copymove-best.pt', map_location=device))
model = model.to(device)
predict_real, predict_fake, tp, fp, tn, fn, test_loss, test_acc = evaluate(model, test_iterator, criterion, device)

# The positive class is "fake" (label 0). Each ratio falls back to 0.0 when its
# denominator is zero (e.g. nothing predicted fake) instead of raising ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

print('The number of predicted real images: {}'.format(predict_real))
print('The number of predicted fake images: {}'.format(predict_fake))
print('The number of correctly predicted real images: {}'.format(tn))
print('The number of correctly predicted fake images: {}'.format(tp))
print('The number of predicted fake images that should be real: {}'.format(fp))
print('The number of predicted real images that should be fake: {}'.format(fn))
print('precision: {}'.format(precision))
print('recall: {}'.format(recall))
print('f1 score: {}'.format(f1))
print('test loss: {}'.format(test_loss))
print('test accuracy: {}'.format(test_acc))

Evaluating:   0%|          | 0/1 [00:00<?, ?it/s]

The number of predicted real images: 91
The number of predicted fake images: 109
The number of correctly predicted real images: 84
The number of correctly predicted fake images: 93
The number of predicted fake images that should be real: 16
The number of predicted real images that should be fake: 7
precision: 0.8532110091743119
recall: 0.93
f1 score: 0.8899521531100477
test loss: 0.2539728581905365
test accuracy: 0.8849999904632568


# ResNet

In [73]:
# Defining the ResNet architecture
class ResNet(nn.Module):
    """Residual network assembled from a ``(block, n_blocks, channels)`` config.

    A 7x7 stride-2 stem (conv, batch norm, ReLU, max-pool) is followed by four
    stages of residual blocks, global average pooling and one linear layer.

    Args:
        config: Triple ``(block, n_blocks, channels)``. ``block`` is the
            residual block class, which must expose ``expansion``; ``n_blocks``
            and ``channels`` each hold four entries, one per stage.
        output_dim: Number of output scores.
    """

    def __init__(self, config, output_dim):
        super().__init__()

        block, n_blocks, channels = config
        # Tracks the channel count feeding the next stage; updated by get_resnet_layer.
        self.in_channels = channels[0]

        assert len(n_blocks) == len(channels) == 4

        stem_width = self.in_channels
        self.conv1 = nn.Conv2d(3, stem_width, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(stem_width)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # The first stage keeps the resolution; the remaining three halve it.
        for stage_no, (depth, width) in enumerate(zip(n_blocks, channels), start=1):
            stage_stride = 1 if stage_no == 1 else 2
            stage = self.get_resnet_layer(block, depth, width, stage_stride)
            setattr(self, 'layer{}'.format(stage_no), stage)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(self.in_channels, output_dim)

    def get_resnet_layer(self, block, n_blocks, channels, stride = 1):
        """Build one stage of ``n_blocks`` blocks and advance ``self.in_channels``.

        Only the first block receives ``stride``; it is also asked for a
        projection shortcut whenever the incoming channel count differs from
        ``block.expansion * channels``.
        """
        out_width = block.expansion * channels
        needs_projection = self.in_channels != out_width

        stage = [block(self.in_channels, channels, stride, needs_projection)]
        stage.extend(block(out_width, channels) for _ in range(1, n_blocks))

        self.in_channels = out_width

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return ``(scores, h)`` where ``h`` is the flattened pooled feature vector."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = self.avgpool(x)
        h = pooled.view(pooled.shape[0], -1)

        return self.fc(h), h

In [74]:
class BasicBlock(nn.Module):
    """Two-convolution residual block (3x3 conv, 3x3 conv) with an additive shortcut.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution and of the optional projection.
        downsample: When true, the shortcut goes through a 1x1 conv + batch norm
            projection; otherwise the input is added unchanged.
    """

    # Output channels equal ``expansion * out_channels``.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU(inplace=True)

        projection = None
        if downsample:
            projection = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        self.downsample = projection

    def forward(self, x):
        """Apply the residual branch, add the shortcut, and finish with ReLU."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

In [75]:
class Bottleneck(nn.Module):
    """Three-convolution residual block (1x1, 3x3, 1x1) with an additive shortcut.

    The last convolution widens the output to ``expansion * out_channels``.

    Args:
        in_channels: Channels of the incoming tensor.
        out_channels: Width of the two inner convolutions.
        stride: Stride of the 3x3 convolution and of the optional projection.
        downsample: When true, the shortcut goes through a 1x1 conv + batch norm
            projection to ``expansion * out_channels``; otherwise the input is
            added unchanged.
    """

    # Output channels equal ``expansion * out_channels``.
    expansion = 4

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()

        wide_channels = self.expansion * out_channels

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1,
                               stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, wide_channels, kernel_size=1,
                               stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(wide_channels)

        self.relu = nn.ReLU(inplace=True)

        projection = None
        if downsample:
            projection = nn.Sequential(
                nn.Conv2d(in_channels, wide_channels, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(wide_channels),
            )
        self.downsample = projection

    def forward(self, x):
        """Apply the residual branch, add the shortcut, and finish with ReLU."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

In [77]:
from collections import namedtuple

# Describes one ResNet variant: the residual block class, the number of blocks
# per stage, and the channel width per stage.
ResNetConfig = namedtuple('ResNetConfig', 'block n_blocks channels')

In [78]:
# Variants built from BasicBlock (fields are: block, n_blocks, channels).
resnet18_config = ResNetConfig(BasicBlock, [2, 2, 2, 2], [64, 128, 256, 512])
resnet34_config = ResNetConfig(BasicBlock, [3, 4, 6, 3], [64, 128, 256, 512])

# Variants built from Bottleneck; they differ only in blocks per stage.
resnet50_config = ResNetConfig(Bottleneck, [3, 4, 6, 3], [64, 128, 256, 512])
resnet101_config = ResNetConfig(Bottleneck, [3, 4, 23, 3], [64, 128, 256, 512])
resnet152_config = ResNetConfig(Bottleneck, [3, 8, 36, 3], [64, 128, 256, 512])

In [79]:
# Two output scores: one per label (0 = fake, 1 = real).
OUTPUT_DIM = 2

# `model` is rebound to a ResNet built from the 18-layer config; its weights are
# whatever the layer constructors produce until a checkpoint is loaded.
model = ResNet(resnet18_config, OUTPUT_DIM)

In [80]:
def initialize_parameters(m):
    """Re-initialise one module in place, chosen by its type.

    * ``nn.Conv2d``: Kaiming-normal weights (ReLU nonlinearity); bias zeroed
      when present.
    * ``nn.BatchNorm2d``: weight set to 1 and bias set to 0.
    * ``nn.Linear``: Xavier-normal weights with the ReLU gain; bias zeroed.

    Modules of any other type are left untouched. Intended to be passed to
    ``module.apply``.

    Args:
        m: The module to initialise.
    """
    if isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight.data, nonlinearity='relu')
        if m.bias is not None:
            nn.init.constant_(m.bias.data, 0)
        return

    if isinstance(m, nn.BatchNorm2d):
        nn.init.constant_(m.weight.data, 1)
        nn.init.constant_(m.bias.data, 0)
        return

    if isinstance(m, nn.Linear):
        relu_gain = nn.init.calculate_gain('relu')
        nn.init.xavier_normal_(m.weight.data, gain=relu_gain)
        nn.init.constant_(m.bias.data, 0)

In [81]:
# NOTE(review): torchvision.models is already imported as `models` in the first cell,
# so this import is redundant.
import torchvision.models as models
# NOTE(review): nothing in the visible notebook reads pretrained_model when evaluating —
# `model` gets its weights from a checkpoint file — so this load looks unnecessary.
pretrained_model = models.resnet18(pretrained = True)

# Replace the final fully connected layer with an OUTPUT_DIM-wide one of the same input width.
IN_FEATURES = pretrained_model.fc.in_features 

fc = nn.Linear(IN_FEATURES, OUTPUT_DIM)

pretrained_model.fc = fc

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /home/featurize/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


  0%|          | 0.00/44.7M [00:00<?, ?B/s]

In [82]:
# map_location lets a checkpoint that was saved from a GPU load on a CPU-only host,
# which matters because `device` itself falls back to CPU when CUDA is unavailable.
model.load_state_dict(torch.load('ResNet/resnet-copymove-best.pt', map_location=device))
model = model.to(device)
predict_real, predict_fake, tp, fp, tn, fn, test_loss, test_acc = evaluate(model, test_iterator, criterion, device)

# The positive class is "fake" (label 0). Each ratio falls back to 0.0 when its
# denominator is zero (e.g. nothing predicted fake) instead of raising ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

print('The number of predicted real images: {}'.format(predict_real))
print('The number of predicted fake images: {}'.format(predict_fake))
print('The number of correctly predicted real images: {}'.format(tn))
print('The number of correctly predicted fake images: {}'.format(tp))
print('The number of predicted fake images that should be real: {}'.format(fp))
print('The number of predicted real images that should be fake: {}'.format(fn))
print('precision: {}'.format(precision))
print('recall: {}'.format(recall))
print('f1 score: {}'.format(f1))
print('test loss: {}'.format(test_loss))
print('test accuracy: {}'.format(test_acc))

Evaluating:   0%|          | 0/1 [00:00<?, ?it/s]

The number of predicted real images: 91
The number of predicted fake images: 109
The number of correctly predicted real images: 83
The number of correctly predicted fake images: 92
The number of predicted fake images that should be real: 17
The number of predicted real images that should be fake: 8
precision: 0.8440366972477065
recall: 0.92
f1 score: 0.8803827751196172
test loss: 0.2973495125770569
test accuracy: 0.875


# AlexNet

In [84]:
# Rebuild the evaluation set from scratch at a smaller resolution; this replaces the
# X / Y lists used by the earlier evaluations.
X = [] # ELA converted images
Y = [] # label: 0 for fake, 1 for real

# 32x32 inputs: the AlexNet defined below flattens to 256 * 2 * 2 features.
image_size = (32, 32) 
# Every file found under this folder is labelled 1 (real).
path = 'CV-Test/Au'
for dirname, _, filenames in os.walk(path):
    for filename in filenames:
        # NOTE(review): there is no extension filter, so any non-image file in the
        # tree is also passed to prepare_image — confirm the folder holds only images.
        full_path = os.path.join(dirname, filename)
        X.append(prepare_image(full_path, image_size))
        Y.append(1)  

# Sanity check: both lists must have the same length.
print(len(X), len(Y))

100 100


In [85]:
# Append the second class; image_size is the value set in the previous cell.
# Every file found under this folder is labelled 0 (fake).
path = 'CV-Test/Tp'
for dirname, _, filenames in os.walk(path):
    for filename in filenames:
        full_path = os.path.join(dirname, filename)
        X.append(prepare_image(full_path, image_size))
        Y.append(0)

# NOTE(review): this cell appends to existing lists, so running it twice duplicates
# the fake samples; re-run the previous cell first to reset X and Y.
print(len(X), len(Y))

200 200


In [86]:
# Shuffle images and labels together ten times, seeding each pass with its index so
# the final order is reproducible for a given starting order.
# NOTE(review): X and Y are overwritten in place, so re-running this cell reshuffles
# the already-shuffled lists.
for i in range(10):
    X, Y = shuffle(X, Y, random_state=i)

In [87]:
# Rebind test_iterator to the 32x32 samples: each tensor is paired with its label and
# served in batches of up to 256, in the current order of X / Y.
test_iterator = data.DataLoader([[X[i], Y[i]] for i in range(len(Y))],
                                 batch_size=256)

In [88]:
class AlexNet(nn.Module):
    """AlexNet-style classifier with batch norm and LeakyReLU, sized for small inputs.

    Five convolutional stages feed a three-layer fully connected head. The head
    takes ``256 * 2 * 2`` inputs, which is what a 32x32 image produces after the
    stride-2 first convolution and three 2x2 max-pools.

    Args:
        output_dim: Number of output scores.
    """

    def __init__(self, output_dim):
        super().__init__()

        # One row per conv stage: (in_channels, out_channels, conv stride, max-pool after conv).
        stage_specs = [
            (3, 64, 2, True),
            (64, 192, 1, True),
            (192, 384, 1, False),
            (384, 256, 1, False),
            (256, 256, 1, True),
        ]

        # Convolutional feature extractor: conv -> [pool] -> batch norm -> LeakyReLU.
        feature_modules = []
        for in_ch, out_ch, conv_stride, pooled in stage_specs:
            feature_modules.append(nn.Conv2d(in_ch, out_ch, 3, conv_stride, 1))
            if pooled:
                feature_modules.append(nn.MaxPool2d(2))
            feature_modules.append(nn.BatchNorm2d(out_ch))
            feature_modules.append(nn.LeakyReLU(inplace=True))
        self.features = nn.Sequential(*feature_modules)

        # Fully connected classification head.
        hidden_dim = 4096
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(256 * 2 * 2, hidden_dim),
            nn.LeakyReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, hidden_dim),
            nn.LeakyReLU(inplace=True),
            nn.Linear(hidden_dim, output_dim),
        )

    def forward(self, x):
        """Return ``(scores, h)`` where ``h`` is the flattened feature vector."""
        feature_maps = self.features(x)
        h = feature_maps.view(feature_maps.shape[0], -1)
        return self.classifier(h), h

In [89]:
# Two output scores: one per label (0 = fake, 1 = real).
OUTPUT_DIM = 2
# `model` is rebound to a fresh AlexNet; the checkpoint is loaded in the next cell.
model = AlexNet(OUTPUT_DIM)

In [90]:
# map_location lets a checkpoint that was saved from a GPU load on a CPU-only host,
# which matters because `device` itself falls back to CPU when CUDA is unavailable.
model.load_state_dict(torch.load('AlexNet/alex-copymove-model-best.pt', map_location=device))
model = model.to(device)
predict_real, predict_fake, tp, fp, tn, fn, test_loss, test_acc = evaluate(model, test_iterator, criterion, device)

# The positive class is "fake" (label 0). Each ratio falls back to 0.0 when its
# denominator is zero (e.g. nothing predicted fake) instead of raising ZeroDivisionError.
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

print('The number of predicted real images: {}'.format(predict_real))
print('The number of predicted fake images: {}'.format(predict_fake))
print('The number of correctly predicted real images: {}'.format(tn))
print('The number of correctly predicted fake images: {}'.format(tp))
print('The number of predicted fake images that should be real: {}'.format(fp))
print('The number of predicted real images that should be fake: {}'.format(fn))
print('precision: {}'.format(precision))
print('recall: {}'.format(recall))
print('f1 score: {}'.format(f1))
print('test loss: {}'.format(test_loss))
print('test accuracy: {}'.format(test_acc))

Evaluating:   0%|          | 0/1 [00:00<?, ?it/s]

The number of predicted real images: 104
The number of predicted fake images: 96
The number of correctly predicted real images: 86
The number of correctly predicted fake images: 82
The number of predicted fake images that should be real: 14
The number of predicted real images that should be fake: 18
precision: 0.8541666666666666
recall: 0.82
f1 score: 0.836734693877551
test loss: 0.34927788376808167
test accuracy: 0.8399999737739563
