In [2]:
### imports
import numpy as np
import keras
from matplotlib import pyplot as plt
import os
import copy
import pydot


Using TensorFlow backend.


# layer tests

In [139]:
### average pooling test

def arravg(array,numels,offset):
    """Average `numels` strided samples of `array`, skipping -inf entries.

    Samples are read at array[0], array[offset], ..., array[(numels-1)*offset].
    Entries that are not strictly greater than -inf are left out of both the
    sum and the count.
    """
    total = 0
    kept = 0
    for step in range(numels):
        sample = array[step*offset]
        if not sample > -np.inf:
            continue
        total += sample
        kept += 1
    return total/kept

def avgpool1d(inputs,pool_size,stride,out_height,in_width):
    """Reference 1D average pooling over the rows of a C-flattened input.

    Returns a flat array of out_height*in_width values. Output row i, column j
    is arravg() over `pool_size` samples spaced `in_width` apart, starting at
    input row i*stride, column j.
    """
    flat = inputs.flatten(order='C')
    pooled = np.zeros(out_height*in_width)
    for out_row in range(out_height):
        src_base = out_row*stride*in_width
        dst_base = out_row*in_width
        for col in range(in_width):
            pooled[dst_base+col] = arravg(flat[src_base+col:],pool_size,in_width)
    return pooled

# Check the reference avgpool1d() against keras.layers.AveragePooling1D.
inshape = (10,10)
pool_size=3
stride=1
pad = 'same'
a = keras.layers.Input(inshape)
b = keras.layers.AveragePooling1D(pool_size=pool_size, strides=stride, padding=pad)(a)
model = keras.models.Model(inputs=[a], outputs=[b])

x = np.random.random(inshape)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)

(in_height,in_width) = model.layers[1].input_shape[1:]
(out_height, out_width) = model.layers[1].output_shape[1:]

# Compare strings by value: `is` tests object identity, which only works by
# accident of string interning and raises a SyntaxWarning on Python >= 3.8.
if pad == 'same':
    pad_along_height = max((out_height - 1) * stride +
                        pool_size - in_height, 0)
    pad_top = int(pad_along_height // 2)
    pad_bottom = int(pad_along_height - pad_top)
elif pad == 'valid':
    pad_top=0
    pad_bottom=0
else:
    # Fail loudly instead of silently reusing pad_top/pad_bottom left in the
    # kernel by a previously executed cell.
    raise ValueError("unsupported padding mode '" + str(pad) + "'")
padded_in_height = in_height + pad_top + pad_bottom

# Pad with -inf rows: arravg() skips -inf samples, so padding does not
# contribute to the window average.
y1 = np.concatenate((-np.inf*np.ones((pad_top,in_width)),x,-np.inf*np.ones((pad_bottom,in_width))),0)
y2 = avgpool1d(y1,pool_size,stride,out_height,in_width)
# Mean absolute difference between Keras and the reference (cell output).
np.mean(np.abs(y.flatten(order='C')-y2.flatten(order='C')))


1.5043300182898677e-08

In [313]:
### max pooling test
def arrmax(array,numels,offset):
    """Return the largest of `numels` samples of `array` taken every `offset` elements.

    The running maximum starts at array[0]; samples are read at
    array[0], array[offset], ..., array[(numels-1)*offset].
    """
    best = array[0]
    for position in (step*offset for step in range(numels)):
        candidate = array[position]
        if candidate > best:
            best = candidate
    return best

def maxpool1d(inputs,pool_size,stride,out_height,in_width):
    """Reference 1D max pooling over the rows of a C-flattened input.

    Returns a flat array of out_height*in_width values. Output row i, column j
    is arrmax() over `pool_size` samples spaced `in_width` apart, starting at
    input row i*stride, column j.
    """
    flat = inputs.flatten(order='C')
    pooled = np.zeros(out_height*in_width)
    for out_row in range(out_height):
        src_base = out_row*stride*in_width
        dst_base = out_row*in_width
        for col in range(in_width):
            pooled[dst_base+col] = arrmax(flat[src_base+col:],pool_size,in_width)
    return pooled

# Check the reference maxpool1d() against keras.layers.MaxPooling1D.
inshape = (8,23)
pool_size=2
stride=1
pad = 'same'
a = keras.layers.Input(inshape)
b = keras.layers.MaxPooling1D(pool_size=pool_size, strides=stride, padding=pad)(a)
model = keras.models.Model(inputs=[a], outputs=[b])

x = np.random.random(inshape)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)

