In [None]:
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display
from jetbot import Camera, bgr8_to_jpeg

# Obtain the JetBot camera, configured for 224x224 frames.
camera = Camera.instance(width=224, height=224)

# Notebook widget that will show the JPEG-encoded frames.
image = widgets.Image(format='jpeg', width=224, height=224)  # this width and height doesn't necessarily have to match the camera

# One-directional link: each new camera.value is passed through bgr8_to_jpeg
# and assigned to image.value. Kept in a variable so it can be unlinked later.
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(image)

运行完上面的代码块后，就可以实时地看到摄像头拍摄到的画面。接下来在第二个代码块中，我们希望创建一个叫dataset的文件夹，里面有五个子文件夹，分别是 stop、left、right、forward、backward，用于分类放置每类手势的图片。

In [None]:
import os

# Folder that receives the snapshots of the "stop" gesture.
stop_dir = 'dataset/stop'

# os.makedirs raises FileExistsError when the target already exists (for
# example when this cell is re-run), so that case is reported instead of
# aborting the notebook.
try:
    os.makedirs(stop_dir)
except FileExistsError:
    print('Directories not created because they already exist')

为了采集数据方便，我们需要创建5个按键和对应的文本框

In [None]:
# Shared size for the button and its counter box.
button_layout = widgets.Layout(width='128px', height='64px')
# Button that will trigger saving a "stop" sample (the handler is attached in a later cell).
stop_button = widgets.Button(description='add stop', button_style='success', layout=button_layout)
# Counter initialised with the number of entries already present in stop_dir.
stop_count = widgets.IntText(layout=button_layout, value=len(os.listdir(stop_dir)))

# Show the counter and the button side by side.
display(widgets.HBox([stop_count, stop_button]))

现在，这些按钮什么也做不了。我们必须为每个类别按钮的 ``on_click`` 事件附加功能函数，以保存对应类别的图像。我们将保存 ``Image`` 部件(而不是相机)的值，因为它已经是压缩的JPEG格式! 为了确保不会重复任何文件名(即使是在不同的机器上!)，我们将使用python中的 ``uuid`` 包，它定义了 ``uuid1`` 方法来生成唯一标识符。这个唯一标识符由当前时间和机器地址等信息生成。

In [None]:
from uuid import uuid1

def save_snapshot(directory):
    """Write the current content of the ``image`` widget into *directory*.

    The file is named ``<uuid1>.jpg``. ``image.value`` is written as-is in
    binary mode, i.e. the bytes the widget currently holds.
    """
    file_name = str(uuid1()) + '.jpg'
    target = os.path.join(directory, file_name)
    with open(target, 'wb') as out:
        out.write(image.value)

def save_stop():
    """Save one 'stop' snapshot and refresh the counter widget."""
    save_snapshot(stop_dir)
    saved_entries = os.listdir(stop_dir)
    stop_count.value = len(saved_entries)


# on_click hands the clicked button to its handler; save_stop takes no
# arguments, so a lambda is used to drop that parameter.
stop_button.on_click(lambda _button: save_stop())

In [None]:
# Show the live image and the capture controls in the same cell output so
# samples can be collected while watching the camera picture.
display(image)
display(widgets.HBox([stop_count, stop_button]))

为了方便采集数据，我们可以将按钮和摄像头的画面放在一起。

当你将代码全部完善以后，上面的按钮就可以将图像保存到相应的目录中了。你可以使用Jupyter左边目录文件浏览器来查看这些文件! 现在你就可以开始采集手势图片了。当你采集完成图片以后记得运行下面的代码将dataset文件进行压缩，然后将压缩后的文件下载到自己的电脑上。

In [None]:
# Shell command: pack the dataset folder recursively (-r) and quietly (-q) into dataset.zip.
!zip -r -q dataset.zip dataset

以下代码请同学们在自己的电脑上运行：

In [None]:
!pip install 

In [None]:
import cv2
import os
import mediapipe as mp


