In [1]:
import itertools
import torch
from scipy import ndimage as ndimage
from sklearn.utils import shuffle
import time
import math
import pickle
import numpy

In [None]:
"""
In this section, we implement a neural network model for gesture detection. The input to the model is a
tensor of shape (batch_size, duration, n_channels). Each hand skeleton has 22 joints and each joint has 3 coordinates,
so every sequence has n_channels = 3 * 22 = 66 channels over time.

To extract features from the input data, we will first process each channel separately. We will use 1D convolutions to
process each channel, and the neural network will consist of three convolutional layers and pooling layers.

For every channel, the outputs of its three branches (two convolutional branches with different kernel sizes and one
pooling-only residual branch) are concatenated. Finally, the features of all channels are concatenated into one output.

The neural network architecture for this model can be summarized as follows:

Input layer with shape (batch_size, duration, n_channels)
Three 1D convolutional layers with padding, each followed by an average pooling layer
Three branches for each channel (two convolutional, one pooling-only), with a concatenation layer at the end
By using 1D convolutions and pooling layers, we can extract meaningful features from the
hand skeleton data. The concatenation layer at the end allows us to combine the information from each channel 
into a single output, which can be used to predict the gesture being performed
"""

In [2]:
# Load the pre-processed data; another .pckl file with the same keys can be used instead.
def load_data(filepath):
    """Read the pickled dataset at `filepath` and return its six splits.

    Parameters
    ----------
    filepath : str or path-like
        Location of a pickle file holding a mapping with the keys
        'x_train', 'x_test', 'y_train_14', 'y_train_28', 'y_test_14'
        and 'y_test_28'.

    Returns
    -------
    tuple
        (x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28),
        exactly as stored in the file.

    Raises
    ------
    KeyError
        If one of the expected keys is missing from the loaded mapping.
    """
    # SECURITY: pickle.load can execute arbitrary code -- only open files
    # that come from a trusted source.
    # The context manager guarantees the handle is closed even when
    # unpickling fails.
    with open(filepath, 'rb') as file:
        data = pickle.load(file, encoding='latin1')
    return data['x_train'], data['x_test'], data['y_train_14'], data['y_train_28'], data['y_test_14'], data['y_test_28']

def preprocess_data(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28):
    """Shuffle both splits, then resample every sequence to 100 time steps.

    Returns the six splits in the same order they were passed in; only the
    two feature splits go through the length resampling.
    """
    shuffled = shuffle_dataset(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28)
    features_train, features_test = shuffled[0], shuffled[1]
    labels = tuple(shuffled[2:])
    resized = resize_sequences_length(features_train, features_test, final_length=100)
    return tuple(resized) + labels



In [None]:
# Convert the numpy splits to torch tensors
def convert_to_pytorch_tensors(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28):
    """Turn the feature and label splits into torch tensors.

    Features become FloatTensors. Labels are shifted down by one (so that
    the smallest label 1 becomes 0) and become LongTensors.

    Returns the six converted splits in the order they were given.
    """
    def _features(samples):
        # numpy array -> float tensor
        return torch.from_numpy(samples).type(torch.FloatTensor)

    def _labels(values):
        # sequence of labels -> numpy array, minus one -> integer tensor
        shifted = numpy.array(values) - 1
        return torch.from_numpy(shifted).type(torch.LongTensor)

    label_splits = (y_train_14, y_train_28, y_test_14, y_test_28)
    converted_labels = tuple(_labels(split) for split in label_splits)
    return (_features(x_train), _features(x_test)) + converted_labels

In [None]:
def resize_sequences_length(x_train, x_test, final_length=100):
    """Resample every sequence of both splits along its first axis.

    Each sequence is indexed as (time, channel). Every channel is zoomed
    independently with `ndimage.zoom` (mode='reflect') by the factor
    `final_length / len(sequence)`.

    Returns the resampled (x_train, x_test) as numpy arrays.
    """
    def _resample(sequence):
        # Zoom each channel on its own, then restore the (time, channel) layout.
        per_channel = sequence.T
        zoomed = []
        for channel_idx in range(numpy.size(sequence, 1)):
            factor = final_length / len(sequence)
            zoomed.append(ndimage.zoom(per_channel[channel_idx], factor, mode='reflect'))
        return numpy.array(zoomed).T

    resized_train = numpy.array([_resample(sequence) for sequence in x_train])
    resized_test = numpy.array([_resample(sequence) for sequence in x_test])
    return resized_train, resized_test

