<a href="https://colab.research.google.com/github/AyanoNishimura/GColab_GestureRecognition/blob/master/GestureRecognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [91]:
# Import Chainer 
from chainer import Chain, Variable, optimizers, serializers, datasets, training, cuda
from chainer.training import extensions
import chainer.functions as F
import chainer.links as L
from chainer import Variable
import chainer

# Import NumPy and CuPy
import numpy as np
# import cupy as cp

# Install the PyDrive wrapper & import libraries.
# This only needs to be done once per notebook.
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# mout drive
from google.colab import drive
drive.mount('/gdrive')

print('Chainer version: ', chainer.__version__)
print('GPU availability:', chainer.cuda.available)
print('cuDNN availablility:', chainer.cuda.cudnn_enabled)

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
Chainer version:  5.0.0
GPU availability: True
cuDNN availablility: True


モデル

In [0]:
class ClassificationModel(chainer.Chain):
    def __init__(self, n_units, n_out):
        super(ClassificationModel, self).__init__()
        with self.init_scope():
            self.l1 = L.LSTM(None, n_units)
            self.l2 = L.Linear(None, n_out)

    def reset_state(self):
        self.l1.reset_state()

    def forward(self, x_data, t_data, train=True):
        x, t = Variable(cuda.to_gpu(x_data)), Variable(cuda.to_gpu(t_data))
        h1 = self.l1(x)
        y = self.l2(h1)
        if train is False:
            return F.softmax(y).data
        return F.softmax_cross_entropy(y, t), F.accuracy(y, t)

学習用のクラス

In [0]:
import chainer
from chainer import optimizers
from chainer import serializers
import pandas as pd
from progressbar import ProgressBar
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np

from sklearn.model_selection import train_test_split

class TrainManager():
    def __init__(self, is_debug):
        self.is_debug = is_debug
        self.n_units = 1000
#         self.alpha = 10 ** np.random.uniform(-6, -2)
#         self.weight_decay = 10 ** np.random.uniform(-8, -4)
        self.alpha = 0.0000102454
