# Multi-Sensor Test
## Angle Threshold - DNN

### Experiment Aims：
Test the influence of the threshold of different steer angles on the classification accuracy, and choose a suitable threshold of the steer angle.

### Experiment Design：
For efficiency purposes, this experiment uses DNN to determine whether the current angle_threshold is reasonable.  
If the accuracy rate of DNN is low, we believe that the current angle_threshold cannot separate left and right well. If the accuracy is high, we can regard the current angle_threshold is reasonable.  
This experiment uses the KFold method of 4 Folds to reduce the influence of randomness in the separation of the training set and the test set on the results.

This experiment tested the effect of angle_threshold of 10, 20, 30...90 on the rationality of splitting the data set.

### Experiment Content：

In [1]:
import pandas as pd
import numpy as np
import os

In [2]:
# Read the data, assign a label to each image data according to the threshold, including go, stop, left, right
# The default speed_threshold=5, angle_threshold=30
# Finally we generate bounding_box data: X, corresponding label: y
import random
def process_data(data, speed_threshold, angle_threshold, sign_threshold, data_size):
    stop, go, left, right = split(data, speed_threshold, angle_threshold, sign_threshold, data_size)

    print("go, stop, left, right")
    print(len(go), len(stop), len(left), len(right) )

    X = np.array(list(go)+list(stop)+list(left)+list(right))
    y = np.array(list(np.ones(len(go)))+list(np.ones(len(stop))*2)+list(np.ones(len(left))*3)+list(np.ones(len(right))*4))
    
    mask = [i for i in range(len(y))]
    random.shuffle(mask)

    X=X[mask]
    y=y[mask]

    X = np.reshape(X, (len(y),21*5))

    print("X:", X.shape)
    print("y:", y.shape)
    return X, y-1

# Separate all data files into four categories: go, stop, left, and right by threshold.
# The default speed_threshold=5, angle_threshold=30
def split(data, speed_threshold=5, angle_threshold=30, sign_threshold=0.5, data_size=200):
    stop_full  = data[data["vehicle_speed"]<=speed_threshold]

    go = data[data["vehicle_speed"]>speed_threshold]
    go_full =  go[go["steering_angle_calculated"]<=angle_threshold]

    steer = go[go["steering_angle_calculated"]>angle_threshold]
    left_full  = steer[steer["steering_angle_sign"]<=sign_threshold]
    right_full = steer[steer["steering_angle_sign"]>sign_threshold]
    
    go    = get_box(go_full[:data_size])
    stop  = get_box(stop_full[:data_size])
    left  = get_box(left_full[:data_size])
    right = get_box(right_full[:data_size])
    
    return stop, go, left, right

# Take out the bounding boxs, turning angle, speed and other data of all pictures from the data file.
def get_box(fulltsv, padding=0):
    maxBox = 21

    header = [col for col in fulltsv]
    header.remove('box')
    
    x_full = []
    
    label_dict = {'Car': 1,
                 'VanSUV': 2,
                 'Pedestrian': 3,
                 'Trailer': 4,
                 'Bus': 5,
                 'Truck': 6,
                 'Bicycle': 7,
                 'MotorBiker': 8,
                 'Motorcycle': 9,
                 'Animal': 10,
                 'UtilityVehicle': 11,
                 'CaravanTransporter': 12,
                 'EmergencyVehicle': 13,
                 'Cyclist': 14}
    
    for index, row in fulltsv.iterrows():
        x = []
            
        boxs = eval(row['box'])
        for box in boxs[:maxBox]: # 生成x, 添加已有的box，box上限数量是maxBox
            x.append(box['2d_bbox'] + [label_dict[box['class']]])

        for i in range(maxBox - len(boxs)): # 填补空的box
            x.append([padding,padding,padding,padding,padding])
        
        x_full.append(x)
    return np.array(x_full)

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [4]:
class CNN_Net(nn.Module):
    def __init__(self):
        super(CNN_Net, self).__init__()

        # First 2D convolutional layer, taking in 1 input channel (image),
        # outputting 32 convolutional features, with a square kernel size of 3
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        # Second 2D convolutional layer, taking in the 32 input layers,
        # outputting 64 convolutional features, with a square kernel size of 3
        self.conv2 = nn.Conv2d(32, 64, 3, 1)

        # Designed to ensure that adjacent pixels are either all 0s or all active
        # with an input probability
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)

        # First fully connected layer
        self.fc1 = nn.Linear(9216, 128)
        # Second fully connected layer that outputs our 10 labels
        self.fc2 = nn.Linear(128, 4)
        
    def forward(self, x):
        # Pass data through conv1
        x = self.conv1(x)
        # Use the rectified-linear activation function over x
        x = F.relu(x)

        x = self.conv2(x)
        x = F.relu(x)

        # Run max pooling over x
        x = F.max_pool2d(x, 2)
        # Pass data through dropout1
        x = self.dropout1(x)
        # Flatten x with start_dim=1
        x = torch.flatten(x, 1)
        # Pass data through fc1
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)

        # Apply softmax to x
        output = F.log_softmax(x, dim=1)
        return output

class DNN_Net(nn.Module):
    def __init__(self):
        super(DNN_Net, self).__init__()
        
        self.fc1 = nn.Linear(105, 128)
        self.dropout1 = nn.Dropout(0.25) 
        
        self.fc2 = nn.Linear(128, 64)
        self.dropout2 = nn.Dropout(0.25)
        
        self.fc3 = nn.Linear(64, 36)
        self.dropout3 = nn.Dropout(0.25)
        
        self.fc4 = nn.Linear(36, 4)        

        
    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc3(x)
        x = F.relu(x)
        x = self.dropout3(x)
        x = self.fc4(x)
        
        output = F.log_softmax(x, dim=1)
        return output