(in_height,in_width) = model.layers[1].input_shape[1:]
(out_height, out_width) = model.layers[1].output_shape[1:]

# Compare strings by value: `is` tests object identity, which only works by
# accident of string interning and raises a SyntaxWarning on Python >= 3.8.
if pad == 'same':
    pad_along_height = max((out_height - 1) * stride +
                        pool_size - in_height, 0)
    pad_top = int(pad_along_height // 2)
    pad_bottom = int(pad_along_height - pad_top)
elif pad == 'valid':
    pad_top=0
    pad_bottom=0
else:
    # Fail loudly instead of silently reusing pad_top/pad_bottom left in the
    # kernel by a previously executed cell.
    raise ValueError("unsupported padding mode '" + str(pad) + "'")
padded_in_height = in_height + pad_top + pad_bottom

# Pad with -inf rows so a padding sample can never win the `>` comparison
# in arrmax().
y1 = np.concatenate((-np.inf*np.ones((pad_top,in_width)),x,-np.inf*np.ones((pad_bottom,in_width))),0)
y2 = maxpool1d(y1,pool_size,stride,out_height,in_width)
# Mean absolute difference between Keras and the reference (cell output).
np.mean(np.abs(y.flatten(order='C')-y2.flatten(order='C')))

1.17117913332441e-08

In [148]:
### conv1d test

def conv1d(x,kernel,bias,out_height,out_width,kernel_size,in_width,padded_in_height,stride,dilation):
    """Reference 1D convolution written with explicit flat (C-order) indexing.

    x is read as rows of `in_width` channels; kernel is read as
    (kernel_size, in_width, out_width) flattened in C order. Returns a flat
    array of out_height*out_width values, with bias[k] added to every output
    of filter k. `padded_in_height` is accepted but not used.
    """
    signal = x.flatten()
    taps = kernel.flatten()
    output = np.zeros(out_height*out_width)
    tap_stride = in_width*out_width  # flat distance between consecutive kernel taps

    for out_row in range(out_height):
        for filt in range(out_width):
            dst = out_row*out_width + filt
            for tap in range(kernel_size):
                src_row = (out_row*stride + tap*dilation)*in_width
                for chan in range(in_width):
                    output[dst] += taps[tap*tap_stride + chan*out_width + filt]*signal[src_row + chan]
            output[dst] += bias[filt]

    return output

# Check the reference conv1d() against keras.layers.Conv1D.
inshape = (10,10)
stride=1
dilation = 1
kernel_size = 3
num_filters = 12
pad = 'same'
a = keras.layers.Input(inshape)
b = keras.layers.Conv1D(filters=num_filters, dilation_rate=dilation, kernel_size=kernel_size, strides=stride, padding=pad)(a)
model = keras.models.Model(inputs=[a], outputs=[b])

x = np.random.random(inshape)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)

(in_height,in_width) = model.layers[1].input_shape[1:]
(out_height, out_width) = model.layers[1].output_shape[1:]

# Compare strings by value: `is` tests object identity, which only works by
# accident of string interning and raises a SyntaxWarning on Python >= 3.8.
if pad == 'causal':
    pad_along_height = dilation*(kernel_size-1)
    pad_top = pad_along_height
    pad_bottom = 0
