image_net

In [1]:
import numpy as np
import json
from keras.utils.data_utils import get_file
from keras import backend as K

In [3]:
import glob as gb
import os

In [4]:
CLASS_INDEX = None
CLASS_INDEX_PATH = 'https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json'

In [5]:
def preprocess_input(x, dim_ordering='default'):
    """Convert a batch of RGB images to zero-centred BGR.

    The channels are first reversed (RGB -> BGR) and only then are the
    per-channel means subtracted, because the three constants are listed in
    B, G, R order.

    Args:
        x: 4-D numpy array holding a batch of RGB images.
        dim_ordering: 'th' or 'channels_first' for (batch, channel, h, w)
            input; any other value is treated as channels-last
            (batch, h, w, channel). 'default' asks the Keras backend via
            `K.image_data_format()`.

    Returns:
        A new float array with the same shape as `x`; `x` itself is left
        untouched.
    """
    if dim_ordering == 'default':
        dim_ordering = K.image_data_format()
    # Accept both the legacy Keras 1 name and the name the backend reports.
    channels_first = dim_ordering in ('th', 'channels_first')

    x = x.astype(float)
    bgr_mean = np.array([103.939, 116.779, 123.68])
    if channels_first:
        # 'RGB'->'BGR', then subtract the matching mean from each channel.
        x = x[:, ::-1, :, :] - bgr_mean.reshape(1, 3, 1, 1)
    else:
        # 'RGB'->'BGR', then subtract the matching mean from each channel.
        x = x[:, :, :, ::-1] - bgr_mean
    return x

In [6]:
def decode_predictions(preds, top=5):
    """Map the highest-scoring columns of each row to class-index entries.

    Args:
        preds: 2-D numpy array of shape (samples, 512); any other shape is
            rejected.
        top: number of highest-scoring columns to report per row.

    Returns:
        One list per row of `preds`, each holding up to `top` tuples made of
        the class-index entry for a column followed by that column's score,
        ordered from highest to lowest score.

    Raises:
        ValueError: if `preds` is not a 2-D array with 512 columns.
    """
    # NOTE(review): the downloaded file is the ImageNet class index, while the
    # 512 columns accepted here match the pooled VGG16 feature vector used in
    # this notebook -- presumably the labels are then not real class
    # predictions; confirm the intended use.
    global CLASS_INDEX
    if len(preds.shape) != 2 or preds.shape[1] != 512:
        raise ValueError('`decode_predictions` expects '
                         'a batch of predictions '
                         '(i.e. a 2D array of shape (samples, 512)). '
                         'Found array with shape: ' + str(preds.shape))
    if CLASS_INDEX is None:
        fpath = get_file('imagenet_class_index.json',
                         CLASS_INDEX_PATH,
                         cache_subdir='models')
        # Close the file as soon as the index has been parsed.
        with open(fpath) as index_file:
            CLASS_INDEX = json.load(index_file)
    results = []
    for pred in preds:
        # Reverse first and slice second so that top=0 yields an empty list
        # (a `[-0:]` slice would return every column).
        top_indices = pred.argsort()[::-1][:top]
        results.append(
            [tuple(CLASS_INDEX[str(i)]) + (pred[i],) for i in top_indices]
        )
    return results

VGG16

In [7]:
import tensorflow

In [8]:
import warnings
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Flatten, Dense, Input
from keras.layers import Conv2D, MaxPooling2D
from keras.preprocessing import image
from keras import backend as K

In [9]:

TF_WEIGHTS_PATH = 'https://github.com/fchollet/deep-learning-models/releases/download/v0.1/vgg16_weights_tf_dim_ordering_tf_kernels.h5'
TF_WEIGHTS_PATH_NO_TOP = 'https://github.com/fchollet/deep-learning-models/releases/download/v0.1/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5'


In [10]:
import tensorflow.compat.v2 as tf
from keras import backend
from keras.applications import imagenet_utils
from keras.engine import training
from keras.layers import VersionAwareLayers
from keras.utils import data_utils
from keras.utils import layer_utils
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, GlobalAveragePooling2D, GlobalMaxPooling2D

# isort: off
from tensorflow.python.util.tf_export import keras_export

