In [1]:
## general imports
import os
import cv2

## numpy
import numpy as np

## pandas
import pandas as pd

## scipy
from scipy.io import loadmat

## matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def normalize_data(data):
    if len(np.shape(data)) == 4:
        data = np.transpose(np.stack(data), (0, 2, 1, 3))
        original_shape = np.shape(data)
        data = data.reshape(np.shape(data)[0], np.shape(data)[1], np.shape(data)[2] * np.shape(data)[3])

        min_val = np.min(np.min(data, 2), 0)
        max_val = np.max(np.max(data, 2), 0)
        mean_val = np.mean(np.mean(data,2),0)
        std_val = np.std(np.std(data,2),0)

        for i in range(0, np.shape(data)[0]):
            for j in range(0, np.shape(data)[1]):
                data[i, j] = (data[i, j] - mean_val[j])/std_val[j]

        data = data.reshape(original_shape)
        data = np.transpose(data, (0, 2, 1, 3))
        #

    else:
        data = np.stack(data)
        temp = data.flatten()
        temp = temp[temp>0]
        min_val = np.min(temp, 0)
        max_val = np.max(temp, 0)
        mean_val = np.mean(temp, 0)
        std_val = np.std(temp, 0)

        if len(np.shape(data)) == 2:
            for i in range(0, np.shape(data)[0]):
                if (max_val - min_val) == 0:
                    print("!!!!!!!!!!!!!!!!!!!!!!!")
                data[i] = ((data[i] - mean_val) / std_val)
            data = np.stack(data)
            data = data.reshape(np.shape(data)[0], 10, 30, 1)
        elif len(np.shape(data)) == 3:
            for i in range(0, np.shape(data)[0]):
                for j in range(0, np.shape(data)[1]):
                    data[i, j] = (data[i, j] - mean_val) / std_val
            data = np.stack(data)
            data = data.reshape(np.shape(data)[0], 10, 30, int(np.shape(data)[2]))
    return data

In [3]:
def custom_pad_image(image, target_size):
    """
    Pads an image to a specified size.

    Args:
        image: The image to be padded (numpy array).
        target_size: The desired size of the padded image (tuple of integers (height, width)).

    Returns:
        The padded image (numpy array).
    """

    # Get the original image shape
    h, w = image.shape[:2]
    old_image_height, old_image_width, channels = image.shape

    # Calculate the amount of padding needed for each dimension
    new_image_width = target_size[0]
    new_image_height = target_size[1]
    color = (0,0,0)
    result = np.full((new_image_height,new_image_width, channels), color, dtype=np.uint8)

    # compute center offset
    x_center = (new_image_width - old_image_width) // 2
    y_center = (new_image_height - old_image_height) // 2

    # copy img image into center of result image
    result[y_center:y_center+old_image_height,
        x_center:x_center+old_image_width] = image

    return result

In [4]:
def split_given_size(a, size):
    ret = np.split(a, np.arange(size,len(a),size))
    if ret[-1].shape[0] != size:
        ret = ret[:-1]
    return ret

SITUATION AWARENESS

In [5]:
df = pd.read_csv("./situation_awareness_labels.csv")

In [None]:
meta_psd = []
meta_pupil_diam = []
meta_gaze_speed = []
meta_fixation = []
meta_scores = []
meta_eye_landmarks = []
meta_face_landmarks = []
chunk_lengths = {}
for user in sorted(os.listdir("../../../user_data/")):
    for task_name in sorted(os.listdir("../../../user_data/" + user + "/Task 1/")):
        if task_name == ".DS_Store" or task_name == "EEG" or task_name == "Eye Tracker" or task_name == "NASA" or task_name == "Video":
            continue
        to_save = task_name if task_name[-1] != "y" else task_name[:-10]

        # path to save dataset
        save_path = "../../../multimodal_dataset_SA/" + user + "_" + to_save

        # LOADING THE DATA
        file = f"./{user}/{task_name}.json"
        c = df["name"] == file
        filtered_df = df[c]
        if len(filtered_df) == 0:
            print(f"User {user} on task {task_name} is not available in the csv file")
            continue
        else:
            print(f"Parsing data files of user {user}, task {task_name}")   

            # PSD
            psd = []
            if task_name[-1] != 'y':
                psd = loadmat(f"../../../subject_data_for_chuhao/{user}/EEG/min_processed/backup/Task 1 Without Secondary Task/{to_save}.mat")
            else:
                psd = loadmat(f"../../../subject_data_for_chuhao/{user}/EEG/min_processed/backup/Task 1 With Secondary Task/{to_save}.mat")       
            meta_psd.append(psd['all_bands'][:,0:31,0:40])

            # eye tracker
            meta_fixation.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Eye Tracker/Task 1/fixation/{task_name}.txt"))
            meta_pupil_diam.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Eye Tracker/Task 1/pupil_diam/{task_name}.txt"))
            meta_gaze_speed.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Eye Tracker/Task 1/gaze_speed/{task_name}.txt"))

            # facial landmarks
            meta_eye_landmarks.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Video/Task 1/Eye Landmarks/{task_name}.txt",delimiter=','))
            meta_face_landmarks.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Video/Task 1/Face Landmarks/{task_name}.txt",delimiter=','))
            
            #label
            label = filtered_df['labels'].iloc[0]

            # video
            task_video_data = []
            for frame in sorted(os.listdir("../../../user_data/" + user + "/Task 1/" + task_name + "/Images/")):
                image_path = "../../../user_data/" + user + "/Task 1/" + task_name + "/Images/" + frame
                image = cv2.imread(image_path)
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                # print(image_path)
                # image = custom_pad_image(image, target_size=(224, 224))         # to pad images from shape 112 x 112 to 224 x 224
                numpy_data = np.asarray(image).astype(np.float32)
                numpy_data = numpy_data.transpose(2, 1, 0)
                numpy_data = numpy_data / 255
                task_video_data.append(numpy_data)
            task_video_data = np.array(task_video_data)

            chunks = split_given_size(a=task_video_data, size=30)
            chunk_lengths[f"{user}/{to_save}"] = len(chunks)

