In [1]:
import tensorflow as tf
# Check if GPU is available
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


Num GPUs Available:  1


In [2]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from matplotlib import pyplot as plt
import cv2
import os
import csv

import tensorflow as tf
from tensorflow import shape,math
from tensorflow.keras import Input,layers,Model
from tensorflow.keras.losses import mse,binary_crossentropy
from tensorflow.keras.utils import plot_model

# Load Model

In [3]:
# # Download the model from TF Hub.
# model = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
# movenet = model.signatures['serving_default']
# # document : https://storage.googleapis.com/movenet/MoveNet.SinglePose%20Model%20Card.pdf

In [4]:
interpreter = tf.lite.Interpreter(model_path='lite-model_movenet_singlepose_thunder_3.tflite')
interpreter.allocate_tensors()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


# Draw keypoint

In [5]:
def draw_keypoints(frame, keypoints, confidence_threshold):
    # c is channel
    y,x,c = frame.shape
    shaped = np.squeeze(np.multiply(keypoints, [y,x,1]))
    
    for kp in shaped:
        ky, kx, kp_conf = kp
        if kp_conf > confidence_threshold:
            cv2.circle(frame, (int(kx), int(ky)), 4, (0,255,0), -1)

# Draw lines btw keypoints

In [6]:
EDGES = {
    (0, 1): 'm',
    (0, 2): 'c',
    (1, 3): 'm',
    (2, 4): 'c',
    (0, 5): 'm',
    (0, 6): 'c',
    (5, 7): 'm',
    (7, 9): 'm',
    (6, 8): 'c',
    (8, 10): 'c',
    (5, 6): 'y',
    (5, 11): 'm',
    (6, 12): 'c',
    (11, 12): 'y',
    (11, 13): 'm',
    (13, 15): 'm',
    (12, 14): 'c',
    (14, 16): 'c'
}

In [7]:
def draw_connections(frame, keypoints, edges, confidence_threshold):
    y,x,c = frame.shape
    shaped = np.squeeze(np.multiply(keypoints, [y,x,1]))
    
    for edge, color in edges.items():
        p1, p2 = edge
        y1, x1, c1 = shaped[p1]
        y2, x2, c2 = shaped[p2]
        
        if (c1>confidence_threshold) & (c2 > confidence_threshold):
            cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255,0,0), 2)
        

# Make detection

In [None]:
interpreter.get_input_details()

In [None]:
interpreter.get_output_details()

In [None]:
folder_path = "./video/"  # 폴더 경로를 지정합니다.
file_list = os.listdir(folder_path)
file_list

In [None]:


for i in range(len(file_list)):
    file_path = os.path.join("./video/", file_list[i])
    cap = cv2.VideoCapture(file_path)
    kp_output = []

    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break

        img = frame.copy()
        img = tf.image.resize_with_pad(np.expand_dims(img, axis=0), 256,256)
        input_image = tf.cast(img, dtype=tf.float32)

        # setup the input and output
        input_details=interpreter.get_input_details()
        output_details=interpreter.get_output_details()


        # make prediction
        interpreter.set_tensor(input_details[0]['index'], np.array(input_image))
        interpreter.invoke()
        keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])
        print(keypoints_with_scores)
        kp_output.append(keypoints_with_scores)

        #rendering
        draw_connections(frame, keypoints_with_scores, EDGES, 0.11)
        draw_keypoints(frame, keypoints_with_scores, 0.11)
        
        if ret:  # 프레임이 유효한 경우에만 imshow를 호출합니다.
            cv2.imshow('MoveNet Lightning', frame)

        if cv2.waitKey(10) & 0xFF==ord('q'):
            break
        
    cap.release()
    cv2.destroyAllWindows()
    cv2.waitKey(1)
            



In [None]:
plt.imshow(tf.cast(np.squeeze(img), dtype=tf.int32))

In [8]:
# 한개만 테스트해보기

import csv

file_path = os.path.join("./video/", "walk.mov")
cap = cv2.VideoCapture(file_path)
kp_output = []