elif pad == 'same':
    # The window covered by a dilated kernel spans dilation*(kernel_size-1)+1
    # rows. The previous version used a leftover `pool_size` here, which was
    # only right because pool_size == kernel_size and dilation == 1.
    effective_kernel_size = dilation*(kernel_size-1) + 1
    pad_along_height = max((out_height - 1) * stride +
                        effective_kernel_size - in_height, 0)
    pad_top = int(pad_along_height // 2)
    pad_bottom = int(pad_along_height - pad_top)
elif pad == 'valid':
    pad_top=0
    pad_bottom=0
else:
    # Fail loudly instead of silently reusing pad_top/pad_bottom left in the
    # kernel by a previously executed cell.
    raise ValueError("unsupported padding mode '" + str(pad) + "'")
padded_in_height = in_height + pad_top + pad_bottom

kernel = model.layers[1].get_weights()[0]
bias = model.layers[1].get_weights()[1]
# Zero rows stand in for the padding on the height axis.
y1 = np.concatenate((np.zeros((pad_top,in_width)),x,np.zeros((pad_bottom,in_width))),0)
y2 = conv1d(y1,kernel,bias,out_height,out_width,kernel_size,in_width,padded_in_height,stride,dilation)
# Mean absolute difference between Keras and the reference (cell output).
np.mean(np.abs(y.flatten(order='C')-y2.flatten(order='C')))

3.3243318054802334e-08

In [177]:
### simple RNN test
def rnn(inputs,weights,recurrent_weights,bias):
    """Run rnncell() over every row of `inputs` and return the final state.

    The state starts as zeros of length bias.shape[0]; row t of `inputs` is
    fed to the cell at step t.
    """
    hidden = np.zeros(bias.shape[0])
    num_steps = inputs.shape[0]
    for t in range(num_steps):
        hidden = rnncell(inputs[t,:],hidden,weights,recurrent_weights,bias)
    return hidden
   
def rnncell(inputs,state,weights,recurrent_weights,bias):
    """One simple-RNN step: tanh((inputs@weights + bias) + state@recurrent_weights)."""
    input_term = np.matmul(inputs, weights) + bias
    recurrent_term = np.matmul(state, recurrent_weights)
    return np.tanh(input_term + recurrent_term)

# Check the reference rnn() against keras.layers.SimpleRNN with 12 units.
insize = (10,33)
a = keras.layers.Input(insize)
# NOTE(review): dropout arguments are set, but rnn() applies no dropout; the
# comparison presumably relies on dropout being inactive in predict() - confirm.
b = keras.layers.SimpleRNN(12, dropout=.5, recurrent_dropout=.5)(a)

model = keras.models.Model(inputs=[a], outputs=[b])
x = np.random.random(insize)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)

# Bare expression in the middle of the cell: its value is not displayed.
model.layers[1].output_shape

# get_weights() order used here: [0] input kernel, [1] recurrent kernel, [2] bias.
weights = model.layers[1].get_weights()[0]
recurrent_weights = model.layers[1].get_weights()[1]
bias = model.layers[1].get_weights()[2]
# NOTE: `a` is rebound from the Keras input tensor to the reference output.
a = rnn(x,weights,recurrent_weights,bias)
# Maximum absolute difference between the reference and Keras (cell output).
np.max(np.abs(a-y))


1.5031581013236206e-07

In [183]:
### GRU test

def gru(inputs, kernel, recurrent_kernel, input_bias, recurrent_bias, reset_after, units):
    """Run grucell() over every row of `inputs` and return the final state.

    The state starts as zeros of length `units`; row t of `inputs` is fed to
    the cell at step t.
    """
    hidden = np.zeros(units)
    num_steps = inputs.shape[0]
    for t in range(num_steps):
        hidden = grucell(inputs[t, :], hidden, kernel, recurrent_kernel,
                         input_bias, recurrent_bias, reset_after, units)
    return hidden
    

