## Курсовой проект по курсу "Фреймворк PyTorch для разработки искусственных нейронных сетей"

Нужно написать приложение, которое будет считывать и выводить кадры с веб-камеры. В процессе считывания определять что перед камерой находится человек, задетектировав его лицо на кадре. После этого, человек показывает жесты руками, а алгоритм должен считать их и определенным образом реагировать на эти жесты. 
На то, как система будет реагировать на определенные жесты - выбор за вами. Например, на определенный жест (жест пис), система будет здороваться с человеком. На другой, будет делать скриншот экрана. И т.д.
Для распознавания жестов, вам надо будет скачать датасет  https://www.kaggle.com/gti-upm/leapgestrecog, разработать модель для обучения и обучить эту модель

In [33]:
import os
import torch
import torchvision
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as tt
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
%matplotlib inline
import time
from torch.autograd import Variable
import pandas as pd
from PIL import Image
import random
from matplotlib import image
import shutil
from sklearn.model_selection import train_test_split

In [29]:
import cv2
from facenet_pytorch import MTCNN

In [38]:
data_dir = './leapGestRecog'
print(os.listdir(data_dir))

['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', 'leapGestRecog']


In [36]:
digit_to_classname = {0: 'down', 
                  1: 'c', 
                  2: 'ok', 
                  3: 'l', 
                  4: 'palm_moved', 
                  5: 'fist', 
                  6: 'index', 
                  7: 'palm', 
                  8: 'thumb', 
                  9: 'fist_moved'}

In [42]:
imagepaths = []

for dirname, _, filenames in os.walk('./leapGestRecog'):
    for filename in filenames:
        path = os.path.join(dirname, filename)
        if path.endswith("png"):
            imagepaths.append(path)

print(len(imagepaths))

40000


In [43]:
X = []

for path in imagepaths[:19999]:
    img = cv2.imread(path)
    X.append(img) 

X = np.array(X)

print("Images loaded: ", len(X))

Images loaded:  19999


In [50]:
X_train, X_test = train_test_split(X, test_size = 0.3, random_state = 42)

In [59]:
train_transforms = tt.Compose([tt.Grayscale(num_output_channels=1), # Картинки чернобелые
                         
                         # Настройки для расширения датасета
                         tt.RandomHorizontalFlip(),           # Случайные повороты на 90 градусов
                         tt.RandomRotation(30),               # Случайные повороты на 30 градусов
                         tt.ToTensor()])                      # Приведение к тензору
test_transforms = tt.Compose([tt.Grayscale(num_output_channels=1), tt.ToTensor()])

In [60]:
train_dataset = ImageFolder(X_train, train_transforms)

ValueError: scandir: embedded null character in path

In [57]:
test_dataset  = ImageFolder(X_test, test_transforms)

ValueError: scandir: embedded null character in path

Сначала при исполнении train_dataset = ImageFolder(X_train, train_transforms) и test_dataset  = ImageFolder(X_test, test_transforms) выходила другая ошибка, Memory error без комментариев. Потом test_dataset  = ImageFolder(X_test, test_transforms) все же запустилась, и я подумала что для трейна не хватает памяти.  После увеличения файлов подкачки (к сожалению нет возможности запустить ноутбук на более мощной машине) ошибка изменилась, но и тест перестал работать. Гуглила ошибку, информация что дело в кодировке или слешах, но тут как будто не подходит. На этом месте работа тормозиться к сожалению, далее прилагаю дальнейший код, как это бы выглядело если бы работало :) Там вроде все нормально должно быть

In [None]:
train_dataset

In [None]:
batch_size = 100

In [None]:
def get_default_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        return len(self.dl)

In [None]:
device = get_default_device()
device

In [None]:
train_dataloader = DeviceDataLoader(train_dataloader, device)
test_dataloader = DeviceDataLoader(test_dataloader, device)

In [None]:
class ResNet(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        
        self.conv1 = self.conv_block(in_channels, 128)
        self.conv2 = self.conv_block(128, 128, pool=True)
        self.res1 = nn.Sequential(self.conv_block(128, 128), self.conv_block(128, 128))
        self.drop1 = nn.Dropout(0.5)
        
        self.conv3 = self.conv_block(128, 256)
        self.conv4 = self.conv_block(256, 256, pool=True)
        self.res2 = nn.Sequential(self.conv_block(256, 256), self.conv_block(256, 256))
        self.drop2 = nn.Dropout(0.5)
        
        self.conv5 = self.conv_block(256, 512)
        self.conv6 = self.conv_block(512, 512, pool=True)
        self.res3 = nn.Sequential(self.conv_block(512, 512), self.conv_block(512, 512))
        self.drop3 = nn.Dropout(0.5)
        
        self.classifier = nn.Sequential(nn.MaxPool2d(6), 
                                        nn.Flatten(),
                                        nn.Linear(512, num_classes))
    
    @staticmethod
    def conv_block(in_channels, out_channels, pool=False):
        layers = [nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1), 
                  nn.BatchNorm2d(out_channels), 
                  nn.ELU(inplace=True)]
        if pool: layers.append(nn.MaxPool2d(2))
        return nn.Sequential(*layers)
        
    def forward(self, xb):
        out = self.conv1(xb)
        out = self.conv2(out)
        out = self.res1(out) + out
        out = self.drop1(out)
        
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.res2(out) + out
        out = self.drop2(out)
        
        out = self.conv5(out)
        out = self.conv6(out)
        out = self.res3(out) + out
        out = self.drop3(out)
        
        out = self.classifier(out)
        return out

In [None]:
model = to_device(ResNet(1, len(digit_to_classname)), device)

In [None]:
model

