In [1]:
#######################################################################################################
# Summary
# 1. TensorFlow multi-GPU example using the high-level Estimator & Dataset APIs
# 2. On-the-fly data-augmentation (random crop, random flip)
# ToDo:
# 3. Investigate tfrecord speed improvement (to match MXNet)
# References:
# https://www.tensorflow.org/performance/performance_guide
# 1. https://jhui.github.io/2017/03/07/TensorFlow-Perforamnce-and-advance-topics/
# 2. https://www.tensorflow.org/versions/master/performance/datasets_performance
# 3. https://github.com/pudae/tensorflow-densenet
# 4. https://stackoverflow.com/a/48096625/6772173
# 5. https://stackoverflow.com/questions/47867748/transfer-learning-with-tf-estimator-estimator-framework
# 6. https://github.com/BobLiu20/Classification_Nets/blob/master/tensorflow/common/average_gradients.py
# 7. https://github.com/BobLiu20/Classification_Nets/blob/master/tensorflow/training/train_estimator.py
#######################################################################################################

In [2]:
# When True, LR and BATCHSIZE are multiplied by the GPU count and
# model_fn_multigpu is used; when False, model_fn_single is used.
MULTI_GPU = True  # TOGGLE THIS

In [3]:
%%bash 
# Download model check-point and module from below repo:
#wget -N https://github.com/pudae/tensorflow-densenet/raw/master/nets/densenet.py
#wget -N https://ikpublictutorial.blob.core.windows.net/deeplearningframeworks/tf-densenet121.tar.gz
#tar xzvf tf-densenet121.tar.gz

In [4]:
import os
import sys
import time
import multiprocessing
import numpy as np
import pandas as pd
from PIL import Image
import random
import tensorflow as tf
from tensorflow.python.framework import dtypes
from tensorflow.python.framework.ops import convert_to_tensor
from tensorflow.contrib.data import Iterator
from common.utils import download_data_chextxray, get_imgloc_labels, get_train_valid_test_split
from common.utils import compute_roc_auc, get_cuda_version, get_cudnn_version, get_gpu_name
from common.params_dense import *
slim = tf.contrib.slim
import densenet  # Download from https://github.com/pudae/tensorflow-densenet

In [5]:
# Report the software / hardware environment this notebook is running in
print("OS: ", sys.platform)
print("Python: ", sys.version)
print("Numpy: ", np.__version__)
print("Tensorflow: ", tf.__version__)
print("GPU: ", get_gpu_name())
print(get_cuda_version())
print("CuDNN Version ", get_cudnn_version())

OS:  linux
Python:  3.5.2 |Anaconda custom (64-bit)| (default, Jul  2 2016, 17:53:06) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
Numpy:  1.14.2
Tensorflow:  1.6.0
GPU:  ['Tesla P100-PCIE-16GB', 'Tesla P100-PCIE-16GB', 'Tesla P100-PCIE-16GB', 'Tesla P100-PCIE-16GB']
CUDA Version 9.0.176
CuDNN Version  7.0.5


In [6]:
# Number of CPUs reported by the OS, and number of entries returned by get_gpu_name()
CPU_COUNT = multiprocessing.cpu_count()
GPU_COUNT = len(get_gpu_name())
print("CPUs: ", CPU_COUNT)
print("GPUs: ", GPU_COUNT)

CPUs:  24
GPUs:  4


In [7]:
# Model-params
# Per-channel mean that XrayData subtracts from every decoded image ...
IMAGENET_RGB_MEAN_CAFFE = np.array([123.68, 116.78, 103.94], dtype=np.float32)
# ... and the factor the centred image is then multiplied by
IMAGENET_SCALE_FACTOR_CAFFE = 0.017
# Paths
CSV_DEST = "chestxray"
IMAGE_FOLDER = os.path.join(CSV_DEST, "images")
LABEL_FILE = os.path.join(CSV_DEST, "Data_Entry_2017.csv")
print(IMAGE_FOLDER, LABEL_FILE)
# Checkpoint the Estimator is warm-started from (see WarmStartSettings below)
CHKPOINT = 'tf-densenet121.ckpt'  # Downloaded tensorflow-checkpoint

chestxray/images chestxray/Data_Entry_2017.csv