while cap.isOpened():
    ret, frame = cap.read()

    if not ret:
        break

    img = frame.copy()
    img = tf.image.resize_with_pad(np.expand_dims(img, axis=0), 256,256)
    input_image = tf.cast(img, dtype=tf.float32)

    # setup the input and output
    input_details=interpreter.get_input_details()
    output_details=interpreter.get_output_details()


    # make prediction
    interpreter.set_tensor(input_details[0]['index'], np.array(input_image))
    interpreter.invoke()
    keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])
    print(keypoints_with_scores)
    kp_output.append(keypoints_with_scores[:,:,:,:2])

    #rendering
    draw_connections(frame, keypoints_with_scores, EDGES, 0.11)
    draw_keypoints(frame, keypoints_with_scores, 0.11)
    

    if ret:  # 프레임이 유효한 경우에만 imshow를 호출합니다.
        cv2.imshow('MoveNet Lightning', frame)

    if cv2.waitKey(10) & 0xFF==ord('q'):
        break

cap.release()
cv2.destroyAllWindows()
cv2.waitKey(1)
            


# CSV 파일 경로
csv_file_path = "./output/keypoints_with_scores.csv"

# CSV 파일에 데이터 저장
with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(np.array(kp_output).squeeze())

print("Keypoints with scores saved to", csv_file_path)


[[[[0.3334336  0.49905652 0.4976075 ]
   [0.32286683 0.50044245 0.41155928]
   [0.32310483 0.49353525 0.4775682 ]
   [0.32590118 0.50381166 0.37858585]
   [0.32826072 0.49040127 0.43960837]
   [0.36733773 0.50288814 0.47303984]
   [0.3784305  0.46371427 0.46916878]
   [0.42485058 0.55875444 0.30115384]
   [0.42407963 0.5608925  0.3218168 ]
   [0.43841895 0.48839498 0.18491346]
   [0.40655556 0.5316215  0.24241173]
   [0.5007399  0.50368744 0.458438  ]
   [0.49738297 0.48751062 0.51605165]
   [0.59867644 0.5179758  0.42814428]
   [0.59707135 0.4828868  0.39556086]
   [0.67645246 0.5135801  0.43777075]
   [0.66675085 0.4826001  0.4731909 ]]]]