def data_calculate(folder_path, class_name, csv_path='data.csv'):
    """Extract hand landmarks from every image in a folder and append them to a CSV.

    Each successfully processed image adds one line to *csv_path*: the x, y, z
    coordinates of every landmark of the first detected hand, followed by
    *class_name*, all comma separated. The file is opened in append mode, so
    running this twice on the same folder duplicates its rows.

    Args:
        folder_path: Directory whose entries are read as images.
        class_name: Label string written as the last column of each row.
        csv_path: Output CSV file (defaults to ``data.csv`` as before).

    Entries that cannot be decoded as an image, that make MediaPipe fail, or
    in which no hand is found are skipped; their names and their count are
    printed at the end.
    """
    mp_hands = mp.solutions.hands
    fail_img = []
    # Both the MediaPipe graph and the CSV file are opened once and released
    # when the loop finishes, even if an unexpected error occurs.
    with mp_hands.Hands(static_image_mode=True,
                        max_num_hands=2,
                        min_detection_confidence=0.5,
                        min_tracking_confidence=0.5) as hands, \
            open(csv_path, 'a') as f:
        for img_name in os.listdir(folder_path):
            img = cv2.imread(os.path.join(folder_path, img_name))
            if img is None:
                # imread returns None for anything it cannot decode
                # (sub-directories, non-image files, corrupt files).
                fail_img.append(img_name)
                continue
            # Flip horizontally, then convert BGR to RGB.
            img = cv2.cvtColor(cv2.flip(img, 1), cv2.COLOR_BGR2RGB)
            try:
                results = hands.process(img)
            except (RuntimeError, ValueError):
                fail_img.append(img_name)
                continue
            if not results.multi_hand_landmarks:
                # No hand detected in this picture.
                fail_img.append(img_name)
                continue

            # Build the complete row first so a failure can never leave a
            # half-written line in the CSV.
            landmarks = results.multi_hand_landmarks[0].landmark
            row = ','.join(f'{p.x},{p.y},{p.z}' for p in landmarks)
            f.write(f'{row},{class_name}\n')

    for i in fail_img:
        print(f"Can not extract image {i}")
    print(len(fail_img))


# Build the data set one gesture folder at a time; class_name is the label
# stored in the last CSV column. Only the "stop" folder is processed here;
# the remaining calls are left commented out.
data_calculate(folder_path='dataset/stop', class_name="0")
#data_calculate(folder_path='dataset/left', class_name="1")
#data_calculate(folder_path='dataset/right', class_name="2")
#data_calculate(folder_path='dataset/forward', class_name="3")
#data_calculate(folder_path='dataset/backward', class_name="4")

请同学们认真阅读并理解上述代码，自行提取出剩下四种手势的手部关键点数据制作成数据集。提取好的手部关键点数据集data.csv将用来训练后续的神经网络模型。

In [None]:
import itertools
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
import random


def load_data(filename):
    """Load a header-less landmark CSV into feature and label arrays.

    Args:
        filename: Path of the CSV file *without* the ``.csv`` extension.

    Returns:
        ``(data, target)`` where ``data`` is a float64 array holding every
        column except the last one and ``target`` is the last column.

    The CSV written by ``data_calculate`` has no header line, so the file is
    read with ``header=None``; otherwise pandas would consume the first sample
    as column names and silently drop it from the data set.
    """
    table = pd.read_csv(f'{filename}.csv', header=None)
    values = table.to_numpy()
    data = values[:, :-1].astype(np.float64)
    target = values[:, -1]
    return data, target


def random_number(data_size, key):
    """Return the indices ``0 .. data_size - 1`` as a list.

    The list is shuffled in place with ``random.shuffle`` when ``key == 1``;
    for any other ``key`` it is returned in ascending order.
    """
    indices = list(range(data_size))
    if key == 1:
        random.shuffle(indices)
    return indices


def split_dataset(data_set, target_set, rate, ifsuf):
    """Split features and labels into a training part and a test part.

    Args:
        data_set: Indexable feature array (indexed with a list of positions).
        target_set: Label array aligned with ``data_set``.
        rate: Fraction of the samples that goes to the test part.
        ifsuf: Passed to ``random_number``; ``1`` shuffles the sample order.

    Returns:
        ``(x_train, x_test, y_train, y_test)``.
    """
    n_samples = len(data_set)
    train_size = int((1 - rate) * n_samples)  # number of training samples
    order = random_number(n_samples, ifsuf)
    train_idx = order[:train_size]
    test_idx = order[train_size:]
    return (data_set[train_idx], data_set[test_idx],
            target_set[train_idx], target_set[test_idx])


def inputtotensor(inputtensor, labeltensor):
    """Convert data-set inputs and labels to tensors.

    Returns:
        ``(inputs, labels)``: the inputs as a ``torch.FloatTensor`` and the
        labels (cast to float first, then) as a ``torch.LongTensor``.
    """
    features = torch.FloatTensor(np.array(inputtensor))
    labels = torch.LongTensor(np.array(labeltensor).astype(float))
    return features, labels