my_nn = DNN_Net()
print(my_nn)

DNN_Net(
  (fc1): Linear(in_features=105, out_features=128, bias=True)
  (dropout1): Dropout(p=0.25)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (dropout2): Dropout(p=0.25)
  (fc3): Linear(in_features=64, out_features=36, bias=True)
  (dropout3): Dropout(p=0.25)
  (fc4): Linear(in_features=36, out_features=4, bias=True)
)


In [5]:
def train(dataloader, model, loss_fn, optimizer, flag):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device, dtype=torch.long)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if flag:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            
def test(dataloader, model, loss_fn, flag):
    size = len(dataloader.dataset)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device, dtype=torch.long)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= size
    correct /= size
    if flag:
        print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

def fit(model, train_dataloader, test_dataloader, epochs=100):
    model = model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    epochs = 100
    for t in range(epochs):
        train(train_dataloader, model, loss_fn, optimizer, False)
        test(test_dataloader, model, loss_fn, False)
    return model

In [6]:
from sklearn.model_selection import cross_val_score
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import KFold

from torch.utils.data import DataLoader, TensorDataset
device = "cuda" if torch.cuda.is_available() else "cpu"
# device = "cpu"
print("Using {} device".format(device))

# Use Kfold Train and Test in this function
def process(X, y, n_splits=4):
    cm_result = np.zeros((4,4))
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=False)
    for train_index, test_index in kf.split(X):
    #     print("TRAIN:", train_index, "TEST:", test_index)
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]
        
        X_train = torch.from_numpy(X_train)
        y_train = torch.from_numpy(y_train)
        X_test  = torch.from_numpy(X_test)
        y_test  = torch.from_numpy(y_test)    

        training_data = TensorDataset(X_train, y_train)
        testing_data  = TensorDataset(X_test , y_test)
        train_dataloader = DataLoader(training_data, batch_size=64)
        test_dataloader  = DataLoader(testing_data,  batch_size=64)
        
        model = DNN_Net()
        model = fit(model, train_dataloader, test_dataloader)
        
        pred = model(X_test.to(device))
#         y_pred = pred.argmax(1).detach().numpy()
        y_pred = pred.argmax(1).detach().cpu().numpy()
        y_test = y_test.numpy()

        cm = confusion_matrix(y_test, y_pred)
        cm_rate = cm/cm.sum(axis=1)
        cm_result += cm_rate
    
    correct = sum(y_pred==y_test)/len(y_test)
    print(f"Test Accuracy: {(100*correct):>0.1f}%")
    return cm_result/n_splits

Using cuda device


This experiment tried the influence of angle_threshold of 10, 20, 30...90 on the classification results.

The classification accuracy is expressed in the form of a confusion matrix.  
The diagonal line from top left to bottom right corresponds to the accuracy of the four categories. The four types are go, stop, left, and right.  
The data in each row represents the probability that the data which is actually belonged to the row is classified and classified into the corresponding column by the classifier.

In [25]:
data = pd.read_csv("full_info.tsv", sep ="\t")

for i in range(1,10):
    speed_threshold = 5
    angle_threshold = i*10
    sign_threshold = 0.5
    
    print("angle_threshold", angle_threshold)
    
    data_size = 200
    X, y = process_data(data, speed_threshold, angle_threshold, sign_threshold, data_size)
    X = X.astype(np.float32) 
    y = y.astype(np.long) 
    
    n_splits = 4
    print(process(X, y, 4))
    print()

angle_threshold 10
go, stop, left, right
200 200 200 200
X: (800, 105)
y: (800,)
Test Accuracy: 41.0%
[[0.50025253 0.12082591 0.24501594 0.13461275]
 [0.17687015 0.46573084 0.2025837  0.16768295]
 [0.29662568 0.19569222 0.29508399 0.2279755 ]
 [0.27784365 0.19255051 0.24583091 0.28598122]]

angle_threshold 20
go, stop, left, right
200 200 200 200
X: (800, 105)
y: (800,)
Test Accuracy: 34.5%
[[0.46941176 0.16427771 0.20454182 0.16370532]
 [0.22948802 0.38384093 0.20494998 0.17736936]
 [0.34633987 0.16151963 0.28045218 0.20821476]
 [0.2579085  0.20104049 0.27526411 0.26763943]]

angle_threshold 30
go, stop, left, right
200 200 200 200
X: (800, 105)
y: (800,)
Test Accuracy: 43.5%
[[0.50309591 0.13892365 0.17974507 0.18768577]
 [0.09660827 0.54964539 0.1900932  0.18453901]
 [0.22746129 0.19743429 0.33220669 0.2676773 ]
 [0.25700691 0.14465999 0.26567708 0.34664612]]

angle_threshold 40
go, stop, left, right
200 200 200 200
X: (800, 105)
y: (800,)
Test Accuracy: 38.0%
[[0.51174987 0.1386773

### Experiment Analysis：
The data in the third row and the third column and the data in the fourth row and fourth column of the confusion matrix correspond to the classification accuracy of the left class and the right class, respectively.  
According to the results of different angle_threshold, we can believe that when angle_threshold ∈ [20,70], the classification accuracy of the two types does not change a lot.  
When angle_threshold<20, the accuracy of the classification may deteriorate due to insufficient data discrimination.
When angle_threshold>70, it may be that the total amount of data becomes smaller, and the final classification accuracy becomes worse.

For angle_threshold ∈ [20,70], we can find that the accuracy rate is highest when angle_threshold=50, which is more appropriate angle_threshold

### Expeiment Conclusion：
50 should be a more appropriate angle_threshold. However, it does not improve the final accuracy rate much, and the accureate rate is still lower than 60%.