In [None]:
import os
import os.path
import time
import boto3
import logging
import numpy as np
import mxnet as mx

In [None]:
# Send INFO-and-above records from the 'mxnet_training' logger to model-training.log.
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger('mxnet_training')

# Guard against re-running this cell: each run would otherwise attach one more
# FileHandler to the same logger and duplicate every log line in the file.
_log_file = 'model-training.log'
if not any(
    isinstance(handler, logging.FileHandler)
    and handler.baseFilename == os.path.abspath(_log_file)
    for handler in logger.handlers
):
    fh = logging.FileHandler(_log_file)
    fh.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger.addHandler(fh)

In [None]:
# Checkpoint naming: checkpoints are written under <model_path>/<epoch>/<model_name>.
model_name = 'mxnet_cnn_digits'
model_path = 'model'

# Training hyper-parameters.
batch_size = 120   # mini-batch size for the NDArrayIter data iterators
epoch = 30         # num_epoch passed to fit(); also names the checkpoint sub-directory

data_folder = './dataset'

In [None]:
def load_data(path, filename, s3, model_bucket, logger):
    """Load a CSV of labelled 28x28 images into label and pixel arrays.

    Each non-blank line is ``label,p0,...,p783``. The 784 pixel values are
    reshaped to 28x28 and divided by 255. The file is read from
    ``path/filename``. If that file is missing or unreadable, it is first
    downloaded from the S3 object ``filename`` in ``model_bucket``.

    Args:
        path: Local directory holding the CSV. It is created if a download is needed.
        filename: CSV file name, also used as the S3 object key.
        s3: boto3 S3 client. Only used when a download is needed.
        model_bucket: S3 bucket to download from.
        logger: Logger that receives the number of samples loaded.

    Returns:
        dict with 'label' (shape (N,)) and 'data' (shape (N, 1, 28, 28)),
        both float64 NumPy arrays.
    """
    file_path = os.path.join(path, filename)
    if not os.path.isfile(file_path) or not os.access(file_path, os.R_OK):
        if path:
            # Make sure the target directory exists before downloading into it.
            os.makedirs(path, exist_ok=True)
        s3.download_file(model_bucket, filename, file_path)
    with open(file_path, 'r') as data_file:
        # Skip blank lines (e.g. an extra empty line at EOF) rather than failing to parse them.
        data_list = [line for line in data_file if line.strip()]
    logger.info(str(len(data_list)) + ' samples loaded from ' + file_path)
    labels = np.empty([len(data_list),])
    pixels = np.empty([len(data_list), 1, 28, 28])
    for index, record in enumerate(data_list):
        all_values = record.strip().split(',')
        labels[index] = float(all_values[0])
        # np.asfarray was removed in NumPy 2.0; asarray with a float dtype is the equivalent.
        pixels[index] = np.asarray(all_values[1:], dtype=np.float64).reshape(28, 28).astype(np.float32) / 255
    return {'label': labels, 'data': pixels}

In [None]:
def get_queue_url(sqs, name):
    """Return the URL of the SQS queue named ``name`` or ``name + '.fifo'``.

    The queue name is compared against the last path segment of each URL, so
    queues that merely share a prefix (e.g. ``name-old``) are not matched.

    Args:
        sqs: boto3 SQS client.
        name: Queue name without the ``.fifo`` suffix.

    Returns:
        The first matching queue URL, or None if no queue matches (including
        when the account has no queues and ``QueueUrls`` is absent).
    """
    response = sqs.list_queues(QueueNamePrefix=name)
    wanted = {name, name + '.fifo'}
    for url in response.get('QueueUrls', []):
        if url.rstrip('/').rsplit('/', 1)[-1] in wanted:
            return url
    return None

In [None]:
def get_messages(sqs, queue, logger):
    """Make one ``receive_message`` call on ``queue`` and normalise the results.

    Args:
        sqs: boto3 SQS client.
        queue: Queue URL.
        logger: Logger that receives one line per fetched message.

    Returns:
        A list (possibly empty) of dicts with keys 'group', 'id', 'data' and
        'handle'. These are taken from each message's MessageGroupId,
        MessageDeduplicationId, Body and ReceiptHandle.
    """
    # Both attributes are read unconditionally, which presumably requires a
    # FIFO queue. NOTE(review): 'ReceiptHandle' is a top-level message field,
    # not a message attribute; listing it here looks unnecessary. Confirm
    # before removing.
    reply = sqs.receive_message(
        QueueUrl=queue,
        AttributeNames=['MessageDeduplicationId', 'MessageGroupId', 'ReceiptHandle']
    )
    fetched = []
    for raw in reply.get('Messages', []):
        attrs = raw['Attributes']
        group_id = attrs['MessageGroupId']
        dedup_id = attrs['MessageDeduplicationId']
        fetched.append({
            'group': group_id,
            'id': dedup_id,
            'data': raw['Body'],
            'handle': raw['ReceiptHandle'],
        })
        logger.info('Message fetched - group:' + group_id + ', id: ' + dedup_id)
    return fetched

