In [1]:
import tensorflow as tf
import numpy as np
from tfUtils import *
import matplotlib.pyplot as plt
import innvestigate

Using TensorFlow backend.


In [2]:
from keras.layers import Input, Activation, Dense, Flatten, ZeroPadding2D, Conv2D, BatchNormalization, MaxPooling2D
from keras.models import Model

In [3]:
def generate_data(m, n_x, seed = 9):
    """Build a synthetic two-class dataset with one planted value per sample.

    Every row starts as standard-normal noise. One column per row is then
    overwritten with -20 (class 0) or +20 (class 1), each chosen with
    probability 0.5, and the class is stored as a one-hot row of Y.

    Arguments:
    m -- number of samples to generate
    n_x -- number of features per sample
    seed -- seed applied to NumPy's global random generator (default 9)

    Returns:
    X -- float array of shape (m, n_x): noise plus the planted values
    Y -- one-hot label array of shape (m, 2)
    dist -- integer array of shape (m,); dist[i] is the planted column of row i
    """
    # Call the seeding function. Assigning to `np.random.seed` would replace
    # the function with an int and leave the generator unseeded.
    np.random.seed(seed)
    X = np.random.randn(m, n_x)
    Y = np.zeros((m, 2))
    # NOTE(review): `high` is exclusive, so the last column (n_x - 1) is never
    # selected. Kept as in the original; confirm whether that is intended.
    dist = np.random.randint(low = 0, high = n_x - 1, size = m)
    for i in range(m):
        if np.random.rand() < 0.5:
            X[i, dist[i]] = -20
            Y[i, 0] = 1
        else:
            X[i, dist[i]] = 20
            Y[i, 1] = 1
    return X, Y, dist

In [4]:
def initialize_parameters(shape, name, which):
    """Create a TensorFlow variable for a weight tensor or a bias tensor.

    Arguments:
    shape -- shape of the variable to create
    name -- name handed to tf.get_variable
    which -- "weights" for a Xavier-initialized variable, "bias" for a
             zero-initialized one

    Returns:
    The variable from tf.get_variable, or None when `which` is neither
    "weights" nor "bias".
    """
    # Anything other than the two known kinds yields None, without touching
    # the graph.
    if which not in ("weights", "bias"):
        return None
    if which == "bias":
        chosen_initializer = tf.zeros_initializer()
    else:
        chosen_initializer = tf.contrib.layers.xavier_initializer()
    return tf.get_variable(name, shape, initializer = chosen_initializer)

In [5]:
def create_placeholders(n_h, n_w, n_c, n_y):
    """Create the float32 graph inputs for a batch of inputs and its labels.

    Arguments:
    n_h, n_w, n_c -- the three trailing dimensions of the input placeholder
    n_y -- trailing dimension of the label placeholder

    Returns:
    (X, Y) -- placeholders of shape [None, n_h, n_w, n_c] and [None, n_y];
              the leading None leaves the batch size open.
    """
    input_shape = [None, n_h, n_w, n_c]
    label_shape = [None, n_y]
    # The input placeholder is created first, then the label placeholder.
    return tf.placeholder(tf.float32, input_shape), tf.placeholder(tf.float32, label_shape)

In [6]:
def flatten(layer):
    """Collapse dimensions 1, 2 and 3 of `layer` into one feature axis.

    Arguments:
    layer -- tensor whose shape has at least four entries

    Returns:
    (flattened, feature_count) -- `layer` reshaped to [-1, feature_count], and
    feature_count itself, the product of shape entries 1, 2 and 3.
    """
    dims = layer.shape
    feature_count = dims[1] * dims[2] * dims[3]
    # -1 lets the reshape infer the leading (batch) dimension.
    return tf.reshape(layer, [-1, feature_count]), feature_count

