In [57]:
import os
import json
import random
import numpy as np
import tensorflow as tf

In [58]:
# List that holds all the grids to be used as inputs for training
x_train_grid = []
# List that holds all the positions to be used as inputs for training
x_train_position = []
# List that holds all the distances to be used as inputs for training
x_train_distance = []
# List that holds all the locals to be used as inputs for training
x_train_locals = []
# List that holds all the directions to be used as outputs for training
y_train = []

# List that holds all the grids to be used as inputs for testing
x_test_grid = []
# List that holds all the positions to be used as inputs for testing
x_test_position = []
# List that holds all the distances to be used as inputs for testing
x_test_distance = []
# List that holds all the locals to be used as inputs for testing
x_test_locals = []
# List that holds all the directions to be used as outputs for testing
y_test = []

In [59]:
# Name of directory for grids
directory_name = './data/big/'

direct_count = [0 for i in range(4)]

# iterate through all training grids
for file_name in os.listdir(directory_name):
    f = open(directory_name + file_name)
    data = json.load(f)

    # Iterate through all the data in a given grid and append their input and output values
    for i in data:
        x_train_grid.append(i['gridworld'])
        x_train_position.append(i['position'])
        x_train_locals.append(i['local'])
        y_train.append(i['direction'])
        direct_count[i['direction']] += 1
    
    # Close file socket
    f.close()

In [60]:
# Name of directory for grids
directory_name = './data/bigtest/'

# iterate through all training grids
for file_name in os.listdir(directory_name):
    f = open(directory_name + file_name)
    data = json.load(f)

    # Iterate through all the data in a given grid and append their input and output values
    for i in data:
        x_test_grid.append(i['gridworld'])
        x_test_position.append(i['position'])
        x_test_locals.append(i['local'])
        y_test.append(i['direction'])
    
    # Close file socket
    f.close()

In [61]:
# Reshape the data
train_in_grid = np.reshape( x_train_grid, (-1, 50, 50, 1) ) / 2
train_in_position = np.reshape( x_train_position, (-1, 50, 50, 1) )
train_in_locals = np.reshape( x_train_locals, (-1, 5, 5, 1) )
train_out = tf.keras.utils.to_categorical( y_train, 4 )

test_in_grid = np.reshape( x_test_grid, (-1, 50, 50, 1) ) / 2
test_in_position = np.reshape( x_test_position, (-1, 50, 50, 1) )
test_in_locals = np.reshape( x_test_locals, (-1, 5, 5, 1) )
test_out = tf.keras.utils.to_categorical( y_test, 4 )

In [62]:
# Flatten the grid input
grid_input = tf.keras.layers.Input( shape = (50,50,1) )
cnn_grid = tf.keras.layers.Conv2D( filters = 4, kernel_size = (5,5), strides = (1,1),
                               padding = "valid", activation = tf.nn.relu )( grid_input )
flatten_grid = tf.keras.layers.Flatten()( cnn_grid )

In [63]:
# Flatten the position input
position_input = tf.keras.layers.Input( shape = (50,50,1) )
cnn_position = tf.keras.layers.Conv2D( filters = 2, kernel_size = (5,5), strides = (1,1),
                               padding = "valid", activation = tf.nn.relu )( position_input )
flatten_position = tf.keras.layers.Flatten()( cnn_position )

In [64]:
# flatten the local inputs
local_input = tf.keras.layers.Input( shape = (5,5,1) )
cnn_local = tf.keras.layers.Conv2D( filters = 3, kernel_size = (3,3), strides = (1,1),
                               padding = "valid", activation = tf.nn.relu )( local_input )
flatten_local = tf.keras.layers.Flatten()( cnn_local )

In [65]:
# Concatenate the grid and position into one vector which will be passed to neural network as input
final_input = tf.keras.layers.Concatenate()([flatten_grid, flatten_local, flatten_position])
print(final_input)

KerasTensor(type_spec=TensorSpec(shape=(None, 12723), dtype=tf.float32, name=None), name='concatenate_3/concat:0', description="created by layer 'concatenate_3'")


In [66]:
# Create layers for Neural Network
dense_1 = tf.keras.layers.Dense( units = 100, activation = tf.nn.relu )( final_input )
dense_2 = tf.keras.layers.Dense( units = 64, activation = tf.nn.relu )( dense_1 )
logits = tf.keras.layers.Dense( units = 4, activation = None )( dense_2 )
probabilities = tf.keras.layers.Softmax()( logits )

In [67]:
# Compile the neural network to use stochastic gradient descent as the optimizer and categorical_crossentropy as loss function
model = tf.keras.Model( inputs = [grid_input, local_input, position_input], outputs = probabilities )
opt = tf.keras.optimizers.SGD()
model.compile( optimizer = opt, loss = 'categorical_crossentropy', metrics = ['accuracy'] )

In [68]:
def generate_confusion_matrix( data, labels ):
    mat = [ [ 0 for i in range(4) ] for j in range(4) ]
    
    prob_predict = model.predict( data )
    #print(prob_predict)
    predictions = np.argmax( prob_predict, axis = 1 )
    
    for i in range( data[0].shape[0] ):
        mat[ labels[i] ][ predictions[i] ] += 1
        # if labels[i] == 0:
        #     print(prob_predict[i])
    
    for i in range(4):
        print( "\t".join( [ str(c) for c in mat[i] ] ) )

In [69]:
# Test out before training
generate_confusion_matrix( [test_in_grid, test_in_locals, test_in_position], y_test )

362	9	1	13
1034	6	0	28
2119	34	4	188
2813	24	4	187


In [70]:
# Train the model
history = model.fit( [train_in_grid, train_in_locals, train_in_position], train_out, validation_data=([test_in_grid, test_in_locals, test_in_position], test_out), epochs = 5 )

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [71]:
# Test out after training
generate_confusion_matrix( [test_in_grid, test_in_locals, test_in_position], y_test )

360	12	6	7
24	1030	3	11
11	3	2329	2
15	6	5	3002


In [72]:
model.save('./models/agent1_CNN')

INFO:tensorflow:Assets written to: ./models/agent1_CNN/assets