In [None]:
def delete_message(sqs, queue, message):
    """Delete ``message`` from ``queue`` using its stored receipt handle.

    Args:
        sqs: boto3 SQS client.
        queue: Queue URL.
        message: dict produced by get_messages; only its 'handle' key is used.

    Returns:
        The raw response from ``sqs.delete_message``.
    """
    receipt = message['handle']
    return sqs.delete_message(QueueUrl=queue, ReceiptHandle=receipt)

In [None]:
def prepare_data(message, logger):
    """Parse an SQS data packet of CSV rows into training arrays.

    ``message['data']`` holds newline-separated rows of ``label,p0,...,p783``.
    The 784 pixel values are reshaped to 28x28 and divided by 255.

    Args:
        message: dict from get_messages. Uses the 'data', 'group' and 'id' keys.
        logger: Logger that receives the number of samples parsed.

    Returns:
        dict with 'label' (shape (N,)) and 'data' (shape (N, 1, 28, 28)),
        both float64 NumPy arrays.
    """
    # splitlines() works whether or not the body ends with a newline (and with
    # \r\n endings). Blindly dropping the last character would corrupt the
    # final pixel of an unterminated body. Blank lines are skipped.
    data_list = [line for line in message['data'].splitlines() if line.strip()]
    logger.info(str(len(data_list)) + ' samples loaded from data packet - ' + message['group'] + '_' + message['id'])
    labels = np.empty([len(data_list),])
    pixels = np.empty([len(data_list), 1, 28, 28])
    for index, record in enumerate(data_list):
        all_values = record.strip().split(',')
        labels[index] = float(all_values[0])
        # np.asfarray was removed in NumPy 2.0; asarray with a float dtype is the equivalent.
        pixels[index] = np.asarray(all_values[1:], dtype=np.float64).reshape(28, 28).astype(np.float32) / 255
    return {'label': labels, 'data': pixels}

In [None]:
def create_model(batch_size, logger, context=None):
    """Build an untrained convolutional digit classifier wrapped in an MXNet Module.

    Architecture (MXNet symbolic API):
    conv 5x5x20 -> tanh -> maxpool 2x2,
    conv 5x5x50 -> tanh -> maxpool 2x2,
    flatten -> fully connected 500 -> tanh,
    fully connected 10 -> SoftmaxOutput (named 'softmax').

    Args:
        batch_size: Not used to build the network; kept so existing callers
            keep working.
        logger: Logger notified once the module is created.
        context: MXNet device context for the module. Defaults to ``mx.gpu()``.

    Returns:
        An ``mx.mod.Module`` over the softmax output symbol.
    """
    if context is None:
        context = mx.gpu()

    # Placeholder variable for the input images.
    data = mx.sym.var('data')

    # First convolutional stage.
    conv1 = mx.sym.Convolution(data=data, kernel=(5, 5), num_filter=20)
    tanh1 = mx.sym.Activation(data=conv1, act_type="tanh")
    pool1 = mx.sym.Pooling(data=tanh1, pool_type="max", kernel=(2, 2), stride=(2, 2))

    # Second convolutional stage.
    conv2 = mx.sym.Convolution(data=pool1, kernel=(5, 5), num_filter=50)
    tanh2 = mx.sym.Activation(data=conv2, act_type="tanh")
    pool2 = mx.sym.Pooling(data=tanh2, pool_type="max", kernel=(2, 2), stride=(2, 2))

    # First fully connected layer.
    flatten = mx.sym.flatten(data=pool2)
    fc1 = mx.sym.FullyConnected(data=flatten, num_hidden=500)
    tanh3 = mx.sym.Activation(data=fc1, act_type="tanh")

    # Output layer: 10 classes with softmax loss.
    fc2 = mx.sym.FullyConnected(data=tanh3, num_hidden=10)
    softmax = mx.sym.SoftmaxOutput(data=fc2, name='softmax')

    model = mx.mod.Module(symbol=softmax, context=context)
    logger.info('Training model created')
    return model