In [8]:
# Manually scale to multi-gpu
# Recompute from the unscaled values in common.params_dense instead of using
# in-place `*=`, so that re-running this cell does not multiply LR and
# BATCHSIZE a second time.
import common.params_dense as _base_params
_gpu_scale = GPU_COUNT if MULTI_GPU else 1
LR = _base_params.LR * _gpu_scale
BATCHSIZE = _base_params.BATCHSIZE * _gpu_scale

In [9]:
%%time
# Download data
# NOTE(review): the printed link points at AzCopy, so download_data_chextxray
# presumably shells out to it - confirm in common.utils.
print("Please make sure to download")
print("https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-linux#download-and-install-azcopy")
download_data_chextxray(CSV_DEST)

Please make sure to download
https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-linux#download-and-install-azcopy
Data already exists
CPU times: user 595 ms, sys: 289 ms, total: 885 ms
Wall time: 883 ms


In [10]:
#####################################################################################################
## Data Loading

In [11]:
class XrayData():
    """Input pipeline yielding batches of preprocessed X-ray images and labels.

    Image locations and labels are obtained from get_imgloc_labels() and
    wrapped in a tf.data.Dataset whose behaviour depends on ``mode``:

    * 'training'   - shuffle, repeat indefinitely, random crop + random flip
    * 'validation' - repeat indefinitely, resize only
    * 'testing'    - single pass in the order returned by get_imgloc_labels(),
                     resize only

    Every image is emitted channels-first, i.e. as [3, height, width].

    Attributes:
        img_locs, labels: values returned by get_imgloc_labels().
        data_size: number of labels.
        data: the batched and prefetched tf.data.Dataset.
    """

    _MODES = ('training', 'validation', 'testing')

    def __init__(self, img_dir, lbl_file, patient_ids, mode, 
                 width=WIDTH, height=HEIGHT, batch_size=BATCHSIZE, 
                 imagenet_mean=IMAGENET_RGB_MEAN_CAFFE, imagenet_scaling = IMAGENET_SCALE_FACTOR_CAFFE,
                 buffer=10):
        """Build the dataset.

        Args:
            img_dir, lbl_file, patient_ids: forwarded to get_imgloc_labels().
            mode: 'training', 'validation' or 'testing'.
            width, height: spatial size of the emitted images.
            batch_size: number of examples per batch.
            imagenet_mean: value subtracted from the decoded image.
            imagenet_scaling: factor the centred image is multiplied by.
            buffer: number of batches to prefetch.

        Raises:
            ValueError: if ``mode`` is not one of the supported modes.
        """
        # Fail early: an unknown mode used to fall through every branch and
        # silently leave the dataset unparsed and unbatched.
        if mode not in self._MODES:
            raise ValueError(
                "mode must be one of {}, got {!r}".format(self._MODES, mode))

        self.img_locs, self.labels = get_imgloc_labels(img_dir, lbl_file, patient_ids)
        self.data_size = len(self.labels)
        self.imagenet_mean = imagenet_mean
        self.imagenet_scaling = imagenet_scaling
        self.width = width
        self.height = height
        data = tf.data.Dataset.from_tensor_slices((self.img_locs, self.labels))

        # Processing
        # The parse functions transpose every image to channels-first, which
        # matches the NCHW data_format the model is built with.
        if mode == 'training':
            # Augmentation and repeat
            parse_fn = self._parse_function_train
            data = data.shuffle(self.data_size).repeat()
        elif mode == 'validation':
            # Repeat
            parse_fn = self._parse_function_inference
            data = data.repeat()
        else:
            # 'testing': no repeat, no augmentation
            parse_fn = self._parse_function_inference

        # Fused map + batch, then keep `buffer` batches ready ahead of time
        data = data.apply(
            tf.contrib.data.map_and_batch(parse_fn, batch_size)).prefetch(buffer)

        self.data = data
        print("Loaded {} labels and {} images".format(len(self.labels), len(self.img_locs)))


    def _parse_function_train(self, filename, label):
        """Load one training example and apply random crop and horizontal flip."""
        img_rgb, label = self._preprocess_image_labels(filename, label)
        # Random crop (from 264x264)
        img_rgb = tf.random_crop(img_rgb, [self.height, self.width, 3])
        # Random flip
        img_rgb = tf.image.random_flip_left_right(img_rgb)
        # Channels-first
        img_rgb = tf.transpose(img_rgb, [2, 0, 1])
        return img_rgb, label


    def _parse_function_inference(self, filename, label):
        """Load one validation/test example and resize it (no augmentation)."""
        img_rgb, label = self._preprocess_image_labels(filename, label)
        # Resize to final dimensions
        img_rgb = tf.image.resize_images(img_rgb, [self.height, self.width])
        # Channels-first
        img_rgb = tf.transpose(img_rgb, [2, 0, 1])
        return img_rgb, label


    def _preprocess_image_labels(self, filename, label):
        """Decode a PNG as 3-channel float, centre and scale it; cast label to float32."""
        # load and preprocess the image
        img_decoded = tf.to_float(tf.image.decode_png(tf.read_file(filename), channels=3))
        img_centered = tf.subtract(img_decoded, self.imagenet_mean)
        img_rgb = img_centered * self.imagenet_scaling
        return img_rgb, tf.cast(label, dtype=tf.float32)