def addbatch(data_train, data_test, batchsize):
    """Wrap two aligned tensors in a ``DataLoader`` yielding mini-batches.

    The two tensors are paired through a ``TensorDataset`` and batched in
    their existing order (``shuffle=False``).
    """
    paired = TensorDataset(data_train, data_test)
    return DataLoader(paired, batch_size=batchsize, shuffle=False)


# Neural-network model definition
class Net(nn.Module):
    """Two-branch 1-D CNN classifier for flattened hand-landmark vectors.

    The input is treated as a single-channel sequence and sent through a
    kernel-7 convolution stack, a kernel-3 convolution stack and a
    pooling-only shortcut. The three results are concatenated along the
    channel axis, flattened and classified by a two-layer MLP.

    For a 63-element input every branch is pooled down to length 7
    (63 -> 31 -> 15 -> 7) and the concatenation has 4 + 4 + 1 = 9 channels,
    which is what the hard-coded ``9 * 7`` input size of ``fc`` expects.
    ``n_channels`` is only stored; it does not size any layer.
    """

    def __init__(self, n_channels=63, n_classes=5, dropout_probability=0.2):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.dropout_probability = dropout_probability

        def conv_branch(kernel_size):
            # "Same" padding for an odd kernel keeps the length until pooling.
            pad = kernel_size // 2
            return torch.nn.Sequential(
                torch.nn.Conv1d(in_channels=1, out_channels=8, kernel_size=kernel_size, padding=pad),
                torch.nn.ReLU(),
                torch.nn.AvgPool1d(2),

                torch.nn.Conv1d(in_channels=8, out_channels=4, kernel_size=kernel_size, padding=pad),
                torch.nn.ReLU(),
                torch.nn.AvgPool1d(2),

                torch.nn.Conv1d(in_channels=4, out_channels=4, kernel_size=kernel_size, padding=pad),
                torch.nn.ReLU(),
                torch.nn.Dropout(p=self.dropout_probability),
                torch.nn.AvgPool1d(2),
            )

        # Each branch lives in a one-element ModuleList (parameter names are
        # therefore prefixed with "<branch>.0.").
        self.all_conv_high = torch.nn.ModuleList([conv_branch(7)])
        self.all_conv_low = torch.nn.ModuleList([conv_branch(3)])
        self.all_residual = torch.nn.ModuleList([torch.nn.Sequential(
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2),
        )])

        self.fc = torch.nn.Sequential(
            torch.nn.Linear(in_features=9 * 7, out_features=512),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=512, out_features=n_classes),
        )

        # Xavier-uniform weights (ReLU gain) and a constant 0.1 bias for every
        # convolution, then for every linear layer.
        relu_gain = torch.nn.init.calculate_gain('relu')
        trainable = itertools.chain(self.all_conv_high[0], self.all_conv_low[0], self.fc)
        for layer in trainable:
            if isinstance(layer, (torch.nn.Conv1d, torch.nn.Linear)):
                torch.nn.init.xavier_uniform_(layer.weight, gain=relu_gain)
                torch.nn.init.constant_(layer.bias, 0.1)

    def forward(self, input):
        """Return the class scores for a batch of flat input vectors."""
        # Insert the channel axis expected by Conv1d / AvgPool1d.
        sequence = input.unsqueeze(1)

        branches = [
            self.all_conv_high[0](sequence),
            self.all_conv_low[0](sequence),
            self.all_residual[0](sequence),
        ]
        # Branch outputs are concatenated along the feature-map (channel) axis.
        merged = torch.cat(branches, dim=1)
        batch, channels, length = merged.size()
        return self.fc(merged.view(batch, channels * length))