In [None]:
# Если работаете на гпу, очищаем весь кэш
if torch.cuda.is_available(): 
    torch.cuda.empty_cache()


epochs = 50
max_lr = 0.008
grad_clip = 0.1
weight_decay = 1e-4
optimizer = torch.optim.Adam(model.parameters(), max_lr, weight_decay=weight_decay)

In [None]:
total_steps = len(train_dataloader)
print(f'{epochs} epochs, {total_steps} total_steps per epoch')

In [None]:
print(torch.__version__)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, 
                                                steps_per_epoch=len(train_dataloader))

In [None]:
epoch_losses = []

for epoch in range(epochs):
    
    time1 = time.time()
    running_loss = 0.0
    epoch_loss = []
    for batch_idx, (data, labels) in enumerate(train_dataloader):
        data, labels = Variable(data), Variable(labels)
        data = data.to(device)
        labels = labels.to(device)
        
        
        optimizer.zero_grad()
        
        outputs = model(data)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()
        
        running_loss += loss.item()
        epoch_loss.append(loss.item())
        if (batch_idx+1) % 10000 == 9999:
            print(f'Train Epoch: {epoch+1}, Loss: {running_loss/10000}')
            time2 = time.time()
            print(f'Spend time for 10000 images: {time2 - time1} sec')
            time1 = time.time()
            running_loss = 0.0
    print(f'Epoch {epoch+1}, loss: ', np.mean(epoch_loss))
    epoch_losses.append(epoch_loss)
    

In [None]:
losses = [np.mean(loss) for loss in epoch_losses]
plt.plot(losses, '-x')
plt.xlabel('epoch')
plt.ylabel('losses')
plt.title('losses vs. No. of epochs')

In [None]:
torch.save(model.state_dict(), './model_state_50_epoch.pth')

In [None]:
net=ResNet(1, len(classes_train)).to(device)
net.load_state_dict(torch.load('./model_state_50_epoch.pth'))
net.eval()

In [None]:
# Создаем объект для считывания потока с веб-камеры(обычно вебкамера идет под номером 0. иногда 1)
cap = cv2.VideoCapture(0)  

# Класс детектирования и обработки лица с веб-камеры 
class HandDetector(object):

    def __init__(self, mtcnn):
        self.mtcnn = mtcnn
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.emodel = ResNet(1, 10).to(self.device)
        self.emodel.load_state_dict(torch.load('./model_state_50_epoch.pth'))
        self.emodel.eval()

    # Функция рисования найденных параметров на кадре
    def _draw(self, frame, boxes, probs, gest):
        try:
            for box, prob, ld in zip(boxes, probs):
                # Рисуем обрамляющий прямоугольник 
                cv2.rectangle(frame,
                              (box[0], box[1]),
                              (box[2], box[3]),
                              (0, 0, 255),
                              thickness=2)

                # пишем на кадре какой жест распознан
                cv2.putText(frame, 
                    gest, (box[2], box[3]), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)


        except:
            print('Something wrong im draw function!')

        return frame
    
    # Функция для вырезания лиц с кадра
    @staticmethod
    def crop_faces(frame, boxes):
        faces = []
        for i, box in enumerate(boxes):
            faces.append(frame[int(box[1]):int(box[3]), 
                int(box[0]):int(box[2])])
        return faces
    
    @staticmethod
    def digit_to_classname(digit):
        classes = i{0: 'down', 
                  1: 'c', 
                  2: 'ok', 
                  3: 'l', 
                  4: 'palm_moved', 
                  5: 'fist', 
                  6: 'index', 
                  7: 'palm', 
                  8: 'thumb', 
                  9: 'fist_moved'}
            
            return classes[digit]
       
    # Функция в которой будет происходить процесс считывания и обработки каждого кадра
    def run(self):              
        # Заходим в бесконечный цикл
        while True:
            # Считываем каждый новый кадр - frame
            # ret - логическая переменая. Смысл - считали ли мы кадр с потока или нет
            ret, frame = cap.read()
            try:
                # детектируем расположение лица на кадре, вероятности на сколько это лицо
                # и особенные точки лица
                boxes, probs, landmarks = self.mtcnn.detect(frame, landmarks=False)
            if probs > 0.5:
                
                # Меняем размер изображения лица для входа в нейронную сеть
                face = cv2.resize(frame, (128, 128))
                # Превращаем в 1-канальное серое изображение
                face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
                # Превращаем numpy-картинку вырезанного лица в pytorch-тензор
                torch_face = torch.from_numpy(face).unsqueeze(0).to(self.device).float()
                # Загужаем наш тензор лица в нейронную сеть и получаем предсказание
                gest = self.gmodel(torch_face[None, ...])
                # Интерпретируем предсказание как строку нашей эмоции
                gest = self.digit_to_classname(gest.argmax().item())

                # Рисуем на кадре
                self._draw(frame, boxes, probs, gest)
                
            else:
                    print('Face not detected')

            except:
                print('Something wrong im main cycle!')


            # Показываем кадр в окне, и назвываем его(окно) - 'Face Detection'
            cv2.imshow('Face Detection', frame)
            
            # Функция, которая проверяет нажатие на клавишу 'q'
            # Если нажатие произошло - выход из цикла. Конец работы приложения
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
                
        # Очищаем все объекты opencv, что мы создали
        cap.release()
        cv2.destroyAllWindows()
        
        
# Загружаем мтцнн
mtcnn = MTCNN(keep_all=True, device=torch.device('cuda:0' if torch.cuda.is_available() else 'cpu'))
# Создаем объект нашего класса приложения
fcd = FaceDetector(mtcnn)
# Запускаем
fcd.run()