Processing replay-attack into image frames for training

In [1]:
import os
import cv2
import shutil
import random
import string
import numpy as np
from tqdm.notebook import tqdm
# from imutils import face_utils
import matplotlib.pyplot as plt

In [2]:
face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_default.xml')  

In [3]:
def extract_face(img):
    # Convert to grayscale 
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  
  
    # Detect the faces  
    faces = face_cascade.detectMultiScale(gray, 1.3, 4)  
    
    # Draw rectangle around the faces and crop the faces
    for (x, y, w, h) in faces:
        faces = img[y:y + h, x:x + w]
        
    return faces

In [4]:
img = cv2.imread('./test_img.png')

In [5]:
extract_face(img)

array([[[  0,  12,   7],
        [ 44,  54,  51],
        [ 33,  46,  42],
        ...,
        [182, 192, 185],
        [185, 196, 190],
        [190, 204, 198]],

       [[  0,  17,  12],
        [ 66,  81,  77],
        [ 65,  80,  74],
        ...,
        [178, 190, 185],
        [182, 197, 194],
        [191, 208, 205]],

       [[ 47,  63,  58],
        [ 36,  54,  49],
        [  0,  21,  11],
        ...,
        [184, 199, 196],
        [182, 201, 199],
        [184, 203, 202]],

       ...,

       [[ 14,  18,  24],
        [ 14,  19,  26],
        [  9,  18,  27],
        ...,
        [185, 201, 210],
        [178, 193, 201],
        [167, 181, 188]],

       [[ 14,  18,  25],
        [ 12,  18,  25],
        [  4,  14,  24],
        ...,
        [188, 203, 210],
        [177, 191, 196],
        [164, 178, 180]],

       [[ 11,  14,  25],
        [ 19,  24,  34],
        [ 14,  21,  33],
        ...,
        [188, 201, 206],
        [183, 194, 197],
        [178, 188, 190]]

In [6]:
def extract_face_and_save(image_path, output_path):
    # Load the image
    img = cv2.imread(image_path)
    
    # Convert to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
  
    # Detect the faces
    faces = face_cascade.detectMultiScale(gray, 1.3, 4)
    
    # Draw rectangle around the faces and save the image
    for (x, y, w, h) in faces:
        cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)  # Draw a green rectangle around the face
        
    # Save the image with rectangles
    cv2.imwrite(output_path, img)
    
    return img

In [7]:
input_image_path = './test_img.png'
output_image_path = 'test_img_output_face_bound.png'
output_image = extract_face_and_save(input_image_path, output_image_path)


In [8]:
# START CAPTURING VIDEOS

def extract_multiple_videos(intput_filenames,pathOut,img_class,frame_rate, resize):
    """Extract video files into sequence of images.
       Intput_filenames is a list for video file names"""

    i = 0  # Counter of first video

    # Iterate file names:
    for intput_filename in intput_filenames:
        img_name = 'client'
        pattern = intput_filename.split('/')
        for p in pattern:
            if 'client' in p:
                f_name = p.split('_')
                for name in f_name:
                    if name.find('client') != -1:
                        img_name = name
        
        # print('the image will be named as:', img_name)
        
        
        sec_count = 0
        cap = cv2.VideoCapture(intput_filename)
        cap.set(cv2.CAP_PROP_POS_MSEC,(sec_count*frame_rate))

        # Keep iterating break
        while True:
            ret, frame = cap.read()  # Read frame from first video

            if ret:
                try:
                    cropped_faces = extract_face(frame)
                    cropped_faces = cv2.resize(cropped_faces,(resize,resize))
                    write_path = pathOut + f'{img_name}_{i}_{img_class}.png'
                    cv2.imwrite(write_path, cropped_faces)  # Write frame to JPEG file (1.jpg, 2.jpg, ...)
                    i += 1 # Advance file counter
                    sec_count += 1
                except:
                    pass
            else:
                # Break the interal loop when res status is False.
                break

            # cv2.waitKey(100) #Wait 100msec (for debugging)

        cap.release() #Release must be inside the outer loop
    return i

In [9]:
def start(in_dir , out_dir,img_class,frame_rate=500,resize=256):
    # traverse whole directory
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    dir_list = []
    for root, dirs, files in os.walk(f'{in_dir}'):
        # select file name
        for file in files:
            # check the extension of files
            if file.endswith('.mov'):
                # print whole path of files
                path = os.path.join(root, file)
                # print(path)
                dir_list.append(path)
                # extractImages(path,'./data/train/real',frame_rate=15000)
    count = extract_multiple_videos(dir_list,f'{out_dir}',img_class,frame_rate=frame_rate, resize=resize)
    print(f'TOTAL FRAMES/IMAGES FORMED: {count}')

In [10]:
# START CAPTURING VIDEOS

def extract_attack_multiple_videos(intput_filenames,pathOut,img_class,frame_rate, resize):
    """Extract video files into sequence of images.
       Intput_filenames is a list for video file names"""

    i = 0  # Counter of first video

    # Iterate file names:
    for intput_filename in intput_filenames:
        print("Processing video:", intput_filename)
        img_name = 'client'
        pattern = intput_filename.split('/')
        for p in pattern:
            if 'client' in p:
                f_name = p.split('_')
                for name in f_name:
                    if name.find('client') != -1:
                        img_name = name
        
        # print('the image will be named as:', img_name)
        
        
        sec_count = 0
        cap = cv2.VideoCapture(intput_filename)
        cap.set(cv2.CAP_PROP_POS_MSEC,(sec_count*frame_rate))

        # Keep iterating break
        while True:
            ret, frame = cap.read()  # Read frame from first video
            if ret:
                try:
                    cropped_faces = extract_face(frame)
                    cropped_faces = cv2.resize(cropped_faces,(resize,resize))
                    write_path = pathOut + f'{img_name}_{i}_{img_class}.png'
                    cv2.imwrite(write_path, cropped_faces)  # Write frame to JPEG file (1.jpg, 2.jpg, ...)
                    i += 1 # Advance file counter
                    sec_count += 1
                except:
                    pass
            else:
                # Break the interal loop when res status is False.
                break

            # cv2.waitKey(100) #Wait 100msec (for debugging)

        cap.release() #Release must be inside the outer loop
    return i

In [11]:
def start_attack(in_dir , out_dir,img_class,frame_rate=500,resize=256):
    # traverse whole directory
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    dir_list = []
    for root, dirs, files in os.walk(f'{in_dir}'):
        # select file name
        for file in files:
            # check the extension of files
            if file.endswith('.mov'):
                # print whole path of files
                path = os.path.join(root, file)
                # print(path)
                dir_list.append(path)
                # extractImages(path,'./data/train/real',frame_rate=15000)
    count = extract_attack_multiple_videos(dir_list,f'{out_dir}',img_class,frame_rate=frame_rate, resize=resize)
    print(f'TOTAL FRAMES/IMAGES FORMED: {count}')

### TRANSFORMING TRAINING VIDEOS TO TRAINING IMAGES

In [17]:
train_real_vids_path = 'D:/BTP/dataset/Replay-Attack/train/real/'
train_spoof_fixed_vids_path = 'D:/BTP/dataset/Reply-Attack/train/attack/fixed/'
train_spoof_hand_vids_path = 'D:/BTP/dataset/Reply-Attack/train/attack/hand/'

In [18]:
train_real_imgs_path = './data/Replay-Attack-processed/train/real/'
train_spoof_fixed_imgs_path = './data/Replay-Attack-processed/train/spoof/fixed/'
train_spoof_hand_imgs_path = './data/Replay-Attack-processed/train/spoof/hand/'

In [19]:
train_dir_real = {"in_dir": train_real_vids_path,"out_dir":train_real_imgs_path}
train_dir_attack_fixed = {"in_dir":train_spoof_fixed_vids_path, 'out_dir':train_spoof_fixed_imgs_path}
train_dir_attack_hand = {"in_dir":train_spoof_hand_vids_path, 'out_dir':train_spoof_hand_imgs_path}

In [15]:
start(train_dir_real['in_dir'],train_dir_real['out_dir'],img_class="real",frame_rate=500,resize=256) 

TOTAL FRAMES/IMAGES FORMED: 22490


In [15]:
start(train_dir_attack_fixed['in_dir'],train_dir_attack_fixed['out_dir'],img_class="attack_fixed",frame_rate=500,resize=256) 

TOTAL FRAMES/IMAGES FORMED: 35436


In [15]:
start(train_dir_attack_hand['in_dir'],train_dir_attack_hand['out_dir'],img_class="attack_hand",frame_rate=500,resize=256) 

TOTAL FRAMES/IMAGES FORMED: 33707


### TRANSFORMING TEST VIDEOS TO TESTING IMAGES 

In [20]:
test_real_vids_path = 'D:/BTP/dataset/Replay-Attack/test/real/'
test_spoof_fixed_vids_path = 'D:/BTP/dataset/Replay-Attack/test/attack/fixed/'
test_spoof_hand_vids_path = 'D:/BTP/dataset/Replay-Attack/test/attack/hand/'

In [21]:
test_real_imgs_path = './data/Replay-Attack-processed/test/real/'
test_spoof_fixed_imgs_path = './data/Replay-Attack-processed/test/spoof/fixed/'
test_spoof_hand_imgs_path = './data/Replay-Attack-processed/test/spoof/hand/'

In [22]:
test_dir_real = {"in_dir": test_real_vids_path,"out_dir":test_real_imgs_path}
test_dir_attack_fixed = {"in_dir":test_spoof_fixed_vids_path, 'out_dir':test_spoof_fixed_imgs_path}
test_dir_attack_hand = {"in_dir":test_spoof_hand_vids_path, 'out_dir':test_spoof_hand_imgs_path}

In [15]:
start(test_dir_real['in_dir'],test_dir_real['out_dir'],img_class="real",frame_rate=2000,resize=256) 

TOTAL FRAMES/IMAGES FORMED: 29229


In [15]:
start(test_dir_attack_fixed['in_dir'],test_dir_attack_fixed['out_dir'],img_class="attack_fixed",frame_rate=500,resize=256) 

TOTAL FRAMES/IMAGES FORMED: 46700


In [15]:
start(test_dir_attack_hand['in_dir'],test_dir_attack_hand['out_dir'],img_class="attack_hand",frame_rate=500,resize=256) 

TOTAL FRAMES/IMAGES FORMED: 44487


In [4]:
import os
import random
import csv

# Define the paths to your train and test folders
train_folder = './data/Replay-Attack-processed/train/'
test_folder = './data/Replay-Attack-processed/test/'

# Function to label and shuffle data in a folder
def process_folder(folder_path, label):
    image_paths = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.png'):
                image_path = os.path.join(root, file)
                image_paths.append((image_path, label))
    
    # Shuffle the image paths
    random.shuffle(image_paths)
    
    return image_paths

# Process the train and test folders
train_real_paths = process_folder(os.path.join(train_folder, 'real'), 1)
test_real_paths = process_folder(os.path.join(test_folder, 'real'), 1)

# Include images from the "hand" and "fixed" subfolders within the "spoof" folder
train_fake_paths = process_folder(os.path.join(train_folder, 'spoof/hand'), 0)
train_fake_paths += process_folder(os.path.join(train_folder, 'spoof/fixed'), 0)

test_fake_paths = process_folder(os.path.join(test_folder, 'spoof/hand'), 0)
test_fake_paths += process_folder(os.path.join(test_folder, 'spoof/fixed'), 0)

# Combine real and fake paths
train_data = train_real_paths + train_fake_paths
test_data = test_real_paths + test_fake_paths

# Shuffle the combined data
random.shuffle(train_data)
random.shuffle(test_data)

# Define CSV filenames
train_csv_filename = 'train_data.csv'
test_csv_filename = 'test_data.csv'

# Write data to CSV files
def write_to_csv(filename, data):
    with open(filename, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(['name', 'label'])
        csv_writer.writerows(data)

write_to_csv(train_csv_filename, train_data)
write_to_csv(test_csv_filename, test_data)

print(f"Train data saved to {train_csv_filename}")
print(f"Test data saved to {test_csv_filename}")


Train data saved to train_data.csv
Test data saved to test_data.csv


### CHANGE DIRECTORY STRUCTURE BY PLACING IMAGES PERSON WISE

In [32]:
person_wise_dataset_train_path = './data/Reply-Attack-processed/train/images/' 

In [15]:
def get_dir_list(path):
    dir_list = []
    for root, dirs, files in os.walk(f'{path}'):
        # select file name
        for file in files:
            # check the extension of files
            if file.endswith('.png'):
                # print whole path of files
                path = os.path.join(root, file)
                # print(path)
                dir_list.append(path)
    return dir_list

In [23]:
img_path_list_train_real = get_dir_list(train_real_imgs_path)
img_path_list_train_spoof_fixed = get_dir_list(train_spoof_fixed_imgs_path)
img_path_list_train_spoof_hand = get_dir_list(train_spoof_hand_imgs_path)
print(len(img_path_list_train_real)) # ./replay-images/train/real/
print(len(img_path_list_train_spoof_fixed)) # ./replay-images/train/spoof/fixed
print(len(img_path_list_train_spoof_hand)) # ./replay-images/train/spoof/hand

22490
35438
33707


In [24]:
img_path_list_train_real[0]

'./data/Reply-Attack-processed/train/real/client001_0_real.png'

In [25]:
def get_client_objects(path_list):
    clients = {}
    for file in path_list:
        path_parts = file.split('/')
        for part in path_parts:
            if 'client' in part:
                file_name_split = part.split('_')
                client_key = file_name_split[0]
                if client_key in clients.keys():
                    clients[client_key].append(file)
                else:
                    clients[client_key] = [file]
    return clients

In [26]:
real_client_objects =  get_client_objects(img_path_list_train_real)
spoof_fixed_client_objects = get_client_objects(img_path_list_train_spoof_fixed)
spoof_hand_client_objects = get_client_objects(img_path_list_train_spoof_hand)

In [27]:
print(sorted(real_client_objects.keys()))
print(sorted(spoof_fixed_client_objects.keys()))
print(sorted(spoof_hand_client_objects.keys()))

['client001', 'client002', 'client004', 'client006', 'client007', 'client008', 'client012', 'client016', 'client018', 'client025', 'client027', 'client103', 'client105', 'client108', 'client110']
['client001', 'client002', 'client004', 'client006', 'client007', 'client008', 'client012', 'client016', 'client018', 'client025', 'client027', 'client103', 'client105', 'client108', 'client110']
['client001', 'client002', 'client004', 'client006', 'client007', 'client008', 'client012', 'client016', 'client018', 'client025', 'client027', 'client103', 'client105', 'client108', 'client110']


In [33]:
person_wise_dir_list = get_dir_list(person_wise_dataset_train_path)

In [34]:
person_wise_dir_list

[]

In [None]:
ls

 DataGeneration.ipynb                              [0m[01;34mreplay-dummy-person-wise[0m/
 [01;34mdataset[0m/                                          [01;34mreplay-images[0m/
 haarcascade_frontalface_default.xml               [01;35mtest_img.png[0m
'PIxel Wise Supervision - Siamese-Network.ipynb'


### CREATE list.txt AND APPEND FORMATED PATHS OF IMAGES TO IT

In [29]:
with open('./data/Reply-Attack-processed/train/list.txt', 'w') as f:
    for path in person_wise_dir_list:
        if '/data/Reply-Attack-processed/train/images/' in path:
            s = path.split('/data/Reply-Attack-processed/train/images/')
            p = s[1]
            f.write("%s\n" % p)

In [30]:
for client_key in sorted(real_client_objects.keys()):
    if os.path.isdir(person_wise_dataset_train_path+client_key):
        shutil.rmtree(person_wise_dataset_train_path+client_key)
    single_client_path = person_wise_dataset_train_path+client_key
    os.mkdir(single_client_path)
    
    single_client_path_real = single_client_path + '/real'
    single_client_path_spoof = single_client_path + '/spoof'
    os.mkdir(single_client_path_real)
    os.mkdir(single_client_path_spoof)
    for raw_client_image_path in real_client_objects[client_key]:
        shutil.copy2(raw_client_image_path,single_client_path_real)
    for raw_client_image_path in spoof_fixed_client_objects[client_key]:
        shutil.copy2(raw_client_image_path,single_client_path_spoof)
    for raw_client_image_path in spoof_hand_client_objects[client_key]:
        shutil.copy2(raw_client_image_path,single_client_path_spoof)

FileNotFoundError: [WinError 3] The system cannot find the path specified: './data/Reply-Attack-processed/train/images/client001'