In [311]:
from keras.layers import Layer, Input, Dense, Reshape, Flatten, Dropout, Concatenate, Add, Multiply
from keras.layers import Activation
import keras.backend as K
from keras.layers import GlobalMaxPooling2D, GlobalAveragePooling2D
from keras.layers import BatchNormalization as InstanceNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.layers import Conv2D, Conv2DTranspose
from keras.layers.convolutional import UpSampling2D
from keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import numpy as np

In [230]:
channel = 3
dim = 128
input_shape = (dim,dim,channel)
n_resnet = 3

# Generateur

In [265]:
def conv2d(layer_input, filters, f_size=4, strides=2, normalization=True):
    """Layer de base pour downsamplé"""
    l = Conv2D(filters, kernel_size=f_size, strides=strides, padding='same')(layer_input)
    if normalization:
        l = InstanceNormalization()(l)
    l = LeakyReLU(alpha=0.2)(l)
    return l

def deconv2d(layer_input, filters, f_size=4):
    """Layers used during upsampling"""
    u = UpSampling2D(size=2)(layer_input)
    u = Conv2D(filters, kernel_size=f_size, strides=1, padding='same', activation='relu')(u)
    u = AdaLIN_simple()(u)
    return u

def resnet(layer_input, filters, f_size=3):
    l = Conv2D(filters, kernel_size=f_size, strides=1, padding='same')(layer_input)
    l = InstanceNormalization()(l)
    l = LeakyReLU(alpha=0.2)(l)
    l = Conv2D(filters, kernel_size=f_size, strides=1, padding='same')(l)
    l = InstanceNormalization()(l)
    return Add()([l, layer_input])

def resnet_adalin(layer_input, gamma, beta, f_size=3):
    filters = layer_input.shape[-1]
    l = Conv2D(filters, kernel_size=f_size, strides=1, padding='same')(layer_input)
    l = AdaLIN()([l, gamma, beta])
    l = LeakyReLU(alpha=0.2)(l)
    l = Conv2D(filters, kernel_size=f_size, strides=1, padding='same')(l)
    l = AdaLIN()([l, gamma, beta])
    return Add()([l, layer_input])

In [464]:
input_layer = Input(shape=input_shape)
input_layer.shape
depth = channel
g = conv2d(input_layer, depth, f_size=7, strides=1)
for _ in range(2):
    depth*=2
    g = conv2d(g, depth, f_size=3, strides=2)

In [297]:
g

<tf.Tensor 'leaky_re_lu_39/LeakyRelu:0' shape=(None, 32, 32, 12) dtype=float32>

In [282]:
class AdaLIN(Layer):
    def __init__(self, smoothing=True, eps = 1e-5):
        super(AdaLIN, self).__init__()
        self.smoothing = smoothing
        self.eps = eps

    def build(self, input_shape):
        if (len(input_shape) != 3):
            raise ValueError("Il faut donner dans l'ordre le layer, gamma et beta")

    def call(self, inputs):
        x, gamma, beta = inputs[0], inputs[1], inputs[2]
        ch = x.shape[-1]
        ins_mean, ins_sigma = tf.nn.moments(x, axes=[1, 2], keepdims=True)
        x_ins = (x - ins_mean) / (tf.sqrt(ins_sigma + self.eps))
        ln_mean, ln_sigma = tf.nn.moments(x, axes=[1, 2, 3], keepdims=True)
        x_ln = (x - ln_mean) / (tf.sqrt(ln_sigma + self.eps))
        rho = tf.Variable(np.ones(ch), dtype = np.float32, name="rho", shape=[ch], constraint=lambda x: tf.clip_by_value(x, clip_value_min=0.0, clip_value_max=1.0))
        if self.smoothing :
            rho = tf.clip_by_value(rho - tf.constant(0.1), 0.0, 1.0)
        x_hat = rho * x_ins + (1 - rho) * x_ln
        x_hat = x_hat * gamma + beta
        return x_hat

In [283]:
class AdaLIN_simple(Layer):
    def __init__(self, smoothing=True, eps = 1e-5):
        super(AdaLIN_simple, self).__init__()
        self.smoothing = smoothing
        self.eps = eps
        
    def build(self, input_shape):
        self.ch = input_shape[-1]
        self.gamma = tf.Variable(np.ones(self.ch), dtype = np.float32, name="gamma", shape=[self.ch])
        self.beta = tf.Variable(np.zeros(self.ch), dtype = np.float32, name="gamma", shape=[self.ch])

    def call(self, inputs):
        x = inputs
        ins_mean, ins_sigma = tf.nn.moments(x, axes=[1, 2], keepdims=True)
        x_ins = (x - ins_mean) / (tf.sqrt(ins_sigma + self.eps))
        ln_mean, ln_sigma = tf.nn.moments(x, axes=[1, 2, 3], keepdims=True)
        x_ln = (x - ln_mean) / (tf.sqrt(ln_sigma + self.eps))
        rho = tf.Variable(np.zeros(self.ch), dtype = np.float32, name="rho", shape=[self.ch], constraint=lambda x: tf.clip_by_value(x, clip_value_min=0.0, clip_value_max=1.0))
        if self.smoothing :
            rho = tf.clip_by_value(rho - tf.constant(0.1), 0.0, 1.0)
        x_hat = rho * x_ins + (1 - rho) * x_ln
        x_hat = x_hat * self.gamma + self.beta
        return x_hat