In [11]:
def VGG16(
    include_top=True,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
    pooling=None,
    classes=1000,
    classifier_activation="softmax",
):
    """Build the VGG16 network and optionally load weights into it.

    Args:
        include_top: when true, append the Flatten / fc1 / fc2 / predictions
            classifier after the convolutional blocks.
        weights: "imagenet" (download the released weight file), None (no
            weights are loaded) or a path to a weights file.
        input_tensor: optional tensor to use as the model input.
        input_shape: optional input shape, resolved through
            `imagenet_utils.obtain_input_shape` (default size 224, minimum
            size 32).
        pooling: only consulted when `include_top` is false; "avg" or "max"
            adds a global average / max pooling layer, anything else leaves
            the output of the last pooling layer as the model output.
        classes: number of units in the final Dense layer (only used with
            `include_top`).
        classifier_activation: activation of the final Dense layer.

    Returns:
        A `training.Model` named "vgg16".

    Raises:
        ValueError: for an unrecognised `weights` value, or when ImageNet
            weights are requested with the top and `classes` is not 1000.
    """
    weights_ok = weights in {"imagenet", None} or tf.io.gfile.exists(weights)
    if not weights_ok:
        raise ValueError(
            "The `weights` argument should be either "
            "`None` (random initialization), `imagenet` "
            "(pre-training on ImageNet), "
            "or the path to the weights file to be loaded.  Received: "
            f"weights={weights}"
        )

    if weights == "imagenet" and include_top and classes != 1000:
        raise ValueError(
            'If using `weights` as `"imagenet"` with `include_top` '
            "as true, `classes` should be 1000.  "
            f"Received `classes={classes}`"
        )

    # Resolve the input shape for the current backend data format.
    input_shape = imagenet_utils.obtain_input_shape(
        input_shape,
        default_size=224,
        min_size=32,
        data_format=backend.image_data_format(),
        require_flatten=include_top,
        weights=weights,
    )

    if input_tensor is None:
        img_input = Input(shape=input_shape)
    elif backend.is_keras_tensor(input_tensor):
        img_input = input_tensor
    else:
        img_input = Input(tensor=input_tensor, shape=input_shape)

    # (filters, number of conv layers) for blocks 1..5; every block ends with
    # a 2x2 max-pool of stride 2.
    block_plan = ((64, 2), (128, 2), (256, 3), (512, 3), (512, 3))
    x = img_input
    for block_no, (filters, conv_count) in enumerate(block_plan, start=1):
        for conv_no in range(1, conv_count + 1):
            x = Conv2D(
                filters,
                (3, 3),
                activation="relu",
                padding="same",
                name=f"block{block_no}_conv{conv_no}",
            )(x)
        x = MaxPooling2D(
            (2, 2), strides=(2, 2), name=f"block{block_no}_pool"
        )(x)

    if include_top:
        # Classification block
        x = Flatten(name="flatten")(x)
        for fc_name in ("fc1", "fc2"):
            x = Dense(4096, activation="relu", name=fc_name)(x)

        imagenet_utils.validate_activation(classifier_activation, weights)
        x = Dense(
            classes, activation=classifier_activation, name="predictions"
        )(x)
    elif pooling == "avg":
        x = GlobalAveragePooling2D()(x)
    elif pooling == "max":
        x = GlobalMaxPooling2D()(x)

    # When an input tensor was supplied, trace back to its source inputs so
    # the model accounts for any predecessors of `input_tensor`.
    if input_tensor is None:
        inputs = img_input
    else:
        inputs = layer_utils.get_source_inputs(input_tensor)

    model = training.Model(inputs, x, name="vgg16")

    # Load weights.
    if weights == "imagenet":
        if include_top:
            file_name = "vgg16_weights_tf_dim_ordering_tf_kernels.h5"
            origin = TF_WEIGHTS_PATH
            expected_hash = "64373286793e3c8b2b4e3219cbf3544b"
        else:
            file_name = "vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5"
            origin = TF_WEIGHTS_PATH_NO_TOP
            expected_hash = "6d6bbae143d832006294945121d1f1fc"
        weights_path = data_utils.get_file(
            file_name,
            origin,
            cache_subdir="models",
            file_hash=expected_hash,
        )
        model.load_weights(weights_path)
    elif weights is not None:
        model.load_weights(weights)

    return model

In [12]:
# Headless VGG16 with ImageNet weights; the global average pooling turns the
# last convolutional output into one feature vector per image.
model = VGG16(
    include_top=False,
    weights='imagenet',
    pooling='avg',
)

In [13]:
from PIL import Image

if __name__ == '__main__':
    # Smoke test: push one image through the pooled VGG16 and print the label
    # attached to its strongest output column.

    img_path = 'elephant.jpg'
    # The context manager closes the file once the pixels have been copied.
    # Forcing 'RGB' guarantees the three channels that preprocess_input
    # indexes, even for greyscale, RGBA or palette images.
    with Image.open(img_path) as source_image:
        img = source_image.convert('RGB').resize((224, 224))
    x = np.array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    print('Input image shape:', x.shape)

    preds = model.predict(x)
    print(preds.shape)
    # NOTE(review): `model` is built without the classifier top, so `preds`
    # holds pooled features rather than class scores -- the printed label is
    # presumably not a meaningful prediction; confirm the intent.
    pred = decode_predictions(preds)
    print('Predicted:', pred[0][0][1])

