In [1]:
import os
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import cv2
import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
import pickle

In [2]:
model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
movenet = model.signatures['serving_default']

In [3]:
def run_inference(movenet, image):
    """이미지에서 포즈 키포인트를 추출하는 함수"""
    input_image = tf.image.resize_with_pad(tf.expand_dims(image, axis=0), 192, 192)
    input_image = tf.cast(input_image, dtype=tf.int32)
    
    # Run model inference
    keypoints_with_scores = movenet(input_image)
    return keypoints_with_scores['output_0'].numpy()

In [4]:
def extract_keypoints_from_video_frames(video_path, movenet):
    cap = cv2.VideoCapture(video_path)
    keypoints_list = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # 현재 프레임에서 MoveNet을 사용하여 키포인트 추출
        keypoints = run_inference(movenet, frame)
        keypoints_list.append(keypoints)
    
    cap.release()
    return keypoints_list

In [5]:
def calculate_mean_keypoints_from_file(keypoints_data):
    
    # 모든 동영상에 대한 평균 키포인트 계산
    mean_keypoints_all_videos = []
    for keypoints_list in keypoints_data['keypoints']:
        # 각 동영상에 대한 키포인트 리스트에서 평균 계산
        mean_keypoints = [[sum(pos) / len(keypoints_list) for pos in zip(*frame)] for frame in zip(*keypoints_list)]
        mean_keypoints_all_videos.append(mean_keypoints)
    
    return mean_keypoints_all_videos, keypoints_data['labels']

In [6]:
def calculate_keypoint_changes(keypoints_data):
    # 변경된 부분: 이미 로드된 키포인트 데이터를 직접 사용
    # 키포인트 데이터는 각 동영상의 프레임별 키포인트 리스트를 포함하는 리스트

    changes_list = []  # 변화량을 저장할 리스트 초기화

    for keypoints_list in keypoints_data['keypoints']:
        changes = []  # 개별 동영상의 키포인트 변화량을 저장할 리스트
        prev_keypoints = None

        for keypoints in keypoints_list:
            keypoints = np.array(keypoints)
            if prev_keypoints is not None:
                # 현재 프레임과 이전 프레임의 키포인트 사이의 변화량 계산
                change = np.abs(keypoints - prev_keypoints)
                changes.append(change)
            prev_keypoints = keypoints

        # 모든 변화량의 평균 계산
        if changes:
            mean_changes = np.mean(changes, axis=0)
        else:
            # 변화량이 없는 경우, 0으로 채워진 배열 반환
            mean_changes = np.zeros_like(keypoints_list[0])

        changes_list.append(mean_changes)

    return changes_list

In [7]:
def calculate_angle(point1, point2, point3):
    """
    세 점을 이용하여 두 벡터 사이의 각도를 계산합니다.
    :param point1, point2, point3: 각 점의 좌표를 나타내는 (x, y) 튜플이나 리스트.
    :return: 두 벡터 사이의 각도(도).
    """
    # 벡터 v1과 v2 생성
    v1 = np.array(point1) - np.array(point2)
    v2 = np.array(point3) - np.array(point2)
    
    # 벡터의 내적과 노름(크기)을 사용하여 각도(라디안) 계산
    angle_rad = np.arccos(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))
    
    # 각도를 도로 변환
    angle_deg = np.degrees(angle_rad)
    
    return angle_deg

# 평균
def calculate_angle_changes(keypoints_data, point_indices):
    angle_changes_list = []
    for keypoints_list in keypoints_data['keypoints']:
        angles = []
        for frame_keypoints in keypoints_list:
            # 키포인트 데이터가 충분한지 확인
            if len(frame_keypoints) > max(point_indices):
                p1 = frame_keypoints[point_indices[0]][:2]  # x, y 좌표만 사용
                p2 = frame_keypoints[point_indices[1]][:2]
                p3 = frame_keypoints[point_indices[2]][:2]
                angle = calculate_angle(p1, p2, p3)
                angles.append(angle)
            else:
                # 충분한 데이터가 없는 경우 계산에서 제외
                continue
        
        if angles:  # 각도 데이터가 있을 경우에만 계산
            angle_changes = np.abs(np.diff(angles))
            mean_angle_change = np.mean(angle_changes)
            angle_changes_list.append(mean_angle_change)
        else:
            # 각도 데이터가 없는 경우 0으로 처리
            angle_changes_list.append(0)
    
    return np.array(angle_changes_list)

