In [None]:
from datetime import datetime
import numpy as np
import os
from pathlib import Path
import sys
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.callbacks import TensorBoard

# Put the parent of the current working directory on the import path so
# that the `src.false_labels_effect` package imported below can be found.
module_path = os.path.abspath('..')
if not any(entry == module_path for entry in sys.path):
    sys.path.append(module_path)

import src.false_labels_effect.util as util
import src.false_labels_effect.data_loader as dl
import src.false_labels_effect.models as mdls

%load_ext tensorboard

In [None]:
# --> TODO: set parameter below <--
# All tunable settings of the notebook live in this cell; the cells below
# only read these names.

# Select model task
#   'Class': main class (4) classification
#   'Subclass': sub class (14) classification
#   'Annotations' : polygon vertices prediction
# NOTE: the cells further down only encode labels and define the number of
# classes for 'Class' and 'Subclass'.
model_task = 'Class'

# set number of images
# Upper bound on the number of images converted per split (train and test
# each). It is only consulted while the *_npy cache directories are being
# created, i.e. it has no effect once those directories exist.
limit_loaded_images = 100  # use None for "all" images

# set target size of images
# Passed to util.resize, to the model factory and (as 'dim') to the data loaders.
# NOTE(review): 224 is the more common CNN input size - confirm 244 is intended.
resize_to = (244, 244) 

# set ratio of false labels in training data
# Forwarded to util.make_false_labels together with the training labels.
false_ratio = 0

# define data loader parameters
# val_split is forwarded to util.train_val_split; batch_size goes to the data loaders.
val_split = 0.2
batch_size = 32

# define model processing parameter
# n_epochs is used by fit(); multiprocessing / n_workers are forwarded to
# both fit() and evaluate() as use_multiprocessing / workers.
n_epochs = 1
multiprocessing = False
n_workers = 1

In [None]:
# Data locations, relative to the notebook's working directory.
# The paths are assembled with the `/` operator instead of hard-coded "\\"
# separators: a backslash is only a path separator on Windows, so the
# previous literals collapsed into a single (non-existent) path component
# on Linux/macOS.
data_dir = Path("..") / "data"
images_dir = data_dir / "Images_4c_Poly"
labels_dir = data_dir / "Labels_4c_Poly"

# set training and test img png path (source images)
train_img_png_path = images_dir / "Train"
test_img_png_path = images_dir / "Test"

# set training and test img npy path (cache of resized images as .npy arrays)
train_img_npy_path = images_dir / "Train_npy"
test_img_npy_path = images_dir / "Test_npy"

# set label path
train_label_path = labels_dir / "Train.npy"
test_label_path = labels_dir / "Test.npy"

In [None]:
# Read the label files of both splits via the project's util.load_labels
# (train first, then test). The results are indexed by image id further down.
train_labels_dict, test_labels_dict = (
    util.load_labels(label_file)
    for label_file in (train_label_path, test_label_path)
)


In [None]:
def convert_split_to_npy(png_dir, npy_dir, labels_dict, target_size, limit=None):
    """Resize the images of one split and cache them as ``.npy`` arrays.

    The conversion only runs when ``npy_dir`` does not exist yet. An existing
    directory is treated as an already populated cache and is left untouched,
    so changing ``limit`` or ``target_size`` afterwards requires deleting the
    directory first.

    Parameters
    ----------
    png_dir : path-like
        Directory holding the source images.
    npy_dir : path-like
        Directory to create and fill with one ``<img_id>.npy`` file per image.
    labels_dict : mapping
        Labels indexed by image id; the entry of each image is handed to
        ``util.resize`` together with the image.
    target_size : tuple
        Target size forwarded to ``util.resize``.
    limit : int or None
        Maximum number of images to convert; ``None`` converts all of them.

    Raises
    ------
    KeyError
        If an image id has no entry in ``labels_dict``.
    """
    png_dir, npy_dir = Path(png_dir), Path(npy_dir)
    if npy_dir.exists():
        return
    npy_dir.mkdir()

    # iterdir() yields entries in arbitrary order; sorting makes the subset
    # selected by `limit` the same on every run and every machine.
    image_paths = sorted(png_dir.iterdir())
    if limit is not None:
        image_paths = image_paths[:max(limit, 0)]

    for image_path in image_paths:
        # Load without resizing so that polygon fits (for now)
        img_id = image_path.name.split(".")[0]
        img = load_img(image_path)

        # Use util resize function to resize image and polygon
        # TODO: poly resize currently not saved
        img_res, _poly_res = util.resize(img, labels_dict[img_id], target_size)

        # Join with `/` instead of a hard-coded "\\" so the file lands inside
        # npy_dir on every OS; np.save appends the ".npy" extension itself.
        np.save(npy_dir / img_id, img_to_array(img_res))


# load train images, resize and save as npy
convert_split_to_npy(
    train_img_png_path, train_img_npy_path, train_labels_dict,
    resize_to, limit_loaded_images,
)

# load test images, resize and save as npy
convert_split_to_npy(
    test_img_png_path, test_img_npy_path, test_labels_dict,
    resize_to, limit_loaded_images,
)