def shuffle_dataset(x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28, random_state=None):
    """Shuffle the train split and the test split, each consistently.

    Samples and their two label arrays are permuted together, so every
    sample keeps its own labels. Train and test are shuffled separately.

    Parameters
    ----------
    x_train, y_train_14, y_train_28 : array-like
        Training samples and their 14-class / 28-class labels.
    x_test, y_test_14, y_test_28 : array-like
        Test samples and their 14-class / 28-class labels.
    random_state : int, RandomState instance or None, default=None
        Forwarded to `sklearn.utils.shuffle`. Pass an int to get the same
        permutation on every run; None keeps the previous unseeded behaviour.

    Returns
    -------
    tuple
        (x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28)
    """
    x_train, y_train_14, y_train_28 = shuffle(x_train, y_train_14, y_train_28, random_state=random_state)
    x_test,  y_test_14,  y_test_28  = shuffle(x_test,  y_test_14,  y_test_28,  random_state=random_state)
    return x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28

In [3]:
class HandGestureNet(torch.nn.Module):
    r"""
    Multi-branch 1D convolutional network for skeleton-based gesture classification.

    The input is a tensor indexed as (batch, time, channel). Every channel is
    processed on its own by three parallel branches:

    * a "high" branch: three Conv1d layers with kernel size 7,
    * a "low" branch: three Conv1d layers with kernel size 3,
    * a "residual" branch: pooling only, no learned weights.

    Each branch halves the time axis three times with AvgPool1d(2). The branch
    outputs (4 + 4 + 1 = 9 feature maps per channel) of all channels are
    concatenated, flattened and fed to a two-layer fully connected classifier.

    Parameters
    ----------
    n_channels : int
        Number of input channels (one set of branches is built per channel).
    n_classes : int
        Number of output classes.
    dropout_probability : float
        Dropout probability used in the last convolutional stage of each branch.
    sequence_length : int
        Length of the time axis of the inputs. It fixes the input size of the
        first fully connected layer (100 -> 50 -> 25 -> 12 after pooling).

    citation:
    ------------
        @inproceedings{devineau2018deep,
            title={Deep learning for hand gesture recognition on skeletal data},
            author={Devineau, Guillaume and Moutarde, Fabien and Xi, Wang and Yang, Jie},
            booktitle={2018 13th IEEE International Conference on Automatic Face \& Gesture Recognition (FG 2018)},
            pages={106--113},
            year={2018},
            organization={IEEE}
        }
    """

    # Feature maps produced per input channel: 4 (high) + 4 (low) + 1 (residual).
    _FEATURE_MAPS_PER_CHANNEL = 9

    def __init__(self, n_channels=66, n_classes=14, dropout_probability=0.2, sequence_length=100):
        super(HandGestureNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.dropout_probability = dropout_probability
        self.sequence_length = sequence_length

        # Layers ----------------------------------------------------------
        # The layer order inside each Sequential is kept stable so that
        # state_dict keys stay compatible with previously saved checkpoints.
        self.all_conv_high = torch.nn.ModuleList(
            [self._make_conv_branch(kernel_size=7) for _ in range(n_channels)]
        )
        self.all_conv_low = torch.nn.ModuleList(
            [self._make_conv_branch(kernel_size=3) for _ in range(n_channels)]
        )
        self.all_residual = torch.nn.ModuleList([torch.nn.Sequential(
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2)
        ) for _ in range(n_channels)])

        # Each of the three AvgPool1d(2) stages floors the length by two.
        pooled_length = sequence_length // 2 // 2 // 2
        self.fc = torch.nn.Sequential(
            torch.nn.Linear(in_features=self._FEATURE_MAPS_PER_CHANNEL * n_channels * pooled_length,
                            out_features=1936),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=1936, out_features=n_classes)
        )

        # Initialization --------------------------------------------------
        # Xavier-uniform weights (ReLU gain) and constant 0.1 biases for every
        # convolutional and fully connected layer.
        relu_gain = torch.nn.init.calculate_gain('relu')
        for branch in itertools.chain(self.all_conv_high, self.all_conv_low):
            for layer in branch:
                if isinstance(layer, torch.nn.Conv1d):
                    torch.nn.init.xavier_uniform_(layer.weight, gain=relu_gain)
                    torch.nn.init.constant_(layer.bias, 0.1)

        for layer in self.fc:
            if isinstance(layer, torch.nn.Linear):
                torch.nn.init.xavier_uniform_(layer.weight, gain=relu_gain)
                # Initialise the bias of THIS linear layer (not a stale loop variable).
                torch.nn.init.constant_(layer.bias, 0.1)

    def _make_conv_branch(self, kernel_size):
        """Build one three-stage convolutional branch for a single channel."""
        padding = kernel_size // 2  # keeps the time length unchanged by the convolution
        return torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=1, out_channels=8, kernel_size=kernel_size, padding=padding),
            torch.nn.ReLU(),
            torch.nn.AvgPool1d(2),

            torch.nn.Conv1d(in_channels=8, out_channels=4, kernel_size=kernel_size, padding=padding),
            torch.nn.ReLU(),
            torch.nn.AvgPool1d(2),

            torch.nn.Conv1d(in_channels=4, out_channels=4, kernel_size=kernel_size, padding=padding),
            torch.nn.ReLU(),
            torch.nn.Dropout(p=self.dropout_probability),
            torch.nn.AvgPool1d(2)
        )

    def forward(self, x):
        """Compute class scores for a batch of sequences.

        Parameters
        ----------
        x : torch.Tensor
            Tensor indexed as (batch, time, channel), with `n_channels`
            channels and a time axis of length `sequence_length`.

        Returns
        -------
        torch.Tensor
            Unnormalised class scores of shape (batch, n_classes).
        """
        all_features = []
        # Process each channel independently through its own three branches.
        for channel in range(self.n_channels):
            signal = x[:, :, channel].unsqueeze(1)  # (batch, 1, time)
            high = self.all_conv_high[channel](signal)
            low = self.all_conv_low[channel](signal)
            residual = self.all_residual[channel](signal)
            all_features.append(torch.cat([high, low, residual], dim=1))

        all_features = torch.cat(all_features, dim=1)
        # Flatten everything except the batch axis; a length mismatch then
        # surfaces as a shape error in the linear layer instead of silently
        # changing the batch size.
        all_features = all_features.view(all_features.size(0), -1)
        return self.fc(all_features)

