In [None]:
import cv2
import numpy as np
from sklearn.metrics import classification_report

import os
import csv

In [None]:
def calcHogTrafficSigns(rootpath):
    """Compute HOG descriptors for the traffic-sign (positive) training images.

    Walks the 43 class sub-directories ``00000`` .. ``00042`` under
    ``rootpath``. Each one holds a ``GT-<class>.csv`` annotation file
    (``;``-separated, one header row) whose first column is an image file
    name. Every listed image is resized to 64x64 and described with HOG.

    Arguments:
        rootpath: directory that contains the per-class sub-directories.

    Returns:
        list of descriptors (each a flat list of floats), in annotation order.

    Raises:
        IOError: if an image listed in an annotation file cannot be read.
    """
    hog = cv2.HOGDescriptor((64, 64), (16, 16), (8, 8), (8, 8), 9)
    features = []

    for c in range(0, 43):
        class_id = format(c, '05d')
        prefix = rootpath + '/' + class_id + '/'

        # The context manager closes the annotation file even when reading or
        # resizing an image raises part-way through a class.
        with open(prefix + 'GT-' + class_id + '.csv') as gtFile:
            gtReader = csv.reader(gtFile, delimiter=';')
            next(gtReader, None)  # skip the header row

            for row in gtReader:
                if not row:
                    continue  # tolerate blank lines in the annotation file

                image_path = prefix + row[0]
                image = cv2.imread(image_path)
                if image is None:
                    # cv2.imread reports failure by returning None, not by raising.
                    raise IOError("Could not read image: " + image_path)

                fd = hog.compute(cv2.resize(image, (64, 64)))
                features.append(np.ravel(fd).tolist())

    return features

In [None]:
def calculateHog(folder):
    """Compute a HOG descriptor for every readable image directly inside ``folder``.

    Images that are not already 64x64 are cropped to their top-left square
    (side = the shorter dimension) and then resized to 64x64. Entries that
    OpenCV cannot decode (sub-directories, thumbnail caches, ...) are reported
    and skipped.

    Arguments:
        folder: directory whose entries are treated as image files.

    Returns:
        list of descriptors (each a flat list of floats), ordered by file name.
    """
    hog = cv2.HOGDescriptor((64, 64), (16, 16), (8, 8), (8, 8), 9)
    features = []

    # os.listdir returns entries in arbitrary order; sorting makes the output
    # reproducible from one run to the next.
    for filename in sorted(os.listdir(folder)):
        path = os.path.join(folder, filename)
        img = cv2.imread(path)
        if img is None:
            # cv2.imread returns None for anything it cannot decode.
            print("Skipping unreadable file: " + path)
            continue

        height, width = img.shape[:2]
        if height != 64 or width != 64:
            side = min(height, width)
            img = cv2.resize(img[0:side, 0:side], (64, 64))

        fd = hog.compute(img)
        features.append(np.ravel(fd).tolist())

    return features

In [None]:
def writeFeaturesToCSV(filename, features):
    """Write ``features`` to ``filename`` as CSV, one feature vector per row.

    The file is opened in write mode, so existing content is replaced.

    Arguments:
        filename: path of the CSV file to create
        features: iterable of rows, each an iterable of values
    """
    with open(filename, mode='w', newline='') as csv_out:
        csv.writer(csv_out).writerows(features)

In [None]:
def appendFeaturesToCSV(filename, features):
    """Append ``features`` to ``filename`` as CSV rows, one feature vector per row.

    The file is opened in append mode: it is created if missing and existing
    rows are kept.

    Arguments:
        filename: path of the CSV file to extend
        features: iterable of rows, each an iterable of values
    """
    with open(filename, mode='a', newline='') as csv_out:
        csv.writer(csv_out).writerows(features)

In [None]:
def readCSV(filename, max_rows=None, delimiter=',', skip_header=0):
    """Load a numeric CSV file into a NumPy array with ``np.genfromtxt``.

    Arguments:
        filename: file to read
        max_rows: maximum number of rows to read (``None`` reads all of them)
        delimiter: column separator
        skip_header: number of leading lines to skip

    Returns:
        whatever ``np.genfromtxt`` produces for the given options
    """
    parse_options = dict(max_rows=max_rows, delimiter=delimiter, skip_header=skip_header)
    return np.genfromtxt(filename, **parse_options)

