In [1]:
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import WebDriverException
import matplotlib.pyplot as plt
import time
import os
import random
from PIL import Image, ImageOps

#### Parameters

In [2]:
STILL_ALIVE_REWARD = 1
DEAD_REWARD = -1000

CROP_SHAPE = (750, 539, 1)
RESIZE_WIDTH = 180
RESIZE_HEIGHT = 137
READ_BATCH = 2

RIGHT = 1
DOWN = 2
LEFT = 3
UP = 4

SHOTS_FOLDER = 'data/shots/'

#### Browser functions

In [3]:
def selectLevel(l):
    xpath = '/html/body/section/div[2]/nav/p[' + str(l+1) + ']'
    
    level = browser.find_element_by_xpath(xpath)
    level.click()
    
    
def clickBoard():        
    xpath = '/html/body/section/div[2]/div'
    board = browser.find_element_by_xpath(xpath)

    try:
        board.click()
    except WebDriverException:
        print('Exception')
        return
    
    
def getScreen(fileName):    
    state = getState()
#     if state == 'playing' or state == 'paused':    
    ss = browser.get_screenshot_as_file(SHOTS_FOLDER + fileName)
    
    
def getState():
    xpath = '/html/body/section/div[2]'
    state = browser.find_element_by_xpath(xpath)
    c = state.get_attribute('class')
    
    return c.split(' ')[-1]


def getScore():
    state = getState()
    
    if state == 'playing' or state == 'paused':
        
        xpath = '/html/body/section/div[2]/p[1]/span'
        score = browser.find_element_by_xpath(xpath)

        if not score.text.isnumeric():
            return 0
        return int(score.text)
    
    return 0


def makeMove(m):
    xpath = '/html/body/section/div[2]/div'
    board = browser.find_element_by_xpath(xpath)

    if m == RIGHT:
        browser.find_element_by_tag_name('body').send_keys(Keys.ARROW_RIGHT)
    elif m == DOWN:
        browser.find_element_by_tag_name('body').send_keys(Keys.ARROW_DOWN)
    elif m == LEFT:
        browser.find_element_by_tag_name('body').send_keys(Keys.ARROW_LEFT)
    elif m == UP:
        browser.find_element_by_tag_name('body').send_keys(Keys.ARROW_UP)
        

def makeRandomMove():
    r = random.randint(0, 4)
    makeMove(r)
    
    return r

#### Preprocess functions

In [4]:
def getLastGameIndex():
    '''Get the index of the last current game.'''
    files = os.listdir(SHOTS_FOLDER)
    if len(files) == 0:
        return 0
    
    last = files[-1]
    
    return parseName(last)[0]


def getName(g, i, m, r):
    '''Form the file name from the board data.'''
    return str(g).zfill(3) + '_' + str(i).zfill(3) + '_' + str(m) + '_' + str(r) + '.png'


def parseName(fileName):
    '''Parse the board data from the file name.'''
    return [int(d) for d in fileName[:-4].split('_')]


def cropImgOld(fileName):
    ss = plt.imread(fileName)

    ss = ss[120:659, 273:1023, :]
    
    plt.imsave(fileName, ss)
        

def preprocImg(fileName):
    '''Preprocess a screenshot.'''
    im = Image.open(fileName)    
    
    # Crop board
    im = im.crop((273, 120, 1023, 659))
    
    # Grayscale
    im = ImageOps.grayscale(im)
    
    # Binarization
    t = 127
    im = im.point(lambda x: 255 if x > t else 0)
    
    # Resize
    im = im.resize((RESIZE_WIDTH, RESIZE_HEIGHT))
    
    return im
    
    
def preprocAll(gameIndex):
    '''Preprocess all screenshots in the data folder.'''
    files = os.listdir(SHOTS_FOLDER)
    for f in files:
        g = parseName(f)[0]
        
        if g >= gameIndex:        
            fileName =  SHOTS_FOLDER + f
            im = preprocImg(fileName)
            im.save(fileName)

In [9]:
preprocAll(86)

#### Play game

In [14]:
url = 'https://playsnake.org/'

