#### To run this notebook, download the dataset from Kaggle and unpack it into the same directory as the code.
#### Link: [https://www.kaggle.com/dmitryyemelyanov/chinese-traffic-signs](https://www.kaggle.com/dmitryyemelyanov/chinese-traffic-signs)	


## Importing libraries

In [13]:
import pandas as pd
import numpy as np
import os 
os.environ['KMP_DUPLICATE_LIB_OK']='True' 
import tqdm
import time 
import datetime
import torch
import torchvision
import warnings
warnings.filterwarnings("ignore")

from skimage import io
from skimage import transform as transform_image
import skimage
#from tqdm import tqdm
from PIL import Image
from random import randint


import matplotlib.pyplot as plt 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim 
import matplotlib.pyplot as plt



## Reading the dataset 
#### output - number of rows in data pool

In [74]:
# Load the annotation table and keep only the sign classes used below:
# category ids 0-15 with id 9 left out.
raw_data = pd.read_csv('annotations.csv', delimiter=',')
# Both conditions are combined into one mask built from raw_data. Chaining two
# boolean selections applies the second mask to an already-filtered frame,
# which pandas has to re-align (and warns about).
data = raw_data[(raw_data.category < 16) & (raw_data.category != 9)].reset_index(drop = True)
print(len(data))

2242


## Creating dataset class to reorganize data in torchvision format

In [4]:
class ChRoadSighnDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs every annotated image with its label.

    Each row of the annotation table describes one image: the value at column
    position 0 is the file name relative to ``root_dir`` and the value at
    column position 7 is returned under the ``'landmarks'`` key (this notebook
    feeds it to the loss as the class label).
    """

    def __init__(self, annotation, root_dir, transform=None):
        """
        Args:
            annotation: Annotation table supporting ``len()`` and positional
                ``.iloc[row, col]`` access (a pandas DataFrame in this notebook).
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.transform = transform
        self.root_dir = root_dir
        self.landmarks_frame = annotation

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        """Read the image of row ``idx`` and return it together with its label.

        Returns the dict ``{'image': ..., 'landmarks': ...}``, or whatever
        ``transform`` makes of that dict when a transform was given.
        """
        row = idx.tolist() if torch.is_tensor(idx) else idx
        frame = self.landmarks_frame

        image_path = os.path.join(self.root_dir, frame.iloc[row, 0])
        sample = {
            'image': skimage.io.imread(image_path),
            # wrapped in a one-element array so it can become a tensor later
            'landmarks': np.array([frame.iloc[row, 7]]),
        }

        if not self.transform:
            return sample
        return self.transform(sample)

## Creating transformation classes to transform the input data into tensor format of the same size

In [5]:
class Rescale(object):
    """Rescale the image in a sample to a given size.

    Args:
        output_size (int or tuple): Target size. An int ``n`` gives an
            ``n x n`` image; a tuple is passed to ``resize`` as the target
            shape exactly as given.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def _target_shape(self):
        """Return the shape handed to ``resize`` for the configured size."""
        if isinstance(self.output_size, int):
            return (self.output_size, self.output_size)
        # A tuple is already a full shape; duplicating it would produce a
        # nested tuple that ``resize`` cannot use.
        return self.output_size

    def __call__(self, sample):
        """Return a new sample dict with the image resized and the label unchanged."""
        image, landmarks = sample['image'], sample['landmarks']
        img = transform_image.resize(image, self._target_shape())

        return {'image': img, 'landmarks': landmarks}

In [6]:
class ToTensor(object):
    """Turn the numpy arrays of a sample into a pair of torch tensors."""

    def __call__(self, sample):
        """Return ``(image_tensor, label_tensor)`` for the given sample dict.

        The image axes are reordered from the numpy layout H x W x C to the
        torch layout C x H x W before the conversion.
        """
        hwc_image, target = sample['image'], sample['landmarks']
        chw_image = hwc_image.transpose((2, 0, 1))
        return torch.from_numpy(chw_image), torch.from_numpy(target)

## Defining the neural network class
#### __init__ - creating convolution/neural layers 
#### forward - defining order of layers and direction of data flow

In [7]:
class Net(nn.Module):
    """Small CNN classifier: two conv + max-pool stages and three dense layers.

    Args:
        num_classes (int): Number of output logits. Defaults to 16.
        input_size (int): Side length of the square 3-channel input images.
            Defaults to 128, the size produced by ``Rescale(128)``.
    """

    def __init__(self, num_classes=16, input_size=128):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size = 5, stride = 1, padding = 2)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size = 5, stride = 1, padding = 2)
        # The convolutions keep the spatial size (kernel 5, padding 2, stride 1)
        # and each of the two pooling steps halves it, rounding down.
        pooled_size = (input_size // 2) // 2
        self.flat_features = 64 * pooled_size * pooled_size
        self.fc1 = nn.Linear(self.flat_features, 100)
        self.fc2 = nn.Linear(100, 100)
        self.fc3 = nn.Linear(100, num_classes)

    def forward(self, x):
        """Map a batch of images (N x 3 x H x W) to class logits (N x num_classes)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample. Keeping the batch dimension fixed means an input
        # of the wrong size fails in fc1 instead of being silently re-batched.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

