In [145]:
# ! pip install av
# ! pip install ffmpeg-python
# ! pip install opencv-python
# ! pip install mediapipe
# ! pip install pandas
# ! pip install tqdm

# start


## import

In [146]:
import cv2 as cv
import mediapipe as mp
import pandas as pd
import os
import torch
from torch import tensor
import torch.nn as nn
from torch.nn import DataParallel
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import random_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
import matplotlib.pyplot as plt
import ffmpeg

# GPU

In [147]:
# Prefer CUDA, then Apple-silicon MPS, and fall back to the CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# More than one visible CUDA device enables the multi-GPU code path.
multi_gpu = torch.cuda.device_count() > 1
if multi_gpu:
    print(f"Number of GPUs available: {torch.cuda.device_count()}")

device, multi_gpu

Number of GPUs available: 4


(device(type='cuda', index=0), True)

## skeleton to csv

In [148]:
def skeleton_csv(video_path='', output_video_path='', make_video=True, output_video_name='skeleton_video'):
    """Extract hip-centred pose landmarks from every frame of a video.

    Each frame in which MediaPipe detects a pose contributes one row holding,
    for every landmark, ``x``/``y``/``z`` relative to the midpoint of
    landmarks 23 and 24 (the hip centre) followed by the landmark visibility.
    Frames without a detected pose are skipped entirely.

    Args:
        video_path: Path of the video to read.
        output_video_path: Directory the annotated video is written to
            (only used when ``make_video`` is true).
        make_video: When true, frames with a detected pose are written, with
            the skeleton drawn on them, to ``<output_video_name>.mp4``.
        output_video_name: File name (without extension) of the annotated video.

    Returns:
        A float32 tensor with one row per detected frame, ``None`` when the
        video cannot be opened, or the placeholder ``torch.Tensor([1])`` when
        any error occurs during processing.
    """
    cap = None
    out = None
    try:
        mp_pose = mp.solutions.pose
        mp_drawing = mp.solutions.drawing_utils

        cap = cv.VideoCapture(video_path)
        if not cap.isOpened():
            print("Error: Unable to open video file at path:", video_path)
            return None
        all_landmarks = []

        if make_video:
            # The writer mirrors the source video's geometry and frame rate.
            frame_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv.CAP_PROP_FPS)
            output_video_name = output_video_name + '.mp4'
            output_video_path = os.path.join(output_video_path, output_video_name)
            out = cv.VideoWriter(output_video_path, cv.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

        # The context manager closes the Pose graph even if a frame fails.
        with mp_pose.Pose(static_image_mode=False, model_complexity=1,
                          enable_segmentation=False, min_detection_confidence=0.5) as pose:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    break

                # OpenCV delivers BGR frames; the pose model is fed RGB.
                image_rgb = cv.cvtColor(image, cv.COLOR_BGR2RGB)
                results = pose.process(image_rgb)
                if not results.pose_landmarks:
                    continue

                detected = results.pose_landmarks.landmark
                # Reference point: midpoint of landmarks 23 and 24 (hip centre).
                ref_x = (detected[23].x + detected[24].x) / 2
                ref_y = (detected[23].y + detected[24].y) / 2
                ref_z = (detected[23].z + detected[24].z) / 2

                landmarks = []
                for lm in detected:
                    landmarks.extend([lm.x - ref_x, lm.y - ref_y, lm.z - ref_z, lm.visibility])
                all_landmarks.append(landmarks)

                if out is not None:
                    # Drawing is only needed when the frame is actually saved.
                    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
                    out.write(image)

        skeletons_tensor = torch.tensor(all_landmarks, dtype=torch.float32)
        print(skeletons_tensor.shape)
        return skeletons_tensor
    except Exception as exc:
        # Best effort: report the failure and hand back a placeholder tensor so
        # a data pipeline keeps running. KeyboardInterrupt is not swallowed.
        print(f"Error: skeleton extraction failed for {video_path}: {exc}")
        return torch.Tensor([1])
    finally:
        # Always free the capture / writer handles, including on errors.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()


In [149]:
def skeleton_csv90frames(video_path='', output_video_path='', make_video=True, output_video_name='skeleton_video', max_frames=30):
    """Extract hip-centred pose landmarks from the first frames of a video.

    Works like ``skeleton_csv`` but stops after ``max_frames`` frames have been
    read (whether or not a pose was detected in them).

    NOTE(review): the function name suggests 90 frames, but the limit has been
    30 in practice; the default keeps that behaviour. Pass ``max_frames=90`` if
    90 was intended.

    Args:
        video_path: Path of the video to read.
        output_video_path: Directory the annotated video is written to
            (only used when ``make_video`` is true).
        make_video: When true, frames with a detected pose are written, with
            the skeleton drawn on them, to ``<output_video_name>.mp4``.
        output_video_name: File name (without extension) of the annotated video.
        max_frames: Maximum number of frames to read from the video.

    Returns:
        A float32 tensor with one row per frame in which a pose was detected
        (4 values per landmark), or the placeholder ``torch.Tensor([1])`` when
        the video cannot be opened, so that ``None`` never reaches a dataset.
    """
    mp_pose = mp.solutions.pose
    mp_drawing = mp.solutions.drawing_utils

    cap = cv.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Unable to open video file at path:", video_path)
        return torch.Tensor([1])  # placeholder instead of None (None would stop training)

    out = None
    all_landmarks = []
    try:
        if make_video:
            # The writer mirrors the source video's geometry and frame rate.
            frame_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv.CAP_PROP_FPS)
            output_video_name = output_video_name + '.mp4'
            output_video_path = os.path.join(output_video_path, output_video_name)
            out = cv.VideoWriter(output_video_path, cv.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

        # The context manager closes the Pose graph even if a frame fails.
        with mp_pose.Pose(static_image_mode=False, model_complexity=1,
                          enable_segmentation=False, min_detection_confidence=0.5) as pose:
            frame_count = 0
            while cap.isOpened() and frame_count < max_frames:
                success, image = cap.read()
                if not success:
                    break
                frame_count += 1  # counts every frame read, with or without a pose

                # OpenCV delivers BGR frames; the pose model is fed RGB.
                image_rgb = cv.cvtColor(image, cv.COLOR_BGR2RGB)
                results = pose.process(image_rgb)
                if not results.pose_landmarks:
                    continue

                detected = results.pose_landmarks.landmark
                # Reference point: midpoint of landmarks 23 and 24 (hip centre).
                ref_x = (detected[23].x + detected[24].x) / 2
                ref_y = (detected[23].y + detected[24].y) / 2
                ref_z = (detected[23].z + detected[24].z) / 2

                landmarks = []
                for lm in detected:
                    landmarks.extend([lm.x - ref_x, lm.y - ref_y, lm.z - ref_z, lm.visibility])
                all_landmarks.append(landmarks)

                if out is not None:
                    # Drawing is only needed when the frame is actually saved.
                    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
                    out.write(image)
    finally:
        # Always free the capture / writer handles, including on errors.
        cap.release()
        if out is not None:
            out.release()

    # torch.tensor never returns None, so no further None-guard is required.
    return torch.tensor(all_landmarks, dtype=torch.float32)



# Model

In [150]:
class LSTMModel(nn.Module):
    """Sequence classifier: a stacked LSTM whose last time step feeds a linear layer."""

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        """Build the recurrent encoder and the classification head.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            num_layers: Number of stacked LSTM layers.
            num_classes: Number of output logits.
        """
        super().__init__()
        # batch_first=True: batched inputs are laid out as (batch, time, feature).
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Return class logits computed from the LSTM output at the final time step."""
        sequence_output, _state = self.lstm(x)
        final_step = sequence_output[:, -1, :]
        return self.fc(final_step)

# Get dataset - video

In [151]:
class UCF4Dataset(Dataset):
    """Dataset of ``(skeleton_tensor, label)`` pairs read from a class-per-folder video tree.

    ``directory`` is expected to contain one sub-directory per class, each
    holding that class's video files. The label of a sample is the index of
    its class name in ``self.classes``.
    """

    def __init__(self, directory):
        """Index all videos below ``directory``.

        Args:
            directory: Root folder with one sub-directory per class.
        """
        self.directory = directory
        # Only visible sub-directories are classes. Hidden entries (.DS_Store,
        # "._*" macOS files, ...) and stray files are skipped rather than
        # deleted, so building a dataset never modifies the folder on disk.
        # The case-insensitive sort makes label indices independent of the
        # arbitrary order in which os.listdir returns entries.
        self.classes = sorted(
            (
                entry
                for entry in os.listdir(directory)
                if not entry.startswith('.') and os.path.isdir(os.path.join(directory, entry))
            ),
            key=str.lower,
        )
        self.data = []

        # Ignore hidden files such as "._*" (macOS metadata) inside class folders.
        for class_name in self.classes:
            class_path = os.path.join(directory, class_name)
            for video in sorted(os.listdir(class_path)):
                if not video.startswith('.'):
                    self.data.append((os.path.join(class_path, video), class_name))
        print(self.classes)
        print(len(self.classes))
        # Initialise MediaPipe. NOTE: __getitem__ does not use this instance
        # (skeleton_csv builds its own); it is kept as an attribute for
        # backward compatibility.
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(static_image_mode=False, model_complexity=1)

    def __len__(self):
        """Return the number of indexed videos."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(skeletons, label)`` for the video at position ``idx``."""
        video_path, class_name = self.data[idx]
        skeletons = skeleton_csv(video_path=video_path, make_video=False)
        if skeletons is None:
            # skeleton_csv returns None for unreadable videos; None cannot be
            # collated into a batch, so substitute a placeholder tensor.
            skeletons = torch.Tensor([1])

        label = self.classes.index(class_name)
        return skeletons, label

## Simplified helper functions

In [152]:
def extract_skeleton(video_path, pose):
    """Run pose estimation over a whole video and collect the raw landmarks.

    Args:
        video_path: Path of the video to read.
        pose: Object exposing ``process(rgb_image)`` whose result carries
            ``pose_landmarks`` (e.g. a MediaPipe ``Pose`` instance).

    Returns:
        A list with one entry per frame in which a pose was detected; each
        entry is a flat list ``[x, y, z, visibility, ...]`` over all landmarks.
        Returns ``None`` when the video cannot be opened.
    """
    cap = cv.VideoCapture(video_path)
    if not cap.isOpened():
        print("비디오 파일을 열 수 없습니다:", video_path)
        return None

    all_landmarks = []
    try:
        while cap.isOpened():
            success, image = cap.read()
            if not success:
                break

            # OpenCV delivers BGR frames; the pose model is fed RGB.
            image_rgb = cv.cvtColor(image, cv.COLOR_BGR2RGB)
            results = pose.process(image_rgb)

            if results.pose_landmarks:
                landmarks = []
                for lm in results.pose_landmarks.landmark:
                    landmarks.extend([lm.x, lm.y, lm.z, lm.visibility])
                all_landmarks.append(landmarks)
    finally:
        # Release the capture handle even if a frame fails to process.
        cap.release()
    return all_landmarks

In [153]:
def convert_to_relative_position(all_landmarks):
    """Shift every landmark so that the hip centre becomes the origin.

    Each frame is a flat list of ``[x, y, z, visibility]`` groups. The x/y/z
    values are modified in place by subtracting the midpoint of landmarks 23
    and 24; visibility values are left untouched.

    Returns:
        The same ``all_landmarks`` object that was passed in.
    """
    values_per_landmark = 4
    hip_a = 23 * values_per_landmark
    hip_b = 24 * values_per_landmark

    for frame in all_landmarks:
        # Hip centre (midpoint of landmarks 23 and 24) for the x, y and z axes.
        centre = [(frame[hip_a + axis] + frame[hip_b + axis]) / 2 for axis in range(3)]

        for base in range(0, len(frame), values_per_landmark):
            for axis, offset in enumerate(centre):
                frame[base + axis] -= offset

    return all_landmarks

In [154]:
def save_to_csv(all_landmarks, output_csv_path):
    """Write the landmark rows to ``output_csv_path`` as CSV, without the index column."""
    pd.DataFrame(all_landmarks).to_csv(output_csv_path, index=False)

In [155]:
def create_skeleton_video(video_path, output_video_path, pose):
    """Write a copy of a video with the detected pose skeleton drawn on it.

    Every frame of the source video is written to the output; frames in which
    a pose is detected additionally get the landmark connections drawn.

    Args:
        video_path: Path of the source video.
        output_video_path: Full path of the ``mp4v``-encoded output file.
        pose: Object exposing ``process(rgb_image)`` whose result carries
            ``pose_landmarks`` (e.g. a MediaPipe ``Pose`` instance).

    Returns:
        None. Nothing is written when the source video cannot be opened.
    """
    # Load the MediaPipe drawing utilities.
    mp_drawing = mp.solutions.drawing_utils

    cap = cv.VideoCapture(video_path)
    if not cap.isOpened():
        print("비디오 파일을 열 수 없습니다:", video_path)
        return

    out = None
    try:
        # The writer mirrors the source video's geometry and frame rate.
        frame_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv.CAP_PROP_FPS)

        out = cv.VideoWriter(output_video_path, cv.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

        while cap.isOpened():
            success, image = cap.read()
            if not success:
                break

            # OpenCV delivers BGR frames; the pose model is fed RGB.
            image_rgb = cv.cvtColor(image, cv.COLOR_BGR2RGB)
            results = pose.process(image_rgb)

            if results.pose_landmarks:
                mp_drawing.draw_landmarks(image, results.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS)

            out.write(image)
    finally:
        # Release both handles even if a frame fails, so the output file is
        # finalised and the capture is not leaked.
        cap.release()
        if out is not None:
            out.release()


In [156]:
class UCF4Dataset_output_video(Dataset):
    """Video dataset that yields hip-centred skeleton tensors and can also export skeleton videos.

    ``directory`` is expected to contain one sub-directory per class. For each
    sample the raw landmarks are extracted, converted to positions relative to
    the hip centre, and returned with the class index. When video export is
    enabled, an annotated copy of the source video is written below
    ``output_base_path``, mirroring the source folder structure.

    NOTE(review): the annotated video is regenerated on every ``__getitem__``
    call (i.e. every epoch), and one Pose instance is shared across all videos.
    """

    def __init__(self, directory, output_base_path=None, make_video=True):
        """Index all videos below ``directory``.

        Args:
            directory: Root folder with one sub-directory per class.
            output_base_path: Root folder for the annotated videos. Defaults to
                ``../data/UCF101/UCF-101 - skell`` (built with ``os.path.join``
                so it also works on non-Windows systems).
            make_video: Whether ``__getitem__`` also writes the annotated video.
        """
        if output_base_path is None:
            output_base_path = os.path.join('..', 'data', 'UCF101', 'UCF-101 - skell')
        self.output_base_path = output_base_path
        self.create_skeleton_video = make_video

        # Initialise MediaPipe; this Pose instance is used for every sample.
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(static_image_mode=False, model_complexity=1)

        self.directory = directory
        # Only visible sub-directories are classes. Hidden entries (.DS_Store,
        # "._*" macOS files, ...) and stray files are skipped rather than
        # deleted, so building a dataset never modifies the folder on disk.
        # The case-insensitive sort makes label indices independent of the
        # arbitrary order in which os.listdir returns entries.
        self.classes = sorted(
            (
                entry
                for entry in os.listdir(directory)
                if not entry.startswith('.') and os.path.isdir(os.path.join(directory, entry))
            ),
            key=str.lower,
        )
        self.data = []

        # Ignore hidden files such as "._*" (macOS metadata) inside class folders.
        for class_name in self.classes:
            class_path = os.path.join(directory, class_name)
            for video in sorted(os.listdir(class_path)):
                if not video.startswith('.'):
                    self.data.append((os.path.join(class_path, video), class_name))
        print(self.classes)
        print(len(self.classes))

    def __len__(self):
        """Return the number of indexed videos."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(skeletons_tensor, label)`` for the video at position ``idx``.

        For a video that cannot be opened the placeholder ``torch.Tensor([1])``
        is returned instead of raising, so a data loader keeps running.
        """
        video_path, class_name = self.data[idx]
        label = self.classes.index(class_name)

        # Extract the raw skeleton data.
        all_landmarks = extract_skeleton(video_path, self.pose)
        if all_landmarks is None:
            # Unreadable video: hand back a placeholder instead of crashing in
            # convert_to_relative_position(None).
            return torch.Tensor([1]), label

        # Convert to positions relative to the hip centre.
        all_landmarks = convert_to_relative_position(all_landmarks)

        # Optionally write the annotated video.
        if self.create_skeleton_video:
            # Mirror the source folder structure below the output root.
            relative_path = os.path.relpath(video_path, self.directory)
            output_video_dir = os.path.dirname(os.path.join(self.output_base_path, relative_path))
            # exist_ok avoids a check-then-create race between workers.
            os.makedirs(output_video_dir, exist_ok=True)

            # Derive a unique output name from the source file name.
            name, _ = os.path.splitext(os.path.basename(video_path))
            output_video_full_path = os.path.join(output_video_dir, f"{name}_skeleton.mp4")
            create_skeleton_video(video_path, output_video_full_path, self.pose)

        # Convert to a tensor.
        skeletons_tensor = torch.tensor(all_landmarks, dtype=torch.float32)
        return skeletons_tensor, label

In [157]:
# Initialise the dataset and the data loaders.
# os.path.join keeps the path identical on Windows and valid on other systems.
#dataset_path = os.path.join('..', 'data', 'UCF101', 'UCF4 small')  # small extracted subset
dataset_path = os.path.join('..', 'data', 'UCF101', 'UCF-101')  # full dataset


#dataset = UCF4Dataset(dataset_path)
dataset = UCF4Dataset_output_video(dataset_path)  # also writes the skeleton videos

# Split sizes: 70 % training, 15 % validation, remainder for testing.
dataset_size = len(dataset)
train_size = int(dataset_size * 0.7)
val_size = int(dataset_size * 0.15)
test_size = dataset_size - train_size - val_size

# Seed the split so the same videos land in train/val/test on every run;
# otherwise results are not comparable across kernel restarts.
SPLIT_SEED = 42
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Data loaders.
# NOTE(review): samples appear to have a variable number of frames, so a batch
# size above 1 would presumably need a custom collate function — confirm.
Batch_Size = 1

train_loader = DataLoader(train_dataset, batch_size=Batch_Size, shuffle=True)
# Evaluation does not benefit from shuffling; a fixed order keeps it repeatable.
val_loader = DataLoader(val_dataset, batch_size=Batch_Size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=Batch_Size, shuffle=False)

['ApplyEyeMakeup', 'ApplyLipstick', 'Archery', 'BabyCrawling', 'BalanceBeam', 'BandMarching', 'BaseballPitch', 'Basketball', 'BasketballDunk', 'BenchPress', 'Biking', 'Billiards', 'BlowDryHair', 'BlowingCandles', 'BodyWeightSquats', 'Bowling', 'BoxingPunchingBag', 'BoxingSpeedBag', 'BreastStroke', 'BrushingTeeth', 'CleanAndJerk', 'CliffDiving', 'CricketBowling', 'CricketShot', 'CuttingInKitchen', 'Diving', 'Drumming', 'Fencing', 'FieldHockeyPenalty', 'FloorGymnastics', 'FrisbeeCatch', 'FrontCrawl', 'GolfSwing', 'Haircut', 'Hammering', 'HammerThrow', 'HandstandPushups', 'HandstandWalking', 'HeadMassage', 'HighJump', 'HorseRace', 'HorseRiding', 'HulaHoop', 'IceDancing', 'JavelinThrow', 'JugglingBalls', 'JumpingJack', 'JumpRope', 'Kayaking', 'Knitting', 'LongJump', 'Lunges', 'MilitaryParade', 'Mixing', 'MoppingFloor', 'Nunchucks', 'ParallelBars', 'PizzaTossing', 'PlayingCello', 'PlayingDaf', 'PlayingDhol', 'PlayingFlute', 'PlayingGuitar', 'PlayingPiano', 'PlayingSitar', 'PlayingTabla', 'P

In [158]:
# Number of batches in the train / validation / test loaders.
tuple(len(loader) for loader in (train_loader, val_loader, test_loader))

(9324, 1998, 1998)

# Parameter

In [159]:
# LSTM hyper-parameters.
input_size = 132   # per-frame feature width (4 values per pose landmark); adjust to your skeleton data
hidden_size = 128
num_layers = 5
num_classes = 101  # number of classes in the original UCF101 dataset (use 4 for the small subset)

model = LSTMModel(input_size, hidden_size, num_layers, num_classes)
# Wrap the model for multi-GPU execution when several CUDA devices were detected.
model = nn.DataParallel(model) if multi_gpu else model
model.to(device)

DataParallel(
  (module): LSTMModel(
    (lstm): LSTM(132, 128, num_layers=5, batch_first=True)
    (fc): Linear(in_features=128, out_features=101, bias=True)
  )
)

In [160]:
# Classification loss, Adam optimiser, and a scheduler that halves the learning
# rate (down to 1e-4) once the monitored value has not improved for more than one epoch.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=1, min_lr=1e-4)

In [161]:
# Best-model bookkeeping: lowest validation loss seen so far and the matching weights.
best_model = None
best_val_loss = float('inf')

# Per-epoch loss histories, used later for plotting.
train_losses, val_losses = [], []

In [162]:
import copy

from tqdm import tqdm

# Training loop
num_epochs = 10
checkpoint_path = '../OUTPUT/ucf101_model.pth'
# Create the output folder up front so a missing directory cannot make the
# final save fail after training has already finished.
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    train_batches = 0
    train_skipped = 0
    for skeletons, labels in tqdm(train_loader):
        skeletons, labels = skeletons.to(device), labels.to(device)

        try:
            outputs = model(skeletons)
        except Exception:
            # Best effort: samples the model cannot consume (e.g. placeholder
            # or empty tensors) are skipped. Unlike a bare `except`, this does
            # not swallow KeyboardInterrupt.
            train_skipped += 1
            continue

        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        train_batches += 1

    # Average over the batches that actually contributed a loss.
    avg_train_loss = total_loss / train_batches if train_batches else float('nan')
    train_losses.append(avg_train_loss)

    # Validation loop
    model.eval()
    total_val_loss = 0
    val_batches = 0
    val_skipped = 0
    with torch.no_grad():
        for skeletons, labels in val_loader:
            skeletons, labels = skeletons.to(device), labels.to(device)
            try:
                outputs = model(skeletons)
            except Exception:
                val_skipped += 1
                continue

            loss = criterion(outputs, labels)
            total_val_loss += loss.item()
            val_batches += 1

    avg_val_loss = total_val_loss / val_batches if val_batches else float('nan')
    val_losses.append(avg_val_loss)

    # Keep the best model. state_dict() returns references to the live
    # parameters, so a deep copy is needed to freeze this epoch's weights.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model = copy.deepcopy(model.state_dict())

    scheduler.step(avg_val_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss}, Val Loss: {avg_val_loss}')
    if train_skipped or val_skipped:
        print(f'Skipped batches - train: {train_skipped}, val: {val_skipped}')


# Save the best model (fall back to the current weights if no epoch ever
# produced a usable validation loss).
# NOTE: when the model is wrapped in nn.DataParallel, the saved keys carry a
# 'module.' prefix.
if best_model is None:
    best_model = model.state_dict()
torch.save(best_model, checkpoint_path)

  0%|          | 17/9324 [03:02<32:32:01, 12.58s/it]

In [None]:
# Training vs. validation loss per epoch.
fig, ax = plt.subplots()
ax.plot(train_losses, label='Training Loss')
ax.plot(val_losses, label='Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [None]:
# Test loop for accuracy
model.eval()  # make sure layers run in inference mode
correct = 0
total = 0
skipped = 0
with torch.no_grad():
    for skeletons, labels in test_loader:
        # Inputs and labels must live on the model's device; otherwise the
        # forward pass or the comparison below fails.
        skeletons, labels = skeletons.to(device), labels.to(device)
        try:
            outputs = model(skeletons)
        except Exception:
            # Best effort: samples the model cannot consume are skipped and
            # counted, not silently mixed into the totals.
            skipped += 1
            continue

        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

if total:
    accuracy = 100 * correct / total
    print(f'Accuracy on test set: {accuracy}%')
else:
    accuracy = float('nan')
    print('Accuracy on test set: no test samples could be evaluated')
if skipped:
    print(f'Skipped test batches: {skipped}')