In [None]:
# Train the module-level `net` for 101 epochs and, after every epoch, measure
# its accuracy on the test tensors; the weights are saved whenever the
# accuracy improves. Relies on the module-level names `net`, `loss_func` and
# `optimizer` (created in the main block). The `?` marks are blanks for the
# student to fill in.
def train_test(traininput, trainlabel, testinput, testlabel, batchsize):
    traindata = addbatch(traininput, trainlabel, batchsize)  # mini-batch loader (note: addbatch uses shuffle=False, so batch order is fixed)
    maxacc = 0  # best test accuracy seen so far
    start=time.time()
    for epoch in range(101):
        for step, data in enumerate(traindata):
            net.train()
            inputs, labels = data
            # forward pass
            out = net(inputs)
            # compute the loss
            loss = loss_func(out, labels)
            # clear the gradients left over from the previous step
            optimizer.zero_grad()
            # backward pass
            loss.backward()
            # update the parameters
            optimizer.step()

        # measure accuracy on the test set
        net.eval()
        testout = net(testinput)
        testloss = loss_func(testout, testlabel)
        prediction = torch.max(testout, 1)[1]  # torch.max along dim 1 -> [1] is the index of the highest score per sample
        pred_y = prediction.numpy()
        target_y = testlabel.data.numpy()
        j = 0  # number of correct predictions
        for i in range(?):
            if pred_y[i] == target_y[i]:
                j += 1
        acc = j / pred_y.size

        # progress report every 10th epoch
        if epoch % 10 == 0:
            print("训练次数为", ?, "的准确率为:", ?)
        # keep a checkpoint of the best model so far
        if acc > maxacc:
            torch.save(net.state_dict(), "?", _use_new_zipfile_serialization=False)
            print('save '+ str(acc))
            maxacc = ?

    end = time.time()
    # total wall-clock training time in seconds
    print(end-start)

上述代码中已经完整定义了训练模型需要用到的各种函数(其中神经网络模型的具体代码保存在代码文件中)，接下来请同学们自行阅读理解上述代码，补全空缺部分（即？处）的代码，使代码能正确运行。

In [None]:
# Training entry point. The `?` marks are blanks for the student to fill in.
if __name__ == "__main__":
    # load_data appends ".csv" itself, so pass the file name without extension
    feature, label = load_data('?')
    split = ?  # fraction of the samples held out for testing (see split_dataset)
    ifshuffle = ?  # 1 shuffles the samples before splitting (see random_number)
    x_train, x_test, y_train, y_test = split_dataset(feature, label, split, ifshuffle)
    traininput, trainlabel = inputtotensor(?, ?)
    testinput, testlabel = inputtotensor(?, ?)
    # nn.functional.normalize with its defaults rescales each row
    traininput = nn.functional.normalize(traininput)
    testinput = nn.functional.normalize(?)
    LR = ?          # learning rate, suggested range 0.001~0.009
    batchsize = ?   # mini-batch size, suggested range 2~10
    net = Net()
    optimizer = torch.optim.Adam(net.parameters(), LR)
    loss_func = torch.nn.CrossEntropyLoss()
    train_test(traininput, trainlabel, testinput, testlabel, batchsize)

请同学们认真阅读理解各个参数的含义，自行设置参数，补全主函数中空缺的代码，训练手势识别神经网络模型达到尽可能高的准确率。在训练完成后，同学们可以将上述代码的最后一行train_test函数的调用注释掉，同时在下方添加以下代码，运行后即可加载训练好的模型，输出数据集中五类手势的混淆矩阵。

In [None]:
    # Evaluate the saved model on the whole data set and plot the confusion
    # matrix. Imported locally because this snippet is appended to the main
    # block and nothing else in the notebook imports these two names.
    from sklearn.metrics import confusion_matrix
    import matplotlib.pyplot as plt

    input, label = inputtotensor(feature, label)
    input = nn.functional.normalize(input)
    model = Net()
    # "model.pt" must be the file name used in torch.save() during training.
    model.load_state_dict(torch.load("model.pt"))
    model.eval()
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        output = model(input)
    pred = torch.max(output, 1)[1]
    C = confusion_matrix(label.numpy(), pred.numpy(), labels=[0, 1, 2, 3, 4])
    plt.matshow(C, cmap=plt.cm.Reds)
    # Write each count into its cell: rows (j) are true labels, columns (i)
    # are predicted labels.
    for i in range(len(C)):
        for j in range(len(C)):
            plt.annotate(C[j, i], xy=(i, j), horizontalalignment='center', verticalalignment='center')

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

以下代码为将训练好的模型部署到小车上的代码：

In [None]:
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import Camera, bgr8_to_jpeg

#camera = Camera.instance(width=224, height=224)
# Camera configured for 224x224 frames at 20 fps.
camera = Camera.instance(width=224, height=224, fps=20)
# 'jpeg' (not 'jpg') matches the data-collection cell and is the standard
# image subtype name for JPEG data.
image = widgets.Image(format='jpeg', width=224, height=224)

# One-directional link: each new camera.value is converted with bgr8_to_jpeg
# and assigned to image.value.
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(widgets.HBox([image]))

显示相机的实时画面

In [None]:
import cv2
import mediapipe as mp
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import itertools

