In [92]:
import glob
# import os
import cv2
from PIL import Image
import numpy as np
import random
import math
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
torch.cuda.is_available()

True

In [2]:
root = '../../jester/20bn-jester-v1/*'
num_classes = 2
num_worker = 0
batch_size = 4
scales = [1, 1/2**(1/4), 1/2**(1/2), 1/2**(3/4), 1/2]
sample_size = 224
sample_duration = 16
rgb_mean = (114.7748/255, 107.7354/255, 99.4750/255)
rgb_std = (38.7568578/255, 37.88248729/255, 40.02898126/255)
lr = 0.0001
momentum = 0.9
weight_decay = 5e-4

In [3]:
def load_all_path(root):
    video_dictionary = sorted(glob.glob(root))
    all_path = []
    for video_path in video_dictionary:
        file_list = sorted(glob.glob(video_path + '/*'))
        all_path.append(file_list)
    return all_path

In [4]:
all_path = load_all_path(root)
labels = np.genfromtxt('../../jester/jester-v1-labels.csv', delimiter=',', dtype=np.str)
labels.tolist()

['Swiping Left',
 'Swiping Right',
 'Swiping Down',
 'Swiping Up',
 'Pushing Hand Away',
 'Pulling Hand In',
 'Sliding Two Fingers Left',
 'Sliding Two Fingers Right',
 'Sliding Two Fingers Down',
 'Sliding Two Fingers Up',
 'Pushing Two Fingers Away',
 'Pulling Two Fingers In',
 'Rolling Hand Forward',
 'Rolling Hand Backward',
 'Turning Hand Clockwise',
 'Turning Hand Counterclockwise',
 'Zooming In With Full Hand',
 'Zooming Out With Full Hand',
 'Zooming In With Two Fingers',
 'Zooming Out With Two Fingers',
 'Thumb Up',
 'Thumb Down',
 'Shaking Hand',
 'Stop Sign',
 'Drumming Fingers',
 'No gesture',
 'Doing other things']

In [5]:
class TemporalCrop(object):
    """Temporally crop the given frame indices at a random location or at the center location.
        size (int): Desired output size of the crop.
    """

    def __init__(self, size, mode):
        self.size = size
        self.mode = mode

    def __call__(self, path):
        """
        Args:
            paths (list): paths to be cropped.
        Returns:
            list: Cropped paths.
        """
        num_frames = len(path)
        
        if self.mode == 'random':
            if num_frames < self.size:
                pad_before = (self.size - num_frames)//2
                pad_after = self.size - num_frames - pad_before
                new_path = [path[0]]*pad_before + path + [path[-1]]*pad_after
            else:
                begin_index = random.randint(0, num_frames - self.size)
                end_index = begin_index + self.size
                new_path = path[begin_index:end_index]
        else:
            if num_frames < self.size:
                pad_before = (self.size - num_frames)//2
                pad_after = self.size - num_frames - pad_before
                new_path = [path[0]]*pad_before + path + [path[-1]]*pad_after
            else:
                begin_index = (num_frames - self.size)//2
                end_index = begin_index + self.size
                new_path = path[begin_index:end_index]
        
        return new_path

    
class MultiScaleRandomCrop(object):

    def __init__(self, scales, size, interpolation=Image.BILINEAR):
        self.scales = scales
        self.size = size
        self.interpolation = interpolation
        
    @staticmethod
    def get_random_param(self):
        self.scale = self.scales[random.randint(0, len(self.scales) - 1)]
        self.topleft_x = random.random()
        self.topleft_y = random.random()

    def __call__(self, img):
        image_width, image_height = _get_image_size(img)
        min_length = min(image_width, image_height)
        crop_size = int(min_length * self.scale)

        topleft_x = self.topleft_x * (image_width - crop_size)
        topleft_y = self.topleft_y * (image_height - crop_size)
        bottomright_x = topleft_x + crop_size
        bottomright_y = topleft_x + crop_size

        img = F.crop(img, topleft_x, topleft_y, bottomright_x, bottomright_y)
        img = F.resize(img, self.size, self.interpolation)

        return img

# class SpatialElasticDisplacement(object):

#     def __init__(self, sigma=2.0, alpha=1.0, order=0, cval=0, mode="constant"):
#         self.alpha = alpha
#         self.sigma = sigma
#         self.order = order
#         self.cval = cval
#         self.mode = mode
    
#         @staticmethod
#     def get_random_param(self):
#         self.p = random.random()
        
#     def __call__(self, img):
#         if self.p < 0.50:
#             is_L = False
#             is_PIL = isinstance(img, Image.Image)
            
