# Import Libraries

In [1]:
import os
import pandas as pd
import numpy as np
import zipfile
import cv2
import pickle

from glob import glob
from tqdm.auto import tqdm

# Data Load

### Data Target Folder : subject_id/synth
#### csv : gaze_x, gaze_y, gaze_z, rotation_x, rotation_y, rotation_z, translation_x, translation_y, translation_z

In [2]:
# 임시 데이터 제외하여 파일 또는 폴더 경로의 목록 반환 함수
def get_filepath_list(path):
    tmp_list = os.listdir(path)
    real_list = []
    for i in range(len(tmp_list)):
        ut = tmp_list[i]
        if ut[0] != ".": # 임시파일 예제 : .00000.bmp
            real_list.append(os.path.join(path,ut))
    return real_list

In [3]:
# Dataset Root Path
ut_path = "./UTMultiview"

# Subject Group Path List
ut_list = get_filepath_list(ut_path)

# UT Multi View Data 폴더에서 각 Subject의 그룹 별 폴도 목록 출력
print(ut_list)

['/Volumes/T7/DKU/GazeEstimation/dataset/UTMultiview/s00-09', '/Volumes/T7/DKU/GazeEstimation/dataset/UTMultiview/s10-19', '/Volumes/T7/DKU/GazeEstimation/dataset/UTMultiview/s20-29', '/Volumes/T7/DKU/GazeEstimation/dataset/UTMultiview/s30-39', '/Volumes/T7/DKU/GazeEstimation/dataset/UTMultiview/s40-49']


# Make Dataframe

In [4]:
def convert_pose(vector: np.ndarray) -> np.ndarray:
    rot = cv2.Rodrigues(np.array(vector).astype(np.float32))[0]
    vec = rot[:, 2]
    pitch = np.arcsin(vec[1])
    yaw = np.arctan2(vec[0], vec[2])
    return np.array([pitch, yaw]).astype(np.float32)


def convert_gaze(vector: np.ndarray) -> np.ndarray:
    x, y, z = vector
    pitch = np.arcsin(-y)
    yaw = np.arctan2(-x, -z)
    return np.array([pitch, yaw]).astype(np.float32)

In [5]:
def unzip_file(path, zip_list):
    # zip 파일 압축 해제 및 경로 리스트에 저장
    unzip_list = []
    for z_idx in range(len(zip_list)):
        zip_file = zip_list[z_idx]
        if zip_file[0] != ".":
            zip_nm = zip_file.replace("\\","/").split("/")[-1].split(".")[0]
            unzip_path = os.path.join(path, zip_nm)
            if os.path.isdir(unzip_path) != True:
                zip = zipfile.ZipFile(zip_file)
                zip.extractall(path=unzip_path)
            
            unzip_list.append(unzip_path)
    return unzip_list

## Synth

In [6]:
# 데이터프레임으로 구성하고자 하는 데이터의 리스트
datas = []