Input image shape: (1, 224, 224, 3)
(1, 512)
Predicted: Afghan_hound


Keyframes extraction from videos using VSUMM

In [14]:
from sklearn.decomposition import PCA

Extracts the embedding of each frame from the model and stores it in an array called `features`; the PCA-transformed features are then returned to be fed to the k-means clustering algorithm.

In [15]:
# Cache for the feature extractor, so its weights are loaded once rather than
# on every call to get_cnn_feat.
_CNN_FEATURE_MODEL = None


def _get_cnn_feature_model():
    """Return the headless, average-pooled VGG16, building it on first use."""
    global _CNN_FEATURE_MODEL
    if _CNN_FEATURE_MODEL is None:
        _CNN_FEATURE_MODEL = VGG16(
            weights='imagenet', include_top=False, pooling='avg'
        )
    return _CNN_FEATURE_MODEL


def get_cnn_feat(frames_raw, frames_shape):
    """Embed frames with VGG16 and reduce the embeddings with PCA.

    Args:
        frames_raw: iterable of image arrays; each is resized to 224x224.
            Assumed to be in OpenCV's BGR order, since the per-channel means
            are applied to channels 0, 1, 2 as B, G, R -- confirm with caller.
        frames_shape: requested number of PCA components. An integer request
            is capped at min(number of frames, 512), the largest value PCA
            can fit; other values are passed to PCA unchanged.

    Returns:
        The PCA-transformed feature matrix, one row per frame.
    """
    bgr_mean = np.array([103.939, 116.779, 123.68], dtype=np.float32)

    batches = []
    for im in frames_raw:
        im = cv2.resize(im, (224, 224)).astype(np.float32)
        im[:, :, :3] -= bgr_mean
        # Each frame becomes its own batch of one: (1, 224, 224, channels).
        batches.append(np.expand_dims(im, axis=0))
    frames = np.array(batches)

    model = _get_cnn_feature_model()

    features = np.empty((frames.shape[0], 512), dtype=np.float32)
    for row, x in enumerate(frames):
        # Predict once per frame and reuse the result for the log line.
        embedding = model.predict(x)
        print(embedding.shape)
        features[row, :] = embedding

    n_components = frames_shape
    if isinstance(frames_shape, (int, np.integer)):
        n_components = min(
            int(frames_shape), features.shape[0], features.shape[1]
        )
    return PCA(n_components=n_components).fit_transform(features)

Computes the colour histogram of each frame. VSUMM generally clusters these histograms, but they are not used here because we cluster the CNN embeddings instead.

In [16]:
def get_color_hist(frames_raw, num_bins):
    """Build one flattened per-channel histogram vector for every frame.

    Args:
        frames_raw: iterable of image arrays accepted by `cv2.calcHist`.
        num_bins: number of histogram bins per channel (converted with int()).

    Returns:
        A numpy array with one row per frame; each row concatenates the
        histograms of channels 0, 1 and 2 over the value range [0, 256).
    """
    print ("Generating linear Histrograms using OpenCV")

    rows = []
    for frame in frames_raw:
        # Channels 0, 1, 2 (labelled b, g, r in this notebook).
        per_channel = []
        for channel in range(3):
            per_channel.append(
                cv2.calcHist([frame], [channel], None, [int(num_bins)], [0, 256])
            )
        rows.append(np.asarray(per_channel).flatten())

    hist = np.asarray(rows)
    print (f"Shape of histogram: {hist.shape}")

    return hist

In [17]:
# generic VSUMM to test with different features
# k means clustering to generate video summary
import sys

import numpy as np
import cv2
import scipy.io
import os

# k-means
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
import IPython.display 
from IPython.display import Video, Image, display
import matplotlib.pyplot as plt


Settings for sampling frames and saving the keyframes to their respective output paths

In [18]:
# Sampling step: one frame is taken every `sampling_rate` frames.
sampling_rate = 7

# Share of the video (in percent) that the summary should cover.
percent = 2

# Module-level state shared with main() through `global`.
num_centroids = 0
# Save switches; nothing in the visible code reads them yet.
SaveFrames = False
SaveKeyFrames = True

main() function to execute the code

In [None]:
def _sample_frames(video_path, step, max_frames=512):
    """Read every `step`-th frame of a video, keeping at most `max_frames`.

    Returns an array of the frames read; it is empty when the video cannot be
    opened or yields no frame. The capture is released in every case.
    """
    capture = cv2.VideoCapture(video_path)
    frames = []
    try:
        position = 0
        while capture.isOpened() and len(frames) < max_frames:
            capture.set(cv2.CAP_PROP_POS_FRAMES, position)
            _, frame = capture.read()
            if frame is None:
                break
            frames.append(np.asarray(frame))
            position += step
    finally:
        capture.release()
    return np.array(frames)


