In [None]:
import tensorflow as tf

In [None]:
# physical_devices = tf.config.list_physical_devices('GPU')
# tf.config.experimental.set_memory_growth(physical_devices[0], True)

In [None]:
tf.__version__

In [None]:
import os
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt
from math import ceil

In [None]:
from chessutils import find_coeffs
from boardgen import moirebackground, chessboard

# Datagenerator

In [None]:
PATH_TO_IMG = 'img'

In [None]:
IMGSIZE = 480
MAXSHEAR = 0.15
MINSCALE = 0.5

In [None]:
figimgs = [f for f in os.listdir(PATH_TO_IMG) if f.split('_')[0]=='Chess']
figuresimgs = dict()
for f in figimgs:
    fn = f.split('_')[1].split('4')[0]
    img = cv2.imread(os.path.join(PATH_TO_IMG, f))
    figuresimgs[fn] = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

figs = ['p', 'b', 'n', 'r', 'q', 'k']
colors = ['d', 'l']

In [None]:
parameters = {
    'numcell':8,
    'cellsize':45,
    'figures':figs,
    'colors':colors,
    'shear':MAXSHEAR,
    'scale':MINSCALE,
}

In [None]:
NC = 5
boardsize = parameters['cellsize'] * parameters['numcell']
blank_mask = 255*np.ones((boardsize,boardsize), np.uint8)
for i in range(parameters['numcell']*parameters['numcell']):
    xp = i % parameters['numcell']
    yp = i // parameters['numcell']
    if (xp != 0) and (yp != 0):
        blank_mask[yp*parameters['cellsize']-NC:yp*parameters['cellsize']+NC,xp*parameters['cellsize']-NC:xp*parameters['cellsize']+NC] = 0


In [None]:
def boardmask(imgsize):
    boardimage, _, vecs = chessboard(figuresimgs, np.random.rand(13), imgsize, parameters)
    background = moirebackground(np.random.rand(8), imgsize)
    result = (background * np.asarray(boardimage)).astype(np.uint8)

    img = Image.fromarray(blank_mask, 'L')
    coeffs = find_coeffs(
         vecs.reshape((4,2)),
         [(0, 0), (boardsize, 0), (boardsize, boardsize), (0, boardsize)])

    img = img.transform((imgsize, imgsize), Image.PERSPECTIVE, coeffs, Image.NEAREST, fillcolor = 'white')
    
    brd_out = np.expand_dims(result / 255, axis=-1)
    msk_out = np.expand_dims(np.asarray(img) / 255, axis=-1)
    
    return brd_out, msk_out

In [None]:
def boardmaskgen():
    bd, rc = boardmask(IMGSIZE)
    yield bd, rc

In [None]:
dataset = tf.data.Dataset.from_generator(boardmaskgen,
                                         output_signature=
                                         (
                                          tf.TensorSpec(shape=(IMGSIZE,IMGSIZE,1), dtype=tf.float32),
                                          tf.TensorSpec(shape=(IMGSIZE,IMGSIZE,1), dtype=tf.float32)
                                         ))

In [None]:
for f in dataset.take(1):
    print(f[0].shape, f[1].shape)

In [None]:
for f in dataset.take(1):
    print(np.unique(f[1]))

In [None]:
for f in dataset.repeat(20).take(1):
    fig, axxes = plt.subplots(ncols=3, nrows=1, figsize=(9,3), sharex=True, sharey=True)
    axxes[0].imshow(np.squeeze(f[0]), cmap='gray')
    axxes[1].imshow(np.squeeze(f[1]), cmap='gray')
    axxes[2].imshow(np.squeeze(f[1]), cmap='gray', alpha=0.5)
    axxes[2].imshow(np.squeeze(f[0]), cmap='gray', alpha=0.5)
    axxes[0].axis('off')
    axxes[1].axis('off')
    axxes[2].axis('off')
    fig.tight_layout()
    plt.show()

In [None]:
trainset = dataset.repeat().batch(8)
valset = dataset.repeat(50).batch(1)

## UNET

In [None]:
from keras.models import Model
from keras.layers import Input, Conv2D, GlobalAveragePooling2D, Dense, Dropout, MaxPooling2D, Flatten, Conv2DTranspose, Concatenate

In [None]:
# Encoder Utilities

def conv2d_block(input_tensor, n_filters, name, kernel_size=3):
    '''
    Adds 2 convolutional layers with the parameters passed to it

    Args:
    input_tensor (tensor) -- the input tensor
    n_filters (int) -- number of filters
    kernel_size (int) -- kernel size for the convolution

    Returns:
    tensor of output features
    '''
    # first layer
    x = input_tensor
    for i in range(2):
        x = Conv2D(filters = n_filters, kernel_size = (kernel_size, kernel_size),\
                kernel_initializer = 'he_normal', padding = 'same', activation='relu', name=f'{name}_{i}')(x)

    return x