def data_calculate(image):
    """Return the flattened hand landmarks found in one BGR camera frame.

    The frame is flipped horizontally and converted to RGB before it is given
    to MediaPipe Hands, mirroring the preprocessing used to build the data set.

    Args:
        image: BGR image array as delivered by the camera.

    Returns:
        A flat list ``[x0, y0, z0, x1, y1, z1, ...]`` with the coordinates of
        every landmark of the first detected hand.

    Raises:
        ValueError: If no hand is detected in the frame.
    """
    mp_hands = mp.solutions.hands
    # Flip horizontally, then BGR to RGB.
    img = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)

    # The context manager releases the MediaPipe graph after this frame;
    # otherwise every call would leave another graph behind.
    with mp_hands.Hands(static_image_mode=True,
                        max_num_hands=2,
                        min_detection_confidence=0.5,
                        min_tracking_confidence=0.5) as hands:
        results = hands.process(img)
        if not results.multi_hand_landmarks:
            raise ValueError('no hand detected in image')

        features = []
        for point in results.multi_hand_landmarks[0].landmark:
            features.extend([point.x, point.y, point.z])

    return features


def inputtotensor(inputtensor):
    """Convert a sequence of numbers to a ``torch.FloatTensor``."""
    return torch.FloatTensor(np.array(inputtensor))


class Net(nn.Module):
    """Two-branch 1-D CNN classifier for flattened hand-landmark vectors.

    The input is treated as a single-channel sequence and sent through a
    kernel-7 convolution stack, a kernel-3 convolution stack and a
    pooling-only shortcut. The three results are concatenated along the
    channel axis, flattened and classified by a two-layer MLP.

    For a 63-element input every branch is pooled down to length 7
    (63 -> 31 -> 15 -> 7) and the concatenation has 4 + 4 + 1 = 9 channels,
    which is what the hard-coded ``9 * 7`` input size of ``fc`` expects.
    ``n_channels`` is only stored; it does not size any layer.
    """

    def __init__(self, n_channels=63, n_classes=5, dropout_probability=0.2):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.dropout_probability = dropout_probability

        def conv_branch(kernel_size):
            # "Same" padding for an odd kernel keeps the length until pooling.
            pad = kernel_size // 2
            return torch.nn.Sequential(
                torch.nn.Conv1d(in_channels=1, out_channels=8, kernel_size=kernel_size, padding=pad),
                torch.nn.ReLU(),
                torch.nn.AvgPool1d(2),

                torch.nn.Conv1d(in_channels=8, out_channels=4, kernel_size=kernel_size, padding=pad),
                torch.nn.ReLU(),
                torch.nn.AvgPool1d(2),

                torch.nn.Conv1d(in_channels=4, out_channels=4, kernel_size=kernel_size, padding=pad),
                torch.nn.ReLU(),
                torch.nn.Dropout(p=self.dropout_probability),
                torch.nn.AvgPool1d(2),
            )

        # Each branch lives in a one-element ModuleList (parameter names are
        # therefore prefixed with "<branch>.0.").
        self.all_conv_high = torch.nn.ModuleList([conv_branch(7)])
        self.all_conv_low = torch.nn.ModuleList([conv_branch(3)])
        self.all_residual = torch.nn.ModuleList([torch.nn.Sequential(
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2),
            torch.nn.AvgPool1d(2),
        )])

        self.fc = torch.nn.Sequential(
            torch.nn.Linear(in_features=9 * 7, out_features=512),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=512, out_features=n_classes),
        )

        # Xavier-uniform weights (ReLU gain) and a constant 0.1 bias for every
        # convolution, then for every linear layer.
        relu_gain = torch.nn.init.calculate_gain('relu')
        trainable = itertools.chain(self.all_conv_high[0], self.all_conv_low[0], self.fc)
        for layer in trainable:
            if isinstance(layer, (torch.nn.Conv1d, torch.nn.Linear)):
                torch.nn.init.xavier_uniform_(layer.weight, gain=relu_gain)
                torch.nn.init.constant_(layer.bias, 0.1)

    def forward(self, input):
        """Return the class scores for a batch of flat input vectors."""
        # Insert the channel axis expected by Conv1d / AvgPool1d.
        sequence = input.unsqueeze(1)

        branches = [
            self.all_conv_high[0](sequence),
            self.all_conv_low[0](sequence),
            self.all_residual[0](sequence),
        ]
        # Branch outputs are concatenated along the feature-map (channel) axis.
        merged = torch.cat(branches, dim=1)
        batch, channels, length = merged.size()
        return self.fc(merged.view(batch, channels * length))

