<a href="https://colab.research.google.com/github/Kuyas/ImageSplicing/blob/master/ImageSplicing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Image Splicing

## Import The required libraries

In [None]:
# Mount Google Drive at /content/drive; the dataset paths in the configuration
# cell live under this mount point. This only works inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models import resnet152
import torch.optim as optim
from torch.autograd import Variable
from sklearn import svm
from math import sqrt
import matplotlib.pyplot as plt
import numpy as np
import time
import cv2
from google.colab.patches import cv2_imshow
from torchvision import datasets
import statistics

## Configuration Variables

In [None]:
# --- Optimisation hyper-parameters (read by the loaders and the training cell) ---
LEARNING_RATE = 0.0003
BATCH_SIZE = 1
EPOCHS = 50

# --- Dataset folders on the mounted Drive ---
# Validation and test point at the same folder.
ILLUMINANT_TRAIN_DATAPATH = '/content/drive/My Drive/CTScan/run_2/train/'
ILLUMINANT_VAL_DATAPATH = '/content/drive/My Drive/CTScan/run_2/test/'
ILLUMINANT_TEST_DATAPATH = '/content/drive/My Drive/CTScan/run_2/test/'

# Default-configured SVM placeholder; the SVM training cell rebinds `clf`
# with its own settings before fitting.
clf = svm.SVC()

## The ResNet and feature extraction are defined here

In [None]:
class Base(nn.Module):
    """ResNet-152 wrapper exposing pooled deep features and feature-map gradients.

    ``forward`` returns both the output of the ResNet ``fc`` layer and the
    flattened, average-pooled feature vector that feeds it. A backward hook on
    the last convolutional feature map stores that map's gradient so it can be
    read back with ``get_gradient`` after ``backward`` has been called on an
    output (used by the heatmap code).
    """

    def __init__(self):
        super(Base, self).__init__()
        self.model = models.resnet152(True)

        # Convolutional trunk used to create the deep features (DSF): the
        # ResNet stem followed by its four residual stages.
        self.feature_extract = nn.Sequential(self.model.conv1,
                                             self.model.bn1,
                                             nn.ReLU(),
                                             nn.MaxPool2d(3, 2, 1, 1, ceil_mode=False),
                                             self.model.layer1,
                                             self.model.layer2,
                                             self.model.layer3,
                                             self.model.layer4)

        self.avgpool = self.model.avgpool
        self.classifier = self.model.fc

        # Extra head layers. They are registered (and therefore part of
        # `parameters()`), but `forward` does not currently apply them.
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(1000, 500)
        self.fc2 = nn.Linear(500, 250)
        self.fc3 = nn.Linear(250, 2)
        self.softmax = nn.LogSoftmax(dim=1)

        # Gradient of the last feature map, filled in by `activations_hook`.
        self.gradients = None

    def activations_hook(self, grad):
        """Backward hook: remember the gradient flowing into the feature map."""
        self.gradients = grad

    def get_gradient(self):
        """Return the gradient captured by the most recent backward pass (or None)."""
        return self.gradients

    def get_activations_hook(self):
        """Return the stored gradient; same value as ``get_gradient``."""
        return self.gradients

    def get_activations(self, x):
        """Return the feature map produced by the convolutional trunk for ``x``."""
        return self.feature_extract(x)

    def forward(self, x):
        """Run the network.

        Args:
            x: image batch accepted by the ResNet stem.

        Returns:
            ``(scores, features)`` where ``scores`` is the output of the ResNet
            ``fc`` layer (no softmax applied) and ``features`` is the flattened
            average-pooled feature vector, one row per input sample.
        """
        x = self.feature_extract(x)
        # A hook can only be attached to a tensor that takes part in autograd;
        # skipping it otherwise lets the model run under torch.no_grad().
        if x.requires_grad:
            x.register_hook(self.activations_hook)
        x = self.avgpool(x)
        # Flatten per sample instead of assuming a batch of exactly one.
        x = x.view(x.size(0), -1)
        y = x
        x = self.classifier(x)
        return x, y


