In [1]:
import os
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import StepLR

import numpy as np
import random

from copy import deepcopy

In [2]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True
    
seed_everything(12)

In [3]:
class skeleton_LSTM(nn.Module):
    def __init__(self, feature_dim, output_dim):
        super(skeleton_LSTM, self).__init__()
        
        self.feature_dim = feature_dim
        self.output_dim = output_dim
        
        self.lstm1 = nn.LSTM(input_size=self.feature_dim, hidden_size=128, num_layers=1, batch_first=True)
        self.layer_norm1 = nn.LayerNorm(128)
        
        self.lstm2 = nn.LSTM(input_size=128, hidden_size=256, num_layers=1, batch_first=True)
        self.layer_norm2 = nn.LayerNorm(256)
        
        self.lstm3 = nn.LSTM(input_size=256, hidden_size=512, num_layers=1, batch_first=True)
        self.layer_norm3 = nn.LayerNorm(512)
        
        self.fc1 = nn.Linear(512,256)
        self.fc2 = nn.Linear(256,output_dim)
        
    def forward(self, x):
        x, _ = self.lstm1(x)
        x = self.layer_norm1(x)
        
        x, _ = self.lstm2(x)
        x = self.layer_norm2(x)
        
        x, (hn, cn) = self.lstm3(x)
        x = self.layer_norm3(x)
        
        x = F.relu(self.fc1(x[:,-1,:]))
        embedding = self.fc2(x)
        
        return embedding


In [4]:
class head(nn.Module) :
    def __init__(self):
        super(head, self).__init__()
        
        # Feedforward layers
        self.fc1 = nn.Linear(64, 32)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(32, 1)  # Output layer has 1 unit for binary classification
        self.sigmoid = nn.Sigmoid()  # Sigmoid for probability output

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.sigmoid(x)
        return x

    

In [5]:
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset

angle = [['left_biceps', 'left_forearm'],
         ['right_biceps', 'right_forearm'],
         ['between_shoulders', 'left_body'],
         ['between_shoulders', 'right_body'],
         ['between_shoulders', 'rigth_neck'],
         ['between_shoulders', 'left_neck'],
         ['between_pelvis','left_thigh'],
         ['between_pelvis','right_thigh'],
         ['right_thigh','right_calf'],
         ['left_thigh','left_calf'],
         ['right_body','right_thigh'],
         ['left_body','left_thigh']
        ]
         

body_parts = {'left_biceps': [11, 13],
              'left_forearm': [13, 15],
              'right_biceps': [12, 14],
              'right_forearm': [14, 16],
              'between_shoulders': [11, 12],
              'left_body': [11, 23],
              'right_body': [12, 24],
              'between_pelvis': [23, 24],
              'left_thigh': [23, 25],
              'left_calf': [25, 27],
              'right_thigh': [24, 26],
              'right_calf': [26, 28],
              'left_neck': [9, 11],
              'rigth_neck': [10, 12]}


def calculate_angles(matrix1, matrix2):
    dot_product = np.einsum('ij,ij->i', matrix1, matrix2)
    norm1 = np.linalg.norm(matrix1, axis=1)
    norm2 = np.linalg.norm(matrix2, axis=1)
    cos_theta = dot_product / (norm1 * norm2)
    angles = np.arccos(np.clip(cos_theta, -1.0, 1.0))
    return angles


def make_df_angle(path):
    df = pd.read_csv(path)
    df_angle = pd.DataFrame()

    for body_parts1, body_parts2 in angle:
        body_parts1_vec = body_parts[body_parts1]
        body_parts2_vec = body_parts[body_parts2]

        # 벡터 계산
        vec_mat1 = df.iloc[:, body_parts1_vec[0]*3+1:body_parts1_vec[0]*3+4].values - df.iloc[:, body_parts1_vec[1]*3+1:body_parts1_vec[1]*3+4].values
        vec_mat2 = df.iloc[:, body_parts2_vec[0]*3+1:body_parts2_vec[0]*3+4].values - df.iloc[:, body_parts2_vec[1]*3+1:body_parts2_vec[1]*3+4].values

        angles = calculate_angles(vec_mat1, vec_mat2)
        df_angle[f'{body_parts1}_{body_parts2}'] = angles
        
    df_angle = df_angle.replace([np.inf, -np.inf], 0.0)
    df_angle = df_angle.fillna(0.0)


    return df_angle



