# Image classification challenge

## Environment

In [None]:
from IPython.core.interactiveshell import InteractiveShell
import numpy as np
from keras.utils import np_utils
from tensorflow_core.python.keras.layers.pooling import GlobalAveragePooling2D
np.random.seed(4242)
from keras.preprocessing.image import ImageDataGenerator
InteractiveShell.ast_node_interactivity = "all"
import os
from datetime import datetime
import tensorflow as tf
from PIL import Image
import cv2
import json

In [None]:
# Set the seed for random operations.
# This let our experiments to be reproducible.
SEED = 4242
tf.random.set_seed(SEED)

# Set GPU memory growth
# Allows to only as much GPU memory as needed
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)


## Data Preprocessing

In [None]:
# ImageDataGenerator
# ------------------
apply_data_augmentation = True

# Create training ImageDataGenerator object
if apply_data_augmentation:
    train_data_gen = ImageDataGenerator(rotation_range=10,
                                        width_shift_range=0.15,
                                        height_shift_range=0.15,
                                        zoom_range=0.2,
                                        horizontal_flip=True,
                                        vertical_flip=True,
                                        fill_mode='constant',
                                        rescale=1./255)
else:
    train_data_gen = ImageDataGenerator(rescale=1./255)

# Create validation ImageDataGenerator object

valid_data_gen = ImageDataGenerator(rescale=1./255)


In [None]:
# Create generators to read images from datasets
# -------------------------------------------------------

# Get current working directory
#cwd = os.getcwd()
cwd = '../input/'
dataset_dir = os.path.join(cwd, 'Classification_Dataset')
training_dir = os.path.join(dataset_dir, 'training')

# batch size
bs = 32
# img shape
img_h = 256
img_w = 256

num_classes = 20
train_split = 0.8

decide_class_indices = True
if decide_class_indices:
    classes = [
        'owl',  # 0
        'galaxy',  # 1
        'lightning',  # 2
        'wine-bottle',  # 3
        't-shirt',  # 4
        'waterfall',  # 5
        'sword',  # 6
        'school-bus',  # 7
        'calculator',  # 8
        'sheet-music',  # 9
        'airplanes',  # 10
        'lightbulb',  # 11
        'skyscraper',  # 12
        'mountain-bike',  # 13
        'fireworks',  # 14
        'computer-monitor',  # 15
        'bear',  # 16
        'grand-piano',  # 17
        'kangaroo',  # 18
        'laptop'  # 19
        ]
else:
    classes = None


def padding(image):
    (height, width) = (image.shape[0], image.shape[1])
    if width > height:
        x = np.math.floor((width - height)/2)
        y = np.math.ceil((width - height)/2)
        return cv2.copyMakeBorder(image, x, y, 0, 0, cv2.BORDER_CONSTANT)
    else:
        x = np.math.floor((height - width)/2)
        y = np.math.ceil((height - width)/2)
        return cv2.copyMakeBorder(image, 0, 0, x, y, cv2.BORDER_CONSTANT)


# split training and validation sets
i = 0
images = []
for cls in classes:
    cls_dir = os.path.join(training_dir, cls)
    image_filenames = next(os.walk(cls_dir))[2]

    # iterate through all the images in the training set
    for img_name in image_filenames:
        img = Image.open(os.path.join(cls_dir, img_name)).convert('RGB')
        img_array = np.array(img)
        img_array = cv2.resize(img_array, (img_h, img_w))
        images.append(((img_array, img_name), i))
    i = i + 1

# shuffle to have random split
np.random.shuffle(images)

# split training and validation set
train_set = images[:int(len(images)*train_split)]
valid_set = images[int(len(images)*train_split):]

# tranform sets into numpy array
train_tuple_x, train_y = map(list, zip(*train_set))
train_x, train_img_names = map(list, zip(*train_tuple_x))
train_x = np.array(train_x)
valid_tuple_x, valid_y = map(list, zip(*valid_set))
valid_x, valid_img_names = map(list, zip(*valid_tuple_x))
valid_x = np.array(valid_x)

train_dict = {}
valid_dict = {}

for i in range(len(train_img_names)):
    img_name = train_img_names[i]
    class_x = classes[int(train_y[i])]
    if class_x in train_dict:
        train_dict[class_x].append(img_name)
    else:
        class_list = []
        class_list.append(img_name)
        train_dict[class_x] = class_list

for i in range(len(valid_img_names)):
    img_name = valid_img_names[i]
    class_x = classes[int(valid_y[i])]
    if class_x in valid_dict:
        valid_dict[class_x].append(img_name)
    else:
        class_list = []
        class_list.append(img_name)
        valid_dict[class_x] = class_list

final_dict = {"training": train_dict, "validation": valid_dict}
with open('dataset_split.json', 'w') as json_file:
    json.dump(final_dict, json_file)

train_y = np.array(train_y)
valid_y = np.array(valid_y)
train_y = np_utils.to_categorical(train_y, num_classes)
valid_y = np_utils.to_categorical(valid_y, num_classes)

# Training
train_gen = train_data_gen.flow(x=train_x,
                                y=train_y,
                                batch_size=bs,
                                shuffle=True,
                                seed=SEED)  # targets are directly converted into one-hot vectors

# Validation
valid_gen = valid_data_gen.flow(x=valid_x,
                                y=valid_y,
                                batch_size=bs,
                                shuffle=False,
                                seed=SEED)


In [None]:
# Create Dataset objects
# ----------------------

# Training
train_dataset = tf.data.Dataset.from_generator(lambda: train_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes]))
train_dataset = train_dataset.repeat()