# PREPROCESSING AND NORMALIZATION     
eeg_normalized = normalize_data(meta_psd)       
eye_normalized = np.concatenate((normalize_data(meta_pupil_diam), normalize_data(meta_gaze_speed)), axis=3)
face_normalized = np.concatenate((normalize_data(meta_eye_landmarks), normalize_data(meta_face_landmarks)), axis=3)


In [None]:
print(eeg_normalized.shape)
print(eye_normalized.shape)
print(face_normalized.shape)

In [None]:
i = -1
for user in sorted(os.listdir("../../../user_data/")):
    for task_name in sorted(os.listdir("../../../user_data/" + user + "/Task 1/")):
        if task_name == ".DS_Store" or task_name == "EEG" or task_name == "Eye Tracker" or task_name == "NASA" or task_name == "Video":
            continue
        to_save = task_name if task_name[-1] != "y" else task_name[:-10]

        # path to save dataset
        save_path = "../../../multimodal_dataset_SA_112/" + user + "_" + to_save

        # LOADING THE DATA
        file = f"./{user}/{task_name}.json"
        c = df["name"] == file
        filtered_df = df[c]
        if len(filtered_df) == 0:
            print(f"User {user} on task {task_name} is not available in the csv file")
            continue
        else:
            print(f"Creating dataset for user {user}, task {task_name}") 
            i += 1

            eeg_data = eeg_normalized[i]
            eye_data = eye_normalized[i]
            face_data = face_normalized[i]
            
            #label
            l = filtered_df['labels'].iloc[0]

            eeg_chunks = split_given_size(a=eeg_data, size=1)
            eye_chunks = split_given_size(a=eye_data, size=1)
            face_chunks = split_given_size(a=face_data, size=1)

            for chunk in range(chunk_lengths[f"{user}/{to_save}"]):
                video_path = "../../../video_dataset_SA_112_30frames/" + user + "_" + to_save
                video_data = np.load(f"{video_path}_chunk_{chunk}.npz")
                video_chunk = video_data["video"]
                label = video_data["label"]
                eeg_chunk = eeg_chunks[chunk].squeeze()
                eye_chunk = eye_chunks[chunk].squeeze()
                face_chunk = face_chunks[chunk].squeeze()
                np.savez(save_path + "_chunk_" + str(chunk), video=video_chunk, eeg=eeg_chunk, eye=eye_chunk, face=face_chunk, label=label)

COGNITIVE LOAD

In [5]:
df = pd.read_csv("./mental_demands_labels.csv")

