This notebook runs as a worker that trains multiple models in parallel. It accepts the following parameters from the controller:

1. the number of CPUs available to the worker
2. the data partitions the worker should train on

The notebook can also run standalone if you only need one worker to train your models; in that case, set the two parameters above directly in the notebook.
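The controller itself is not part of this notebook. The sketch below assumes the controller passes the two parameters as environment variables named data_split and cpus, which are the names the framework cell at the end of this notebook reads. Under that assumption, the controller could deal partition ids out to workers round-robin. make_worker_envs is a hypothetical helper, not part of any library.

```python
# Hypothetical controller-side helper: split the partition ids into one list
# per worker and encode them the way this notebook reads them back
# (data_split as "[0,1,2]", cpus as a plain integer string).
def make_worker_envs(partition_ids, n_workers, cpus_per_worker):
    envs = []
    for w in range(n_workers):
        # Round-robin assignment: worker w gets ids w, w + n_workers, ...
        chunk = partition_ids[w::n_workers]
        if not chunk:
            continue  # more workers than partitions: skip idle workers
        envs.append({
            "data_split": "[" + ",".join(str(p) for p in chunk) + "]",
            "cpus": str(cpus_per_worker),
        })
    return envs

for env in make_worker_envs(list(range(10)), n_workers=3, cpus_per_worker=4):
    print(env)  # first line: {'data_split': '[0,3,6,9]', 'cpus': '4'}
```

Round-robin keeps the worker lists balanced to within one partition. The controller would then start one copy of this notebook per dictionary, with those environment variables set.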

The sample model is a Keras classifier trained on the MNIST handwritten-digit dataset. More details about the model and the data are available at the links below:
1. https://github.com/IBMDataScience/sample-notebooks/blob/master/CloudPakForData/notebooks/Save__compress__and_deploy_a_Keras_model_aEmdZbHco.ipynb
2. http://yann.lecun.com/exdb/mnist/

In [1]:
from multiprocessing import Pool, Manager
import time
import os
import tensorflow as tf
from tensorflow.keras.datasets import mnist
import logging
# Disable log messages at WARNING level and below (logging.WARNING == 30).
logging.disable(logging.WARNING)


Use the following parameter to control whether multiple processors are used.

If it is True, the worker trains on multiple CPUs in parallel; otherwise, it trains the partitions one after another.

In [2]:
is_multiple_processors = True

Load data

In [3]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [4]:
print('X_train shape:', X_train.shape)
print('y_train shape:', y_train.shape)
print('X_test shape:', X_test.shape)
print('y_test shape:', y_test.shape)

X_train shape: (60000, 28, 28)
y_train shape: (60000,)
X_test shape: (10000, 28, 28)
y_test shape: (10000,)


In [7]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras import backend as K

Set the parameters that are required for the Deep Learning (Keras) model.

In [8]:
batch_size = 128
num_classes = 10
epochs = 1
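Here is what batch_size and epochs mean in practice. With epochs = 1, each training image is seen once, in batches of 128. The plain arithmetic below (not a Keras call) counts the gradient updates per epoch when a partition covers the full 60,000-image training set.

```python
import math

batch_size = 128
n_train = 60000  # number of MNIST training images (see X_train.shape above)

# Keras performs one gradient update per batch; the final batch may be partial.
steps_per_epoch = math.ceil(n_train / batch_size)
last_batch = n_train - (steps_per_epoch - 1) * batch_size

print(steps_per_epoch)  # → 469
print(last_batch)       # → 96
```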

The image size is 28x28 pixels.

In [9]:
img_rows, img_cols = 28, 28

In [10]:
if K.image_data_format() == 'channels_first':
    X_train = X_train.reshape(X_train.shape[0], 1, img_rows, img_cols)
    X_test = X_test.reshape(X_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 1)
    X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)
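Conv2D expects an explicit channel axis, which the raw grayscale MNIST arrays lack. The sketch below shows the two layouts the cell above chooses between, using a small dummy array instead of the real data. K.image_data_format() returns 'channels_last' unless the Keras configuration overrides it. In both layouts the reshape only inserts a length-1 axis, so the pixel order is unchanged.

```python
import numpy as np

# Three dummy 28x28 "images" standing in for X_train.
images = np.arange(3 * 28 * 28).reshape(3, 28, 28)

# channels_last (the Keras default): the channel axis goes at the end.
last = images.reshape(images.shape[0], 28, 28, 1)
# channels_first: the channel axis goes right after the batch axis.
first = images.reshape(images.shape[0], 1, 28, 28)

print(last.shape)   # → (3, 28, 28, 1)
print(first.shape)  # → (3, 1, 28, 28)
```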

Scale the pixel values to the range [0, 1] instead of [0, 255], and convert the training/test label arrays into one-hot encoded matrices.

For example:

0 => [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]

1 => [0, 1, 0, 0, 0, 0, 0, 0, 0, 0]

2 => [0, 0, 1, 0, 0, 0, 0, 0, 0, 0]

...

In [12]:
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)
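The same two transformations can be reproduced in plain NumPy, which may help when TensorFlow is not at hand. This is an equivalent sketch on a few made-up values, not the code the notebook runs. For integer labels in range(num_classes), tf.keras.utils.to_categorical produces the same float32 one-hot rows.

```python
import numpy as np

num_classes = 10
labels = np.array([0, 1, 2, 9])
pixels = np.array([0, 128, 255], dtype=np.uint8)

# Scale uint8 pixel values from [0, 255] to float32 values in [0, 1].
scaled = pixels.astype("float32") / 255

# One-hot encode: row i of the identity matrix is the one-hot vector for class i.
one_hot = np.eye(num_classes, dtype="float32")[labels]

print(scaled)
print(one_hot)  # row k has a single 1 in column labels[k]
```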

Read the data-partition metadata

In [13]:
import pandas as pd
split_meta_data = pd.read_csv('/project_data/data_asset/split.csv')
split_meta_data.head(12)

Unnamed: 0,id,train_from,train_to,test_from,test_to
0,0,0,60000,0,10000
1,1,0,60000,0,10000
2,2,0,60000,0,10000
3,3,0,60000,0,10000
4,4,0,60000,0,10000
5,5,0,60000,0,10000
6,6,0,60000,0,10000
7,7,0,60000,0,10000
8,8,0,60000,0,10000
9,9,0,60000,0,10000
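Every row in this sample file spans the full training and test sets, so each partition trains on identical data. To give each partition its own slice instead, a file with the same columns could be generated as in the sketch below. make_split is a hypothetical helper, and the equal-sized contiguous slicing is an assumption; the original notebook does not do this. The "Unnamed: 0" column above is what pandas reports when a CSV was written together with its index, which DataFrame.to_csv does by default.

```python
import pandas as pd

def make_split(n_parts, n_train=60000, n_test=10000):
    """Hypothetical helper: n_parts contiguous, non-overlapping slices.

    Column names match the sample split.csv; the last slice absorbs any
    remainder so the whole range is covered.
    """
    tr_size, te_size = n_train // n_parts, n_test // n_parts
    rows = []
    for i in range(n_parts):
        last = i == n_parts - 1
        rows.append({
            "id": i,
            "train_from": i * tr_size,
            "train_to": n_train if last else (i + 1) * tr_size,
            "test_from": i * te_size,
            "test_to": n_test if last else (i + 1) * te_size,
        })
    return pd.DataFrame(rows)

split = make_split(4)
print(split)
# split.to_csv("split.csv") would also write the index, which read_csv
# later reports as an "Unnamed: 0" column.
```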


This section is the interface between the user's own model and the multiprocessing framework. The user needs to define two functions:
1. get_data(row)
Reads the training and test data for the partition described by row of the metadata file.
2. train(row, index)
Trains a model on that partition and saves it to a file; index identifies the worker process running the task.

The framework (the code in the next section) handles the multiprocessing.

In addition, if the notebook is run standalone, two parameters need to be set:
1. data_split
the partition ids (rows of the metadata file) this notebook should train

2. cpus
how many CPUs to use in parallel
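Any pair of functions with these signatures can be plugged into the framework. The stub below is a minimal sketch that runs without TensorFlow on tiny made-up arrays and a two-row metadata table, which can be handy for checking the partition plumbing before training real models. Its globals are stand-ins for the notebook's own. Its train only reports slice sizes, and the returned dictionary is an assumption of this sketch; the framework ignores whatever train returns.

```python
import numpy as np
import pandas as pd

# Toy stand-ins for the notebook's globals (assumptions for this sketch).
split_meta_data = pd.DataFrame({"id": [0, 1],
                                "train_from": [0, 5], "train_to": [5, 10],
                                "test_from": [0, 2], "test_to": [2, 4]})
X_train, y_train = np.arange(10), np.arange(10) % 2
X_test, y_test = np.arange(4), np.arange(4) % 2

def get_data(row):
    meta = split_meta_data.iloc[row]  # one metadata row as a Series
    tr = slice(int(meta["train_from"]), int(meta["train_to"]))
    te = slice(int(meta["test_from"]), int(meta["test_to"]))
    return X_train[tr], y_train[tr], X_test[te], y_test[te]

def train(row, index):
    # Placeholder "training": report what this worker received.
    X_tr, y_tr, X_te, y_te = get_data(row)
    return {"row": row, "worker": index, "n_train": len(X_tr), "n_test": len(X_te)}

print(train(1, 0))  # → {'row': 1, 'worker': 0, 'n_train': 5, 'n_test': 2}
```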

In [None]:
data_split = [0,1,2,3,4,5,6,7,8,9,10,11]  # partition ids; each must be a row index of split.csv
cpus = 6

def get_data(row):
    # Look up one row of the partition metadata as a Series.
    meta = split_meta_data.iloc[row]
    train_slice = slice(int(meta['train_from']), int(meta['train_to']))
    test_slice = slice(int(meta['test_from']), int(meta['test_to']))
    return X_train[train_slice], y_train[train_slice], X_test[test_slice], y_test[test_slice]

def train(row, index):
    print('Train on processor ' + str(index) + ' for row ' + str(row))
    model = Sequential()
    model.add(Conv2D(32,
                     kernel_size=(3, 3),
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(loss=tf.keras.losses.categorical_crossentropy,
                  optimizer=tf.keras.optimizers.Adadelta(),
                  metrics=['accuracy'])
    X_train_sub, y_train_sub, X_test_sub, y_test_sub = get_data(row)
    model.fit(X_train_sub,
              y_train_sub,
              batch_size=batch_size,
              epochs=epochs,
              verbose=0,
              validation_data=(X_test_sub, y_test_sub))
    score = model.evaluate(X_test_sub, y_test_sub, verbose=0)  # [loss, accuracy]
    # Encode partition, loss, accuracy (%), worker index and worker count in the
    # file name, avoiding ':' and '%', which are awkward or invalid in file names.
    filename = ('/project_data/data_asset/keras_mnist_model-' + str(row)
                + '-loss{:.2f}'.format(score[0])
                + '-accuracy{:.2f}'.format(score[1] * 100)
                + '-on-' + str(index) + '-' + str(cpus) + '.h5')
    print(filename)
    model.save(filename)

In this section, the multiprocessing framework trains the user-specified partitions concurrently on multiple CPUs.

If is_multiple_processors is False, the framework trains them sequentially instead.
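The framework follows a simple pattern. All partition ids go into one shared queue, and each of the cpus workers repeatedly takes the next id until none are left, so faster workers naturally pick up more partitions. The sketch below shows that pattern in isolation. To stay lightweight and self-contained it uses multiprocessing.dummy.Pool, which exposes the same apply_async/close/join API but runs its workers as threads, and a queue.Queue in place of the Manager queue. Recording (row, worker) pairs stands in for train(row, index).

```python
import queue
import threading
from multiprocessing.dummy import Pool  # thread-backed Pool with the same API

def process(q, index, done, lock):
    # Each worker keeps taking partition ids until the queue is empty.
    while True:
        try:
            row = q.get_nowait()
        except queue.Empty:
            return
        with lock:
            done.append((row, index))  # stand-in for train(row, index)

work = queue.Queue()
for row in range(12):
    work.put(row)  # fill the queue completely before any worker starts

done, lock = [], threading.Lock()
pool = Pool(processes=4)
for i in range(4):
    pool.apply_async(process, args=(work, i, done, lock))
pool.close()
pool.join()

print(sorted(r for r, _ in done))  # every row appears exactly once
```

Because the queue is filled before the workers start, a worker that finds it empty can safely stop. Catching queue.Empty rather than checking q.empty() first avoids a race in which two workers both see one remaining item.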

In [None]:
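import queue  # provides queue.Empty, raised by q.get() on timeout

# When the controller launches this notebook, it passes the parameters as
# environment variables (e.g. data_split="[0,1,2]", cpus="6"); they override
# the defaults set in the previous cell.
if "data_split" in os.environ:
    temp = os.environ["data_split"].strip()[1:-1]  # drop the surrounding brackets
    data_split = list(map(int, temp.split(",")))
if "cpus" in os.environ:
    cpus = int(os.environ["cpus"])

def process(q, index):
    # Take partition ids from the shared queue until it is drained. Catching
    # queue.Empty (instead of checking q.empty() first) avoids a race when
    # another worker takes the last item in between.
    while True:
        try:
            row = q.get(timeout=2)
        except queue.Empty:
            break
        try:
            train(row, index)
        except Exception as e:
            # Report the worker index, remaining queue size, and the error.
            print(index, q.qsize(), 'Error: ', e)

manager = Manager()
workQueue = manager.Queue(10000)
for i in data_split:
    workQueue.put(i)

start = time.time()
if is_multiple_processors:
    pool = Pool(processes=cpus)
    for i in range(cpus):
        pool.apply_async(process, args=(workQueue, i))
    print("Started to train")
    pool.close()
    pool.join()
else:
    process(workQueue, 0)
end = time.time()
print('Main process Ended! Elapsed time: {:.1f} s'.format(end - start))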
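The framework cell parses data_split by stripping the first and last characters and splitting on commas. That works for strings like "[0,1,2]" but raises a ValueError on an empty list "[]". The sketch below is a slightly more tolerant alternative, assuming the controller sends a JSON-style list. read_worker_params is a hypothetical name, and the sketch is not part of the original framework.

```python
import json
import os

def read_worker_params(env, default_split, default_cpus):
    """Hypothetical helper: read data_split/cpus from an environment mapping.

    Accepts data_split as a JSON list such as "[0, 1, 2]" (spaces and "[]"
    allowed) and falls back to the notebook defaults when a variable is absent.
    """
    data_split = default_split
    if "data_split" in env:
        data_split = [int(x) for x in json.loads(env["data_split"])]
    cpus = int(env["cpus"]) if "cpus" in env else default_cpus
    if cpus < 1:
        raise ValueError("cpus must be at least 1")
    return data_split, cpus

# Usage in the notebook: environment variables override the defaults.
data_split, cpus = read_worker_params(os.environ, list(range(12)), 6)
```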