#             if is_PIL:
#                 img = np.asarray(img, dtype=np.uint8)
#             if len(img.shape) == 2:
#                 is_L = True
#                 img = np.reshape(img, img.shape + (1,))  

#             image = img
#             image_first_channel = np.squeeze(image[..., 0])
#             indices_x, indices_y = self._generate_indices(image_first_channel.shape, alpha=self.alpha, sigma=self.sigma)
#             ret_image = (self._map_coordinates(
#                 image,
#                 indices_x,
#                 indices_y,
#                 order=self.order,
#                 cval=self.cval,
#                 mode=self.mode))

#             if  is_PIL:
#                 if is_L:
#                     return Image.fromarray(ret_image.reshape(ret_image.shape[:2]), mode= 'L')
#                 else:
#                     return Image.fromarray(ret_image)
#             else:
#                 return ret_image
#         else:
#             return img

#     def _generate_indices(self, shape, alpha, sigma):
#         assert (len(shape) == 2),"shape: Should be of size 2!"
#         dx = scipy.ndimage.gaussian_filter((np.random.rand(*shape) * 2 - 1), sigma, mode="constant", cval=0) * alpha
#         dy = scipy.ndimage.gaussian_filter((np.random.rand(*shape) * 2 - 1), sigma, mode="constant", cval=0) * alpha

#         x, y = np.meshgrid(np.arange(shape[0]), np.arange(shape[1]), indexing='ij')
#         return np.reshape(x+dx, (-1, 1)), np.reshape(y+dy, (-1, 1))

#     def _map_coordinates(self, image, indices_x, indices_y, order=1, cval=0, mode="constant"):
#         assert (len(image.shape) == 3),"image.shape: Should be of size 3!"
#         result = np.copy(image)
#         height, width = image.shape[0:2]
#         for c in range(image.shape[2]):
#             remapped_flat = scipy.ndimage.interpolation.map_coordinates(
#                 image[..., c],
#                 (indices_x, indices_y),
#                 order=order,
#                 cval=cval,
#                 mode=mode
#             )
#             remapped = remapped_flat.reshape((height, width))
#             result[..., c] = remapped
#         return result

In [6]:
def read_video(paths, mode):
    all_image = []
    temporal_transform = TemporalCrop(sample_duration, mode)
    if mode == 'train':
        spatial_transform = transforms.Compose([
            transforms.ToPILImage(),
            MultiScaleRandomCrop(scales, sample_size),
    #         SpatialElasticDisplacement(),
            transforms.ToTensor(),
            transforms.Normalize(rgb_mean, rgb_std)
        ])
    else:
        spatial_transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(sample_size),
            transforms.CenterCrop(sample_size),
            transforms.ToTensor(),
            transforms.Normalize(rgb_mean, rgb_std)
        ])
                
    new_paths = temporal_transform(paths)
    for path in new_paths:
        image = cv2.imread(path)
        image = spatial_transform(image)
        all_image.append(image)
    video = np.stack(all_image).transpose(1,0,2,3)
    return video

In [7]:
def map_idx(index):
    if index == 25 or index == 26:
        return 0
    return 1

In [98]:
class Dataset(Dataset):
    def __init__(self, all_path, x, y, mode):
        self.length = len(x)
        self.all_path = all_path
        self.x = x
        self.y = y
        self.mode = mode
    
    def __len__(self):
        return(self.length)
    
    def __getitem__(self, index):
        if self.mode == 'train':
            x = read_video(self.all_path[int(self.x[index,0])-1], 'random')
            y = map_idx(int(np.argwhere(self.y == self.x[index,1])))
            return torch.from_numpy(x), torch.tensor(y)
        elif self.mode == 'valid':
            x = read_video(self.all_path[int(self.x[index,0])-1], 'center')
            y = map_idx(int(np.argwhere(self.y == self.x[index,1])))
            return torch.from_numpy(x), torch.tensor(y)
        else:
            x = read_video(self.all_path[int(self.x[index])-1], 'center')
            return torch.from_numpy(x)

In [99]:
train = np.genfromtxt('../../jester/jester-v1-validation.csv', delimiter=';', dtype=np.str)    
train_data = Dataset(all_path, train, labels, 'train')
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_worker, pin_memory=True)

In [100]:
valid = np.genfromtxt('../../jester/jester-v1-validation.csv', delimiter=';', dtype=np.str)
valid_data = Dataset(all_path, valid, labels, 'valid')
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False, num_workers=num_worker, pin_memory=True)

