In [None]:
from keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
import csv
import numpy as np
import os
import cv2
from PIL import Image
import random

In [None]:
#transforms labels in form suitable for CNN
def label_trafo(labels, num_labels):
    """One-hot encode a sequence of class indices.

    Arguments: labels     -- iterable of class indices (anything int() accepts)
               num_labels -- length of every one-hot vector
    Returns:   list holding one float vector of length num_labels per label,
               with a 1 at the label's index and 0 elsewhere
    """
    def _one_hot(class_index):
        # fresh zero vector for every label so the results do not share memory
        vec = np.zeros(num_labels)
        vec[class_index] = 1
        return vec

    return [_one_hot(int(entry)) for entry in labels]

In [None]:
def readTrafficSigns(rootpath, IMG_SIZE, min_img_size=32, classes=np.arange(0,43), img_format='png', label_trafo_=True, label_num=43):
    '''Reads traffic sign data for German Traffic Sign Recognition Benchmark.

    Arguments:
        rootpath     -- path to the traffic sign data, for example './GTSRB/Training'
        IMG_SIZE     -- every image is resized to IMG_SIZE x IMG_SIZE
        min_img_size -- rows whose second CSV column is smaller than this value
                        are skipped (presumably the image width in the GTSRB
                        annotation layout -- TODO confirm; height is not checked)
        classes      -- iterable of class ids; each id selects the sub-directory
                        and the annotation file named after its 5-digit value
        img_format   -- 'ppm' reads the file named in the CSV, 'png' reads the
                        same name with its last four characters replaced by '.png'
        label_trafo_ -- if true, labels are one-hot encoded with label_trafo
        label_num    -- length of the one-hot vectors
    Returns:
        [features, labels]; features is an array of shape
        (num_images, IMG_SIZE, IMG_SIZE, 3), labels a list with one entry per image
    Raises:
        ValueError if img_format is neither 'ppm' nor 'png'
    '''
    if img_format not in ('ppm', 'png'):
        # an unknown format used to be ignored row by row, which silently
        # produced an empty data set
        raise ValueError("img_format must be 'ppm' or 'png', got %r" % (img_format,))

    features = []
    labels = []

    # loop over all requested classes
    for c in classes:
        class_id = format(c, '05d')
        prefix = rootpath + '/' + class_id + '/' # subdirectory for class
        # 'with' closes the annotations file even when reading an image fails
        with open(prefix + 'GT-' + class_id + '.csv') as gtFile:
            gtReader = csv.reader(gtFile, delimiter=';') # csv parser for annotations file
            next(gtReader, None) # ignore first (header) row in csv file
            # loop over all images in current annotations file
            for row in gtReader:
                if not row:
                    # blank line in the annotations file
                    continue
                # check if img has at least min_size
                if int(row[1]) < min_img_size:
                    continue
                if img_format == 'ppm':
                    img_path = prefix + row[0]
                else:
                    # same file name, but with a .png extension
                    img_path = prefix + row[0][0:-4] + '.png'
                features.append(cv2.resize(plt.imread(img_path), (IMG_SIZE, IMG_SIZE)))
                labels.append(int(row[7]))
    #transform labels
    if label_trafo_:
        labels = label_trafo(labels, label_num)
    features = np.reshape(features, (-1, IMG_SIZE, IMG_SIZE, 3))
    return [features, labels]

In [None]:
# generate directory: one sub-folder per class for the written images.
# exist_ok=True keeps this cell re-runnable when the folders already exist
# (os.mkdir raised FileExistsError on the second run).
os.makedirs('New_Augmented_Images/', exist_ok=True)
for c in range(0,43):
    os.makedirs('New_Augmented_Images/'+'Class'+str(c), exist_ok=True)

In [None]:
# define data preparation: the generator applies random rotations, shifts,
# shear and zoom; both shift ranges share the same value
shift = 0.1
datagen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=shift,
    height_shift_range=shift,
    shear_range=0.2,
    zoom_range=0.2,
)

In [None]:
#number of desired images per class
final_img_num = 2500
save_format = '.ppm'
read_format = 'ppm'

# Originals and augmented batches are collected in lists and joined once at
# the end; calling np.concatenate on the growing array for every batch would
# copy everything collected so far again and again.
feature_chunks = []
label_chunks = []