def preprocess(x):
    """Turn one camera frame into a normalised ``(1, 63)`` model input."""
    landmarks = data_calculate(x)
    row = inputtotensor(landmarks).view(1, 63)
    return nn.functional.normalize(row)

# Rebuild the network, restore the trained weights from "model.pt" and put
# the model in evaluation mode for inference.
trained_weights = torch.load("model.pt")
model = Net()
model.load_state_dict(trained_weights)
model.eval()

模型加载

In [None]:
# Remove the camera -> image-widget link created when the preview was set up,
# so new frames are no longer converted and pushed to the widget.
camera_link.unlink() 

为了减少JetBot的运算负担，我们需要执行如下代码，用以取消摄像头的连接。 此时摄像头只是不推流到浏览器上，但在Jetbot上的摄像头仍然处于工作状态。

In [None]:
from jetbot import Robot
# Robot handle used by the gesture callback to drive and stop the car.
robot = Robot()
from RGB_Lib import Programing_RGB
# Controller object for the RGB lights.
RGB = Programing_RGB()
import RPi.GPIO as GPIO
# Pin used for the buzzer, expressed in the BCM numbering selected below.
BEEP_pin = 6 
GPIO.setmode(GPIO.BCM)
# set pin as an output pin with an initial state of LOW
GPIO.setup(BEEP_pin, GPIO.OUT, initial=GPIO.LOW)

使用以上代码块创建驱动电机，RGB灯，蜂鸣器的robot实例：

In [None]:
import torch.nn.functional as F
import time

def update(change):
    """Camera callback: classify the gesture in a new frame and drive the robot.

    Args:
        change: Change dictionary whose ``'new'`` entry holds the latest
            camera frame.

    The predicted class index is mapped to an action using the same labels
    that were written into the training data set:
    0 = stop, 1 = left, 2 = right, 3 = forward, 4 = backward.
    Each action keeps its own light effect; the buzzer pin is driven LOW in
    every case. If the frame cannot be processed (for example because no hand
    is visible) the robot is stopped. The function then sleeps for 0.5 s, which
    limits how often frames are handled.
    """
    frame = change['new']
    try:
        x = preprocess(frame)
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            output = model(x)
        y = torch.max(output, 1)[1]
        print(y)
        label = int(y.item())  # single-sample batch -> plain class index

        if label == 0:  # stop
            robot.stop()
            GPIO.output(BEEP_pin, GPIO.LOW)
            RGB.Set_ChameleonLight_RGB()
            RGB.OFF_ALL_RGB()
        elif label == 1:  # left
            robot.left(0.5)
            RGB.OFF_ALL_RGB()
            GPIO.output(BEEP_pin, GPIO.LOW)
            RGB.Set_An_RGB(9, 0xFF, 0x00, 0x00)
        elif label == 2:  # right
            robot.right(0.5)
            GPIO.output(BEEP_pin, GPIO.LOW)
            RGB.Set_All_RGB(0xFF, 0x00, 0x00)
        elif label == 3:  # forward
            robot.forward(0.4)
            GPIO.output(BEEP_pin, GPIO.LOW)
            RGB.Set_BreathSColor_RGB(2)
            RGB.Set_BreathSSpeed_RGB(1)
            RGB.Set_BreathSLight_RGB()
        elif label == 4:  # backward
            robot.backward(0.4)
            RGB.OFF_ALL_RGB()
            GPIO.output(BEEP_pin, GPIO.LOW)
            RGB.Set_An_RGB(4, 0xFF, 0x00, 0x00)
    except Exception:
        # Best effort: any failure while analysing the frame (typically "no
        # hand found") must leave the robot standing still rather than kill
        # the callback. KeyboardInterrupt/SystemExit are no longer swallowed.
        robot.stop()

    time.sleep(0.5)

update({'new': camera.value})  # we call the function once to initialize
# From now on, run update() every time camera.value changes.
camera.observe(update, names='value') 

最后，我们需要定义一个函数，每隔一定的时间采集一次手势图片，同时根据识别结果传输给小车相应的命令。同学们可以自行摸索定义上述代码中不同动作的灯光效果。

In [None]:
# Detach the gesture callback from the camera first ...
camera.unobserve(update, names='value')
# ... wait a moment (the callback itself sleeps 0.5 s per frame, so a call may still be running) ...
time.sleep(1)
# ... then stop the motors.
robot.stop()

停止小车运行