def grucell(inputs, state, kernel, recurrent_kernel, input_bias, recurrent_bias, reset_after, units):
    """One GRU step with tanh used for both the gates and the candidate state.

    The kernels and biases are split into three consecutive groups of `units`
    columns/entries, in the order z (update), r (reset), h (candidate).
    The recurrent z and r biases are always added. The recurrent h bias is
    only used when `reset_after` is true, in which case the reset gate scales
    the recurrent product; otherwise the reset gate scales the previous state
    before the product. Returns z*state + (1 - z)*candidate.
    """
    cuts = [units, units * 2]
    w_z, w_r, w_h = np.split(kernel, cuts, axis=1)
    u_z, u_r, u_h = np.split(recurrent_kernel, cuts, axis=1)
    in_b_z, in_b_r, in_b_h = np.split(input_bias, cuts)
    rec_b_z, rec_b_r, rec_b_h = np.split(recurrent_bias, cuts)

    prev = state

    # update and reset gates
    z = np.tanh((np.matmul(inputs, w_z) + in_b_z) + (np.matmul(prev, u_z) + rec_b_z))
    r = np.tanh((np.matmul(inputs, w_r) + in_b_r) + (np.matmul(prev, u_r) + rec_b_r))

    # reset gate applied after/before the recurrent matrix multiplication
    if reset_after:
        recurrent_part = r * (np.matmul(prev, u_h) + rec_b_h)
    else:
        recurrent_part = np.matmul(r * prev, u_h)

    candidate = np.tanh((np.matmul(inputs, w_h) + in_b_h) + recurrent_part)
    return z * prev + (1 - z) * candidate

# Check the reference gru() against keras.layers.GRU.
reset_after = True
units=12
insize = (10,33)
a = keras.layers.Input(insize)
# Both activations are set to tanh to match grucell(), which hard-codes tanh
# for the gates; a non-zero bias initializer makes the bias terms count.
# NOTE(review): dropout arguments are set but gru() applies none; presumably
# dropout is inactive in predict() - confirm.
b = keras.layers.GRU(units, reset_after=reset_after, activation='tanh', recurrent_activation='tanh',\
                    bias_initializer='glorot_uniform',dropout=.76, recurrent_dropout=.25)(a)

model = keras.models.Model(inputs=[a], outputs=[b])
x = np.random.random(insize)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)

# get_weights() order used here: [0] kernel, [1] recurrent kernel, [2] bias.
kernel = model.layers[1].get_weights()[0]
recurrent_kernel = model.layers[1].get_weights()[1]
bias = model.layers[1].get_weights()[2]
if reset_after:
    # With reset_after the bias is indexed as two rows: input bias, recurrent bias.
    input_bias = bias[0,:]
    recurrent_bias = bias[1,:]
else:
    # Single bias vector: use zeros for the recurrent bias so grucell()'s
    # unconditional recurrent-bias additions have no effect.
    recurrent_bias = np.zeros(3*units)
    input_bias = bias

# NOTE: `a` is rebound from the Keras input tensor to the reference output.
a = gru(x, kernel, recurrent_kernel, input_bias, recurrent_bias, reset_after, units)
# Maximum absolute difference between the reference and Keras (cell output).
np.max(np.abs(a-y)) 



7.875467575857442e-07

In [232]:
### dropout test
# Dense -> Dropout(0.9) -> Dense, both Dense layers without bias.
inshape = (10,20)
a = keras.layers.Input(inshape)
b = keras.layers.Dense(40, use_bias=False)(a)
c = keras.layers.Dropout(.9)(b)
d = keras.layers.Dense(40, use_bias=False)(c)
model = keras.models.Model(inputs=a, outputs=d)

x = np.random.random(inshape)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)

# Reference: plain product of the two Dense kernels (layers 1 and 3), i.e.
# the Dropout layer (layer 2) is treated as the identity.
y2 = (x@model.layers[1].get_weights()[0])@model.layers[3].get_weights()[0]

# Maximum absolute difference between the reference and Keras (cell output).
np.max(np.abs(y2-y)) 



3.6064534425506167e-07

In [3]:
### flatten / reshape test
# Reshaping a (3,4) array to a single row must keep the C-order element sequence.
x = np.arange(12).reshape(3, 4)
x1 = x.reshape(1, -1)
np.max(np.abs(x1.flatten() - x.flatten()))

0

In [331]:
### reshape test
# Reshape (20,16) -> (8,2,2,10): both shapes hold 320 elements per sample.
inshape = (20,16)
a = keras.layers.Input(inshape)
b = keras.layers.Reshape((8,2,2,10))(a)

model = keras.models.Model(inputs=a, outputs=b)

x = np.random.random(inshape)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)


# If Reshape keeps the flat element order, the flattened input and output match;
# this is the maximum absolute difference (cell output).
np.max(np.abs(x.flatten()-y.flatten()))


2.9774109622238143e-08