In [4]:
def batch(tensor, batch_size=32):
    """Split `tensor` along its first axis into consecutive chunks.

    Every chunk holds `batch_size` rows except possibly the last one, which
    holds the remainder. At least one chunk is always returned, so an input
    with zero rows yields a list containing a single empty slice.
    """
    total = tensor.shape[0]
    # Ceiling division, floored at one chunk.
    n_chunks = max(1, -(-total // batch_size))
    return [
        tensor[k * batch_size: min((k + 1) * batch_size, total)]
        for k in range(n_chunks)
    ]
# Elapsed training time
def time_since(since):
    """Format the time elapsed since the timestamp `since` as 'MMm SSs'.

    `since` is a value previously obtained from `time.time()`.
    """
    elapsed = time.time() - since
    minutes = math.floor(elapsed / 60)
    seconds = elapsed - minutes * 60
    return '{:02d}m {:02d}s'.format(int(minutes), int(seconds))
# Classification accuracy
def acc_of(model, x, y_ref):
    """Return the fraction of rows of `x` that `model` classifies as `y_ref`.

    The model is switched to evaluation mode and run on all of `x` in a
    single call with gradients disabled. The predicted class of a row is
    the index of its largest score along dimension 1.
    """
    model.eval()
    with torch.no_grad():
        scores = model(x)
        predicted_classes = scores.max(dim=1)[1]
        n_correct = (predicted_classes == y_ref).sum().item()
    return 1.0 * n_correct / y_ref.shape[0]

In [5]:
# tensorboardX is optional: when it is not installed, fall back to a stub
# that accepts the same calls and does nothing.
try:
    from tensorboardX import SummaryWriter
except ImportError:
    # Only a missing package triggers the fallback; a bare `except:` would
    # also swallow KeyboardInterrupt / SystemExit.
    class SummaryWriter():
        """No-op stand-in for `tensorboardX.SummaryWriter`."""

        def __init__(self, *args, **kwargs):
            # Accept and ignore any constructor arguments.
            pass

        def add_scalar(self, tag, scalar_value, global_step=None, walltime=None):
            """Discard the scalar; nothing is logged."""
            pass

        def close(self):
            """Nothing to release."""
            pass

In [6]:
def train(model, criterion, optimizer,
          x_train, y_train, x_test, y_test,
          force_cpu=False, num_epochs=5):
    r"""
    Train `model` and report the loss and accuracies after every epoch.

    Parameters
    ----------
    model : torch.nn.Module
        Network to train; it is moved to the selected device.
    criterion : callable
        Loss called as `criterion(outputs, targets)`.
    optimizer : torch.optim.Optimizer
        Optimizer already bound to the parameters of `model`.
    x_train, y_train, x_test, y_test : torch.Tensor
        Training / test inputs and targets; all are moved to the selected device.
    force_cpu : bool
        When True, always train on the CPU, even if CUDA or MPS is available.
    num_epochs : int
        Number of passes over the training batches.

    Returns
    -------
    None
        Progress is printed and logged through `SummaryWriter`.

    citation:
    ------------
        @inproceedings{devineau2018deep,
            title={Deep learning for hand gesture recognition on skeletal data},
            author={Devineau, Guillaume and Moutarde, Fabien and Xi, Wang and Yang, Jie},
            booktitle={2018 13th IEEE International Conference on Automatic Face \& Gesture Recognition (FG 2018)},
            pages={106--113},
            year={2018},
            organization={IEEE}
        }
    """
    # Pick the device. `force_cpu` overrides every accelerator, and the MPS
    # check goes through torch.backends (guarded for builds without it).
    mps_backend = getattr(torch.backends, 'mps', None)
    if force_cpu:
        device = torch.device("cpu")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    elif mps_backend is not None and mps_backend.is_available():
        device = torch.device('mps')
    else:
        device = torch.device("cpu")

    # Move the model and the data to the device
    model = model.to(device)
    x_train, y_train, x_test, y_test = x_train.to(device), y_train.to(device), x_test.to(device), y_test.to(device)

    writer = SummaryWriter()

    # The batches are built once and reused, in the same order, every epoch.
    x_train_batches = batch(x_train)
    y_train_batches = batch(y_train)

    start = time.time()

    print('[INFO] Started to train the model.')
    print('Training the model on {}.'.format('GPU' if device.type in ('cuda', 'mps') else 'CPU'))

    # Training loop
    for ep in range(num_epochs):

        model.train()

        # Sum (not mean) of the batch losses over the epoch.
        current_loss = 0.0

        for x_train_batch, y_train_batch in zip(x_train_batches, y_train_batches):
            optimizer.zero_grad()
            outputs = model(x_train_batch)
            loss = criterion(outputs, y_train_batch)
            loss.backward()
            optimizer.step()
            current_loss += loss.item()

        # acc_of switches the model to eval mode; model.train() above
        # restores training mode at the start of the next epoch.
        train_acc = acc_of(model, x_train, y_train)
        test_acc = acc_of(model, x_test, y_test)

        writer.add_scalar('data/accuracy_train', train_acc, ep)
        writer.add_scalar('data/accuracy_test', test_acc, ep)
        print('Epoch #{:03d} | Time elapsed : {} | Loss : {:.4e} | Accuracy_train : {:.4e} | Accuracy_test : {:.4e}'.format(
                ep + 1, time_since(start), current_loss, train_acc, test_acc))

    # Flush the logs when the writer supports it (the no-op fallback may not).
    close_writer = getattr(writer, 'close', None)
    if callable(close_writer):
        close_writer()

    print('[INFO] Finished training the model. Total time : {}.'.format(time_since(start)))

In [7]:
# Load the dataset, shuffle/resample it and convert it to tensors in one pipeline
x_train, x_test, y_train_14, y_train_28, y_test_14, y_test_28 = convert_to_pytorch_tensors(
    *preprocess_data(
        *load_data('dhg_data.pckl')
    )
)
# Network instance (14-class variant)
model = HandGestureNet(n_classes=14, n_channels=66)
# Loss function and optimizer
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [8]:
# Train on the 14-class labels
num_epochs = 20
train(
    model, criterion, optimizer,
    x_train, y_train_14,
    x_test, y_test_14,
    num_epochs=num_epochs,
)

[INFO] Started to train the model.
Training the model on GPU.
Epoch #001 | Time elapsed : 00m 38s | Loss : 3.5766e+02 | Accuracy_train : 3.3277e-01 | Accuracy_test : 3.4286e-01
Epoch #002 | Time elapsed : 01m 15s | Loss : 1.1782e+02 | Accuracy_train : 5.9874e-01 | Accuracy_test : 6.1190e-01
Epoch #003 | Time elapsed : 01m 54s | Loss : 8.1381e+01 | Accuracy_train : 7.3235e-01 | Accuracy_test : 7.3333e-01
Epoch #004 | Time elapsed : 02m 32s | Loss : 6.0906e+01 | Accuracy_train : 8.0630e-01 | Accuracy_test : 7.9286e-01
Epoch #005 | Time elapsed : 03m 10s | Loss : 4.9835e+01 | Accuracy_train : 8.3571e-01 | Accuracy_test : 8.1905e-01
Epoch #006 | Time elapsed : 03m 48s | Loss : 4.1625e+01 | Accuracy_train : 8.4496e-01 | Accuracy_test : 8.1905e-01
Epoch #007 | Time elapsed : 04m 26s | Loss : 3.5350e+01 | Accuracy_train : 8.7479e-01 | Accuracy_test : 8.4762e-01
Epoch #008 | Time elapsed : 05m 04s | Loss : 2.9909e+01 | Accuracy_train : 9.0546e-01 | Accuracy_test : 8.7381e-01
Epoch #009 | Time 

In [9]:
# Persist only the learned parameters (state dict), not the whole module object
trained_weights = model.state_dict()
torch.save(trained_weights, 'gesture_pretrained_model.pt')

In [10]:
# Read the recorded gesture from disk.
# NOTE(review): the loop below assumes one CSV row per joint, 22 rows per frame
# and at least 100 frames, each frame flattening to 66 values -- TODO confirm
# against the recorder.
data = numpy.genfromtxt('GestureData.csv', delimiter=',')

# One row per frame: the 22 rows of a frame are flattened into 66 values.
data_processed = numpy.zeros((100, 66))
for frame_idx in range(data_processed.shape[0]):
    first_row = frame_idx * 22
    frame_rows = data[first_row:first_row + 22]
    data_processed[frame_idx] = numpy.concatenate(frame_rows)

# Add a leading batch axis of size one and convert to a float tensor.
data_processed = data_processed[numpy.newaxis, :]
gesture_batch = torch.from_numpy(data_processed).type(torch.FloatTensor)

In [12]:
# Rebuild the network and restore the trained weights.
model = HandGestureNet(n_channels=66, n_classes=14)
# map_location='cpu': the checkpoint may have been written from a CUDA/MPS
# model; remapping lets it load on machines without that accelerator and
# matches the CPU tensors used for inference below.
model.load_state_dict(torch.load('gesture_pretrained_model.pt', map_location='cpu'))
model.eval()

# Make predictions: the predicted class is the index of the largest score.
with torch.no_grad():
    predictions = model(gesture_batch)
    _, predictions = predictions.max(dim=1)
    print("Predicted gesture classes: {}".format(predictions.tolist()))

Predicted gesture classes: [12]