#         self.weight_decay = 0.0000025
        self.weight_decay = 0
        if is_debug:
            self.batch_size = 100
            self.epoch = 10
        else:
            self.batch_size = 1000
            self.epoch = 500

        self.models_path = '/gdrive/My Drive/DeepLearning/GestureRecognition/models/'
        self.output_filename = 'epoch_' + str(self.epoch) + '_units_' + str(self.n_units) + '_lr_' + str(self.alpha) + '_wd_' + str(self.weight_decay)

    ###
    # split data train:test = 8:2, each gesture number
    ###
    def get_data(self, file_path):
        data = pd.read_csv(file_path)
        gesture_num = int(data['gesture'].max())
        train = test = pd.DataFrame().reindex_like(data)[:0]
        for index in range(0, gesture_num + 1):
            tr, te = train_test_split(data.query('gesture == '+str(index)), test_size=0.2, shuffle=False)
            train = train.append(tr)
            test = test.append(te)
        train = train.reset_index(drop=True)
        test = test.reset_index(drop=True)

        return train, test

    ###
    # batch Formatter
    ###
    def get_batch(self, x_batch, t_batch):
        x_batch = x_batch.drop('participants', axis=1)
        x_batch = x_batch.values.astype(np.float32)
        t_batch = t_batch.drop('participants', axis=1)
        t_batch = t_batch.values.astype(np.int32)
        t_batch = t_batch.reshape(-1)

        return x_batch, t_batch

    ###
    # training
    ###
    def training(self, x_train, t_train, x_test, t_test):
        gesture_num = int(t_train['gesture'].max()) + 1
        print('-'*5 +'gesture num'+ '-'*5)
        print(gesture_num)
        print('-'*5 +'Hyper Parameter'+ '-'*5)
        print('lr: ', self.alpha)
        print('weight decay: ', self.weight_decay)

        # create onehot vector
        t_vector = [i for i in range(0, gesture_num)]
        n_labels = len(np.unique(t_vector))
        t_vector = np.eye(n_labels)[t_vector]

        model = ClassificationModel(self.n_units, gesture_num)
        
        # setting gpu
        model.to_gpu()

        optimizer = chainer.optimizers.Adam(alpha=self.alpha, weight_decay_rate=self.weight_decay)
        # optimizer = chainer.optimizers.SGD(lr=0.01)
        optimizer.setup(model)

        train_loss, train_acc, train_acc_list = [], [], []
        test_loss, test_acc, test_acc_list = [], [], []
        print('-'*5 +'train start'+ '-'*5)
        for epoch in range(self.epoch):
            print('Epoch: %d' % (epoch+1))

            train_sum_accuracy, train_sum_loss = 0, 0
            prg = ProgressBar(0, len(x_train))
            prg_num = 0
            for current_participants in range(int(x_train['participants'].min()), int(x_train['participants'].max() + 1)):
                x_data = x_train[x_train['participants'] == current_participants]
                t_data = t_train[t_train['participants'] == current_participants]
                x_data = x_data.reset_index(drop=True)
                t_data = t_data.reset_index(drop=True)
                model.reset_state()

                # training data each batch size
                for i in range(0, len(x_data), self.batch_size):
                    x_batch = x_data.iloc[i:i+self.batch_size-1]
                    t_batch = t_data.iloc[i:i+self.batch_size-1]
                    x_batch, t_batch = self.get_batch(x_batch, t_batch)

                    prg.update(prg_num)
                    prg_num = prg_num + len(x_batch)

                    # get loss and accuracy
                    loss, acc = model.forward(x_batch, t_batch)
                    # init gradient
                    model.zerograds()
                    loss.backward()
                    loss.unchain_backward()

                    optimizer.update()

                    train_loss.append(loss.data)
                    train_acc.append(acc.data)
                    train_sum_loss += float(loss.data) * len(x_batch)
                    train_sum_accuracy += float(acc.data) * len(x_batch)

            # show training data loss and accuracy
            print('train mean loss={}, accuracy={}'.format(train_sum_loss / len(x_train), train_sum_accuracy / len(x_train)))
            train_acc_list.append(train_sum_accuracy / len(x_train))

            # test data training
            test_sum_accuracy, test_sum_loss = 0, 0
            for current_participants in range(int(x_test['participants'].min()), int(x_test['participants'].max() + 1)):
                x_data = x_test[x_test['participants'] == current_participants]
                t_data = t_test[t_test['participants'] == current_participants]
                x_data = x_data.reset_index(drop=True)
                t_data = t_data.reset_index(drop=True)
                model.reset_state()

                # test data each batch size
                for i in range(0, len(x_data), self.batch_size):
                    x_batch = x_data.iloc[i:i+self.batch_size-1]
                    t_batch = t_data.iloc[i:i+self.batch_size-1]
                    x_batch, t_batch = self.get_batch(x_batch, t_batch)

                    loss, acc = model.forward(x_batch, t_batch)

                    test_loss.append(loss.data)
                    test_acc.append(acc.data)
                    test_sum_loss     += float(loss.data) * len(x_batch)
                    test_sum_accuracy += float(acc.data) * len(x_batch)

            print('test  mean loss={}, accuracy={}'.format(test_sum_loss / len(x_test), test_sum_accuracy / len(x_test)))
            test_acc_list.append(test_sum_accuracy / len(x_test))

        serializers.save_hdf5(self.models_path + self.output_filename, model)
        return train_acc_list, test_acc_list

    def classification_train(self, file_path):
        # split data
        train, test = self.get_data(file_path)
        print('-'*5 +'data size'+ '-'*5)
        print('train data: ', train.shape[0])
        print('test data: ', test.shape[0])

        t_train = pd.DataFrame({'gesture': train['gesture'], 'participants': train['participants']})
        x_train = train.drop('gesture', axis=1)
        t_test = pd.DataFrame({'gesture': test['gesture'], 'participants': test['participants']})
        x_test = test.drop('gesture', axis=1)

        train_acc_list, test_acc_list = self.training(x_train, t_train, x_test, t_test)

        # show graph
        plt.figure()
        x = np.arange(len(train_acc_list))
        plt.plot(x, train_acc_list, marker='o', label='train')
        t = np.arange(len(test_acc_list))
        plt.plot(t, test_acc_list, marker='+', label='test')
        plt.xlabel('epochs')
        plt.ylabel('accuracy')
        plt.ylim(-0.05, 1.0)
        plt.savefig(self.models_path + self.output_filename + '.png')
        
        print('Done!')


実行部分

In [0]:
# run training
is_debug = False
if is_debug:
  file_path = '/gdrive/My Drive/DeepLearning/GestureRecognition/data/small/train_classification.csv'
else:
  file_path = '/gdrive/My Drive/DeepLearning/GestureRecognition/data/formatted/train_classification.csv'

tr = TrainManager(is_debug)
tr.classification_train(file_path)