In [85]:
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import numpy as np
import pandas as pd
import os
import cv2
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [86]:
# # Download training data from open datasets.
# training_data = datasets.GTSRB(
#     root="data",
#     split="train",
#     download=True,
#     transform=ToTensor(),
# )

# # Download test data from open datasets.
# test_data = datasets.GTSRB(
#     root="data",
#     split="test",
#     download=True,
#     transform=ToTensor(),
# )
# type(test_data)

In [87]:
def zero_extend(n):
    if type(n) is not int:
        print("error in zero_extend")
        return
    elif n < 10:
        return "0000" + str(n)
    else:
        return "000" + str(n)

In [88]:
train_dir = '../input/german-traffic-sign-recognition/GTSRB_training/Training'

channels = 3
IMG_HEIGHT = 30
IMG_WIDTH = 30
NUM_CLASSES = len(os.listdir(train_dir))

image_data = []
image_labels = []

# Resize images to 30x30
for i in range(NUM_CLASSES):
    path = train_dir + '/' + zero_extend(i)
    images = os.listdir(path)
    print(path)

    for img in images:
        if img[0:2] == "GT":
            continue
        image = cv2.imread(path + '/' + img)
        image_fromarray = Image.fromarray(image, 'RGB')
        resize_image = image_fromarray.resize((IMG_HEIGHT, IMG_WIDTH))
        reshape_image_arr = np.reshape(np.array(resize_image), (3, 30, 30))
        image_data.append(reshape_image_arr)
        image_labels.append(i)

# Changing the list to numpy array
image_data = np.array(image_data)
image_labels = np.array(image_labels)

print(image_data.shape, image_labels.shape)

In [89]:
# shuffle dataset
shuffle_indexes = np.arange(image_data.shape[0])
np.random.shuffle(shuffle_indexes)
image_data = image_data[shuffle_indexes]
image_labels = image_labels[shuffle_indexes]

In [90]:
# split train data into training (80%) and validation (20%)
X_train, X_val, y_train, y_val = train_test_split(
    image_data, image_labels, test_size=0.2, random_state=42, shuffle=True)


# X_train = X_train/255 
# X_val = X_val/255

print("X_train.shape", X_train.shape)
print("X_valid.shape", X_val.shape)
print("y_train.shape", y_train.shape)
print("y_valid.shape", y_val.shape)

In [91]:
# construct dataloader for pytorch usage
batch_size = 64

tensor_x_train = torch.Tensor(X_train) # transform to torch tensor
tensor_y_train = torch.Tensor(y_train).long()
tensor_x_val = torch.Tensor(X_val)
tensor_y_val = torch.Tensor(y_val).long()

training_data = TensorDataset(tensor_x_train,tensor_y_train)
val_data = TensorDataset(tensor_x_val,tensor_y_val)

# Create data loaders.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
val_dataloader = DataLoader(val_data, batch_size=batch_size) # do we need a loader for validation?

for X, y in train_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [92]:
# Define model
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(2700, 1300),
            nn.ReLU(),
            nn.Linear(1300, 600),
            nn.ReLU(),
            nn.Linear(600, 300),
            nn.ReLU(),
            nn.Linear(300, 150),
            nn.ReLU(),
            nn.Linear(150, 43)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

    
class ConvNN(nn.Module):
    def __init__(self):
        super(ConvNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 16, 3),
            nn.ReLU(),
            nn.Conv2d(16, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.BatchNorm2d(32),
            nn.Dropout(p=0.5)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.BatchNorm2d(128),
            nn.Dropout(p=0.5)
        )
        self.flatten = nn.Flatten()
        self.lin1 = nn.Linear(2048, 512)
        self.lin2 = nn.Linear(512, 43)
#         nn.init.xavier_uniform_(self.lin1.weight)
#         nn.init.xavier_uniform_(self.lin2.weight)
        self.layer3 = nn.Sequential(
            self.lin1,
            nn.ReLU(),
            nn.Dropout(p=0.5),
            self.lin2
        )
        
    def forward(self, x):
        out = self.layer1(x)
#         print(out.size())
        out = self.layer2(out)
#         print(out.size())
        out = self.flatten(out)
        out = self.layer3(out)
        return out
        

model = ConvNN().to(device)
print(model)

In [93]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), 
                            lr=1e-3,
                            weight_decay=0.001
                           )

In [94]:
#from torch import autograd
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [95]:
def test_val(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [96]:
epochs = 40
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test_val(val_dataloader, model, loss_fn)
print("Done!")

In [97]:
# get testing data
test_dir = '../input/german-traffic-sign-recognition/GTSRB_testing/Images'
test_csv = pd.read_csv('../input/german-traffic-sign-recognition/GT-final_test.csv', sep=';')

labels = test_csv["ClassId"].values
imgs = test_csv["Filename"].values

test_data = []
for img in imgs:
    image = cv2.imread(test_dir + '/' + img)
    image_fromarray = Image.fromarray(image, 'RGB')
    resize_image = image_fromarray.resize((IMG_HEIGHT, IMG_WIDTH))
    reshape_image_arr = np.reshape(np.array(resize_image), (3, 30, 30))
    test_data.append(reshape_image_arr)

len(test_data)

In [98]:
# final testing
X_test = np.array(test_data)
tensor_X_test = torch.Tensor(X_test)
tensor_X_test = tensor_X_test.cuda()
with torch.no_grad():
    pred = model(tensor_X_test)

pred_classes = pred.argmax(1)
pred_classes = pred_classes.to('cpu')
print('Test Data accuracy: ', accuracy_score(labels, pred_classes)*100)