In [None]:
import zipfile
import os

def unzip(file_path, extract_dir):
  with zipfile.ZipFile(file_path, 'r') as zip_ref:
    for zipinfo in zip_ref.infolist():
        filename = zipinfo.filename
        # Replace problematic characters in the filename
        sanitized_filename = filename.encode('cp437', 'surrogateescape').decode('utf-8', 'replace')
        zipinfo.filename = sanitized_filename
        zip_ref.extract(zipinfo, extract_dir)

# 사용 예시
file_path = '/content/drive/MyDrive/zerobase/논문반/archive.zip'  # 압축 파일 경로
extract_dir = '/content/drive/MyDrive/zerobase/논문반'  # 압축을 풀 디렉토리 경로

unzip(file_path, extract_dir)

In [None]:


import cv2
import os

def video_to_images(video_path, output_dir):
  """
  비디오를 이미지 프레임으로 변환하여 저장하는 함수

  Args:
    video_path: 비디오 파일 경로
    output_dir: 이미지를 저장할 디렉토리 경로
  """
  # 비디오 캡처 객체 생성
  cap = cv2.VideoCapture(video_path)

  # 프레임 번호 초기화
  frame_count = 0

  # 이미지 저장 디렉토리 생성
  os.makedirs(output_dir, exist_ok=True)

  # 비디오 프레임 추출 및 저장
  while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
      break

    # 이미지 파일 이름 생성
    image_file = os.path.join(output_dir, f"frame_{frame_count:04d}.jpg")

    # 이미지 저장
    cv2.imwrite(image_file, frame)

    frame_count += 1

  # 자원 해제
  cap.release()

# 사용 예시
video_dir = '/content/drive/MyDrive/zerobase/논문반/RWF-2000/train/Fight'  # 원본 영상이 담긴 폴더 경로
output_dir = '/content/drive/MyDrive/zerobase/논문반/RWF-2000/train_images'  # 이미지를 저장할 디렉토리 경로

# 폴더 내 모든 비디오 파일에 대해 이미지 변환 수행
for filename in os.listdir(video_dir):
  if filename.endswith('.avi'):  # .mp4 확장자를 가진 파일만 처리
    video_path = os.path.join(video_dir, filename)
    video_name = os.path.splitext(filename)[0]  # 비디오 파일 이름 (확장자 제외)
    image_output_dir = os.path.join(output_dir, video_name)  # 각 비디오별 이미지 저장 디렉토리
    video_to_images(video_path, image_output_dir)


In [None]:
import os
import cv2
import torch
import pandas as pd
import numpy as np
from ultralytics import YOLO

class GetKeypoint:
    NOSE = 0
    LEFT_EYE = 1
    RIGHT_EYE = 2
    LEFT_EAR = 3
    RIGHT_EAR = 4
    LEFT_SHOULDER = 5
    RIGHT_SHOULDER = 6
    LEFT_ELBOW = 7
    RIGHT_ELBOW = 8
    LEFT_WRIST = 9
    RIGHT_WRIST = 10
    LEFT_HIP = 11
    RIGHT_HIP = 12
    LEFT_KNEE = 13
    RIGHT_KNEE = 14
    LEFT_ANKLE = 15
    RIGHT_ANKLE = 16

get_keypoint = GetKeypoint()