In [53]:
### transpose testing
def _row_major_strides(dims):
    """Return the flat-index stride of each axis of a C-ordered array of shape `dims`."""
    strides = [1] * len(dims)
    for axis in range(len(dims) - 2, -1, -1):
        strides[axis] = strides[axis + 1] * dims[axis + 1]
    return strides


def transp(a,ndim,olddim,permute):
    """Permute the axes of `a` using explicit flat (C-order) index arithmetic.

    Parameters
    ----------
    a : ndarray holding prod(olddim) elements; it is flattened on entry.
    ndim : number of axes to permute (any value >= 1).
    olddim : shape of `a` before the permutation; the first `ndim` entries are used.
    permute : 0-based permutation; new axis d takes old axis permute[d].

    Returns
    -------
    Flat array laid out as the C-order flattening of the permuted array.
    For ndim == 1 this is just the flattened input; otherwise a float array
    created with np.zeros.

    Raises
    ------
    ValueError if ndim < 1, if `olddim` has fewer than `ndim` entries, or if
    the first `ndim` entries of `permute` are not a permutation of
    0..ndim-1. Earlier versions only handled ndim <= 4 and silently returned
    None for anything else.
    """
    a = a.flatten()
    if ndim < 1:
        raise ValueError("ndim must be at least 1, got " + str(ndim))
    if ndim == 1:
        return a # no need to transpose a 1d array

    old_shape = [int(d) for d in olddim][:ndim]
    perm = [int(p) for p in permute][:ndim]
    if len(old_shape) != ndim:
        raise ValueError("olddim must provide at least ndim entries")
    if sorted(perm) != list(range(ndim)):
        raise ValueError("permute must be a permutation of 0..ndim-1")

    new_shape = [old_shape[p] for p in perm]
    old_strides = _row_major_strides(old_shape)
    new_strides = _row_major_strides(new_shape)

    b = np.zeros(a.shape)
    # flat index of element [i0, i1, ...] is sum(i_d * stride_d); the new
    # index along axis d is the old index along axis perm[d].
    for old_idx in np.ndindex(*old_shape):
        src = sum(i * s for i, s in zip(old_idx, old_strides))
        dst = sum(old_idx[p] * s for p, s in zip(perm, new_strides))
        b[dst] = a[src]
    return b

# Check the reference transp() against keras.layers.Permute on a 4D input.
inshape = (5,7,8,12)
# Axes as given to keras.layers.Permute; note that transp() below receives
# permute-1, so these values are 1-based.
permute = np.array((3,2,1,4))
ndim = 4
l1 = keras.layers.Input(inshape)
l2 = keras.layers.Permute(permute)(l1)
model = keras.models.Model(inputs=l1, outputs=l2)

x = np.random.random(inshape)
x1 = x[np.newaxis,...]  # prepend the batch axis for predict()
y = model.predict(x1)

# transp() indexes axes from 0, hence the shift by one.
xt = transp(x,ndim,inshape,permute-1)

# Maximum absolute difference between the reference and Keras (cell output).
np.max(np.abs(xt-y.flatten()))
    

2.9799230083504824e-08

In [56]:
# Inspect the shape of a Dense(10) kernel applied to a (4,5) input.
inshape = (4,5)
a = keras.layers.Input(inshape)
b = keras.layers.Dense(10)(a)
model = keras.models.Model(inputs=a, outputs=b)
# Recorded output is (5, 10): (last input dimension, units).
model.layers[1].get_weights()[0].shape

(5, 10)

# misc testing and notes

In [25]:
### indexing testing
# Verify the C-order flat index formula for a 3D (kernel_size, in_width, filters) array.
kernel_size, in_width, filters = 9, 48, 19
b = np.random.random((kernel_size, in_width, filters))
a = b.flatten()
idx = (4, 17, 2)
# b[i,j,k] should sit at flat offset i*(in_width*filters) + j*filters + k
b[idx] - a[idx[0]*(in_width*filters) + idx[1]*filters + idx[2]]

0.0

In [26]:
### input/output sizing with filters/padding
#out_height = ceil(float(in_height) / float(strides[1]))
#out_width  = ceil(float(in_width) / float(strides[2]))

