In [None]:
import json
import torch
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

import os
import json

from sklearn.preprocessing import LabelEncoder

import numpy as np

import torch.nn as nn
import torch.optim as optim

import torch.nn.functional as F

In [None]:
import os
import json
from sklearn.preprocessing import LabelEncoder

# Inspect a single keypoint frame to see how many 3D keypoints one group holds.
file_path = 'keypoints/01/NIA_SL_WORD0001_REAL01_F/NIA_SL_WORD0001_REAL01_F_000000000000_keypoints.json'

with open(file_path, 'r', encoding='utf-8') as file:
    json_data = json.load(file)

# Author's note on observed counts: pos:70 / face:70 /
# The guard must test the same key that is read below; the original tested
# 'pose_keypoints_3d' while reading 'face_keypoints_3d', which could either skip
# the print or raise KeyError depending on which keys the file contains.
keypoint_key = 'face_keypoints_3d'
if keypoint_key in json_data['people']:
    # The flat list is divided by 4 (this notebook treats a 3D keypoint as 4 consecutive values).
    print(len(json_data['people'][keypoint_key]) // 4)


In [None]:
import os
import json
from sklearn.preprocessing import LabelEncoder


def get_word_list(num_folders_start=8, num_folders_end=9):
    """Collect morpheme label names from a range of numbered folders and encode them.

    For every folder number in ``[num_folders_start, num_folders_end]`` the
    directory ``morpheme/<NN>`` (two-digit, zero-padded) is scanned for
    ``*.json`` files whose name contains ``"F_morpheme"``. Files are processed in
    the order of the number embedded in the third underscore-separated field of
    the file name, and every ``attribute['name']`` found under
    ``data[*]['attributes']`` is collected.

    Args:
        num_folders_start: First folder number to read (inclusive).
        num_folders_end: Last folder number to read (inclusive).

    Returns:
        The integer codes produced by ``LabelEncoder.fit_transform`` for the
        collected names, in collection order.
    """
    base_folder_path = 'morpheme'

    # Accumulates label names across all requested folders
    word_list = []

    for idx in range(num_folders_start, num_folders_end + 1):
        # The original ignored idx and always read 'morpheme/01', so the same
        # files were loaded once per iteration. Build the folder from idx instead,
        # mirroring how get_sequence_files addresses 'keypoints/<NN>'.
        folder_path = os.path.join(base_folder_path, f'{idx:02d}')

        # Annotation files of this folder
        file_names = [f for f in os.listdir(folder_path) if f.endswith('.json') and "F_morpheme" in f]

        # Order by the number embedded in the file name
        file_names.sort(key=lambda x: int(x.split('_')[2][4:]))

        for filename in file_names:
            file_path = os.path.join(folder_path, filename)

            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)

            # Walk the entries under the 'data' key
            for item in data['data']:
                for attribute in item['attributes']:
                    word_list.append(attribute['name'])

    # Fit the label encoder and convert the names to integer codes
    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(word_list)

    return encoded_labels

# Print how many encoded labels get_word_list() returns for its default folder range.
print(len(get_word_list()))

In [None]:
from tqdm import tqdm
import os

def get_sequence_files(num_folders_start=1, num_folders_end=7):
    """List the keypoint JSON paths of every sequence in a range of numbered folders.

    Each folder ``keypoints/<NN>`` (two-digit, zero-padded) for ``NN`` in
    ``[num_folders_start, num_folders_end]`` is scanned for entries whose name
    contains ``"F"``. Every such entry is treated as a directory of frame files;
    its entries containing ``"F"`` are ordered by the integer in their sixth
    underscore-separated field.

    Args:
        num_folders_start: First folder number to scan (inclusive).
        num_folders_end: Last folder number to scan (inclusive).

    Returns:
        A list with one element per sequence directory; each element is the
        ordered list of frame file paths for that sequence.
    """
    root = 'keypoints'

    def sequence_number(name):
        # Third underscore-separated field with its first four characters dropped
        return int(name.split('_')[2][4:])

    def frame_number(name):
        # Sixth underscore-separated field
        return int(name.split('_')[5])

    collected = []
    for folder_no in range(num_folders_start, num_folders_end + 1):
        group_dir = os.path.join(root, f'{folder_no:02d}')
        sequence_dirs = sorted(
            (entry for entry in os.listdir(group_dir) if "F" in entry),
            key=sequence_number,
        )

        for sequence_name in sequence_dirs:
            sequence_dir = os.path.join(group_dir, sequence_name)
            frame_names = sorted(
                (entry for entry in os.listdir(sequence_dir) if "F" in entry),
                key=frame_number,
            )
            collected.append([os.path.join(sequence_dir, frame) for frame in frame_names])

    return collected