In [7]:
def fully_connected(layer, input_features, output_features, activation, indicator):
    """Apply a dense layer, activation(layer @ W + b), to a 2-D input.

    Arguments:
    layer -- input tensor; it is matrix-multiplied by the weights
    input_features -- number of rows of the weight matrix
    output_features -- number of columns of the weight matrix
    activation -- one of "relu", "sigmoid", "tanh" or "linear"
    indicator -- suffix used to name the variables "W<indicator>" and
                 "b<indicator>"

    Returns:
    (A, weights, bias) -- the activated output and the two variables created.

    Raises:
    ValueError -- if `activation` is not one of the supported names.
    """
    # Validate before creating any variables. Previously an unknown name fell
    # off the end and returned None, which surfaced as an unpacking error in
    # the caller.
    if activation not in ("relu", "sigmoid", "tanh", "linear"):
        raise ValueError("Unsupported activation: " + repr(activation))

    weights = initialize_parameters([input_features, output_features], "W" + str(indicator), "weights")
    bias = initialize_parameters([1, output_features], "b" + str(indicator), "bias")
    Z = tf.matmul(layer, weights) + bias

    if activation == "relu":
        A = tf.nn.relu(Z)
    elif activation == "sigmoid":
        A = tf.nn.sigmoid(Z)
    elif activation == "tanh":
        A = tf.nn.tanh(Z)
    else:
        # "linear": pass the pre-activation through unchanged.
        A = Z
    return A, weights, bias

In [9]:
def conv_2D(layer, filter_size, output_channels, stride, padding, activation, use_maxpool, indicator):
    """Apply a 2-D convolution, an activation and optionally 2x2 max-pooling.

    Arguments:
    layer -- input tensor; entry 3 of its shape is taken as the channel count
    filter_size -- height and width of the square filter
    output_channels -- number of filters
    stride -- stride used for both spatial dimensions
    padding -- padding mode forwarded to tf.nn.conv2d
    activation -- one of "relu", "sigmoid", "tanh" or "linear"
    use_maxpool -- when truthy, follow the activation with a 2x2, stride-2,
                   'VALID' max-pool
    indicator -- suffix used to name the variables "W<indicator>" and
                 "b<indicator>"

    Returns:
    (A, weights, bias) -- the layer output and the two variables created.

    Raises:
    ValueError -- if `activation` is not one of the supported names.
    """
    # Validate before creating any variables. Previously an unknown name left
    # `A` unbound and failed later with UnboundLocalError.
    if activation not in ("relu", "sigmoid", "tanh", "linear"):
        raise ValueError("Unsupported activation: " + repr(activation))

    # Read the channel count from the `layer` argument (the original referred
    # to an undefined name `layers`).
    input_channels = layer.shape[3]
    weights = initialize_parameters([filter_size, filter_size, input_channels, output_channels],
                                    "W" + str(indicator), "weights")
    bias = initialize_parameters([output_channels], "b" + str(indicator), "bias")
    Z = tf.nn.conv2d(layer, weights, strides = [1, stride, stride, 1], padding = padding) + bias

    if activation == "relu":
        A = tf.nn.relu(Z)
    elif activation == "sigmoid":
        A = tf.nn.sigmoid(Z)
    elif activation == "tanh":
        A = tf.nn.tanh(Z)
    else:
        # "linear": pass the pre-activation through unchanged.
        A = Z

    if use_maxpool:
        A = tf.nn.max_pool(A, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = 'VALID')
    return A, weights, bias

In [10]:
def compute_cost(Z, Y):
    """Return the mean softmax cross-entropy of logits Z against labels Y.

    Arguments:
    Z -- tensor passed as `logits` (un-normalized scores)
    Y -- tensor passed as `labels`; presumably one-hot rows matching Z's
         shape -- confirm against the caller

    Returns:
    The result of tf.reduce_mean over the per-example losses.
    """
    per_example_loss = tf.nn.softmax_cross_entropy_with_logits(labels = Y, logits = Z)
    return tf.reduce_mean(per_example_loss)

In [11]:
# Generate 50000 samples with 144 features each, then view every sample as a
# 12 x 12 single-channel image (144 = 12 * 12 * 1).
X_train, Y_train, dist = generate_data(m = 50000, n_x = 144)
X_train = np.reshape(X_train, (-1, 12, 12, 1))