[[[[0.33796278 0.49785185 0.5651906 ]
   [0.3286048  0.50319827 0.45765695]
   [0.3292219  0.48989558 0.45495158]
   [0.32803994 0.51428235 0.432028  ]
   [0.330621   0.47884035 0.5966989 ]
   [0.37137994 0.53524035 0.52762055]
   [0.38458827 0.45021644 0.5448327 ]
   [0.43606672 0.5537539  0.49616987]
   [0.43707895 0.4465643  0.53281343]
   [0.439

In [9]:
kp_output = np.array(kp_output)

In [10]:
kp_output.shape
# (frame, ?, ?, point, xyz)

(72, 1, 1, 17, 2)

In [None]:
# kp_output = kp_output.squeeze()

In [None]:
# 실제 위치 알아보기
print(keypoints_with_scores.shape)
print(keypoints_with_scores[0][0][2])
right_eye = keypoints_with_scores[0][0][2]
left_elbow =keypoints_with_scores[0][0][7]
left_elbow
# left_elbow의 실제 위치
np.array(left_elbow[:2]*[1080, 1920]).astype(int)
interpreter.get_tensor(interpreter.get_output_details()[0]['index'])

In [None]:
np.array(left_elbow[:2]*[1080, 1920]).astype(int)

# VAE code

In [66]:
import torch; torch.manual_seed(0)
import torch.nn as nn
import torch.nn.functional as F
import torch.utils
import torch.distributions
import numpy as np
device = torch.device('mps:0' if torch.backends.mps.is_available() else 'cpu')

latent_dims = 5



In [26]:
device = torch.device('cpu')
kp_output= torch.from_numpy(kp_output)

In [46]:
# class Encoder(nn.Module):
#     def __init__(self, latent_dims):
#         super(Encoder, self).__init__()
#         self.linear1 = nn.Linear(34, 512)
#         self.linear2 = nn.Linear(512, latent_dims)
    
#     def forward(self, x):
#         x = torch.flatten(x, start_dim=2)
#         x = F.relu(self.linear1(x))
#         return self.linear2(x)
    
class Decoder(nn.Module):
    def __init__(self, latent_dims):
        super(Decoder, self).__init__()
        self.linear1 = nn.Linear(latent_dims, 512)
        self.linear2 = nn.Linear(512, 2448)
        
    def forward(self, z):
        z = F.relu(self.linear1(z))
        z = torch.sigmoid(self.linear2(z))
        return z.reshape((72, 1, 1, 17, 2))
        # return z.reshape((4896,))

In [47]:
# class Autoencoder(nn.Module):
#     def __init__(self, latent_dims):
#         super(Autoencoder, self).__init__()
#         self.encoder = Encoder(latent_dims)
#         self.decoder = Decoder(latent_dims)
    
#     def forward(self, x):
#         z = self.encoder(x)
#         return self.decoder(z)

class VariationalEncoder(nn.Module):
    def __init__(self, latent_dims):
        super(VariationalEncoder, self).__init__()
        self.linear1 = nn.Linear(34, 512)
        self.linear2 = nn.Linear(512, latent_dims)
        self.linear3 = nn.Linear(512, latent_dims)
        
        self.N = torch.distributions.Normal(0, 1)
        self.N.loc = self.N.loc.to(device) # hack to get sampling on the GPU
        self.N.scale = self.N.scale.to(device)
        self.kl = 0
    
    def forward(self, x):
        x = torch.flatten(x, start_dim=2)
        x = F.relu(self.linear1(x))
        mu =  self.linear2(x)
        sigma = torch.exp(self.linear3(x))
        
        # Re-paramaterization trick !
        z = mu + sigma*self.N.sample(mu.shape)

        # Kullback-Leibler Divergence
        self.kl = (sigma**2 + mu**2 - torch.log(sigma) - 1/2).sum()
        
        return z
    
class VariationalAutoencoder(nn.Module):
    def __init__(self, latent_dims):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = VariationalEncoder(latent_dims)
        self.decoder = Decoder(latent_dims)
    
    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

In [56]:
def train(autoencoder, data, epochs=20):
    opt = torch.optim.Adam(autoencoder.parameters())
    for epoch in range(epochs):
        for x in data:
            x = x.to(device) # GPU
            opt.zero_grad()
            x_hat = autoencoder(x)
            loss = ((x - x_hat)**2).sum() + autoencoder.encoder.kl
            loss.backward()
            opt.step()
        print (f'Train epoch:{epoch} with previous loss {loss}');
    return autoencoder

In [67]:
from torchsummary import summary
vae = VariationalAutoencoder(latent_dims).to(device) # GPU
summary(vae, input_size=kp_output.shape)

RuntimeError: Placeholder storage has not been allocated on MPS device!

In [68]:
kp_output.shape

torch.Size([72, 1, 1, 17, 2])

In [69]:
vae = VariationalAutoencoder(latent_dims).to(device) # GPU
vae = train(vae, kp_output)

Train epoch:0 with previous loss 4.488303184509277
Train epoch:1 with previous loss 4.6505560874938965
Train epoch:2 with previous loss 3.9155688285827637
Train epoch:3 with previous loss 2.7445449829101562
Train epoch:4 with previous loss 2.9081649780273438
Train epoch:5 with previous loss 4.276193618774414
Train epoch:6 with previous loss 5.461904048919678
Train epoch:7 with previous loss 3.48353910446167
Train epoch:8 with previous loss 4.5926055908203125
Train epoch:9 with previous loss 2.5173659324645996
Train epoch:10 with previous loss 3.347444534301758
Train epoch:11 with previous loss 2.74702525138855
Train epoch:12 with previous loss 3.7668371200561523
Train epoch:13 with previous loss 2.7870984077453613
Train epoch:14 with previous loss 2.9270901679992676
Train epoch:15 with previous loss 3.0011959075927734
Train epoch:16 with previous loss 3.0060195922851562
Train epoch:17 with previous loss 2.4009690284729004
Train epoch:18 with previous loss 2.875269889831543
Train epoch: