In [1]:
import dataiku
from dataiku import pandasutils as pdu
import pandas as pd
import numpy as np
dataiku.use_plugin_libs('deeplearning-image-v2')
from dku_deeplearning_image.misc_objects import DkuModel, DkuImageGenerator
import dku_deeplearning_image.dku_constants as constants
from sklearn.model_selection import train_test_split
import tensorflow as tf
import pdb
from dku_config import DSSParameter, DkuConfig
from dku_deeplearning_image.recipes import RetrainRecipe

In [2]:
import numpy as np
import dku_deeplearning_image.utils as utils
import dku_deeplearning_image.dku_constants as constants
import math
import os

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input as resnet50_preprocessing

import logging
logger = logging.getLogger(__name__)


def threadsafe_generator(f):
    """A decorator that takes a generator function and makes it thread-safe.
    Edit: Do not try to externalize, decorator is called before function is defined, there will be an error.
    """
    def g(*a, **kw):
        return utils.threadsafe_iter(f(*a, **kw))
    return g


class DkuImageGenerator(object):
    def __init__(self, images_folder, labels, input_shape, batch_size, preprocessing,
                 use_augmentation, extra_images_gen=None, n_augm=None):
        self.images_folder = images_folder
        self.labels = labels
        self.input_shape = input_shape
        self.batch_size = batch_size
        self.preprocessing = preprocessing
        self.use_augmentation = use_augmentation
        self.extra_images_gen = extra_images_gen
        self.n_augm = n_augm

    def _get_batch_size_adapted(self):
        return int(self.batch_size / self.n_augm) if self.use_augmentation else self.batch_size

    def _preprocess_img(self, images_folder, img_filename):
        image = utils.get_cached_file_from_folder(images_folder, img_filename)
        return utils.preprocess_img(image, self.input_shape, self.preprocessing)

    def _get_augmented_images(self, image):
        augm_image = np.tile(image, (self.n_augm, 1, 1, 1))
        return next(self.extra_images_gen.flow(augm_image, batch_size=self.n_augm))

    def _process_one_image(self, row):
        img_filename = row[constants.FILENAME].decode('utf-8') if isinstance(row[constants.FILENAME], bytes) else row[constants.FILENAME]
        label = row[constants.LABEL].decode('utf-8') if isinstance(row[constants.LABEL], bytes) else row[constants.LABEL]
        label_index = self.labels.index(label)
#         print("preprocessing: ", img_filename)
        try:
            image = self._preprocess_img(self.images_folder, img_filename)
            if self.use_augmentation:
                X_batch = self._get_augmented_images(image)
                y_batch = [label_index] * self.n_augm
            else:
                X_batch = [image]
                y_batch = [label_index]
        except IOError as e:
            logger.info("Cannot read the image '{}', skipping it. Error: {}".format(img_filename, e))
            X_batch, y_batch = [], []
        return X_batch, y_batch

    def _get_batch_features(self, batch_df):
        X_batch_list = []
        y_batch_list = []

        for index, row in batch_df.iterrows():
            X_batch, y_batch = self._process_one_image(row)
            X_batch_list.extend(X_batch)
            y_batch_list.extend(y_batch)

        X_batch = np.array(X_batch_list)

        actual_batch_size = X_batch.shape[0]
        y_batch = np.zeros((actual_batch_size, len(self.labels)))
        y_batch[range(actual_batch_size), y_batch_list] = 1

        return X_batch, y_batch

    @threadsafe_generator
    def load(self, image_df):
        image_df = pd.DataFrame(image_df, columns=["__dku__image_filename", "__dku__image_label"])
        n_images = image_df.shape[0]
        batch_size_adapted = self._get_batch_size_adapted()
        n_batch = int(math.ceil(n_images * 1.0 / batch_size_adapted))
        while True:
            for num_batch in range(n_batch):
                batch_df = image_df.iloc[num_batch * batch_size_adapted: (num_batch + 1) * batch_size_adapted, :]
                yield self._get_batch_features(batch_df)

In [3]:
def format_label_df(label_dataset, col_filename, col_label):
    renaming_mapping = {
        col_filename: constants.FILENAME,
        col_label: constants.LABEL
    }
    label_df = label_dataset.get_dataframe().rename(columns=renaming_mapping)[list(renaming_mapping.values())]
    return label_df