In [17]:
def model(X_train, Y_train, layers, filter_size, stride, padding, activation, 
          use_maxpool, epochs, batch_size, starting_rate, decay):
    """Build, train and evaluate a stack of conv_2D / fully-connected layers.

    Arguments:
    X_train -- training inputs with a 4-entry shape (samples first)
    Y_train -- training labels with a 2-entry shape (samples first)
    layers -- one entry per layer; entry[0] is 'conv_2D' or 'fc' and entry[1]
              is the layer width (output channels / output features).
              NOTE(review): the original indexed this argument inconsistently
              (layers[i][0] for the kind, layers[i - 1] for the width); the
              (kind, width) reading is an assumption -- confirm it against
              the intended call site.
    filter_size, stride, padding, use_maxpool -- forwarded unchanged to every
              conv_2D layer
    activation -- sequence of activation names, one per layer. The last
              layer's output is fed to compute_cost as logits, so it should
              normally be "linear".
    epochs -- number of passes over the training data
    batch_size -- mini-batch size handed to random_minibatches
    starting_rate -- initial learning rate of the exponential decay schedule
    decay -- decay rate applied every 5000 optimizer steps (staircase)

    Returns:
    parameter_list -- evaluated parameter values ordered W1, b1, W2, b2, ...

    Raises:
    ValueError -- if a layer kind is neither 'conv_2D' nor 'fc'.

    Side effects: prints the epoch cost every 5 epochs and the final training
    accuracy, and plots the per-epoch cost curve.
    """
    # Shapes come from the training arrays; X and Y do not exist yet here.
    input_shape = X_train.shape
    output_shape = Y_train.shape
    X, Y = create_placeholders(input_shape[1], input_shape[2], input_shape[3], output_shape[1])

    parameters = {}
    A = X
    already_flat = False
    # Layer numbering is 1-based (it names the variables W1, b1, ...), while
    # `layers` and `activation` are 0-based sequences.
    for i, layer_spec in enumerate(layers, start = 1):
        kind, width = layer_spec[0], layer_spec[1]
        if kind == 'conv_2D':
            A, W, b = conv_2D(
                A, filter_size, width, stride, padding, activation[i - 1], use_maxpool, indicator = i)
        elif kind == 'fc':
            if not already_flat:
                # Flatten only once: after the first fc layer A is already 2-D.
                A, input_features = flatten(A)
                already_flat = True
            else:
                input_features = A.get_shape()[1]
            A, W, b = fully_connected(
                A, input_features, width, activation[i - 1], indicator = i)
        else:
            raise ValueError("Unsupported layer kind: " + repr(kind))
        parameters['W' + str(i)] = W
        parameters['b' + str(i)] = b

    cost = compute_cost(A, Y)
    # Softmax is monotonic, so the arg-max of the logits is the predicted class.
    hard_A = tf.argmax(A, axis = 1)
    acc = tf.reduce_mean(tf.cast(tf.equal(hard_A, tf.argmax(Y, axis = 1)), tf.float32))

    global_steps = tf.Variable(0, trainable = False)
    learning_rate = tf.train.exponential_decay(starting_rate, global_steps, 5000, decay, staircase = True)
    train = tf.train.AdamOptimizer(learning_rate = learning_rate).minimize(cost, global_step = global_steps)
    init = tf.global_variables_initializer()

    cost_list = []
    # The context manager closes the session; no explicit close() is needed.
    with tf.Session() as sess:
        sess.run(init)
        for epoch in range(epochs):
            # random_minibatches is not defined in this file; presumably it is
            # provided by the star import of tfUtils -- confirm.
            minibatches = random_minibatches(X_train, Y_train, batch_size, seed = epoch)
            epoch_cost = 0
            for minibatch in minibatches:
                # One run() call performs the update and fetches the batch
                # cost, instead of a second forward pass per batch.
                _, batch_cost = sess.run([train, cost],
                                         feed_dict = {X : minibatch[0], Y : minibatch[1]})
                epoch_cost += batch_cost / len(minibatches)
            cost_list.append(epoch_cost)
            if epoch % 5 == 0:
                print(epoch_cost)

        # Integer division: each layer contributes exactly one W and one b.
        parameter_names = []
        for i in range(1, len(parameters) // 2 + 1):
            parameter_names.append("W" + str(i))
            parameter_names.append("b" + str(i))
        parameter_list = sess.run([parameters[name] for name in parameter_names])
        print("accuracy", sess.run(acc, feed_dict = {X : X_train, Y : Y_train}))

    plt.plot(np.array(cost_list), '-b')
    plt.show()
    return parameter_list