In [8]:
import torch
import numpy as np
from pathlib import Path
import logging
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import argparse
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset

In [9]:
# parser = argparse.ArgumentParser("Train model.")
# parser.add_argument("--epochs", type=int, help="Number of epochs.", required=True)
# parser.add_argument("--data_dir", type=str, help="Path to data.", required=True)
# parser.add_argument(
#     "--checkpoint_dir", type=str, help="Path for saving checkpoints.", required=True
# )

# parser.add_argument("--lr", type=float, help="Learning rate.", required=True)
# parser.add_argument(
#     "--sample_file", type=int, help="Sample number to train from.", required=True
# )
# parser.add_argument(
#     "--batch_size", type=int, help="Batch Size.", required=True
# )

In [10]:
class FLAGS():
    def __init__(self):
        self.data_dir = '/home/tasbolat/some_python_examples/data_VT_SNN/'
        self.batch_size = 8
        self.sample_file = 1
        self.lr = 0.01
        self.epochs = 2000
        self.output_size = 20
args = FLAGS()

In [11]:
device = torch.device("cuda:1")

In [12]:
class ViTacDataset(Dataset):
    def __init__(self, path, sample_file, output_size):
        self.path = path
        sample_file = Path(path) / sample_file
        self.samples = np.loadtxt(sample_file).astype("int")
        self.tac_r = torch.load(Path(path) / "tactile_rectangular/tac_right.pt")
        self.tac_l = torch.load(Path(path) / "tactile_rectangular/tac_left.pt")
        self.output_size = output_size

    def __getitem__(self, index):
        input_index = self.samples[index, 0]
        class_label = self.samples[index, 1]
        target_class = torch.zeros((self.output_size, 1, 1, 1))
        target_class[class_label, ...] = 1

        return (
            self.tac_r[input_index],
            self.tac_l[input_index],
            target_class,
            class_label,
        )

    def __len__(self):
        return self.samples.shape[0]

In [13]:
train_dataset = ViTacDataset(
    path=args.data_dir, sample_file=f"train_80_20_{args.sample_file}.txt", output_size=args.output_size
)
train_loader = DataLoader(
    dataset=train_dataset, batch_size=args.batch_size, shuffle=False, num_workers=4
)
test_dataset = ViTacDataset(
    path=args.data_dir, sample_file=f"test_80_20_{args.sample_file}.txt", output_size=args.output_size
)
test_loader = DataLoader(
    dataset=test_dataset, batch_size=args.batch_size, shuffle=False, num_workers=4
)

In [16]:
a,b,c,d = train_dataset[0]

In [17]:
a.shape, b.shape

(torch.Size([2, 9, 7, 325]), torch.Size([2, 9, 7, 325]))

In [43]:
class CNN3D(nn.Module):

    def __init__(self):
        
        super(CNN3D, self).__init__()
        
        self.conv1 = nn.Conv3d(in_channels=2, out_channels=4, kernel_size=(7,3,3), stride=(4,1,1))
        self.conv2 = nn.Conv3d(in_channels=4, out_channels=8, kernel_size=(5,3,3), stride=(3,1,1))
        
    def forward(self, x):
        
        #print('Model input ', x.size())
        batch_size, C, H, W, sequence_size = x.size()
        x = x.view([batch_size, C, sequence_size, H, W])
        #print('Model input ', x.size())
        
        # pass to cnn3d
        out = self.conv1(x)
        out = F.relu(out)
        #print('conv1 out: ', out.shape)
        out = self.conv2(out)
        out = F.relu(out)
        #print('conv2 out: ', out.shape)
        out = out.view([batch_size, np.prod([8, 26, 5, 3])])
        #print(out.shape)
        
        return out

In [44]:
class MyNet(nn.Module):

    def __init__(self):
        
        super(MyNet, self).__init__()
        
        self.cnn_left = CNN3D()
        self.cnn_right = CNN3D()

        # 8, 35, 5, 3
        # Define the output layer
        self.fc = nn.Linear(np.prod([8, 26, 5, 3])*2, 20)
        
        self.drop = nn.Dropout(0.5)
        
        #self.fc_mlp = nn.Linear(6300, self.input_size)

    def forward(self, x_left, x_right):
        out_left = self.cnn_left(x_left)
        out_right = self.cnn_right(x_right)
        
        out = torch.cat([out_left, out_right], dim=1)
        
        out = self.fc(out)
        
        out = self.drop(out)
        return out

In [45]:
net = MyNet().to(device)
# Create snn loss instance.
criterion = nn.CrossEntropyLoss()
# Define optimizer module.
optimizer = torch.optim.RMSprop(
    net.parameters(), lr=0.00001)