browser = webdriver.Chrome(executable_path='D:/Libraries/Drivers/chromedriver_win32/chromedriver.exe')
# browser = webdriver.Chrome(executable_path='D:/Biblioteci/Python/chromedriver_win32/chromedriver.exe')
browser.get(url) 

START_GAME = getLastGameIndex() + 1
gameIndex = START_GAME

maxGames = 100

for g in range(maxGames):
    
    # Select the level
    selectLevel(1)
    time.sleep(2)
    
    score = 0
    screenIndex = 0
    
    clickBoard()
    while getState() == 'playing' or getState() == 'paused':
        s2 = getScreen('prev.png')
        screenIndex += 1    

        clickBoard()
        m = makeRandomMove()
        time.sleep(0.1)
        clickBoard()
        
        # Update last board  
        r = max(STILL_ALIVE_REWARD, getScore() - score) 
        fileName = getName(gameIndex, screenIndex, m, r)
        os.rename(SHOTS_FOLDER + 'prev.png', SHOTS_FOLDER + fileName)
        score = getScore()
        
    # Change reward of last board
    g, i, m, r = parseName(fileName)
    os.rename(SHOTS_FOLDER + fileName, SHOTS_FOLDER + getName(g, i, m, DEAD_REWARD))
    
    time.sleep(1)
    
    gameIndex += 1

browser.quit()

preprocAll(START_GAME)

# RIGHT = 1
# DOWN = 2
# LEFT = 3
# UP = 4

### Q model

In [35]:
from keras.models import Sequential, Model
import keras.layers
from keras.layers import Conv2D, MaxPool2D, Flatten, Dense, Dropout, Input
from keras.regularizers import l2
from keras.optimizers import Adam, Adadelta, RMSprop
import keras.losses as losses
from keras.backend import set_image_data_format
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import numpy as np
import glob

set_image_data_format('channels_first')

#### Input functions

In [32]:
def readData():
    files = os.listdir(SHOTS_FOLDER)
    
    labels = np.zeros(len(files))    
    data = []
    
    for i, f in enumerate(files):
        im = Image.open(SHOTS_FOLDER + f)
        
        if f in history:
            labels[i] = history[f][2]
        else:
            labels[i] = STILL_ALIVE_REWARD
            
        data.append([np.asarray(im.convert("L"))])
        
    data = np.asarray(data)
    return data, labels


def sampleData(nSamples, propD, propR, propA):
    '''Sample the input data according to some proportions.
    
    Parameters
    ----------
    nSamples: total number of samples
    propD: proportion of samples which represent a death
    propR: proportion of samples which represent a reward
    propA: proportion of rest of samples
    '''
    
    files = os.listdir(SHOTS_FOLDER)
    
    perc = []
    nd, na, nr = (0, 0, 0)
    for f in files:
        r = parseName(f)[3]
        if r == DEAD_REWARD:
            perc.append('d')
            nd += 1
        elif r == STILL_ALIVE_REWARD:
            perc.append('a')
            na += 1
        else:
            perc.append('r')
            nr += 1
        
    n = propD * nd + propA * na + propR * nr
    pd = propD / n
    pa = propA / n
    pr = propR / n
    for i in range(len(perc)):
        if perc[i] == 'd':
            perc[i] = pd
        elif perc[i] == 'a':
            perc[i] = pa
        else:
            perc[i] = pr
        
    samples = np.random.choice(files, size=nSamples, replace=False, p=perc)
    return samples


def readSamplesBasic(samples):
    '''Read samples for basic model, which just classifies if a board is finished or not.'''
    data = []
    labels = []
    
    for file in samples:
        im = Image.open(SHOTS_FOLDER + file)
        g, i, m, r = parseName(file)
        
        data.append([np.asarray(im.convert("L"))])
        labels.append(1 if r > 0 else 0)
        
    data = np.asarray(data)
    return data, labels


def getPreviousSample(file):
    '''Get the name of the previous sample.'''
    
    g, i, m, r = parseName(file)
    i -= 1
    
    if i > 0:        
        prevFile = glob.glob(SHOTS_FOLDER + str(g).zfill(3) + '_' + str(i).zfill(3) + '*')[0]
        prevFile = prevFile.replace('\\', '/')
        prevFile = prevFile.split('/')[-1]
