In [2]:
import torch
import torch.nn as nn
import numpy as np
import h5py

# Load Data

In [3]:
# Read every split fully into memory inside a context manager so the HDF5
# handle is closed right away instead of staying open for the kernel's lifetime.
# The splits are small (a few hundred 128x128x3 images), so eager loading is cheap.
with h5py.File('datasets/single_act.hdf5', 'r') as x:
    x_train, x_test, y_train, y_test = [
        x[key][...]
        for key in ('train_img', 'test_img', 'train_labels', 'test_labels')
    ]

# Sanity check of the split sizes (images first, then labels, train before test).
print(x_train.shape)
print(y_train.shape)
print(x_test.shape)
print(y_test.shape)

(380, 128, 128, 3)
(380, 9)
(96, 128, 128, 3)
(96, 9)


In [4]:
# The class count is the width of the last label axis
# (labels are arg-maxed over that axis further down the notebook).
label_shape = y_test.shape
num_class = label_shape[-1]
print('Num. class: ', num_class)

Num. class:  9


## Patch Embedding module

In [5]:
class PatchEmbed(nn.Module):
    """Turn an image into a sequence of embedded, non-overlapping patches.

    A single strided convolution whose kernel size and stride both equal
    ``patch_size`` cuts the image into patches and linearly projects each
    patch in one step.

    Parameters
    ----------
    img_size : int
        Side length of the (square) input image.
    patch_size : int
        Side length of each (square) patch.
    in_chans : int
        Number of input channels.
    embed_dim : int
        Size of the embedding produced for every patch.

    Attributes
    ----------
    img_size, patch_size : int
        The constructor arguments, stored unchanged.
    n_patches : int
        Number of patches per image, ``(img_size // patch_size) ** 2``.
    proj : nn.Conv2d
        Convolution that performs both the patch split and the embedding.
    """
    def __init__(self, img_size, patch_size, in_chans=3, embed_dim=768):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size

        # Floor division: pixels that do not fill a whole patch are ignored.
        patches_per_side = img_size // patch_size
        self.n_patches = patches_per_side * patches_per_side

        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        """Embed a batch of images.

        Parameters
        ----------
        x : torch.Tensor
            Shape `(n_samples, in_chans, img_size, img_size)`.

        Returns
        -------
        torch.Tensor
            Shape `(n_samples, n_patches, embed_dim)`.
        """
        # (n_samples, embed_dim, n_patches ** 0.5, n_patches ** 0.5)
        feature_map = self.proj(x)
        # Collapse the spatial grid into one token axis, then put tokens before features:
        # (n_samples, embed_dim, n_patches) -> (n_samples, n_patches, embed_dim)
        return feature_map.flatten(2).transpose(1, 2)

## Attention Mechanism

