In [78]:
import numpy as np
import torch

In [79]:
from torch.utils.data import Dataset


class TripletGestureDataset(Dataset):
    """Dataset that serves (anchor, positive, negative, label) triplets.

    The positive is drawn from the anchor's class and the negative from a
    different class. Every sample is extended with the difference vectors
    between consecutive points of each finger chain before being stored.
    """

    # Point indices forming each finger chain; every chain starts at point 0.
    fingers = [
        [0, 1, 2, 3, 4],
        [0, 5, 6, 7, 8],
        [0, 9, 10, 11, 12],
        [0, 13, 14, 15, 16],
        [0, 17, 18, 19, 20]
    ]

    def __init__(self, data: np.ndarray, labels: np.ndarray):
        """
        Build the dataset.
        :param data: numpy array of shape (n, 21, 3), n being the number of samples.
        :param labels: numpy array of shape (n, num_classes), one-hot encoded.
        """

        # ## data augmentation (disabled)
        # data = np.abs(data)
        # data = (data - np.min(data)) / (np.max(data) - np.min(data))

        # Difference vector between each pair of consecutive points along every
        # finger chain, in chain order. All indices refer to the original points,
        # so the vectors can be computed up front and appended in one concatenate.
        segment_vectors = [
            data[:, finger[i + 1]] - data[:, finger[i]]
            for finger in self.fingers
            for i in range(len(finger) - 1)
        ]
        data = np.concatenate((data, np.stack(segment_vectors, axis=1)), axis=1)

        self.data = torch.tensor(data, dtype=torch.float32)
        # Convert the one-hot labels to class indices.
        self.labels = np.argmax(labels, axis=1)
        # Class ids that actually occur, and the sample indices of each of them
        # (same order), precomputed for fast random sampling.
        self.classes = np.unique(self.labels)
        self.indices = [np.where(self.labels == c)[0] for c in self.classes]
        # Class ids need not be contiguous (some one-hot columns may never be
        # used), so map each class id to its position in `self.indices` instead
        # of indexing the list with the raw label value.
        self._class_position = {int(c): pos for pos, c in enumerate(self.classes)}

    def __len__(self):
        """
        Return the number of samples in the dataset.
        """
        return len(self.data)

    def __getitem__(self, idx):
        """
        Return one anchor sample together with a positive and a negative sample.
        :param idx: index of the anchor sample.
        :return: tuple (anchor, positive, negative, label).
        :raises ValueError: if the dataset holds a single class, so no negative exists.
        """
        anchor = self.data[idx]
        label = self.labels[idx]
        position = self._class_position[int(label)]

        # Positive: another sample of the same class. Excluding the anchor up
        # front avoids a retry loop that would never end for a one-sample class;
        # in that case the anchor itself is the only possible positive.
        same_class = self.indices[position]
        candidates = same_class[same_class != idx]
        if candidates.size > 0:
            positive_index = np.random.choice(candidates)
        else:
            positive_index = idx
        positive = self.data[positive_index]

        # Negative: a sample from a uniformly chosen different class.
        other_positions = [p for p in range(len(self.indices)) if p != position]
        if not other_positions:
            raise ValueError("cannot sample a negative: the dataset contains a single class")
        negative_position = np.random.choice(other_positions)
        negative_index = np.random.choice(self.indices[negative_position])
        negative = self.data[negative_index]

        return anchor, positive, negative, label

    def get_input_dim(self):
        """Return the length of one sample once it is flattened to a vector."""
        return self.data.shape[1] * self.data.shape[2]

In [80]:
from torch.utils.data import DataLoader

## load the data
# np.load on an .npz archive returns an NpzFile that keeps the file open and reads
# arrays on access, so pull the arrays out inside a context manager to make sure
# the archive is closed afterwards.
with np.load('./dataset/8class_dataset_100k.npz') as raw_data:
    train_data, train_label = raw_data['train_data'], raw_data['train_label']
    test_data, test_label = raw_data['test_data'], raw_data['test_label']

train_dataset = TripletGestureDataset(train_data, train_label)
test_dataset = TripletGestureDataset(test_data, test_label)

# Shuffle only the training split; the test split keeps its order.
train_loader = DataLoader(train_dataset, batch_size=1024, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1024, shuffle=False)

# Length of one flattened sample, used as the model's input width.
data_dim = train_dataset.get_input_dim()
print(data_dim)

123


In [81]:
import torch.nn as nn
import torch.optim as optim