def build_train_test_sets(label_df, train_ratio, random_seed):
    train_df, test_df = train_test_split(
        label_df,
        stratify=label_df[constants.LABEL],
        train_size=train_ratio,
        random_state=random_seed)
    return train_df, test_df


In [4]:
images_folder = dataiku.Folder("3ep5yCky")
model_folder = dataiku.Folder("VrGKntsf")
labels_dataset = dataiku.Dataset("labels")

In [5]:
col_filename = "path"
col_label = "label"

label_df = format_label_df(labels_dataset, col_filename, col_label)
train_df, test_df = build_train_test_sets(label_df, 0.8, 1887)

In [6]:
from PIL import UnidentifiedImageError, ImageFile
from tensorflow.keras.preprocessing.image import img_to_array, load_img
configs = DkuConfig(**{
    'should_use_gpu': {'value': False},
    'gpu_usage': {'value': 'all'},
    'gpu_list': {'value': []},
    'gpu_memory_allocation_mode': {'value': 'memory_limit'},
    'gpu_memory_limit': {'value': 31},
    'col_filename': {'value': 'path'},
    'col_label': {'value': 'label'},
    'train_ratio': {'value': 0.9},
    'input_shape': {'value': (197, 197, 3)},
    'batch_size': {'value': 10},
    'model_pooling': {'value': 'max'},
    'model_reg': {'value': {'l1': 0.2, 'l2': 0.1}},
    'model_dropout': {'value': 0.5},
    'layer_to_retrain': {'value': 'n_last'},
    'layer_to_retrain_n': {'value': 2},
    'optimizer': {'value': 'adam'},
    'learning_rate': {'value': 0.001},
    'custom_params_opti': {'value': []},
    'nb_epochs': {'value': 5},
    'nb_steps_per_epoch': {'value': 5},
    'nb_validation_steps': {'value': 5},
    'data_augmentation': {'value': False},
    'n_augmentation': {'value': 0},
    'custom_params_data_augment': {'value': []},
    'use_tensorboard': {'value': True},
    'random_seed': {'value': 1338}
})

def get_cached_file_from_folder(folder, file_path):
    try:
        filename = file_path[0].replace(b'/', b'_')
    except Exception as err:
        raise err
    with folder.get_download_stream(file_path) as stream:
        with open(filename, 'wb') as f:
            f.write(stream.read())
            logger.info(f"cached file {file_path}")
    return filename

def preprocess_img(img_path, img_shape, preprocessing):
#     print("preprocessing: ", img_path)
    try:
        img = load_img(img_path, target_size=img_shape)
    except UnidentifiedImageError as err:
        logger.warning(f'The file {img_path} is not a valid image. skipping it. Error: {err}')
        return
    array = img_to_array(img)
    array = preprocessing(array)
    return array

def convert_target_to_np_array(target_array):
    dummies = pd.get_dummies(target_array)
    return {"remapped": dummies.values.astype(np.int8), "classes": list(dummies.columns)}

In [7]:
def retrain_old_way():
    def build_tfds(pddf):
        return tf.data.Dataset.from_generator(
             dku_generator.load,
             output_types=(tf.float32, tf.float32),
             output_shapes=(tf.TensorShape([None, 197, 197, 3]), tf.TensorShape([None, 2])),
             args=(pddf,)
        )
    
    retrain_recipe = build_retrain_recipe()

    dku_generator = DkuImageGenerator(
        images_folder=images_folder,
        labels=retrain_recipe.dku_model.get_distinct_labels(),
        input_shape=configs.input_shape,
        batch_size=configs.batch_size,
        preprocessing=retrain_recipe.dku_model.application.preprocessing,
        use_augmentation=False
    )

    train_tfds_old_way, test_tfds_old_way = build_tfds(train_df), build_tfds(test_df)
    model_fit(retrain_recipe, train_tfds_old_way, test_tfds_old_way)

In [8]:
def preprocess_image(images_folder, image_filename, image_shape):
    image_path = tf.numpy_function(lambda x: get_cached_file_from_folder(images_folder, x), [image_filename], tf.string)
    return tf.numpy_function(lambda x: preprocess_img(x, image_shape, resnet50_preprocessing), [image_path], tf.float32)

In [9]:
import time
import warnings
from tables import NaturalNameWarning
warnings.filterwarnings('ignore', category=NaturalNameWarning)

