## End to End Learning for Self-Driving Cars 模型

In [None]:
import scipy.misc
import random
import os
import tensorflow as tf
from tensorflow.core.protobuf import saver_pb2
import matplotlib.pyplot as plt
import numpy as np
import scipy
from numpy import array
import time
import math
from datetime import timedelta
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' 

In [None]:
tf.__version__

## 超参数

In [None]:
batch_size = 200
L2NormConst = 0.001
learning_rate = 1e-4
keep_prob = tf.placeholder(tf.float32) #drop_out参数

## 五层卷积层，其后为全连接层，整个模型没有pool

!["模型"](./convnet.png)

## 构建模型

In [None]:
def weight(shape):
    return tf.Variable(tf.truncated_normal(shape=shape, stddev=0.1))

def bias(shape):
    return tf.Variable(tf.constant(0.1, shape=shape))

#卷积层
def conv_relu(x, W, stride, b):
    return tf.nn.relu(tf.nn.conv2d(x, W, strides=[1, stride, stride, 1], padding='VALID') + b)
#全连接层
def matmul_relu(fc, W, b):
    return tf.nn.relu(tf.matmul(fc, W) + b)

with tf.name_scope('input_data'):
    image = tf.placeholder(tf.float32, shape=[None, 66, 200, 3]) #输入为RGB图片
    angle = tf.placeholder(tf.float32, shape=[None, 1]) #图片对应的转向角

with tf.name_scope('convlayer'):
    
    #第一层卷积
    W_conv1 = weight([5, 5, 3, 24])
    b_conv1 = bias([24])
    conv1 = conv_relu(image, W_conv1, 2, b_conv1)
    
    #第二层卷积
    W_conv2 = weight([5, 5, 24, 36])
    b_conv2 = bias([36])
    conv2 = conv_relu(conv1, W_conv2, 2, b_conv2)

    #第三层卷积
    W_conv3 = weight([5, 5, 36, 48])
    b_conv3 = bias([48])
    conv3 = conv_relu(conv2, W_conv3, 2, b_conv3)

    #第四层卷积
    W_conv4 = weight([3, 3, 48, 64])
    b_conv4 = bias([64])
    conv4 = conv_relu(conv3, W_conv4, 1, b_conv4)
    
    #第五层卷积
    W_conv5 = weight([3, 3, 64, 64])
    b_conv5 = bias([64])
    conv5 = conv_relu(conv4, W_conv5, 1, b_conv5)

with tf.name_scope('full_connected_layer'):
    
    #展开成1维张量
    conv5_flat = tf.reshape(conv5, [-1, 1152])
    
    #第一层全连接层
    W_fc1 = weight([1152, 1164])
    b_fc1 = bias([1164])
    fc1 = matmul_relu(conv5_flat, W_fc1, b_fc1)
    fc1_drop = tf.nn.dropout(fc1, keep_prob)

    #第二层全连接层
    W_fc2 = weight([1164, 100])
    b_fc2 = bias([100])
    fc2 = matmul_relu(fc1, W_fc2, b_fc2)
    fc2_drop = tf.nn.dropout(fc2, keep_prob)

    #第三层全连接层
    W_fc3 = weight([100, 50])
    b_fc3 = bias([50])
    fc3 = matmul_relu(fc2, W_fc3, b_fc3)
    fc3_drop = tf.nn.dropout(fc3, keep_prob)

    #第四层全连接层
    W_fc4 = weight([50, 10])
    b_fc4 = bias([10])
    fc4 = matmul_relu(fc3, W_fc4, b_fc4)
    fc4_drop = tf.nn.dropout(fc4, keep_prob)

with tf.name_scope('output'):
    W_fc5 = weight([10, 1])
    b_fc5 = bias([1])
    angle_pred = tf.multiply(tf.atan(tf.matmul(fc4_drop, W_fc5) + b_fc5), 6)

with tf.name_scope('loss'):
    train_vars = tf.trainable_variables()
    loss = tf.reduce_mean(tf.square(tf.subtract(angle_pred, angle))) \
    + tf.add_n([tf.nn.l2_loss(v) for v in train_vars]) * L2NormConst #L2范数应对过拟合

## 载入数据

In [None]:
images = []
angles = []

#记录batch的位置
train_batch_pointer = 0
test_batch_pointer = 0

#读取文件
with open("driving_dataset/data.txt") as f:
    for line in f:
        images.append("driving_dataset/" + line.split()[0])   #文件名
        angles.append(float(line.split()[1]) * math.pi / 180) #将角度转为弧度


num_images = len(images)

combine = list(zip(images, angles)) #两个列表->元组->一个列表

#使用随机数种子保存shuffle的状态
random.seed(1)
random.shuffle(combine)

images, angles = zip(*combine) #一个列表->按一一对应的关系恢复为两个列表

train_images = images[:int(num_images * 0.8)] #训练用图片
train_angles = angles[:int(num_images * 0.8)]  #训练图片对应的角度

test_images = images[-int(num_images * 0.2):] #测试用
test_angles = angles[-int(num_images * 0.2):]

num_train_images = len(train_images)
num_test_images = len(test_images)

def train_batch(batch_size):
    global train_batch_pointer
    image_out = []
    angle_out = []
    for i in range(batch_size):
        #将像素值归一化
        image_out.append(scipy.misc.imresize(scipy.misc.imread(train_images[(train_batch_pointer + i) % num_train_images])[-150:], [66, 200]) / 255.0)
        angle_out.append([train_angles[(train_batch_pointer + i) % num_train_images]])
    train_batch_pointer += batch_size
    return image_out, angle_out

def test_batch(batch_size):
    global test_batch_pointer
    image_out = []
    angle_out = []
    for i in range(batch_size):
        #将像素值归一化
        image_out.append(scipy.misc.imresize(scipy.misc.imread(test_images[(test_batch_pointer + i) % num_test_images])[-150:], [66, 200]) / 255.0)
        angle_out.append([test_angles[(test_batch_pointer + i) % num_test_images]])
    test_batch_pointer += batch_size
    return image_out, angle_out

## 训练

In [None]:
#保存记录
saver = tf.train.Saver()
save_path = './checkpoints'
sess = tf.InteractiveSession()



train = tf.train.AdamOptimizer(learning_rate).minimize(loss)
sess.run(tf.global_variables_initializer())

total_iterations = 0 #在整个训练集上训练的次数

def optimize(num_iterations):
    start_time = time.time()
    global total_iterations
    for i in range(total_iterations, total_iterations + num_iterations):
        writer = tf.summary.FileWriter('./graph', tf.get_default_graph())
        for j in range(int(num_images/batch_size)):
            image_batch, angle_batch = train_batch(batch_size)
            train.run(feed_dict={image: image_batch, angle: angle_batch, keep_prob: 0.8})
            if j % 10 == 0:
                image_batch, angle_batch = test_batch(batch_size)
                loss_on_test = loss.eval(feed_dict={image:image_batch, angle: angle_batch, keep_prob: 1.0})
                print("total_iterations: %d, Step: %d, Loss_On_Test: %g" % (total_iterations , j, loss_on_test))
        saver.save(sess=sess, save_path=save_path)
    writer.close()
    total_iterations += num_iterations
    end_time = time.time()
    time_dif = end_time - start_time
    print("Time usage: " + str(timedelta(seconds=int(round(time_dif)))))



In [None]:
optimize(1) #训练一次