# 최솟값
def calculate_min_angle_changes(keypoints_data, point_indices):
    min_angle_changes_list = []
    for keypoints_list in keypoints_data['keypoints']:
        angles = []
        for frame_keypoints in keypoints_list:
            if len(frame_keypoints) > max(point_indices):
                p1 = frame_keypoints[point_indices[0]][:2]  # x, y 좌표만 사용
                p2 = frame_keypoints[point_indices[1]][:2]
                p3 = frame_keypoints[point_indices[2]][:2]
                angle = calculate_angle(p1, p2, p3)
                angles.append(angle)

        if angles:
            angle_changes = np.abs(np.diff(angles))
            min_angle_change = np.min(angle_changes) if len(angle_changes) > 0 else 0
            min_angle_changes_list.append(min_angle_change)
        else:
            min_angle_changes_list.append(0)
    
    return np.array(min_angle_changes_list)

# 최댓값
def calculate_max_angle_changes(keypoints_data, point_indices):
    max_angle_changes_list = []
    for keypoints_list in keypoints_data['keypoints']:
        angles = []
        for frame_keypoints in keypoints_list:
            if len(frame_keypoints) > max(point_indices):
                p1 = frame_keypoints[point_indices[0]][:2]  # x, y 좌표만 사용
                p2 = frame_keypoints[point_indices[1]][:2]
                p3 = frame_keypoints[point_indices[2]][:2]
                angle = calculate_angle(p1, p2, p3)
                angles.append(angle)

        if angles:
            angle_changes = np.abs(np.diff(angles))
            max_angle_change = np.max(angle_changes) if len(angle_changes) > 0 else 0
            max_angle_changes_list.append(max_angle_change)
        else:
            max_angle_changes_list.append(0)
    
    return np.array(max_angle_changes_list)