In [None]:
class ImageFolderWithPaths(datasets.ImageFolder):
    """``ImageFolder`` variant whose items also carry the image's file path.

    Every item is the tuple produced by ``ImageFolder.__getitem__`` with the
    source path appended as one extra trailing element.
    """

    def __getitem__(self, index):
        # Let the parent class load and transform the sample first ...
        base_item = super().__getitem__(index)
        # ... then append the path stored alongside it in `self.imgs`.
        file_path = self.imgs[index][0]
        return base_item + (file_path,)


## Transformations for the Images
- Resize images to 224x224 (The input to the ResNet is 224x224)
- convert to a tensor

In [None]:
# Per-channel normalisation statistics shared by every pipeline below.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: resize, then light random augmentation (crop + flip).
TRAIN_TRANSFORM_IMG = transforms.Compose([
    transforms.Resize(size=224),
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

# Evaluation pipeline: no random operations, so repeated runs over the same
# images give the same inputs (a random crop here made the reported
# validation/test numbers vary from run to run).
VAL_TRANSFORM_IMG = transforms.Compose([
    transforms.Resize(size=224),
    transforms.CenterCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

train_data = ImageFolderWithPaths(root=ILLUMINANT_TRAIN_DATAPATH, transform=TRAIN_TRANSFORM_IMG)
train_data_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)

# Single-image loader over the training folder, used for deep-feature extraction.
# It now wraps `data` (it previously wrapped `train_data`, leaving `data` unused).
data = ImageFolderWithPaths(root=ILLUMINANT_TRAIN_DATAPATH, transform=TRAIN_TRANSFORM_IMG)
data_loader = torch.utils.data.DataLoader(data, batch_size=1, shuffle=True, num_workers=0)

# Evaluation loaders do not need shuffling; a fixed order keeps logs comparable.
val_data = ImageFolderWithPaths(root=ILLUMINANT_VAL_DATAPATH, transform=VAL_TRANSFORM_IMG)
val_data_loader = torch.utils.data.DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

test_data = ImageFolderWithPaths(root=ILLUMINANT_TEST_DATAPATH, transform=VAL_TRANSFORM_IMG)
test_data_loader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

## Information about the dataset

In [None]:
# Report dataset sizes and the class-name -> index mapping found on disk.
print("Number of Training Examples: ", len(train_data))
print("Number of Test Examples: ", len(test_data))
print("Number of Validation Examples: ", len(val_data))
print("Detected Classes are: ", train_data.class_to_idx)

# Fetch a single batch to show the tensor shape. The builtin next() is used
# because DataLoader iterators do not provide a .next() method in recent
# PyTorch releases. Both iterator names are kept for any later cell using them.
dataiter = train_iter = iter(train_data_loader)
images, labels, paths = next(train_iter)
print("Image Shape on Batch size = {} ".format(images.size()))

# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

Number of Training Examples:  128
Number of Test Examples:  35
Number of Validation Examples:  35
Detected Classes are:  {'tampered': 0, 'true': 1}
Image Shape on Batch size = torch.Size([1, 3, 224, 224]) 
cuda:0


## Initialize the model
- create model
- transfer it to the GPU
- define the loss function (CrossEntropyLoss)
- Adam Optimizer

## Function to generate the Heatmap

In [None]:
def HeatmapGeneration(img, paths, model=None, class_idx=2):
  """Build a Grad-CAM style heatmap for one image and return its histogram peak.

  The gradient of one output score with respect to the last convolutional
  feature map is averaged per channel and used to weight that feature map.
  The weighted map is averaged over channels, clipped at zero, normalised,
  colour-mapped, blended onto the original image read from disk, downscaled
  to 20 %, and a 256-bin histogram of channel 0 of the result is computed.

  Args:
    img: image tensor for a single sample (batch of one), already on the
      same device as the model.
    paths: path of the image file, either a string or a one-element
      sequence of strings as produced by the data loader.
    model: network providing ``get_gradient`` / ``get_activations``;
      defaults to the notebook-level ``resnet``.
    class_idx: index of the output score that is back-propagated
      (default 2, the value this notebook has always used).

  Returns:
    ``max(hist)``: the largest bin count of the histogram, as a
    one-element array.

  Raises:
    RuntimeError: if no gradient was captured by the model's hook.
    FileNotFoundError: if the image at ``paths`` cannot be read.
  """
  net = resnet if model is None else model
  net.eval()
  # Drop parameter gradients left over from earlier calls; only the hooked
  # feature-map gradient is needed here.
  net.zero_grad()

  # Force autograd on so this also works when called inside torch.no_grad().
  with torch.enable_grad():
    pred, _ = net(img)
    pred[:, class_idx].backward()

  gradients = net.get_gradient()
  if gradients is None:
    raise RuntimeError("no feature-map gradient was captured; cannot build heatmap")
  pooled_gradients = torch.mean(gradients, dim=[0, 2, 3])

  with torch.no_grad():
    activations = net.get_activations(img)
    # Weight every channel by its pooled gradient (previously only the first
    # 512 channels were weighted and the rest were left unscaled).
    activations = activations * pooled_gradients.view(1, -1, 1, 1)
    heatmap = torch.mean(activations, dim=1).squeeze()
    heatmap = torch.clamp(heatmap, min=0)
    peak = torch.max(heatmap)
    if peak > 0:
      # Guard against 0/0 (NaN) when the whole map is non-positive.
      heatmap = heatmap / peak
  heatmap = heatmap.cpu().numpy()

  paths = ''.join(paths)
  p = paths.split("/")
  img = cv2.imread(paths)
  if img is None:
    raise FileNotFoundError("could not read image: " + paths)

  # Resize the heatmap to the image size and blend the two.
  heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
  heatmap = np.uint8(255 * heatmap)
  heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_HSV)
  superimposed_img = heatmap * 0.4 + img
  scale_percent = 20  # percent of original size
  width = int(superimposed_img.shape[1] * scale_percent / 100)
  height = int(superimposed_img.shape[0] * scale_percent / 100)
  resized = cv2.resize(superimposed_img, (width, height), interpolation=cv2.INTER_AREA)

  # Histogram of channel 0 of the blended image. No colour-space conversion
  # is performed; the "HSV" part is only the colour map applied above.
  first_channel = np.float32(resized[:, :, 0])
  hist = cv2.calcHist([first_channel], [0], None, [256], [0, 256])
  print(p[-2], min(hist), max(hist), np.mean(hist), np.median(hist))
  return max(hist)


In [None]:
# Extract one deep-feature vector (DSF) per training image with the pretrained
# network; these vectors are the SVM's training inputs.
resnet = Base()
resnet.to(device)
resnet.eval()

dsf = []    # one (1, D) feature row per image
y_dsf = []  # 0 for images under a 'tampered' folder, 1 otherwise

# Iterate over the whole loader instead of assuming exactly 128 images, and
# skip the backward pass / second forward pass whose results were never used.
for img, label, paths in data_loader:
  img = img.to(device)
  _, features = resnet(img)
  dsf.append(features.detach().cpu().numpy())

  # The class is taken from the parent directory name of the image file.
  class_dir = ''.join(paths).split("/")[-2]
  y_dsf.append(0 if class_dir == 'tampered' else 1)

In [None]:
# Stack the per-image feature rows into one (num_samples, num_features) matrix.
# The shape is derived from the data rather than hard-coded as (128, 2048).
dsf = np.asarray(dsf)
dsf = dsf.reshape(len(dsf), -1)

In [None]:
from sklearn.svm import SVC

# Fit the SVM on the deep features. Class weights are set to "balanced";
# an explicit weighting of {0: 0.25, 1: 0.75} was also tried earlier.
clf = SVC(
    class_weight="balanced",
    gamma='auto',
)
clf.fit(dsf, y_dsf)

SVC(C=1.0, break_ties=False, cache_size=200, class_weight='balanced', coef0=0.0,
    decision_function_shape='ovr', degree=3, gamma='auto', kernel='rbf',
    max_iter=-1, probability=False, random_state=None, shrinking=True,
    tol=0.001, verbose=False)

In [None]:
# An SVM "tampered" prediction (0) is kept only when the heatmap histogram peak
# lies inside [HEATMAP_PEAK_LOW, HEATMAP_PEAK_HIGH]; otherwise it is flipped to 1.
HEATMAP_PEAK_LOW = 900
HEATMAP_PEAK_HIGH = 1100

classes = ['tampered', 'normal']
correct = 0
total = 0
count = 0   # number of heatmaps generated
mean = 0    # running sum of heatmap peaks (divide by `count` for the mean)
class_correct = [0. for _ in range(2)]
class_total = [0. for _ in range(2)]

resnet.eval()
# `batch` (not `data`) so the dataset object named `data` is not overwritten.
for batch in test_data_loader:
    total += 1
    images, labels, paths = batch[0].to(device), batch[1].to(device), batch[2]
    outputs, features = resnet(images)

    predicted = int(clf.predict(features.detach().cpu().numpy())[0])

    # Ground truth comes from the parent directory name of the image file.
    paths = ''.join(paths)
    actual = 0 if paths.split("/")[-2] == 'tampered' else 1
    class_total[actual] += 1

    if predicted == 0:
        # The heatmap is computed once; previously the same call was made in
        # two consecutive branches, doubling the cost and the `count`/`mean` totals.
        count += 1
        peak = float(np.ravel(HeatmapGeneration(images, paths))[0])
        mean += peak
        if peak < HEATMAP_PEAK_LOW or peak > HEATMAP_PEAK_HIGH:
            predicted = 1

    if actual == predicted:
        correct += 1
        class_correct[actual] += 1

acc = 100 * correct / total if total else 0.0
print(correct)
print(total)
print('Testing accuracy: ' + str(acc))

# Per-class accuracy; a class with no samples reports 0 instead of raising.
per_class_acc = [100 * c / t if t else 0.0 for c, t in zip(class_correct, class_total)]
for i in range(2):
    print('Accuracy of %5s : %2d %%' % (classes[i], per_class_acc[i]))
acc0 = per_class_acc[0]
acc1 = per_class_acc[1]
print_row_end = []
row_end = [acc, acc0, acc1]

1 true [0.] [860.] 35.753906 21.0
1 true [0.] [615.] 38.597656 33.5
1 tampered [0.] [1562.] 33.371094 23.5
0 tampered [0.] [1562.] 33.371094 23.5
1 tampered [1.] [1029.] 40.035156 22.0
0 tampered [1.] [1029.] 40.035156 22.0
1 tampered [0.] [1302.] 32.617188 23.5
0 tampered [0.] [1302.] 32.617188 23.5
1 true [0.] [1028.] 37.851562 28.0
0 true [0.] [1028.] 37.851562 28.0
1 true [0.] [925.] 37.33203 21.0
0 true [0.] [925.] 37.33203 21.0
1 true [0.] [1545.] 33.871094 18.0
0 true [0.] [1545.] 33.871094 18.0
1 tampered [0.] [805.] 39.578125 19.0
1 tampered [0.] [1205.] 39.808594 27.0
0 tampered [0.] [1205.] 39.808594 27.0
1 tampered [0.] [989.] 34.820312 26.0
0 tampered [0.] [989.] 34.820312 26.0
1 true [0.] [1260.] 33.125 26.5
0 true [0.] [1260.] 33.125 26.5
1 true [0.] [1408.] 34.628906 22.5
0 true [0.] [1408.] 34.628906 22.5
1 true [0.] [577.] 33.257812 27.5
1 true [1.] [805.] 32.796875 25.0
1 true [0.] [1323.] 36.070312 23.0
0 true [0.] [1323.] 36.070312 23.0
1 true [0.] [923.] 36.39453 

In [None]:
# Recorded result (notes only, not executable code)
# Testing accuracy: 77.77777777777777
# Accuracy of tampered : 89 %
# Accuracy of normal : 50 %
# threshold - < 700

In [None]:
# Recorded results (notes only, not executable code)
# Testing accuracy: 68.57142857142857
# Accuracy of tampered : 61 %
# Accuracy of normal : 76 %
# threshold - > 900
#
# Testing accuracy: 68.57142857142857
# Accuracy of tampered : 83 %
# Accuracy of normal : 52 %
# threshold - > 1100

In [None]:
# GRAY WORLD - recorded results (notes only, not executable code)
# Testing accuracy: 62.857142857142854
# Accuracy of tampered : 44 %
# Accuracy of normal : 82 %
# threshold = 750
#
# Testing accuracy: 65.71428571428571
# Accuracy of tampered : 50 %
# Accuracy of normal : 82 %
# threshold = 800

## The running of the network
- Forward Propagation

In [None]:
# Network to be fine-tuned, moved to the selected device.
model = Base()
model.to(device)

# Base.forward returns the raw output of the final linear layer (its LogSoftmax
# is not applied), so the loss must do the log-softmax itself. NLLLoss expects
# log-probabilities and is not a valid objective on raw scores.
criterion = nn.CrossEntropyLoss()

# Optimise only the trainable parameters. A list (rather than a one-shot
# `filter` iterator) can safely be inspected or reused after this cell.
_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.Adam(_params, lr=LEARNING_RATE, weight_decay=1e-5)

In [None]:
# Per-epoch history. The test_* lists stay empty because no validation pass
# is run in this cell.
train_loss = []
test_loss = []
train_accuracy = []
test_accuracy = []

for epoch in range(EPOCHS):
    start = time.time()
    correct = 0
    iterations = 0
    iter_loss = 0.0

    model.train()

    # Unpack the batch directly so the dataset object named `data` is not shadowed.
    for inputs, labels, _ in train_data_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs, _ = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        iter_loss += loss.item()
        # .item() keeps the counter a plain Python int instead of a device tensor.
        correct += (outputs.argmax(dim=1) == labels).sum().item()
        iterations += 1

    train_loss.append(iter_loss / iterations)
    train_accuracy.append(100 * correct / len(train_data))

    # TODO: add a validation pass over val_data_loader that fills test_loss / test_accuracy.
    stop = time.time()
    print ('Epoch {}/{}, Training Loss: {:.3f}, Training Accuracy: {:.3f}, Time: {:.3f}s'
               .format(epoch+1, EPOCHS, train_loss[-1], train_accuracy[-1], stop-start))


## Testing of the Network

In [None]:
# Class names in label-index order, taken from the dataset itself. The previous
# hard-coded ['NORMAL', 'SPLICING'] was the reverse of the detected mapping
# ({'tampered': 0, 'true': 1}).
classes = list(test_data.classes)
num_classes = len(classes)
# Heatmaps are generated for images predicted as tampered, matching the SVM
# evaluation cell. NOTE(review): this cell previously triggered on index 1 - confirm.
tampered_idx = test_data.class_to_idx['tampered']

correct = 0
total = 0
confusion_matrix = torch.zeros(num_classes, num_classes)  # rows: truth, cols: prediction
class_correct = [0. for _ in range(num_classes)]
class_total = [0. for _ in range(num_classes)]

model.eval()
# Not wrapped in torch.no_grad(): the heatmap needs a backward pass, and
# Base.forward attaches a gradient hook to the feature map.
for images, labels, paths in test_data_loader:
    images, labels = images.to(device), labels.to(device)
    outputs, _ = model(images)
    predicted = outputs.detach().argmax(dim=1)

    # Count every sample exactly once (it used to be counted four times).
    for i in range(labels.size(0)):
        truth, guess = int(labels[i]), int(predicted[i])
        total += 1
        class_total[truth] += 1
        if guess == truth:
            correct += 1
            class_correct[truth] += 1
        # The classifier can emit indices beyond the dataset's classes; those
        # count as errors above but cannot be placed in the matrix.
        if guess < num_classes:
            confusion_matrix[truth, guess] += 1
        if guess == tampered_idx:
            # NOTE(review): HeatmapGeneration uses the notebook-level `resnet`
            # by default, not the `model` evaluated here - confirm this is intended.
            HeatmapGeneration(images[i:i + 1], (paths[i],))

acc = 100 * correct / total if total else 0.0
print(correct)
print(total)
print('Testing accuracy: ' + str(acc))
# print(confusion_matrix)

# Per-class accuracy; a class with no samples reports 0 instead of raising.
per_class_acc = [100 * c / t if t else 0.0 for c, t in zip(class_correct, class_total)]
for i in range(num_classes):
    print('Accuracy of %5s : %2d %%' % (classes[i], per_class_acc[i]))
acc0 = per_class_acc[0]
acc1 = per_class_acc[1]
print_row_end = []
row_end = [acc, acc0, acc1]