In [None]:

from tqdm import tqdm
import torch
import torch.nn as nn
import os
import sys

# A notebook has no __file__, so the current working directory is used as the
# notebook's own directory.
script_dir = os.getcwd()
# Get the path of the parent directory
parent_dir = os.path.dirname(script_dir)
# Add the parent directory to the Python path (presumably where the `models`
# and `utils` packages imported below live -- verify against the repo layout).
# Guarded so that re-running this cell does not pile up duplicate entries.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)
import numpy as np

from models.heads import Classifier
from models.stgcn import STGCN
from utils.data_processing import Preprocess_Module
from pytorch_metric_learning import losses


# Build the preprocessing callable once, with data augmentation switched off;
# load_sample() applies it to every clip dictionary it assembles.
transform = Preprocess_Module(data_augmentation=False)

from network import STGCN_Classifier_Metric


# Root folder of the pose clips; load_sample() reads
# <sample_folder>/<video>/<video>_<clip>.npy from it.
# Placeholder value -- set it to the real location before running.
sample_folder = "sample_folder here"

# Keyword configuration handed to STGCN_Classifier_Metric as `backbone`.
backbone_cfg = {
    'type': 'STGCN',
    'gcn_adaptive': 'init',
    'gcn_with_res': True,
    'tcn_type': 'mstcn',
    'num_stages': 10,
    'inflate_stages': [5, 8],
    'down_stages': [5, 8],
    'graph_cfg': {
        'layout': 'coco',
        'mode': 'spatial'
    },
    'pretrained': None
}

model = STGCN_Classifier_Metric(backbone=backbone_cfg, num_classes=4)

# Prefer the first GPU, but fall back to the CPU so the notebook still runs on
# machines without CUDA instead of failing at `model.to(device)`.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'


def load_sample(video, clip):
    """Load one pose clip from disk and run it through ``transform``.

    The clip is read from ``<sample_folder>/<video>/<video>_<clip>.npy``.
    Exact zeros and NaNs are replaced by 1e-4. The first two channels of the
    last axis are used as keypoint coordinates and the third as the keypoint
    score; both are duplicated along a new leading axis of size 2 before the
    sample dictionary is passed to ``transform``.

    Args:
        video: video identifier (converted with ``str`` to build the path).
        clip: clip identifier (converted with ``str`` to build the path).

    Returns:
        numpy.ndarray: the first element of the transformed ``'keypoint'``
        entry, converted with ``.numpy()``.
    """
    file_name = "{}_{}.npy".format(str(video), str(clip))
    raw = np.load(os.path.join(sample_folder, str(video), file_name))

    # Replace exact zeros first, then NaNs, with a small epsilon.
    cleaned = np.where(raw == 0, 1e-4, raw)
    cleaned[np.isnan(cleaned)] = 1e-4

    # Channels 0-1 -> coordinates, channel 2 -> score; each repeated twice
    # along a new leading axis.
    keypoints = np.tile(cleaned[np.newaxis, :, :, :2], (2, 1, 1, 1))
    scores = np.tile(cleaned[np.newaxis, :, :, 2], (2, 1, 1))

    sample_info = {
        'img_shape': (320, 480),
        'label': -1,  # presumably a dummy label that the pipeline ignores -- TODO confirm
        'start_index': 0,
        'modality': 'Pose',
        'total_frames': 124,
        'keypoint': keypoints,
        'keypoint_score': scores,
    }

    transformed = transform(sample_info)
    return transformed['keypoint'][0].numpy()



In [None]:
# now we load the model

model_weights_dir = './sampled_models_triplet/model_' + str(0.1) + '_' + str(0) + '.pth'

# Full path of the checkpoint file (despite its name, `state_dict` holds a
# path, not the weights themselves).
state_dict = os.path.join(script_dir, model_weights_dir)
# NOTE: torch.load unpickles the file -- only load checkpoints from a trusted source.
# map_location='cpu' lets a checkpoint saved on any device be restored on this
# machine; the model is moved to `device` right after the weights are loaded.
tmp = torch.load(state_dict, map_location='cpu')
model.load_state_dict(tmp, strict=True)

model = model.to(device)
# Switch to evaluation mode for inference.
model.eval()

In [None]:
# load all test data

test_dataset_file = 'test_dataset14.npy'
test_data = np.load(test_dataset_file)