# Subject Group for문
for g_idx in tqdm(range(len(ut_list))):
    group_path = ut_list[g_idx] # Subject Group Path

    # Subject ID별 데이터 관리
    sj_list = get_filepath_list(group_path)
    for s_idx in range(len(sj_list)):
        subject_path = sj_list[s_idx] # Subject ID별 경로
        synth_path = os.path.join(subject_path, "synth") # 사용할 데이터 폴더
        # test_path = os.path.join(subject_path, "test") # 사용할 데이터 폴더
        
        csv_list = sorted(glob(os.path.join(synth_path, "*.csv"))) # csv 파일 리스트
        zip_list = sorted(glob(os.path.join(synth_path, "*.zip"))) # zip 파일 리스트
        
        # zip 파일 압축 해제 및 경로 리스트에 저장
        unzip_list = unzip_file(synth_path, zip_list)

        # 압축이 해제 되어진 폴더 경로
        for u_idx in range(len(unzip_list)):
            unzip = unzip_list[u_idx].replace("\\","/")
            
            group_id = unzip.split("/")[-4]
            subject_id = unzip.split("/")[-3] # Subject ID (s00, s01, ..., n)
            seq_id = unzip.split("/")[-1].split("_")[0] # Sequence Number(0000, 0001, ... , n)
            loc_id = unzip.split("/")[-1].split("_")[1] # Eye Location(Left or Right)
            
            img_list = get_filepath_list(unzip) # Image File List
            pose_file = synth_path = os.path.join(ut_path, group_id, subject_id, "raw", "img"+str(seq_id), "headpose.txt")  # 사용할 데이터 폴더
            with open(pose_file, 'r') as file:
                data = file.read()
            # 데이터에서 HeadPose 부분 찾기
            start = data.find("HeadPose")
            end = data.find("Features")

            # HeadPose 부분 추출
            headpose_data = data[start:end]
            headpose_lines = headpose_data.split('\n')
            
            translation_line = headpose_lines[1].strip('[] ')  # 대괄호 및 공백 제거
            translation = [float(x) for x in translation_line.split()]
            translation_vector = np.array(translation)
            
            rotation_data = []
            for l_idx in range(2,5):
                rotation_line = headpose_lines[l_idx].strip('[] ')
                rotation = [float(x) for x in rotation_line.split()]
                rotation_data.append(rotation)
            rotation_matrix = np.array(rotation_data)
            
            # 트랜스레이션 벡터를 3D 위치 벡터로 변환
            head_position = -translation_vector  # 위치 벡터는 -트랜스레이션 벡터

            # 회전 행렬을 사용하여 방향 벡터 계산
            direction_vector = np.array([0, 0, 1])  # 초기 방향 벡터 (예를 들어, 머리가 z-축 방향을 향할 때)

            # 회전 행렬을 방향 벡터에 적용
            direction_vector = np.array(direction_vector, dtype=float)
            direction_vector_rotated = np.dot(rotation_matrix, direction_vector)

            # 위치 벡터와 방향 벡터를 더하여 유닛 벡터 계산
            head_direction = direction_vector_rotated + head_position

            # 유닛 벡터로 정규화
            head_unit_vector = head_direction / np.linalg.norm(head_direction)

            pose_x, pose_y, pose_z = map(float, head_unit_vector)
            pose_data = [pose_x,pose_y,pose_z]
            if loc_id == "left":
                pose = convert_pose(pose_data)
            else:
                pose = convert_pose(pose_data) * np.array([1, -1])
                    
            # Gaze CSV Data
            columns = ["gaze_x", "gaze_y", "gaze_z"]
            gaze_data = pd.read_csv(csv_list[u_idx], header=None).iloc[:,:3] # 0~8의 컬럼만 필요하기 때문에 마지막의 None 값의 컬럼은 제외
            gaze_data.columns = columns
            try:
                # gaze data의 행에 해당하는 이미지 호출하여 데이터프레임 구성
                for g_idx in range(len(gaze_data)):
                    img_path = img_list[g_idx]
                    image = cv2.imread(img_path)
                    image_array = np.array(image)
                    image_data_gray = cv2.cvtColor(image_array, cv2.COLOR_RGB2GRAY)
                    
                    gaze_data = [pose_x,pose_y,pose_z]
                    
                    if loc_id == "left":
                        gaze = convert_gaze(gaze_data)
                        image = image_data_gray
                    else:
                        image = image_data_gray[:, ::-1]
                        gaze = convert_gaze(gaze_data) * np.array([1, -1])
                        
                    data_list = [subject_id, seq_id, loc_id, image.ravel(), pose[0], pose[1], gaze[0], gaze[1]]
                    datas.append(data_list)
            except:
                print(f"ZIP ERROR : {subject_id} / {seq_id} / {loc_id}\n")

# 리스트에 담아두었던 정보들을 DataFrame으로 생성
data_df = pd.DataFrame(columns=["participant_id","day","eye_location","image","head_pitch","head_yaw","gaze_pitch","gaze_yaw"], data=datas)
data_df.head(3)
data_df = data_df.sort_values(by=['participant_id', 'day']).reset_index(drop=True)

  0%|          | 0/5 [00:00<?, ?it/s]

In [8]:
save_path = "./utm_dataset"
if os.path.isdir(save_path) == False:
    os.makedirs(save_path)
save_file = os.path.join(save_path, "utm_synth_dataset.parquet")
data_df.to_parquet(save_file, engine='pyarrow', index=False)

In [9]:
# 참가자별 저장
participant_list = sorted(list(set(list(data_df['participant_id'].values))))
os.makedirs("./utm_dataset/subject_synth")
for par_id in tqdm(participant_list):
    par_df = data_df[data_df['participant_id'] == par_id]
    save_path = os.path.join("./utm_dataset/subject_synth", f'utm_synth_dataset_{par_id}.parquet')
    par_df.to_parquet(save_path, engine='pyarrow', index=False)

  0%|          | 0/50 [00:00<?, ?it/s]

# Sampled

In [10]:
# 각 참가자의 위치와 이미지의 순서 별로 1500장씩 추출
desired_sequence_count = 1500
result_df = pd.DataFrame()

for participant_id in sorted(data_df['participant_id'].unique()):
    for eye_location in data_df['eye_location'].unique():
        subset = data_df[(data_df['participant_id'] == participant_id) & (data_df['eye_location'] == eye_location)]
        
        if len(subset) >= desired_sequence_count:
            sampled_subset = subset.sample(desired_sequence_count)
        else:
            sampled_subset = subset.sample(desired_sequence_count, replace=True)
        
        result_df = pd.concat([result_df, sampled_subset])
result_df.reset_index(drop=True,inplace=True)

In [11]:
save_path = "./utm_dataset"
if os.path.isdir(save_path) == False:
    os.makedirs(save_path)
save_file = os.path.join(save_path, "sampled_utm_synth_dataset.parquet")
result_df.to_parquet(save_file, engine='pyarrow', index=False)

In [12]:
# 참가자별 저장
# 참가자별 저장
participant_list = sorted(list(set(list(result_df['participant_id'].values))))
os.makedirs("./utm_dataset/sampled_subject_synth")
for par_id in tqdm(participant_list):
    par_df = result_df[result_df['participant_id'] == par_id]
    save_path = os.path.join("./utm_dataset/sampled_subject_synth", f'sampled_utm_synth_dataset_{par_id}.parquet')
    par_df.to_parquet(save_path, engine='pyarrow', index=False)

  0%|          | 0/50 [00:00<?, ?it/s]