# Using GPU

In [1]:
import os

os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]="1"

# Loading Data

In [2]:
import numpy as np

data = np.load("FinalData/channel_first_data.npz")
normalData = data["input"]
shuffledData = data["input"]
np.random.shuffle(shuffledData)

trainInput,trainOutput = shuffledData[0:15000],shuffledData[0:15000]
testInput,testOutput = shuffledData[15000:],shuffledData[15000:]

# Helper Functions

In [3]:
import numpy as np
import Sokoban
import random
import tensorflow as tf
import keras.backend as K
from keras.losses import categorical_crossentropy

dataTensor = tf.convert_to_tensor(normalData, dtype=tf.float32)

def fulldata_loss(y_true, y_pred):
    loss = categorical_crossentropy(y_true, y_pred)
    prob = 1 - K.mean(dataTensor, axis=[0,1,2], keepdims=True)
    prob = tf.divide(prob, tf.keras.backend.max(prob))
    prob = K.sum(tf.multiply(y_true, K.reshape(prob,(1,1,1,7))), axis=3)
    
    return K.mean(tf.multiply(loss, prob), axis=[1,2])

def minibatch_loss(y_true, y_pred):
    loss = categorical_crossentropy(y_true, y_pred)
    prob = 1 - K.mean(y_true, axis=[0,1,2], keepdims=True)
    prob = tf.divide(prob, tf.keras.backend.max(prob))
    prob = K.sum(tf.multiply(y_true, K.reshape(prob,(1,1,1,7))), axis=3)
    
    return K.mean(tf.multiply(loss, prob), axis=[1,2])
    
def sample_loss(y_true, y_pred):
    loss = categorical_crossentropy(y_true, y_pred)
    prob = 1 - K.mean(y_true, axis=[1,2], keepdims=True)
    prob = tf.divide(prob, tf.keras.backend.max(prob))
    prob = K.sum(tf.multiply(K.reshape(K.repeat_elements(prob, 100, axis=1),(-1,10,10,7)), y_true),axis=3)

    return K.mean(tf.multiply(loss, prob), axis=[1,2])
    
def normal_loss(y_true, y_pred):
    return K.mean(categorical_crossentropy(y_true, y_pred), axis=[1,2])

def get_contractive_loss(loss_fn, lam=1e-4):
    def contractive_loss(y_true, y_pred):
        cce = loss_fn(y_true, y_pred)
        W = K.variable(value=autoencoder.get_layer('encoded').get_weights()[0])  # N x N_hidden
        W = K.transpose(W)  # N_hidden x N
        h = autoencoder.get_layer('encoded').output
        h = K.reshape(h,(-1,encoding))
        dh = h * (1 - h)  # N_batch x N_hidden
        # N_batch x N_hidden * N_hidden x 1 = N_batch x 1
        contractive = K.sum(dh**2 * K.sum(W**2, axis=1), axis=1)
        return cce + lam * contractive
    
    return contractive_loss

def numpyToString(array):
    gameCharacters="# @$.+*"
    char_to_int = dict((c, i) for i, c in enumerate(gameCharacters))
    int_to_char = dict((i, c) for i, c in enumerate(gameCharacters))
    intArray = np.argmax(array, axis=2)
    output=""
    for (i,j), index in np.ndenumerate(intArray):
        if i > 0 and j == 0:
            output += "\n"
        output += int_to_char[index]
    return output

def convertStringTo2DArray(string):
    return np.array([list(l) for l in string.split("\n")])

def getOneHotEncodingMap(data):
    gameCharacters="# @$.+*"
    char_to_int = dict((c, i) for i, c in enumerate(gameCharacters))
    int_to_char = dict((i, c) for i, c in enumerate(gameCharacters))

    encodingData = np.zeros((data.shape[0], data.shape[1], len(gameCharacters)))
    for (i,j),c in np.ndenumerate(data):
        index=char_to_int[c]
        encodingData[i][j][index]=1
    return encodingData

def getHammingDistance(data1, data2):
    data1=np.argmax(data1, axis=3)
    data2=np.argmax(data2, axis=3)
    result=np.absolute(data1-data2)
    result=np.clip(result,0,1)
    return np.sum(result)/data1.shape[0]

def combineTwoStrings(str1, str2):
    if len(str1) == 0:
        return str2
    if len(str2) == 0:
        return str1
    lines1=str1.split("\n")
    lines2=str2.split("\n")
    linesOut=[]
    for i in range(10):
        linesOut.append(lines1[i] + "  " + lines2[i])
    return "\n".join(linesOut)