#pad_along_height = max((out_height - 1) * strides[1] +
#                    filter_height - in_height, 0)
#pad_along_width = max((out_width - 1) * strides[2] +
#                   filter_width - in_width, 0)
#pad_top = pad_along_height // 2
#pad_bottom = pad_along_height - pad_top
#pad_left = pad_along_width // 2
#pad_right = pad_along_width - pad_left




In [14]:
### pydot graph testing
#graph = pydot.graph_from_dot_data(str())
# NOTE: `model` is not defined in this cell; it must already exist in the
# kernel from a previously executed cell.
graph = keras.utils.vis_utils.model_to_dot(model)
# Nodes and edges of the pydot graph, kept for manual inspection.
nodes = graph.get_nodes()
edges = graph.get_edges()
# for edge in edges:
#     print(edge)
# for node in nodes:
#     print(node)

In [248]:
### kgraph stuff

class keras_node():
    """Graph node parsed from the text form of a pydot node.

    The whitespace-split fields of pydot_node.to_string() are read as:
    field 0 -> ID, field 1 with its first 8 and last 1 characters removed
    -> name, field 2 with its last 3 characters removed -> type.
    `inputs` and `outputs` start empty and are filled in by the graph.
    """
    def __init__(self, pydot_node):
        self.pydot_node = pydot_node
        fields = pydot_node.to_string().split()
        self.ID = fields[0]
        self.type = fields[2][:-3]
        self.name = fields[1][8:-1]
        self.inputs = []
        self.outputs = []

    def find_layer_idx(self,model):
        """Store in self.layer_idx the index of the first layer of `model`
        whose name equals self.name, or None when no layer matches."""
        matches = (idx for idx, layer in enumerate(model.layers)
                   if layer.name == self.name)
        self.layer_idx = next(matches, None)
            
class keras_edge():
    """Graph edge parsed from the text form of a pydot edge.

    The whitespace-split fields of pydot_edge.to_string() are read as:
    field 0 -> start_id, field 2 with its last character removed -> end_id.
    """
    def __init__(self, pydot_edge):
        self.pydot_edge = pydot_edge
        fields = pydot_edge.to_string().split()
        self.start_id = fields[0]
        self.end_id = fields[2][:-1]

class keras_graph():
    """Connectivity graph of a Keras model, built from its pydot representation.

    After construction call parse_connections() to link nodes through their
    `inputs`/`outputs` lists, then find_io_nodes() to fill `input_nodes` and
    `output_nodes`. Both methods rebuild their results from scratch, so they
    can be called repeatedly without producing duplicate entries.
    """
    def __init__(self, model):
        self.model = model
        self.pydot_graph = keras.utils.vis_utils.model_to_dot(model)
        self.edges = self.parse_edges(self.pydot_graph)
        self.nodes = self.parse_nodes(self.pydot_graph)
        self.input_nodes = []
        self.output_nodes = []

    def parse_edges(self,pydot_graph):
        """converts pydot edges to keras_graph edges"""
        return [keras_edge(edge) for edge in pydot_graph.get_edges()]

    def parse_nodes(self,pydot_graph):
        """converts pydot nodes to keras_graph nodes"""
        nodes = []
        # The first pydot node is skipped.
        # NOTE(review): presumably it is a graph-wide attribute entry rather
        # than a layer - confirm against the model_to_dot output.
        for pydot_node in pydot_graph.get_nodes()[1:]:
            node = keras_node(pydot_node)
            node.find_layer_idx(self.model)
            nodes.append(node)
        return nodes

    def parse_connections(self):
        """links every node to its neighbours according to self.edges"""
        # Start from empty lists so a second call does not duplicate links.
        for node in self.nodes:
            node.inputs = []
            node.outputs = []
        for edge in self.edges:
            start = self.find_node_from_id(edge.start_id)
            end = self.find_node_from_id(edge.end_id)
            start.outputs.append(end)
            end.inputs.append(start)

    def find_node_from_id(self,ID):
        """finds the node associated with a given ID"""
        for node in self.nodes:
            if node.ID == ID:
                return node
        return None

    def find_io_nodes(self):
        """collects nodes without inputs (graph inputs) and without outputs (graph outputs)"""
        # Rebuilt on every call so repeated calls do not accumulate duplicates.
        self.input_nodes = [node for node in self.nodes if len(node.inputs) == 0]
        self.output_nodes = [node for node in self.nodes if len(node.outputs) == 0]
            