# Group the clips by video:
#   {video: {'label': label - 1 (taken from the video's first row), 'clip': [clip, ...]}}
# Each row unpacks as (label, <unused>, video, clip).
test_data_dict = {}
for label, _, video, clip in test_data:
    entry = test_data_dict.setdefault(video, {'label': label - 1, 'clip': []})
    entry['clip'].append(clip)


In [None]:
# compute embeddings for training data
training_dataset_file = 'sampled_trainingset/sampled_' + str(0.1) + '_' + str(0) + '.npy'
training_data = np.load(training_dataset_file)
embedding_dict = {}

print('computing embeddings')
for i in tqdm(range(len(training_data))):
    # Each row unpacks as (label, <unused>, video, clip).
    label, _, video, clip = training_data[i]
    label = label - 1  # shift the stored label down by one so it can index `centers`

    # load the sample and move it to the model's device
    sample = torch.from_numpy(load_sample(video, clip)).float().to(device)
    # get the embedding and L2-normalise it; no gradients are needed for inference
    with torch.no_grad():
        sample_result = model(sample.unsqueeze(0))
        sample_result = nn.functional.normalize(sample_result, p=2, dim=1)
    sample_result = sample_result.squeeze(0).cpu().numpy()
    embedding_dict.setdefault(label, []).append(sample_result)

# convert each per-class list to an array of shape (num_samples, embedding_dim)
embedding_dict = {key: np.array(value) for key, value in embedding_dict.items()}

# Take the embedding width from the model output instead of hard-coding it, so
# the cell keeps working if the embedding head changes size. 256 is kept only
# as the fallback for an empty training set.
embedding_dim = next(iter(embedding_dict.values())).shape[1] if embedding_dict else 256
# Number of rows in `centers`; must match the num_classes the model was built with.
num_classes = 4

# build a dict mapping each embedding (as a tuple of floats) to its label
lebel_dict = {
    tuple(embedding): key
    for key, class_embeddings in embedding_dict.items()
    for embedding in class_embeddings
}

# Stack all embeddings into one 2d array with a single concatenate. The empty
# float64 seed keeps the result float64 and gives shape (0, embedding_dim)
# when there is no training data.
all_embeddings = np.concatenate(
    [np.zeros((0, embedding_dim))] + list(embedding_dict.values()), axis=0
)
# one label per row of `all_embeddings`, in the same order
all_labels = np.array(
    [key for key, class_embeddings in embedding_dict.items() for _ in class_embeddings]
)

# calculate the center of each class; classes with no samples keep an all-zero center
centers = np.zeros((num_classes, embedding_dim))
for key, class_embeddings in embedding_dict.items():
    centers[key] = np.mean(class_embeddings, axis=0)

In [None]:
# calculate the embeddings of testing data

print('computing embeddings for testing data')
test_embedding_dict = {}  # video -> list of L2-normalised clip embeddings
test_label_dict = {}      # video -> label stored for that video
for video_id, video_info in test_data_dict.items():
    video_label = video_info['label']
    for clip_id in video_info['clip']:
        # load the clip and move it to the model's device
        clip_tensor = torch.from_numpy(load_sample(video_id, clip_id)).float().to(device)
        # forward pass without gradient tracking
        with torch.no_grad():
            raw_embedding = model(clip_tensor.unsqueeze(0))
        # L2-normalise, then drop the batch axis
        unit_embedding = nn.functional.normalize(raw_embedding, p=2, dim=1)
        embedding = unit_embedding.cpu().detach().numpy().squeeze(0)
        # the label is recorded the first time a video contributes an embedding
        test_label_dict.setdefault(video_id, video_label)
        test_embedding_dict.setdefault(video_id, []).append(embedding)

        

In [None]:
# Bundle everything computed above -- per-class training embeddings, the
# embedding -> label lookup, the stacked embeddings with their labels, the class
# centers, and the per-video test embeddings with their labels -- and write it
# to a single file.
save_file = 'embeddings.npy'

save_dict = dict(
    training_embedding_dict=embedding_dict,
    training_label_dict=lebel_dict,
    all_embeddings=all_embeddings,
    all_labels=all_labels,
    centers=centers,
    test_embedding_dict=test_embedding_dict,
    test_label_dict=test_label_dict,
)

# np.save stores the dict as a pickled object array, so reading it back needs
# np.load(save_file, allow_pickle=True).item().
np.save(save_file, save_dict)