# Initialize the model, the loss functions, and the optimizer
class TripletAutoencoder(nn.Module):
    """Fully connected autoencoder that exposes its bottleneck code.

    The encoder maps ``input_dim`` -> 128 -> 256 -> ``latent_dim`` and the decoder
    mirrors it back to ``input_dim``, ending in a Tanh.
    """

    def __init__(self, input_dim, latent_dim=2):
        """
        :param input_dim: length of one flattened input vector.
        :param latent_dim: size of the bottleneck code (default 2).
        """
        super().__init__()

        # Encoder layers are created before decoder layers so parameter
        # initialisation consumes the RNG in the same order as before.
        down_path = [
            nn.Linear(input_dim, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, latent_dim),
        ]
        up_path = [
            nn.Linear(latent_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 128),
            # nn.Dropout(0.5),
            nn.ReLU(inplace=True),
            nn.Linear(128, input_dim),
            nn.Tanh(),
        ]

        self.encoder = nn.Sequential(*down_path)
        self.decoder = nn.Sequential(*up_path)

    def forward(self, x):
        """Return ``(latent_code, reconstruction)`` for a batch of flattened inputs."""
        latent = self.encoder(x)
        return latent, self.decoder(latent)


# Prefer the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Autoencoder with a 2-dimensional bottleneck, moved to the chosen device.
model = TripletAutoencoder(input_dim=data_dim, latent_dim=2)
model = model.to(device)

# Two objectives: a margin-based triplet loss and an MSE reconstruction loss.
triplet_loss = nn.TripletMarginLoss(margin=1.0)
reconstruction_loss = nn.MSELoss()

optimizer = optim.Adam(model.parameters(), lr=0.01)

In [82]:
## summarize the model
from torchsummary import summary

# Run the summary on the device the model was moved to; a hard-coded "cuda"
# fails on machines where `device` fell back to the CPU.
summary(model, (data_dim,), 1024, device.type)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                [1024, 128]          15,872
              ReLU-2                [1024, 128]               0
            Linear-3                [1024, 256]          33,024
              ReLU-4                [1024, 256]               0
            Linear-5                  [1024, 2]             514
            Linear-6                [1024, 256]             768
              ReLU-7                [1024, 256]               0
            Linear-8                [1024, 128]          32,896
              ReLU-9                [1024, 128]               0
           Linear-10                [1024, 123]          15,867
             Tanh-11                [1024, 123]               0
Total params: 98,941
Trainable params: 98,941
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.48
Forward/ba

In [83]:
from tqdm.notebook import tqdm

model.train()

num_epochs = 10

for epoch in range(num_epochs):
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}")
    total_loss = 0
    total_r_loss = 0
    total_t_loss = 0

    # Count the batches explicitly: tqdm only advances `progress_bar.n` when it
    # refreshes the display, so using it as the denominator of the running
    # averages can lag behind the number of batches actually accumulated.
    for num_batches, (anchors, positives, negatives, _) in enumerate(progress_bar, start=1):
        # Move each batch to the training device and flatten every sample to a vector.
        anchors = anchors.to(device).flatten(start_dim=1)
        positives = positives.to(device).flatten(start_dim=1)
        negatives = negatives.to(device).flatten(start_dim=1)

        # The anchor needs both the latent code and the reconstruction; positives
        # and negatives only contribute their latent codes, so skip their decoder pass.
        anchor_encoded, anchor_decoded = model(anchors)
        positive_encoded = model.encoder(positives)
        negative_encoded = model.encoder(negatives)

        # Reconstruction loss on the anchor plus triplet loss on the latent codes.
        r_loss = reconstruction_loss(anchor_decoded, anchors)
        t_loss = triplet_loss(anchor_encoded, positive_encoded, negative_encoded)
        loss = r_loss + t_loss
        # loss = t_loss
        # loss = r_loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate the losses to report running averages for the epoch.
        total_loss += loss.item()
        total_r_loss += r_loss.item()
        total_t_loss += t_loss.item()

        progress_bar.set_postfix({
            'total_loss': f'{total_loss / num_batches:.4f}',
            'recon_loss': f'{total_r_loss / num_batches:.4f}',
            'triplet_loss': f'{total_t_loss / num_batches:.4f}'
        })

Epoch 1/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 2/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 3/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 4/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 5/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 6/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 7/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 8/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 9/10:   0%|          | 0/98 [00:00<?, ?it/s]

Epoch 10/10:   0%|          | 0/98 [00:00<?, ?it/s]

In [84]:
## run the test samples through the encoder

model.eval()

test_labels = []
test_latent = []

# Inference only: disable autograd so no graph is built for the forward passes.
with torch.no_grad():
    for inputs, _, _, labels in test_loader:
        # Only the anchor and its label are used; the sampled positive/negative are ignored.
        inputs = inputs.to(device).view(-1, data_dim)
        outputs = model.encoder(inputs)
        test_latent.append(outputs.cpu().numpy())
        test_labels.append(labels.cpu().numpy())

