In [None]:
import os
import cv2 # type: ignore
import numpy as np # type: ignore
from keras.utils import to_categorical # type: ignore
from keras.preprocessing.sequence import pad_sequences # type: ignore
from tensorflow.keras.models import load_model # type: ignore
import tensorflow as tf # type: ignore
from tensorflow.keras import Sequential # type: ignore
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, BatchNormalization # type: ignore
from tensorflow.keras.layers import LSTM , Dense, Input # type: ignore
from tensorflow.keras.models import Model # type: ignore


Preprocess

In [None]:
def video_to_grayscale(input_path):
    cap = cv2.VideoCapture(input_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    video = []
    count = 0
    while count<frame_count:
        ret, frame = cap.read()
        if not ret:
            break
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        img = cv2.resize(image, (128, 128), interpolation=cv2.INTER_AREA)
        video.append(img)  
#Display resized grayscale videos
    for i in range (0,len(video)):
        cv2.imshow("image",video[i])
        if cv2.waitKey(1) & 0xFF == ord('q'): 
            break
    print(len(video))
    return video

def video_to_grayscale_extract_frames(input_path, num_frames):
    cap = cv2.VideoCapture(input_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    #print("Original frame count ", frame_count)
    video = []
    count = 0
    total_frames = min(frame_count, num_frames)
    frame_interval = max(frame_count // total_frames, 1)
    out_frame_count = 0
    while count < frame_count:
        ret, frame = cap.read()
        if not ret:
            break
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        img = cv2.resize(image, (128, 128), interpolation=cv2.INTER_AREA)
        video.append(img)
        # Increment output frame count and break if reached desired number of frames
        out_frame_count += 1
        if out_frame_count >= total_frames:
            break
    cap.release()
    return np.stack(video)

In [None]:
def reshape_to_3d(input_data):
    # Reshape array to 3D
    """reshaped_data = np.squeeze(input_data)  # Remove single-dimensional entries
    reshaped_data = np.reshape(reshaped_data, (reshaped_data.shape[0], -1, 1))
    return reshaped_data"""
    reshaped_tensor = tf.squeeze(input_data)
    # Reshape the tensor to 3D
    reshaped_tensor = tf.reshape(reshaped_tensor, (tf.shape(reshaped_tensor)[0], -1, 1))
    return reshaped_tensor

Difference

In [None]:
#Finds difference of two input frames
def difference (frame1, frame2):
    frame_difference = frame1 - frame2
    return frame_difference

#Difference layer of FDCNN
def forward_difference(video):
    fdiff = []
    for i in range (0,(len(video)-1)):
        img1 = video[i]
        img2 = video[i+1]
        frame_difference = difference(img1,img2)
        fdiff.append(frame_difference)
    return fdiff

#Difference layer of PDCNN
def post_difference(video_feature):
    pdiff = []
    for i in range (0,(len(video_feature)-1)):
        img1 = video_feature[i]
        img2 = video_feature[i+1]
        feature_difference = difference(img1,img2)
        pdiff.append(feature_difference)
    return pdiff

Feature extraction and fusion

In [None]:
def create_cnn_model(input_shape):

    cnn = Sequential()

    cnn.add(tf.keras.Input(shape=input_shape))
    cnn.add(Conv2D(filters=2, kernel_size=(5, 5), strides=(1, 1), padding='valid', activation='elu'))
    cnn.add(MaxPooling2D(pool_size=(2, 2)))
    
    cnn.add(Conv2D(filters=4, kernel_size=(3, 3), strides=(1, 1), padding='valid', activation='elu'))
    cnn.add(MaxPooling2D(pool_size=(2, 2)))
    
    cnn.add(Conv2D(filters=8, kernel_size=(3, 3), strides=(1, 1), padding='valid', activation='elu'))
    cnn.add(AveragePooling2D(pool_size=(2, 2)))
    
    cnn.add(Conv2D(filters=48, kernel_size=(3, 3), strides=(1, 1), padding='valid', activation='elu'))
    cnn.add(AveragePooling2D(pool_size=(2, 2)))
    
    cnn.add(Conv2D(filters=24, kernel_size=(1, 1), strides=(1, 1), padding='valid', activation='elu'))
    cnn.add(AveragePooling2D(pool_size=(2, 2)))
    
    cnn.add(Conv2D(filters=48, kernel_size=(3, 3), strides=(1, 1), padding='valid', activation='elu'))
    cnn.add(BatchNormalization())
    cnn.compile(loss="mse", optimizer = "adam")
    return cnn
    #cnn.summary()

In [None]:
input_shape=(128,128,1)
#CNN model for FDCNN
fdcnn = create_cnn_model(input_shape)
#CNN model for PDCNN
pdcnn = create_cnn_model(input_shape)

In [None]:
#Extracts pixel differene features
def fdfeature_extraction(preprocessed_video,fdcnn):
    forward_diff = forward_difference(preprocessed_video)
    #display_forward_difference(preprocessed_video)
    fdfeature = []
    for frame in forward_diff:
        frame = frame.reshape((128, 128, 1))  
        fdf = fdcnn.predict(frame[np.newaxis, ...])
        fdfeature.append(fdf)
    return fdfeature

#Extracts deep differene features
def pdfeature_extraction(preprocessed_video,pdcnn):
    pdfeature = []
    for frame in preprocessed_video:
        frame = frame.reshape((128, 128, 1))  
        pdf = pdcnn.predict(frame[np.newaxis, ...])
        pdfeature.append(pdf)
    post_diff = post_difference(pdfeature)
    #display_post_difference(pdfeature)
    return post_diff

#Concatenates pixel difference features and deep difference features
def feature_fusion(preprocessed_video):
    pixel_difference_feature = fdfeature_extraction(preprocessed_video,fdcnn)
    deep_difference_feature = pdfeature_extraction(preprocessed_video,pdcnn)
    msd_feature = []
    for pdf, ddf in zip(pixel_difference_feature, deep_difference_feature):
        concatenated_feature = tf.concat([pdf, ddf], axis=-1)
        msd_feature.append(concatenated_feature)
    return tf.stack(msd_feature, axis=0)

LSTM Model

In [None]:
def lstm():
    input_layer = Input(shape=(None,96))
    lstm_layer = LSTM(144, dropout=0.2, recurrent_dropout=0.2)(input_layer)
    #attention_output = Attention(use_scale=False, dropout=0.2)([lstm_layer, lstm_layer])
    output_layer = Dense(1, activation='softmax')(lstm_layer)
    model = Model(inputs=input_layer, outputs=output_layer)
    return model

Load data for training

In [None]:
def load_data(data_path,numframes):
    x = []
    y = []
    for folder in ["Original", "Forged"]:
        folder_path = os.path.join(data_path, folder)
        for filename in os.listdir(folder_path):
            video_path = os.path.join(folder_path, filename)
            preprocessed_video = video_to_grayscale_extract_frames(video_path, numframes)
            
            # Check if the video has 100 frames, otherwise skip it
            if len(preprocessed_video) != numframes:
                continue
            
            msd = feature_fusion(preprocessed_video)
            msd_reshaped = reshape_to_3d(msd)
            x.append(msd_reshaped)
            if folder == "Original":
                y.append(0)
            else:
                y.append(1)
    return x, y

Training data

In [None]:
train_data_path = "D:/Final_Project/Dataset/ForgeryDataset/Insertion1/Training"
x_train, y_train = load_data(train_data_path,100)

Testing data

In [None]:
test_data_path = "D:/Final_Project/Dataset/ForgeryDataset/Insertion1/Testing"
x_test, y_test = load_data(test_data_path,100)

In [None]:
x_train = np.array(x_train)
y_train = np.array(y_train)
x_test = np.array(x_test)
y_test = np.array(y_test)

In [None]:
vfd_model = lstm()
#vfd_model = load_model("C:/Users/Preethi/Documents/InterframeVideoForgeryDetection/models/vfd_model.keras")
vfd_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
vfd_model.summary()

In [None]:
vfd_model.fit(x_train, y_train, epochs=10, batch_size=16, validation_data=(x_test, y_test))

In [None]:
predictions = vfd_model.predict(x_test)
print(len(predictions))

In [None]:
print(vfd_model.evaluate(x_test, y_test))

Example

In [None]:
vidpath = "D:/Final_Project/Dataset/ForgeryDataset/Deletion/Training/Original/original_train (40).avi"
preprocessed_video = video_to_grayscale(vidpath)
msd = feature_fusion(preprocessed_video)
msd_reshaped = reshape_to_3d(msd)
msd_re = np.array([msd_reshaped])
print(vfd_model.predict(msd_re))

Save model

In [None]:
vfd_model.save("C:/Users/Preethi/Documents/InterframeVideoForgeryDetection/models/vfd_model.keras")

Load model