In [None]:
SIGN_FEATURES_FILE_NAME = "sign_features.csv"
NON_SIGN_FEATURES_FILE_NAME = "negative_features.csv"
MY_NEGATIVES_FILE_NAME = "my_negatives.csv"

def getFeaturesAndWrite():
    """Compute the HOG features of all three image sets and write each to its CSV file.

    The positive (traffic sign) features are processed first, then the hard
    mined negatives and finally the regular negatives. Every output file is
    overwritten.
    """
    # Positive samples: the GTSRB training images.
    writeFeaturesToCSV(
        SIGN_FEATURES_FILE_NAME,
        calcHogTrafficSigns("dataset/GTSRB_Final_Training_Images/GTSRB/Final_Training/Images"),
    )

    # Negative samples: hard mined negatives first, then the regular negatives.
    negative_sources = (
        ("dataset/autogen", MY_NEGATIVES_FILE_NAME),
        ("dataset/Non_Sign", NON_SIGN_FEATURES_FILE_NAME),
    )
    for image_folder, output_name in negative_sources:
        writeFeaturesToCSV(output_name, calculateHog(image_folder))

In [None]:
# Recompute the HOG features from the image folders and (over)write the three feature CSV files.
getFeaturesAndWrite()

In [None]:
def loadFeatures():
    """Load the positive and negative HOG feature matrices from their CSV files.

    Negatives are the concatenation of the regular negatives, the hard mined
    negatives and, when the file exists, the negatives collected from video
    (``video_negatives.csv``). The video file is optional because it is only
    produced by an optional mining step, so a first training round has none.

    Returns:
        tuple ``(sign_features, non_sign_features)`` of NumPy arrays.
    """
    sign_features = readCSV(SIGN_FEATURES_FILE_NAME)

    negative_files = [NON_SIGN_FEATURES_FILE_NAME, MY_NEGATIVES_FILE_NAME]
    video_negatives_file = "video_negatives.csv"
    if os.path.exists(video_negatives_file):
        negative_files.append(video_negatives_file)
    else:
        print("No " + video_negatives_file + " found; continuing without video negatives.")

    negative_blocks = []
    for name in negative_files:
        # A single-row file comes back 1-D; promote it so the row-wise
        # concatenation below still works.
        block = np.atleast_2d(readCSV(name))
        if block.size == 0:
            continue  # an empty file contributes no samples
        negative_blocks.append(block)

    non_sign_features = np.concatenate(negative_blocks, axis=0)

    return (sign_features, non_sign_features)

In [None]:
# Notebook-level feature matrices; trainSvm reads these two globals directly.
(sign_features, non_sign_features) = loadFeatures()

In [None]:
def prepareTestAndTrain(sign_features, non_sign_features, train_percentage=80):
    """Build shuffled train/test splits from positive and negative feature rows.

    Sign rows are labelled 1 and non-sign rows 0. Rows and labels are shuffled
    together (using NumPy's global random state) and then split.

    Arguments:
        sign_features: 2-D array-like, one positive sample per row
        non_sign_features: 2-D array-like, one negative sample per row
        train_percentage: share of the samples (0-100) placed in the training
            split; the rest becomes the test split

    Returns:
        tuple ``(f_train, f_test, l_train, l_test)``: float32 feature matrices
        and 1-D int32 label vectors.

    Raises:
        ValueError: if ``train_percentage`` is outside 0-100.
    """
    if not 0 <= train_percentage <= 100:
        raise ValueError("train_percentage must be between 0 and 100")

    features = np.concatenate((sign_features, non_sign_features), axis=0).astype('float32')
    labels = np.concatenate((np.ones(len(sign_features), dtype='int32'),
                             np.zeros(len(non_sign_features), dtype='int32')))

    # Shuffle through one shared index permutation: rows and labels stay
    # aligned without stacking the labels into the float feature matrix.
    order = np.random.permutation(len(features))
    features = features[order]
    labels = labels[order]

    partition = int(len(features) * train_percentage / 100)

    f_train, f_test = features[:partition], features[partition:]
    l_train, l_test = labels[:partition], labels[partition:]

    return (f_train, f_test, l_train, l_test)