In [12]:
# The three returned sets are passed as `patient_ids` to XrayData below
# (presumably sets of patient ids, so a patient never spans two splits - confirm in common.utils)
train_set, valid_set, test_set = get_train_valid_test_split(TOT_PATIENT_NUMBER)

train:21563 valid:3080 test:6162


In [13]:
with tf.device('/cpu:0'):
    # Create dataset for iterator
    # (positional arguments: img_dir, lbl_file, patient_ids, mode)
    train_dataset = XrayData(IMAGE_FOLDER, LABEL_FILE, train_set, 'training')
    valid_dataset = XrayData(IMAGE_FOLDER, LABEL_FILE, valid_set, 'validation')
    test_dataset = XrayData(IMAGE_FOLDER, LABEL_FILE, test_set, 'testing')

Loaded 87306 labels and 87306 images
Loaded 7616 labels and 7616 images
Loaded 17198 labels and 17198 images


In [14]:
#####################################################################################################
## Helper Functions

In [15]:
def average_gradients(tower_grads):
    """Average each variable's gradient across all towers.

    Args:
        tower_grads: list with one entry per tower; each entry is a list of
            (gradient, variable) pairs as returned by
            optimizer.compute_gradients(). All towers are expected to list
            their variables in the same order.

    Returns:
        List of (mean_gradient, variable) pairs, where ``variable`` is taken
        from the first tower. A variable for which no tower produced a
        gradient (every gradient is None) is left out of the result.
    """
    average_grads = []
    # zip(*...) regroups the data so each iteration sees one variable's
    # (gradient, variable) pair from every tower.
    for grad_and_vars in zip(*tower_grads):
        # compute_gradients() yields None for variables the loss does not
        # depend on; those cannot be stacked, so drop them.
        grads = [g for g, _ in grad_and_vars if g is not None]
        if not grads:
            continue
        # Stack along a new leading "tower" axis and average over it
        grad = tf.reduce_mean(tf.stack(grads, axis=0), axis=0)
        # Variables are shared between towers, so the first tower's handle suffices
        average_grads.append((grad, grad_and_vars[0][1]))
    return average_grads

In [16]:
def get_symbol(in_tensor, out_features):
    """Build DenseNet-121 (NCHW) on `in_tensor` and return its output reshaped to [-1, out_features]."""
    # Import symbol
    # is_training=True? | https://github.com/tensorflow/models/issues/3556
    nchw_scope = densenet.densenet_arg_scope(data_format="NCHW")
    with slim.arg_scope(nchw_scope):
        net, _ = densenet.densenet121(
            in_tensor, num_classes=out_features, is_training=True)
    # Need to reshape from (?, 1, 1, 14) to (?, 14)
    return tf.reshape(net, shape=[-1, out_features])

