# State Farm Distracted Driver Detection - Data parallel Resnet

This notebook is a test of the upcoming data-parallel features of Keras.

## Setup libraries and functions

Import libraries and functions for future use.

In [1]:
# Plots displayed inline in notebook
%matplotlib inline

# Make the local helper libraries importable (presumably where `kerastools`,
# imported in the next cell, lives).
# NOTE(review): this is a hard-coded absolute path; adjust it for your machine.
import sys

sys.path.append('D:/anlaursen/libraries')

# Restrict CUDA to GPUs 0 and 1, the two devices used for the data-parallel
# run below. These variables are set before TensorFlow is imported.
import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"   # order devices by PCI bus ID; see issue #152
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

In [2]:
import numpy as np
import pandas as pd
import gc
import tensorflow as tf

from kerastools.resnet50 import Resnet50
from kerastools.utils import get_batches, save_array, load_array, get_classes, do_clip

from keras.engine.training import Model
from keras.optimizers import Adam
from keras.layers import Dense, Dropout, Input
from keras.layers.core import Lambda
from keras.layers.merge import concatenate
from keras.preprocessing import image
import keras.backend as K

Using TensorFlow backend.


Define parallel model function.

In [3]:
def _get_available_devices():
    """Return the names of the local devices reported by TensorFlow.

    # Returns
        A list with the `name` attribute of every entry returned by
        `device_lib.list_local_devices()`, in the order reported.
    """
    # Imported lazily so that TensorFlow is only required when this is called.
    from tensorflow.python.client import device_lib

    device_names = []
    for device_proto in device_lib.list_local_devices():
        device_names.append(device_proto.name)
    return device_names