def classify_pose(keypoints):
    # 키포인트가 비어있거나 None인 경우 처리
    if keypoints is None or len(keypoints) == 0:
        print("No keypoints detected.")
        return 'unknown'  # 기본 행동 반환

    # 첫 번째 사람의 키포인트를 가져옴
    keypoints_data = keypoints.data[0]  # 첫 번째 사람의 키포인트 데이터
    if keypoints_data.shape[0] < 17:  # 17개의 키포인트 필요
        print("Not enough keypoints provided.")
        return 'unknown'  # 또는 다른 기본 행동 반환

    # 각 신체 부위의 y 좌표 설정
    nose_y = keypoints_data[GetKeypoint.NOSE, 1].item()
    left_shoulder_y = keypoints_data[GetKeypoint.LEFT_SHOULDER, 1].item()
    right_shoulder_y = keypoints_data[GetKeypoint.RIGHT_SHOULDER, 1].item()
    left_hip_y = keypoints_data[GetKeypoint.LEFT_HIP, 1].item()
    right_hip_y = keypoints_data[GetKeypoint.RIGHT_HIP, 1].item()
    left_knee_y = keypoints_data[GetKeypoint.LEFT_KNEE, 1].item()
    right_knee_y = keypoints_data[GetKeypoint.RIGHT_KNEE, 1].item()
    left_elbow_y = keypoints_data[GetKeypoint.LEFT_ELBOW, 1].item()
    right_elbow_y = keypoints_data[GetKeypoint.RIGHT_ELBOW, 1].item()
    left_wrist_y = keypoints_data[GetKeypoint.LEFT_WRIST, 1].item()
    right_wrist_y = keypoints_data[GetKeypoint.RIGHT_WRIST, 1].item()
    left_ankle_y = keypoints_data[GetKeypoint.LEFT_ANKLE, 1].item()
    right_ankle_y = keypoints_data[GetKeypoint.RIGHT_ANKLE, 1].item()

    shoulder_diff = abs(left_shoulder_y - right_shoulder_y)
    hip_diff = abs(left_hip_y - right_hip_y)
    shoulder_hip_diff = min(abs(left_shoulder_y - left_hip_y), abs(right_shoulder_y - right_hip_y))
    hip_knee_diff = min(abs(left_hip_y - left_knee_y), abs(right_hip_y - right_knee_y))
    elbow_wrist_diff = min(abs(left_elbow_y - left_wrist_y), abs(right_elbow_y - right_wrist_y))
    knee_ankle_diff = min(abs(left_knee_y - left_ankle_y), abs(right_knee_y - right_ankle_y))
    elbow_diff = abs(left_elbow_y - right_elbow_y)
    foot_diff = abs(left_ankle_y - right_ankle_y)
    foot_y = min(left_ankle_y, right_ankle_y)

    if shoulder_diff < 20 and hip_diff < 20:
        return 'falling'
    elif shoulder_diff < 20 and hip_diff < 20 and shoulder_hip_diff > 50 and hip_knee_diff < 30:
        return 'attack'
    elif nose_y < min(left_shoulder_y, right_shoulder_y) and shoulder_hip_diff > 50:
        return 'defense'
    elif shoulder_hip_diff > 50 and hip_knee_diff < 30 and knee_ankle_diff > 50:
        return 'aggressive'
    elif elbow_wrist_diff < 20 and min(left_wrist_y, right_wrist_y) < nose_y + 20:
        return 'punch'
    elif elbow_wrist_diff < 20 and max(left_wrist_y, right_wrist_y) > max(left_shoulder_y, right_shoulder_y):
        return 'push'
    elif abs(left_wrist_y - right_wrist_y) < 20 and min(left_wrist_y, right_wrist_y) < nose_y + 20 and min(left_wrist_y, right_wrist_y) > nose_y - 20:
        return 'grab_collar'
    elif knee_ankle_diff > 50 and foot_y < hip_diff:
        return 'kicking'
    elif elbow_diff < 20 and min(left_elbow_y, right_elbow_y) < shoulder_diff:
        return 'elbow_strike'
    elif shoulder_hip_diff > 50 and elbow_wrist_diff < 20 and max(left_wrist_y, right_wrist_y) > max(left_shoulder_y, right_shoulder_y):
        return 'shoving'
    elif abs(left_wrist_y - right_wrist_y) < 20 and min(left_wrist_y, right_wrist_y) < nose_y + 10 and min(left_wrist_y, right_wrist_y) > nose_y - 10:
        return 'choking'
    elif foot_diff > 50 and foot_y > min(left_ankle_y, right_ankle_y):
        return 'stomping'
    elif shoulder_hip_diff < 50 and hip_knee_diff < 50 and min(left_knee_y, right_knee_y) < nose_y:
        return 'crouching'
    else:
        return 'neutral'

