In [None]:
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
from keras.models import model_from_json
import numpy as np
import cv2

In [None]:
from sklearn.model_selection import train_test_split
import os
import time

In [None]:
# Constants

width, height = 72, 98 # px
dir = 'imgs_fixed/'

num_classes = 4

batch_size = 128
epochs = 5

In [None]:
# Count the number of imgs to get the percentage
totalImgs = 0
for _, dirnames, filenames in os.walk(dir):
    totalImgs += len(filenames)
print('Dataset contains ', totalImgs, ' images')

X = np.zeros((1, height, width, 3)) # 3D

Y = np.zeros(num_classes)
#Y = np.hstack((Y, np.array([0, 0, 0, 0])))

# Check the shape

In [None]:
print('X shape: ', X.shape)
print('Y shape: ', Y.shape)

# Apply modifications to images

In [None]:
modifyImage = False

In [None]:
if modifyImage:
    import matplotlib.pyplot as plt
    %matplotlib inline

In [None]:
if modifyImage:
    img = cv2.imread(os.path.join(dir, 'yellow', '393.jpg'))
    img = cv2.medianBlur(img, 11)
    plt.imshow(img)
    plt.show()

# Load the images

In [None]:
cont = 0
for label in os.listdir(dir):
    y = []
    if label == 'red':
        y = [1, 0, 0, 0]
    elif label == 'yellow':
        y = [0, 1, 0, 0]
    elif label == 'green':
        y = [0, 0, 1, 0]
    else:
        y = [0, 0, 0, 1]
    
    imgsDir = os.path.join(dir, label)
    for img in os.listdir(imgsDir):
        imgPath = os.path.join(imgsDir, img)
        loadedImg = cv2.imread(imgPath, 1)
        loadedImg = cv2.medianBlur(loadedImg, 11) # apply smoothering filter
        

        XtoAdd = np.reshape(loadedImg, (1, height, width, 3))
        X = np.vstack((X, XtoAdd))
        
        Y = np.vstack((Y, np.array(y))) # append the label
        
        cont += 1
        if cont % 100 == 0:
            print('Processed imgs: ', int(cont/totalImgs*1000)/10, '%')

# Check final shape

In [None]:
print('X shape: ', X.shape)
print('Y shape: ', Y.shape)

In [None]:
# Remove first row (needed for vstack)
Y = Y[1:, :]
X = X[1:, :]

# Save the dataset because it is slow to load

In [None]:
saveTheModel = False

if saveTheModel:
    np.save('imgs.npz', X)
    np.save('labels.npz', Y)

# Normalize the data

In [None]:
X = X.astype('float32')
X /= 255

# Train test split

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.33)

In [None]:
x_train, x_test = X_train, X_test

In [None]:
print(X_train.shape[0], ' tranining samples')
print(X_test.shape[0], ' test samples')

# Reshape the images

If the image format is channels_first, we will reshape the image in the way Keras want ([samples, channels, height, width]), else in the form [samples, height, width, channels]

In [None]:
if K.image_data_format() == 'channels_first':
    X_train = X_train.reshape(X_train.shape[0], 3, height, width)
    X_test = X_test.reshape(X_test.shape[0], 3, height, width)
    input_shape = (3, height, width)
else:
    X_train = X_train.reshape(X_train.shape[0], height, width, 3)
    X_test = x_test.reshape(X_test.shape[0], height, width, 3)
    input_shape = (height, width, 3)

# Define the model

In [None]:
model = Sequential()

model.add(Conv2D(32, kernel_size=(32, 32),
                 activation='relu',
                 input_shape=input_shape))

model.add(Conv2D(64, (16, 16), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Conv2D(64, (4, 4), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Conv2D(256, (2, 2), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.25))
model.add(Dense(50, activation='relu'))

model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

In [None]:
# Compile the model
model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adadelta(),
              metrics=['accuracy'])

In [None]:
# Train the model
model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=epochs,
          verbose=1,
          validation_data=(x_test, y_test))

In [None]:
model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=20,
          verbose=1,
          validation_data=(x_test, y_test))

# Evaluate the model

In [None]:
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

# Save the model for later use

In [None]:
model_json = model.to_json()
with open("model.json", "w") as json_file:
    json_file.write(model_json)
# serialize weights to HDF5
model.save_weights("weights.h5")
print("Model saved to disk")