#         print(prevFile)
        return prevFile      
    else:
        return -1
    
    
def readSample(file):
    '''Read a single sample as an array.'''
    im = Image.open(SHOTS_FOLDER + file)
    g, i, m, r = parseName(file)

    data = np.asarray(im.convert("L"))
    label = 1 if r > 0 else 0
    
    return data, label
    
    
def readSampleMove(file):
    '''Read a single sample as an array, also returns the move.'''
    im = Image.open(SHOTS_FOLDER + file)
    g, i, m, r = parseName(file)

    data = np.asarray(im.convert("L"))
#     if r == -1000:
#         label = -1
#     elif r > 1:
#         label = r / 100
#     else:
#         label = 0
    label = r
    
    return data, m, label


def readBatchSamples(samples, batch):
    '''Read samples in batches, so that we also have information about the movement.'''
    data = []
    labels = []
    
    for file in samples:        
        dataBatch = []
        
        d, label = readSample(file)
        dataBatch.append(d)
        
        # Get previous files
        prevFile = file
        for p in range(batch - 1):
            prevFile = getPreviousSample(prevFile)
            
            if prevFile == -1:
                break
                
            d, _ = readSample(prevFile)
            dataBatch.append(d)
        
        if prevFile != -1:
            dataBatch.reverse()
            data.append(dataBatch)
            labels.append(label)
        
    data = np.asarray(data)
    return data, labels   


def readBatchSamplesMove(samples, batch):
    '''Read samples in batches, so that we also have information about the movement.
    Also reads the moves.'''
    data = []
    actions = []
    labels = []
    
    for file in samples:        
        dataBatch = []
        
        d, a, r = readSampleMove(file)
        dataBatch.append(d)
        
        # Get previous files
        prevFile = file
#         print(file)
        for p in range(batch - 1):
            prevFile = getPreviousSample(prevFile)
#             print('>', prevFile)
            
            if prevFile == -1:
                break
                
            d, _ = readSample(prevFile)
            dataBatch.append(d)
        
        if prevFile != -1:
            dataBatch.reverse()
            data.append(dataBatch)
            actions.append(a)
            labels.append(r)
#             print('->', a, r)
        
    data = np.asarray(data)
    actions = np.asarray(actions)
    return data, actions, labels    
    

In [31]:
model = nn_model_full()
data, actions, labels = readBatchSamplesMove(sampleData(1, 1, 10, 0.5), READ_BATCH)

#### DNN

In [7]:
def nn_model_basic():
    '''NN model for basic test.'''
    model = Sequential()
    
    model.add(Conv2D(16, (4, 4), input_shape=(1, RESIZE_HEIGHT, RESIZE_WIDTH), activation='relu', name='conv1'))
    model.add(Conv2D(16, (4, 4), activation='relu', name='conv2'))
    model.add(MaxPool2D(pool_size=(2,2), name='pool1'))
#     model.add(Dropout(0.2))
    
    model.add(Conv2D(8, (4, 4), activation='relu', name='conv3'))
    model.add(Conv2D(8, (4, 4), activation='relu', name='conv4'))
    model.add(MaxPool2D(pool_size=(2,2), name='pool2'))
#     model.add(Dropout(0.2))
    
    model.add(Conv2D(16, (4, 4), activation='relu', name='conv5'))
    model.add(MaxPool2D(pool_size=(2,2), name='pool3'))