#augment images
for c in range(0,43):

    X, y = readTrafficSigns(rootpath='GTSRB/Final_Training/Images', IMG_SIZE=32, min_img_size=32, classes=[c], img_format=read_format)
    print(X.shape)

    if X.shape[0] == 0:
        # without any source image the generator below could never bring the
        # class up to final_img_num
        print('Class', c, 'has no usable images, skipping')
        continue

    X = X.astype('float32')
    y = np.array(y)
    class_dir = 'New_Augmented_Images/'+'Class'+str(c)

    # add originals to the training data, same for labels
    feature_chunks.append(X)
    label_chunks.append(y)

    datagen.fit(X)
    # save original images; the channel swap converts between the channel
    # order of the loaded arrays and the order cv2.imwrite stores
    for k, img in enumerate(X):
        cv2.imwrite(class_dir+'/orig'+str(k)+save_format, cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    img_num = X.shape[0]
    #augmentation step
    k = 0  # running index of the augmented images written for this class
    for X_batch, y_batch in datagen.flow(X, y, batch_size=32):
        # check if enough images were generated
        if img_num > final_img_num:
            break
        # show a grid of up to 3x3 images from the first batch; a batch can
        # hold fewer than 9 images when the class is small
        if k == 0:
            for i in range(min(9, X_batch.shape[0])):
                plt.subplot(330 + 1 + i)
                plt.imshow(X_batch[i].reshape(X.shape[1], X.shape[2], 3).astype(np.uint8), cmap=plt.get_cmap('gray'))
            # show the plot
            plt.show()
        # save image
        for img in X_batch:
            cv2.imwrite(class_dir+'/aug'+str(k)+save_format, cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            k += 1
        img_num += X_batch.shape[0]
        # add to the training data
        feature_chunks.append(X_batch)
        label_chunks.append(y_batch)

features = np.concatenate(feature_chunks, axis=0)
labels = np.concatenate(label_chunks, axis=0)
np.save("aug_features_32x32", features)
np.save("aug_labels_32x32", labels)

# Augmentation for the fixed training set only

In [None]:
from keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
import csv
import numpy as np
import os
import cv2
from PIL import Image
import random

In [None]:
# Load the stored training split; the image array is multiplied by 255 right
# after loading (the augmented result is divided by 255 again before saving).
X_train = np.multiply(np.load("train_data.npy"), 255)
y_train = np.load('train_labels.npy')

In [None]:
# define data preparation: the generator applies random rotations, shifts,
# shear and zoom; both shift ranges share the same value
shift = 0.1
datagen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=shift,
    height_shift_range=shift,
    shear_range=0.2,
    zoom_range=0.2,
)

In [None]:
# Group the training samples by class: class_features[c] and class_labels[c]
# collect the images and label vectors whose argmax equals c.
class_features = [[] for _ in range(0,43)]
class_labels = [[] for _ in range(0,43)]

for sample_idx, label_vec in enumerate(y_train):
    class_idx = int(np.argmax(label_vec))
    class_labels[class_idx].append(label_vec)
    class_features[class_idx].append(X_train[sample_idx])
# the full arrays are no longer referenced by name after this cell
X_train = None
y_train = None

In [None]:
# generate directory: one sub-folder per class for the written images.
# exist_ok=True keeps this cell re-runnable when the folders already exist
# (os.mkdir raised FileExistsError on the second run).
os.makedirs('New_Augmented_Images/', exist_ok=True)
for c in range(0,43):
    os.makedirs('New_Augmented_Images/'+'Class'+str(c), exist_ok=True)

In [None]:
#number of desired images per class (originals + augmented)
final_img_num = 4000
save_format = '.ppm'
# scaling factor for the number of images (not used by the loop below)
factor = 1
# augmentation for a class stops once more than this many images were added
add_images = 1000
# classes that receive augmented images; use range(0,43) to process all
aug_classes = [7,20,29]

# Batches are collected in lists and joined once after the loop. This removes
# the special case that initialised `features` only for class 7 and avoids
# re-copying the growing array on every batch.
aug_feature_batches = []
aug_label_batches = []

#augment images
for c in aug_classes:
    X = np.array(class_features[c]).astype('float32')
    y = np.array(class_labels[c])
    if X.shape[0] == 0:
        # nothing to augment; the generator could never reach the limits below
        print('Class', c, 'has no training images, skipping')
        continue
    datagen.fit(X)
    orig_num = X.shape[0]
    img_num = orig_num
    #augmentation step
    first_batch = True
    for X_batch, y_batch in datagen.flow(X, y, batch_size=32):

        # check if enough images were generated -- done BEFORE the batch is
        # stored, like in the GTSRB augmentation loop, so a class that is
        # already over a limit does not receive one more batch
        if img_num > final_img_num:
            break
        if img_num > add_images + orig_num:
            break

        # show a grid of up to 3x3 images from the first batch; a batch can
        # hold fewer than 9 images when the class is small
        if first_batch:
            for i in range(min(9, X_batch.shape[0])):
                plt.subplot(330 + 1 + i)
                plt.imshow(X_batch[i].reshape(X.shape[1], X.shape[2], 3).astype(np.uint8), cmap=plt.get_cmap('gray'))
            # show the plot
            print(y_batch[0])
            plt.show()
            first_batch = False

        # add to the augmented training data
        aug_feature_batches.append(X_batch)
        aug_label_batches.append(y_batch)
        img_num += X_batch.shape[0]
    # class_features is left untouched so this cell can be run again
    print('class', c, 'augmented images added:', img_num - orig_num)

features = np.concatenate(aug_feature_batches, axis=0)
labels = np.concatenate(aug_label_batches, axis=0)
print(features.shape)


In [None]:
# Inspect the first entry of the augmented feature array.
first_augmented_image = features[0]
print(first_augmented_image)

In [None]:
# Write the augmented set to disk; the features are divided by 255 first,
# the labels are stored unchanged.
for out_name, out_array in (
    ("train_data_augmented_low+1000", features/255),
    ("train_labels_augmented_low+1000", labels),
):
    np.save(out_name, out_array)

# Visualize augmentation

In [None]:
# visualize augmentation: reload the training split (images multiplied by 255)
X_train = np.multiply(np.load("train_data.npy"), 255)
y_train = np.load('train_labels.npy')

# class_features[c] / class_labels[c] collect the images / label vectors
# whose argmax equals c
class_features = [[] for _ in range(0,43)]
class_labels = [[] for _ in range(0,43)]

for sample_idx, label_vec in enumerate(y_train):
    class_idx = int(np.argmax(label_vec))
    class_labels[class_idx].append(label_vec)
    class_features[class_idx].append(X_train[sample_idx])
# the full arrays are no longer referenced by name after this point
X_train = None
y_train = None

# define data preparation: random rotations, shifts, shear and zoom
shift = 0.1
datagen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=shift,
    height_shift_range=shift,
    shear_range=0.2,
    zoom_range=0.2,
)


