In [None]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import pickle 
import cv2
import mediapipe as mp
import numpy as np
from time import time
from datetime import datetime, timedelta

# MediaPipe solution modules used by the feature-extraction code in this notebook
mp_face_detection = mp.solutions.face_detection  # provides the FaceDetection context manager
mp_face_mesh = mp.solutions.face_mesh  # provides the FaceMesh landmark model
mp_drawing = mp.solutions.drawing_utils  # drawing helpers; not referenced in the cells shown here

# Timestamp 데이터 프레임 정리 

In [None]:
# Root folder of the video sensor data.
# NOTE(review): machine-specific absolute path — adjust before running on another machine.
Video = '/Users/iduli/Desktop/Ch2_25_Scientific_Data/Final/raw_Dataset/SENSORS/VIDEO'
raw_video_dir = os.path.join(Video, "raw_video")
# The per-session time CSVs sit directly in the root folder, so this is the root itself
video_dir = Video
output_dir = os.path.join(Video, "results")
# exist_ok=True replaces the check-then-create pattern: no race, and the cell is safely re-runnable
os.makedirs(output_dir, exist_ok=True)

# 1st session: folder searched for webcam-logger CSVs, and the event-offset table
folder_1st = os.path.join(raw_video_dir, "1st")
time_csv_1st = os.path.join(video_dir, "1st_time.csv")

# 2nd session: folder searched for webcam-logger CSVs, and the event-offset table
folder_2nd = os.path.join(raw_video_dir, "2nd")
time_csv_2nd = os.path.join(video_dir, "2nd_time.csv")

In [None]:
def get_baseline_time_for_condition(subject_num, condition, raw_audio_folder):
    """Return the first ``global_time`` value logged for a subject/condition.

    Looks for ``{subject_num}_1_{condition}_webcam-logger.csv`` inside
    ``raw_audio_folder`` and reads the ``global_time`` value of its first row,
    which the callers use as the baseline for event offsets.

    Args:
        subject_num: Participant identifier used as the filename prefix.
        condition: Condition label embedded in the filename.
        raw_audio_folder: Directory that holds the webcam-logger CSVs. Despite
            the name, the callers in this notebook pass the raw video session
            folders.

    Returns:
        The ``global_time`` value of the first data row, or None when the file
        is missing, is zero bytes long, or contains only a header.

    Raises:
        KeyError: If the CSV has data rows but no ``global_time`` column.
    """
    filename = f"{subject_num}_1_{condition}_webcam-logger.csv"
    file_path = os.path.join(raw_audio_folder, filename)

    if not os.path.exists(file_path):
        return None

    try:
        df = pd.read_csv(file_path)
    except pd.errors.EmptyDataError:
        # A zero-byte log carries no baseline; treat it like a missing file
        return None

    if df.empty:
        # Header-only log: there is no first row to read
        return None

    # The first row's global_time is the baseline
    return df["global_time"].iloc[0]

def parse_mmssms_to_timedelta(time_str):
    """Convert an ``mm;ss;xx`` offset string into total milliseconds.

    Despite the function name, the result is a plain integer number of
    milliseconds, not a ``timedelta``.

    Args:
        time_str: String with three ``;``-separated integer fields: minutes,
            seconds and a third field that is multiplied by 10 to obtain
            milliseconds (i.e. hundredths of a second).

    Returns:
        The offset in milliseconds, or None when ``time_str`` is null or
        blank.

    Raises:
        ValueError: If the string does not have exactly three integer fields.
    """
    if pd.isnull(time_str):
        return None
    if time_str.strip() == "":
        return None

    minute_field, second_field, hundredth_field = time_str.split(";")

    # The last field counts hundredths of a second, hence the factor of 10
    return (
        int(minute_field) * 60000
        + int(second_field) * 1000
        + int(hundredth_field) * 10
    )