In [17]:
def model_fn_single(features, labels, mode, params):
    """Estimator model_fn for a single device.

    Args:
        features: input image batch fed to get_symbol().
        labels: multi-label targets (unused in PREDICT mode).
        mode: a tf.estimator.ModeKeys value.
        params: dict with "n_classes" (number of outputs) and "lr"
            (Adam learning rate).

    Returns:
        tf.estimator.EstimatorSpec. In PREDICT mode it carries the per-class
        sigmoid probabilities; otherwise the loss, the train op and a
        "val_loss" metric.
    """
    logits = get_symbol(features, out_features=params["n_classes"])
    # Predictions
    predictions = tf.sigmoid(logits)
    # ModeKeys.PREDICT
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
    # Optimizer & Loss
    optimizer = tf.train.AdamOptimizer(params['lr'], beta1=0.9, beta2=0.999)
    # sigmoid_cross_entropy expects raw logits and already returns a scalar
    loss = tf.losses.sigmoid_cross_entropy(labels, logits)
    train_op = optimizer.minimize(loss, tf.train.get_global_step())
    # Create eval metric ops
    # The metric must be built from the logits-based loss: feeding the sigmoid
    # outputs back in as "logits" would apply the sigmoid twice.
    eval_metric_ops = {"val_loss": slim.metrics.streaming_mean(loss)}

    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=eval_metric_ops)

In [18]:
def multi_gpu_X_y_split(features, labels, batchsize, gpus):
    """Split a batch of features and labels along axis 0 into one piece per GPU.

    Every piece but the last has batchsize // len(gpus) rows; the last piece
    takes whatever is left so the sizes always add up to `batchsize`.
    Returns (list_of_feature_pieces, list_of_label_pieces).
    """
    n_gpus = len(gpus)
    per_gpu = batchsize // n_gpus
    leftover = batchsize - per_gpu * (n_gpus - 1)
    sizes = [per_gpu] * (n_gpus - 1) + [leftover]
    # Same split sizes for both tensors so rows stay aligned
    features_split = tf.split(features, sizes, axis=0)
    labels_split = tf.split(labels, sizes, axis=0)
    return features_split, labels_split

In [19]:
def model_fn_multigpu(features, labels, mode, params):
    """Estimator model_fn that builds one model replica ("tower") per GPU.

    The incoming batch is split across the GPUs listed in params["gpus"],
    each tower computes its own loss and gradients on shared variables, and
    the averaged gradients are applied once.

    Args:
        features: input image batch; must contain params["batchsize"] rows
            outside PREDICT mode, because it is split with fixed sizes.
        labels: multi-label targets (unused in PREDICT mode).
        mode: a tf.estimator.ModeKeys value.
        params: dict with "n_classes", "lr", "batchsize" and "gpus"
            (list of GPU device ids).

    Returns:
        tf.estimator.EstimatorSpec. In PREDICT mode a single un-split model is
        built and the sigmoid probabilities are returned.
    """
    if mode == tf.estimator.ModeKeys.PREDICT:
        # Create symbol
        sym = get_symbol(features, out_features=params["n_classes"])
        # Predictions
        predictions = tf.sigmoid(sym)
        # ModeKeys.PREDICT
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    # For multi-gpu split features and labels
    features_split, labels_split = multi_gpu_X_y_split(
        features, labels, params["batchsize"], params["gpus"])
    tower_grads = []
    tower_logits = []
    # Training operation
    global_step = tf.train.get_global_step()
    # Take the learning rate from params (as model_fn_single does) instead of
    # reading the notebook-level LR global.
    optimizer = tf.train.AdamOptimizer(params['lr'], beta1=0.9, beta2=0.999)
    # Load model on multiple GPUs
    with tf.variable_scope(tf.get_variable_scope()):
        for i, gpu_id in enumerate(params['gpus']):
            # Place the tower on the device id listed in params["gpus"],
            # not on the loop index; the name scope still uses the index.
            with tf.device('/gpu:%d' % gpu_id), tf.name_scope('%s_%d' % ("classification", i)) as scope:
                # Symbol
                sym = get_symbol(features_split[i], out_features=params["n_classes"])
                # Loss (registered in the LOSSES collection under this tower's scope)
                tf.losses.sigmoid_cross_entropy(labels_split[i], sym)
                # Training-ops
                update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, scope)
                updates_op = tf.group(*update_ops)
                with tf.control_dependencies([updates_op]):
                    losses = tf.get_collection(tf.GraphKeys.LOSSES, scope)
                    total_loss = tf.add_n(losses, name='total_loss')
                # reuse var
                tf.get_variable_scope().reuse_variables()
                # grad compute
                grads = optimizer.compute_gradients(total_loss)
                tower_grads.append(grads)
                tower_logits.append(sym)

    # We must calculate the mean of each gradient
    grads = average_gradients(tower_grads)
    # Apply the gradients to adjust the shared variables.
    apply_gradient_op = optimizer.apply_gradients(grads, global_step=global_step)
    # Group all updates to into a single train op.
    train_op = tf.group(apply_gradient_op)
    # Create eval metric ops (predict on multi-gpu)
    # Re-assemble the per-tower logits in split order so they line up with `labels`
    logits = tf.concat(tower_logits, 0)
    eval_metric_ops = {"val_loss": slim.metrics.streaming_mean(
        tf.losses.sigmoid_cross_entropy(labels, logits))}

    # NOTE: total_loss is the loss of the last tower built in the loop above
    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=total_loss,
        train_op=train_op,
        eval_metric_ops=eval_metric_ops)

