In [14]:
# Import Libraries needed by the Lambda Function
import numpy as np
import h5py
import scipy
import os
from os import environ
import json
from json import dumps, loads
from boto3 import client, resource, Session
import botocore
import uuid
import io
from redis import StrictRedis as redis

# Import libraries needed for the Codebook
from PIL import Image
from scipy import ndimage
import matplotlib.pyplot as plt
%matplotlib inline

# Global Variables
s3_client = client('s3', region_name='us-west-2') # S3 access
s3_resource = resource('s3')
# NOTE: despite its name, `redis_client` is the boto3 ElastiCache client; it is
# only used to look up the cluster endpoint. Connections to Redis itself are
# opened with `redis(host=endpoint, ...)`.
redis_client = client('elasticache', region_name='us-west-2')
# Retrieve the ElastiCache cluster endpoint (first node of the first cluster).
# The lookup is a network call, so it is done once here rather than repeated.
cc = redis_client.describe_cache_clusters(ShowCacheNodeInfo=True)
endpoint = cc['CacheClusters'][0]['CacheNodes'][0]['Endpoint']['Address']
lambda_client = client('lambda', region_name='us-west-2') # Lambda invocations

w = 0 # Weight flag: 0 means the weights are initialized to zeros
b = 0 # Initial bias value
data_keys = {} # Cache keys of the stored objects

In [15]:
# Check the Python type of the initial bias `b`
type(b)

int

In [16]:
def standardize(x_orig):
    """
    Flatten the image data into column vectors and scale it by 1/255

    Argument:
    x_orig -- Numpy array of image data

    Return:
    Result of `vectorize(x_orig)` with every element divided by 255
    """
    column_vectors = vectorize(x_orig)
    return column_vectors / 255

In [17]:
def vectorize(x_orig):
    """
    Flatten the image data so that each slice along the first axis
    becomes one column of a 2-D matrix

    Argument:
    x_orig -- Numpy array of image data

    Return:
    Numpy array: `x_orig` reshaped to (x_orig.shape[0], -1), then transposed
    """
    leading_dim = x_orig.shape[0]
    flattened_rows = x_orig.reshape(leading_dim, -1)
    return flattened_rows.T

In [18]:
def name2str(obj, namespace):
    """
    Look up the variable name(s) bound to an object in a namespace

    Arguments:
    obj -- object to look for (compared by identity, not equality)
    namespace -- dictionary mapping names to objects, e.g. `globals()`

    Return:
    List of every name in `namespace` whose value is `obj`
    """
    matches = []
    for candidate, value in namespace.items():
        if value is obj:
            matches.append(candidate)
    return matches

In [19]:
def obj2cache(endpoint, obj, name):
    """
    Serialize an object and store it in Redis under a self-describing key

    The key records how the value was encoded:
        2-D Numpy array -- '{name}|{dtype}#{length}#{width}', raw array bytes
        str             -- '{name}|string', stored unchanged
        int             -- '{name}|int', stored as its decimal string
        dict            -- '{name}|json', stored as a JSON string

    Arguments:
    endpoint -- host name of the Redis server (port 6379, db 0 are used)
    obj -- object to store: 2-D Numpy array, str, int or dict
    name -- name used as the prefix of the key

    Return:
    The key under which the value was stored

    Raises:
    ValueError -- if a Numpy array is not 2-dimensional
    TypeError -- if `obj` is not one of the supported types (previously such
                 objects were silently not stored and None was returned)
    """
    if isinstance(obj, np.ndarray):
        if obj.ndim != 2:
            # The key format only has room for two dimensions
            raise ValueError(
                'obj2cache only supports 2-D arrays, got shape {0}'.format(obj.shape)
            )
        length, width = obj.shape
        # Create a key from the name and necessary parameters from the array
        # i.e. {name}|{type}#{length}#{width}
        key = '{0}|{1}#{2}#{3}'.format(name, str(obj.dtype), length, width)
        # Row-major (C order) bytes of the array. `tobytes()` replaces the
        # deprecated `tostring()`, which was removed in NumPy 2.0.
        val = obj.tobytes()
    elif type(obj) is str:
        key = '{0}|{1}'.format(name, 'string')
        val = obj
    elif type(obj) is int:
        # Exact type check on purpose: bool is a subclass of int
        key = '{0}|{1}'.format(name, 'int')
        val = str(obj)
    elif type(obj) is dict:
        key = '{0}|{1}'.format(name, 'json')
        val = json.dumps(obj)
    else:
        raise TypeError(
            'obj2cache cannot store objects of type {0}'.format(type(obj).__name__)
        )
    # Store the encoded value in Redis
    cache = redis(host=endpoint, port=6379, db=0)
    cache.set(key, val)
    return key