In [82]:
# 2nd session: turn the c1/c2/c3 offsets into absolute timestamps
# (webcam-logger baseline global_time + offset in milliseconds)
time_df = pd.read_csv(time_csv_2nd)
results = []
for _, row in time_df.iterrows():
    pnum = row["pnum"]
    condition = str(row["condition"]).strip()
    baseline = get_baseline_time_for_condition(pnum, condition, folder_2nd)

    record = {"pnum": pnum}
    for col in ("c1", "c2", "c3"):
        # Without a logger file there is no baseline, so every timestamp stays None
        offset = parse_mmssms_to_timedelta(row[col]) if baseline is not None else None
        # Compare against None explicitly: an offset of 0 ms ("00;00;00") is a valid value
        record[col] = int(baseline) + int(offset) if offset is not None else None
    results.append(record)

pd.set_option('display.float_format', lambda x: f'{x:.0f}')

# Explicit columns keep the frame well-formed even when the time CSV has no rows
time_2 = pd.DataFrame(results, columns=["pnum", "c1", "c2", "c3"])
# Sort by participant; a stable sort keeps the CSV row order within each participant,
# so the first() below picks values deterministically
time_2 = time_2.sort_values(by="pnum", kind="stable")
# One row per participant: first() takes the first non-null value of each column
time_2_grouped = time_2.groupby("pnum", as_index=False).first()
print(time_2_grouped)

    pnum            c1            c2            c3
0      1 1708133022189 1708133874749 1708135064019
1      2 1708138921695 1708139374615 1708139837415
2      4 1708149174093 1708149673917 1708150138067
3      5 1708154998132 1708155464002 1708156023392
4      6 1708160855018 1708161215418 1708161682458
5      7 1708220866360 1708221284930 1708221830720
6      8 1708224638019 1708225072039 1708225517089
7     10 1708236380941 1708236820081 1708237334871
8     11 1708241795392 1708242291402 1708242944622
9     12 1708307665621 1708308079571 1708308581531
10    13 1708311237274 1708312013614 1708312435544
11    14 1708316103451 1708316560461 1708317021341
12    15 1708322961823 1708323444973 1708323913713


In [83]:
# 1st session: turn the c1/c2/c3 offsets into absolute timestamps
# (webcam-logger baseline global_time + offset in milliseconds)
time_df = pd.read_csv(time_csv_1st)
results = []
for _, row in time_df.iterrows():
    pnum = row["pnum"]
    condition = str(row["condition"]).strip()
    baseline = get_baseline_time_for_condition(pnum, condition, folder_1st)

    record = {"pnum": pnum}
    for col in ("c1", "c2", "c3"):
        # Without a logger file there is no baseline, so every timestamp stays None
        offset = parse_mmssms_to_timedelta(row[col]) if baseline is not None else None
        # Compare against None explicitly: an offset of 0 ms ("00;00;00") is a valid value
        record[col] = int(baseline) + int(offset) if offset is not None else None
    results.append(record)

pd.set_option('display.float_format', lambda x: f'{x:.0f}')

# Explicit columns keep the frame well-formed even when the time CSV has no rows
time_1 = pd.DataFrame(results, columns=["pnum", "c1", "c2", "c3"])
# Sort by participant; a stable sort keeps the CSV row order within each participant,
# so the first() below picks values deterministically
time_1 = time_1.sort_values(by="pnum", kind="stable")
# One row per participant: first() takes the first non-null value of each column
time_1_grouped = time_1.groupby("pnum", as_index=False).first()

print(time_1_grouped)

    pnum            c1            c2            c3
