In [1]:
import os
import cv2
import numpy as np

import skimage
import skimage.io
import skimage.transform

from sklearn.model_selection import StratifiedShuffleSplit
import sklearn.preprocessing
from sklearn.utils import class_weight

from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Activation, Dropout, Flatten, Dense
from keras import backend as K


Using TensorFlow backend.


In [2]:
IMG_WIDTH=32
IMG_HEIGHT=64
NUM_CLASSES=4
CLASSES=['red', 'yellow', 'green', 'unknown']

ENV='lot'   # sim or lot

In [3]:
def load_image(path):
    # load image
    img = skimage.io.imread(path)
    
    # convert to floating point
    img = img / 255.0
       
    return img


In [4]:
class DataSet:
    def __init__(self):
        self.labels = []
        self.labels_oh = None
        self.image_paths = []
        self.num_samples = 0
        self.images = None

        self.indices_train = None
        self.indices_val = None
        
        self.generator_train = None
        self.generator_valid = None
        self.numperclass = {'red':0, 'yellow':0, 'green':0, 'unknown':0}
    def load_data(self):
        # load data
        for label, name in enumerate(CLASSES):
            for root, dirs, files in os.walk(os.path.join('..', 'data_lights', ENV, name)):
                for filename in files:
                    self.labels.append(label)
                    self.image_paths.append(os.path.join(root,filename))
                    self.numperclass[name] = self.numperclass[name] + 1
                    
        self.images = np.array([load_image(path) for path in self.image_paths])

        # process data
        self.labels = np.array(self.labels)
        self.images = np.array(self.images)

        lb = sklearn.preprocessing.LabelBinarizer()
        self.labels_oh = lb.fit_transform(self.labels)
        
        self.num_samples = self.labels.shape[0]

        # split dataset
        ss = StratifiedShuffleSplit(n_splits=1, test_size=0.2)
        splitter = ss.split(np.zeros(self.num_samples), self.labels_oh)
        self.indices_train, self.indices_val = next(splitter)
        
    def create_generators(self, batch_size=16):
        # define data-generator for training set (zoom and flip the images)
        train_datagen = ImageDataGenerator(
            shear_range=0.0,
            zoom_range=0.2,
            rotation_range=0,
            horizontal_flip=True)

        self.generator_train = train_datagen.flow(
            self.images[self.indices_train],
            self.labels_oh[self.indices_train],
            batch_size=batch_size)

        # define data-generator for testing/validation (no modification)
        test_datagen = ImageDataGenerator()
        self.generator_valid = test_datagen.flow(
            self.images[self.indices_val],
            self.labels_oh[self.indices_val],
            batch_size=batch_size)
     
dataset = DataSet()

In [5]:
def create_model():
    if K.image_data_format() == 'channels_first':
        input_shape = (3, IMG_HEIGHT, IMG_WIDTH)
    else:
        input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)
        
    model = Sequential()
    model.add(Conv2D(16, (1, 1), input_shape=input_shape))
    model.add(Activation('relu'))
    
    model.add(Conv2D(32, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(32, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(64, (3, 3)))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(1024, activation='relu'))
    model.add(Dropout(0.5))

    model.add(Dense(1024, activation='relu'))
    model.add(Dropout(0.5))

    model.add(Dense(64, activation='relu'))
    model.add(Dense(NUM_CLASSES, activation='softmax'))           
    
    model.compile(loss='categorical_crossentropy',
                           optimizer='rmsprop',
                           metrics=['accuracy'])
    
    return model


In [6]:
def checkpoint_file():
    return "classifier_{}.h5".format(ENV)

In [7]:
model = create_model()

In [8]:
dataset.load_data()

In [9]:
dataset.create_generators()

In [10]:
print(dataset.numperclass)

class_weights = class_weight.compute_class_weight('balanced', np.unique(dataset.labels), dataset.labels)

print(class_weights)

{'unknown': 143, 'green': 617, 'yellow': 229, 'red': 244}
[ 1.26331967  1.34606987  0.49959481  2.15559441]


In [11]:
def classifier_train(p_epochs, p_batch_size=16):     
    # train the model
    
    model.fit_generator(
        dataset.generator_train,
        steps_per_epoch=(len(dataset.indices_train) // p_batch_size)*2,
        epochs=p_epochs,
        verbose=1,
        validation_data=dataset.generator_valid,
        validation_steps=len(dataset.indices_val) // p_batch_size, class_weight=class_weights)

    model.save(checkpoint_file())

In [12]:
classifier_train(20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
