In [1]:
import os
import re
import yaml
import torch
import numpy as np
import pandas as pd
import tensorflow as tf

from konlpy.tag import Okt
from tensorflow.keras import layers, models
from transformers import BertTokenizer, BertModel
from transformers import GPT2LMHeadModel, PreTrainedTokenizerFast

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
model = GPT2LMHeadModel.from_pretrained("skt/kogpt2-base-v2")
tokenizer = PreTrainedTokenizerFast.from_pretrained("skt/kogpt2-base-v2")

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPT2Tokenizer'. 
The class this function is called from is 'PreTrainedTokenizerFast'.


In [3]:
with open('./package.yaml') as f:
    file = yaml.load(f, Loader=yaml.FullLoader)
    POSEDATAPATH = file["path"]["pose_keypoints"]   # Pose Keypoints 저장 경로
    FACEDATAPATH = file["path"]["faceMesh_keypoints"]   # FaseMesh Keypoints 저장 경로
    LABELPATH = file["path"]["labelPATH"]   # metadata 경로

In [4]:
metadata = pd.read_excel(LABELPATH).sort_values("번호") # 데이터셋 메타데이터 로드 & "번호" 컬럼을 기준으로 내림차순 정렬
metadata.head(10)  # 상위 10개 항목 도출

Unnamed: 0,번호,언어 제공자 ID,취득연도,방향,타입(단어/문장),파일명,한국어,Unnamed: 7
10460,1,1,2017,정면,단어,KETI_SL_0000000001.MOV,0,
10440,2,1,2017,정면,단어,KETI_SL_0000000002.MOV,1,
10420,3,1,2017,정면,단어,KETI_SL_0000000003.MOV,2,
10399,4,1,2017,정면,단어,KETI_SL_0000000004.MOV,3,
10379,5,1,2017,정면,단어,KETI_SL_0000000005.MOV,4,
10359,6,1,2017,정면,단어,KETI_SL_0000000006.MOV,5,
10339,7,1,2017,정면,단어,KETI_SL_0000000007.MOV,6,
10319,8,1,2017,정면,단어,KETI_SL_0000000008.MOV,7,
10299,9,1,2017,정면,단어,KETI_SL_0000000009.MOV,8,
10279,10,1,2017,정면,단어,KETI_SL_0000000010.MOV,9,


In [5]:
metadata = metadata[["번호", "방향", "파일명", "한국어"]]   # 컬럼 필터링
metadata = metadata[metadata["방향"] == "정면"] # "방향"컬럼 정면 값을 가진 열 필터링
metadata

Unnamed: 0,번호,방향,파일명,한국어
10460,1,정면,KETI_SL_0000000001.MOV,0
10440,2,정면,KETI_SL_0000000002.MOV,1
10420,3,정면,KETI_SL_0000000003.MOV,2
10399,4,정면,KETI_SL_0000000004.MOV,3
10379,5,정면,KETI_SL_0000000005.MOV,4
...,...,...,...,...
318,10371,정면,KETI_SL_0000010371.MOV,허리가 아파서 일어날 수 없어요
3309,10372,정면,KETI_SL_0000010372.MOV,어떤 사람이 칼에 찔려서 피를 많이 흘리고 있어요
3898,10373,정면,KETI_SL_0000010373.MOV,아이가 말벌에 쏘여서 기절했어요
5838,10374,정면,KETI_SL_0000010374.MOV,무릎 인대를 다친 것 같아요


In [11]:
def remove_josa(tokens):
    josa_pattern = r'이|가|은|는|을|를|에|의|와|과'     # 필터링 대상 조사
    return [token for token in tokens if not re.fullmatch(josa_pattern, token)]

def kor2word_okt(string):
    okt = Okt()
    tokens = okt.morphs(string, norm=True, stem=True)
    process_tokens = remove_josa(tokens)
    # print(process_tokens)
    return process_tokens

def get_embeddings(text):
    inputs = tokenizer(text, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs, output_hidden_states=True)
    hidden_states = outputs.hidden_states
    # Take the last layer's hidden states
    embeddings = hidden_states[-1].mean(dim=1)  # Average over sequence tokens
    return embeddings.numpy()

In [7]:
okt = Okt()

In [12]:
metadata['tokenized'] = metadata['한국어'].apply(lambda x: okt.morphs(str(x)) if isinstance(x, str) else okt.morphs(str(x)))
metadata['processed_token'] = metadata['tokenized'].apply(lambda x: ', '.join(x))
metadata.head()