In [291]:
class MUL(Layer):
    def __init__(self):
        super(MUL, self).__init__()
        
    def build(self, input_shape):
        if (len(input_shape) !=3):
            raise ValueError("Il faut trois layer : le layer a mutlpplié, un layer dense de taille (batch,1) ainsi que son biais")

    def call(self, inputs):
        x, w, b = inputs[0], inputs[1], inputs[2]
        w = tf.gather(tf.transpose(tf.nn.bias_add(w, b)), 0)
        return tf.multiply(x,w)

In [300]:
def mul(X):
    x,w,b = X[0], X[1], X[2]
    w = tf.gather(tf.transpose(tf.nn.bias_add(w, b)), 0)
    return tf.multiply(x,w)

In [485]:
class Linear(Layer):
    def __init__(self):
        super(Linear, self).__init__()

    def build(self, input_shape):
        if (len(input_shape) != 2):
            raise ValueError("Input shape : {}".format(input_shape))
        self.w = self.add_weight(
            shape=(input_shape[0][-1], 1),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(1,), initializer="random_normal", trainable=True
        )
        super(Linear, self).build(input_shape)

    def call(self, inputs):
        r1 = tf.matmul(inputs[0], self.w) + self.b
        w = tf.gather(tf.transpose(tf.nn.bias_add(self.w, self.b)), 0)
        r2 = tf.multiply(inputs[1],w)
        return [r1,r2]

In [487]:
test_entree = Input(shape=(32,32,12))
test_pool = GlobalAveragePooling2D()(test_entree)
test_fin_pool, test_fin_normal = Linear()([test_pool, test_entree])
Model(test_entree, test_fin_pool)

<keras.engine.training.Model at 0x25cbb229e08>

In [489]:
test_fin_normal

<tf.Tensor 'linear_60/Mul:0' shape=(None, 32, 32, 12) dtype=float32>

<tf.Tensor 'linear_42/add:0' shape=(None, 1) dtype=float32>

<keras.engine.training.Model at 0x25cba49e748>

In [301]:
# Class activation map
gmv, gav = GlobalMaxPooling2D()(g), GlobalAveragePooling2D()(g)
dense_m, dense_a = Dense(1), Dense(1)
cam_m, cam_a = dense_m(gmv), dense_a(gav)

g_m, g_a = Lambda(mul)([g, dense_m.weights[0], dense_m.weights[1]]), Lambda(mul)([g, dense_a.weights[0], dense_a.weights[1]])
cam, g = Concatenate()([cam_m, cam_a]), Concatenate()([g_m, g_a])
g = conv2d(g, depth, f_size=1, strides=1)

In [303]:
g

<tf.Tensor 'leaky_re_lu_40/LeakyRelu:0' shape=(None, 32, 32, 12) dtype=float32>

In [314]:
w

<tf.Variable 'dense_43/kernel:0' shape=(12, 1) dtype=float32, numpy=
array([[ 0.35803616],
       [-0.13402516],
       [ 0.47078097],
       [-0.5012767 ],
       [ 0.41451585],
       [ 0.18452203],
       [-0.5768551 ],
       [-0.6032223 ],
       [-0.38289896],
       [-0.6784733 ],
       [-0.34806266],
       [-0.2619669 ]], dtype=float32)>

In [239]:
# Constantes pour la ARLIN function
def arlin_param(x, filters):
    l = Flatten()(x)
    for _ in range(2):
        l = Dense(filters)(l)
        l = LeakyReLU(alpha=0.2)(l)
    return Dense(filters)(l), Dense(filters)(l)
gamma, beta = arlin_param(g, depth)

In [262]:
for _ in range(3):
    g = resnet_adalin(g, gamma, beta)

In [277]:
g = deconv2d(g, depth, f_size=4)

tracking <tf.Variable 'ada_lin_simple_8/gamma:0' shape=(12,) dtype=float32> gamma
tracking <tf.Variable 'ada_lin_simple_8/gamma:0' shape=(12,) dtype=float32> beta


In [278]:
g

<tf.Tensor 'ada_lin_simple_8/add_3:0' shape=(None, 128, 128, 12) dtype=float32>

# Discriminateur

In [253]:
l = Conv2D(filters, kernel_size=3, strides=1, padding='same')(l)
l = AdaLIN()([l, gamma, beta])
l

<tf.Tensor 'ada_lin_9/add_3:0' shape=(None, 32, 32, 12) dtype=float32>

In [255]:
Add()([l, g])

<tf.Tensor 'add_3/add:0' shape=(None, 32, 32, 12) dtype=float32>

In [123]:
tf.multiply(x,a)[...,1]

InvalidArgumentError: Incompatible shapes: [3,3,3,5] vs. [5,1] [Op:Mul]