In [None]:
# Collect the ids of all cached images, formatted for the keras data loader.
# An id is the file name up to its first ".".
train_img_ids_included = [
    npy_file.name.split(".")[0] for npy_file in train_img_npy_path.iterdir()
]
test_img_ids_included = [
    npy_file.name.split(".")[0] for npy_file in test_img_npy_path.iterdir()
]

# Keep only ids that carry the split name, and store them per split.
partition = {
    'train': [img_id for img_id in train_img_ids_included if 'Train' in img_id],
    'test': [img_id for img_id in test_img_ids_included if 'Test' in img_id],
}

In [None]:
# Restrict the labels to the images that were actually converted.
# Membership is tested against sets: checking every label key against the
# partition *lists* was quadratic in the number of images.
train_ids_included = set(partition['train'])
test_ids_included = set(partition['test'])

# filter train labels to only include transformed images
train_labels_dict_incl = {
    img_id: label
    for img_id, label in train_labels_dict.items()
    if img_id in train_ids_included
}

# filter test labels to only include transformed images
test_labels_dict_incl = {
    img_id: label
    for img_id, label in test_labels_dict.items()
    if img_id in test_ids_included
}

# generate flattened dict of model task corresponding labels
train_labels_dict_flat = util.select_label(train_labels_dict_incl, model_task)
test_labels_dict_flat = util.select_label(test_labels_dict_incl, model_task)

# encode categorical labels for classification tasks
# NOTE: y_train / y_test are only defined for 'Class' and 'Subclass'.
if model_task in ['Class', 'Subclass']:
    y_train, y_test = util.encode_labels(train_labels_dict_flat, test_labels_dict_flat)

In [None]:
# Carve a validation subset out of the training ids and labels; `val_split`
# is forwarded unchanged to util.train_val_split.
# NOTE: partition['train'] and y_train are overwritten here, so running this
# cell a second time splits the already reduced training set again.
split_result = util.train_val_split(partition['train'], y_train, val_split)
partition['train'], y_train, partition['val'], y_val = split_result

print(
    '# train imgs:', len(partition['train']),
    '- # val imgs:', len(partition['val']),
    '- # test imgs:', len(partition['test']),
)

In [None]:
# set number of classes in labels
# A lookup table with an explicit failure replaces the if/elif chain: with
# any other task (e.g. 'Annotations') `n_classes` used to stay undefined,
# which surfaced as a NameError below - or, on a kernel that had already run
# once, silently reused the stale value of the previous task.
n_classes_by_task = {'Class': 4, 'Subclass': 14}
if model_task not in n_classes_by_task:
    raise ValueError(
        f"model_task={model_task!r} has no class count; "
        f"supported classification tasks: {sorted(n_classes_by_task)}"
    )
n_classes = n_classes_by_task[model_task]

# create false train labels
# NOTE: y_train is overwritten, so re-running this cell applies
# util.make_false_labels to the already modified labels.
y_train = util.make_false_labels(y_train, false_ratio, n_classes)

In [None]:
# Keyword arguments shared by all three data loaders.
image_dim = (resize_to[0], resize_to[1])
params = {
    'dim': image_dim,
    'batch_size': batch_size,
    'n_classes': n_classes,
    'n_channels': 3,
    'shuffle': True,
}

# One loader per split, each fed with the split's image ids and labels.
training_loader = dl.DataLoader(partition['train'], y_train, **params)
# TODO: currently validation loader is not responding
validation_loader = dl.DataLoader(partition['val'], y_val, **params)
test_loader = dl.DataLoader(partition['test'], y_test, **params)

In [None]:
# load models
# Build the model via the project's `models` module; the factory receives the
# target image size and the number of label classes.
basic_cnn = mdls.create_cnn_model(resize_to, n_classes)
# Alternative architecture, currently disabled:
# resnet_cnn = mdls.create_resnet_model(resize_to, n_classes)

In [None]:
# initialize logging: one TensorBoard run directory per model name and start time
run_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
# Use the public `name` property of the model rather than the private
# `_name` attribute, which is an implementation detail of Keras.
logdir = "../logs/scalars/" + basic_cnn.name + "/" + run_stamp
tensorboard_callback = TensorBoard(log_dir=logdir)

# compile model and train
basic_cnn.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

# The loaders deliver the batches themselves, so no batch_size is passed here.
history = basic_cnn.fit(x = training_loader,
                        epochs = n_epochs,
                        verbose = 2,
                        callbacks=[tensorboard_callback],
                        validation_data = validation_loader, # TODO: currently validation loader is not responding
                        use_multiprocessing = multiprocessing,
                        workers = n_workers)

In [None]:
# show test accuracy
# `batch_size` is intentionally not passed: test_loader is constructed with
# its own batch_size and is handed over without separate targets, exactly
# like the training loader in fit(). Keras documents that batch_size must
# not be specified for inputs that already generate batches.
score = basic_cnn.evaluate(x = test_loader,
                           use_multiprocessing = multiprocessing,
                           workers = n_workers,
                           verbose = 0)

# score is [loss, accuracy]: the model is compiled with metrics=["accuracy"].
print('Test accuracy:', score[1])

In [None]:
# show logs:
# %tensorboard --logdir ../logs/scalars # or http://localhost:6006/