In [None]:
def load_model(model_path, model_name, epoch, logger, context=None):
    """Load a checkpointed MXNet Module, including its optimizer states.

    The checkpoint prefix is ``<model_path>/<epoch>/<model_name>``, and ``epoch``
    is also passed as the checkpoint epoch number.

    Args:
        model_path: Root directory of checkpoints.
        model_name: Checkpoint file prefix.
        epoch: Epoch number; also the checkpoint sub-directory name.
        logger: Logger notified after loading.
        context: MXNet device context for the module. Defaults to ``mx.gpu()``.

    Returns:
        The loaded ``mx.mod.Module``.
    """
    if context is None:
        context = mx.gpu()
    checkpoint_dir = model_path + '/' + str(epoch)
    model = mx.mod.Module.load(checkpoint_dir + '/' + model_name, epoch,
                               load_optimizer_states=True, context=context)
    logger.info('Training model ' + model_name + ' loaded from ' + checkpoint_dir)
    return model

In [None]:
def measure_accuracy(model, test_data, batch_size):
    """Score ``model`` on ``test_data`` and return its accuracy as a string.

    Args:
        model: A bound MXNet Module (e.g. after ``fit``).
        test_data: dict with 'data' and 'label' arrays.
        batch_size: Batch size for the evaluation iterator.

    Returns:
        ``str`` of the accuracy value reported by ``mx.metric.Accuracy``.
    """
    evaluation_iter = mx.io.NDArrayIter(test_data['data'], test_data['label'], batch_size)
    metric = mx.metric.Accuracy()
    model.score(evaluation_iter, metric)
    accuracy_value = metric.get()[1]
    return str(accuracy_value)

In [None]:
def train(message, model, test_data, batch_size, epoch, logger):
    """Fit ``model`` on the packet in ``message``, then log test accuracy.

    Trains with SGD (learning rate 0.1) for ``epoch`` epochs on shuffled
    batches, using ``test_data`` as the evaluation set. The model is updated
    in place; nothing is returned.
    """
    packet = prepare_data(message, logger)
    train_iter = mx.io.NDArrayIter(packet['data'], packet['label'], batch_size, shuffle=True)
    val_iter = mx.io.NDArrayIter(test_data['data'], test_data['label'], batch_size)

    fit_kwargs = dict(
        eval_data=val_iter,
        optimizer='sgd',
        optimizer_params={'learning_rate': 0.1},
        eval_metric='acc',
        batch_end_callback=mx.callback.Speedometer(batch_size, 100),
        num_epoch=epoch,
    )
    model.fit(train_iter, **fit_kwargs)

    packet_label = message['group'] + '_' + message['id']
    accuracy = measure_accuracy(model, test_data, batch_size)
    logger.info('Accuracy after processing packet - ' + packet_label + ' : ' + accuracy)

In [None]:
def train_new_model(message, test_data, batch_size, epoch, logger):
    """Create a fresh model, train it on ``message``'s packet, and return it."""
    fresh_model = create_model(batch_size, logger)
    train(message, fresh_model, test_data, batch_size, epoch, logger)
    return fresh_model

In [None]:
def train_loaded_model(model_path, model_name, message, test_data, batch_size, epoch, logger):
    """Load the checkpointed model, continue training it on ``message``, and return it."""
    restored_model = load_model(model_path, model_name, epoch, logger)
    train(message, restored_model, test_data, batch_size, epoch, logger)
    return restored_model