# Passive (supervised) learning section

## Defining the sampling functions for the first learning method
#### They divide the data into two parts: train and test pools
#### There are two types of sampling: static and random
#### For training purposes, it's better to use random sampling

In [3]:
def static_sampling(data, train_fraction=0.8):
    """Split ``data`` into train and test pools, class by class, in row order.

    For every distinct value of the ``category`` column, the first
    ``train_fraction`` of that class's rows (rounded) goes to the train pool
    and all remaining rows go to the test pool. The pools therefore never
    share a row and together contain every input row.

    Args:
        data: DataFrame with a ``category`` column.
        train_fraction: Share of each class put into the train pool, in the
            range 0-1.

    Returns:
        Tuple ``(train_data, test_data)`` of DataFrames, each with a fresh
        0..n-1 index.
    """
    train_parts = []
    test_parts = []

    for class_num in set(data["category"]):
        class_rows = data[data.category == class_num]
        split_at = round(len(class_rows) * train_fraction)
        train_parts.append(class_rows.iloc[:split_at, :])
        # The test pool starts exactly where the train pool ends; starting it
        # anywhere earlier would leak training rows into the test pool.
        test_parts.append(class_rows.iloc[split_at:, :])

    if not train_parts:
        # No rows at all: hand back two empty pools with the input's columns.
        empty = data.iloc[:0]
        return empty.reset_index(drop = True), empty.reset_index(drop = True)

    # Reversed so that each class is placed ahead of the classes visited
    # before it.
    train_data = pd.concat(train_parts[::-1])
    test_data = pd.concat(test_parts[::-1])
    return train_data.reset_index(drop = True), test_data.reset_index(drop = True)

In [5]:
def random_sampling(data, train_fraction=0.8): #normal fully random sampling
    """Randomly split ``data`` into disjoint train and test pools per class.

    For every distinct value of the ``category`` column a random
    ``train_fraction`` of that class's rows is drawn for the train pool; the
    rows that were not drawn form the test pool, so no row appears in both.

    Args:
        data: DataFrame with a ``category`` column.
        train_fraction: Share of each class drawn for the train pool, in the
            range 0-1.

    Returns:
        Tuple ``(train_data, test_data)`` of DataFrames, each with a fresh
        0..n-1 index.
    """
    from random import randint

    # Work on a copy with a unique 0..n-1 index so that "rows not drawn for
    # training" can be selected by index label even if the caller's index has
    # duplicates.
    data = data.reset_index(drop = True)

    train_parts = []
    test_parts = []

    for class_num in set(data["category"]):
        class_rows = data[data.category == class_num]
        class_train = class_rows.sample(frac = train_fraction, random_state = randint(0, 200))
        train_parts.append(class_train)
        # Test pool = complement of the train sample. Drawing a second,
        # independent sample would overlap with the training rows.
        test_parts.append(class_rows.drop(class_train.index))

    if not train_parts:
        # No rows at all: hand back two empty pools with the input's columns.
        empty = data.iloc[:0]
        return empty.reset_index(drop = True), empty.reset_index(drop = True)

    # Reversed so that each class is placed ahead of the classes visited
    # before it.
    train_data = pd.concat(train_parts[::-1])
    test_data = pd.concat(test_parts[::-1])
    return train_data.reset_index(drop = True), test_data.reset_index(drop = True)

## Data reorganizing to torchvision DataLoader format
#### It connects every image to corresponding label

In [10]:
# Every sample is resized to 128 x 128 and converted to an (image, label)
# tensor pair before batching.
transform = torchvision.transforms.Compose([Rescale(128), ToTensor()])
train_data, test_data = random_sampling(data)
print(len(train_data), len(test_data))

trainset = ChRoadSighnDataset(annotation = train_data, root_dir = 'images/', transform = transform)
testset = ChRoadSighnDataset(annotation = test_data, root_dir = 'images/', transform = transform)

batch_size = 10

# random_sampling returns the rows grouped class by class, so the training
# loader has to shuffle; otherwise every batch would contain a single class.
# The evaluation order does not matter, so the test loader stays unshuffled.
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=0)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=0)