0      1 1671068336785 1671068806645 1671069238335
1      2 1671072379186 1671072795326 1671073226566
2      3 1671078713202 1671080082890 1671079535240
3      4 1671083392936 1671083821136 1671084427321
4      8 1671167300402 1671167843772 1671168355472
5     10 1671176153254 1671176527614 1671176950044
6     12 1671245406272 1671245867362 1671246283292
7     13 1671251228083 1671251839213 1671252301653
8     14 1671256497984 1671256889694 1671257299414
9     16 1671267813860 1671268230560 1671268630930
10    18 1671332110114 1671332508724 1671332923974
11    19 1671337390010 1671337759720 1671338152320
12    20 1671343176119 1671343685068 1671344101538
13    21 1671348410357 1671348821827 1671349396847
14    22 1671353003060 1671353409090 1671353984170
15    23 1671429960227 1671430379247 1671430797957
16    25 1671587643730 1671588031700 1671588539130
17    26 1671672896380 1671673328260 1671673828350
18    27 1671688733167 16716891

# feature extraction

In [None]:
# Input clips (`parsed_video/2nd`) and destination for the per-video feature CSVs (`features/2nd`).
# NOTE(review): machine-specific absolute paths — adjust before running on another machine.
video_folder = '/Users/iduli/Desktop/Ch2_25_Scientific_Data/Final/raw_Dataset/SENSORS/VIDEO/parsed_video/2nd'
result_folder = '/Users/iduli/Desktop/Ch2_25_Scientific_Data/Final/raw_Dataset/SENSORS/VIDEO/features/2nd'

# Mediapipe, OpenCV and interval configurations
# process_video counts a blink when an eye's aspect ratio drops from >= this value to below it.
EAR_THRESHOLD = 0.3
# Four-character codec code; not referenced in the cells shown here — presumably meant for a cv2.VideoWriter step (TODO confirm).
FOURCC = 'XVID'

In [None]:
def save_features_to_csv(df_features, file_path):
    """Write a feature table to ``file_path`` as CSV, without the index column.

    Args:
        df_features: DataFrame holding the features to store.
        file_path: Destination path of the CSV file.
    """
    df_features.to_csv(path_or_buf=file_path, index=False)

def calculate_eye_aspect_ratio(eye_landmarks):
    """Compute the eye aspect ratio (EAR) from six eye landmarks.

    The ratio is the sum of the point 1-5 and point 2-4 distances divided by
    twice the point 0-3 distance.

    Args:
        eye_landmarks: Indexable collection of six coordinate arrays; entries
            0 and 3 form the denominator pair, 1/5 and 2/4 the numerator pairs.

    Returns:
        The aspect ratio as a float.
    """
    def _span(first, second):
        # Euclidean distance between two of the six landmarks
        return np.linalg.norm(eye_landmarks[first] - eye_landmarks[second])

    numerator = _span(1, 5) + _span(2, 4)
    return numerator / (2.0 * _span(0, 3))

def normalize_vector(v):
    """Scale ``v`` to unit Euclidean length.

    Args:
        v: Numeric array to normalize.

    Returns:
        ``v`` divided by its norm, or ``v`` itself (unchanged) when the norm
        is zero, which avoids a division by zero.
    """
    magnitude = np.linalg.norm(v)
    return v if magnitude == 0 else v / magnitude