In [8]:
def calculate_enhanced_autocorrelation_features(keypoints_data):
    features_list = []  # 각 동영상의 향상된 자기상관성 특성을 저장할 리스트

    for keypoints_list in keypoints_data['keypoints']:
        changes = []
        prev_keypoints = None
        for keypoints in keypoints_list:
            keypoints = np.array(keypoints)
            if prev_keypoints is not None:
                change = np.linalg.norm(keypoints - prev_keypoints)
                changes.append(change)
            prev_keypoints = keypoints

        if changes:
            changes = np.array(changes)
            autocorrelation = np.correlate(changes - np.mean(changes), changes - np.mean(changes), mode='full')
            autocorrelation = autocorrelation[autocorrelation.size // 2:]  # 자기상관성 값 중 양의 지연만 고려

            # 향상된 특성 계산
            mean_autocorrelation = np.mean(autocorrelation)
            std_autocorrelation = np.std(autocorrelation)
            peak_count = np.sum(autocorrelation > (mean_autocorrelation + std_autocorrelation))  # 평균 이상의 피크 수

            features = [mean_autocorrelation, std_autocorrelation, peak_count]
        else:
            features = [0, 0, 0]

        features_list.append(features)

    return features_list

In [9]:
def average_features_per_video(features, labels):
    features_per_video = []
    labels_per_video = []
    
    current_video_label = None
    current_video_features = []
    
    for feature_vector, label in zip(features, labels):
        if current_video_label is None:
            current_video_label = label
        
        if label == current_video_label:
            current_video_features.append(feature_vector)
        else:
            # 이전 동영상의 특징들의 평균을 계산
            average_features = np.mean(current_video_features, axis=0)
            features_per_video.append(average_features)
            labels_per_video.append(current_video_label)
            
            # 새 동영상의 데이터로 리셋
            current_video_features = [feature_vector]
            current_video_label = label
    
    # 마지막 동영상 처리
    if current_video_features:
        average_features = np.mean(current_video_features, axis=0)
        features_per_video.append(average_features)
        labels_per_video.append(current_video_label)
    
    return np.array(features_per_video), np.array(labels_per_video)

In [10]:
pkl_file_path = '/Users/diana/Desktop/BabyposeModel/capstone2_SEDA/keypoints_data_augmented.pkl'

# pickle 파일 로드
with open(pkl_file_path, 'rb') as f:
     keypoints_data = pickle.load(f)

In [11]:
mean_keypoints_all_videos, labels = calculate_mean_keypoints_from_file(keypoints_data)
changes_list = calculate_keypoint_changes(keypoints_data)
autocorrelation_list = calculate_enhanced_autocorrelation_features(keypoints_data)

back_angle_changes_list1 = calculate_angle_changes(keypoints_data, (6,12,16))
back_angle_changes_list2 = calculate_angle_changes(keypoints_data, (5,11,15))
head_angle_changes_list1 = calculate_angle_changes(keypoints_data, (0,6,12))
head_angle_changes_list2 = calculate_angle_changes(keypoints_data, (0,5,11))
leg_angle_changes_list1 = calculate_angle_changes(keypoints_data, (12,14,16))
leg_angle_changes_list2 = calculate_angle_changes(keypoints_data, (11,13,15))
eye_angle_changes_list1 = calculate_angle_changes(keypoints_data, (1,5,9))
eye_angle_changes_list2 = calculate_angle_changes(keypoints_data, (2,6,10))
strech_angle_changes_list1 = calculate_angle_changes(keypoints_data, (5,8,10))
strech_angle_changes_list2 = calculate_angle_changes(keypoints_data, (6,7,9))
finger_angle_changes_list1 = calculate_angle_changes(keypoints_data, (0,8,10))
finger_angle_changes_list2 = calculate_angle_changes(keypoints_data, (0,7,9))

back_min_angle_changes_list1 = calculate_min_angle_changes(keypoints_data, (6,12,16))
back_min_angle_changes_list2 = calculate_min_angle_changes(keypoints_data, (5,11,15))
head_min_angle_changes_list1 = calculate_min_angle_changes(keypoints_data, (0,6,12))
head_min_angle_changes_list2 = calculate_min_angle_changes(keypoints_data, (0,5,11))
leg_min_angle_changes_list1 = calculate_min_angle_changes(keypoints_data, (12,14,16))
leg_min_angle_changes_list2 = calculate_min_angle_changes(keypoints_data, (11,13,15))
eye_min_angle_changes_list1 = calculate_min_angle_changes(keypoints_data, (1,5,9))
eye_min_angle_changes_list2 = calculate_min_angle_changes(keypoints_data, (2,6,10))
strech_min_angle_changes_list1 = calculate_min_angle_changes(keypoints_data, (5,8,10))
strech_min_angle_changes_list2 = calculate_min_angle_changes(keypoints_data, (6,7,9))
finger_min_angle_changes_list1 = calculate_min_angle_changes(keypoints_data, (0,8,10))
finger_min_angle_changes_list2 = calculate_min_angle_changes(keypoints_data, (0,7,9))

back_max_angle_changes_list1 = calculate_max_angle_changes(keypoints_data, (6,12,16))
back_max_angle_changes_list2 = calculate_max_angle_changes(keypoints_data, (5,11,15))
head_max_angle_changes_list1 = calculate_max_angle_changes(keypoints_data, (0,6,12))
head_max_angle_changes_list2 = calculate_max_angle_changes(keypoints_data, (0,5,11))
leg_max_angle_changes_list1 = calculate_max_angle_changes(keypoints_data, (12,14,16))
leg_max_angle_changes_list2 = calculate_max_angle_changes(keypoints_data, (11,13,15))
eye_max_angle_changes_list1 = calculate_max_angle_changes(keypoints_data, (1,5,9))
eye_max_angle_changes_list2 = calculate_max_angle_changes(keypoints_data, (2,6,10))
strech_max_angle_changes_list1 = calculate_max_angle_changes(keypoints_data, (5,8,10))
strech_max_angle_changes_list2 = calculate_max_angle_changes(keypoints_data, (6,7,9))
finger_max_angle_changes_list1 = calculate_max_angle_changes(keypoints_data, (0,8,10))
finger_max_angle_changes_list2 = calculate_max_angle_changes(keypoints_data, (0,7,9))

In [12]:
features = []
mean_keypoints_all_videos = np.array(mean_keypoints_all_videos)
changes_list = np.array(changes_list)
autocorrelation_list = np.array(autocorrelation_list)

for i in range(len(mean_keypoints_all_videos)):
    combined_feature = np.concatenate([mean_keypoints_all_videos[i].flatten(), changes_list[i].flatten(), autocorrelation_list[i].flatten(),
    # fft_features_list[i].flatten(),
    [back_max_angle_changes_list1[i] - back_min_angle_changes_list1[i],
    back_max_angle_changes_list2[i] - back_min_angle_changes_list2[i],
    head_max_angle_changes_list1[i] - head_min_angle_changes_list1[i],
    head_max_angle_changes_list2[i] - head_min_angle_changes_list2[i],
    leg_max_angle_changes_list1[i] - leg_min_angle_changes_list1[i],
    leg_max_angle_changes_list2[i] - leg_min_angle_changes_list2[i],
    eye_max_angle_changes_list1[i] - eye_min_angle_changes_list1[i],
    eye_max_angle_changes_list2[i] - eye_min_angle_changes_list2[i],
    strech_max_angle_changes_list1[i] - strech_min_angle_changes_list1[i],
    strech_max_angle_changes_list2[i] - strech_min_angle_changes_list2[i],
    finger_max_angle_changes_list1[i] - finger_min_angle_changes_list1[i],
    finger_max_angle_changes_list2[i] - finger_min_angle_changes_list2[i]]])
    
    features.append(combined_feature)

# 특징과 레이블을 동영상 단위로 평균 내기
features_per_video, labels_per_video = average_features_per_video(features, labels)

In [13]:
# pickle 파일 로드 (예: 동영상 프레임별 특징 및 레이블이 포함된 데이터)
with open('keypoints_data_augmented.pkl', 'rb') as f:
    keypoints_data = pickle.load(f)

# 동영상 별 특성 집계 (예를 들어 평균을 사용)
video_features = []
video_labels = []
current_video_features = []
current_video_label = None

for features, label in zip(keypoints_data['keypoints'], keypoints_data['labels']):
    if current_video_label is None:
        current_video_label = label
    if label == current_video_label:
        current_video_features.append(features)
    else:
        if current_video_features:  # 비어있지 않은 경우에만 평균 계산
            # 최대 길이 찾기
            max_length = max(len(f) for f in current_video_features)
            # 모든 특징을 최대 길이에 맞추기
            padded_features = [np.pad(f, (0, max_length - len(f)), 'constant', constant_values=0) for f in current_video_features]
            # 안전하게 평균 계산
            video_features.append(np.mean(padded_features, axis=0))
            video_labels.append(current_video_label)
        
        # 다음 동영상 처리
        current_video_features = [features]
        current_video_label = label

# 마지막 동영상 처리
if current_video_features:
    # 최대 길이 찾기
    max_length = max(len(f) for f in current_video_features)
    # 모든 특징을 최대 길이에 맞추기
    padded_features = [np.pad(f, (0, max_length - len(f)), 'constant', constant_values=0) for f in current_video_features]
    # 안전하게 평균 계산
    video_features.append(np.mean(padded_features, axis=0))
    video_labels.append(current_video_label)

# 데이터 스케일링 및 분할
scaler = StandardScaler()
features_scaled = scaler.fit_transform(video_features)
X_train, X_test, y_train, y_test = train_test_split(features_scaled, video_labels, test_size=0.2, random_state=42)

# Optuna를 사용한 SVM 최적화
def objective(trial):
    c = trial.suggest_loguniform('C', 1e-3, 1e3)
    gamma = trial.suggest_loguniform('gamma', 1e-3, 1e3)
    kernel = trial.suggest_categorical('kernel', ['rbf', 'poly', 'sigmoid'])

    svm = SVC(C=c, gamma=gamma, kernel=kernel, random_state=42)
    svm.fit(X_train, y_train)
    return svm.score(X_test, y_test)

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

print(f'Best parameters: {study.best_params}')
print(f'Test Accuracy: {study.best_trial.value:.2f}%')

# 최적의 파라미터로 모델 재학습 및 저장
best_model = SVC(**study.best_params, random_state=42)
best_model.fit(features_scaled, video_labels)
with open('svm_model_video1.pkl', 'wb') as f:
    pickle.dump({'model': best_model, 'scaler': scaler}, f)

: 