In [20]:
def train_input_fn():
    """Return the next-batch (features, labels) tensors of the training pipeline."""
    iterator = train_dataset.data.make_one_shot_iterator()
    return iterator.get_next()


def valid_input_fn():
    """Return the next-batch (features, labels) tensors of the validation pipeline."""
    iterator = valid_dataset.data.make_one_shot_iterator()
    return iterator.get_next()


def test_input_fn():
    """Return the next-batch (features, labels) tensors of the test pipeline."""
    iterator = test_dataset.data.make_one_shot_iterator()
    return iterator.get_next()

In [21]:
# Warm start from saved checkpoint (not logits)
# The regex matches every variable whose name does not contain "logits"
ws = tf.estimator.WarmStartSettings(
    ckpt_to_initialize_from=CHKPOINT,
    vars_to_warm_start="^(?!.*(logits))",
)
# Params
params = {
    "lr": LR,
    "n_classes": CLASSES,
    "batchsize": BATCHSIZE,
    "gpus": list(range(GPU_COUNT)),
}
# Model functions
model_fn = model_fn_multigpu if MULTI_GPU else model_fn_single

In [22]:
%%time
# Create Estimator
# No model_dir is given, so the Estimator writes to a temporary directory
# (see '_model_dir' in the logged config).
nn = tf.estimator.Estimator(model_fn=model_fn, params=params, warm_start_from=ws)

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_keep_checkpoint_max': 5, '_save_checkpoints_steps': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f328562a898>, '_task_id': 0, '_save_summary_steps': 100, '_task_type': 'worker', '_global_id_in_cluster': 0, '_session_config': None, '_is_chief': True, '_service': None, '_tf_random_seed': None, '_master': '', '_log_step_count_steps': 100, '_evaluation_master': '', '_num_ps_replicas': 0, '_keep_checkpoint_every_n_hours': 10000, '_model_dir': '/tmp/tmp4qzuvlp6', '_num_worker_replicas': 1, '_save_checkpoints_secs': 600}
CPU times: user 1.35 ms, sys: 3.37 ms, total: 4.72 ms
Wall time: 4.11 ms