Unnamed: 0,번호,방향,파일명,한국어,tokenized,processed_token
10460,1,정면,KETI_SL_0000000001.MOV,0,[0],0
10440,2,정면,KETI_SL_0000000002.MOV,1,[1],1
10420,3,정면,KETI_SL_0000000003.MOV,2,[2],2
10399,4,정면,KETI_SL_0000000004.MOV,3,[3],3
10379,5,정면,KETI_SL_0000000005.MOV,4,[4],4


In [31]:
embeddings = np.array([get_embeddings(word) for word in metadata['processed_token']])

In [32]:
embeddings = embeddings[:2000]

# TensorFlow 텐서로 변환
input_tensor = tf.convert_to_tensor(embeddings)  # TensorFlow 텐서로 변환

In [33]:
npy_dir = "C:/Users/NEULET/Desktop/tharm_ai/data/sample_data/pose"

In [34]:
npy_files = [f for f in os.listdir(npy_dir) if f.endswith('.npy')]

keypoints_list = []
max_length = 0

for file in npy_files:
    data = np.load(os.path.join(npy_dir, file))
    keypoints_list.append(data)
    max_length = max(max_length, data.shape[0])

padded_keypoints = np.zeros((2000, max_length, 33, 3))

for i, keypoints in enumerate(keypoints_list):
    if i < 2000:
        padded_keypoints[i, :keypoints.shape[0], :, :] = keypoints

# 4. 결과 확인
print(padded_keypoints.shape)  # (2000, max_Length, 33, 3)

(2000, 210, 33, 3)


In [35]:
print(padded_keypoints.shape)
print(embeddings.shape)

(2000, 210, 33, 3)
(2000, 1, 768)


In [92]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, TimeDistributed, Masking, Reshape

encoder_input = Input(shape=(1, 768))
encoder_lstm = LSTM(512, return_state=True)
_, state_h, state_c = encoder_lstm(encoder_input)

# 디코더 입력, 첫 번째 타임스텝을 제외한 나머지 (209, 33, 3)
decoder_input = Input(shape=(209, 33, 3))

# (209, 33, 3) -> (209, 99)으로 변환
flattened_decoder_input = Reshape((209, 99))(decoder_input)

# 마스킹 레이어 추가
masked_decoder_input = Masking(mask_value=0.0)(flattened_decoder_input)

# LSTM 레이어
decoder_lstm = LSTM(512, return_sequences=True)
decoder_output = decoder_lstm(masked_decoder_input, initial_state=[state_h, state_c])

# 출력 레이어 (209, 99)
output = TimeDistributed(Dense(99, activation='linear'))(decoder_output)
output = Reshape((209, 33, 3))(output)

# 모델 정의
model = Model([encoder_input, decoder_input], output)

# 모델 컴파일
model.compile(optimizer='adam', loss='mse')

model.summary()



In [93]:
keypoints_input = padded_keypoints[:, :-1, :, :]  # 마지막 타임스텝 제외
keypoints_target = padded_keypoints[:, 1:, :, :]  # 첫 번째 타임스텝 제외

print(keypoints_input.shape)
print(keypoints_target.shape)

(2000, 209, 33, 3)
(2000, 209, 33, 3)


In [94]:
tf.config.run_functions_eagerly(True)

model.fit([embeddings, keypoints_input], keypoints_target, epochs=10, batch_size=32)

Epoch 1/10




[1m 1/63[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m1:33[0m 2s/step - loss: 0.3278

KeyboardInterrupt: 

In [78]:
word_embedding = embeddings[0]
word_embedding = word_embedding.reshape(1, 1, 768)
print(word_embedding.shape)

(1, 1, 768)


In [81]:
decoder_input = np.zeros((1, 210, 33, 3))  # 예시로 0으로 초기화
decoder_input is None

False

In [82]:
predicted_keypoints = model.predict([word_embedding, decoder_input])
predicted_keypoints = predicted_keypoints.reshape(210, 33, 3)
predicted_keypoints.shape

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 342ms/step


(210, 33, 3)

In [83]:
def cosine_similarity(A, B):
    # 두 벡터의 내적
    dot_product = np.dot(A, B)
    
    # 각 벡터의 크기(L2 norm)
    norm_A = np.linalg.norm(A)
    norm_B = np.linalg.norm(B)
    
    # 코사인 유사도 계산
    return dot_product / (norm_A * norm_B)

In [84]:
embeddings[0].shape

(1, 768)

In [85]:
predicted_keypoints = predicted_keypoints.squeeze()
predicted_keypoints.shape

(210, 33, 3)

In [86]:
padded_keypoints[0].shape

(210, 33, 3)

In [87]:
cosine_similarity(predicted_keypoints, padded_keypoints[0])

ValueError: shapes (210,33,3) and (210,33,3) not aligned: 3 (dim 2) != 33 (dim 1)