# Unsupervised Image-image Translation(UNIT) TensorFlow Implementation

In [1]:
import tensorflow as tf
from collections import OrderedDict

  return f(*args, **kwds)


In [2]:
#Funciton to chose the activation function
def activate(linear, activation='leaky_relu'):
        if activation == 'sigmoid':
            return tf.nn.sigmoid(linear)
        elif activation == 'softmax':
            return tf.nn.softmax(linear)
        elif activation == 'tanh':
            return tf.nn.tanh(linear)
        elif activation == 'relu':
            return tf.nn.relu(linear)
        elif activation == 'leaky_relu':
            return tf.nn.leaky_relu(linear)
        elif activation == 'linear':
            return linear

In [3]:
def gaussian_noise_layer(input_layer, std=1.0):
    noise = tf.random_normal(shape=tf.shape(input_layer), mean=0.0, stddev=std, dtype=tf.float32) 
    return tf.add(input_layer, noise)

In [4]:
def batch_normalization_layer(input_layer):
    
    dimension = input_layer.get_shape().as_list()[-1]
    
    mean, variance = tf.nn.moments(input_layer, axes=[0, 1, 2])
    
    beta = tf.Variable(tf.constant(0.0, shape=[dimension]))
    
    gamma = tf.Variable(tf.constant(1.0, shape=[dimension]))

    bn_layer = tf.nn.batch_normalization(input_layer, mean, variance, beta, gamma, variance_epsilon=1e-10)

    return bn_layer

In [5]:
def create_conv_layer(input_layer,     # The previous layer.
                   num_input_channels, # Num. channels in prev. layer.
                   filter_size,        # Width and height of each filter.
                   num_filters,        # Number of filters.
                   use_pooling=False,  
                   pad = [],
                   strides=[1,1,1,1],
                   deconv=False,
                   out_shape = [],      # Output shape in case of deconv 
                   batch_normalization=False,
                   activation='leaky_relu'): # Use 2x2 max-pooling.

    # Shape of the filter-weights for the convolution. 
    # This format is determined by the TensorFlow API.
    if deconv:
        shape = [filter_size, filter_size, num_filters, num_input_channels]
    else:
        shape = [filter_size, filter_size, num_input_channels, num_filters]

    # Create new weights aka. filters with the given shape.
    weights = tf.Variable(tf.truncated_normal(shape, stddev=0.05))

    # Create new biases, one for each filter.
    biases = tf.Variable(tf.constant(1.0, shape=[num_filters]))

    if len(pad) > 0:
        input_layer = tf.pad(input_layer, [[0, 0], [pad[0], pad[0]], [pad[1], pad[1]], [0, 0]], "CONSTANT")
    
    # Create the TensorFlow operation for de-convolution.
    if deconv:
        #in_shape = input_layer.get_shape().as_list()
        in_shape = tf.shape(input_layer)
    
        out_h = ((in_shape[1] - 1) * strides[1]) + filter_size - 2 * pad[0]
        
        out_w = ((in_shape[2] - 1) * strides[2]) + filter_size - 2 * pad[1]
        
        output_shape = tf.stack([in_shape[0], out_h, out_w, num_filters])
        
        layer = tf.nn.conv2d_transpose(value=input_layer, 
                                       filter=weights, 
                                       output_shape=output_shape, 
                                       strides=strides, 
                                       padding='VALID')
        
    else:
        layer = tf.nn.conv2d(input=input_layer,
                         filter=weights,
                         strides=strides,
                         padding='VALID')

    # Add the biases to the results of the convolution.
    # A bias-value is added to each filter-channel.
    layer = tf.add(layer,biases)

    # Use pooling to down-sample the image resolution?
    if use_pooling:
        # This is 2x2 max-pooling, which means that we
        # consider 2x2 windows and select the largest value
        # in each window. Then we move 2 pixels to the next window.
        layer = tf.nn.max_pool(value=layer,
                               ksize=[1, 2, 2, 1],
                               strides=[1, 2, 2, 1],
                               padding='SAME')

    # Batch normalization
    if batch_normalization:
        layer = batch_normalization_layer(layer)
    
    # Activation of the layers (ReLU).
    layer = activate(layer, activation=activation)

    return layer