In [None]:
def configLinearSvm(max_iter, precision):
    """Create an untrained C-SVC support vector machine with a linear kernel.

    Arguments:
        max_iter: maximum number of training iterations
        precision: target precision (epsilon) at which training may stop

    Returns:
        the configured ``cv2.ml`` SVM object.
    """
    svm = cv2.ml.SVM_create()
    svm.setType(cv2.ml.SVM_C_SVC)
    svm.setKernel(cv2.ml.SVM_LINEAR)
    # Both flags are required: the criteria type selects which of the two
    # limits apply, so with MAX_ITER alone the precision value is not used.
    criteria_type = cv2.TERM_CRITERIA_MAX_ITER + cv2.TERM_CRITERIA_EPS
    svm.setTermCriteria((criteria_type, max_iter, precision))

    return svm

In [None]:
def saveSvm(svm, filename):
    """Save ``svm`` by calling its ``save`` method with ``filename``.

    Arguments:
        svm: object exposing ``save(filename)``
        filename: destination path, e.g. ``svmmodel.xml``
    """
    svm.save(filename)

In [None]:
def trainSvm(svm):
    """Train ``svm`` on a fresh train/test split and print a classification report.

    The split is built from the notebook-level ``sign_features`` and
    ``non_sign_features`` arrays, so both must be loaded before this is called.

    Arguments:
        svm: the SVM to train; it is modified in place
    """
    train_x, test_x, train_y, test_y = prepareTestAndTrain(sign_features, non_sign_features)

    # Each row of train_x is one sample.
    svm.train(train_x, cv2.ml.ROW_SAMPLE, train_y)

    # predict returns a pair; only the per-sample predictions are reported.
    _, predicted = svm.predict(test_x)
    print(classification_report(test_y, predicted))

In [None]:
# Configure a linear SVM (max_iter=100000, precision=1e-6), train it on the
# loaded features and write the trained model to model.xml.
svm = configLinearSvm(100000, 1e-6)
trainSvm(svm)
saveSvm(svm, "model.xml")

In [None]:
# Deallocate arrays: rebinding the names to None drops these references to the feature matrices.
# trainSvm reads these globals, so re-run the loadFeatures cell before training again.
sign_features = None
non_sign_features = None

In [None]:
def non_max_suppression(boxes, overlapThresh):
    """Greedy non-maximum suppression over axis-aligned boxes.

    Each row of ``boxes`` is ``(x1, y1, x2, y2)``. Candidates are visited from
    the largest bottom edge (``y2``) downwards. Every visited box is kept, and
    each remaining box whose intersection with it covers more than
    ``overlapThresh`` of that remaining box's own area is dropped.

    Arguments:
        boxes: array with one box per row
        overlapThresh: overlap ratio above which a box is suppressed

    Returns:
        the kept boxes as an integer array (in the order they were picked), or
        an empty list when ``boxes`` is empty.
    """
    if len(boxes) == 0:
        return []

    # Signed-integer boxes are converted to float before any arithmetic.
    if boxes.dtype.kind == "i":
        boxes = boxes.astype("float")

    left, top, right, bottom = (boxes[:, column] for column in range(4))
    box_area = (right - left + 1) * (bottom - top + 1)

    kept = []
    remaining = np.argsort(bottom)

    while remaining.size:
        # Take the candidate with the largest bottom edge; compare it to the rest.
        current, others = remaining[-1], remaining[:-1]
        kept.append(current)

        # Intersection rectangle between the current box and every other one.
        inter_left = np.maximum(left[current], left[others])
        inter_top = np.maximum(top[current], top[others])
        inter_right = np.minimum(right[current], right[others])
        inter_bottom = np.minimum(bottom[current], bottom[others])

        inter_w = np.maximum(0, inter_right - inter_left + 1)
        inter_h = np.maximum(0, inter_bottom - inter_top + 1)

        ratio = (inter_w * inter_h) / box_area[others]

        # Keep only the candidates that are not overlapped beyond the threshold.
        remaining = others[~(ratio > overlapThresh)]

    return boxes[kept].astype("int")