def write_model(model):
    """Call write_layer() for every layer of `model` in dependency order.

    A node is written once all of its input nodes have been written.

    Raises
    ------
    RuntimeError if a full pass over the remaining nodes writes nothing
    (for example because of a cyclic dependency). The previous version
    looped forever in that case.
    """
    kgraph = keras_graph(model)
    kgraph.parse_connections()
    kgraph.find_io_nodes()

    written_nodes = []
    # A shallow copy is enough: only list membership changes, the node objects
    # themselves are not modified.
    unwritten_nodes = list(kgraph.nodes)
    while len(unwritten_nodes)>0:
        progressed = False
        # Iterate over a snapshot: removing from the list being iterated
        # makes the loop skip the element after each removal.
        for node in list(unwritten_nodes):
            if set(node.inputs).issubset(written_nodes):
                write_layer(model.layers[node.layer_idx],node)
                written_nodes.append(node)
                unwritten_nodes.remove(node)
                progressed = True
        if not progressed:
            stuck = ", ".join(str(node.name) for node in unwritten_nodes)
            raise RuntimeError("cannot order remaining layers (unsatisfied inputs): " + stuck)
        
def write_layer(layer,node):
    """Placeholder layer writer: prints the layer's name; `node` is not used."""
    layer_name = layer.name
    print(layer_name)
    

In [1]:
### check and io functions

def get_all_io_names(model):
    """Return the de-duplicated input/output names of all layers in `model`.

    The per-layer results of get_layer_io_names() are flattened through any
    nesting of lists and tuples; the order of the returned list is that of a
    set and therefore not defined.
    """
    def flatten(x):
        # Leaves are wrapped in a one-element list; containers are expanded recursively.
        if not isinstance(x, (list, tuple)):
            return [x]
        flat = []
        for item in x:
            flat.extend(flatten(item))
        return flat

    per_layer = [get_layer_io_names(layer) for layer in model.layers]
    unique_names = set(flatten(per_layer))
    return list(unique_names)

def _tensor_layer_name(tensor):
    """Extract the leading name component from a tensor's string form.

    Takes the text between the first pair of double quotes in the first
    whitespace-separated field of str(tensor), then drops everything from the
    first '/' and from the first ':' onwards.
    """
    return str(tensor).split()[0].split('"')[1].split('/')[0].split(':')[0]


def _count_layer_nodes(getter):
    """Count how many consecutive indices 0, 1, 2, ... `getter` accepts."""
    count = 0
    while True:
        try:
            getter(count)
        except Exception:
            # The first failing index marks the end. Catching Exception rather
            # than using a bare except lets KeyboardInterrupt and SystemExit
            # propagate instead of being mistaken for "no more nodes".
            return count
        count += 1


def _collect_io_names(getter):
    """Return one entry per node index: a name, or a list of names when the
    getter returns a list of tensors."""
    names = []
    for i in range(_count_layer_nodes(getter)):
        tensors = getter(i)
        if isinstance(tensors, list):
            names.append([_tensor_layer_name(t) for t in tensors])
        else:
            names.append(_tensor_layer_name(tensors))
    return names


def get_layer_io_names(layer):
    """Return (inputs, outputs): the names feeding and produced by `layer`.

    Each list has one entry per index accepted by layer.get_input_at() /
    layer.get_output_at(); more than one entry means the layer is shared.
    An entry is a single name, or a list of names when the layer has
    several tensors at that index.
    """
    inputs = _collect_io_names(layer.get_input_at)
    outputs = _collect_io_names(layer.get_output_at)
    return (inputs, outputs)

