In [1]:
import os
import numpy as np
from dataset import get_data, normalize
import sys
import torch
import torch.nn as nn
from torch.nn import functional as F
import torchvision
import torchvision.transforms as transforms
import time
from matplotlib import pyplot as plt
from sklearn.metrics import pairwise_distances
from torch.utils.data import TensorDataset, DataLoader
import scipy.spatial.distance as distance

cnt = 0

In [2]:
def compute_covariance_matrix(X):
    mean_vec = np.mean(X, axis=0)
    center_mat = X - mean_vec
    cov = np.dot(center_mat.T, center_mat) / (X.shape[0] - 1)
    return cov


def compute_eigenvalues_eigenvectors(cov):
    eig_val, eig_vec = np.linalg.eig(cov)
    idx = np.argsort(eig_val)[::-1]
    sort_eig_val = eig_val[idx]
    sort_eig_vec = eig_vec[:, idx]
    return sort_eig_val, sort_eig_vec

In [3]:
def visualize_features_pca_1(features, labels, cnt):
    cov = compute_covariance_matrix(features)
    eig_val, eig_vec = compute_eigenvalues_eigenvectors(cov)
    data = np.dot(features, eig_vec[:, :2])

    plt.figure(figsize=(10, 8))
    unique_labels = np.unique(labels)
    colors = plt.cm.get_cmap('tab10', len(unique_labels))

    for i, label in enumerate(unique_labels):
        idx = labels == label
        plt.scatter(data[idx, 0], data[idx, 1], c=colors(i), label=str(label))

    plt.legend()
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.title('Visualization of Conv2 layer using PCA')
    path = "./pca_conv2/"
    if not os.path.exists(path):
        os.mkdir(path)
    plt.savefig("./pca_conv2/" + str(cnt) + ".png")
    plt.show()
def visualize_features_pca_2(features, labels, cnt):
    cov = compute_covariance_matrix(features)
    eig_val, eig_vec = compute_eigenvalues_eigenvectors(cov)
    data = np.dot(features, eig_vec[:, :2])

    plt.figure(figsize=(10, 8))
    unique_labels = np.unique(labels)
    colors = plt.cm.get_cmap('tab10', len(unique_labels))

    for i, label in enumerate(unique_labels):
        idx = labels == label
        plt.scatter(data[idx, 0], data[idx, 1], c=colors(i), label=str(label))

    plt.legend()
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.title('Visualization of fc1 layer using PCA')
    path = "./pca_fc1/"
    if not os.path.exists(path):
        os.mkdir(path)
    plt.savefig("./pca_fc1/" + str(cnt) + ".png")
    plt.show()
def visualize_features_pca_3(features, labels, cnt):
    cov = compute_covariance_matrix(features)
    eig_val, eig_vec = compute_eigenvalues_eigenvectors(cov)
    data = np.dot(features, eig_vec[:, :2])

    plt.figure(figsize=(10, 8))
    unique_labels = np.unique(labels)
    colors = plt.cm.get_cmap('tab10', len(unique_labels))

    for i, label in enumerate(unique_labels):
        idx = labels == label
        plt.scatter(data[idx, 0], data[idx, 1], c=colors(i), label=str(label))

    plt.legend()
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.title('Visualization of the last layer using PCA')
    path = "./pca_final/"
    if not os.path.exists(path):
        os.mkdir(path)
    plt.savefig("./pca_final/" + str(cnt) + ".png")
    plt.show()

In [4]:
def compute_perplexity(dist, idx, beta):
    prob = np.exp(-dist * beta)
    prob[idx] = 0
    sum_prob = np.sum(prob)
    if sum_prob == 0:
        prob = np.maximum(prob, 1e-12)
        perp = -12
    else:
        prob /= sum_prob
        perp = 0
        for pj in prob:
            if pj != 0:
                perp += -pj * np.log(pj)
    return perp, prob