# Stitch the per-batch arrays into one array of latent codes and one of labels.
test_latent = np.concatenate(test_latent, axis=0)
test_labels = np.concatenate(test_labels, axis=0)

print(test_latent.shape, test_labels.shape)

(50000, 2) (50000,)


In [85]:
## calculate the centroid of each class

# Row i holds the mean latent code of the test samples labelled i. Rows for class
# ids with no test samples stay NaN; np.mean over an empty slice would give the
# same NaN but also emit a RuntimeWarning, so those ids are skipped explicitly.
centroids = np.full((11, 2), np.nan)
for i in range(11):
    idx = test_labels == i
    if idx.any():
        centroids[i] = np.mean(test_latent[idx], axis=0)

In [86]:
## plot the latent space
%matplotlib tk
import matplotlib
import matplotlib.pyplot as plt

# Class ids that actually occur in the test labels.
labels = np.unique(test_labels)
# Display name for each class id.
label_map = {
    0: 'call',
    1: 'dislike',
    2: 'fist',
    3: 'like',
    4: 'ok',
    5: 'one',
    6: 'palm',
    7: 'peace',
    8: 'rock',
    9: 'three',
    10: 'three2',
}

fig, ax = plt.subplots()

ax.set_title('Latent Space')

for i, label in enumerate(labels):
    idx = test_labels == label

    # One colour per plotted class, taken in order from the tab20 palette.
    ax.scatter(test_latent[idx, 0],
               test_latent[idx, 1],
               c=np.array(matplotlib.colormaps['tab20'].colors[i]).reshape(1, -1),
               label=f"{label} {label_map[label]}",
               alpha=0.5)

    # `centroids` is indexed by class id, not by the position of the class in
    # `labels`; the two differ whenever some class ids are absent from the data.
    ax.scatter(centroids[label, 0], centroids[label, 1], c='black', marker='x', s=100)

ax.legend()
plt.show()

In [87]:
from sklearn.cluster import KMeans
from sklearn.metrics import confusion_matrix

# Build a seeded 11-cluster KMeans model and fit it on the latent codes
kmeans = KMeans(n_clusters=11, random_state=0)
kmeans = kmeans.fit(test_latent)

# Assign every latent code to its nearest fitted cluster centre
cluster_ids = kmeans.predict(test_latent)

In [88]:
# Colors from a colormap. `matplotlib.cm.get_cmap` is deprecated (and removed in
# newer Matplotlib releases); the colormap registry plus `resampled` is the
# replacement for `get_cmap(name, lut)`.
colors = matplotlib.colormaps['tab20'].resampled(11)

fig, ax = plt.subplots()
ax.set_title('Latent Space with K-Means Clustering')

for cluster in range(11):
    idx = cluster_ids == cluster
    # One colour per cluster id, taken in order from the tab20 palette.
    ax.scatter(test_latent[idx, 0],
               test_latent[idx, 1],
               c=np.array(matplotlib.colormaps['tab20'].colors[cluster]).reshape(1, -1),
               label=f"Cluster {cluster}",
               alpha=0.5)

ax.legend()
# plt.show()

  colors = matplotlib.cm.get_cmap('tab20', 11)


<matplotlib.legend.Legend at 0x1c7e5353340>

In [89]:

# Clustering accuracy by majority vote: every cluster is credited with the true
# label that occurs most often among its members.
def clustering_accuracy(true_labels, cluster_labels):
    """Score a clustering against ground-truth labels.

    Builds the confusion matrix (rows: true labels, columns: cluster labels),
    takes the largest count in each column and divides the sum of those counts
    by the number of samples. Several clusters may be credited to the same true
    label; this is not a one-to-one assignment.

    :param true_labels: ground-truth label of every sample.
    :param cluster_labels: cluster id assigned to every sample.
    :return: tuple ``(confusion matrix, score)``.
    """
    contingency = confusion_matrix(true_labels, cluster_labels)
    # Size of the dominant true label inside each cluster (column-wise maximum).
    majority_counts = contingency.max(axis=0)
    score = majority_counts.sum() / len(true_labels)
    return contingency, score


# Score the K-Means assignment against the true test labels and report it
matrix, accuracy = clustering_accuracy(true_labels=test_labels,
                                       cluster_labels=cluster_ids)
print(f"Clustering Accuracy: {accuracy:.4f}")
# print("Confusion Matrix:")
# print(matrix)

Clustering Accuracy: 0.8105


In [90]:
# ## save model
# torch.save(model.state_dict(), './model/triplet_autoencoder_8934.pth')
# 
# ## save the centroids
# np.save('./model/triplet_autoencoder_8934_centroids.npy', centroids)