In [22]:
import tensorflow as tf
import pandas as pd
import numpy as np

from tensorflow.keras.layers import Conv2D

import matplotlib.pyplot as plt

In [4]:
print("Tensorflow version: {}".format(tf.__version__))
print("Is GPU available? {}".format(tf.config.list_physical_devices('GPU')))

Tensorflow version: 2.3.1
Is GPU available? [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


### Encoding Layer Class

This function is to perform the rnn_conv operation that makes up the building block of the encoder and decoder

In [5]:
ENCODER_INPUT_DIM = 32
ENCODER1_DIM = 64
ENCODER2_DIM = 128
ENCODER3_DIM = 128
DECODER_INPUT_DIM = 128
DECODER1_DIM = 128
DECODER2_DIM = 128
DECODER3_DIM = 64
DECODER4_DIM = 32

Encoding Layer Class

In [23]:
def padding(x, stride):

    if x % stride == 0:
        return x // stride
    else:
        return x // stride + 1

def initial_hidden(input_size, filters, kernel_size):
    
    """Initialize hidden and cell states, all zeros"""
    
    shape = [input_size] + kernel_size + [filters]
    
    hidden = tf.zeros(shape)
    cell = tf.zeros(shape)
    
    return hidden, cell

In [24]:
def rnn_conv(inputs, hiddens, filters, kernel_size, strides):
    '''Convolution RNN cell

    See detail in formula (4) in paper
    "Full Resolution Image Compression with Recurrent Neural Networks"
    https://arxiv.org/pdf/1608.05148.pdf

    Args:
        name: name of current Conv RNN layer
        inputs: inputs tensor with shape (batch_size, height, width, channel)
        hiddens: hidden states from the previous iteration
        kernel_size: tuple of kernel size
        strides: strides size

    Output:
        hidden state and cell state of this layer
    '''
    
    gates_filters = 4 * filters
    hidden, cell = hiddens
    
    conv_inputs = Conv2D(inputs=inputs, filters=gates_filters, kernel_size=kernel_size, strides=strides, padding='same')
    conv_hidden = Conv2D(inputs=hidden, filters=gates_filters, kernel_size=kernel_size, padding='same')
        
    in_gate, f_gate, out_gate, c_gate = tf.split(conv_inputs + conv_hidden, 4, axis=-1)
    in_gate = tf.nn.sigmoid(in_gate)
    f_gate = tf.nn.sigmoid(f_gate)
    out_gate = tf.nn.sigmoid(out_gate)
    c_gate = tf.nn.tanh(c_gate)
    new_cell = tf.multiply(f_gate, cell) + tf.multiply(in_gate, c_gate)
    new_hidden = tf.multiply(out_gate, tf.nn.tanh(new_cell))
    
    return new_hidden, new_cell

In [39]:
class Encoder(object):
    
    def __init__(self, batch_size, height=32, width=32):
        
        """
        batch_size: mini-batch size
        height: input height of the image getting compressed by the encoder
        width: input width of the image getting compressed by the decoder
        """
        
        self.batch_size = batch_size
        self.height = height
        self.width = width
        self.init_hidden() # initalize the hidden states in the RNN
        
    def init_hidden(self):
        
        # initalize the hidden and cell states within the encoder
        
        height = padding(padding(self.height, 2), 2)
        width = padding(padding(self.width, 2), 2)
        
        self.hiddens1 = initial_hidden(self.batch_size, ENCODER1_DIM, [height, width])
        height = padding(height, 2)
        width = padding(width, 2)
        
        self.hiddens2 = initial_hidden(self.batch_size, ENCODER2_DIM, [height, width])
        height = padding(height, 2)
        width = padding(width, 2)
        
        self.hiddens3 = initial_hidden(self.batch_size, ENCODER3_DIM, [height, width])
        
    def encode(self, inputs):

        """Compress inputs into a vector of 128 lengths with value {-1, 1}"""
                
        encoder_rnn_input = Conv2D(filters=ENCODER_INPUT_DIM, kernel_size=[3, 3], strides=(2, 2), padding='same')(inputs)
        self.hiddens1 = rnn_conv(encoder_rnn_input, self.hiddens1, ENCODER1_DIM, [3, 3], (2, 2))
        self.hiddens2 = rnn_conv(self.hiddens1[0], self.hiddens2, ENCODER2_DIM, [3, 3], (2, 2))
        self.hiddens3 = rnn_conv(self.hiddens2[0], self.hiddens3, ENCODER3_DIM, [3, 3], (2, 2))
        code = self.binarizer(self.hiddens3[0])

        return code
    
    def binarizer(self, inputs, filters=32, kernel_size=(1, 1)):

        binarizer_input = conv2D(inputs=inputs, filters=filters, kernel_size=kernel_size, padding='same', activation=tf.nn.tanh)

        probs = (1 + binarizer_input) / 2
        dist = tf.distributions.Bernoulli(probs=probs, dtype=tf.float32)
        noise = 2 * dist.sample(name='noise') - 1 - binarizer_input
        output = binarizer_input + tf.stop_gradient(noise)
        output = binarizer_input + noise

        return output

In [41]:
encoder = Encoder(batch_size=64)
encoder.encode(inputs=x_train)

InvalidArgumentError: Value for attr 'T' of uint8 is not in the list of allowed values: half, bfloat16, float, double, int32
	; NodeDef: {{node Conv2D}}; Op<name=Conv2D; signature=input:T, filter:T -> output:T; attr=T:type,allowed=[DT_HALF, DT_BFLOAT16, DT_FLOAT, DT_DOUBLE, DT_INT32]; attr=strides:list(int); attr=use_cudnn_on_gpu:bool,default=true; attr=padding:string,allowed=["SAME", "VALID", "EXPLICIT"]; attr=explicit_paddings:list(int),default=[]; attr=data_format:string,default="NHWC",allowed=["NHWC", "NCHW"]; attr=dilations:list(int),default=[1, 1, 1, 1]> [Op:Conv2D]

In [32]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()