In [4]:
def multi_gpu_model(model, gpus):
    """Replicates a model on different GPUs.

    Specifically, this function implements single-machine
    multi-GPU data parallelism. It works in the following way:

    - Divide the model's input(s) into multiple sub-batches.
    - Apply a model copy on each sub-batch. Every model copy
        is executed on a dedicated GPU.
    - Concatenate the results (on CPU) into one big batch.

    E.g. if your `batch_size` is 64 and you use `gpus=2`,
    then we will divide the input into 2 sub-batches of 32 samples,
    process each sub-batch on one GPU, then return the full
    batch of 64 processed samples. When the batch size is not
    divisible by `gpus`, the last replica receives the remainder.

    This induces quasi-linear speedup on up to 8 GPUs.

    This function is only available with the TensorFlow backend
    for the time being.

    # Arguments
        model: A Keras model instance. To avoid OOM errors,
            this model could have been built on CPU, for instance
            (see usage example below).
        gpus: Integer >= 2, number of GPUs on which to create
            model replicas.

    # Returns
        A Keras `Model` instance which can be used just like the initial
        `model` argument, but which distributes its workload on multiple GPUs.

    # Raises
        ValueError: if the backend is not TensorFlow, if `gpus <= 1`,
            or if `/cpu:0` or any of `/gpu:0` ... `/gpu:<gpus - 1>`
            is not available on this machine.

    # Example

    ```python
        import tensorflow as tf
        from keras.applications import Xception
        num_samples = 1000
        height = 224
        width = 224
        num_classes = 1000
        # Instantiate the base model
        # (here, we do it on CPU, which is optional).
        with tf.device('/cpu:0'):
            model = Xception(weights=None,
                             input_shape=(height, width, 3),
                             classes=num_classes)
        # Replicates the model on 8 GPUs.
        # This assumes that your machine has 8 available GPUs.
        parallel_model = multi_gpu_model(model, gpus=8)
        parallel_model.compile(loss='categorical_crossentropy',
                               optimizer='rmsprop')
        # Generate dummy data.
        x = np.random.random((num_samples, height, width, 3))
        y = np.random.random((num_samples, num_classes))
        # This `fit` call will be distributed on 8 GPUs.
        # Since the batch size is 256, each GPU will process 32 samples.
        parallel_model.fit(x, y, epochs=20, batch_size=256)
    ```
    """
    if K.backend() != 'tensorflow':
        raise ValueError('`multi_gpu_model` is only available '
                         'with the TensorFlow backend.')
    if gpus <= 1:
        raise ValueError('For multi-gpu usage to be effective, '
                         'call `multi_gpu_model` with `gpus >= 2`. '
                         'Received: `gpus=%d`' % gpus)

    import tensorflow as tf

    def normalize_device_name(name):
        # Reduce any TensorFlow device name to the short, lower-case
        # '/<type>:<index>' form, so that e.g. '/device:GPU:0' and
        # '/job:localhost/replica:0/task:0/device:GPU:0' both become '/gpu:0'.
        return '/' + ':'.join(name.lower().replace('/', '').split(':')[-2:])

    target_devices = ['/cpu:0'] + ['/gpu:%d' % i for i in range(gpus)]
    available_devices = _get_available_devices()
    # Compare normalized names: depending on the TensorFlow version, devices
    # are reported as '/gpu:0' or '/device:GPU:0'.
    normalized_devices = set(normalize_device_name(name)
                             for name in available_devices)
    for device in target_devices:
        if device not in normalized_devices:
            raise ValueError(
                'To call `multi_gpu_model` with `gpus=%d`, '
                'we expect the following devices to be available: %s. '
                'However this machine only has: %s. '
                'Try reducing `gpus`.' % (gpus,
                                          target_devices,
                                          available_devices))

    def get_slice(data, i, parts):
        # Return the `i`-th of `parts` slices of `data` along the batch axis.
        shape = tf.shape(data)
        batch_size = shape[:1]
        input_shape = shape[1:]
        step = batch_size // parts
        if i == parts - 1:
            # The last slice also takes the remainder of the batch.
            size = batch_size - step * i
        else:
            size = step
        size = tf.concat([size, input_shape], axis=0)
        # Only the batch axis is offset; every other axis starts at 0.
        stride = tf.concat([step, input_shape * 0], axis=0)
        start = stride * i
        return tf.slice(data, start, size)

    # One list per model output, collecting that output from every replica.
    all_outputs = [[] for _ in model.outputs]

    # Place a copy of the model on each GPU,
    # each getting a slice of the inputs.
    for i in range(gpus):
        with tf.device('/gpu:%d' % i):
            with tf.name_scope('replica_%d' % i):
                inputs = []
                # Retrieve a slice of the input.
                for x in model.inputs:
                    input_shape = tuple(x.get_shape().as_list())[1:]
                    slice_i = Lambda(get_slice,
                                     output_shape=input_shape,
                                     arguments={'i': i,
                                                'parts': gpus})(x)
                    inputs.append(slice_i)

                # Apply model on slice
                # (creating a model replica on the target device).
                outputs = model(inputs)
                if not isinstance(outputs, list):
                    outputs = [outputs]

                # Save the outputs for merging back together later.
                for o, output in enumerate(outputs):
                    all_outputs[o].append(output)

    # Merge outputs on CPU.
    with tf.device('/cpu:0'):
        merged = []
        for outputs in all_outputs:
            # Concatenating along axis 0 restores the full batch.
            merged.append(concatenate(outputs,
                                      axis=0))
        return Model(model.inputs, merged)

## Setup batches
We define our validation and training batches for modelling.

In [5]:
# Batches of one image each; the cells below read them one at a time.
batch_size = 1

# Root folder holding the `train` and `valid` image directories.
path = 'data/state-farm/'

# Build both iterators the same way (unshuffled), training set first.
train_batches, val_batches = [
    get_batches(path + subset, batch_size=batch_size, shuffle=False)
    for subset in ('train', 'valid')
]

Found 19624 images belonging to 10 classes.
Found 2800 images belonging to 10 classes.


Load all data in memory

In [6]:
# Read the whole training set into memory, one batch per step.
# This relies on `train_batches` yielding one sample per batch
# (`batch_size = 1` above), so `train_batches.n` steps cover every sample.
train_y = []
train_x = []
for _ in range(train_batches.n):
    # Draw each batch exactly once and unpack it: calling `next()` separately
    # for images and labels would take them from two different batches.
    batch_x, batch_y = train_batches.next()
    train_x.append(batch_x)
    train_y.append(batch_y)