In [23]:
%%time
# Create train & eval specs
# max_steps = EPOCHS x (number of full batches in one pass over the training data)
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn,
                                    max_steps=EPOCHS*(train_dataset.data_size//BATCHSIZE))
# Hard to run validation every epoch so playing around with throttle_secs to get 5 runs
# NOTE(review): `steps` is not set, so EvalSpec's default number of evaluation
# steps applies while valid_input_fn repeats indefinitely - confirm this covers
# the validation set as intended.
eval_spec = tf.estimator.EvalSpec(input_fn=valid_input_fn,
                                  throttle_secs=400)

CPU times: user 31 µs, sys: 7 µs, total: 38 µs
Wall time: 44.3 µs


In [None]:
%%time
# Timings recorded by the author for this cell:
# 1 GPU - Main training loop: 50min 19s
# 4 GPU - Main training loop: 25min 8s
# Run train and evaluate (on validation data)
tf.estimator.train_and_evaluate(nn, train_spec, eval_spec)

In [25]:
%%time
# Main prediction loop: 44.4s
# Test AUC: 0.8155
# The 'testing' pipeline neither shuffles nor repeats, so the predictions come
# back in the same order as test_dataset.labels and can be compared directly.
predictions = list(nn.predict(test_input_fn))
y_truth = test_dataset.labels
y_guess = np.array(predictions)
print("Test AUC: {0:.4f}".format(compute_roc_auc(y_truth, y_guess, CLASSES))) 

INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmp4qzuvlp6/model.ckpt-1705
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
Full AUC [0.8095029096268271, 0.8730753531871049, 0.7890140063105355, 0.8841224641071695, 0.8791105523364449, 0.909663052586902, 0.7296254190733823, 0.846209840706696, 0.6351492058205271, 0.8433088569206826, 0.7720235409411627, 0.8095089838328521, 0.7474695735925051, 0.8898279027128161]
Test AUC: 0.8155
CPU times: user 3min 21s, sys: 21.9 s, total: 3min 43s
Wall time: 44.4 s


In [26]:
#####################################################################################################
## Synthetic Data (Pure Training)

In [27]:
# Test on fake-data -> no IO lag
# Use whole batches only: the multi-GPU model_fn splits each batch with fixed
# sizes that sum to BATCHSIZE.
batch_in_epoch = train_dataset.data_size//BATCHSIZE
tot_num = batch_in_epoch * BATCHSIZE
# Channels-first, with the same HEIGHT x WIDTH the XrayData pipeline emits
# (instead of a hard-coded 224 x 224)
fake_X = np.random.rand(tot_num, 3, HEIGHT, WIDTH).astype(np.float32)
fake_y = np.random.rand(tot_num, CLASSES).astype(np.float32)

In [28]:
%%time
# Create Estimator
# Rebinds `nn` to a fresh Estimator: no warm_start_from and a new temporary
# model_dir, so the model trained above is no longer reachable through `nn`.
nn = tf.estimator.Estimator(model_fn=model_fn, params=params)  

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_keep_checkpoint_max': 5, '_save_checkpoints_steps': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f327dd058d0>, '_task_id': 0, '_save_summary_steps': 100, '_task_type': 'worker', '_global_id_in_cluster': 0, '_session_config': None, '_is_chief': True, '_service': None, '_tf_random_seed': None, '_master': '', '_log_step_count_steps': 100, '_evaluation_master': '', '_num_ps_replicas': 0, '_keep_checkpoint_every_n_hours': 10000, '_model_dir': '/tmp/tmp81y2qplw', '_num_worker_replicas': 1, '_save_checkpoints_secs': 600}
CPU times: user 5.27 ms, sys: 123 µs, total: 5.4 ms
Wall time: 4.49 ms


In [29]:
%%time
# 4 GPU - Synthetic data: 17min 6s
# Feed the in-memory arrays directly, unshuffled, for EPOCHS passes
nn.train(tf.estimator.inputs.numpy_input_fn(
    fake_X,
    fake_y,
    shuffle=False,
    num_epochs=EPOCHS,
    batch_size=BATCHSIZE))

INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 1 into /tmp/tmp81y2qplw/model.ckpt.
INFO:tensorflow:loss = 0.7307674, step = 1
INFO:tensorflow:global_step/sec: 1.70092
INFO:tensorflow:loss = 0.6955669, step = 101 (58.795 sec)
INFO:tensorflow:global_step/sec: 1.86103
INFO:tensorflow:loss = 0.6935265, step = 201 (53.734 sec)
INFO:tensorflow:global_step/sec: 1.85679
INFO:tensorflow:loss = 0.6932544, step = 301 (53.856 sec)
INFO:tensorflow:global_step/sec: 1.84197
INFO:tensorflow:loss = 0.69336593, step = 401 (54.289 sec)
INFO:tensorflow:global_step/sec: 1.81324
INFO:tensorflow:loss = 0.6918529, step = 501 (55.150 sec)
INFO:tensorflow:global_step/sec: 1.83523
INFO:tensorflow:loss = 0.6934935, step = 601 (54.489 sec)
INFO:tensorflow:global_step/sec: 1.85034
INFO:tens

<tensorflow.python.estimator.estimator.Estimator at 0x7f3284ef0e80>