In [None]:
# Load the SVM from model.xml (the file name the training cell saves to), so detection can run without retraining.
svm = cv2.ml.SVM_load("model.xml")

In [None]:
def findTrafficSigns(image, svm, hog, round_num, it, features, save = False, win_stride = (16, 16)):
    """Slide the HOG window over ``image`` and mark every window the SVM labels 1.

    The descriptor length and the number of windows per row are derived from
    ``hog`` and the image width, so frames of any size map to correct window
    coordinates.

    Arguments:
        image: image to scan; rectangles are drawn onto it in place
        svm: classifier exposing ``predict(samples)`` returning ``(retval, predictions)``
        hog: HOG descriptor exposing ``compute``, ``getDescriptorSize`` and ``winSize``
        round_num: mining round number, used only in saved file names
        it: running counter of detections, used in saved file names
        features: list that receives the descriptor of every detected window
        save: when True, each detected window is written to ``dataset/autogen``
        win_stride: ``(x, y)`` step of the sliding window in pixels

    Returns:
        tuple ``(image, it)``: the annotated image and the updated counter.
    """
    fd = hog.compute(image, winStride=win_stride)

    descriptor_size = hog.getDescriptorSize()
    win_width, win_height = hog.winSize
    # Windows are laid out row by row; this is how many fit across the image.
    windows_per_row = (image.shape[1] - win_width) // win_stride[0] + 1

    # One row per window, whether compute() returned shape (N,) or (N, 1);
    # the SVM expects 2-D row samples.
    descriptors = np.asarray(fd, dtype=np.float32).reshape(-1, descriptor_size)
    if len(descriptors) == 0:
        return image, it

    # Classify all windows in a single call instead of one call per window.
    retval, predictions = svm.predict(descriptors)
    hit_indices = np.flatnonzero(np.ravel(predictions) == 1)

    matches = []
    for index in hit_indices:
        index = int(index)
        x = (index % windows_per_row) * win_stride[0]
        y = (index // windows_per_row) * win_stride[1]
        matches.append((x, y))

        if save:
            window = image[y:y + win_height, x:x + win_width]
            cv2.imwrite("dataset/autogen/v_iter" + str(round_num) + "_" + str(it) + ".jpg", window)

        features.append(descriptors[index].tolist())

        it = it + 1

    # Draw only after all windows were cut out, so saved crops carry no rectangles.
    for (x, y) in matches:
        cv2.rectangle(image, (x, y), (x + win_width, y + win_height), (255, 0, 0), 2)

    return image, it

In [None]:
# Run the detector on up to 100 consecutive frames of the video, starting at
# frame 8500, and show the annotated frames. Press 'q' to stop early.
cap = cv2.VideoCapture('frankfurt.mp4')

if not cap.isOpened():
    print("Error opening video file.")

round_num = 3
it = 0

hog = cv2.HOGDescriptor((64, 64), (16, 16), (8, 8), (8, 8), 9)

try:
    # Seek to the start frame (CAP_PROP_POS_FRAMES is property id 1).
    cap.set(cv2.CAP_PROP_POS_FRAMES, 8500)
    ret, frame = cap.read()

    j = 0
    while ret and j < 100:

        features = []

        resized = cv2.resize(frame, (2048, 1024))
        (image, it) = findTrafficSigns(resized, svm, hog, round_num, it, features, save = False)
        # Enable the next line to collect the detected windows as negatives for retraining.
        #appendFeaturesToCSV("video_negatives.csv", features)

        cv2.imshow('Video', image)

        # Mask to the low byte: some platforms set higher bits in the key code.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        j = j + 1

        ret, frame = cap.read()
finally:
    # Release the capture and close the window even if a frame fails to process.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
def readTrainCNN(rootpath):
    '''Reads traffic sign data for German Traffic Sign Recognition Benchmark.

    Every image is resized to 64x64.

    Arguments: path to the traffic sign data, for example './GTSRB/Training'
    Returns:   list of images, list of corresponding labels
    Raises:    IOError if an image listed in an annotation file cannot be read'''
    images = [] # images
    labels = [] # corresponding labels
    # loop over all 43 classes (00000 .. 00042)
    for c in range(0, 43):
        class_id = format(c, '05d')
        prefix = rootpath + '/' + class_id + '/' # subdirectory for class
        # the context manager closes the annotations file even if an image fails to load
        with open(prefix + 'GT-' + class_id + '.csv') as gtFile:
            gtReader = csv.reader(gtFile, delimiter=';') # csv parser for annotations file
            next(gtReader, None) # skip header
            # loop over all images in current annotations file
            for row in gtReader:
                if not row:
                    continue # tolerate blank lines
                image_path = prefix + row[0] # the 1st column is the filename
                image = cv2.imread(image_path)
                if image is None:
                    # cv2.imread reports failure by returning None, not by raising
                    raise IOError("Could not read image: " + image_path)
                images.append(cv2.resize(image, (64, 64)))
                labels.append(int(row[7])) # the 8th column is the label
    return images, labels


In [None]:
def readTestCNN(rootpath):
    '''Reads the test images listed in ``GT-final_test.csv`` under ``rootpath``.

    Every image is resized to 64x64.

    Arguments: directory holding the test images and the annotations file
    Returns:   list of images, list of corresponding labels
    Raises:    IOError if an image listed in the annotations file cannot be read'''
    images = []
    labels = []

    # the context manager closes the annotations file even if an image fails to load
    with open(rootpath + "/GT-final_test.csv") as gtFile:
        gtReader = csv.reader(gtFile, delimiter=';') # csv parser for annotations file
        next(gtReader, None) # skip header

        for row in gtReader:
            if not row:
                continue # tolerate blank lines
            image_path = rootpath + '/' + row[0] # the 1st column is the filename
            image = cv2.imread(image_path)
            if image is None:
                # cv2.imread reports failure by returning None, not by raising
                raise IOError("Could not read image: " + image_path)
            images.append(cv2.resize(image, (64, 64)))
            labels.append(int(row[7])) # the 8th column is the label

    return images, labels

In [None]:
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPool2D
from keras.optimizers import SGD
from keras.utils import np_utils
from keras.layers.core import Activation
from keras.layers.core import Flatten
from keras.layers.core import Dense
from keras.layers import Dropout

In [None]:
class LeNet5:
    """Convolutional classifier for 64x64 3-channel images with 43 output classes.

    Layer stack built in ``__init__``: two 5x5 'same'-padded ReLU convolutions
    (24 and 64 filters), each followed by dropout (0.65) and 2x2 max-pooling;
    then flatten, two ReLU dense layers (480 and 84 units), each followed by
    dropout (0.67); and a final 43-way softmax layer.
    """

    def __init__(self):
        self.model = Sequential([
            Conv2D(24, 5, input_shape=(64, 64, 3), padding='same', activation='relu'),
            Dropout(0.65),
            MaxPool2D(pool_size=(2, 2)),
            Conv2D(64, 5, padding='same', activation='relu'),
            Dropout(0.65),
            MaxPool2D(pool_size=(2, 2)),
            Flatten(),
            Dense(480, activation='relu'),
            Dropout(0.67),
            Dense(84, activation='relu'),
            Dropout(0.67),
            Dense(43, activation='softmax'),
        ])

    def compile(self):
        """Compile the model with SGD, categorical cross-entropy and the accuracy metric."""
        # `learning_rate` is the supported name of this argument; `lr` is a deprecated alias.
        self.model.compile(optimizer=SGD(learning_rate=0.0005),
                           loss="categorical_crossentropy",
                           metrics=["accuracy"])

    def train(self, trainData, trainLabels, epochs_num):
        """Fit the model with batch size 32 for ``epochs_num`` epochs.

        Returns:
            the value returned by ``model.fit`` (the training history).
        """
        return self.model.fit(trainData, trainLabels, batch_size=32, epochs=epochs_num, verbose=1)

    def evaluate(self, testData, testLabels):
        """Evaluate the model, print the accuracy as a percentage and return the metrics.

        Returns:
            tuple ``(loss, accuracy)`` as reported by ``model.evaluate``.
        """
        (loss, accuracy) = self.model.evaluate(testData, testLabels, batch_size=32, verbose=1)
        print("accuracy: {:.2f}%".format(accuracy * 100))
        return (loss, accuracy)

In [None]:
# Load the GTSRB training and test images (resized to 64x64 by the readers).
trainData, trainLabels = readTrainCNN("dataset/GTSRB_Final_Training_Images/GTSRB/Final_Training/Images")
testData, testLabels = readTestCNN("dataset/GTSRB_Final_Test_Images/GTSRB/Final_Test/Images")

# Split the official test set: the first 60% stays the test set, the last 40%
# becomes the validation set. Both slices are taken from the original lists
# because the right-hand side is evaluated before either name is rebound.
valData, testData = testData[int(len(testData) * 0.6):], testData[:int(len(testData) * 0.6)]
valLabels, testLabels = testLabels[int(len(testLabels) * 0.6):], testLabels[:int(len(testLabels) * 0.6)]

# Stack each image list into one float32 array and scale the pixel values by 1/255.
trainData = np.array(trainData).astype('float32') / 255.0
testData = np.array(testData).astype('float32') / 255.0
valData = np.array(valData).astype('float32') / 255.0

# One-hot encode the labels over the 43 classes.
trainLabels, testLabels, valLabels = (
    np_utils.to_categorical(labelSet, 43)
    for labelSet in (trainLabels, testLabels, valLabels)
)

In [None]:
# Shuffle the training images and their labels with one shared permutation,
# so every image keeps its own label.
trainPerm = np.random.permutation(len(trainData))
trainData, trainLabels = trainData[trainPerm], trainLabels[trainPerm]

In [None]:
# Build the network and compile it (optimizer, loss and metric are set inside LeNet5.compile).
cnn = LeNet5()
cnn.compile()

In [None]:
# Train for 500 epochs; the return value of model.fit is kept so the accuracy curve can be plotted later.
history = cnn.train(trainData, trainLabels, 500)

In [None]:
# Evaluate on the held-out test split; LeNet5.evaluate prints the accuracy.
cnn.evaluate(testData, testLabels)

In [None]:
# NOTE(review): "..." looks like a placeholder path; the later load cell reads "cnn12" - confirm the intended file name.
cnn.model.save("...")

In [None]:
import matplotlib.pyplot as plt

# Training accuracy recorded by model.fit, one value per epoch.
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'])
ax.set(title="Training accuracy", xlabel="Epoch", ylabel="Accuracy")
plt.show()

In [None]:
from keras.models import load_model
# NOTE(review): loads "cnn12", which is not the path the save cell writes to ("...") - confirm which saved model is intended.
loaded = load_model("cnn12")

In [None]:
# Classify a single image with the loaded CNN.
# NOTE(review): machine-specific absolute path; point this at an image available in your environment.
l = cv2.imread("C:/Users/dlalic/Desktop/testright.png")
if l is None:
    # cv2.imread reports failure by returning None, not by raising.
    raise IOError("Could not read the test image.")
l = cv2.resize(l, (64, 64))

cv2.imshow("Sign", l)
cv2.waitKey(0)
cv2.destroyAllWindows()

temp_list = [l]
# Apply the same preprocessing as the training data: float32 scaled by 1/255.
# Feeding raw 0-255 pixel values to a network trained on scaled inputs skews the prediction.
npl = np.array(temp_list).astype('float32') / 255.0
print(npl.shape)

pred = loaded.predict(npl)

print(pred)
print(np.argmax(pred))