def compute_prob(x, perplexity=30.0):
    (n, d) = x.shape
    sum_x = np.sum(np.square(x), 1)
    dist = np.add(np.add(-2 * np.dot(x, x.T), sum_x).T, sum_x)
    pair_prob = np.zeros((n, n))
    beta = np.ones((n, 1))
    base_perp = np.log(perplexity)

    for i in range(n):
        mn = -np.inf
        mx = np.inf
        perp, this_prob = compute_perplexity(dist[i], i, beta[i])
        perp_diff = perp - base_perp
        tries = 0
        while np.abs(perp_diff) > 1e-5 and tries < 50:
            if perp_diff > 0:
                mn = beta[i].copy()
                if mx == np.inf or mx == -np.inf:
                    beta[i] = beta[i] * 2
                else:
                    beta[i] = (beta[i] + mx) / 2
            else:
                mx = beta[i].copy()
                if mn == np.inf or mn == -np.inf:
                    beta[i] = beta[i] / 2
                else:
                    beta[i] = (beta[i] + mn) / 2

            perp, this_prob = compute_perplexity(dist[i], i, beta[i])
            perp_diff = perp - base_perp
            tries = tries + 1
        pair_prob[i,] = this_prob
    return pair_prob

In [5]:
def tsne(X, n_components=2, perplexity=30.0, num_iter=500, lr=300):
    (n, d) = X.shape
    y = np.random.randn(n, n_components)
    dy = np.zeros((n, n_components))
    p1 = compute_prob(X, perplexity)
    p1 = ((p1 + np.transpose(p1)) / np.sum(p1)) * 4
    p1 = np.maximum(p1, 1e-12)

    for iter in range(num_iter):
        sum_y = np.sum(np.square(y), 1)
        num = 1 / (1 + np.add(np.add(-2 * np.dot(y, y.T), sum_y).T, sum_y))
        for i in range(n):
            num[i, i] = 0
        p2 = np.maximum(num / np.sum(num), 1e-12)
        l = p1 - p2
        for i in range(n):
            dy[i, :] = np.sum(np.tile(l[:, i] * num[:, i], (n_components, 1)).T * (y[i, :] - y), 0)
        y = y - lr * dy - np.tile(np.mean(y, 0), (n, 1))
        if iter == 100:
            p1 = p1 / 4
    return y

In [6]:
def visualize_features_tsne_1(features, labels, cnt):
    #print("begin")
    Y = tsne(features)
    plt.scatter(Y[:, 0], Y[:, 1], 20, labels, cmap="tab10")
    plt.legend()
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.title('Visualization of conv2 layer using t-SNE')
    path = "./tsne_conv2/"
    if not os.path.exists(path):
        os.mkdir(path)
    plt.savefig("./tsne_conv2/" + str(cnt) + ".png")
    plt.show()
def visualize_features_tsne_2(features, labels, cnt):
    Y = tsne(features)
    plt.scatter(Y[:, 0], Y[:, 1], 20, labels, cmap="tab10")
    plt.legend()
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.title('Visualization of fc1 layer using t-SNE')
    path = "./tsne_fc1/"
    if not os.path.exists(path):
        os.mkdir(path)
    plt.savefig("./tsne_fc1/" + str(cnt) + ".png")
    plt.show()
def visualize_features_tsne_3(features, labels, cnt):
    Y = tsne(features)
    plt.scatter(Y[:, 0], Y[:, 1], 20, labels, cmap="tab10")
    plt.legend()
    plt.xlabel('Principal Component 1')
    plt.ylabel('Principal Component 2')
    plt.title('Visualization of the last layer using t-SNE')
    path = "./tsne_final/"
    if not os.path.exists(path):
        os.mkdir(path)
    plt.savefig("./tsne_final/" + str(cnt) + ".png")
    plt.show()

In [7]:
def visualize_features_1(features, labels):
    visualize_features_tsne_1(features, labels, cnt)
    visualize_features_pca_1(features,labels, cnt)