In [101]:
test = np.genfromtxt('../../jester/jester-v1-test.csv', delimiter=';', dtype=np.str)
test_data = Dataset(all_path, test, labels, 'test')
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=num_worker, pin_memory=True)

In [40]:
# for i, (data, target) in enumerate(train_loader):
#     print(data.shape)

In [41]:
# def get_training_set(opt, spatial_transform, temporal_transform,
#                      target_transform):
#     assert opt.dataset in ['jester', 'egogesture', 'nv']

#     if opt.train_validate:
#         subset = ['training', 'validation']
#     else:
#         subset = 'training'
#     if opt.dataset == 'jester':
#         training_data = Jester(
#             opt.video_path,
#             opt.annotation_path,
#             subset,
#             spatial_transform=spatial_transform,
#             temporal_transform=temporal_transform,
#             target_transform=target_transform,
#             sample_duration=opt.sample_duration,
#             modality=opt.modality)
#     elif opt.dataset == 'egogesture':
#         training_data = EgoGesture(
#             opt.video_path,
#             opt.annotation_path,
#             subset,
#             spatial_transform=spatial_transform,
#             temporal_transform=temporal_transform,
#             target_transform=target_transform,
#             sample_duration=opt.sample_duration,
#             modality=opt.modality)
#     elif opt.dataset == 'nv':
#         training_data = NV(
#             opt.video_path,
#             opt.annotation_path,
#             subset,
#             spatial_transform=spatial_transform,
#             temporal_transform=temporal_transform,
#             target_transform=target_transform,
#             sample_duration=opt.sample_duration,
#             modality=opt.modality)
#     return training_data


# def get_validation_set(opt, spatial_transform, temporal_transform,
#                        target_transform):
#     assert opt.dataset in ['jester', 'egogesture', 'nv']

#     if opt.dataset == 'jester':
#         validation_data = Jester(
#             opt.video_path,
#             opt.annotation_path,
#             'validation',
#             opt.n_val_samples,
#             spatial_transform,
#             temporal_transform,
#             target_transform,
#             modality=opt.modality,
#             sample_duration=opt.sample_duration)
#     elif opt.dataset == 'egogesture':
#         validation_data = EgoGesture(
#             opt.video_path,
#             opt.annotation_path,
#             'validation',
#             opt.n_val_samples,
#             spatial_transform,
#             temporal_transform,
#             target_transform,
#             modality=opt.modality,
#             sample_duration=opt.sample_duration)
#     elif opt.dataset == 'nv':
#         validation_data = NV(
#             opt.video_path,
#             opt.annotation_path,
#             'validation',
#             spatial_transform=spatial_transform,
#             temporal_transform=temporal_transform,
#             target_transform=target_transform,
#             sample_duration=opt.sample_duration,
#             modality=opt.modality)
#     return validation_data

In [42]:
def conv3x3x3(in_planes, out_planes, stride=1):
    return nn.Conv3d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)

def conv1x1x1(in_planes, out_planes, stride=1):
    return nn.Conv3d(in_planes, out_planes, kernel_size=1, stride=stride, padding=0, bias=False)

In [50]:
class BasicBlock(nn.Module):
    def __init__(self, inplanes, planes, stride=1, downsample=False):
        super(BasicBlock, self).__init__()
        self.isDownSample = downsample
        self.conv1 = conv3x3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm3d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3x3(planes, planes)
        self.bn2 = nn.BatchNorm3d(planes)
        
        if self.isDownSample:
            self.downconv = conv1x1x1(inplanes, planes, stride)
            self.downnorm = nn.BatchNorm3d(planes)

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.isDownSample:
            residual = self.downconv(residual)
            residual = self.downnorm(residual)

        out += residual
        out = self.relu(out)

        return out

In [69]:
class resnet10(nn.Module):
    def __init__(self, num_classes, depth, width, length):
        super(resnet10, self).__init__()
        self.depth = depth
        self.width = width
        self.length = length
        self.conv1 = nn.Conv3d(3, 8, kernel_size=7,stride=(1, 2, 2),padding=(3, 3, 3),bias=False)
        self.bn1 = nn.BatchNorm3d(8)
        self.relu1 = nn.ReLU(inplace=True)
        self.maxpool1 = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1)
        
        self.block2 = BasicBlock(8, 16, stride=2, downsample=True)
        self.block3 = BasicBlock(16, 32, stride=2, downsample=True)
        self.block4 = BasicBlock(32, 64, stride=2, downsample=True)
        self.block5 = BasicBlock(64, 128, stride=2, downsample=True)
        last_duration = int(math.ceil(self.depth / 32))
        last_width = int(math.ceil(self.width / 64))
        last_length = int(math.ceil(self.length / 64))
        self.avgpool = nn.AvgPool3d((last_duration, last_width, last_length), stride=1)
        
        self.fc = nn.Linear(128, num_classes)
        
    def forward(self, x):
        #print(x.shape)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)
        out = self.maxpool1(out)
        
        out = self.block2(out)
        out = self.block3(out)
        out = self.block4(out)
        out = self.block5(out)
        #print(out.shape)
        out = self.avgpool(out)
        out = torch.squeeze(out)
        #print(out.shape)
        out = self.fc(out)
        
        return out