def build_retrain_recipe():
    retrain_recipe = RetrainRecipe(configs)

    retrain_recipe.load_dku_model(model_folder, label_df)
    retrain_recipe.compile()

    model_weights_path = retrain_recipe.dku_model.get_weights_path()

    callbacks = retrain_recipe._get_callbacks(
        output_model_folder=dataiku.Folder('hWWMMUFI'),
        model_weights_path=model_weights_path
    )
    return retrain_recipe

def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = int((te - ts) * 1000)
        else:
            print('{}  {:.2f} ms'.format(method.__name__, (te - ts) * 1000))
        return result
    return timed

In [10]:
import time

@timeit
def model_fit(retrain_recipe, train_tfds, test_tfds):
    model_weights_path = retrain_recipe.dku_model.get_weights_path()

    callbacks = retrain_recipe._get_callbacks(
        output_model_folder=dataiku.Folder('hWWMMUFI'),
        model_weights_path=model_weights_path
    )
    retrain_recipe._retrain(
        train_generator=train_tfds,
        test_generator=test_tfds,
        callback_list=callbacks
    )

def retrain(train_tfds, test_tfds, opt_func=lambda x: x):
    retrain_recipe = build_retrain_recipe()
    model_fit(retrain_recipe, opt_func(train_tfds), opt_func(test_tfds))

In [11]:
def build_optimized_datasets(train_df, test_df, paral=True):
    ### Training Data
    # Building train dataset
    train_X_tfds = tf.data.Dataset.from_tensor_slices(train_df["__dku__image_filename"].values.reshape(-1, 1))
    train_X_tfds = train_X_tfds.map(lambda x: preprocess_image(images_folder, x, configs.input_shape),
                                    num_parallel_calls=(-1 if paral else None))

    # Building test dataset
    test_X_tfds = tf.data.Dataset.from_tensor_slices(test_df["__dku__image_filename"].values.reshape(-1, 1))
    test_X_tfds = test_X_tfds.map(lambda x: preprocess_image(images_folder, x, configs.input_shape),
                                  num_parallel_calls=(-1 if paral else None))

    ### Validation Data
    # Building train dataset
    train_y_values = convert_target_to_np_array(train_df["__dku__image_label"].values)
    train_y_tfds = tf.data.Dataset.from_tensor_slices(train_y_values["remapped"])

    # Building test dataset
    test_y_values = convert_target_to_np_array(test_df["__dku__image_label"].values)
    test_y_tfds = tf.data.Dataset.from_tensor_slices(test_y_values["remapped"])

    ### Gathering datasets
    train_tfds_common = tf.data.Dataset.zip((train_X_tfds, train_y_tfds)).batch(configs.batch_size, drop_remainder=True).repeat()
    test_tfds_common = tf.data.Dataset.zip((test_X_tfds, test_y_tfds)).batch(configs.batch_size, drop_remainder=True).repeat()
    
    return train_tfds_common, test_tfds_common


In [12]:
%%time
# Old way
retrain_old_way()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


  tensor_proto.tensor_content = nparray.tostring()


Epoch 1/5
5/5 - 8s - accuracy: 0.6200 - loss: 27.5999 - val_accuracy: 0.7500 - val_loss: 23.0304
Epoch 2/5
5/5 - 5s - accuracy: 0.7143 - loss: 25.7568 - val_accuracy: 0.8611 - val_loss: 21.7466
Epoch 3/5
5/5 - 5s - accuracy: 0.8333 - loss: 23.2617 - val_accuracy: 1.0000 - val_loss: 20.9861
Epoch 4/5
5/5 - 5s - accuracy: 0.9286 - loss: 21.3702 - val_accuracy: 0.9167 - val_loss: 20.5420
Epoch 5/5
5/5 - 5s - accuracy: 0.9762 - loss: 20.4558 - val_accuracy: 1.0000 - val_loss: 19.7459
model_fit  35440.46 ms
CPU times: user 1min 40s, sys: 15.9 s, total: 1min 56s
Wall time: 40.8 s


In [54]:
%%time
# No Optimization
train_tfds_common, test_tfds_common = build_optimized_datasets(train_df, test_df, paral=False)
retrain(train_tfds_common, test_tfds_common)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


  tensor_proto.tensor_content = nparray.tostring()