#     model.add(Dropout(0.2))
    
    model.add(Flatten(name='flat'))
    model.add(Dense(100, activation='relu', name='dense1'))
    model.add(Dropout(0.5))
    model.add(Dense(40, activation='relu', name='dense2'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid', name='output'))
    
    model.compile(loss='binary_crossentropy',
                 optimizer = 'rmsprop',
                 metrics=['accuracy'])
    
    return model



def nn_model_seq():
    '''Sequential NN model.'''
    model = Sequential()
    
    model.add(Conv2D(16, (4, 4), input_shape=(READ_BATCH, RESIZE_HEIGHT, RESIZE_WIDTH), activation='relu', name='conv1'))
    model.add(Conv2D(16, (4, 4), activation='relu', name='conv2'))
    model.add(MaxPool2D(pool_size=(2,2), name='pool1'))
#     model.add(Dropout(0.2))
    
    model.add(Conv2D(8, (4, 4), activation='relu', name='conv3'))
    model.add(Conv2D(8, (4, 4), activation='relu', name='conv4'))
    model.add(MaxPool2D(pool_size=(2,2), name='pool2'))
#     model.add(Dropout(0.2))
    
    model.add(Conv2D(16, (4, 4), activation='relu', name='conv5'))
    model.add(MaxPool2D(pool_size=(2,2), name='pool3'))
#     model.add(Dropout(0.2))
    
    model.add(Flatten(name='flat'))
    model.add(Dense(100, activation='relu', name='dense1'))
    model.add(Dropout(0.5))
    model.add(Dense(40, activation='relu', name='dense2'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid', name='output'))
    
    model.compile(loss='binary_crossentropy',
                 optimizer = 'rmsprop',
                 metrics=['accuracy'])
    
    return model



def nn_model_full():
    '''Full NN model, using the Keras Functional API.'''
    screen_input = Input(shape=(READ_BATCH, RESIZE_HEIGHT, RESIZE_WIDTH), name='screen_input')
    
    conv = Conv2D(16, (4, 4), activation='relu', name='conv1')(screen_input)
    conv = Conv2D(16, (4, 4), activation='relu', name='conv2')(conv)
    pool = MaxPool2D(pool_size=(2,2), name='pool1')(conv)
    
    conv = Conv2D(8, (4, 4), activation='relu', name='conv3')(pool)
    conv = Conv2D(8, (4, 4), activation='relu', name='conv4')(conv)
    pool = MaxPool2D(pool_size=(2,2), name='pool2')(conv)
    
    conv = Conv2D(16, (4, 4), activation='relu', name='conv5')(pool)
    pool = MaxPool2D(pool_size=(2,2), name='pool3')(conv)
    
    flat = Flatten(name='flat')(pool)
    dense = Dense(100, activation='relu', name='dense1')(flat)
    drop = Dropout(0.5)(dense)
    dense = Dense(40, activation='relu', name='dense2')(drop)
    drop = Dropout(0.5)(dense)
    
    action_input = Input(shape=(1,), name='action_input')
    concat = keras.layers.concatenate([drop, action_input])
    
    output = Dense(1, name='output')(concat)
    
    model = Model(inputs=[screen_input, action_input], outputs=output)
    
    model.compile(loss='binary_crossentropy',
                 optimizer = 'rmsprop',
                 metrics=['accuracy'])
    
    return model
    


#### Evaluation

In [66]:
# Basic model
# model = nn_model_basic()
# data, labels = readSamplesBasic(sampleData(200, 10, 0.5, 0.5))
# print(data.shape)
# print(labels)
# model.fit(data, labels, epochs=7, batch_size=10, shuffle=True)


# Sequential model 
# model = nn_model_seq()
# data, labels = readBatchSamples(sampleData(200, 10, 0.5, 0.5), READ_BATCH)
# print(data.shape)
# print(labels)
# model.fit(data, labels, epochs=7, batch_size=10, shuffle=True)


# Full model
model = nn_model_full()
data, actions, labels = readBatchSamplesMove(sampleData(500, 5, 5, 1), READ_BATCH)
print(data.shape)
print(len(actions))
print(labels)
labels = np.asarray(labels).reshape(-1, 1)
scaler = StandardScaler()
scaler.fit(labels)
labels = scaler.transform(labels)
model.fit([data, actions], labels, epochs=5, batch_size=10, shuffle=True)

(454, 2, 137, 180)
454
[0, 0, -1, 0, -1, 0, -1, -1, -1, 0, -1, -1, 0, -1, -1, 0, -1, 0, 0, 0, -1, 0, -1, -1, -1, 0, 0, 0, 0, -1, -1, 0, -1, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 0, -1, -1, 0, -1, -1, -1, 0, 0, 0, -1, 0, 0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0, -1, -1, 0, -1, -1, 0, 0, 0, -1, -1, 0, -1, 0, 0, -1, -1, -1, -1, -1, 0, 0, -1, 0, 0, 0, 0, -1, -1, 0, -1, 0, -1, 0, 0, 0, 0, 0, -1, -1, -1, 0, 0, 0, 0, 0, 0, -1, 0, -1, -1, -1, -1, -1, 0, -1, 0, 0, -1, -1, -1, -1, -1, 0, 0.7, -1, 0, 0, 0, -1, -1, -1, 0, -1, -1, 0, -1, 0, 0, -1, 0, -1, 0, -1, -1, 0, 0, 0, -1, 0, 0, -1, 0, -1, 0, 0, 0, 0, -1, 0.4, -1, 0, 0, 0, 0, 1.55, 0, 0, -1, -1, 0, -1, -1, -1, 0, -1, 0.6, -1, 0, -1, -1, 0, -1, -1, 0, -1, 0, 0, -1, -1, 0, -1, 0, -1, -1, 0, -1, -1, 0, -1, -1, 0, 0, -1, -1, -1, 0, -1, 0, 0, 0, 0, 0, 0, -1, 0, 0, -1, 0, 0, 0, 0, -1, -1, 0, 0, 0, -1, 0.8, -1, -1, 0, 0, -1, 0, 0, -1, 0, 0, -1, 0, -1, 0, -1, 0, 0, -1, -1, -1, 0, 0, 0, -1, 0, 0, -1, -1, 0.4, -1, 0, -1, 0, 0, 0, 0, 0, -1, 0, 0, -1, 0, -1, -1, -1,

<keras.callbacks.History at 0x23fd2921c88>

In [67]:
# model.evaluate(data[30:50], labels[30:50])

print(model.predict([data[30:50], actions[30:50]]))
print(labels[30:50])

[[-212.66487]
 [-210.52394]
 [-212.94576]
 [-212.59361]
 [-213.29456]
 [-212.01389]
 [-209.86633]
 [-209.0947 ]
 [-210.29852]
 [-215.04355]
 [-211.09372]
 [-210.51903]
 [-213.66022]
 [-214.50018]
 [-209.20973]
 [-211.90112]
 [-209.58304]
 [-210.8607 ]
 [-213.99167]
 [-216.87529]]
[[-1.14550706]
 [ 0.80082604]
 [-1.14550706]
 [ 0.80082604]
 [ 0.80082604]
 [-1.14550706]
 [-1.14550706]
 [-1.14550706]
 [-1.14550706]
 [ 0.80082604]
 [ 0.80082604]
 [ 0.80082604]
 [ 0.80082604]
 [ 0.80082604]
 [-1.14550706]
 [-1.14550706]
 [ 0.80082604]
 [-1.14550706]
 [-1.14550706]
 [-1.14550706]]


In [65]:
# Build pipeline

layers = []
layers.append(('nn', KerasClassifier(build_fn=nn_model_full,
                                   epochs=5,
                                   batch_size=10,
                                   verbose=1)))
estimator = Pipeline(layers)


# Evaluate

data, actions, labels = readBatchSamplesMove(sampleData(200, 10, 0.5, 0.5), 2)

# print(labels)
# labels = np.asarray(labels).reshape(-1, 1)
# scaler = StandardScaler()
# scaler.fit(labels)
# labels = scaler.transform(labels)
# print(labels.reshape(labels.size))

labels = np.asarray(labels)
labels = labels.reshape((labels.size, 1))
actions = actions.reshape((actions.size, 1))
print(data.shape)
print(actions.shape)
print(labels.shape)
     
sKFold = StratifiedKFold(n_splits=2)
results = cross_val_score(estimator, [data, actions], labels, cv=sKFold, verbose=1, scoring='accuracy')

print("Results: %.5f (%.5f)" % (results.mean(), results.std()))

(193, 2, 137, 180)
(193, 1)
(193, 1)


ValueError: Found input variables with inconsistent numbers of samples: [2, 193]