In [20]:
def cache2obj(endpoint, key):
    """
    Fetch a value from Redis and decode it according to its key

    The text after the last '|' in the key is the type tag:
        '{dtype}#{length}#{width}' -- 2-D Numpy array rebuilt from raw bytes
        'int'                      -- value converted with int()
        'json' / 'string'          -- value returned exactly as Redis returns it
                                      (not decoded or parsed)

    Only the type tag is inspected, so a name that happens to contain
    'int', 'json', 'float64', etc. no longer selects the wrong decoder.

    Arguments:
    endpoint -- host name of the Redis server (port 6379, db 0 are used)
    key -- key of the form '{name}|{type tag}'

    Return:
    The decoded object (Numpy array, int, or the raw stored value)

    Raises:
    ValueError -- if the key has no '|' separator or an unknown type tag
                  (previously None was returned for unrecognized keys)
    """
    _, separator, type_tag = key.rpartition('|')
    is_array = '#' in type_tag
    if not separator or not (is_array or type_tag in ('int', 'json', 'string')):
        raise ValueError('Unrecognized cache key: {0!r}'.format(key))

    cache = redis(host=endpoint, port=6379, db=0)
    data = cache.get(key)

    if is_array:
        # De-serialize the value
        array_dtype, length, width = type_tag.split('#')
        # `frombuffer` replaces the deprecated binary mode of `fromstring`.
        # It returns a read-only view of `data`, so copy to hand back a
        # writable array as before.
        flat = np.frombuffer(data, dtype=array_dtype)
        return flat.reshape(int(length), int(width)).copy()
    if type_tag == 'int':
        return int(data)
    # 'json' and 'string' values are handed back untouched
    return data

In [21]:
# Load main dataset
# The context manager closes the HDF5 file once the arrays have been copied
# into memory; previously the file handle was never closed.
with h5py.File('/tmp/datasets.h5', "r") as dataset:
    # Create numpy arrays from the various h5 datasets
    train_set_x_orig = np.array(dataset["train_set_x"][:]) # train set features
    train_set_y_orig = np.array(dataset["train_set_y"][:]) # train set labels
    test_set_x_orig = np.array(dataset["test_set_x"][:]) # test set features
    test_set_y_orig = np.array(dataset["test_set_y"][:]) # test set labels
    #classes = np.array(dataset["list_classes"][:]) # the list of classes

# Retrieve the ElastiCache cluster endpoint
cc = redis_client.describe_cache_clusters(ShowCacheNodeInfo=True)
endpoint = cc['CacheClusters'][0]['CacheNodes'][0]['Endpoint']['Address']

# Reshape labels into row vectors of shape (1, number of labels)
train_set_y = train_set_y_orig.reshape((1, train_set_y_orig.shape[0]))
test_set_y = test_set_y_orig.reshape((1, test_set_y_orig.shape[0]))

# Preprocess inputs
train_set_x = standardize(train_set_x_orig)
test_set_x = standardize(test_set_x_orig)

# Dump the inputs to the cache for TrainerLambda
data_keys = {} # Dictionary of the cache keys of the data set
dims = {} # Dictionary of data set dimensions
a_list = [train_set_x, train_set_y, test_set_x, test_set_y]
# The names are listed explicitly instead of being discovered by scanning
# globals() for identical objects: that lookup returns every alias of an
# array, so taking its first entry could pick up an unrelated variable name.
# `a_names` stays a list of one-element lists, the same shape the
# name2str()-based lookup produced.
a_names = [['train_set_x'], ['train_set_y'], ['test_set_x'], ['test_set_y']]
for j in range(len(a_list)):
    # Store each array in the cache and record its key and shape
    data_keys[a_names[j][0]] = obj2cache(endpoint, obj=a_list[j], name=a_names[j][0])
    dims[a_names[j][0]] = a_list[j].shape

# Initialize weights
if w == 0: # Initialize weights to dimensions of the input data
    dim = dims['train_set_x'][0]
    weights = np.zeros((dim, 1))
    # Store the initial weights as a column vector in the cache
    data_keys['weights'] = obj2cache(endpoint, obj=weights, name='weights')
else:
    #placeholder for random weight initialization
    pass

# Initialize Bias
if b != 0:
    #placeholder for random bias initialization
    #data_keys['bias'] = numpy2cache(endpoint, array=bias, name='bias')
    pass
else:
    data_keys['bias'] = obj2cache(endpoint, obj=b, name='bias')

# Initialize the results tracking object (an empty string placeholder)
data_keys['results'] = obj2cache(endpoint, obj='', name='results')


#return data_keys, [j for i in a_names for j in i], dims, #arams

In [22]:
# Inspect the cache keys recorded for the data sets, weights, bias and results
data_keys

{'bias': 'bias|int',
 'results': 'results|string',
 'test_set_x': 'test_set_x|float64#12288#50',
 'test_set_y': 'test_set_y|int64#1#50',
 'train_set_x': 'train_set_x|float64#12288#209',
 'train_set_y': 'train_set_y|int64#1#209',
 'weights': 'weights|float64#12288#1'}

In [23]:
# Read the results placeholder back from the cache; 'string' values are
# returned as raw bytes (here the empty bytes object)
x = cache2obj(endpoint, key=data_keys['results'])
x

b''

In [24]:
# Read the bias back from the cache and confirm it is restored as an int
x = cache2obj(endpoint, data_keys['bias'])
type(x)

