In [2]:
import complexnn
import tensorflow as tf
import numpy as np
from keras.models import Sequential
from keras.layers import Layer
import keras
# Ask TensorFlow to enable memory growth on every visible GPU.
# Iterating over an empty list is a no-op, so no separate "any GPU?" check is needed.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    # Report the problem and carry on instead of aborting the notebook.
    print(err)
# Seed NumPy's global RNG so the np.random.rand(...) calls further down are
# reproducible. np.random.default_rng(seed) only *returns* a new Generator;
# calling it and discarding the result seeds nothing.
np.random.seed(7414)

# Inception Layer

In [4]:
class InceptLayer(Layer):
    """Inception-style block made of four parallel complex convolutions.

    The same input is fed to four ``complexnn.conv.ComplexConv2D`` branches
    (all created with ``padding='same'``). Each branch output goes through one
    shared ``AmplitudeMaxout`` activation, is split into its two channel halves
    and the halves are concatenated on the last axis in the order
    ``[real1, real2, real3, real4, imag1, imag2, imag3, imag4]``.

    Args:
        filter1, filter2, filter3, filter4: first positional argument passed to
            each branch's ``ComplexConv2D``.
        kernel1, kernel2, kernel3, kernel4: second positional argument passed
            to each branch's ``ComplexConv2D``.
        num_units: value handed to ``AmplitudeMaxout`` as ``num_pieces``
            (despite the name, it is the maxout group size).
        name: accepted for backward compatibility only. It is NOT forwarded to
            ``Layer.__init__``, so Keras keeps auto-generating the layer name.
        **kwargs: forwarded to ``Layer.__init__``.
    """

    def __init__(self,
                 filter1,
                 filter2,
                 filter3,
                 filter4,
                 kernel1,
                 kernel2,
                 kernel3,
                 kernel4,
                 num_units=4,
                 name='InceptionLayer', **kwargs):
        super().__init__(**kwargs)
        self.filter1 = filter1
        self.filter2 = filter2
        self.filter3 = filter3
        self.filter4 = filter4
        self.kernel1 = kernel1
        self.kernel2 = kernel2
        self.kernel3 = kernel3
        self.kernel4 = kernel4
        self.num_units = num_units

    def build(self, input_shape):
        """Create the four branch convolutions and the shared activation."""
        conv_cls = complexnn.conv.ComplexConv2D
        self.conv1 = conv_cls(self.filter1, self.kernel1, padding='same')
        self.conv2 = conv_cls(self.filter2, self.kernel2, padding='same')
        self.conv3 = conv_cls(self.filter3, self.kernel3, padding='same')
        self.conv4 = conv_cls(self.filter4, self.kernel4, padding='same')
        # Use the AmplitudeMaxout class defined in this notebook; the previous
        # name `AmplitudeMaxout_inclued` is not defined anywhere in the file.
        self.act = AmplitudeMaxout(self.num_units)
        super().build(input_shape)

    def call(self, x):
        """Run every branch on ``x`` and concatenate ``[reals..., imags...]``."""
        real_parts, imag_parts = [], []
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            real, imag = self.seperate(self.act(conv(x)))
            real_parts.append(real)
            imag_parts.append(imag)
        return tf.concat(real_parts + imag_parts, axis=-1)

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` with the last dim replaced by the summed branch widths."""
        shape = list(input_shape)
        shape[-1] = sum(
            self.compute_shape(filters, self.num_units, filters % self.num_units)
            for filters in (self.filter1, self.filter2, self.filter3, self.filter4)
        )
        return tuple(shape)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized.

        The keys match the ``__init__`` parameter names, which is what
        ``Layer.from_config`` needs to rebuild the layer.
        """
        config = super().get_config()
        config.update({
            'filter1': self.filter1,
            'filter2': self.filter2,
            'filter3': self.filter3,
            'filter4': self.filter4,
            'kernel1': self.kernel1,
            'kernel2': self.kernel2,
            'kernel3': self.kernel3,
            'kernel4': self.kernel4,
            'num_units': self.num_units,
        })
        return config

    def seperate(self, x):
        """Split a 4-D tensor into the first and second half of its last axis."""
        dim = x.get_shape().as_list()[-1] // 2
        realpart = x[:, :, :, :dim]
        imagpart = x[:, :, :, dim:]
        return realpart, imagpart

    def compute_shape(self, filters, units, flag):
        """Channel count of one branch after the activation.

        Returns ``2 * (filters // units)``, plus 2 when ``flag`` is truthy
        (callers pass ``filters % units`` so a partial group counts as one).
        """
        groups = filters // units
        if flag:
            groups += 1
        return 2 * groups

# Activation function: AMU

In [None]:
class AmplitudeMaxout(Layer):
    """Amplitude maxout activation for complex feature maps.

    The last axis of the input is read as two equal halves: the first half is
    treated as the real channels and the second half as the imaginary
    channels. Consecutive channels are grouped ``num_pieces`` at a time
    (zero-padded at the end when the channel count is not a multiple of
    ``num_pieces``) and, inside every group, the complex value with the
    largest squared modulus is kept. The output last axis is
    ``[kept real values, kept imaginary values]``.

    Args:
        num_pieces: number of complex channels competing in each group.
        name: stored on the instance as ``__name__``; it is not forwarded to
            ``Layer.__init__``.
        **kwargs: forwarded to ``Layer.__init__``.
    """

    def __init__(self, num_pieces, name='AMU', **kwargs):
        super().__init__(**kwargs)
        self.__name__ = name
        self.num_pieces = num_pieces

    def _units_for(self, num_channels):
        """Number of groups needed for ``num_channels`` (ceiling division)."""
        return -(-num_channels // self.num_pieces)

    @tf.function
    def call(self, x, axis=None):
        """Apply the activation along the last axis of ``x``.

        Args:
            x: tensor whose last axis has an even, statically known size.
            axis: kept for backward compatibility; only the last axis
                (``None``, ``-1`` or ``rank - 1``) is supported because the
                real/imaginary split is always taken on the last axis.

        Raises:
            ValueError: if ``axis`` is not the last axis, or if the channel
                dimension is unknown or odd.
        """
        static_shape = x.get_shape().as_list()
        rank = len(static_shape)
        if axis is None:
            axis = -1
        if axis < 0:
            axis += rank
        if axis != rank - 1:
            raise ValueError('AmplitudeMaxout only supports the last axis')

        total_channels = static_shape[-1]
        if total_channels is None:
            raise ValueError('the channel dimension must be statically known')
        if total_channels % 2:
            raise ValueError('nb of real/imaginary channel are inequivalent')

        num_channels = total_channels // 2
        # Kept as an attribute because earlier versions exposed it after call().
        self.num_units = self._units_for(num_channels)

        # Leading dims: static where known, dynamic otherwise, so that unknown
        # batch or spatial sizes do not break the reshape below.
        dynamic_shape = tf.shape(x)
        lead = [dim if dim is not None else dynamic_shape[i]
                for i, dim in enumerate(static_shape[:-1])]

        real_part = x[..., :num_channels]
        imag_part = x[..., num_channels:]

        num_padding = self.num_units * self.num_pieces - num_channels
        if num_padding:
            # Zero entries have modulus 0, so they never beat a non-zero value.
            zero_padding = tf.zeros(lead + [num_padding], dtype=x.dtype)
            real_part = tf.concat([real_part, zero_padding], axis=-1)
            imag_part = tf.concat([imag_part, zero_padding], axis=-1)

        exp_shape = lead + [self.num_units, self.num_pieces]
        real_part = tf.reshape(real_part, exp_shape)
        # Fixed: the imaginary part used to be overwritten with the real part.
        imag_part = tf.reshape(imag_part, exp_shape)
        real_part, imag_part = self.return_AMU(real_part, imag_part, exp_shape)
        return tf.concat([real_part, imag_part], axis=-1)

    def compute_output_shape(self, input_shape):
        """Output shape derived from ``input_shape`` alone (no prior call needed)."""
        shape = list(input_shape)
        if shape[-1] is not None:
            shape[-1] = 2 * self._units_for(shape[-1] // 2)
        return tuple(shape)

    def get_config(self):
        """Return the constructor arguments under their ``__init__`` names."""
        config = super().get_config()
        config.update({'num_pieces': self.num_pieces})
        return config

    def return_AMU(self, real_part, imag_part, expand_shape):
        """Pick, along the last axis, the entry with the largest squared modulus.

        Args:
            real_part, imag_part: tensors already shaped like ``expand_shape``.
            expand_shape: shape the squared modulus is reshaped to.

        Returns:
            ``(real, imag)`` with the last axis removed. Ties are resolved in
            favour of the first maximal entry.
        """
        modulus = tf.reshape(real_part ** 2 + imag_part ** 2, expand_shape)
        winner = tf.argmax(modulus, axis=-1)
        mask = tf.one_hot(winner, depth=tf.shape(modulus)[-1], dtype=real_part.dtype)
        # Sum (not max) over the one-hot mask: reduce_max would return 0
        # instead of the selected value whenever that value is negative.
        real_part = tf.reduce_sum(real_part * mask, axis=-1)
        imag_part = tf.reduce_sum(imag_part * mask, axis=-1)
        return real_part, imag_part

# Loss functions: RMS, MSE, MAE

In [None]:
def _complex_squared_error(y_true, y_pred):
    """Squared modulus of the complex error plus the number of complex points.

    The last axis of both tensors is read as two equal halves (first half and
    second half of the channels); the two halves are the two components of a
    complex value.

    Returns:
        ``(sq_err, n_points)`` where ``sq_err`` is the per-point squared
        modulus of ``y_pred - y_true`` and ``n_points`` is the total number of
        complex points in the batch, as a float of ``y_pred``'s dtype.

    Raises:
        ValueError: if the channel dimension of ``y_pred`` is unknown or odd.
    """
    total_channels = y_pred.get_shape().as_list()[-1]
    if total_channels is None:
        raise ValueError("the channel dimension of y_pred must be statically known")
    if total_channels % 2:
        raise ValueError("nb of imaginary part isn't equal to that of real part")
    num_channels = total_channels // 2

    diff = y_pred - tf.cast(y_true, y_pred.dtype)
    first_half = diff[..., :num_channels]
    second_half = diff[..., num_channels:]
    sq_err = first_half ** 2 + second_half ** 2
    # Count every complex point in the batch (dynamic, so unknown dims are
    # fine). The previous code divided by the per-sample count only, which
    # made the "mean" grow with the batch size.
    n_points = tf.cast(tf.size(sq_err), y_pred.dtype)
    return sq_err, n_points


@tf.function
def ComplexRMS(y_true, y_pred):
    """Root of the mean squared modulus of the complex error."""
    sq_err, n_points = _complex_squared_error(y_true, y_pred)
    return tf.sqrt(tf.reduce_sum(sq_err) / n_points)


@tf.function
def ComplexMSE(y_true, y_pred):
    """Mean squared modulus of the complex error."""
    sq_err, n_points = _complex_squared_error(y_true, y_pred)
    return tf.reduce_sum(sq_err) / n_points


@tf.function
def ComplexMAE(y_true, y_pred):
    """Mean modulus of the complex error."""
    sq_err, n_points = _complex_squared_error(y_true, y_pred)
    return tf.reduce_sum(tf.sqrt(sq_err)) / n_points


In [5]:
# Random stand-in data: 61 samples of 64x64 maps, 32 input channels and
# 2 target channels.
x_train = np.random.rand(61,64,64,32)
y_train = np.random.rand(61,64,64,2)
# Shape of a single sample (everything after the batch axis).
input_size = x_train.shape[1:]
# Reset Keras' global state before a model is built.
tf.keras.backend.clear_session()

[[[[4.17022005e-01 7.20324493e-01 1.14374817e-04 3.02332573e-01]
   [1.46755891e-01 9.23385948e-02 1.86260211e-01 3.45560727e-01]
   [3.96767474e-01 5.38816734e-01 4.19194514e-01 6.85219500e-01]
   [2.04452250e-01 8.78117436e-01 2.73875932e-02 6.70467510e-01]
   [4.17304802e-01 5.58689828e-01 1.40386939e-01 1.98101489e-01]
   [8.00744569e-01 9.68261576e-01 3.13424178e-01 6.92322616e-01]
   [8.76389152e-01 8.94606664e-01 8.50442114e-02 3.90547832e-02]]

  [[1.69830420e-01 8.78142503e-01 9.83468338e-02 4.21107625e-01]
   [9.57889530e-01 5.33165285e-01 6.91877114e-01 3.15515631e-01]
   [6.86500928e-01 8.34625672e-01 1.82882773e-02 7.50144315e-01]
   [9.88861089e-01 7.48165654e-01 2.80443992e-01 7.89279328e-01]
   [1.03226007e-01 4.47893526e-01 9.08595503e-01 2.93614148e-01]
   [2.87775339e-01 1.30028572e-01 1.93669579e-02 6.78835533e-01]
   [2.11628116e-01 2.65546659e-01 4.91573159e-01 5.33625451e-02]]

  [[5.74117605e-01 1.46728575e-01 5.89305537e-01 6.99758360e-01]
   [1.02334429e-01 4.

使用 Keras 時，框架會檢查相鄰層之間的 input_shape 與 output_shape。若將 activation 放在 ComplexConv2D 內部，輸出的 channel 數會減少；例如設定 16 個 filters 時卷積輸出 32 個 channels，但經過 AMU(4) 後只剩下 8 個 channels。此時若下一層再接一個 Conv layer，就會跳出「輸出 channel 數 8 不是 32 的倍數」的錯誤訊息。因此，把 AMU 放在卷積層內時，只能讓它的輸出 channel 數與 Conv 的輸出相同。要解決這個問題，需要將 activation 獨立成一個 layer。

In [8]:
# Functional model: two ComplexConv2D + AMU stages, one inception block and a
# final 1x1 ComplexConv2D.
# Fixed: `input_shapes` was never defined (the data cell defines `input_size`).
InputTensor = keras.Input(shape=input_size)
conv1 = complexnn.conv.ComplexConv2D(64,(3,3),padding='same')(InputTensor)
# Fixed: `AmplitudeMaxout_inclued` is not defined in this notebook; use the
# AmplitudeMaxout layer defined above.
amp1 = AmplitudeMaxout(4)(conv1)
conv2 = complexnn.conv.ComplexConv2D(16,(3,3),padding='same')(amp1)
amp2 = AmplitudeMaxout(4)(conv2)
# NOTE(review): kernel4 repeats kernel1 (3,5) — confirm this is intended.
incep = InceptLayer(filter1=4,
                    filter2=4,
                    filter3=4,
                    filter4=4,
                    kernel1=(3,5),
                    kernel2=(3,3),
                    kernel3=(5,3),
                    kernel4=(3,5),
                    num_units=4)(amp2)
Output = complexnn.conv.ComplexConv2D(1,(1,1),padding='same')(incep)

model = keras.Model(inputs=InputTensor,outputs=Output)
model.summary()
model.compile(optimizer=keras.optimizers.RMSprop(),loss=ComplexMSE)
model.fit(x_train,y_train,epochs=20)

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
complex_conv2d_1 (ComplexCon (None, 7, 7, 32)          608       
_________________________________________________________________
amplitude_maxout_1 (Amplitud (None, 7, 7, 8)           0         
_________________________________________________________________
incept_layer_1 (InceptLayer) (None, 7, 7, 8)           1760      
_________________________________________________________________
complex_conv2d_6 (ComplexCon (None, 7, 7, 2)           10        
Total params: 2,378
Trainable params: 2,378
Non-trainable params: 0
_________________________________________________________________
[[[[ 0.00412539  0.0016796 ]
   [ 0.00480253 -0.0018454 ]
   [ 0.02798994  0.02553133]
   [ 0.0409994   0.02085593]
   [ 0.03826549 -0.00754606]
   [ 0.01735482  0.015537  ]
   [-0.00400526 -0.00087712]]

  [[ 0.00114611 -0.01676142]
   [-0.0027918  

In [None]:
def test_conv_tf(x,w,b):
    """Reference complex 2-D convolution built from four ``tf.nn.conv2d`` calls.

    Args:
        x: 4-D input (array, tensor or variable); the last axis is split into a
            first half (``xre``) and a second half (``xim``).
        w: 4-D kernel; the last axis is split the same way into ``wre``/``wim``.
        b: bias added to the concatenated result.

    Returns:
        Tensor ``bias_add(concat([xre*wre - xim*wim, xre*wim + xim*wre]), b)``
        where ``*`` is a stride-1, 'SAME'-padded convolution.
    """
    # Cast everything to float32 tensors: NumPy inputs are float64 by default
    # and tf.nn.conv2d needs input and filter to share a dtype. This also
    # accepts tf.Variable weights, which have no `.reshape` method.
    x = tf.cast(x, tf.float32)
    w = tf.cast(w, tf.float32)
    b = tf.cast(b, tf.float32)

    x_dim = x.shape[-1] // 2
    xre = x[:, :, :, :x_dim]
    xim = x[:, :, :, x_dim:]

    w_dim = w.shape[-1] // 2
    new_shape = [w.shape[0], w.shape[1], x_dim, w_dim]
    wre = tf.reshape(w[:, :, :, :w_dim], new_shape)
    wim = tf.reshape(w[:, :, :, w_dim:], new_shape)

    crr = tf.nn.conv2d(xre, wre, strides=1, padding='SAME')
    cii = tf.nn.conv2d(xim, wim, strides=1, padding='SAME')
    cri = tf.nn.conv2d(xre, wim, strides=1, padding='SAME')
    cir = tf.nn.conv2d(xim, wre, strides=1, padding='SAME')
    return tf.nn.bias_add(tf.concat([crr - cii, cri + cir], axis=-1), b)


In [None]:
def test_conv_keras(x,w,b):
    """Reference complex 2-D convolution built from ``tf.keras.backend.conv2d``.

    Same computation as the ``tf.nn.conv2d`` variant, but routed through the
    Keras backend so the two implementations can be compared.

    Args:
        x: 4-D input (array, tensor or variable); the last axis is split into a
            first half (``xre``) and a second half (``xim``).
        w: 4-D kernel; the last axis is split the same way into ``wre``/``wim``.
        b: bias added to the concatenated result.

    Returns:
        Tensor ``bias_add(concat([xre*wre - xim*wim, xre*wim + xim*wre]), b)``
        where ``*`` is a stride-1, 'same'-padded convolution.
    """
    # Cast to float32 tensors up front; this also accepts tf.Variable weights,
    # which have no `.reshape` method.
    x = tf.cast(x, tf.float32)
    w = tf.cast(w, tf.float32)
    b = tf.cast(b, tf.float32)

    x_dim = x.shape[-1] // 2
    xre = x[:, :, :, :x_dim]
    xim = x[:, :, :, x_dim:]

    w_dim = w.shape[-1] // 2
    new_shape = [w.shape[0], w.shape[1], x_dim, w_dim]
    wre = tf.reshape(w[:, :, :, :w_dim], new_shape)
    wim = tf.reshape(w[:, :, :, w_dim:], new_shape)

    def conv(inputs, kernel):
        # The backend documents `strides` as a tuple, so pass (1, 1).
        return tf.keras.backend.conv2d(inputs, kernel, strides=(1, 1), padding='same')

    crr = conv(xre, wre)
    cii = conv(xim, wim)
    cri = conv(xre, wim)
    cir = conv(xim, wre)
    return tf.nn.bias_add(tf.concat([crr - cii, cri + cir], axis=-1), b)

In [None]:
# Only the first layer's kernel and bias are needed here. Unpacking the whole
# `model.weights` list into a fixed set of 12 names breaks as soon as the
# model has a different number of weights (the model above has more).
# Assumes the first two entries are that layer's kernel and bias, in that
# order — TODO confirm against complexnn.
w_conv1, b_conv1 = model.weights[:2]
print('conv1 result: \n')
# `testconv` was never defined; use the tf.nn-based reference implementation.
print(test_conv_tf(x_train, w_conv1, b_conv1))