def encoder_block(inputs, name, n_filters=64, pool_size=(2,2), dropout=0.3):
    '''
    Adds two convolutional blocks and then perform down sampling on output of convolutions.

    Args:
    input_tensor (tensor) -- the input tensor
    n_filters (int) -- number of filters
    kernel_size (int) -- kernel size for the convolution

    Returns:
    f - the output features of the convolution block 
    p - the maxpooled features with dropout
    '''

    f = conv2d_block(inputs, n_filters=n_filters, name=f'{name}_conv')
    p = MaxPooling2D(pool_size=(2,2), name=f'{name}_pool')(f)
    p = Dropout(0.3, name=f'{name}_drop')(p)

    return f, p


def encoder(inputs):
    '''
    This function defines the encoder or downsampling path.

    Args:
    inputs (tensor) -- batch of input images

    Returns:
    p4 - the output maxpooled features of the last encoder block
    (f1, f2, f3, f4) - the output features of all the encoder blocks
    '''
    f1, p1 = encoder_block(inputs, name='enc1', n_filters=32, pool_size=(2,2), dropout=0.3)
    f2, p2 = encoder_block(p1, name='enc2', n_filters=64, pool_size=(2,2), dropout=0.3)

    return p2, (f1, f2)

In [None]:
def bottleneck(inputs):
    '''
    This function defines the bottleneck convolutions to extract more features before the upsampling layers.
    '''

    bottle_neck = conv2d_block(inputs, n_filters=128, name='bneck')

    return bottle_neck

In [None]:
# Decoder Utilities

def decoder_block(inputs, conv_output, name, n_filters=64, kernel_size=3, strides=3, dropout=0.3):
    '''
    defines the one decoder block of the UNet

    Args:
    inputs (tensor) -- batch of input features
    conv_output (tensor) -- features from an encoder block
    n_filters (int) -- number of filters
    kernel_size (int) -- kernel size
    strides (int) -- strides for the deconvolution/upsampling
    padding (string) -- "same" or "valid", tells if shape will be preserved by zero padding

    Returns:
    c (tensor) -- output features of the decoder block
    '''
    u = Conv2DTranspose(
      n_filters,
      kernel_size, 
      strides=strides,
      padding='same', name=f'{name}_trans')(inputs)
    c = Concatenate(name=f'{name}_conc')([u, conv_output])
    c = Dropout(dropout, name=f'{name}_drop')(c)
    c = conv2d_block(c, n_filters, name=f'{name}_conv', kernel_size=3)

    return c


def decoder(inputs, convs):
    '''
    Defines the decoder of the UNet chaining together 4 decoder blocks. 

    Args:
    inputs (tensor) -- batch of input features
    convs (tuple) -- features from the encoder blocks

    Returns:
    outputs (tensor) -- the pixel wise label map of the image
    '''

    f1, f2 = convs

    c8 = decoder_block(inputs, f2, name='dec2', n_filters=64, kernel_size=(3,3), strides=(2,2), dropout=0.3)
    c9 = decoder_block(c8, f1, name='dec3', n_filters=32, kernel_size=(3,3), strides=(2,2), dropout=0.3)

    outputs = Conv2D(2, (1, 1), activation='softmax', name='finalb')(c9)

    return outputs

In [None]:
def unet():
    '''
    Defines the UNet by connecting the encoder, bottleneck and decoder.
    '''

    inputs = tf.keras.layers.Input(shape=(IMGSIZE,IMGSIZE,1))
    encoder_output, convs = encoder(inputs)
    bottle_neck = bottleneck(encoder_output)
    outputs = decoder(bottle_neck, convs)
    model = Model(inputs=inputs, outputs=outputs, name='unet_board')

    return model

In [None]:
model = unet()
model.summary()

In [None]:
# configure the optimizer, loss and metrics for training
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
# configure the training parameters and train the model

EPOCHS = 3

model_history = model.fit(trainset,
                          steps_per_epoch=200,
                          epochs=EPOCHS,
                          validation_data=valset)

In [None]:
for g in valset.take(1):
    fig, axx = plt.subplots(nrows=1, ncols=3, figsize=(15,5), sharey=True)
    axx[0].imshow(np.squeeze(g[0].numpy()), cmap='gray')
    axx[0].axis('off')
    axx[1].imshow(g[1].numpy()[0,:,:,0], cmap='gray')
    axx[1].axis('off')
    pred = np.squeeze(model.predict(g[0]))
    b = np.argmax(pred[:,:,0:2], axis=-1)
    axx[2].imshow(np.squeeze(g[0].numpy()), cmap='gray', alpha=0.5)
    axx[2].imshow(b, cmap='gray', alpha=0.5)
    axx[2].axis('off')
    fig.tight_layout()
    plt.show()

In [None]:
model.save('models/unet_board_v4.h5')