def visualize_features_2(features, labels):
    visualize_features_tsne_2(features, labels, cnt)
    visualize_features_pca_2(features,labels, cnt)
def visualize_features_3(features, labels):
    visualize_features_tsne_3(features, labels, cnt)
    visualize_features_pca_3(features,labels, cnt)

In [8]:
if __name__ == '__main__':
    ######################## Get train/test dataset ########################
    X_train, X_test, Y_train, Y_test = get_data('dataset')
    ########################################################################


In [9]:
    train_dataset = TensorDataset(torch.from_numpy(X_train).float(), torch.from_numpy(Y_train).long())
    test_dataset = TensorDataset(torch.from_numpy(X_test).float(), torch.from_numpy(Y_test).long())

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [10]:
    class LeNet(nn.Module):
        def __init__(self):
            super(LeNet, self).__init__()
            self.conv1 = nn.Conv2d(1, 6, 5)
            self.pool1 = nn.MaxPool2d(2, 2)
            self.conv2 = nn.Conv2d(6, 16, 5)
            self.pool2 = nn.MaxPool2d(2, 2)
            self.fc1 = nn.Linear(16 * 5 * 5, 120)
            self.fc2 = nn.Linear(120, 84)
            self.fc3= nn.Linear(84, 10)

        def forward(self, x):
            x = self.pool1(F.relu(self.conv1(x)))
            x = self.pool2(F.relu(self.conv2(x)))
            con2 = x
            x = x.view(-1, 16 * 5 * 5)
            x = F.relu(self.fc1(x))
            fc1 = x
            x = F.relu(self.fc2(x))
            x = self.fc3(x)
            return con2, fc1, x

    class MyNet(nn.Module):
        def __init__(self):
            super(MyNet, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 5)
            self.bn1 = nn.BatchNorm2d(32)
            self.pool1 = nn.MaxPool2d(2, 2)
            self.conv2 = nn.Conv2d(32, 64, 5)
            self.bn2 = nn.BatchNorm2d(64)
            self.pool2 = nn.MaxPool2d(2, 2)
            self.fc1 = nn.Linear(64 * 5 * 5, 512)
            self.fc2 = nn.Linear(512, 256)
            self.fc3= nn.Linear(256, 10)

        def forward(self, x):
            x = self.pool1(self.bn1(F.relu(self.conv1(x))))
            x = self.pool2(self.bn2(F.relu(self.conv2(x))))
            con2 = x
            x = x.view(-1, 64 * 5 * 5)
            x = F.relu(self.fc1(x))
            fc1 = x
            x = F.relu(self.fc2(x))
            x = self.fc3(x)
            return con2, fc1, x

In [11]:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = MyNet().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=0.0025, momentum=0.9)


    loss_list = []
    acc_list = []
    loss_test_list = []
    acc_test_list = []
    num_epoch = 150
    for epoch in range(num_epoch):
        correct = 0
        total = 0
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = net(inputs)[-1]
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            running_loss += loss.item()
        train_loss = running_loss / len(train_loader)
        train_acc = correct / total

        acc_list.append(train_acc)
        loss_list.append(train_loss)
        net.eval()

        correct = 0
        total = 0
        testing_loss = 0.0

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = net(inputs)[-1]
                loss = criterion(outputs, labels)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                testing_loss += loss.item()
        test_loss = testing_loss / len(test_loader)
        test_acc = correct / total
        acc_test_list.append(test_acc)
        loss_test_list.append(test_loss)
        print('Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.3f}, Test Loss: {:.4f}, Test Acc: {:.3f}'
          .format(epoch+1, num_epoch, train_loss, train_acc, test_loss, test_acc))