In [None]:
import matplotlib.pyplot as plt

# One row of panels: the original image first, augmented variants after it.
fig, axs = plt.subplots(nrows=1, ncols=5, figsize=(5, 1), constrained_layout=True)

axs = axs.ravel()

#plt.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=0, hspace=0)

# class whose first training image is shown and augmented
vis_class = 14
orig_img = np.asarray(class_features[vis_class][0])
# label of the SAME sample (the label used to be taken from class 0)
orig_label = np.asarray(class_labels[vis_class][0])

axs[0].imshow(orig_img.astype('uint8'))
axs[0].set_xticks([])
axs[0].set_yticks([])

# flow() works on batches, so a leading batch axis is added; deriving it from
# the arrays avoids hard-coding the image size and the number of classes
single_image_flow = datagen.flow(orig_img[np.newaxis, ...], orig_label[np.newaxis, ...], batch_size=1)

# zip stops as soon as the remaining panels are used up, which ends the
# otherwise endless generator
for ax, (X_batch, y_batch) in zip(axs[1:], single_image_flow):
    ax.imshow(X_batch[0].astype('uint8'))
    ax.set_xticks([])
    ax.set_yticks([])

plt.savefig('augmentation.png', dpi=500)
plt.show()

# Train pretrained model with augmented data

In [1]:
import tensorflow as tf
from keras.models import Model
from keras.layers import Dense, Dropout, Activation, Conv2D, Flatten, MaxPooling2D, Input
import matplotlib.pyplot as plt
import numpy as np
import keras
from keras.models import Sequential

Using TensorFlow backend.


In [None]:
# Load the stored training images and labels.
X_train, y_train = (np.load(npy_name) for npy_name in ('train_data.npy', 'train_labels.npy'))
#X_val = np.load('val_data.npy')
#y_val = np.load('val_labels.npy')

In [None]:
#print(np.load('train_data_augmented_500_per_class.npy'))

In [2]:
from sklearn.model_selection import train_test_split

