In [1]:
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os
from PIL import Image
import numpy as np

In [2]:
def getKeypoints(probMap, threshold=0.1):
    mapSmooth = cv2.GaussianBlur(probMap, (3,3), 0, 0)
    mapMask = np.uint8(mapSmooth > threshold)
    keypoints = []

    contours, _ = cv2.findContours(mapMask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    for cnt in contours:
        blobMask = np.zeros(mapMask.shape)
        blobMask = cv2.fillConvexPoly(blobMask, cnt, 1)
        maskedProbMap = mapSmooth * blobMask
        _, maxVal, _, maxLoc = cv2.minMaxLoc(maskedProbMap)
        keypoints.append((*maxLoc, maxVal))  # (x, y, confidence)

    return keypoints

In [3]:
import numbers
import logging
logging.basicConfig(level=logging.DEBUG)
def keypoints_to_vector(detected_keypoints, nPoints=18):
    """
    OpenPose에서 탐지된 관절 위치 데이터를 특징 벡터로 변환

    Parameters:
    detected_keypoints (list): 각 관절의 탐지된 위치 정보를 담고 있는 list
                               각 요소는 (x, y, confidence) 형태의t tuple
    nPoints (int): 모델에서 탐지하는 관절의 총 개수 - coco 18개

    Returns:
    numpy.ndarray: 모든 관절 위치를 표현하는 1D 벡터. 관절이 탐지되지 않은 경우 (0, 0)처리
    """

    # 각 관절의 x, y, confidence를 위한 공간을 확보
    feature_vector = np.zeros(nPoints * 3)  # nPoints에 맞는 크기 확인

    for i, keypoint in enumerate(detected_keypoints):
        # i가 nPoints를 넘지 않도록 합니다. 루프는 0부터 nPoints-1까지만 돌아야 합니다.
        if i >= nPoints:
            break  # nPoints 이상의 인덱스에 대해서는 처리하지 않고 루프 종료

        if keypoint:  # 관절이 탐지된 경우
            x, y, confidence = keypoint
            index = i * 3
            feature_vector[index] = x
            feature_vector[index + 1] = y
            feature_vector[index + 2] = confidence

    return feature_vector


In [4]:
import cv2
import numpy as np
import os

def process_image_with_openpose(image, net, inHeight=368, nPoints=18):
    frameWidth = image.shape[1]
    frameHeight = image.shape[0]
    inWidth = int((inHeight / frameHeight) * frameWidth)

    inpBlob = cv2.dnn.blobFromImage(image, 1.0 / 255, (inWidth, inHeight), (0, 0, 0), swapRB=False, crop=False)
    net.setInput(inpBlob)
    output = net.forward()

    detected_keypoints = []
    threshold = 0.1
    logging.debug("Processing image for keypoints...")
    for part in range(nPoints):
        probMap = output[0, part, :, :]
        probMap = cv2.resize(probMap, (frameWidth, frameHeight))
        keypoints = getKeypoints(probMap, threshold)
        logging.debug(f"Part {part}: Detected {len(keypoints)} keypoints.")
        for keypoint in keypoints:
            # getKeypoints 함수로부터 (x, y, confidence) 형태로 추출된 각 관절 정보를 detected_keypoints에 추가
            logging.debug(f"Keypoint: {keypoint}")
            detected_keypoints.append(keypoint)

    # 모든 관절 정보를 특징 벡터로 변환
    feature_vector = keypoints_to_vector(detected_keypoints, nPoints)
    logging.debug(f"Feature vector size: {len(feature_vector)}")
    logging.debug(f"Feature vector: {feature_vector}")
    return feature_vector

In [5]:
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os
from PIL import Image

class YogaPoseDataset(Dataset):
    def __init__(self, csv_file, img_dir, net, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.annotations = self.annotations[self.annotations['pose_name'].str.lower().str.startswith('a')]
        self.img_dir = img_dir
        self.net = net
        self.transform = transform

        unique_pose_ids = sorted(self.annotations['pose_id'].unique())
        self.pose_id_to_index = {pose_id: idx for idx, pose_id in enumerate(unique_pose_ids)}
        print(f"Filtered dataset size: {len(self.annotations)}")

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
        image = Image.open(img_path).convert('RGB')
        pose_id = self.annotations.iloc[index, 1]
        pose_id = self.pose_id_to_index[pose_id]  # pose_id를 인덱스로
        pose_id = torch.tensor(pose_id, dtype=torch.long)

        if self.transform:
            image = self.transform(image)

        # OpenPose에서 특징 벡터 추출
        feature_vector = process_image_with_openpose(cv2.imread(img_path), self.net)

        # 특징 벡터를 Tensor로 변환
        feature_vector = torch.tensor(feature_vector, dtype=torch.float)

        pose_id = torch.tensor(int(pose_id), dtype=torch.long)

        return image, feature_vector, pose_id

In [6]:
from torch.utils.data import DataLoader
from torchvision import transforms


transform = transforms.Compose([
    #transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 정규화
])


protoFile = "/content/drive/MyDrive/openpose/pose_deploy_linevec_faster_4_stages.prototxt"
weightsFile = "/content/drive/MyDrive/openpose/pose_iter_160000.caffemodel"
net = cv2.dnn.readNetFromCaffe(protoFile, weightsFile)

train_dataset = YogaPoseDataset(
    csv_file='/content/drive/MyDrive/openpose/dataset/train_dataset.csv',
    img_dir='/content/drive/MyDrive/openpose/dataset',
    net=net,
    transform=transform
)


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

Filtered dataset size: 171


In [7]:
import pandas as pd

# CSV 파일 경로
csv_file_path = '/content/drive/MyDrive/openpose/dataset/train_dataset.csv'

# 데이터프레임으로 읽기
data = pd.read_csv(csv_file_path)
data.head()

Unnamed: 0,file_name,pose_id,pose_name
0,yoganidrasana/46-0.png,94,Yoganidrasana
1,marjaryasana/32-0.png,7,Marjaryasana
2,yoganidrasana/45-0.png,94,Yoganidrasana
3,vasisthasana/30-0.png,73,Vasisthasana
4,halasana/51-1.png,59,Halasana


In [8]:
import cv2
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib

from random import randint
from pathlib import Path
import json

from collections import defaultdict

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
from torchvision.models import resnet34
device = torch.device('cuda:0')
device

device(type='cuda', index=0)

### RandomForest Classifier


In [None]:
# 1. Random Forest Classifier
from sklearn.ensemble import RandomForestClassifier
model=RandomForestClassifier(n_estimators=100,
                             criterion='gini',
                             min_samples_split=2,
                             min_samples_leaf=1,
                             max_features='sqrt',
                             bootstrap=False,
                             random_state=1)
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# 데이터 로드 및 준비
features = []
labels = []
for i in range(len(train_dataset)):
    image, feature_vector, pose_id = train_dataset[i]
    features.append(feature_vector.numpy())
    labels.append(int(pose_id))
features = np.array(features)
labels = np.array(labels)
# 학습 및 테스트 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)
# 모델 정의
model = RandomForestClassifier(n_estimators=100, criterion='gini', min_samples_split=2,
                               min_samples_leaf=1, max_features='sqrt', bootstrap=False, random_state=1)
# 모델 학습
model.fit(X_train, y_train)
# 예측 및 평가
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy: {accuracy * 100:.2f}%")

### KNN classifier

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
features = []
labels = []
for i in range(len(train_dataset)):
    _, feature_vector, pose_id = train_dataset[i]
    features.append(feature_vector.numpy())
    labels.append(int(pose_id))
features = np.array(features)
labels = np.array(labels)
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)
knn = KNeighborsClassifier(n_neighbors=3)
knn.fit(X_train, y_train)
predictions = knn.predict(X_test)
# 평가
accuracy = accuracy_score(y_test, predictions)
print(f"KNN Accuracy: {accuracy * 100:.2f}%")

In [None]:
import torch
import torch.optim as optim
def calculate_accuracy(outputs, targets):
    _, predictions = torch.max(outputs, 1)
    correct = (predictions == targets).sum().item()
    total = targets.size(0)
    accuracy = 100 * correct / total
    return accuracy

def validate(model, valid_loader, criterion):
    model.eval()
    valid_loss = 0
    valid_accuracy = 0

    with torch.no_grad():
        for images, feature_vectors, pose_ids in valid_loader:
            feature_vectors = feature_vectors.to(device)
            pose_ids = pose_ids.to(device)
            outputs = model(feature_vectors)
            loss = criterion(outputs, pose_ids)
            valid_loss += loss.item()
            valid_accuracy += calculate_accuracy(outputs, pose_ids)

    valid_loss /= len(valid_loader)
    valid_accuracy /= len(valid_loader)
    return valid_loss, valid_accuracy

def train(model, train_loader, valid_loader, criterion, optimizer, epochs=10, print_every=20):
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        correct = 0
        total = 0

        for i, (images, feature_vectors, pose_ids) in enumerate(train_loader):
            feature_vectors = feature_vectors.to(device)
            pose_ids = pose_ids.to(device)

            optimizer.zero_grad()
            outputs = model(feature_vectors)
            loss = criterion(outputs, pose_ids)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += pose_ids.size(0)
            correct += (predicted == pose_ids).sum().item()

            if (i + 1) % print_every == 0:
                print(f'Epoch {epoch+1}, Batch {i+1}, Loss: {train_loss / (i+1):.4f}')

        train_accuracy = 100 * correct / total
        valid_loss, valid_accuracy = validate(model, valid_loader, criterion)
        print(f'End of Epoch {epoch+1}, '
              f'Train Loss: {train_loss / len(train_loader):.4f}, '
              f'Train Accuracy: {train_accuracy:.2f}%, '
              f'Valid Loss: {valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.2f}%\n')



valid_dataset = YogaPoseDataset(
    csv_file='/content/drive/MyDrive/openpose/dataset/valid_dataset.csv',
    img_dir='/content/drive/MyDrive/openpose/dataset',
    net=net,
    transform=transform
)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

train(model, train_loader, valid_loader, criterion, optimizer, epochs=10, print_every=20)

### 2 layer MLP

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class PoseClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(PoseClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.bn1 = nn.BatchNorm1d(64)
        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(64, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.outlayer = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.bn2(x)
        x = self.activation(x)
        x = self.outlayer(x)
        return x


#(18개 관절 * 3 좌표)
model = PoseClassifier(input_size=54, num_classes=13)
model = model.to(device)
device

device(type='cuda', index=0)

In [None]:
import torch
import torch.optim as optim
def calculate_accuracy(outputs, targets):
    _, predictions = torch.max(outputs, 1)
    correct = (predictions == targets).sum().item()
    total = targets.size(0)
    accuracy = 100 * correct / total
    return accuracy

def validate(model, valid_loader, criterion):
    model.eval()
    valid_loss = 0
    valid_accuracy = 0

    with torch.no_grad():
        for images, feature_vectors, pose_ids in valid_loader:
            feature_vectors = feature_vectors.to(device)
            pose_ids = pose_ids.to(device)
            outputs = model(feature_vectors)
            loss = criterion(outputs, pose_ids)
            valid_loss += loss.item()
            valid_accuracy += calculate_accuracy(outputs, pose_ids)

    valid_loss /= len(valid_loader)
    valid_accuracy /= len(valid_loader)
    return valid_loss, valid_accuracy

def train(model, train_loader, valid_loader, criterion, optimizer, epochs=10, print_every=20):
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        correct = 0
        total = 0

        for i, (images, feature_vectors, pose_ids) in enumerate(train_loader):
            feature_vectors = feature_vectors.to(device)
            pose_ids = pose_ids.to(device)

            optimizer.zero_grad()
            outputs = model(feature_vectors)
            loss = criterion(outputs, pose_ids)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += pose_ids.size(0)
            correct += (predicted == pose_ids).sum().item()

            if (i + 1) % print_every == 0:
                print(f'Epoch {epoch+1}, Batch {i+1}, Loss: {train_loss / (i+1):.4f}')

        train_accuracy = 100 * correct / total
        valid_loss, valid_accuracy = validate(model, valid_loader, criterion)
        print(f'End of Epoch {epoch+1}, '
              f'Train Loss: {train_loss / len(train_loader):.4f}, '
              f'Train Accuracy: {train_accuracy:.2f}%, '
              f'Valid Loss: {valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.2f}%\n')



valid_dataset = YogaPoseDataset(
    csv_file='/content/drive/MyDrive/openpose/dataset/valid_dataset.csv',
    img_dir='/content/drive/MyDrive/openpose/dataset',
    net=net,
    transform=transform
)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

train(model, train_loader, valid_loader, criterion, optimizer, epochs=10, print_every=20)

Filtered dataset size: 41
End of Epoch 1, Train Loss: 13.3535, Train Accuracy: 25.15%, Valid Loss: 5.7393, Valid Accuracy: 43.40%

End of Epoch 2, Train Loss: 7.1545, Train Accuracy: 36.84%, Valid Loss: 4.9700, Valid Accuracy: 29.17%

End of Epoch 3, Train Loss: 4.9706, Train Accuracy: 42.11%, Valid Loss: 3.9035, Valid Accuracy: 42.53%

End of Epoch 4, Train Loss: 3.0009, Train Accuracy: 47.95%, Valid Loss: 3.5891, Valid Accuracy: 40.10%

End of Epoch 5, Train Loss: 3.4937, Train Accuracy: 51.46%, Valid Loss: 3.2360, Valid Accuracy: 40.10%

End of Epoch 6, Train Loss: 2.6861, Train Accuracy: 54.39%, Valid Loss: 2.9362, Valid Accuracy: 48.78%

End of Epoch 7, Train Loss: 2.4243, Train Accuracy: 57.31%, Valid Loss: 2.4216, Valid Accuracy: 47.22%

End of Epoch 8, Train Loss: 2.2075, Train Accuracy: 57.31%, Valid Loss: 2.2772, Valid Accuracy: 48.78%

End of Epoch 9, Train Loss: 1.7808, Train Accuracy: 57.31%, Valid Loss: 2.3315, Valid Accuracy: 48.78%

End of Epoch 10, Train Loss: 1.2896, 