Epoch [1/150], Train Loss: 2.1627, Train Acc: 0.338, Test Loss: 2.2589, Test Acc: 0.260
Epoch [2/150], Train Loss: 2.1915, Train Acc: 0.419, Test Loss: 2.0883, Test Acc: 0.469
Epoch [3/150], Train Loss: 1.8943, Train Acc: 0.476, Test Loss: 1.6261, Test Acc: 0.414
Epoch [4/150], Train Loss: 1.3428, Train Acc: 0.516, Test Loss: 1.1154, Test Acc: 0.581
Epoch [5/150], Train Loss: 1.0673, Train Acc: 0.583, Test Loss: 1.0206, Test Acc: 0.616
Epoch [6/150], Train Loss: 0.9649, Train Acc: 0.637, Test Loss: 0.9108, Test Acc: 0.643
Epoch [7/150], Train Loss: 0.8923, Train Acc: 0.659, Test Loss: 0.8451, Test Acc: 0.685
Epoch [8/150], Train Loss: 0.8180, Train Acc: 0.697, Test Loss: 0.7975, Test Acc: 0.712
Epoch [9/150], Train Loss: 0.8283, Train Acc: 0.694, Test Loss: 0.8572, Test Acc: 0.686
Epoch [10/150], Train Loss: 0.7615, Train Acc: 0.732, Test Loss: 0.8100, Test Acc: 0.708
Epoch [11/150], Train Loss: 0.7141, Train Acc: 0.733, Test Loss: 0.7278, Test Acc: 0.736
Epoch [12/150], Train Loss: 0.

KeyboardInterrupt: 

In [None]:
        path = "./loss/"
        if not os.path.exists(path):
            os.mkdir(path)

        plt.plot(loss_list, label='Train Loss')
        plt.plot(loss_test_list, label='Test Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Loss')
        plt.legend()

        plt.savefig("./loss/" + "loss_MyNet.png")
        plt.show()

        plt.plot(acc_list, label='Train Accuracy')
        plt.plot(acc_test_list, label='Test Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.title('Accuracy')
        plt.legend()
        plt.savefig("./loss/" + "Acccuracy_MyNet.png")
        plt.show()

In [None]:
    net.eval()
    loss_test_list = []
    acc_test_list = []
    with torch.no_grad():
        correct = 0
        total = 0
        running_loss = 0.0
        features_list_con2 = []
        features_list_fc1 = []
        features_list_final = []
        labels_list = []
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            con2, fc1, final = net(inputs)
            features_con2 = con2.reshape(con2.shape[0], -1)
            features_fc1 = fc1.reshape(fc1.shape[0], -1)
            features_final = final.reshape(final.shape[0], -1)
            features_np_1 = features_con2.cpu().numpy()
            features_np_2 = features_fc1.cpu().numpy()
            features_np_3 = features_final.cpu().numpy()
            features_list_con2.append(features_np_1)
            features_list_fc1.append(features_np_2)
            features_list_final.append(features_np_3)
            labels_list.append(labels.cpu().numpy())
        #
        #     outputs = net(inputs)[-1]
        #     _, predicted = torch.max(outputs.data, 1)
        #     total += labels.size(0)
        #     correct += (predicted == labels).sum().item()
        #     running_loss += loss.item()
        #
        # print(f'ecoch {epoch+1}, loss {running_loss / len(train_loader):.3f}, train acc {correct / total:.3f}')
        # acc_list.append(correct / total)
        # loss_list.append(running_loss / len(train_loader))
        # acc = correct / total
        # print("accuracy:", acc)

In [None]:
    features_arr_1 = np.concatenate(features_list_con2, axis=0)
    features_arr_2 = np.concatenate(features_list_fc1, axis=0)
    features_arr_3 = np.concatenate(features_list_final, axis=0)
    labels_arr = np.concatenate(labels_list, axis=0)
    cnt += 1

In [None]:
    visualize_features_1(features_arr_1, labels_arr)

In [None]:
    visualize_features_2(features_arr_2, labels_arr)

In [None]:
    visualize_features_3(features_arr_3, labels_arr)