In [6]:
def create_res_block(X, num_input_channels=3, filter_size=3, num_filters=3, pad=[1,1]):
    
    layer_conv1 = create_conv_layer(input_layer=X,
                                       num_input_channels=num_input_channels,
                                       filter_size=filter_size,
                                       num_filters=num_filters,
                                       pad=pad,
                                       use_pooling=False,
                                       batch_normalization=True,
                                       activation='relu')
    
    layer_conv2 = create_conv_layer(input_layer=layer_conv1,
                                       num_input_channels=num_input_channels,
                                       filter_size=filter_size,
                                       num_filters=num_filters,
                                       pad=pad,
                                       use_pooling=False,
                                       batch_normalization=False,
                                       activation='linear')
    
    layer_conv2 = batch_normalization_layer(layer_conv2)
    
    layer_conv2 += X
    
    layer_res = activate(layer_conv2, activation='relu')
    
    return layer_res

In [7]:
def create_encoders(X, num_input_channels, layer_n, res_block_n):
    
    encoder_A = OrderedDict()
    encoder_B = OrderedDict()
    
    encoder_A['layer_conv1'] = create_conv_layer(input_layer=X,
                                       num_input_channels=num_input_channels,
                                       num_filters=num_input_channels,
                                       filter_size=7,
                                       strides=[1,2,2,1],
                                       pad=[3,3])
    
    encoder_B['layer_conv1'] = create_conv_layer(input_layer=X,
                                       num_input_channels=num_input_channels,
                                       num_filters=num_input_channels,
                                       filter_size=7,
                                       strides=[1,2,2,1],
                                       pad=[3,3])
    
    for i in range(1, layer_n):
        encoder_A['layer_conv'+str(i+1)] = create_conv_layer(encoder_A[next(reversed(encoder_A))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_input_channels*2,
                                       filter_size=3,
                                       strides=[1,2,2,1],
                                       pad=[1,1])
        
        encoder_B['layer_conv'+str(i+1)] = create_conv_layer(encoder_B[next(reversed(encoder_B))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_input_channels*2,
                                       filter_size=3,
                                       strides=[1,2,2,1],
                                       pad=[1,1])
        
        num_input_channels = num_input_channels*2
        
    for i in range(1, res_block_n):
        encoder_A['block_en_res'+str(i+1)] = create_res_block(encoder_A[next(reversed(encoder_A))],
                                                           num_input_channels=num_input_channels,
                                                           num_filters=num_input_channels)
        
        encoder_B['block_en_res'+str(i+1)] = create_res_block(encoder_B[next(reversed(encoder_B))],
                                                           num_input_channels=num_input_channels,
                                                           num_filters=num_input_channels)
        
    return encoder_A, encoder_B

In [8]:
def create_shared_layers(X, block_shared_n):
    encoder_shared = OrderedDict()
    decoder_shared = OrderedDict()
    
    encoder_shared_ = create_res_block(X)
    encoder_shared['block_shared_res1'] = gaussian_noise_layer(encoder_shared_)
    
    for i in range(1, block_shared_n+1):
        encoder_shared_ = create_res_block(encoder_shared['block_shared_res'+str(i)])
        encoder_shared['block_shared_res'+str(i+1)] = gaussian_noise_layer(encoder_shared_)
    
    decoder_shared['block_shared_res1'] = create_res_block(encoder_shared['block_shared_res'+str(block_shared_n)])
    
    for i in range(1, block_shared_n+1):
        decoder_shared['block_shared_res'+str(i+1)] = create_res_block(decoder_shared['block_shared_res'+str(i)])
        
    return decoder_shared

In [9]:
def create_generator(X, num_input_channels, layer_n, res_block_n, num_output_channels):

    decoder_A = OrderedDict()
    decoder_B = OrderedDict()
    
    decoder_A['block_res1'] = create_res_block(X,
                                               num_input_channels=num_input_channels,
                                               num_filters=num_input_channels)
        
    decoder_B['block_res1'] = create_res_block(X,
                                               num_input_channels=num_input_channels,
                                               num_filters=num_input_channels)
        
    for i in range(1, res_block_n):
        decoder_A['block_res'+str(i+1)] = create_res_block(decoder_A[next(reversed(decoder_A))],
                                                           num_input_channels=num_input_channels,
                                                           num_filters=num_input_channels)
        
        decoder_B['block_res'+str(i+1)] = create_res_block(decoder_B[next(reversed(decoder_B))],
                                                           num_input_channels=num_input_channels,
                                                           num_filters=num_input_channels)
    for i in range(0, layer_n):
        
        decoder_A['layer_deconv'+str(i+1)] = create_conv_layer(decoder_A[next(reversed(decoder_A))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_input_channels//2,
                                       filter_size=3,
                                       strides=[1,2,2,1],
                                       pad=[1,1],
                                       deconv=True,
                                       batch_normalization=True,
                                       activation='relu')
        
        decoder_B['layer_deconv'+str(i+1)] = create_conv_layer(decoder_B[next(reversed(decoder_B))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_input_channels//2,
                                       filter_size=3,
                                       strides=[1,2,2,1],
                                       pad=[1,1],
                                       deconv=True,
                                       batch_normalization=True,
                                       activation='relu')
        
        num_input_channels = num_input_channels//2
    
    decoder_A['layer_deconv_final'] = create_conv_layer(decoder_A[next(reversed(decoder_A))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_output_channels,
                                       filter_size=1,
                                       strides=[1,1,1,1],
                                       pad=[0,0],
                                       activation='tanh')
    
    decoder_B['layer_deconv_final'] = create_conv_layer(decoder_B[next(reversed(decoder_B))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_output_channels,
                                       filter_size=1,
                                       strides=[1,1,1,1],
                                       pad=[0,0],
                                       activation='tanh')
    
    return decoder_A, decoder_B

In [18]:
def create_discrimiator(X_A, X_B, num_input_channels, num_filters, layer_n):
    discrim_A = OrderedDict()
    discrim_B = OrderedDict()
    
    discrim_A['layer_discrim_conv1'] = create_conv_layer(input_layer=X_A,
                                       num_input_channels=num_input_channels,
                                       num_filters=num_filters,
                                       filter_size=3,
                                       strides=[1,2,2,1],
                                       pad=[1,1])
    discrim_B['layer_discrim_conv1'] = create_conv_layer(input_layer=X_B,
                                       num_input_channels=num_input_channels,
                                       num_filters=num_filters,
                                       filter_size=3,
                                       strides=[1,2,2,1],
                                       pad=[1,1])
    
    num_input_channels = num_filters
    
    for i in range(1, layer_n):
        
        discrim_A['layer_discrim_conv'+str(i+1)] = create_conv_layer(discrim_A[next(reversed(discrim_A))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_filters*2,
                                       filter_size=3,
                                       strides=[1,2,2,1],
                                       pad=[1,1])
        discrim_B['layer_discrim_conv'+str(i+1)] = create_conv_layer(discrim_B[next(reversed(discrim_B))],
                                           num_input_channels=num_input_channels,
                                           num_filters=num_filters*2,
                                           filter_size=3,
                                           strides=[1,2,2,1],
                                           pad=[1,1])
        
        num_input_channels = num_input_channels * 2
    
        decoder_A['layer_deconv_final'] = create_conv_layer(decoder_A[next(reversed(decoder_A))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_output_channels,
                                       filter_size=1,
                                       strides=[1,1,1,1],
                                       pad=[0,0],
                                       activation='tanh')
    
    decoder_B['layer_deconv_final'] = create_conv_layer(decoder_B[next(reversed(decoder_B))],
                                       num_input_channels=num_input_channels,
                                       num_filters=num_output_channels,
                                       filter_size=1,
                                       strides=[1,1,1,1],
                                       pad=[0,0],
                                       activation='tanh')
    
    return discrim_A, discrim_B

In [19]:
tf.reset_default_graph()
create_discrimiator(tf.placeholder(tf.float32, shape=[None, 256, 256, 3], name='x'),tf.placeholder(tf.float32, shape=[None, 256, 256, 3], name='x'),3, 3, 3)

(OrderedDict([('layer_discrim_conv1',
               <tf.Tensor 'LeakyRelu/Maximum:0' shape=(?, 128, 128, 3) dtype=float32>),
              ('layer_discrim_conv2',
               <tf.Tensor 'LeakyRelu_2/Maximum:0' shape=(?, 64, 64, 6) dtype=float32>),
              ('layer_discrim_conv3',
               <tf.Tensor 'LeakyRelu_4/Maximum:0' shape=(?, 32, 32, 6) dtype=float32>)]),
 OrderedDict([('layer_discrim_conv1',
               <tf.Tensor 'LeakyRelu_1/Maximum:0' shape=(?, 128, 128, 3) dtype=float32>),
              ('layer_discrim_conv2',
               <tf.Tensor 'LeakyRelu_3/Maximum:0' shape=(?, 64, 64, 6) dtype=float32>),
              ('layer_discrim_conv3',
               <tf.Tensor 'LeakyRelu_5/Maximum:0' shape=(?, 32, 32, 6) dtype=float32>)]))