def vis_frame(orig_img, boxes, keypoints, labels):
    vis_img = orig_img.copy()

    # boxes가 비어있거나 길이가 0일 경우 처리
    if boxes is None or len(boxes) == 0:
        print("No boxes detected.")
        return vis_img

    for i in range(len(boxes)):
        box = boxes[i].xyxy  # 각 박스의 좌표 가져오기
        if len(box) == 0:
            continue  # 박스가 없다면 건너뜀
        x1, y1, x2, y2 = box[0]  # 첫 번째 박스 좌표 가져오기
        cv2.rectangle(vis_img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)

        # 키포인트 시각화
        if keypoints is not None and len(keypoints) > i:
            keypoints_data = keypoints[i].data  # 각 사람의 키포인트 데이터
            for j in range(keypoints_data.shape[0]):  # 각 키포인트 수 만큼 반복
                if keypoints_data[j].shape[0] == 3:  # (x, y, confidence) 확인
                    x, y, conf = keypoints_data[j].cpu().numpy()  # x, y 좌표와 신뢰도 가져오기
                    if conf > 0.5:  # 신뢰도가 0.5 이상인 경우에만 표시
                        cv2.circle(vis_img, (int(x), int(y)), 5, (0, 255, 0), -1)
            cv2.putText(vis_img, labels[i], (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
    return vis_img



def main():
    # YOLO Pose 모델 로드
    yolo_pose_model = YOLO('yolov8n-pose.pt')  # 다운로드한 모델 경로

    image_files = []
    image_directory = '/content/drive/MyDrive/zerobase/논문반/RWF-2000/train_images'

    for root, dirs, files in os.walk(image_directory):
        for file in files:
            if file.lower().endswith(('.png', '.jpg', '.jpeg')):
                image_files.append(os.path.join(root, file))

    image_files = sorted(image_files)
    output_directory = '/content/drive/MyDrive/zerobase/results/'
    csv_file_path = '/content/drive/MyDrive/zerobase/train_pose2_keypoint.csv'
    last_processed_file_path = '/content/drive/MyDrive/zerobase/last_processed_image.txt'

    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # 마지막으로 처리한 이미지 읽기
    last_processed_image = None
    if os.path.exists(last_processed_file_path):
        with open(last_processed_file_path, 'r') as f:
            last_processed_image = f.read().strip()

    all_keypoints = []

    # 모든 이미지 파일 처리
    for idx, image_file in enumerate(image_files):
        # 마지막 처리한 이미지부터 이어서 처리
        if last_processed_image is not None and image_file <= last_processed_image:
            print(f'이미지 이미 처리됨, 건너뜁니다: {image_file}')
            continue

        input_image = cv2.imread(image_file)
        if input_image is None:
            print(f'오류: 이미지 {image_file}를 찾을 수 없거나 읽을 수 없습니다.')
            continue

        try:
            # YOLO Pose 모델 실행
            results = yolo_pose_model.predict(input_image, save=False)

            keypoints = results[0].keypoints  # YOLOv8의 Keypoints 객체
            boxes = results[0].boxes  # YOLOv8의 Boxes 객체
            labels = []

            if keypoints is not None:
                for i in range(len(keypoints)):
                    keypoints_data = keypoints[i].data  # 첫 번째 사람의 키포인트 데이터
                    label = classify_pose(keypoints_data)  # 포즈 분류
                    labels.append(label)

            # 결과 시각화 및 저장
            result_img = vis_frame(input_image, boxes, keypoints, labels)
            output_path = os.path.join(output_directory, f'result_{os.path.basename(image_file)}')
            cv2.imwrite(output_path, result_img)
            print(f'결과가 저장되었습니다: {output_path}')

            # 키포인트 저장 로직
            keypoints_data = [image_file] + labels + [k for k in keypoints]  # 각 키포인트 데이터 추가
            all_keypoints.append(keypoints_data)

            # 마지막으로 처리된 이미지 기록
            with open(last_processed_file_path, 'w') as f:
                f.write(image_file)

        except Exception as e:
            print(f'처리 중 오류 발생: {e}')

    # 최종 CSV 파일 저장
    df = pd.DataFrame(all_keypoints)
    df.to_csv(csv_file_path, index=False)
    print(f'키포인트가 {csv_file_path}에 저장되었습니다.')

if __name__ == '__main__':
    main()



Downloading https://github.com/ultralytics/assets/releases/download/v8.2.0/yolov8n-pose.pt to 'yolov8n-pose.pt'...


100%|██████████| 6.52M/6.52M [00:00<00:00, 78.5MB/s]


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
결과가 저장되었습니다: /content/drive/MyDrive/zerobase/results/result_frame_0120.jpg

0: 384x640 1 person, 284.3ms
Speed: 2.9ms preprocess, 284.3ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)
결과가 저장되었습니다: /content/drive/MyDrive/zerobase/results/result_frame_0121.jpg

0: 384x640 1 person, 181.5ms
Speed: 4.1ms preprocess, 181.5ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)
결과가 저장되었습니다: /content/drive/MyDrive/zerobase/results/result_frame_0122.jpg

0: 384x640 1 person, 171.2ms
Speed: 5.9ms preprocess, 171.2ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)
결과가 저장되었습니다: /content/drive/MyDrive/zerobase/results/result_frame_0123.jpg

0: 384x640 1 person, 177.1ms
Speed: 5.4ms preprocess, 177.1ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)
결과가 저장되었습니다: /content/drive/MyDrive/zerobase/results/result_frame_0124.jpg

0: 384x640 1 person, 170.3ms
Speed: 3.2ms preprocess, 170.3ms 