int

In [25]:
# Read the neural network parameters from the JSON file on disk
with open('/tmp/parameters.json') as parameters_file:
    parameters = json.loads(parameters_file.read())

In [26]:
# Extend the neural network parameters with the run state:
#   epoch / layer   -- the first epoch and the next layer to process
#   input_data      -- flat list of the input data set names
#                      (simulates the return variables from initialize_data())
#   data_keys       -- cache keys of the stored objects
#   data_dimensions -- shapes of the input data sets
parameters.update({
    'epoch': 1,
    'layer': 1,
    'input_data': sum(a_names, []),
    'data_keys': data_keys,
    'data_dimensions': dims,
})

# Payload for `TrainerLambda`: the overall state, the cache key of the
# parameters (dumped to ElastiCache here) and the ElastiCache endpoint
payload = {
    'state': 'start',
    'parameters': obj2cache(endpoint, obj=parameters, name='parameters'),
    'endpoint': endpoint,
}

# Display the prepared payload
payload

{'endpoint': 'lnn-re-1o4ic3wm3z3t9.svpice.0001.usw2.cache.amazonaws.com',
 'parameters': 'parameters|json',
 'state': 'start'}

In [27]:
# Confirm the payload is a plain dict
type(payload)

dict

In [28]:
# Round-trip the payload through JSON: dumps() yields a str, loads() a dict.
# Descriptive names are used instead of `a`/`b` because `b` is the notebook's
# global bias value; rebinding it to a dict here would make a later re-run of
# the data initialization cell skip storing the bias (its `b != 0` test
# would be true).
payload_json = dumps(payload)
payload_decoded = loads(payload_json)
print(type(payload_json))
print(type(payload_decoded))

<class 'str'>
<class 'dict'>


In [29]:
# Check the type produced by serializing the parameters dict with dumps()
print(type(dumps(parameters)))

<class 'str'>


In [34]:
# Fetch the stored parameters straight from Redis (without cache2obj) to see
# the raw value kept under the 'parameters|json' key
cache = redis(host=endpoint, port=6379, db=0)
x = cache.get('parameters|json')
x

b'{"epochs": 20, "layers": 1, "activations": {"layer1": "sigmoid"}, "neurons": {"layer1": 1}, "weight": 0, "bias": 0, "learning_rate": 0.005, "epoch": 1, "layer": 1, "input_data": ["train_set_x", "train_set_y", "test_set_x", "test_set_y"], "data_keys": {"train_set_x": "train_set_x|float64#12288#209", "train_set_y": "train_set_y|int64#1#209", "test_set_x": "test_set_x|float64#12288#50", "test_set_y": "test_set_y|int64#1#50", "weights": "weights|float64#12288#1", "bias": "bias|int", "results": "results|string"}, "data_dimensions": {"train_set_x": [12288, 209], "train_set_y": [1, 209], "test_set_x": [12288, 50], "test_set_y": [1, 50]}}'

In [35]:
# Parse the raw value fetched from Redis back into a dictionary
json.loads(x)

{'activations': {'layer1': 'sigmoid'},
 'bias': 0,
 'data_dimensions': {'test_set_x': [12288, 50],
  'test_set_y': [1, 50],
  'train_set_x': [12288, 209],
  'train_set_y': [1, 209]},
 'data_keys': {'bias': 'bias|int',
  'results': 'results|string',
  'test_set_x': 'test_set_x|float64#12288#50',
  'test_set_y': 'test_set_y|int64#1#50',
  'train_set_x': 'train_set_x|float64#12288#209',
  'train_set_y': 'train_set_y|int64#1#209',
  'weights': 'weights|float64#12288#1'},
 'epoch': 1,
 'epochs': 20,
 'input_data': ['train_set_x', 'train_set_y', 'test_set_x', 'test_set_y'],
 'layer': 1,
 'layers': 1,
 'learning_rate': 0.005,
 'neurons': {'layer1': 1},
 'weight': 0}

In [36]:
# Show the cache key that the payload carries for the parameters
key = payload['parameters']
print(key)

parameters|json


In [37]:
# Read the parameters back through cache2obj; 'json' values are returned as
# the raw stored bytes, not as a parsed dictionary
tmp = cache2obj(endpoint, key=payload['parameters'])
tmp

b'{"epochs": 20, "layers": 1, "activations": {"layer1": "sigmoid"}, "neurons": {"layer1": 1}, "weight": 0, "bias": 0, "learning_rate": 0.005, "epoch": 1, "layer": 1, "input_data": ["train_set_x", "train_set_y", "test_set_x", "test_set_y"], "data_keys": {"train_set_x": "train_set_x|float64#12288#209", "train_set_y": "train_set_y|int64#1#209", "test_set_x": "test_set_x|float64#12288#50", "test_set_y": "test_set_y|int64#1#50", "weights": "weights|float64#12288#1", "bias": "bias|int", "results": "results|string"}, "data_dimensions": {"train_set_x": [12288, 209], "train_set_y": [1, 209], "test_set_x": [12288, 50], "test_set_y": [1, 50]}}'