In [6]:
class Attention(nn.Module):
    """Multi-head self-attention.

    Parameters
    ----------
    dim : int
        The input and output dimension of the per-token features.
        Must be divisible by `n_heads`.
    n_heads : int
        Number of attention heads.
    qkv_bias : bool
        If True then we include bias to the query, key and value projections.
    attn_p : float
        Dropout probability applied to the attention weights (after softmax).
    proj_p : float
        Dropout probability applied to the output tensor.

    Attributes
    ----------
    head_dim : int
        Feature size handled by each head, `dim // n_heads`.
    scale : float
        Normalizing constant for the dot product, `head_dim ** -0.5`.
    qkv : nn.Linear
        Joint linear projection producing the query, key and value.
    proj : nn.Linear
        Linear mapping that takes in the concatenated output of all attention
        heads and maps it into a new space.
    attn_drop, proj_drop : nn.Dropout
        Dropout layers.

    Raises
    ------
    ValueError
        If `n_heads` is not a positive divisor of `dim`.
    """
    def __init__(self, dim, n_heads=8, qkv_bias=True, attn_p=0., proj_p=0.):
        super().__init__()
        # Fail early with a clear message: otherwise the per-head reshape in
        # `forward` breaks later with a hard-to-read size-mismatch error.
        if n_heads <= 0 or dim % n_heads != 0:
            raise ValueError(
                f"n_heads must be a positive divisor of dim, got dim={dim}, n_heads={n_heads}"
            )
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads
        self.scale = self.head_dim ** -0.5

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_p)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_p)

    def forward(self, x):
        """Run forward pass.

        Parameters
        ----------
        x : torch.Tensor
            Shape `(n_samples, n_patches + 1, dim)`.

        Returns
        -------
        torch.Tensor
            Shape `(n_samples, n_patches + 1, dim)`.

        Raises
        ------
        ValueError
            If the last dimension of `x` differs from `dim`.
        """
        n_samples, n_tokens, dim = x.shape

        if dim != self.dim:
            raise ValueError(
                f"Expected input with feature size {self.dim}, got {dim}"
            )

        qkv = self.qkv(x)  # (n_samples, n_patches + 1, 3 * dim)
        qkv = qkv.reshape(
                n_samples, n_tokens, 3, self.n_heads, self.head_dim
        )  # (n_samples, n_patches + 1, 3, n_heads, head_dim)
        qkv = qkv.permute(
                2, 0, 3, 1, 4
        )  # (3, n_samples, n_heads, n_patches + 1, head_dim)

        q, k, v = qkv[0], qkv[1], qkv[2]
        k_t = k.transpose(-2, -1)  # (n_samples, n_heads, head_dim, n_patches + 1)
        # Scaled dot product between every pair of tokens, per head.
        dp = (
           q @ k_t
        ) * self.scale  # (n_samples, n_heads, n_patches + 1, n_patches + 1)
        attn = dp.softmax(dim=-1)  # (n_samples, n_heads, n_patches + 1, n_patches + 1)
        attn = self.attn_drop(attn)

        weighted_avg = attn @ v  # (n_samples, n_heads, n_patches + 1, head_dim)
        weighted_avg = weighted_avg.transpose(
                1, 2
        )  # (n_samples, n_patches + 1, n_heads, head_dim)
        # Concatenate the heads back into a single feature axis.
        weighted_avg = weighted_avg.flatten(2)  # (n_samples, n_patches + 1, dim)

        x = self.proj(weighted_avg)  # (n_samples, n_patches + 1, dim)
        x = self.proj_drop(x)  # (n_samples, n_patches + 1, dim)

        return x

## MLP

In [7]:
class MLP(nn.Module):
    """Two-layer feed-forward network: Linear -> GELU -> Dropout -> Linear -> Dropout.

    Parameters
    ----------
    in_features : int
        Size of the incoming feature vectors.
    hidden_features : int
        Width of the hidden layer.
    out_features : int
        Size of the produced feature vectors.
    p : float
        Dropout probability (the same dropout layer is used after both linear layers).

    Attributes
    ----------
    fc1 : nn.Linear
        Input-to-hidden projection.
    act : nn.GELU
        Non-linearity applied to the hidden activations.
    fc2 : nn.Linear
        Hidden-to-output projection.
    drop : nn.Dropout
        Dropout layer shared by both stages.
    """
    def __init__(self, in_features, hidden_features, out_features, p=0.):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(p)

    def forward(self, x):
        """Apply the network to the last axis of `x`.

        Parameters
        ----------
        x : torch.Tensor
            Shape `(n_samples, n_patches + 1, in_features)`.

        Returns
        -------
        torch.Tensor
            Shape `(n_samples, n_patches + 1, out_features)`.
        """
        # (n_samples, n_patches + 1, hidden_features)
        hidden = self.drop(self.act(self.fc1(x)))
        # (n_samples, n_patches + 1, out_features)
        return self.drop(self.fc2(hidden))

## Block