# Human-readable name of every category id kept in `data` (id 9 is excluded).
classes = {0 : 'sp_lim_5', 1 : 'sp_lim_15', 2 : 'sp_lim_30', 3 : 'sp_lim_40', 4 : 'sp_lim_50', 5 : 'sp_lim_60',
           6 : 'sp_lim_70', 7 : 'sp_lim_80', 8 : 'no_move_left_front', 10 : 'no_move_front', 11: 'no_move_left',
           12: 'no_move_left_right', 13: 'no_move_right', 14: 'no_outdrive', 15: 'no_reversal'} 

## Learning process

In [None]:
net_p = Net()
loss_func_p = []  # mean batch loss of every epoch
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net_p.parameters(), lr=0.01, momentum=0.9)
clock_p = []  # wall-clock duration of every epoch, in seconds

epoch_num = 5

tic = time.perf_counter()

for epoch in tqdm.tqdm(range(epoch_num)):  # loop over the dataset multiple times
    tic_av = time.perf_counter()
    running_loss = 0.0
    for images, labels in trainloader:
        # zero the parameter gradients
        optimizer.zero_grad()
        # Cast instead of re-wrapping with torch.tensor(), which copies the
        # tensor and emits a warning.
        images = images.to(torch.float32)
        # The loader yields labels as (batch, 1); the loss expects a flat
        # vector of int64 class ids.
        labels = labels.squeeze(1).long()
        # forward + backward + optimize
        outputs = net_p(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # running_loss is a sum of per-batch mean losses, so it is averaged over
    # the number of batches actually seen rather than a hard-coded count.
    loss_func_p.append(running_loss / max(len(trainloader), 1))
    toc_av = time.perf_counter()
    clock_p.append(toc_av - tic_av)
toc = time.perf_counter()
print('Finished Training in', toc-tic, "seconds, meaning", str(datetime.timedelta(seconds = toc-tic))) 


## Saving network

In [None]:
# Store only the trained weights (the state dict) of the passively trained network.
PATH = './net_passive_trained.pth'
passive_state = net_p.state_dict()
torch.save(passive_state, PATH)

## Statistic, testing and other info

In [None]:
## print(loss_func_p)
## print(len(loss_func_p))

## print(clock_p)
## print(sum(clock_p)/len(clock_p))

# Training loss, one point per epoch of the passive run.
fig, axes = plt.subplots() 
axes.plot(loss_func_p) 
axes.set_xlabel('Epoch')
axes.set_ylabel('Loss')
# loss_func_p holds one value per epoch, so the title says exactly that.
axes.set_title("Loss per epoch")

# Wall-clock duration, one point per epoch of the passive run.
fig, axes = plt.subplots() 
axes.plot(clock_p) 
axes.set_xlabel('Epoch')
axes.set_ylabel('Execution time')
axes.set_title("Learning time per epoch of learning, seconds")

In [None]:
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    # Unpack directly in the loop header so the global `data` frame is not
    # overwritten by a batch.
    for images, labels in tqdm.tqdm(testloader):
        images = images.to(torch.float32)
        # Labels arrive as (batch, 1) while the predictions are (batch,).
        # Comparing them unflattened would broadcast to a batch x batch
        # matrix and count every matching pair instead of every correct sample.
        labels = labels.squeeze(1)
        # calculate outputs by running images through the network
        outputs = net_p(images)
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Report the real number of evaluated images and the plain ratio of hits.
accuracy = 100 * correct / total if total else 0.0
print('Accuracy of the network on the %d test images: %d %%' % (total, accuracy))


# Active learning section 

#### Defining the sampling functions. init_sampling creates the first training pool; the following iterations build their data pool with active_sampling.

In [76]:
def init_sampling(data, n=1000):
    """Draw the initial training pool as a random sample of ``data``.

    Args:
        data: DataFrame to sample rows from.
        n: Number of rows to draw. Capped at ``len(data)`` so that a pool
            smaller than ``n`` yields all of its rows instead of raising.

    Returns:
        Tuple ``(train_data, data)``: the sampled rows and the full input,
        each with a fresh 0..n-1 index.
    """
    sample_size = min(n, len(data))
    train_data = data.sample(n = sample_size, random_state = randint(0, 200))
    return train_data.reset_index(drop = True), data.reset_index(drop = True)

def active_sampling(data, n=200):
    """Draw the training pool for the next iteration as a random sample.

    The rows are drawn uniformly at random from ``data``; no model output is
    taken into account when choosing them.

    Args:
        data: DataFrame to sample rows from.
        n: Number of rows to draw. Capped at ``len(data)`` so that a pool
            smaller than ``n`` yields all of its rows instead of raising.

    Returns:
        Tuple ``(train_data, test_data)``: the sampled rows and the full
        input, each with a fresh 0..n-1 index.
    """
    sample_size = min(n, len(data))
    train_data = data.sample(n = sample_size, random_state = randint(0, 200))
    return train_data.reset_index(drop = True), data.reset_index(drop = True)

## Initial learning process

In [None]:
## Learning with active learning algorithm 

##init network and learning algorithm 
net_a = Net()
criterion = nn.CrossEntropyLoss()
# The optimizer has to hold the parameters of net_a, the network trained in
# this section; with any other network's parameters net_a would never change.
optimizer = optim.SGD(net_a.parameters(), lr=0.01, momentum=0.9) 

##init vars and arrays 
batch_size = 10
clock_a = []      # wall-clock duration of every training round, in seconds
loss_func_a = []  # mean batch loss of every training round
raw_data = pd.read_csv('annotations.csv', delimiter=',')
# One combined mask built from raw_data (category ids 0-15 without id 9).
data = raw_data[(raw_data.category < 16) & (raw_data.category != 9)].reset_index(drop = True)

##starting whole training process, main timer start
tic_av = time.perf_counter()

##init dataset and dataloader 
transform = torchvision.transforms.Compose([Rescale(128), ToTensor()])
init_data, _ = init_sampling(data)
init_set = ChRoadSighnDataset(annotation = init_data, root_dir = 'images/', transform = transform)
# Iterate over the freshly sampled initial pool, not over a dataset left over
# from an earlier cell.
init_loader = torch.utils.data.DataLoader(init_set, batch_size=batch_size, shuffle=False, num_workers=0)

##initial pre-training
running_loss = 0.0
for images, labels in init_loader:
    optimizer.zero_grad()
    # Cast instead of re-wrapping with torch.tensor(), which copies the tensor
    # and emits a warning.
    images = images.to(torch.float32)

    outputs = net_a(images)
    # The loader yields labels as (batch, 1); the loss expects a flat vector
    # of int64 class ids.
    labels = labels.squeeze(1).long()
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    running_loss += loss.item() 
    

##save and print statistics 
toc_av = time.perf_counter() 
# Average over the batches actually processed instead of a fixed constant.
loss_func_a.append(running_loss / max(len(init_loader), 1))
clock_a.append(toc_av-tic_av)

print("network initialized")
print("loss: ", loss_func_a)
print("time: ", toc_av-tic_av)

In [None]:
# Active-learning rounds: each round draws a new training pool and trains
# net_a on it for one pass.
for epoch in tqdm.tqdm(range(20)):
    
    tic_av = time.perf_counter()
    
    # Build the dataset and loader from the pool sampled for THIS round, so
    # that every round really trains on newly drawn data.
    next_epoch_data, _ = active_sampling(data)
    active_set = ChRoadSighnDataset(annotation = next_epoch_data, root_dir = 'images/', transform = transform)
    active_loader = torch.utils.data.DataLoader(active_set, batch_size=batch_size, shuffle=False, num_workers=0)
    
    running_loss = 0.0
    for images, labels in active_loader:
        optimizer.zero_grad()
        # Cast instead of re-wrapping with torch.tensor(), which copies the
        # tensor and emits a warning.
        images = images.to(torch.float32)

        outputs = net_a(images)
        # The loader yields labels as (batch, 1); the loss expects a flat
        # vector of int64 class ids.
        labels = labels.squeeze(1).long()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    toc_av = time.perf_counter()   
    # Average over the batches actually processed instead of a fixed constant.
    loss_func_a.append(running_loss / max(len(active_loader), 1))
    clock_a.append(toc_av-tic_av)


# Both figures are derived from clock_a so that they describe the same run.
total_time_a = sum(clock_a)
print('Finished Training in ', total_time_a, " seconds, or", str(datetime.timedelta(seconds = total_time_a))) 

## Saving network

In [None]:
# Store only the trained weights (the state dict) of the actively trained network.
PATH = './net_active_trained.pth'
active_state = net_a.state_dict()
torch.save(active_state, PATH)

## Statistic, testing and other info

In [None]:
#print(loss_func_a)
#print(len(loss_func_a))

#print(clock_a)
#print(sum(clock_a)/len(clock_a))

# Training loss, one point per training round of the active run (the initial
# pre-training followed by every active-learning round).
fig, axes = plt.subplots() 
axes.plot(loss_func_a) 
axes.set_xlabel('Epoch')
axes.set_ylabel('Loss')
# loss_func_a holds one value per round, so the title says exactly that.
axes.set_title("Loss per epoch")


# Wall-clock duration, one point per training round of the active run.
fig, axes = plt.subplots() 
axes.plot(clock_a) 
axes.set_xlabel('Epoch')
axes.set_ylabel('Execution time')
axes.set_title("Learning time per epoch of learning, seconds")