In [None]:
!pip install sentence_transformers

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup
import json
import os
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
import torch.nn.functional as F
from sentence_transformers import SentenceTransformer
import pickle

In [None]:
#데이터셋 배치로 저장
import json
import os
import pickle
def load_json_data(folder_path, batch_size=1000):
    """Yield the JSON documents found under ``folder_path`` in batches.

    The directory tree is walked recursively and every file whose name ends
    with ``.json`` is parsed. Parsed documents are collected into a list that
    is yielded as soon as it holds ``batch_size`` items, so the whole dataset
    never has to be held in memory at once.

    Args:
        folder_path: Root directory to search.
        batch_size: Number of documents per yielded list.

    Yields:
        Lists of parsed JSON documents. Every list has ``batch_size`` items
        except possibly the last one, which holds the remainder.
    """
    json_data = []
    for root, dirs, files in os.walk(folder_path):
        # os.walk reports entries in filesystem order; sorting in place makes
        # the traversal (and therefore the batch contents) reproducible.
        dirs.sort()
        for filename in sorted(files):
            if not filename.endswith('.json'):
                continue
            file_path = os.path.join(root, filename)
            # Explicit UTF-8 so decoding does not depend on the platform's
            # default locale encoding.
            with open(file_path, 'r', encoding='utf-8') as file:
                json_data.append(json.load(file))
            # Yield only after the file is closed, so no handle stays open
            # while the generator is suspended.
            if len(json_data) >= batch_size:
                yield json_data
                json_data = []
    if json_data:
        yield json_data
# Source folders "01" .. "16" holding the training annotation JSON files.
base_path = "/content/drive/MyDrive/Data/1.Training/라벨링데이터/REAL/WORD/"
folders = [os.path.join(base_path, f"{i:02d}") for i in range(1, 17)]

batch_size = 1000
batch_idx = 0

# Every batch yielded by load_json_data is written to its own pickle file.
# batch_idx keeps counting across folders so file names never collide.
for folder_path in folders:
    for batch in load_json_data(folder_path, batch_size=batch_size):
        pickle_path = f'/content/drive/MyDrive/dataset/dataset_batch_{batch_idx}.pkl'
        # Create the output directory on first use instead of failing with
        # FileNotFoundError when it does not exist yet.
        os.makedirs(os.path.dirname(pickle_path), exist_ok=True)
        with open(pickle_path, 'wb') as f:
            pickle.dump(batch, f)
        batch_idx += 1