In [8]:
class Block(nn.Module):
    """Pre-norm Transformer encoder block.

    Each of the two sub-layers (self-attention, then MLP) is preceded by a
    layer normalization and wrapped in a residual connection.

    Parameters
    ----------
    dim : int
        Embedding dimension.
    n_heads : int
        Number of attention heads.
    mlp_ratio : float
        Determines the hidden dimension size of the `MLP` module with respect
        to `dim` (`hidden_features = int(dim * mlp_ratio)`).
    qkv_bias : bool
        If True then we include bias to the query, key and value projections.
    p : float
        Dropout probability used for the attention output projection and
        inside the `MLP`.
    attn_p : float
        Dropout probability applied to the attention weights.

    Attributes
    ----------
    norm1, norm2 : LayerNorm
        Layer normalization.
    attn : Attention
        Attention module.
    mlp : MLP
        MLP module.
    """
    def __init__(self, dim, n_heads, mlp_ratio=4.0, qkv_bias=True, p=0., attn_p=0.):
        super().__init__()
        self.norm1 = nn.LayerNorm(dim, eps=1e-6)
        self.attn = Attention(
                dim,
                n_heads=n_heads,
                qkv_bias=qkv_bias,
                attn_p=attn_p,
                proj_p=p
        )
        self.norm2 = nn.LayerNorm(dim, eps=1e-6)
        hidden_features = int(dim * mlp_ratio)
        # Forward `p` so the MLP's dropout actually honours the requested
        # probability (it previously always stayed at the MLP default of 0).
        self.mlp = MLP(
                in_features=dim,
                hidden_features=hidden_features,
                out_features=dim,
                p=p,
        )

    def forward(self, x):
        """Run forward pass.

        Parameters
        ----------
        x : torch.Tensor
            Shape `(n_samples, n_patches + 1, dim)`.

        Returns
        -------
        torch.Tensor
            Shape `(n_samples, n_patches + 1, dim)`.
        """
        # Residual connections around the normalized attention and MLP sub-layers.
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))

        return x

## Vision Transformer

In [22]:
class VisionTransformer(nn.Module):
    """Compact Vision Transformer (ViT) image classifier.

    The image is split into patch embeddings, a learnable class token is
    prepended, positional embeddings are added, and the token sequence is
    passed through a stack of `Block` modules. The final, normalized class
    token is mapped to class logits by a linear head.

    Parameters
    ----------
    img_size : int
        Side length of the (square) input image.
    patch_size : int
        Side length of each (square) patch.
    in_chans : int
        Number of input channels.
    n_classes : int
        Number of output classes. The default is the value the notebook-level
        `num_class` variable holds when this class is defined.
    embed_dim : int
        Dimensionality of the token/patch embeddings.
    depth : int
        Number of `Block` modules.
    n_heads : int
        Number of attention heads per block.
    mlp_ratio : float
        Ratio of the `MLP` hidden width to `embed_dim`.
    qkv_bias : bool
        Whether the query/key/value projections carry a bias.
    p, attn_p : float
        Dropout probabilities (general and attention-weight dropout).

    Attributes
    ----------
    patch_embed : PatchEmbed
        Patch splitting + embedding layer.
    cls_token : nn.Parameter
        Learnable first token of the sequence, shape `(1, 1, embed_dim)`,
        initialized to zeros.
    pos_embed : nn.Parameter
        Learnable positional embedding for the class token and all patches,
        shape `(1, n_patches + 1, embed_dim)`, initialized to zeros.
    pos_drop : nn.Dropout
        Dropout applied after adding the positional embedding.
    blocks : nn.ModuleList
        The `depth` Transformer blocks.
    norm : nn.LayerNorm
        Final layer normalization.
    head : nn.Linear
        Classification head mapping `embed_dim` to `n_classes`.
    """
    def __init__(
            self,
            img_size=128,
            patch_size=16,
            in_chans=3,
            n_classes=num_class,
            embed_dim=256,
            depth=4,  # 12
            n_heads=8, # 12
            mlp_ratio=4.,
            qkv_bias=True,
            p=0.,
            attn_p=0.,
    ):
        super().__init__()

        self.patch_embed = PatchEmbed(
                img_size=img_size,
                patch_size=patch_size,
                in_chans=in_chans,
                embed_dim=embed_dim,
        )

        # One extra position for the class token that is prepended in `forward`.
        n_tokens = 1 + self.patch_embed.n_patches
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.pos_embed = nn.Parameter(torch.zeros(1, n_tokens, embed_dim))
        self.pos_drop = nn.Dropout(p=p)

        self.blocks = nn.ModuleList()
        for _ in range(depth):
            self.blocks.append(
                Block(
                    dim=embed_dim,
                    n_heads=n_heads,
                    mlp_ratio=mlp_ratio,
                    qkv_bias=qkv_bias,
                    p=p,
                    attn_p=attn_p,
                )
            )

        self.norm = nn.LayerNorm(embed_dim, eps=1e-6)
        self.head = nn.Linear(embed_dim, n_classes)


    def forward(self, x):
        """Classify a batch of images.

        Parameters
        ----------
        x : torch.Tensor
            Shape `(n_samples, in_chans, img_size, img_size)`.

        Returns
        -------
        logits : torch.Tensor
            Logits over all the classes - `(n_samples, n_classes)`.
        """
        batch = x.shape[0]
        patches = self.patch_embed(x)  # (n_samples, n_patches, embed_dim)

        # Broadcast the single learnable class token across the batch.
        cls = self.cls_token.expand(batch, -1, -1)  # (n_samples, 1, embed_dim)
        tokens = torch.cat((cls, patches), dim=1)  # (n_samples, 1 + n_patches, embed_dim)
        tokens = self.pos_drop(tokens + self.pos_embed)

        for encoder_block in self.blocks:
            tokens = encoder_block(tokens)

        tokens = self.norm(tokens)

        # Only the class token (position 0) feeds the classification head.
        return self.head(tokens[:, 0])