Epoch 1/5
5/5 - 11s - accuracy: 0.5400 - loss: 28.1289 - val_accuracy: 0.5000 - val_loss: 26.4571
Epoch 2/5
5/5 - 7s - accuracy: 0.6600 - loss: 25.9002 - val_accuracy: 0.7000 - val_loss: 24.2476
Epoch 3/5
5/5 - 8s - accuracy: 0.7000 - loss: 22.6347 - val_accuracy: 0.6000 - val_loss: 22.4071
Epoch 4/5
5/5 - 8s - accuracy: 0.7600 - loss: 23.3522 - val_accuracy: 0.9000 - val_loss: 21.1457
Epoch 5/5
5/5 - 8s - accuracy: 0.9400 - loss: 20.7426 - val_accuracy: 0.8000 - val_loss: 21.5976
model_fit  50039.66 ms
CPU times: user 2min 25s, sys: 24.8 s, total: 2min 50s
Wall time: 56.4 s


In [55]:
%%time
# With parralele preprocessing
train_tfds_common, test_tfds_common = build_optimized_datasets(train_df, test_df, paral=True)
retrain(train_tfds_common, test_tfds_common)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Epoch 1/5
5/5 - 7s - accuracy: 0.4800 - loss: 29.7724 - val_accuracy: 0.7000 - val_loss: 24.0917
Epoch 2/5
5/5 - 6s - accuracy: 0.6600 - loss: 25.8165 - val_accuracy: 0.8000 - val_loss: 22.4083
Epoch 3/5
5/5 - 6s - accuracy: 0.7600 - loss: 23.0411 - val_accuracy: 0.8000 - val_loss: 21.8577
Epoch 4/5
5/5 - 7s - accuracy: 0.8200 - loss: 22.2436 - val_accuracy: 1.0000 - val_loss: 20.2780
Epoch 5/5
5/5 - 6s - accuracy: 0.9000 - loss: 20.7942 - val_accuracy: 1.0000 - val_loss: 19.6407
model_fit  41918.84 ms
CPU times: user 2min 32s, sys: 24.1 s, total: 2min 56s
Wall time: 48.4 s


In [57]:
%%time
# With Cache
train_tfds_common, test_tfds_common = build_optimized_datasets(train_df, test_df, paral=True)
retrain(train_tfds_common, test_tfds_common, lambda x: x.cache())

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Epoch 1/5
5/5 - 9s - accuracy: 0.3800 - loss: 32.0228 - val_accuracy: 0.6000 - val_loss: 29.9594
Epoch 2/5
5/5 - 6s - accuracy: 0.6000 - loss: 29.8514 - val_accuracy: 0.5000 - val_loss: 23.7029
Epoch 3/5
5/5 - 6s - accuracy: 0.6600 - loss: 25.6862 - val_accuracy: 0.8000 - val_loss: 22.3687
Epoch 4/5
5/5 - 7s - accuracy: 0.7600 - loss: 23.5567 - val_accuracy: 1.0000 - val_loss: 20.7287
Epoch 5/5
5/5 - 7s - accuracy: 0.8600 - loss: 21.5015 - val_accuracy: 0.8000 - val_loss: 20.5641
model_fit  43904.32 ms
CPU times: user 2min 31s, sys: 25 s, total: 2min 56s
Wall time: 49.9 s


In [19]:
%%time
# With prefetch
train_tfds_common, test_tfds_common = build_optimized_datasets(train_df, test_df, paral=True)
retrain(train_tfds_common, test_tfds_common, lambda x: x.cache('./').prefetch(-1))

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Epoch 1/5


AlreadyExistsError:  There appears to be a concurrent caching iterator running - cache lockfile already exists ('./_0.lockfile'). If you are sure no other running TF computations are using this cache prefix, delete the lockfile and re-initialize the iterator. Lockfile contents: Created at: 1616623558
	 [[{{node MultiDeviceIteratorGetNextFromShard}}]]
	 [[RemoteCall]]
	 [[IteratorGetNextAsOptional]] [Op:__inference_test_function_71363]

Function call stack:
test_function


In [16]:
def clear_cache():
    for f in [x for x in os.listdir('./') if x.endswith('.jpg') or x.endswith('.jpeg')]:
        os.remove(f)
    print("The cache has been cleared")

In [17]:
clear_cache()

The cache has been cleared


In [18]:
os.listdir('./')

['model_weights_notop.h5', 'remote-run-env-def.json', 'model_weights.h5']