In [43]:
import os
import torch
from torch import nn
import torch.optim
import numpy as np  # to load dataset
import math
from sklearn.model_selection import train_test_split

%matplotlib inline

In [44]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device in use: {device}")

Device in use: cpu


## For Google Colab

In [45]:
# from google.colab import drive
# drive.mount("/content/drive/")
# %cd /content/drive/MyDrive/PyTorch/

## Load the dataset

In [46]:
# define the dataset
# 0 corresponds to rotating
# 1 corresponds to grasping

# num rows for one grasp
block_size = 2 * 24

def normalize_vector(nd_arr: np.ndarray) -> np.ndarray:
    vector_length = sum([i ** 2 for i in nd_arr]) ** 0.5
    normalized_arr = [i / vector_length for i in nd_arr]
    return normalized_arr

# Rotation
unedited_rotating_dataset = np.loadtxt('rotating.csv', delimiter=",")
# Split into multiple movements over time ("block_size" seconds)
edited_rotating_dataset = unedited_rotating_dataset[:,1:21] #corresponding to cols 2 - 21 inclusive, excluding the last row of zeros
# Convert positions into velocities
edited_rotating_dataset = np.diff(edited_rotating_dataset, axis=0)
# Normalize each row
edited_rotating_dataset = np.array([normalize_vector(row) for row in edited_rotating_dataset])

# Grasping
unedited_grasping_dataset = np.loadtxt('grasping.csv', delimiter=",")
edited_grasping_dataset = unedited_grasping_dataset[:,1:21]
edited_grasping_dataset = np.diff(edited_grasping_dataset, axis=0)
# Normalize
edited_rotating_dataset = np.array([normalize_vector(row) for row in edited_grasping_dataset])

# Process data

In [47]:
columns = edited_rotating_dataset.shape[1]

# Rotating
rows = edited_rotating_dataset.shape[0]
X = np.zeros((rows - block_size + 1, block_size * columns))  # number of rows times columns rows (from flattening)

# For each block
for i in range(0, rows - block_size + 1):
  # Get entire row, 1st block_size * 24 rows, 2nd of them and so on..
  X[i] = np.ndarray.flatten(edited_rotating_dataset[i : i + block_size, :])

# X now contains mutliple blocks
y = np.zeros((rows - block_size + 1))


# grasping
rows = edited_grasping_dataset.shape[0]
X2 = np.zeros((rows - block_size + 1, block_size * columns))
# For each block
for i in range(0, rows - block_size + 1):
  # Get entire row, 1st, block_size * 24 rows, second of them and so on..
  X2[i] = np.ndarray.flatten(edited_grasping_dataset[i : i + block_size, :])
# X now contains mutliple blocks
y2 = np.ones((rows - block_size + 1))
combined_X = np.concatenate((X, X2), axis=0)
combined_y = np.concatenate((y, y2), axis=0)

## Prepare for Processing

In [48]:
# Split training and testing data
X_train, X_test, y_train, y_test = train_test_split(combined_X, combined_y, test_size=0.2, random_state=42)

# covnert into tensors since numpy uses 64 bit floating point
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)
y_train = torch.reshape(y_train,(-1,1)) # a single dimension uses -1 to infer the length. new shape
y_test = torch.reshape(y_test,(-1,1)) # a single dimension uses -1 to infer the length. new shape

In [49]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        #self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(block_size * columns, 64), # input
            nn.ReLU(),
            nn.Linear(64, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        #x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [50]:
model = NeuralNetwork().to(device)
X_train.to(device)
y_train.to(device)
print(model)

NeuralNetwork(
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=960, out_features=64, bias=True)
    (1): ReLU()
    (2): Linear(in_features=64, out_features=16, bias=True)
    (3): ReLU()
    (4): Linear(in_features=16, out_features=1, bias=True)
    (5): Sigmoid()
  )
)


## Preparation for training

In [51]:
loss_fn = nn.BCELoss() # used for binary classification problems
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

## Training

In [52]:
n_epochs = 15
batch_size = 4

# CPU
if device == "cpu":
  for epoch in range(n_epochs):
    for i in range(0, len(X) - batch_size, batch_size):
      Xbatch = X_train[i:i+batch_size] # specifying the rows
      y_pred = model(Xbatch) # scalar
      ybatch = y_train[i:i+batch_size]
      loss = loss_fn(y_pred, ybatch)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
    print(f"epoch: {epoch + 1}/{n_epochs}", end="\r")
  print(f'CPU: Finished epoch {epoch}, latest loss {loss}')

# GPU
elif device == "cuda":
  for epoch in range(n_epochs):
    for i in range(0, len(X) - batch_size, batch_size):
      Xbatch = X_train[i:i+batch_size] # specifying the rows
      y_pred = model(Xbatch.cuda()) # scalar
      ybatch = y_train[i:i+batch_size].cuda()
      loss = loss_fn(y_pred, ybatch)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
    print(f"epoch: {epoch + 1}/{n_epochs}", end="\r")
  print(f'CUDA: Finished epoch {epoch}, latest loss {loss}')
else:
  print("Not using CPU or CUDA")

CPU: Finished epoch 14, latest loss 3.826694955932908e-05


In [53]:
# evaluate the accuracy of the model

with torch.no_grad():  # no_grad to avoid differentiating
    X_test = X_test.to(device)
    y_test = y_test.to(device)
    y_pred = model(X_test)

# rounds prediction to nearest integer, check if equal to y, 
accuracy = (y_pred.round() == y_test).float().mean()
print(f"Accuracy {accuracy}")

Accuracy 0.23079939186573029