## Create model

In [23]:
# Model configuration derived from the dataset: images are channels-last, so
# axis 1 is the image side length and the last axis is the channel count.
custom_config = {
        "img_size": x_test.shape[1],
        "in_chans": x_test.shape[-1],
        "patch_size": 16,
        "embed_dim": x_test.shape[1]*2,  # embedding width tied to the image side (2 * img_size)
        "depth": 4,  # 12
        "n_heads": 8,  # 12
        "qkv_bias": True,
        "mlp_ratio": 4,
}

model_custom = VisionTransformer(**custom_config)
# Smoke test: push one random image through the model. The input shape is
# taken from the config so this check keeps working if the dataset's image
# size or channel count changes.
inp = torch.rand(
        1,
        custom_config["in_chans"],
        custom_config["img_size"],
        custom_config["img_size"],
)
res_c = model_custom(inp)
res_c.shape

torch.Size([1, 9])

## Train

In [63]:
def _run_epoch(model, loss_fn, images, labels, batch_size, training):
    """Run one pass over `images`/`labels` in consecutive mini-batches.

    Uses the notebook-level `device` and, when `training` is True, the
    notebook-level `optimizer`.

    Parameters
    ----------
    model : nn.Module
        Network to train or evaluate.
    loss_fn : callable
        Loss taking `(logits, class_indices)`.
    images : array-like
        Channels-last images, sliced along the first axis.
    labels : array-like
        Per-sample class scores; the arg-max over the last axis is the target.
    batch_size : int
        Number of samples per mini-batch.
    training : bool
        If True the model is put in training mode and its weights are
        updated; otherwise it is put in eval mode and gradients are disabled.

    Returns
    -------
    mean_loss : float
        Average of the per-batch losses.
    mean_acc : float
        Average of the per-batch accuracies, in percent.
    """
    # Floor division: a trailing partial batch is skipped.
    n_batches = images.shape[0] // batch_size

    # Dropout must be active only while training, and evaluation does not
    # need an autograd graph.
    model.train(training)
    mean_loss = 0.
    batch_accs = list()
    with torch.set_grad_enabled(training):
        for i in range(n_batches):
            # Local batches and labels
            local_X = images[i*batch_size:(i+1)*batch_size]
            local_y = labels[i*batch_size:(i+1)*batch_size]
            # Channels-last -> channels-first, as expected by the conv patch embedding.
            data = torch.tensor(local_X, dtype=torch.float).permute(0, 3, 1, 2)
            data = data.to(device)
            y = np.argmax(local_y, -1)
            target = torch.tensor(y, dtype=torch.long).to(device)

            if training:
                optimizer.zero_grad()
            output = model(data)
            loss = loss_fn(output, target)
            if training:
                loss.backward()
                optimizer.step()

            mean_loss += loss.item() / n_batches
            pred = np.argmax(output.detach().cpu().numpy(), -1)
            batch_accs.append(np.mean(pred == y) * 100)
    return mean_loss, np.mean(batch_accs)