In [71]:
# model = torch.hub.load('pytorch/vision:v0.5.0', 'resnext50_32x4d', pretrained=True)
model = resnet10(num_classes=num_classes, depth=16, width=224, length=224)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay, nesterov=True)
scheduler = ReduceLROnPlateau(optimizer, mode = 'min', factor=0.1, patience=2)
device = torch.device("cuda")
model.to(device)

resnet10(
  (conv1): Conv3d(3, 8, kernel_size=(7, 7, 7), stride=(1, 2, 2), padding=(3, 3, 3), bias=False)
  (bn1): BatchNorm3d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu1): ReLU(inplace=True)
  (maxpool1): MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1, dilation=1, ceil_mode=False)
  (block2): BasicBlock(
    (conv1): Conv3d(8, 16, kernel_size=(3, 3, 3), stride=(2, 2, 2), padding=(1, 1, 1), bias=False)
    (bn1): BatchNorm3d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (conv2): Conv3d(16, 16, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
    (bn2): BatchNorm3d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (downconv): Conv3d(8, 16, kernel_size=(1, 1, 1), stride=(2, 2, 2), bias=False)
    (downnorm): BatchNorm3d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (block3): BasicBlock(
    (conv1): Conv3d(16, 32, kernel_s

In [90]:
def train_epoch(model, train_loader, criterion, optimizer):
    model.train()

    running_loss = 0.0
    total_predictions = 0.0
    correct_predictions = 0.0
    
    itr = tqdm(train_loader)
    for data in itr:
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()

        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total_predictions += labels.size(0)
        
        correct_predictions += (predicted == labels).sum().item()
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        #print(i, loss)
        loss.backward()
        optimizer.step()

    running_loss /= len(train_loader)
    acc = (correct_predictions/total_predictions)*100.0
    print('Training Loss: ', running_loss)
    print('Training Accuracy: ', acc, '%')
    return running_loss, acc

In [94]:
def test_model(model, test_loader, criterion):
    with torch.no_grad():
        model.eval()

        running_loss = 0.0
        total_predictions = 0.0
        correct_predictions = 0.0
        
        for data in tqdm(test_loader):
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)

            _, predicted = torch.max(outputs.data, 1)
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

            loss = criterion(outputs, labels).detach()
            running_loss += loss.item()
            
        running_loss /= len(test_loader)
        acc = (correct_predictions/total_predictions)*100.0
        print('Validation Loss: ', running_loss)
        print('Validation Accuracy: ', acc, '%')
        return running_loss, acc

In [None]:
n_epochs = 100
Train_loss = []
Train_acc = []
Valid_loss = []
Valid_acc = []
num_no_improve = 0
for i in range(n_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
    valid_loss, valid_acc = test_model(model, valid_loader, criterion)
    Train_acc.append(train_acc)
    Train_loss.append(train_loss)
    Valid_loss.append(valid_loss)
    scheduler.step(valid_loss)
    print('='*40)

    if i == 0:
        torch.save(model.state_dict(), 'project.pth')
    else:
        if test_acc > max(Test_acc):
            torch.save(model.state_dict(), 'project.pth')
            num_no_improve = 0
        else:
            num_no_improve += 1
    Valid_acc.append(valid_acc)
    
    training_loss = np.array(Train_loss).reshape(-1,1)
    training_acc = np.array(Train_acc).reshape(-1,1)
    validation_loss = np.array(Valid_loss).reshape(-1,1)
    validation_acc = np.array(Valid_acc).reshape(-1,1)
    result = np.concatenate([training_loss, training_acc, validation_loss, validation_acc]).T
    np.savetxt('result_project.csv', result, delimiter=',', fmt='%1.5f', header='training_loss,training_acc,validation_loss,validation_acc', comments='')
    
    if num_no_improve >= 10:
        break

HBox(children=(FloatProgress(value=0.0, max=3697.0), HTML(value='')))


Training Loss:  0.39790617815862
Training Accuracy:  86.46784337593833 %


HBox(children=(FloatProgress(value=0.0, max=3697.0), HTML(value='')))