def is_valid_c_name(name):
    """Return True if `name` consists only of ASCII letters, digits and
    underscores and does not start with a digit.

    An empty name is reported as invalid (it used to raise IndexError), and
    the digit '0' is now accepted: it was missing from the allowed set, so
    names such as 'dense_10' were rejected.
    """
    allowed_starting_chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_'
    allowed_chars = allowed_starting_chars + '0123456789'
    if not name:
        return False
    if not set(name).issubset(allowed_chars):
        return False
    if not set(name[0]).issubset(allowed_starting_chars):
        return False
    return True

def name_check(model):
    """Assert that every layer name in `model` passes is_valid_c_name()."""
    for layer in model.layers:
        layer_name = layer.name
        is_ok = is_valid_c_name(layer_name)
        assert is_ok, "layer name '" + layer_name + "' is not a valid C name"

def layer_type(layer):
    """Return the class name of `layer`, e.g. 'Dense'.

    Reads __class__.__name__ directly instead of slicing str(layer.__class__),
    which returned a mangled string for classes whose text form has no
    dotted module path.
    """
    return layer.__class__.__name__
def layers_supported_check(model):
    """Assert that the type of every layer in `model` is in the supported list."""
    supported_layers = (
        'Dense', 'LSTM', 'Conv1D', 'InputLayer',
        'MaxPooling1D', 'AveragePooling1D',
        'GlobalMaxPooling1D', 'GlobalAveragePooling1D',
        'Add', 'Multiply', 'Average', 'Maximum', 'Minimum',
        'LeakyReLU', 'ELU', 'ThresholdedReLU', 'ReLU',
    )
    for layer in model.layers:
        is_supported = layer_type(layer) in supported_layers
        assert is_supported, "layer type '" + \
        layer_type(layer) + "' is not supported at this time"
        
def activation_supported_check(model):
    """Assert that each layer's 'activation' and 'recurrent_activation'
    config entries, when present, name a supported activation."""
    supported_activations = ('linear', 'relu', 'softmax', 'softplus', 'softsign',
                             'tanh', 'sigmoid', 'hard_sigmoid', 'exponential')
    # (config key, start of the failure message)
    checks = (('activation', "activation type '"),
              ('recurrent_activation', "recurrent activation type '"))

    for layer in model.layers:
        for key, message_start in checks:
            config = layer.get_config()
            if key not in config:
                continue
            assert config[key] in supported_activations, \
                message_start + config[key] + "' is not supported at this time"


# model testing

In [None]:
### model testing

# Scratch cell: builds a small Input -> Dense -> AveragePooling1D model and
# pokes at layer/tensor attributes. The commented-out lines are alternative
# model layouts (multi-branch and shared-layer) kept for manual experiments.
inshape = (10,10)
pool_size=3
stride=1
# dilation, num_filters and kernel_size are only used by the commented-out Conv1D line.
dilation=1
num_filters=10
kernel_size=3
pad = 'valid'
a = keras.layers.Input(inshape)
b = keras.layers.Dense(10)(a)
c = keras.layers.AveragePooling1D(pool_size=pool_size, strides=stride, padding=pad)(b)
# d = keras.layers.Conv1D(filters=num_filters, kernel_size=kernel_size, strides=stride, padding=pad, dilation_rate=dilation)(c)
# e = keras.layers.Input((10,10))
# f = keras.layers.Dense(10)(e)
# g = keras.layers.LSTM(10)(f)
# h = keras.layers.add([g,d,e])
model = keras.models.Model(inputs=[a], outputs=[c])

# a = keras.layers.Input(inshape)
# b = keras.layers.Input(inshape)
# c = keras.layers.LSTM(20)
# d = c(a)
# e = c(b)
# model = keras.models.Model(inputs=[a,b], outputs=[d,e])

#print(model.layers[1].input_shape)
#print(model.layers[1].output_shape)
#print(model.layers[1].get_weights()[0].shape)
#model.layers[2].get_config()
print(model.layers[0].name)
# The next two bare expressions are not the last line of the cell, so their
# values are computed and discarded.
str(model.layers[0].get_input_at(0)).split()[0][8:-4]
dir(model)
print(model.output[0].name)
# Last expression of the cell: number of entries in the final layer's input_shape.
len(model.layers[-1].input_shape)
#print(model.layers[5].name)