In [6]:
class LandmarkDataset(Dataset):
    def __init__(self,path):
        self.root_dir = path
        self.data = []
        self.labels = []
        self.label_to_indices = {}
        self.min_sequence_length = float('inf')

        # 디렉토리 탐색 및 최소 시퀀스 길이 계산
        for dance_name in os.listdir(self.root_dir):
            dance_path = os.path.join(self.root_dir, dance_name)
            if os.path.isdir(dance_path):
                for csv_file in os.listdir(dance_path):
                    # '_F'로 끝나는 파일은 제외
                    if csv_file.endswith(".csv") and not csv_file.endswith("_F.csv"):
                        file_path = os.path.join(dance_path, csv_file)
                        self.data.append(file_path)
                        self.labels.append(dance_name)

                        if dance_name not in self.label_to_indices:
                            self.label_to_indices[dance_name] = []
                        self.label_to_indices[dance_name].append(len(self.data) - 1)

                        # 각 CSV 파일의 시퀀스 길이를 체크하여 최소 시퀀스 길이 업데이트
                        df_angle = make_df_angle(file_path)
                        sequence_length = len(df_angle)
                        if sequence_length < self.min_sequence_length:
                            self.min_sequence_length = sequence_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # 현재 샘플의 라벨 및 파일 경로
        label = self.labels[idx]
        file_path1 = self.data[idx]

        # 같은 클래스의 다른 파일을 선택하여 positive 쌍 구성
        positive_idx = np.random.choice(self.label_to_indices[label])
        while positive_idx == idx:
            positive_idx = np.random.choice(self.label_to_indices[label])
        file_path2 = self.data[positive_idx]

        # 랜덤으로 다른 클래스의 샘플을 선택하여 negative 쌍 구성
        neg_label = np.random.choice([l for l in self.label_to_indices if l != label])
        negative_idx = np.random.choice(self.label_to_indices[neg_label])
        file_path3 = self.data[negative_idx]

        # 각 파일에서 관절 간 각도를 계산
        angles1 = make_df_angle(file_path1).values[:self.min_sequence_length]
        angles2 = make_df_angle(file_path2).values[:self.min_sequence_length]
        angles3 = make_df_angle(file_path3).values[:self.min_sequence_length]

        # numpy array를 torch tensor로 변환
        angles1 = torch.tensor(angles1, dtype=torch.float32)
        angles2 = torch.tensor(angles2, dtype=torch.float32)
        angles3 = torch.tensor(angles3, dtype=torch.float32)

        # Positive 쌍은 (angles1, angles2), negative 쌍은 (angles1, angles3)
        return angles1, angles2, angles3

    def load_landmark(self, file_path):
        df = pd.read_csv(file_path)
        df = df.drop(columns=['filename'])
        landmarks = df.values[:self.min_sequence_length]
        return landmarks

In [7]:
class TripletContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.1):
        super(TripletContrastiveLoss, self).__init__()
        self.temperature = temperature

    def forward(self, anchor, positive, negative):
        # Normalize features
        anchor, positive, negative = F.normalize(anchor, dim=1), F.normalize(positive, dim=1), F.normalize(negative, dim=1)
        
        # Calculate similarities
        pos_sim = torch.exp(torch.sum(anchor * positive, dim=1) / self.temperature)  # Anchor-Positive similarity
        neg_sim = torch.exp(torch.sum(anchor * negative, dim=1) / self.temperature)  # Anchor-Negative similarity

        # Loss calculation: maximize anchor-positive similarity, minimize anchor-negative similarity
        loss = -torch.log(pos_sim / (pos_sim + neg_sim)).mean()
        return loss