train_y = np.concatenate(train_y, axis = 0)
train_x = np.concatenate(train_x, axis = 0)

print(train_y.shape, train_x.shape)

(19624, 10) (19624, 224, 224, 3)


In [7]:
# Read the whole validation set into memory, one batch per step.
# This relies on `val_batches` yielding one sample per batch
# (`batch_size = 1` above), so `val_batches.n` steps cover every sample.
val_y = []
val_x = []
for _ in range(val_batches.n):
    # Draw each batch exactly once and unpack it: calling `next()` separately
    # for images and labels would take them from two different batches.
    batch_x, batch_y = val_batches.next()
    val_x.append(batch_x)
    val_y.append(batch_y)

val_y = np.concatenate(val_y, axis = 0)
val_x = np.concatenate(val_x, axis = 0)

print(val_y.shape, val_x.shape)

(2800, 10) (2800, 224, 224, 3)


## Define model

We set up our initial ResNet50 model.

In [8]:
# Instantiate the base model under a CPU device scope
# (here, we do it on CPU, which is optional).
with tf.device('/cpu:0'):
    resnet = Resnet50()
    # NOTE(review): presumably adapts the network to the classes found in
    # `train_batches`; confirm against `kerastools.resnet50`.
    resnet.finetune(train_batches)
    
# Print the layer-by-layer summary of the wrapped Keras model.
resnet.model.summary()

____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param #     Connected to                     
input_1 (InputLayer)             (None, 224, 224, 3)   0                                            
____________________________________________________________________________________________________
lambda_1 (Lambda)                (None, 224, 224, 3)   0           input_1[0][0]                    
____________________________________________________________________________________________________
zero_padding2d_1 (ZeroPadding2D) (None, 230, 230, 3)   0           lambda_1[0][0]                   
____________________________________________________________________________________________________
conv1 (Conv2D)                   (None, 112, 112, 64)  9472        zero_padding2d_1[0][0]           
___________________________________________________________________________________________

Ensure that all layers are trainable

In [9]:
# Mark every layer of the wrapped Keras model as trainable. This runs before
# the parallel model is compiled in the next cell.
for layer in resnet.model.layers:
    layer.trainable = True

In [10]:
# Replicates the model on 2 GPUs.
# This assumes that your machine has 2 available GPUs; `multi_gpu_model`
# raises a ValueError if the expected devices are not found.
parallel_model = multi_gpu_model(resnet.model, gpus = 2)
# Compile the multi-GPU wrapper, which is the model trained below.
parallel_model.compile(loss = 'categorical_crossentropy',
                       optimizer = 'adam')


In [13]:
# Train the data-parallel model on the in-memory arrays. Each batch of 64 is
# split by `multi_gpu_model` across the 2 replicas, i.e. 32 samples per GPU.
parallel_model.fit(x = train_x,
                   y = train_y,
                   batch_size = 64,
                   epochs = 2,
                   validation_data = (val_x, val_y))

Train on 19624 samples, validate on 2800 samples
Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x20d717b4c88>

In [14]:
# Single-GPU baseline: a fresh model trained without `multi_gpu_model`.
resnet = Resnet50()
resnet.finetune(train_batches)

# Unfreeze every layer *before* compiling, mirroring the multi-GPU setup.
# Keras only takes `trainable` changes into account at the next `compile()`,
# so compiling first would leave the baseline training a different set of
# weights than the parallel model.
for layer in resnet.model.layers:
    layer.trainable = True

resnet.model.compile(loss = 'categorical_crossentropy',
                     optimizer = 'adam')

In [16]:
# Train the single-GPU baseline. A batch size of 32 equals the per-GPU share
# of the parallel run above (64 samples split over 2 GPUs).
resnet.model.fit(x = train_x,
                 y = train_y,
                 batch_size = 32,
                 epochs = 2,
                 validation_data = (val_x, val_y))

Train on 19624 samples, validate on 2800 samples
Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x20d76209550>