def getNoisedState(numpyLevel, randomSize=20):
    stringLvl=numpyToString(numpyLevel)
    state=Sokoban.State()
    state.stringInitialize(stringLvl.split("\n"))
    for i in range(randomSize):
        if random.random() < 0.5:
            state.update(0, 2*random.randint(0,1) - 1)
        else:
            state.update(2*random.randint(0,1) - 1, 0)
    return getOneHotEncodingMap(convertStringTo2DArray(str(state)))

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


# Building the Model

In [None]:
import keras.layers as layers
from keras.models import Model

encoding=8

input_level=layers.Input(shape=(10,10,7))
encoded_level=layers.Input(shape=(encoding,))

x = layers.ZeroPadding2D(padding=(1,1))(input_level)
x = layers.Conv2D(128, (3,3), activation="relu", padding="same")(x)
x = layers.MaxPooling2D((2,2), padding="same")(x)
x = layers.Conv2D(64, (3,3), activation="relu", padding="same")(x)
x = layers.MaxPooling2D((2,2), padding="same")(x)
x = layers.Conv2D(32, (3,3), activation="relu", padding="same")(x)
x = layers.MaxPooling2D((3,3), padding="same")(x)
x = layers.Dense(encoding, activation="relu")(x)
encoded = layers.Flatten()(x)

x = layers.Reshape((1,1,encoding))(encoded)
x = layers.Dense(32, activation="relu")(x)
x = layers.UpSampling2D((3,3))(x)
x = layers.Conv2D(64, (3,3), activation="relu", padding="same")(x)
x = layers.UpSampling2D((2,2))(x)
x = layers.Conv2D(128, (3,3), activation="relu", padding="same")(x)
x = layers.UpSampling2D((2,2))(x)
x = layers.Conv2D(7, (3,3), padding="valid")(x)
decoded = layers.Softmax(axis=3)(x)

autoencoder = Model(input_level, decoded)
encoder = Model(input_level, encoded)
decoded = autoencoder.layers[-9](encoded_level)
decoded = autoencoder.layers[-8](decoded)
decoded = autoencoder.layers[-7](decoded)
decoded = autoencoder.layers[-6](decoded)
decoded = autoencoder.layers[-5](decoded)
decoded = autoencoder.layers[-4](decoded)
decoded = autoencoder.layers[-3](decoded)
decoded = autoencoder.layers[-2](decoded)
decoded = autoencoder.layers[-1](decoded)
decoder = Model(encoded_level,decoded)

autoencoder.compile(optimizer='adam', loss=normal_loss)
autoencoder.summary()

# Clear Model Files

In [None]:
import os

files=os.listdir("Weights/")
for f in files:
    if not os.path.isdir("Weights/" + f):
        os.remove("Weights/" + f)

# Training the Model

In [None]:
from keras.callbacks import ModelCheckpoint

autoencoder.fit(trainInput, trainOutput
                ,epochs=2000
                ,batch_size=128
                ,shuffle=True
                ,validation_data=(testInput, testOutput)
                ,callbacks=[ModelCheckpoint("Weights/model_{epoch:02d}_{loss:.2f}_{val_loss:.2f}.hdf5", 
                                            save_weights_only=False, save_best_only=False, period=20)]
               )

# Testing the Model

In [None]:
import random

usedInput = trainInput
usedOutput = trainInput

autoencoder.load_weights("Weights/model_2000_0.17_0.33.hdf5")

encodedLevels = encoder.predict(usedInput)
decodedLevels = decoder.predict(encodedLevels)
print(autoencoder.evaluate(usedInput, usedOutput, batch_size=128))

stringNormalLevels = ""
stringDecodedLevels = ""
for i in range(9):
    levelNumber=random.randint(0,len(usedInput)-1)
    stringNormalLevels = combineTwoStrings(stringNormalLevels, numpyToString(usedInput[levelNumber]))
    stringDecodedLevels = combineTwoStrings(stringDecodedLevels, numpyToString(decodedLevels[levelNumber]))
    
print(stringNormalLevels)
print()
print(stringDecodedLevels)

# Load a certain Model

In [None]:
import os

files=os.listdir("FinalWeights/Conv_16/")
for f in files:
    if "hdf5" not in f or "32" in f:
        continue
    autoencoder.load_weights("FinalWeights/Conv_16/" + f)
    encodedLevels = encoder.predict(normalData)
    decodedLevels = decoder.predict(encodedLevels)
    print(f)
    print(getHammingDistance(normalData,decodedLevels))

In [None]:
print(numpyToString(normalData[0]))
print()
print(numpyToString(getNoisedState(normalData[0])))