In [None]:
meta_psd = []
meta_pupil_diam = []
meta_gaze_speed = []
meta_fixation = []
meta_scores = []
meta_eye_landmarks = []
meta_face_landmarks = []
chunk_lengths = {}
for user in sorted(os.listdir("../../../user_data/")):
    for task_name in sorted(os.listdir("../../../user_data/" + user + "/Task 1/")):
        if task_name == ".DS_Store" or task_name == "EEG" or task_name == "Eye Tracker" or task_name == "NASA" or task_name == "Video":
            continue
        to_save = task_name if task_name[-1] != "y" else task_name[:-10]

        # path to save dataset
        save_path = "../../../multimodal_dataset_CL_112/" + user + "_" + to_save

        # LOADING THE DATA
        c1 = df["User ID"] == int(user)
        c2 = df["Task ID"] == task_name
        filtered_df = df[c1 & c2]
        if len(filtered_df) == 0:
            print(f"User {user} on task {task_name} is not available in the csv file")
            continue
        else:
            print(f"Parsing data files of user {user}, task {task_name}")   

            # PSD
            psd = []
            if task_name[-1] != 'y':
                psd = loadmat(f"../../../subject_data_for_chuhao/{user}/EEG/min_processed/backup/Task 1 Without Secondary Task/{to_save}.mat")
            else:
                psd = loadmat(f"../../../subject_data_for_chuhao/{user}/EEG/min_processed/backup/Task 1 With Secondary Task/{to_save}.mat")       
            meta_psd.append(psd['all_bands'][:,0:31,0:40])

            # eye tracker
            meta_fixation.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Eye Tracker/Task 1/fixation/{task_name}.txt"))
            meta_pupil_diam.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Eye Tracker/Task 1/pupil_diam/{task_name}.txt"))
            meta_gaze_speed.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Eye Tracker/Task 1/gaze_speed/{task_name}.txt"))

            # facial landmarks
            meta_eye_landmarks.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Video/Task 1/Eye Landmarks/{task_name}.txt",delimiter=','))
            meta_face_landmarks.append(np.loadtxt(f"../../../subject_data_for_chuhao/{user}/Video/Task 1/Face Landmarks/{task_name}.txt",delimiter=','))
            
            #label
            label = filtered_df['labels'].iloc[0]

            # video
            task_video_data = []
            for frame in sorted(os.listdir("../../../user_data/" + user + "/Task 1/" + task_name + "/Images/")):
                image_path = "../../../user_data/" + user + "/Task 1/" + task_name + "/Images/" + frame
                image = cv2.imread(image_path)
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                # print(image_path)
                # image = custom_pad_image(image, target_size=(224, 224))
                numpy_data = np.asarray(image).astype(np.float32)
                numpy_data = numpy_data.transpose(2, 1, 0)
                numpy_data = numpy_data / 255
                task_video_data.append(numpy_data)
            task_video_data = np.array(task_video_data)

            chunks = split_given_size(a=task_video_data, size=30)
            chunk_lengths[f"{user}/{to_save}"] = len(chunks)

# PREPROCESSING AND NORMALIZATION     
eeg_normalized = normalize_data(meta_psd)       
eye_normalized = np.concatenate((normalize_data(meta_pupil_diam), normalize_data(meta_gaze_speed)), axis=3)
face_normalized = np.concatenate((normalize_data(meta_eye_landmarks), normalize_data(meta_face_landmarks)), axis=3)


In [None]:
print(eeg_normalized.shape)
print(eye_normalized.shape)
print(face_normalized.shape)

In [None]:
i = -1
for user in sorted(os.listdir("../../../user_data/")):
    for task_name in sorted(os.listdir("../../../user_data/" + user + "/Task 1/")):
        if task_name == ".DS_Store" or task_name == "EEG" or task_name == "Eye Tracker" or task_name == "NASA" or task_name == "Video":
            continue
        to_save = task_name if task_name[-1] != "y" else task_name[:-10]

        # path to save dataset
        save_path = "../../../multimodal_dataset_CL_112/" + user + "_" + to_save

        # LOADING THE DATA
        file = f"./{user}/{task_name}.json"
        c = df["name"] == file
        filtered_df = df[c]
        if len(filtered_df) == 0:
            print(f"User {user} on task {task_name} is not available in the csv file")
            continue
        else:
            print(f"Creating data file for user {user}, task {task_name}") 
            i += 1

            eeg_data = eeg_normalized[i]
            eye_data = eye_normalized[i]
            face_data = face_normalized[i]
            
            #label
            l = filtered_df['labels'].iloc[0]

            eeg_chunks = split_given_size(a=eeg_data, size=1)
            eye_chunks = split_given_size(a=eye_data, size=1)
            face_chunks = split_given_size(a=face_data, size=1)

            for chunk in range(chunk_lengths[f"{user}/{to_save}"]):
                video_path = "../../../video_dataset_SA_112_30frames/" + user + "_" + to_save
                video_data = np.load(f"{video_path}_chunk_{chunk}.npz")
                video_chunk = video_data["video"]
                label = video_data["label"]
                eeg_chunk = eeg_chunks[chunk].squeeze()
                eye_chunk = eye_chunks[chunk].squeeze()
                face_chunk = face_chunks[chunk].squeeze()
                np.savez(save_path + "_chunk_" + str(chunk), video=video_chunk, eeg=eeg_chunk, eye=eye_chunk, face=face_chunk, label=label)