def train(model, model_name, epochs, batch_size):
    """Train `model` and keep the checkpoint with the best test accuracy.

    Reads the notebook-level `x_train`, `y_train`, `x_test`, `y_test`,
    `device`, `optimizer` and `lr_scheduler`.

    Parameters
    ----------
    model : nn.Module
        Network to train; expected to already live on `device`.
    model_name : str
        Path the best `state_dict` is saved to.
    epochs : int
        Maximum number of epochs.
    batch_size : int
        Mini-batch size for both training and evaluation.

    Returns
    -------
    train_acc, test_acc : list of float
        Per-epoch accuracies in percent.
    """
    myloss = nn.CrossEntropyLoss()
    train_acc, test_acc = list(), list()
    patience = 20
    max_test_acc = 0.

    print('n_tr_batches:', x_train.shape[0] // batch_size)
    print('n_ts_batches:', x_test.shape[0] // batch_size)

    for epoch in range(epochs):
        # train
        _, epoch_train_acc = _run_epoch(
            model, myloss, x_train, y_train, batch_size, training=True
        )
        train_acc.append(epoch_train_acc)

        # test
        test_loss, epoch_test_acc = _run_epoch(
            model, myloss, x_test, y_test, batch_size, training=False
        )
        test_acc.append(epoch_test_acc)

        # Checkpoint whenever the test accuracy reaches a new best.
        if test_acc[-1] > max_test_acc:
            max_test_acc = test_acc[-1]
            torch.save(model.state_dict(), model_name)

        # Early stopping: after a 30-epoch warm-up, stop once the best test
        # accuracy is more than `patience` epochs old.
        if epoch > 30 and epoch - np.argmax(test_acc) > patience:
            break
        lr_scheduler.step(test_loss)
        print('Epoch: ', str(epoch+1)+'/'+str(epochs),'| Training acc: ', train_acc[-1], '| Testing acc: ', test_acc[-1])

    return train_acc, test_acc

In [69]:
# Architecture of the baseline model (a shallower variant: 2 blocks).
custom_config = dict(
        img_size=128,
        in_chans=3,
        patch_size=16,
        embed_dim=256,
        depth=2,  # 8
        n_heads=8,
        qkv_bias=True,
        mlp_ratio=4,
)

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('device', device)

# Training hyper-parameters and checkpoint location.
batch_size = 16
epochs = 50
model_name = 'har_baseline.pth'

model = VisionTransformer(**custom_config).to(device)

# `train` picks up `optimizer` and `lr_scheduler` from the notebook namespace.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
# Halve the learning rate when the monitored loss has not improved for 10 epochs.
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, factor=0.5, patience=10, verbose=True
)

train_acc, test_acc = train(model, model_name, epochs, batch_size)

device cuda
n_tr_batches: 23
n_ts_batches: 6
Epoch:  1/50 | Training acc:  15.48913043478261 | Testing acc:  18.75
Epoch:  2/50 | Training acc:  24.456521739130434 | Testing acc:  34.375
Epoch:  3/50 | Training acc:  37.5 | Testing acc:  33.333333333333336
Epoch:  4/50 | Training acc:  51.358695652173914 | Testing acc:  40.625
Epoch:  5/50 | Training acc:  55.43478260869565 | Testing acc:  43.75
Epoch:  6/50 | Training acc:  65.76086956521739 | Testing acc:  39.583333333333336
Epoch:  7/50 | Training acc:  77.17391304347827 | Testing acc:  46.875
Epoch:  8/50 | Training acc:  79.8913043478261 | Testing acc:  52.083333333333336
Epoch:  9/50 | Training acc:  92.3913043478261 | Testing acc:  53.125
Epoch:  10/50 | Training acc:  95.3804347826087 | Testing acc:  58.333333333333336
Epoch:  11/50 | Training acc:  94.02173913043478 | Testing acc:  48.958333333333336
Epoch:  12/50 | Training acc:  95.3804347826087 | Testing acc:  54.166666666666664
Epoch:  13/50 | Training acc:  95.10869565217

## Load model and Predict

In [71]:
model_name = 'har_baseline.pth'
batch_size = 16
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loaded_model = VisionTransformer(**custom_config).to(device)
# Freshly initialized model with the same architecture (untrained weights).
rand_model = VisionTransformer(**custom_config).to(device)
# The restored model is only used for prediction, so switch it to eval mode.
loaded_model.eval()
# `map_location` remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can also be restored on a CPU-only machine.
loaded_model.load_state_dict(torch.load(model_name, map_location=device))

<All keys matched successfully>

In [81]:
# Evaluate the restored checkpoint on the test split, batch by batch.
n_ts_batches = x_test.shape[0] // batch_size  # a trailing partial batch is skipped
loc_acc = list()
preds = []  # raw logits of every batch, reused by the per-sample listing below

# Inference only: make sure dropout is off and skip building autograd graphs.
# No optimizer is involved, so this cell does not depend on the training cell.
loaded_model.eval()
with torch.no_grad():
    for i in range(n_ts_batches):
        # Local batches and labels
        local_X, local_y = x_test[i*batch_size:(i+1)*batch_size], y_test[i*batch_size:(i+1)*batch_size]
        # Channels-last -> channels-first, as expected by the model.
        data = torch.tensor(local_X, dtype=torch.float).permute(0, 3, 1, 2)
        data = data.to(device)
        output = loaded_model(data)
        preds.append(output.cpu().numpy())
        output = np.argmax(preds[-1], -1)
        y = np.argmax(local_y, -1)
        loc_acc.append(sum(output == y) / len(output) * 100)
# Mean of the per-batch accuracies, in percent.
test_acc = np.mean(loc_acc)
test_acc

58.333333333333336

In [83]:
# Flatten the per-batch logit arrays into one predicted class index per sample.
pred_ls = [np.argmax(sample_logits, -1) for batch_logits in preds for sample_logits in batch_logits]

# Compare only the samples that were actually predicted: when the test-set size
# is not a multiple of the batch size the trailing samples have no prediction,
# and indexing `pred_ls` by every test index would raise an IndexError.
gt_ls = np.argmax(y_test[:len(pred_ls)], -1)

right = 0
wrong = 0
for gt, pred in zip(gt_ls, pred_ls):
    print('GT:', gt, ',      Pred:', pred)
    if gt == pred:
        right += 1
    else:
        wrong += 1

total = right + wrong
# Guard against an empty prediction list instead of dividing by zero.
print('Acc:', right / total * 100 if total else float('nan'))

GT: 2 ,      Pred: 0
GT: 7 ,      Pred: 6
GT: 4 ,      Pred: 8
GT: 5 ,      Pred: 6
GT: 1 ,      Pred: 1
GT: 4 ,      Pred: 2
GT: 6 ,      Pred: 6
GT: 2 ,      Pred: 0
GT: 4 ,      Pred: 6
GT: 2 ,      Pred: 2
GT: 4 ,      Pred: 4
GT: 2 ,      Pred: 7
GT: 5 ,      Pred: 5
GT: 4 ,      Pred: 4
GT: 5 ,      Pred: 5
GT: 6 ,      Pred: 6
GT: 6 ,      Pred: 6
GT: 6 ,      Pred: 2
GT: 3 ,      Pred: 8
GT: 2 ,      Pred: 0
GT: 3 ,      Pred: 3
GT: 4 ,      Pred: 4
GT: 4 ,      Pred: 1
GT: 0 ,      Pred: 0
GT: 5 ,      Pred: 5
GT: 1 ,      Pred: 1
GT: 4 ,      Pred: 4
GT: 6 ,      Pred: 6
GT: 5 ,      Pred: 8
GT: 4 ,      Pred: 8
GT: 6 ,      Pred: 6
GT: 8 ,      Pred: 8
GT: 6 ,      Pred: 6
GT: 4 ,      Pred: 4
GT: 5 ,      Pred: 4
GT: 6 ,      Pred: 6
GT: 8 ,      Pred: 0
GT: 5 ,      Pred: 5
GT: 1 ,      Pred: 6
GT: 0 ,      Pred: 0
GT: 4 ,      Pred: 4
GT: 0 ,      Pred: 0
GT: 4 ,      Pred: 2
GT: 2 ,      Pred: 0
GT: 0 ,      Pred: 0
GT: 1 ,      Pred: 1
GT: 4 ,      Pred: 8
GT: 4 ,      