In [8]:
# 데이터셋과 데이터로더
batch_size = 8
train_dataset = LandmarkDataset('/kaggle/input/nipa-sample/sample_video')
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = LandmarkDataset('/kaggle/input/nipa-val')
val_dataloader = DataLoader(val_dataset, batch_size=len(val_dataset), shuffle=False)

In [9]:
# 하이퍼파라미터
feature_dim = len(angle)
output_dim = 64
num_epochs = 50
learning_rate = 0.001
temperature = .05
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [10]:
# 모델, 손실 함수, 옵티마이저 초기화
model = skeleton_LSTM(feature_dim, output_dim).to(device)
classification = head().to(device)

criterion1 = TripletContrastiveLoss(temperature=temperature)
criterion2 = nn.BCELoss()
criterion3 = nn.BCELoss()

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

In [11]:
# 학습 루프

best_val_loss = np.inf
patience = 10
epochs_no_improve = 0

for epoch in range(num_epochs):
    train_loss = 0.0
    model.train()
    classification.train()
    
    for batch_idx, (anchor, pos, neg) in enumerate(train_dataloader):
            
        optimizer.zero_grad()

        # Positive 쌍과 Negative 쌍을 모델에 각각 통과
        
        anchor = anchor.to(device)
        pos = pos.to(device)
        neg = neg.to(device)
        
        anchor_emb = model(anchor)
        pos_emb = model(pos)
        neg_emb = model(neg)

        # Contrastive Loss 계산
        loss1 = criterion1(anchor_emb, pos_emb, neg_emb)
        
        pos_classification = classification(torch.add(anchor_emb,pos_emb))
        loss2 = criterion2(pos_classification,torch.full((pos_classification.shape[0],1),1.).to(device))
        
        neg_classification = classification(torch.add(anchor_emb,neg_emb))
        loss3 = criterion2(neg_classification,torch.full((pos_classification.shape[0],1),0.).to(device))
        
        loss = loss1+loss2+loss3
        train_loss += loss1.item()

        loss.backward()
        optimizer.step()
        scheduler.step()
        
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_idx, (anchor, pos, neg) in enumerate(val_dataloader):
            anchor, pos, neg = anchor.to(device), pos.to(device), neg.to(device)

            anchor_emb = model(anchor)
            pos_emb = model(pos)
            neg_emb = model(neg)
            
            loss = criterion1(anchor_emb, pos_emb, neg_emb)
            val_loss += loss.item()
            
            
    # Early Stopping 체크
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0  # Improvement이 있으면 카운트 리셋
        best_model = deepcopy(model)
    else:
        epochs_no_improve += 1

    if epochs_no_improve >= patience:
        print("Early stopping triggered!")
        break



#         if batch_idx % 10 == 0:
#             print(
#                 f'Epoch [{epoch + 1}/{num_epochs}], Step [{batch_idx + 1}/{len(dataloader)}], Loss: {loss.item():.4f}')

    # 에폭마다 평균 손실을 기록
    print(f'Epoch [{epoch + 1}/{num_epochs}], Train Average Loss: {train_loss / len(train_dataloader):.4f}, Validation Average Loss: {val_loss:.4f}')

print("Training complete!")