# Validation
# ----------
valid_dataset = tf.data.Dataset.from_generator(lambda: valid_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes]))
valid_dataset = valid_dataset.repeat()


## Model Building
CNN with Xception Encoder

In [None]:
transf_learn = False

if transf_learn:
    xcep = tf.keras.applications.Xception(
        weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    print('Model Loaded')


In [None]:
if transf_learn:
    xcep.summary()
    xcep.layers


In [None]:
from tensorflow_core.python.keras.layers.core import Dropout

# Create Model With Transfer Learning
# ------------

if transf_learn:
    finetuning = True

    if finetuning:
        freeze_until = 110  # layer from which we want to fine-tune

        for layer in xcep.layers[:freeze_until]:
            layer.trainable = False
    else:
        xcep.trainable = False

    model = tf.keras.Sequential()
    model.add(xcep)
    model.add(GlobalAveragePooling2D())
    model.add(Dropout(0.25, seed=SEED))
    model.add(tf.keras.layers.Dense(units=100, activation='sigmoid'))
    model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))

# Visualize created model as a table
model.summary()

# Visualize initialized weights
model.weights


Convolutional Neural Network (CNN)

In [None]:
# Keras Model subclassing
# -----------------------

# Create convolutional block
class ConvBlock(tf.keras.Model):
    def __init__(self, num_filters):
        super(ConvBlock, self).__init__()
        self.conv2d = tf.keras.layers.Conv2D(filters=num_filters,
                                             kernel_size=(3, 3),
                                             strides=(1, 1),
                                             padding='valid')
        # we can specify the activation function directly in Conv2D
        self.activation = tf.keras.layers.ReLU()
        self.pooling = tf.keras.layers.MaxPool2D(
            pool_size=(2, 2), padding='same')

    def call(self, inputs):
        x = self.conv2d(inputs)
        x = self.activation(x)
        x = self.pooling(x)
        return x


In [None]:
# Create Model Without Transfer Learning
# ------------
from tensorflow_core.python.keras.layers.core import Dropout


class CNNClassifier(tf.keras.Model):
    def __init__(self, depth, start_f, num_classes):
        super(CNNClassifier, self).__init__()
        
        self.feature_extractor = tf.keras.Sequential()
        for i in range(depth):
            self.feature_extractor.add(ConvBlock(num_filters=start_f))
            start_f *= 2
            
        self.flatten = tf.keras.layers.Flatten()
        self.classifier = tf.keras.Sequential()
        self.classifier.add(tf.keras.layers.Dense(units=512, activation='sigmoid'))
        self.classifier.add(Dropout(0.25, seed=SEED))
        self.classifier.add(tf.keras.layers.Dense(units=128, activation='sigmoid'))
        self.classifier.add(Dropout(0.25, seed=SEED))
        self.classifier.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))
        
    def call(self, inputs):
        x = self.feature_extractor(inputs)
        x = self.flatten(x)
        x = self.classifier(x)
        return x


if not(transf_learn):
    depth = 5
    start_f = 7
    num_classes = 20
    # Create Model instance
    model = CNNClassifier(depth=depth,
                        start_f=start_f,
                        num_classes=num_classes)
    # Build Model
    model.build(input_shape=(None, img_h, img_w, 3))
    

In [None]:
# Optimization params
# -------------------

# Loss
loss = tf.keras.losses.CategoricalCrossentropy()

# learning rate
lr = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# -------------------

# Validation metrics
# ------------------
metrics = ['accuracy']
# ------------------

# Compile Model
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)


## Training with callbacks

In [None]:
out = '../output'
exps_dir = os.path.join(out, 'classification_experiments')
if not os.path.exists(exps_dir):
    os.makedirs(exps_dir)

now = datetime.now().strftime('%b%d_%H-%M-%S')

model_name = 'CNN'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
if not os.path.exists(exp_dir):
    os.makedirs(exp_dir)

callbacks = []

# Model checkpoint
# ----------------
ckpt_dir = os.path.join(exp_dir, 'ckpts')
if not os.path.exists(ckpt_dir):
    os.makedirs(ckpt_dir)

# ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'),
#                                                   save_weights_only=True)  # False to save the model directly
# callbacks.append(ckpt_callback)

# Visualize Learning on Tensorboard
# ---------------------------------
tb_dir = os.path.join(exp_dir, 'tb_logs')
if not os.path.exists(tb_dir):
    os.makedirs(tb_dir)
    print(tb_dir)

# By default shows losses and metrics for both training and validation
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=1)  # if 1 shows weights histograms
callbacks.append(tb_callback)


# Early Stopping
# --------------
early_stop = True
if early_stop:
    es_callback = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=10)
    callbacks.append(es_callback)


model.fit(x=train_dataset,
          epochs=100,
          steps_per_epoch=len(train_gen),
          validation_data=valid_dataset,
          validation_steps=len(valid_gen),
          callbacks=callbacks)


## Test Phase

In [None]:
def create_csv(results):
    csv_fname = 'results_'
    csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(csv_fname, 'w') as f:

        f.write('Id,Category\n')

        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')


image_filenames = next(os.walk('../input/Classification_Dataset/test'))[2]
results = {}
for img_name in image_filenames:

    # processing the test image and getting the prediction
    img = Image.open('../input/Classification_Dataset/test/' +
                     img_name).convert('RGB')
    img_array = np.array(img)
    img_array = cv2.resize(img_array, (img_h, img_w))
    img_array = np.expand_dims(img_array, 0)
    img_array = img_array / 255.
    prediction = np.argmax(model.predict(x=img_array))

    results[img_name] = prediction

create_csv(results)
print("CSV done!")