In [None]:
def process_video(video_path):
    """Extract per-frame facial features from one video and save them as CSV.

    Every frame is run through MediaPipe face detection and face mesh. When a
    face is detected and mesh landmarks are available, head pose (via
    ``cv2.solvePnP``), a gaze vector, lip-corner movement and a blink flag are
    derived from the landmarks. One row per frame is written to
    ``<result_folder>/<video base name>.csv``; ``result_folder`` is created if
    it does not exist yet.

    Args:
        video_path: Path of the video file to process.

    Returns:
        None. If the video cannot be opened, a message is printed and nothing
        is written.

    Output columns:
        Timestamp: ``cv2.CAP_PROP_POS_MSEC`` queried just before the frame is read.
        rotation_x / rotation_y / rotation_z: components of the solvePnP
            rotation vector.
        Total Movement: Euclidean norm of the solvePnP translation vector.
        Gaze X / Gaze Y: first two components of the normalized vector from
            the nose tip (landmark 1) to the midpoint of the two iris centres.
        Left / Right Lip Movement: change, relative to the previously
            processed frame, of the lip-corner (61 / 291) distance to the nose tip.
        Blink Count: number of blink onsets counted in this frame (the counter
            is reset on every frame).
        error_type: 0 for a processed frame, 2 when no face was detected (all
            features NaN). Code 1 (failed read) never reaches the CSV because
            the loop stops at the first failed read.
    """
    # Landmark indices handed to solvePnP, in the same order as model_points
    pose_landmark_ids = (1, 33, 263, 61, 291, 199)
    # Fixed 3D reference coordinates paired one-to-one with pose_landmark_ids.
    # TODO (from the original author): consider deriving these from the
    # landmarks instead of using static values.
    model_points = np.array([
        (0.0, 0.0, 0.0),
        (-30.0, -125.0, -30.0),
        (30.0, -125.0, -30.0),
        (-60.0, -70.0, -60.0),
        (60.0, -70.0, -60.0),
        (0.0, -150.0, -100.0)
    ])
    # All-zero distortion coefficients passed to solvePnP
    dist_coeffs = np.zeros((4, 1))
    no_face_error = 2

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Cannot open video {video_path}")
            return None

        video_file = os.path.basename(video_path)
        print(video_file)
        base_name = os.path.splitext(video_file)[0]

        # Create the output folder up front so a missing directory fails here,
        # not after every frame has already been processed
        os.makedirs(result_folder, exist_ok=True)

        feature_list = []
        prev_lip_corner_left, prev_lip_corner_right = None, None
        prev_left_eye_aspect_ratio, prev_right_eye_aspect_ratio = None, None

        with mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.7) as face_detection, \
             mp_face_mesh.FaceMesh(min_detection_confidence=0.7, min_tracking_confidence=0.7, refine_landmarks=True) as face_mesh:
            while cap.isOpened():
                # Position is queried before the read; the video start time is
                # meant to be added to it later. (Original note: frames 0 and 1
                # report the same time.)
                frame_time = cap.get(cv2.CAP_PROP_POS_MSEC)
                success, image = cap.read()
                if not success:
                    # A failed read ends processing (this is also how the end
                    # of the video shows up); no row is recorded for it
                    break

                # MediaPipe expects RGB input
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                image_rgb.flags.writeable = False

                # Both models run on every frame, even when detection fails
                results = face_detection.process(image_rgb)
                mesh_results = face_mesh.process(image_rgb)

                if not results.detections:
                    # No face detected: keep the timestamp, NaN for all nine features
                    feature_list.append([frame_time] + [np.nan] * 9 + [no_face_error])
                    continue

                # Defaults used when no mesh is available or solvePnP fails
                rotation_x, rotation_y, rotation_z = np.nan, np.nan, np.nan
                total_translation = np.nan
                gaze_direction = np.array([np.nan, np.nan])
                left_lip_movement, right_lip_movement = np.nan, np.nan
                blink_count = 0  # per-frame counter, restarted on every frame

                if mesh_results.multi_face_landmarks:
                    frame_h, frame_w = image.shape[:2]
                    # Camera intrinsics: focal length = frame width,
                    # principal point = frame centre
                    camera_matrix = np.array([
                        [frame_w, 0, frame_w / 2],
                        [0, frame_w, frame_h / 2],
                        [0, 0, 1]
                    ], dtype="double")

                    for face_landmarks in mesh_results.multi_face_landmarks:
                        landmarks = np.array([(lm.x, lm.y, lm.z) for lm in face_landmarks.landmark])

                        # Head pose: scale the relative landmark coordinates to pixels
                        image_points = np.array(
                            [(landmarks[idx][0] * frame_w, landmarks[idx][1] * frame_h)
                             for idx in pose_landmark_ids],
                            dtype="double")
                        pnp_ok, rotation_vector, translation_vector = cv2.solvePnP(
                            model_points, image_points, camera_matrix, dist_coeffs)
                        if not pnp_ok:
                            # The remaining features are only computed when the pose solve succeeds
                            continue

                        rotation_x, rotation_y, rotation_z = rotation_vector.ravel()
                        total_translation = np.linalg.norm(translation_vector)

                        # Eye aspect ratio per eye
                        left_eye_landmarks = landmarks[[33, 160, 158, 133, 153, 144]]
                        right_eye_landmarks = landmarks[[362, 385, 387, 263, 373, 380]]
                        left_ear = calculate_eye_aspect_ratio(left_eye_landmarks)
                        right_ear = calculate_eye_aspect_ratio(right_eye_landmarks)

                        # Blink onset: either eye drops below the threshold
                        # after being at or above it on the previous processed frame
                        if prev_left_eye_aspect_ratio is not None and prev_right_eye_aspect_ratio is not None:
                            if (left_ear < EAR_THRESHOLD and prev_left_eye_aspect_ratio >= EAR_THRESHOLD) or \
                               (right_ear < EAR_THRESHOLD and prev_right_eye_aspect_ratio >= EAR_THRESHOLD):
                                blink_count += 1

                        prev_left_eye_aspect_ratio = left_ear
                        prev_right_eye_aspect_ratio = right_ear

                        # Gaze: direction from the nose tip to the midpoint of the iris centres
                        left_iris_center = np.mean(landmarks[[474, 475, 476, 477]], axis=0)
                        right_iris_center = np.mean(landmarks[[469, 470, 471, 472]], axis=0)
                        nose_tip = landmarks[1]
                        gaze_direction = normalize_vector((left_iris_center + right_iris_center) / 2.0 - nose_tip)

                        # Lip movement: change in lip-corner-to-nose distance.
                        # NOTE(review): the previous lip corner is measured against the
                        # CURRENT nose tip, not the previous one — confirm this is intended.
                        left_lip_corner = landmarks[61]
                        right_lip_corner = landmarks[291]
                        if prev_lip_corner_left is not None and prev_lip_corner_right is not None:
                            left_lip_movement = np.linalg.norm(left_lip_corner - nose_tip) - np.linalg.norm(prev_lip_corner_left - nose_tip)
                            right_lip_movement = np.linalg.norm(right_lip_corner - nose_tip) - np.linalg.norm(prev_lip_corner_right - nose_tip)
                        prev_lip_corner_left = left_lip_corner
                        prev_lip_corner_right = right_lip_corner

                # error_type 0: the frame was read and a face was detected
                feature_list.append([
                    frame_time, rotation_x, rotation_y, rotation_z, total_translation,
                    gaze_direction[0], gaze_direction[1], left_lip_movement, right_lip_movement,
                    blink_count, 0])
    finally:
        # Release the capture even if processing raises
        cap.release()

    csv_file_path = os.path.join(result_folder, f'{base_name}.csv')
    save_features_to_csv(pd.DataFrame(feature_list, columns=[
        "Timestamp", "rotation_x", "rotation_y", "rotation_z",
        "Total Movement", "Gaze X", "Gaze Y",
        "Left Lip Movement", "Right Lip Movement",
        "Blink Count", 'error_type']), csv_file_path)

In [None]:
# Smoke test: run the feature extractor on a single clip before processing the whole folder
process_video('/Users/iduli/Desktop/Ch2_25_Scientific_Data/Final/raw_Dataset/SENSORS/VIDEO/parsed_video/2nd/3_c1.mp4')

In [None]:
# Run the feature extractor on every entry of the input folder;
# process_video reports entries it cannot open and moves on
entry_paths = (os.path.join(video_folder, entry) for entry in os.listdir(video_folder))
for entry_path in entry_paths:
    process_video(entry_path)