In [None]:
class CustomDataset(Dataset):
    """Map-style dataset over pickled batches of keypoint annotations.

    Every path in ``data_paths`` is a pickle file containing a list of
    annotation dicts with the keys ``'pose_keypoint'``,
    ``'left_hand_keypoint'``, ``'right_hand_keypoint'`` and ``'meaning'``.
    Samples are addressed by one global index that runs through all batch
    files in the order the paths are given.

    Args:
        data_paths: Sequence of pickle file paths.
        max_num_keypoints: Number of rows every returned tensor is padded to.
    """

    # Fields read from each pose / hand keypoint dict, in column order.
    _POSE_FIELDS = ('x', 'y', 'z', 'visibility')
    _HAND_FIELDS = ('x', 'y', 'z')

    def __init__(self, data_paths, max_num_keypoints):
        self.max_num_keypoints = max_num_keypoints
        self.embedding_model = SentenceTransformer("jhgan/ko-sroberta-multitask")
        self.data_paths = data_paths
        # Cumulative sample counts per batch file; built on first use.
        self._cumulative_sizes = None
        # The most recently loaded batch, kept so that consecutive lookups in
        # the same file do not unpickle it again.
        self._cached_batch_index = None
        self._cached_batch = None

    def load_batch_data(self, batch_data_paths):
        """Yield the unpickled content of each path in ``batch_data_paths``."""
        for batch_path in batch_data_paths:
            # NOTE: pickle.load can execute arbitrary code; only use it on
            # batch files produced by this notebook.
            with open(batch_path, 'rb') as f:
                batch_data = pickle.load(f)
            yield batch_data

    def _build_index(self):
        """Return the cumulative sample counts, reading every batch file once."""
        if self._cumulative_sizes is None:
            running_total = 0
            cumulative = []
            for batch_data in self.load_batch_data(self.data_paths):
                running_total += len(batch_data)
                cumulative.append(running_total)
            self._cumulative_sizes = cumulative
        return self._cumulative_sizes

    def __len__(self):
        """Return the total number of samples across all batch files."""
        cumulative = self._build_index()
        return cumulative[-1] if cumulative else 0

    def _locate(self, idx):
        """Translate a global sample index into ``(batch_index, local_index)``."""
        total = len(self)
        if idx < 0:
            idx += total
        if not 0 <= idx < total:
            raise IndexError(f"index {idx} is out of range for {total} samples")
        start = 0
        for batch_index, end in enumerate(self._cumulative_sizes):
            if idx < end:
                return batch_index, idx - start
            start = end
        raise IndexError(f"index {idx} is out of range for {total} samples")

    def _get_batch(self, batch_index):
        """Return the content of batch file ``batch_index``, using the cache."""
        if self._cached_batch_index != batch_index:
            batch_path = self.data_paths[batch_index]
            self._cached_batch = next(self.load_batch_data([batch_path]))
            self._cached_batch_index = batch_index
        return self._cached_batch

    # Data padding
    def _pad_tensor(self, tensor, target_length, pad_value=0):
        """Pad ``tensor`` with ``pad_value`` rows at the end up to ``target_length`` rows."""
        # NOTE(review): when the tensor already has more than target_length
        # rows, pad_size is negative and is handed to F.pad unchanged —
        # confirm that this is the intended handling of over-long inputs.
        pad_size = target_length - tensor.size(0)
        return F.pad(tensor, (0, 0, 0, pad_size), value=pad_value)

    @staticmethod
    def _keypoints_to_tensor(keypoint_groups, fields):
        """Flatten nested keypoint dicts into a float tensor with one row per keypoint."""
        rows = [
            [kp[field] for field in fields]
            for keypoints in keypoint_groups
            for kp in keypoints
        ]
        # reshape keeps the column count even when there are no keypoints.
        return torch.tensor(rows, dtype=torch.float32).reshape(-1, len(fields))

    # Build the input tensors for one sample
    def __getitem__(self, idx):
        """Return ``(pose, left_hand, right_hand, meaning, mask)`` for sample ``idx``.

        ``idx`` is a global index; negative values count from the end. The
        sample is looked up in the batch file that contains it rather than
        always in the first file.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        batch_index, local_index = self._locate(idx)
        item = self._get_batch(batch_index)[local_index]
        target_length = self.max_num_keypoints

        pose_tensor = self._pad_tensor(
            self._keypoints_to_tensor(item['pose_keypoint'], self._POSE_FIELDS),
            target_length,
        )
        left_hand_tensor = self._pad_tensor(
            self._keypoints_to_tensor(item['left_hand_keypoint'], self._HAND_FIELDS),
            target_length,
        )
        right_hand_tensor = self._pad_tensor(
            self._keypoints_to_tensor(item['right_hand_keypoint'], self._HAND_FIELDS),
            target_length,
        )

        # One sentence embedding per meaning entry, stacked row-wise and padded
        # to the same number of rows as the keypoint tensors.
        names = [meaning_item['attributes'][0]['name'] for meaning_item in item['meaning']]
        meaning_tensors = [torch.tensor(self.embedding_model.encode(name)).float() for name in names]
        combined_meaning_tensor = torch.stack(meaning_tensors, dim=0)
        combined_meaning_tensor = self._pad_tensor(combined_meaning_tensor, target_length)
        # 1.0 wherever the (padded) target is non-zero, 0.0 elsewhere.
        mask = (combined_meaning_tensor != 0).float()

        return pose_tensor, left_hand_tensor, right_hand_tensor, combined_meaning_tensor, mask


In [None]:
# Model definition
# Pose and hand sequences each go through their own LSTM; the last-step
# features of both are concatenated and fed to a fully connected layer.
class SignLanguageTranslationModel(nn.Module):
    """Two-branch LSTM model mapping pose and hand sequences to one output vector.

    Args:
        pose_input_dim: Feature size of each pose time step.
        hand_input_dim: Feature size of each hand time step.
        hidden_dim: Hidden size shared by both LSTMs.
        output_dim: Size of the output vector.
    """

    def __init__(self, pose_input_dim, hand_input_dim, hidden_dim, output_dim):
        super().__init__()
        # Pose branch
        self.pose_lstm = nn.LSTM(pose_input_dim, hidden_dim, batch_first=True)
        # Hand branch
        self.hand_lstm = nn.LSTM(hand_input_dim, hidden_dim, batch_first=True)
        # Fully connected layer over the concatenated branch features
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, pose_inputs, hand_inputs):
        """Return the projection of the concatenated last-step LSTM outputs."""
        pose_sequence = self.pose_lstm(pose_inputs)[0]
        hand_sequence = self.hand_lstm(hand_inputs)[0]
        # Keep only the output of the final time step from each branch.
        last_pose = pose_sequence[:, -1]
        last_hand = hand_sequence[:, -1]
        merged = torch.cat([last_pose, last_hand], dim=1)
        return self.fc(merged)


In [None]:
# Hyperparameters
MAX_NUM_KEYPOINTS = 5_000   # row count every sample tensor is padded to
BATCH_SIZE = 64
POSE_INPUT_DIM = 4          # x, y, z, visibility
HAND_INPUT_DIM = 3          # x, y, z (per hand)
MEANING_INPUT_DIM = 768
HIDDEN_DIM = 512
OUTPUT_DIM = 768
LEARNING_RATE = 1e-3
NUM_EPOCHS = 30

In [None]:
def load_json_data(folder_path):
    """Return every JSON document found under ``folder_path`` as a single list.

    The directory tree is walked recursively and every file whose name ends
    with ``.json`` is parsed.

    NOTE: this definition reuses the name of the batched generator defined
    earlier in the notebook and replaces it once this cell has run; it takes
    no ``batch_size`` argument.

    Args:
        folder_path: Root directory to search.

    Returns:
        List of parsed JSON documents, ordered by sorted directory and file name.
    """
    json_data = []
    for root, dirs, files in os.walk(folder_path):
        # Sort so that the result does not depend on filesystem ordering.
        dirs.sort()
        for filename in sorted(files):
            if filename.endswith('.json'):
                file_path = os.path.join(root, filename)
                # Explicit UTF-8 so decoding does not depend on the locale.
                with open(file_path, 'r', encoding='utf-8') as file:
                    json_data.append(json.load(file))
    return json_data


In [None]:
import torch
from torch.utils.data import DataLoader

from torch.utils.data import Dataset
from transformers import BertTokenizer
import torch.nn.functional as F

# Paths of the pickled batch files (indices 0 through 54)
base_path = "/content/drive/MyDrive/dataset/"
batch_data_paths = [
    "{}dataset_batch_{}.pkl".format(base_path, batch_number)
    for batch_number in range(55)
]

In [None]:
# Create the dataset and its data loader
custom_dataset = CustomDataset(
    data_paths=batch_data_paths,
    max_num_keypoints=MAX_NUM_KEYPOINTS,
)
dataloader = DataLoader(dataset=custom_dataset, batch_size=8, shuffle=True)

In [None]:
# Model training
# Resolve the device once instead of rebuilding it for every batch.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SignLanguageTranslationModel(
    pose_input_dim=POSE_INPUT_DIM,  # (x, y, z, visibility)
    hand_input_dim=HAND_INPUT_DIM * 2,  # (x, y, z) for the left and the right hand
    hidden_dim=HIDDEN_DIM,
    output_dim=OUTPUT_DIM
).to(device)

# Loss function and optimizer.
# reduction='none' keeps one loss value per element. With the default 'mean'
# reduction the loss is already a scalar, so multiplying by the mask and
# dividing by mask.sum() cancels out and padded rows are never excluded.
criterion = nn.MSELoss(reduction='none')
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

for epoch in range(NUM_EPOCHS):
    total_loss = 0.0
    model.train()
    for batch in dataloader:
        pose_inputs, left_hand_inputs, right_hand_inputs, meaning_inputs, mask = [
            x.to(device).float() for x in batch
        ]
        # Concatenate both hands along the feature axis.
        hand_inputs = torch.cat((left_hand_inputs, right_hand_inputs), dim=2)

        outputs = model(pose_inputs, hand_inputs)

        # Compare the single predicted vector against every target row;
        # expand_as gives input and target the same shape explicitly.
        predictions = outputs.unsqueeze(1).expand_as(meaning_inputs)
        elementwise_loss = criterion(predictions, meaning_inputs)
        # Apply the mask so padded positions contribute nothing, then average
        # over the unmasked elements (clamp guards against an all-zero mask).
        loss = (elementwise_loss * mask).sum() / mask.sum().clamp(min=1.0)
        total_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    average_loss = total_loss / len(dataloader)
    print(f'Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {average_loss:.4f}')

In [None]:
# Save the trained model (the whole module object is serialized)
torch.save(obj=model, f="train_All_test_embedding.pth")

In [None]:
# Save only the model's parameters (its state_dict) to a second file
torch.save(obj=model.state_dict(), f="train_All_test_embedding_state.pth")

평가

In [None]:
import os
import json
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset
from sentence_transformers import SentenceTransformer

class CustomDatasetforTest(Dataset):
    """In-memory dataset built from directories of keypoint annotation JSON files.

    Every ``.json`` file below the directories in ``data_paths`` is parsed
    once in the constructor and kept in ``self.data``. Each document is
    expected to provide the keys ``'pose_keypoint'``, ``'left_hand_keypoint'``,
    ``'right_hand_keypoint'`` and ``'meaning'``.

    Args:
        data_paths: Iterable of directory paths to search recursively.
        max_num_keypoints: Number of rows every returned tensor is padded to.
    """

    # Fields read from each pose / hand keypoint dict, in column order.
    _POSE_FIELDS = ('x', 'y', 'z', 'visibility')
    _HAND_FIELDS = ('x', 'y', 'z')

    def __init__(self, data_paths, max_num_keypoints):
        self.max_num_keypoints = max_num_keypoints
        self.embedding_model = SentenceTransformer("jhgan/ko-sroberta-multitask")
        self.data_paths = data_paths

        # Load all documents once and keep them in memory
        self.data = []
        for batch_data in self.load_batch_data(self.data_paths):
            self.data.extend(batch_data)

    def load_batch_data(self, batch_data_paths):
        """Yield, for each directory, the list of JSON documents found below it.

        The parsed documents themselves are yielded. Serializing them back to
        a JSON string would make the constructor iterate over single
        characters instead of annotation dicts. Files that fail to parse are
        reported and skipped.
        """
        for directory in batch_data_paths:
            batch_data = []
            for root, dirs, files in os.walk(directory):
                # Sort so that sample order does not depend on the filesystem.
                dirs.sort()
                for filename in sorted(files):
                    if not filename.endswith('.json'):
                        continue
                    file_path = os.path.join(root, filename)
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                    except json.JSONDecodeError as e:
                        print(f"Error loading {file_path}: {e}")
                        continue
                    batch_data.append(data)
            yield batch_data

    def __len__(self):
        """Return the number of loaded documents."""
        return len(self.data)

    def _pad_tensor(self, tensor, target_length, pad_value=0):
        """Pad ``tensor`` with ``pad_value`` rows at the end up to ``target_length`` rows."""
        # NOTE(review): when the tensor already has more than target_length
        # rows, pad_size is negative and is handed to F.pad unchanged —
        # confirm that this is the intended handling of over-long inputs.
        pad_size = target_length - tensor.size(0)
        return F.pad(tensor, (0, 0, 0, pad_size), value=pad_value)

    @staticmethod
    def _keypoints_to_tensor(keypoint_groups, fields):
        """Flatten nested keypoint dicts into a float tensor with one row per keypoint."""
        rows = [
            [kp[field] for field in fields]
            for keypoints in keypoint_groups
            for kp in keypoints
        ]
        # reshape keeps the column count even when there are no keypoints.
        return torch.tensor(rows, dtype=torch.float32).reshape(-1, len(fields))

    def __getitem__(self, idx):
        """Return ``(pose, left_hand, right_hand, meaning, mask)`` for sample ``idx``."""
        item = self.data[idx]
        target_length = self.max_num_keypoints

        pose_tensor = self._pad_tensor(
            self._keypoints_to_tensor(item['pose_keypoint'], self._POSE_FIELDS),
            target_length,
        )
        left_hand_tensor = self._pad_tensor(
            self._keypoints_to_tensor(item['left_hand_keypoint'], self._HAND_FIELDS),
            target_length,
        )
        right_hand_tensor = self._pad_tensor(
            self._keypoints_to_tensor(item['right_hand_keypoint'], self._HAND_FIELDS),
            target_length,
        )

        # One sentence embedding per meaning entry, stacked row-wise and padded
        # to the same number of rows as the keypoint tensors.
        names = [meaning_item['attributes'][0]['name'] for meaning_item in item['meaning']]
        meaning_tensors = [torch.tensor(self.embedding_model.encode(name)).float() for name in names]
        combined_meaning_tensor = torch.stack(meaning_tensors, dim=0)
        combined_meaning_tensor = self._pad_tensor(combined_meaning_tensor, target_length)
        # 1.0 wherever the (padded) target is non-zero, 0.0 elsewhere.
        mask = (combined_meaning_tensor != 0).float()

        return pose_tensor, left_hand_tensor, right_hand_tensor, combined_meaning_tensor, mask

In [None]:
# Load the saved model object, mapped to the GPU when one is available.
# NOTE(review): this file holds a fully pickled module, not a state_dict;
# recent PyTorch releases may refuse it unless torch.load is told to allow
# full unpickling — confirm against the installed version.
model = torch.load(
    "/content/train_All_test_embedding.pth",
    map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Switch to evaluation mode
model.eval()

# Reset the accumulated loss
total_loss = 0

# Directories that make up the test set
test_json_data_paths = [
    "/content/drive/MyDrive/Data/2.Validation/라벨링데이터/REAL/WORD/17",
    "/content/drive/MyDrive/Data/2.Validation/라벨링데이터/REAL/WORD/18",
]

# Build the dataset and its DataLoader
test_dataset = CustomDatasetforTest(
    data_paths=test_json_data_paths,
    max_num_keypoints=MAX_NUM_KEYPOINTS,
)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=8, shuffle=False)

In [None]:
# Evaluate on the CPU. The model and every input tensor are moved to the same
# device; sending the inputs to CUDA while the model stays on the CPU fails
# with a device mismatch on GPU runtimes.
device = torch.device('cpu')
model.to(device)
model.eval()

# File path reserved for the evaluation results
output_file = '/content/drive/MyDrive/predictions_tensor.json'

# Lists collecting predictions and targets
all_predictions = []
all_targets = []

# Start from zero here so re-running this cell does not add to an old total.
total_loss = 0.0

# Evaluation loop
with torch.no_grad():
    for batch in test_dataloader:
        pose_inputs, left_hand_inputs, right_hand_inputs, meaning_inputs, mask = [
            x.to(device).float() for x in batch
        ]

        # Concatenate left- and right-hand data along the feature axis
        hand_inputs = torch.cat((left_hand_inputs, right_hand_inputs), dim=2)

        # Model prediction
        outputs = model(pose_inputs, hand_inputs)

        # Masked loss. The per-element loss is required: a loss that is
        # already reduced to a scalar makes the mask multiplication and the
        # division by mask.sum() cancel out.
        predictions = outputs.unsqueeze(1).expand_as(meaning_inputs)
        elementwise_loss = F.mse_loss(predictions, meaning_inputs, reduction='none')
        loss = (elementwise_loss * mask).sum() / mask.sum().clamp(min=1.0)
        total_loss += loss.item()

        # Keep only the target rows that are not padding, paired with the
        # prediction they are compared against. Both lists then hold
        # same-shaped 2-D tensors, and the padded targets are not accumulated
        # in memory.
        valid_rows = mask.bool().any(dim=-1)
        all_predictions.append(predictions[valid_rows].cpu())
        all_targets.append(meaning_inputs[valid_rows].cpu())

average_loss = total_loss / len(test_dataloader)
print(f'Test Loss: {average_loss:.4f}')

In [None]:
import torch
import numpy as np

def calculate_accuracy_f1(predictions, targets, threshold=0.5):
    """Compute element-wise accuracy and F1 after thresholding both inputs.

    Predictions and targets are binarized with ``value >= threshold`` and
    compared element by element.

    Args:
        predictions: List of tensors, concatenated along dim 0.
        targets: List of tensors, concatenated along dim 0. They may have the
            same shape as the predictions, or one extra dimension after the
            batch dimension (padded target rows per sample). In the second
            case each prediction is compared with every target row of its
            sample that contains a non-zero value; all-zero rows are ignored.
        threshold: Cut-off used to binarize both inputs.

    Returns:
        Tuple ``(accuracy, f1_score)``.
    """
    # Convert the lists into single tensors
    predictions_tensor = torch.cat(predictions, dim=0)
    targets_tensor = torch.cat(targets, dim=0)

    # Targets with an extra row dimension cannot be compared with the
    # predictions directly, so align the shapes and drop all-zero rows.
    if targets_tensor.dim() == predictions_tensor.dim() + 1:
        valid_rows = (targets_tensor != 0).any(dim=-1)
        predictions_tensor = predictions_tensor.unsqueeze(1).expand_as(targets_tensor)[valid_rows]
        targets_tensor = targets_tensor[valid_rows]

    # Move to the CPU and convert to numpy arrays (detach in case the
    # tensors still carry autograd history)
    predictions_np = predictions_tensor.detach().cpu().numpy()
    targets_np = targets_tensor.detach().cpu().numpy()

    # Treat every element as a binary label by thresholding
    predictions_binary = (predictions_np >= threshold).astype(int)
    targets_binary = (targets_np >= threshold).astype(int)

    # Accuracy
    accuracy = np.mean(predictions_binary == targets_binary)

    # F1 score
    true_positives = np.sum(predictions_binary * targets_binary)
    false_positives = np.sum(predictions_binary * (1 - targets_binary))
    false_negatives = np.sum((1 - predictions_binary) * targets_binary)

    # The small epsilon avoids division by zero when a count is empty.
    precision = true_positives / (true_positives + false_positives + 1e-10)
    recall = true_positives / (true_positives + false_negatives + 1e-10)
    f1_score = 2 * (precision * recall) / (precision + recall + 1e-10)

    return accuracy, f1_score

# Compute the evaluation metrics
accuracy, f1_score = calculate_accuracy_f1(predictions=all_predictions, targets=all_targets)

# Print the results
print(f'Test Accuracy: {accuracy:.4f}', f'Test F1 Score: {f1_score:.4f}', sep='\n')