In [51]:
train_accs = []
test_accs = []
train_loss = []
test_loss = []
for epoch in range(args.epochs):
    # Training loop.
    net.train()
    correct = 0
    batch_loss = 0
    train_acc = 0
    for i, (tac_left, tac_right, _, label) in enumerate(train_loader, 0):

        tac_left = tac_left.to(device)
        tac_right = tac_right.to(device)
        label = label.to(device)
        # Forward pass of the network.
        #print(in_viz.shape)
        out = net.forward(tac_left, tac_right)
        #print(out_tact.shape)
        # Calculate loss.
        #print(label.shape)
        loss = criterion(out, label)
        #print(loss)

        batch_loss += loss.cpu().data.item()
        # Reset gradients to zero.
        optimizer.zero_grad()
        # Backward pass of the network.
        loss.backward()
        # Update weights.
        optimizer.step()

        _, predicted = torch.max(out.data, 1)
        correct += (predicted == label).sum().item()

    # Reset training stats.
    train_acc = correct/len(train_loader.dataset)
    train_loss.append(batch_loss/len(train_loader.dataset))
    train_accs.append(train_acc)
    
    # testing
    net.eval()
    correct = 0
    batch_loss = 0
    train_acc = 0
    with torch.no_grad():
        for i, (tac_left, tac_right, _, label) in enumerate(train_loader, 0):
            tac_left = tac_left.to(device)
            tac_right = tac_right.to(device)
            label = label.to(device)
            # Forward pass of the network.
            out = net.forward(tac_left, tac_right)
            label = label.to(device)
            _, predicted = torch.max(out.data, 1)
            correct += (predicted == label).sum().item()
            # Calculate loss.
            loss = criterion(out, label)
            batch_loss += loss.cpu().data.item()
    train_acc = correct/len(train_loader.dataset)

    # testing
    net.eval()
    correct = 0
    batch_loss = 0
    test_acc = 0
    with torch.no_grad():
        for i, (tac_left, tac_right, _, label) in enumerate(test_loader, 0):
            tac_left = tac_left.to(device)
            tac_right = tac_right.to(device)
            # Forward pass of the network.
            out = net.forward(tac_left, tac_right)
            label = label.to(device)
            _, predicted = torch.max(out.data, 1)
            correct += (predicted == label).sum().item()
            # Calculate loss.
            loss = criterion(out, label)
            batch_loss += loss.cpu().data.item()

    test_loss.append(batch_loss/len(test_loader.dataset))
    test_acc = correct/len(test_loader.dataset)
    test_accs.append(test_acc)
    if epoch%10 == 0:
        print(epoch, 'Train:', train_acc, 'Test:', test_acc)

0 Train: 0.9625 Test: 0.6
10 Train: 0.9333333333333333 Test: 0.5
20 Train: 0.9875 Test: 0.6833333333333333
30 Train: 0.9958333333333333 Test: 0.6833333333333333
40 Train: 0.9875 Test: 0.7166666666666667
50 Train: 0.9916666666666667 Test: 0.7833333333333333
60 Train: 0.9916666666666667 Test: 0.8
70 Train: 0.9916666666666667 Test: 0.7833333333333333
80 Train: 0.9958333333333333 Test: 0.8166666666666667
90 Train: 0.9916666666666667 Test: 0.8
100 Train: 0.9958333333333333 Test: 0.8166666666666667
110 Train: 0.9916666666666667 Test: 0.8333333333333334
120 Train: 0.9958333333333333 Test: 0.8166666666666667
130 Train: 0.9875 Test: 0.7833333333333333
140 Train: 0.9916666666666667 Test: 0.8166666666666667
150 Train: 0.9958333333333333 Test: 0.8166666666666667
160 Train: 1.0 Test: 0.8166666666666667
170 Train: 0.9958333333333333 Test: 0.85
180 Train: 0.9958333333333333 Test: 0.85
190 Train: 0.9958333333333333 Test: 0.8166666666666667
200 Train: 0.9958333333333333 Test: 0.8333333333333334
210 Tra

KeyboardInterrupt: 

In [None]:
model_parameters = filter(lambda p: p.requires_grad, net.parameters())
params = sum([np.prod(p.size()) for p in model_parameters])

In [None]:
params

In [None]:
import matplotlib.pyplot as plt

In [None]:
np.max(test_accs)

In [None]:
fig, ax = plt.subplots(2, figsize=(15,3))
ax[0].plot(train_accs)
ax[0].plot(test_accs)
ax[0].set_ylabel('Accuracy')
ax[0].legend(['Train', 'Test'])

ax[1].plot(train_loss) 
ax[1].plot(test_loss)
ax[1].set_ylabel('Loss')
ax[1].legend(['Train', 'Test'])

plt.show()