# Stack the augmented arrays on top of the original training arrays
# (images and labels in the same order).
X_aug_train = np.concatenate(
    [np.load(npy_name) for npy_name in ('train_data_augmented_low+1000.npy', 'train_data.npy')],
    axis=0,
)
y_aug_train = np.concatenate(
    [np.load(npy_name) for npy_name in ('train_labels_augmented_low+1000.npy', 'train_labels.npy')],
    axis=0,
)

#X_aug_train = np.concatenate((np.load('train_data_augmented_500_per_class.npy'),np.load('train_data.npy')), axis=0)
#y_aug_train = np.concatenate((np.load('train_labels_augmented_500_per_class.npy'), np.load('train_labels.npy')), axis=0)


# split the combined training data into a training part and a small
# validation part (8% of the samples, fixed random_state)
X_aug_train, X_aug_val, y_aug_train, y_aug_val = train_test_split(
    X_aug_train,
    y_aug_train,
    test_size=0.08,
    random_state=10,
)

In [None]:
#np.save('train_data_augmented_500_per_class+train_data', X_aug_train)
#np.save('train_labels_augmented_500_per_class+train_labels', y_aug_train)
#np.save('val_data_augmented_500_per_class+train_data', X_aug_val)
#np.save('val_labels_augmented_500_per_class+train_labels', y_aug_val)

#np.save('train_data_augmented_reproduce+train_data', X_aug_train)
#np.save('train_labels_augmented_reproduce+train_labels', y_aug_train)
#np.save('val_data_augmented_reproduce+train_data', X_aug_val)
#np.save('val_labels_augmented_reproduce+train_labels', y_aug_val)

#np.save('train_data_augmented_+750_2000max+train_data', X_aug_train)
#np.save('train_labels_augmented_+750_2000max+train_labels', y_aug_train)
#np.save('val_data_augmented_+750_2000max+train_data', X_aug_val)
#np.save('val_labels_augmented_+750_2000max+train_labels', y_aug_val)

In [None]:
from sklearn.model_selection import train_test_split

#X_aug_train = np.load('train_data_augmented_500_per_class.npy')
#y_aug_train = np.load('train_labels_augmented_500_per_class.npy')

# Alternative input: load an already stored augmented set. Running this cell
# replaces the X_aug_* / y_aug_* arrays assigned by the cell above.
X_aug_train, y_aug_train = (
    np.load(npy_name)
    for npy_name in (
        'train_data_augmented_reproduce_class.npy',
        'train_labels_augmented_reproduce_class.npy',
    )
)


# hold out 8% of the samples as validation data (fixed random_state)
X_aug_train, X_aug_val, y_aug_train, y_aug_val = train_test_split(
    X_aug_train,
    y_aug_train,
    test_size=0.08,
    random_state=10,
)

In [11]:
# Load the previously saved model that the next cell continues to train.
pretrained_model_path = 'Optimization Results/Standard Trained Model/CNN_16_96_128_epochs_5_relu_RMSprop_categorical_crossentropy_padding_same'
model_1 = keras.models.load_model(pretrained_model_path)

In [12]:
# Continue training the loaded model on the augmented data, save it, and
# collect the validation metrics per epoch.
n_epochs = 1
NAME = "CNN_16_96_128_relu_RMSprop_categorical_crossentropy_padding_same_augmented_low+1000"
path = 'Optimization Results/'+NAME

status = model_1.fit(X_aug_train, y_aug_train, batch_size=64, epochs=n_epochs, validation_data=(X_aug_val,y_aug_val))#, callbacks=[tensorboard])
#status = model_1.fit(X_train, y_train, batch_size=64, epochs=n_epochs, validation_data=(X_val,y_val))

model_1.save(path)

history = status.history
# the validation accuracy is reported as 'val_accuracy' or 'val_acc'
# depending on the Keras version
val_acc_key = 'val_accuracy' if 'val_accuracy' in history else 'val_acc'

# One row per quantity: epoch number (1-based), validation loss, validation
# accuracy. Each row is built explicitly; the earlier open-ended slices
# (results[0::], results[1::], ...) wrote to every following row as well and
# were only correct because later assignments overwrote them.
results = np.array([
    np.array(status.epoch)+1,
    history['val_loss'],
    history[val_acc_key],
], dtype=float)
#np.save(path, results)

Train on 36669 samples, validate on 3189 samples
Epoch 1/1