Epoch [1/50], Train Average Loss: 0.3646, Validation Average Loss: 0.3160
Epoch [2/50], Train Average Loss: 0.1248, Validation Average Loss: 0.2395
Epoch [3/50], Train Average Loss: 0.1474, Validation Average Loss: 0.4226
Epoch [4/50], Train Average Loss: 0.0874, Validation Average Loss: 0.4106
Epoch [5/50], Train Average Loss: 0.0992, Validation Average Loss: 0.2325
Epoch [6/50], Train Average Loss: 0.0399, Validation Average Loss: 0.4275
Epoch [7/50], Train Average Loss: 0.0748, Validation Average Loss: 0.3469
Epoch [8/50], Train Average Loss: 0.0880, Validation Average Loss: 0.1923
Epoch [9/50], Train Average Loss: 0.0085, Validation Average Loss: 0.3125
Epoch [10/50], Train Average Loss: 0.0513, Validation Average Loss: 0.4540
Epoch [11/50], Train Average Loss: 0.0499, Validation Average Loss: 0.2576
Epoch [12/50], Train Average Loss: 0.0481, Validation Average Loss: 0.2070
Epoch [13/50], Train Average Loss: 0.0837, Validation Average Loss: 0.5151
Epoch [14/50], Train Average Loss:

In [27]:
# 유클리디언 거리 계산
best_model.eval()

origin_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/기본항목 집합곡 5/landmarks_3d_L.csv'
pos_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/기본항목 집합곡 5/landmarks_3d_P.csv'
neg_path = '/kaggle/input/gmb-nipa/landmarks/landmarks/Only One (보아)/landmarks_3d_P.csv'

origin = make_df_angle(origin_path)
pos = make_df_angle(pos_path)
neg = make_df_angle(neg_path)

origin_input = torch.tensor(origin.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
pos_input = torch.tensor(pos.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
neg_input = torch.tensor(neg.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)

origin_emb = best_model(origin_input)
pos_emb = best_model(pos_input)
neg_emb = best_model(neg_input)

pos_dist = torch.pow(F.pairwise_distance(origin_emb, pos_emb), 2)
neg_dist = torch.pow(F.pairwise_distance(origin_emb, neg_emb), 2)
print(pos_dist)
print(neg_dist)
neg_dist/pos_dist

tensor([0.2757], device='cuda:0', grad_fn=<PowBackward0>)
tensor([12.5613], device='cuda:0', grad_fn=<PowBackward0>)


  result = _VF.lstm(input, hx, self._flat_weights, self.bias, self.num_layers,


tensor([45.5628], device='cuda:0', grad_fn=<DivBackward0>)

In [28]:
# 정규화 후, 유클리디언 거리 계산
best_model.eval()

origin = make_df_angle(origin_path)
pos = make_df_angle(pos_path)
neg = make_df_angle(neg_path)

origin_input = torch.tensor(origin.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
pos_input = torch.tensor(pos.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)
neg_input = torch.tensor(neg.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)

origin_emb = F.normalize(best_model(origin_input), dim=1)
pos_emb = F.normalize(best_model(pos_input), dim=1)
neg_emb = F.normalize(best_model(neg_input), dim=1)

pos_dist = torch.pow(F.pairwise_distance(origin_emb, pos_emb), 2)
neg_dist = torch.pow(F.pairwise_distance(origin_emb, neg_emb), 2)
print(pos_dist)
print(neg_dist)
neg_dist/pos_dist

tensor([0.0052], device='cuda:0', grad_fn=<PowBackward0>)
tensor([0.2902], device='cuda:0', grad_fn=<PowBackward0>)


  result = _VF.lstm(input, hx, self._flat_weights, self.bias, self.num_layers,


tensor([55.4845], device='cuda:0', grad_fn=<DivBackward0>)

In [14]:
from time import time

model.eval()

start = time()
origin_path = '/kaggle/input/nipa-sample/sample_video/100 (슈퍼엠)/landmarks_3d_L.csv'
origin = make_df_angle(origin_path)
origin_input = torch.tensor(origin.iloc[:44,].values).unsqueeze(0).to(torch.float32).to(device)

origin_emb = F.normalize(best_model(origin_input),dim=1)
time()-start

  result = _VF.lstm(input, hx, self._flat_weights, self.bias, self.num_layers,


0.0372929573059082