def _closest_frame_per_cluster(features, n_clusters):
    """Run k-means and return the sorted, unique row index nearest each centre."""
    kmeans = KMeans(n_clusters=n_clusters)
    kmeans.fit(features)
    picked = set()
    for center in kmeans.cluster_centers_:
        distances = np.linalg.norm(features - center, axis=1)
        picked.add(int(np.argmin(distances)))
    return sorted(picked)


def main(start_index=8972):
    """Extract keyframes from fake/real video pairs and save them as JPEGs.

    For each pair, frames are sampled from the fake video, embedded with
    `get_cnn_feat` and clustered with k-means; the frame closest to each
    cluster centre is a keyframe. The frames at the same sampled positions
    are taken from the real video, and both are written under
    `Training_images/fake` and `Training_images/real` with a shared running
    number.

    Args:
        start_index: number used for the first saved frame. Raise it to the
            last saved number + 1 when resuming an interrupted run.
    """
    global num_centroids

    # Sort both listings so the pairing does not depend on directory order.
    # NOTE(review): pairing is purely positional -- confirm that the two
    # folders list matching videos in the same sorted order.
    files_fake = sorted(gb.glob(
        'faceforensics/manipulated_sequences/Deepfakes/c23/videos/*.mp4'))
    files_real = sorted(gb.glob(
        'faceforensics/original_sequences/youtube/c23/videos/*.mp4'))

    # cv2.imwrite does not create missing directories.
    os.makedirs('Training_images/fake', exist_ok=True)
    os.makedirs('Training_images/real', exist_ok=True)

    step = max(1, int(sampling_rate))
    j = start_index
    for file_fake, file_real in zip(files_fake, files_real):
        print(file_fake, file_real)
        print ("Opening fake video!")
        # choosing the subset of frames from which the summary is generated
        frames_fake = _sample_frames(file_fake, step)
        print ("frames_fake chosen")
        print ("Length of fake video %d" % frames_fake.shape[0])
        if frames_fake.shape[0] == 0:
            print ("No frames could be read from the fake video, skipping pair")
            continue

        features = get_cnn_feat(frames_fake, frames_fake.shape[0])
        print ("Shape of fake frame features " + str(features.shape))

        # clustering: defaults to using the features
        print ("Clustering")

        # converting percentage to actual number
        num_centroids = int(percent * frames_fake.shape[0] * sampling_rate / 100)

        if percent == -1:
            # NOTE(review): the ground-truth frame count is loaded here but is
            # never used to size the summary -- confirm the intended formula
            # before relying on percent == -1.
            video_address = file_fake.split('/')
            gt_file = video_address[len(video_address) - 1].split('.')[0] + '.mat'
            video_address[len(video_address) - 1] = gt_file
            video_address[len(video_address) - 2] = 'GT'
            gt_file = '/'.join(video_address)
            num_frames_fake = int(scipy.io.loadmat(gt_file).get('user_score').shape[0])

        if len(frames_fake) < num_centroids:
            print ("Samples too less to generate such a large summary")
            print ("Changing to maximum possible centroids")
            num_centroids = frames_fake.shape[0]
        # KMeans needs at least one cluster; short clips can round down to 0.
        num_centroids = max(1, num_centroids)

        frame_indices = _closest_frame_per_cluster(features, num_centroids)
        print ("Done Clustering!")
        print(frame_indices)
        print ("Generated summary")

        print ("Opening original video!")
        frames_real = _sample_frames(file_real, step)
        print ("frames_real chosen")
        print ("Length of real video %d" % frames_real.shape[0])

        for i in frame_indices:
            if i >= frames_real.shape[0]:
                print (f"Real video has no sampled frame {i}, skipping it")
                continue
            # Frames are written exactly as read: cv2.imwrite expects the BGR
            # order that VideoCapture.read returns, for fake and real alike.
            fake_path = os.path.join('Training_images/fake', f'frame_{j}.jpg')
            real_path = os.path.join('Training_images/real', f'frame_{j}.jpg')
            if not cv2.imwrite(fake_path, frames_fake[i]):
                print (f"Could not write {fake_path}")
            if not cv2.imwrite(real_path, frames_real[i]):
                print (f"Could not write {real_path}")
            j += 1
        print(f"Length of selected Keyframes_fake:{len(frame_indices)}")
    
if __name__ == "__main__":
    # Run the keyframe-extraction pipeline only when executed directly.
    main()