# Show the frame paths of the first sequence found in folders 01 through 07.
print(get_sequence_files(1,7)[0])


In [None]:
def extract_keypoints(json_data):
    """Flatten the keypoint groups of one frame into a single ``(N, 3)`` array.

    2D groups are read in strides of 3 values and stored as ``(x, y, 0)``;
    3D groups are read in strides of 4 values and stored as ``(x, y, z)``.
    The last value of every stride is skipped. Groups missing from
    ``json_data['people']`` are ignored.

    Args:
        json_data: Parsed frame whose ``'people'`` entry maps group names to
            flat lists of numbers.

    Returns:
        A NumPy array with all 2D rows (face, pose, left hand, right hand)
        followed by all 3D rows (face, left hand, right hand).
    """
    people = json_data['people']
    groups_2d = ('face_keypoints_2d', 'pose_keypoints_2d', 'hand_left_keypoints_2d', 'hand_right_keypoints_2d')
    groups_3d = ('face_keypoints_3d', 'hand_left_keypoints_3d', 'hand_right_keypoints_3d')

    def count_rows(group_names, stride):
        # Number of whole keypoints available across the groups that are present
        return sum(len(people[name]) // stride for name in group_names if name in people)

    def fill_rows(group_names, target, n_coords):
        # Copy each group's coordinates into consecutive rows of `target`
        stride = n_coords + 1  # one extra value per keypoint (confidence score) is skipped
        first_row = 0
        for name in group_names:
            if name not in people:
                continue
            flat = people[name]
            for row, start in enumerate(range(0, len(flat), stride), start=first_row):
                if n_coords == 2:
                    target[row] = [flat[start], flat[start + 1], 0]
                elif n_coords == 3:
                    target[row] = [flat[start], flat[start + 1], flat[start + 2]]
            first_row += len(flat) // stride

    points_2d = np.zeros((count_rows(groups_2d, 3), 3))  # (x, y, 0)
    points_3d = np.zeros((count_rows(groups_3d, 4), 3))  # (x, y, z)

    fill_rows(groups_2d, points_2d, 2)
    fill_rows(groups_3d, points_3d, 3)

    # 2D rows first, then 3D rows
    return np.vstack((points_2d, points_3d))

In [None]:
class SignLanguageDataset(Dataset):
    """Dataset of keypoint sequences loaded eagerly from per-frame JSON files.

    Args:
        sequence_files: Iterable of sequences, each an iterable of JSON file
            paths (one file per frame).
        labels: Indexable collection holding one integer label per sequence,
            aligned by position with ``sequence_files``.

    Each item is a ``(sequence, label)`` pair where ``sequence`` is a float32
    tensor obtained by moving the last axis of the stacked frames to the front,
    and ``label`` is a long tensor.
    """

    def __init__(self, sequence_files, labels):
        self.data = []
        self.labels = labels
        for files in sequence_files:
            sequence = []
            for file in files:
                # Explicit encoding, consistent with the other JSON reads in this notebook
                with open(file, 'r', encoding='utf-8') as f:
                    json_data = json.load(f)
                sequence.append(extract_keypoints(json_data))
            self.data.append(sequence)

    def __len__(self):
        """Return the number of loaded sequences."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``idx``-th sequence tensor and its label tensor."""
        # Collapse the list of per-frame arrays into one ndarray first: building a
        # tensor directly from a Python list of ndarrays is the slow path in PyTorch.
        frames = np.array(self.data[idx])
        sequence = torch.tensor(frames, dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        # (frames, keypoints, coords) -> (coords, frames, keypoints)
        sequence = sequence.permute(2, 0, 1)
        return sequence, label


In [None]:
# word_list: every attribute name found in the morpheme annotation files

# Folder containing the annotation JSON files
folder_path = 'morpheme/01'

# Annotation file names, ordered by the number embedded in the third
# underscore-separated field of the name
file_names = sorted(
    (f for f in os.listdir(folder_path) if f.endswith('.json') and "F_morpheme" in f),
    key=lambda x: int(x.split('_')[2][4:]),
)

# Label names, in file order
word_list = []
for filename in file_names:
    file_path = os.path.join(folder_path, filename)

    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

        # Gather the 'name' of every attribute of every entry under 'data'
        word_list.extend(
            attribute['name']
            for item in data['data']
            for attribute in item['attributes']
        )

In [None]:
# Word encoding: map each label string to an integer class id
# Label strings to encode (the names collected into word_list)
labels = word_list

# Create the LabelEncoder, fit it on the labels and get the integer codes
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)

# Show the raw labels next to their integer codes
print("Original Labels:", labels)
print("Encoded Labels:", encoded_labels)

In [None]:
# sequence_files 
import os

# Folder whose sub-directories each hold the frame files of one sequence
folder_path = 'keypoints/01'

# Sequence directory names, ordered by the number embedded in the third
# underscore-separated field of the name
file_names = sorted(
    (f for f in os.listdir(folder_path) if "F" in f),
    key=lambda x: int(x.split('_')[2][4:]),
)

# One list of frame JSON paths per sequence directory
sequence_files = []
for filename in file_names:
    file_path = os.path.join(folder_path, filename)

    # Frame files ordered by the integer in their sixth underscore-separated field
    frame_names = sorted(
        (f for f in os.listdir(file_path) if "F" in f),
        key=lambda x: int(x.split('_')[5]),
    )

    sequence_files.append([os.path.join(file_path, frame) for frame in frame_names])
print(sequence_files)
print(file_names)


In [None]:
from torch.nn.utils.rnn import pad_sequence
# Collate function
def collate_fn(batch):
    """Pad a batch of variable-length sequences to a common length.

    Args:
        batch: List of ``(keypoints, label)`` pairs. Each ``keypoints`` tensor is
            3-dimensional with the sequence length on axis 1.

    Returns:
        A tuple ``(keypoints_padded, labels, lengths)``: the zero-padded batch
        with the sequence length on axis 2, the labels as one tensor, and the
        original (unpadded) length of every sequence.
    """
    samples, targets = zip(*batch)

    # pad_sequence pads along the leading axis, so bring the sequence axis to the front
    length_first = []
    for sample in samples:
        length_first.append(sample.permute(1, 0, 2))
    stacked = pad_sequence(length_first, batch_first=True, padding_value=0)

    # Put the axes back in their original order, now behind the batch axis
    keypoints_padded = stacked.permute(0, 2, 1, 3)

    # Unpadded length of each sequence
    lengths = torch.tensor([sample.size(1) for sample in samples])

    # Labels gathered into a single tensor
    labels = torch.tensor(targets)

    return keypoints_padded, labels, lengths


In [None]:
# Dataset and DataLoader
# Loads every frame listed in sequence_files and pairs sequence i with encoded_labels[i].
dataset = SignLanguageDataset(sequence_files, encoded_labels)
# Shuffled batches of 32; collate_fn pads sequences and also returns their lengths.
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

In [None]:
# Peek at the first two batches to sanity-check the tensors and their shapes.
for batch_idx, (inputs, labels, lengths) in enumerate(dataloader):
    # A single print with a newline separator emits the same six lines.
    print(
        f"Batch {batch_idx}:",
        f"Inputs: {inputs}",
        f"Labels: {labels}",
        f"Lengths: {lengths}",
        f"Inputs shape: {inputs.shape}",
        f"Labels shape: {labels.shape}",
        sep="\n",
    )

    # Stop after printing two batches
    if batch_idx == 1:
        break

In [None]:
# Device configuration
# torch.cuda.is_available must be *called*: the bare function object is always
# truthy, which selected 'cuda' even on machines without a usable GPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Display the selected device as the cell output.
device

In [None]:
import math

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    A ``(1, max_len, dim_model)`` table is precomputed with sines in the even
    feature columns and cosines in the odd ones, and registered as a buffer so
    it follows the module across devices without being trained.

    Args:
        dim_model: Size of the embedding (last) dimension. May be odd.
        dropout_p: Dropout probability applied after the table is added.
        max_len: Number of positions precomputed. The inputs' sequence length
            (axis 1) is expected not to exceed this value.
    """

    def __init__(self, dim_model, dropout_p, max_len=5000):
        super().__init__()

        self.dropout = nn.Dropout(dropout_p)

        # Build the table for every position up to max_len
        pos_encoding = torch.zeros(max_len, dim_model)
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1)  # 0, 1, 2, 3, ...
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model)
        angles = positions_list * division_term

        pos_encoding[:, 0::2] = torch.sin(angles)
        # With an odd dim_model there is one fewer odd column than even columns,
        # so trim the angles to match; the untrimmed assignment raised a shape error.
        pos_encoding[:, 1::2] = torch.cos(angles[:, : dim_model // 2])

        # Register as a buffer with a leading broadcast (batch) axis
        pos_encoding = pos_encoding.unsqueeze(0)
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        """Return ``dropout(token_embedding + table[:, :seq_len])``.

        ``seq_len`` is read from axis 1 of ``token_embedding``.
        """
        seq_len = token_embedding.size(1)
        pos_encoding = self.pos_encoding[:, :seq_len, :]
        return self.dropout(token_embedding + pos_encoding)

In [None]:
class TransformerModel(nn.Module):
    """Transformer-encoder sequence classifier.

    Pipeline: linear projection of each time step to ``model_dim`` -> positional
    encoding -> ``num_layers`` Transformer encoder layers -> mean over the time
    axis -> linear layer producing ``num_classes`` logits.

    Args:
        input_dim: Feature size of every time step of the input.
        model_dim: Internal embedding size of the encoder.
        num_heads: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        num_classes: Number of output logits.
    """

    def __init__(self, input_dim, model_dim, num_heads, num_layers, num_classes):
        super(TransformerModel, self).__init__()
        self.input_fc = nn.Linear(input_dim, model_dim)

        # Positional encoding
        self.positional_encoding = PositionalEncoding(dim_model=model_dim, dropout_p=0.1, max_len=500)

        encoder_layers = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(model_dim, num_classes)

    def forward(self, x, src_key_padding_mask=None):
        """Compute class logits for a batch of sequences.

        Args:
            x: Input of shape ``(batch, seq_len, input_dim)`` (batch-first, as
                configured on the encoder layer).
            src_key_padding_mask: Optional ``(batch, seq_len)`` mask where True
                marks padded time steps. When given, those steps are ignored by
                attention and excluded from the pooled average.

        Returns:
            Logits of shape ``(batch, num_classes)``.
        """
        x = self.input_fc(x)

        # Apply positional encoding
        x = self.positional_encoding(x)

        x = self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask)

        # Collapse the time axis
        if src_key_padding_mask is None:
            x = x.mean(dim=1)
        else:
            # Average over real time steps only. A plain mean would also average
            # the encoder outputs at padded positions, so the pooled vector would
            # depend on how much padding the batch happened to need.
            valid = (~src_key_padding_mask.bool()).unsqueeze(-1).to(x.dtype)
            # clamp avoids a division by zero for a fully padded row
            x = (x * valid).sum(dim=1) / valid.sum(dim=1).clamp(min=1.0)

        x = self.fc(x)
        return x
    

# Model initialization
input_dim = 249 * 3 # per-frame feature size; presumably 249 keypoints (2D + 3D groups) x 3 values each — TODO confirm against extract_keypoints output
model_dim = 512  # model (embedding) dimension
num_heads = 8  # number of heads in multi-head attention
num_layers = 3  # number of Transformer encoder layers
num_classes = 2771  # number of output classes; NOTE(review): hard-coded — confirm it matches the number of distinct encoded labels

model = TransformerModel(input_dim, model_dim, num_heads, num_layers, num_classes)

learning_rate = 0.0001
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# One loss value is appended per epoch by the training cells
loss_values = []


In [None]:
num_epochs = 100

# Move the model once (not on every batch) and make sure dropout is active,
# even if an evaluation cell switched the model to eval mode earlier.
model.to(device)
model.train()

# Training loop
for epoch in range(num_epochs):
    for sequences, labels, lengths in dataloader:

        # Padding mask: True marks padded frames. It is derived from the true
        # sequence lengths returned by collate_fn. The previous approach looked for
        # all-zero frames *after* normalization, where padded zeros have already
        # become -mean/std, so no frame was ever masked.
        padded_len = sequences.size(2)
        src_key_padding_mask = torch.arange(padded_len).unsqueeze(0) >= lengths.unsqueeze(1)

        # Normalize with the statistics of the whole padded batch tensor
        mean = torch.mean(sequences)
        std = torch.std(sequences)
        sequences = (sequences - mean) / std

        sequences = sequences.to(device)
        labels = labels.to(device)
        src_key_padding_mask = src_key_padding_mask.to(device)

        # Reshape input: [batch_size, 3, seq_len, num_joints] -> [batch_size, seq_len, 3 * num_joints]
        batch_size, coord, seq_len, num_joints = sequences.size()
        sequences = sequences.permute(0, 2, 3, 1).contiguous()  # [batch_size, seq_len, num_joints, coord]
        sequences = sequences.view(batch_size, seq_len, -1)  # [batch_size, seq_len, num_joints * coord]

        # Forward pass
        outputs = model(sequences, src_key_padding_mask=src_key_padding_mask)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Record the loss of the epoch's final batch as this epoch's value.
    loss_values.append(loss.item())
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


print('Training finished.')


In [None]:
num_epochs = 100
# Epochs already run by the previous training cell; used only for the progress print.
epoch_offset = 100

# Move the model once (not on every batch) and make sure dropout is active,
# even if an evaluation cell switched the model to eval mode earlier.
model.to(device)
model.train()

# Training loop (continues training the same model and optimizer)
for epoch in range(num_epochs):
    for sequences, labels, lengths in dataloader:

        # Padding mask: True marks padded frames. It is derived from the true
        # sequence lengths returned by collate_fn. The previous approach looked for
        # all-zero frames *after* normalization, where padded zeros have already
        # become -mean/std, so no frame was ever masked.
        padded_len = sequences.size(2)
        src_key_padding_mask = torch.arange(padded_len).unsqueeze(0) >= lengths.unsqueeze(1)

        # Normalize with the statistics of the whole padded batch tensor
        mean = torch.mean(sequences)
        std = torch.std(sequences)
        sequences = (sequences - mean) / std

        sequences = sequences.to(device)
        labels = labels.to(device)
        src_key_padding_mask = src_key_padding_mask.to(device)

        # Reshape input: [batch_size, 3, seq_len, num_joints] -> [batch_size, seq_len, 3 * num_joints]
        batch_size, coord, seq_len, num_joints = sequences.size()
        sequences = sequences.permute(0, 2, 3, 1).contiguous()  # [batch_size, seq_len, num_joints, coord]
        sequences = sequences.view(batch_size, seq_len, -1)  # [batch_size, seq_len, num_joints * coord]

        # Forward pass
        outputs = model(sequences, src_key_padding_mask=src_key_padding_mask)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Record the loss of the epoch's final batch as this epoch's value.
    loss_values.append(loss.item())
    # epoch is zero-based, so +1 is needed on top of the offset; the previous
    # 'epoch+100' printed 100..199 instead of 101..200.
    print(f'Epoch [{epoch+1+epoch_offset}/{num_epochs+epoch_offset}], Loss: {loss.item():.4f}')


print('Training finished.')


In [None]:
import matplotlib.pyplot as plt

# Visualize the recorded loss values
fig, ax = plt.subplots(figsize=(30, 10))
epoch_axis = range(1, len(loss_values) + 1)
ax.plot(epoch_axis, loss_values, marker='o', linestyle='-', color='b')
ax.set_title('Training Loss over Epochs')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.grid(True)
plt.show()

In [None]:


# Switch the model to evaluation mode (disables dropout)
model.to(device)
model.eval()

# Evaluation data: this reuses the training dataloader (example of testing on the training data)
# dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

# Evaluation counters
correct = 0
total = 0
predict = []

# Class id -> label string. LabelEncoder assigns ids by position in its sorted
# unique classes_, so ids must be decoded through it; indexing word_list (file
# order, with repeats) returned unrelated words.
class_names = label_encoder.classes_


def _decode(class_id):
    # The model may have more outputs than the encoder has classes
    if 0 <= class_id < len(class_names):
        return class_names[class_id]
    return f"<unknown:{class_id}>"


# Disable gradient tracking during evaluation
with torch.no_grad():
    for inputs, labels, lengths in dataloader:
        # Padding mask from the true sequence lengths (True marks padded frames).
        # The previously referenced create_padding_mask is not defined in this notebook.
        padded_len = inputs.size(2)
        src_key_padding_mask = torch.arange(padded_len).unsqueeze(0) >= lengths.unsqueeze(1)

        # Same per-batch normalization as the training cells; without it the
        # model sees inputs on a different scale than it was trained on.
        inputs = (inputs - torch.mean(inputs)) / torch.std(inputs)

        inputs = inputs.to(device)
        labels = labels.to(device)
        src_key_padding_mask = src_key_padding_mask.to(device)

        batch_size, coord, seq_len, num_joints = inputs.size()
        inputs = inputs.permute(0, 2, 3, 1).contiguous()  # [batch_size, seq_len, num_joints, coord]
        inputs = inputs.view(batch_size, seq_len, -1)  # [batch_size, seq_len, num_joints * coord]

        # Model outputs (logits)
        outputs = model(inputs, src_key_padding_mask=src_key_padding_mask)

        # Convert logits to probabilities
        probabilities = torch.nn.functional.softmax(outputs, dim=1)

        # Index of the most probable class
        _, predicted_classes = torch.max(probabilities, 1)

        # Accuracy bookkeeping
        total += labels.size(0)
        correct += (predicted_classes == labels).sum().item()

        # Map class ids back to the original label strings
        for predicted_class, label in zip(predicted_classes, labels):
            predicted_label = _decode(predicted_class.item())
            actual_label = _decode(label.item())
            predict.append(predicted_label)
            # Report
            print(f"Actual Label (encoded): {label.item()}")
            print(f"Predicted Label (encoded): {predicted_class.item()}")
            print(f"Actual Label: {actual_label}")
            print(f"Predicted Label: {predicted_label}")
            print('-' * 30)

# Overall accuracy (guarded against an empty dataloader)
accuracy = 100 * correct / total if total else 0.0
print(f'Accuracy of the model on the test data: {accuracy:.2f}%')
        

In [None]:
# Number of distinct labels among the collected predictions.
print(len(set(predict)))

In [None]:
# import matplotlib.pyplot as plt

# # Loss 값 시각화
# plt.figure(figsize=(50, 25))
# plt.plot(range(1, len(loss_values) + 1), loss_values, marker='o', linestyle='-', color='b')
# plt.title('Training Loss over Epochs')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.grid(True)
# plt.show()

In [None]:
# import torch
# print(torch.__version__)
# print(torch.cuda.is_available())