In [None]:
def save_model(model, model_path, model_name, epoch, s3, model_bucket, logger):
    """Checkpoint ``model`` locally and upload the checkpoint files to S3.

    The checkpoint is written with prefix ``<model_path>/<epoch>/<model_name>``
    (optimizer states included). Every regular file directly inside
    ``<model_path>/<epoch>`` is then uploaded to ``model_bucket`` under its
    bare file name.

    Args:
        model: Object providing ``save_checkpoint(prefix, epoch, save_optimizer_states=...)``.
        model_path: Root directory for checkpoints.
        model_name: Checkpoint file prefix.
        epoch: Epoch number; also the checkpoint sub-directory name.
        s3: boto3 S3 client.
        model_bucket: Destination bucket.
        logger: Logger for progress messages.
    """
    epoch_dir = model_path + '/' + str(epoch)
    # Create the checkpoint directory so saving works even when fetch_model
    # has not been called first.
    os.makedirs(epoch_dir, exist_ok=True)
    model.save_checkpoint(epoch_dir + '/' + model_name, epoch, save_optimizer_states=True)
    logger.info('Training model ' + model_name + ' saved at ' + epoch_dir)
    # Only regular files at the top level of epoch_dir (no recursion), sorted
    # for a deterministic upload order.
    model_files = sorted(
        entry for entry in os.listdir(epoch_dir)
        if os.path.isfile(os.path.join(epoch_dir, entry))
    )
    for model_file in model_files:
        s3.upload_file(epoch_dir + '/' + model_file, model_bucket, model_file)
        logger.info('Training model file ' + model_file + ' uploaded to S3 Bucket ' + model_bucket)

In [None]:
def fetch_model(model, model_path, model_name, epoch, s3, model_bucket, logger):
    """Make sure a checkpoint for ``epoch`` is available under ``<model_path>/<epoch>``.

    Any of ``<model_name>-NNNN.params``, ``<model_name>-NNNN.states`` (NNNN is
    the zero-padded epoch) and ``<model_name>-symbol.json`` that is not present
    locally is downloaded from ``model_bucket`` by bare file name.

    Args:
        model: Unused; kept for call compatibility.
        model_path: Root directory for checkpoints.
        model_name: Checkpoint file prefix.
        epoch: Epoch number, as an int or a str.
        s3: boto3 S3 client.
        model_bucket: Source bucket.
        logger: Logger for progress messages.

    Returns:
        True if all three files are present locally afterwards. False if any
        download raised an exception.
    """
    # str() lets zfill work whether epoch is an int (as in the config cell) or a str.
    epoch_name = model_name + '-' + str(epoch).zfill(4)
    required_files = [epoch_name + '.params', epoch_name + '.states', model_name + '-symbol.json']
    epoch_dir = model_path + '/' + str(epoch)
    os.makedirs(epoch_dir, exist_ok=True)
    try:
        for file_name in required_files:
            local_path = epoch_dir + '/' + file_name
            if not os.path.exists(local_path):
                s3.download_file(model_bucket, file_name, local_path)
        logger.info('Training model ' + model_name + ' ready to be loaded')
    except Exception as exc:
        # Catch Exception rather than using a bare except, so KeyboardInterrupt
        # and SystemExit still propagate.
        logger.info('Training model '+ model_name +' does not exist in locally or in S3 Bucket ' + model_bucket)
        logger.debug('fetch_model failure detail: %r', exc)
        return False
    return True

In [None]:
# AWS clients: SQS delivers training-data packets; S3 holds the test set and checkpoints.
sqs = boto3.client('sqs', 'us-east-1')
s3 = boto3.client('s3')
queue = get_queue_url(sqs, 'training-data-queue')
# Fail fast here instead of with an obscure error on the first receive_message call.
if queue is None:
    raise RuntimeError("SQS queue 'training-data-queue' (or '.fifo') not found")
# Use the configured data folder rather than repeating the path literal.
test_data = load_data(data_folder, 'mnist_test.csv', s3, 'my-ml-data-set', logger)

In [None]:
# Bucket for checkpoints; the same bucket the test data is loaded from.
model_bucket = 'my-ml-data-set'
# Pause between empty polls so the queue is not hammered in a tight loop.
poll_interval_seconds = 20

# No model exists yet, so pass None (fetch_model does not use this argument).
# epoch is passed as text because checkpoint file names zero-pad it with str.zfill.
exists = fetch_model(None, model_path, model_name, str(epoch), s3, model_bucket, logger)
while True:
    messages = get_messages(sqs, queue, logger)
    if not messages:
        logger.info('No data packets found, will check back in a moment')
        time.sleep(poll_interval_seconds)
        continue
    logger.info(str(len(messages)) + ' data packets found, model being trained...')
    for message in messages:
        if exists:
            model = train_loaded_model(model_path, model_name, message, test_data, batch_size, epoch, logger)
        else:
            model = train_new_model(message, test_data, batch_size, epoch, logger)
            exists = True
        save_model(model, model_path, model_name, epoch, s3, model_bucket, logger)
        # Delete only after the checkpoint is saved, so a failure during
        # training or saving leaves the packet on the queue.
        delete_message(sqs, queue, message)