# Deep Complex Network

**Setup**

In [None]:
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import Layer, Input, Flatten, Dense, Dropout, Add, BatchNormalization, Conv2D, Conv1D, ReLU, GlobalAveragePooling2D, GlobalAveragePooling1D
from tensorflow.keras.initializers import Initializer, Constant
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.losses import SparseCategoricalCrossentropy, MeanSquaredError
from tensorflow.keras.metrics import SparseCategoricalAccuracy, MeanAbsoluteError
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler
from tensorflow.keras.regularizers import l2
import math, os, datetime
import pandas as pd
import numpy as np

## Imaginary Part Learning
It learns the imaginary part of the real input through a single residual convolution block

In [None]:
class ImagBlock(tf.keras.Model):
  def __init__(self, dim, channels, filters, kernel_size, trainable=False, kernel_initializer=None, kernel_regularizer=None):
    super(ImagBlock, self).__init__()
    self.kernel_regularizer = kernel_regularizer
    self.kernel_initializer = kernel_initializer
    self.bn1 = BatchNormalization(trainable=trainable)
    self.bn2 = BatchNormalization(trainable=trainable)
    if dim == 2:
      self.conv1 = Conv1D(filters=filters, kernel_size=kernel_size, padding='same', kernel_initializer=self.kernel_initializer, kernel_regularizer=self.kernel_regularizer)
      self.conv2 = Conv1D(filters=channels, kernel_size=kernel_size, padding='same', kernel_initializer=self.kernel_initializer, kernel_regularizer=self.kernel_regularizer)
    elif dim == 3:
      self.conv1 = Conv2D(filters=filters, kernel_size=kernel_size, padding='same', kernel_initializer=self.kernel_initializer, kernel_regularizer=self.kernel_regularizer)
      self.conv2 = Conv2D(filters=channels, kernel_size=kernel_size, padding='same', kernel_initializer=self.kernel_initializer, kernel_regularizer=self.kernel_regularizer)

  def call(self, input):
    x  = self.bn1(input)
    x  = ReLU()(x)
    x = self.conv1(x)
    x  = self.bn2(x)
    x  = ReLU()(x)
    x = self.conv2(x)
    return tf.complex(input, x)

In [None]:
input = tf.random.normal((100, 32, 32, 3))
ImagBlock(dim=3, channels=3, filters=32, kernel_size=3)(input)

## Complex Batch Normalization
Efficient normalization for complex valued inputs

In [None]:
class ComplexBatchNorm(Layer):
  def __init__(self, regularizer=None):
    super(ComplexBatchNorm, self).__init__()
    self.regularizer = regularizer

  def build(self, input_shape):
    self.gamma_rr = self.add_weight('gamma_rr', shape=(1,), initializer=Constant(1/math.sqrt(2)), regularizer=self.regularizer, trainable=True)
    self.gamma_ii = self.add_weight('gamma_ii', shape=(1,), initializer=Constant(1/math.sqrt(2)), regularizer=self.regularizer, trainable=True)
    self.gamma_ri = self.add_weight('gamma_ri', shape=(1,), initializer=Constant(0.0), regularizer=self.regularizer, trainable=True)
    self.beta_real = self.add_weight('beta_real', shape=(1,), initializer=Constant(0.0), regularizer=self.regularizer, trainable=True)
    self.beta_imag = self.add_weight('beta_imag', shape=(1,), initializer=Constant(0.0), regularizer=self.regularizer, trainable=True)

  def call(self, input):
    real, imag = tf.math.real(input), tf.math.imag(input)
    real_centered, imag_centered = real-tf.reduce_mean(real), imag-tf.reduce_mean(imag)
    # covariances
    cov_rr = tf.reduce_mean(real_centered*real_centered)
    cov_ri = tf.reduce_mean(real_centered*imag_centered)
    cov_ir = tf.reduce_mean(imag_centered*real_centered)
    cov_ii = tf.reduce_mean(imag_centered*imag_centered)
    # inverse square root
    delta, tau = (cov_rr*cov_ii) - (cov_ri**2), cov_rr + cov_ii
    s = tf.math.sqrt(delta)
    t = tf.math.sqrt(tau + 2*s)
    inverse_st = 1.0/(s*t)
    inv_rr = (cov_ii + s)*inverse_st
    inv_ii = (cov_rr + s)*inverse_st
    inv_ri = -cov_ri*inverse_st
    # normalization
    real_normed = inv_rr*real_centered + inv_ri*imag_centered
    imag_normed = inv_ri*real_centered + inv_ii*imag_centered
    #batch normalization
    real_bn = self.gamma_rr*real_normed + self.gamma_ri*imag_normed + self.beta_real
    imag_bn = self.gamma_ri*real_normed + self.gamma_ii*imag_normed + self.beta_imag
    return tf.complex(real_bn, imag_bn)

In [None]:
input = tf.complex(tf.random.normal((100, 32, 32, 3)), tf.random.normal((100, 32, 32, 3)))
ComplexBatchNorm()(input)

## Complex Convolutional Layer
Implements convolution between complex inputs `W * h = (A * x - B * y) + i(B * x + A * y)`

**Complex Kernel Initialization**

In [None]:
class ComplexConvInitialization(Initializer):
  def __init__(self, unit, input_size):
    super(ComplexConvInitialization, self).__init__()
    self.unit = unit
    self.input_size = float(input_size)

  def __call__(self, shape, dtype=None):
    scale = 1/(tf.sqrt(self.input_size))
    magnitude = tfp.random.rayleigh(shape=shape, scale=scale)
    phase = tf.random.uniform(shape=shape, minval=-math.pi, maxval=math.pi)
    comp_weights = tf.complex(magnitude, 0.0)*tf.math.exp(tf.complex(0.0, 1.0)*tf.complex(phase, 0.0))
    return tf.math.real(comp_weights) if self.unit == 'real' else tf.math.imag(comp_weights)

**Convolutional Layer**

In [None]:
class ComplexConv(Layer):
  def __init__(self, filters, kernel_size, strides=None, kernel_regularizer=None):
    super(ComplexConv, self).__init__()
    self.filters = filters
    self.kernel_size = kernel_size
    self.strides = strides
    self.kernel_regularizer = kernel_regularizer

  def build(self, input_shape):
    if len(input_shape) == 4:
      kernel_shape = (self.kernel_size, self.kernel_size, input_shape[-1], self.filters)
    elif len(input_shape) == 3:
      kernel_shape = (self.kernel_size, input_shape[-1], self.filters)

    self.real_kernel = self.add_weight('real_kernel', shape=kernel_shape, initializer=ComplexConvInitialization('real', input_shape[1]), regularizer=self.kernel_regularizer, trainable=True)
    self.imag_kernel = self.add_weight('imag_kernel', shape=kernel_shape, initializer=ComplexConvInitialization('imag', input_shape[1]), regularizer=self.kernel_regularizer, trainable=True)

  def call(self, input):
    real = tf.math.real(input)
    imag = tf.math.imag(input)
    if len(input.shape) == 4:
      real_conv = tf.nn.conv2d(real, filters=self.real_kernel, padding='SAME', strides=self.strides) - tf.nn.conv2d(imag, filters=self.imag_kernel, padding='SAME', strides=self.strides)
      imag_conv = tf.nn.conv2d(real, filters=self.imag_kernel, padding='SAME', strides=self.strides) + tf.nn.conv2d(imag, filters=self.real_kernel, padding='SAME', strides=self.strides)
    elif len(input.shape) == 3:
      real_conv = tf.nn.conv1d(real, filters=self.real_kernel, padding='SAME', stride=self.strides) - tf.nn.conv1d(imag, filters=self.imag_kernel, padding='SAME', stride=self.strides)
      imag_conv = tf.nn.conv1d(real, filters=self.imag_kernel, padding='SAME', stride=self.strides) + tf.nn.conv1d(imag, filters=self.real_kernel, padding='SAME', stride=self.strides)
    return tf.complex(real_conv, imag_conv)

In [None]:
input = tf.complex(tf.random.normal((10, 32, 32, 3)), tf.random.normal((10, 32, 32, 3)))
ComplexConv(filters=64, kernel_size=3)(input)

## Complex Relu
Complex valued activation functions
**cRelu, zRelu, modRelu**

In [None]:
class CRelu(Layer):
  def __init__(self):
    super(CRelu, self).__init__()

  def call(self, input):
    return tf.complex(tf.nn.relu(tf.math.real(input)), tf.nn.relu(tf.math.imag(input)))


class ZRelu(Layer):
  def __init__(self):
    super(ZRelu, self).__init__()
  
  def call(self, input):
    inf = tf.zeros_like(input, dtype='float32')
    sup = tf.ones_like(input, dtype='float32')*math.pi/2 
    return tf.where(tf.math.logical_and(tf.math.angle(input) >= inf, tf.math.angle(input) <= sup), input, tf.zeros_like(input))


class ModRelu(Layer):
  def __init__(self, regularizer=None):
    super(ModRelu, self).__init__()
    self.regularizer = regularizer

  def build(self, input_shape):
    self.b = self.add_weight('b', shape=input_shape[1:], initializer=Constant(0.5), regularizer=self.regularizer, trainable=True)
  
  def call(self, input):
    biased_abs = tf.abs(input) - self.b
    relu = tf.nn.relu(biased_abs)
    ang = tf.math.angle(input)
    return tf.complex(biased_abs*tf.math.cos(ang), biased_abs*tf.math.sin(ang))

In [None]:
input = tf.constant([[0-0.5j, 0.5+0j], [1+2j, 2+1j]],dtype='complex64')
CRelu()(input)
#ZRelu()(input)
#ModRelu()(input)

**Complex Dropout**

Dropout real ad imaginary part of complex inputs

In [None]:
class ComplexDropout(Layer):
  def __init__(self, rate):
    super(ComplexDropout, self).__init__()
    self.rate = rate

  def call(self, input):
    real = tf.math.real(input)
    imag = tf.math.imag(input)
    real_drop = Dropout(self.rate)(real)
    imag_drop = Dropout(self.rate)(imag)
    return tf.complex(real_drop, imag_drop)

In [None]:
input = tf.complex(tf.random.normal((10, 32, 32, 3)), tf.random.normal((10, 32, 32, 3)))
ComplexDropout(0.4)(input)

## Model

### Image Recognition

**Real Valued Residual Network**

In [None]:
class Resnet():
  def __init__(self, blocks, filters, output_units):
    super(Resnet, self).__init__()
    self.filters = filters
    self.blocks = blocks
    self.output_units = output_units
    self.model = self.build()

  def build(self):
    inputs = Input(shape=(32,32,3))
    x = Conv2D(filters=self.filters[0], kernel_size=3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
    x  = BatchNormalization()(x)
    b = ReLU()(x)

    for i in range(len(self.filters)):
      for _ in range(self.blocks):  
        x  = BatchNormalization()(b)
        x = ReLU()(x)
        x = Conv2D(filters=self.filters[i], kernel_size=3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
        x  = BatchNormalization()(x)
        x = ReLU()(x)
        x = Conv2D(filters=self.filters[i], kernel_size=3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
        x = Dropout(0.4)(x)
        b = Add()([x, b])

      if i != len(self.filters)-1:
        b = Conv2D(filters=self.filters[i+1], kernel_size=1, strides=2, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(b)

    x = GlobalAveragePooling2D()(b)
    x = Flatten()(x)
    outputs = Dense(self.output_units, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
    return tf.keras.Model(inputs=[inputs], outputs=[outputs])

**Complex Valued Residual Network**

In [None]:
class ComplexResNet():
  def __init__(self, blocks, filters, output_units):
    super(ComplexResNet, self).__init__()
    self.blocks = blocks
    self.filters = filters
    self.output_units = output_units
    self.model = self.build()

  def build(self):
    inputs = Input(shape=(32,32,3))
    x = ImagBlock(dim=3, channels=3, filters=10, kernel_size=3, trainable=True, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
    x = ComplexConv(filters=self.filters[0], kernel_size=7, kernel_regularizer=l2(1e-4))(x)
    x = ComplexBatchNorm(regularizer=l2(1e-4))(x)
    b = CRelu()(x)

    for i in range(len(self.filters)):
      for _ in range(self.blocks):
        x = ComplexBatchNorm(regularizer=l2(1e-4))(b)
        x = CRelu()(x)
        x = ComplexConv(filters=self.filters[i], kernel_size=3, kernel_regularizer=l2(1e-4))(x)
        x = ComplexBatchNorm(regularizer=l2(1e-4))(x)
        x = CRelu()(x)
        x = ComplexConv(filters=self.filters[i], kernel_size=3, kernel_regularizer=l2(1e-4))(x)
        x = ComplexDropout(0.4)(x)
        b = Add()([x, b])
      
      if i != len(self.filters)-1:
        b = ComplexConv(filters=self.filters[i+1], kernel_size=1, strides=2, kernel_regularizer=l2(1e-4))(b)
    
    x = GlobalAveragePooling2D()(b)
    x = Flatten()(x)  
    outputs = Dense(self.output_units, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
    return tf.keras.Model(inputs=[inputs], outputs=[outputs])

### Time Series Forecasting

**Real Valued Residual Network**

In [None]:
class Resnet():
  def __init__(self, blocks, filters, output_units):
    super(Resnet, self).__init__()
    self.blocks = blocks
    self.filters = filters
    self.output_units = output_units
    self.model = self.build()

  def build(self):
    inputs = Input(shape=(24,19))
    x = Conv1D(filters=self.filters[0], kernel_size=3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
    x  = BatchNormalization()(x)
    b = ReLU()(x)

    for i in range(len(self.filters)):
      for _ in range(self.blocks):  
        x  = BatchNormalization()(b)
        x = ReLU()(x)
        x = Conv1D(filters=self.filters[i], kernel_size=3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
        x  = BatchNormalization()(x)
        x = ReLU()(x)
        x = Conv1D(filters=self.filters[i], kernel_size=3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
        x = Dropout(0.4)(x)
        b = Add()([x, b])

      if i != len(self.filters)-1:
        b = Conv1D(filters=self.filters[i+1], kernel_size=1, strides=2, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(b)

    x = GlobalAveragePooling1D()(b)
    x = Dense(2048, activation='relu', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
    outputs = Dense(self.output_units, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
    return tf.keras.Model(inputs=[inputs], outputs=[outputs])

**Complex Valued Residual Network**

In [None]:
class ComplexResNet():
  def __init__(self, blocks, filters, output_units):
    super(ComplexResNet, self).__init__()
    self.blocks = blocks
    self.filters = filters
    self.output_units = output_units
    self.model = self.build()

  def build(self):
    inputs = Input(shape=(24,19))
    x = ImagBlock(dim=2, channels=19, filters=10, kernel_size=3, trainable=True, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
    x = ComplexConv(filters=self.filters[0], kernel_size=7, kernel_regularizer=l2(1e-4))(x)
    x = ComplexBatchNorm(regularizer=l2(1e-4))(x)
    b = ModRelu(regularizer=l2(1e-4))(x)

    for i in range(len(self.filters)):
      for _ in range(self.blocks):
        x = ComplexBatchNorm(regularizer=l2(1e-4))(b)
        x = ModRelu(regularizer=l2(1e-4))(x)
        x = ComplexConv(filters=self.filters[i], kernel_size=3, kernel_regularizer=l2(1e-4))(x)
        x = ComplexBatchNorm(regularizer=l2(1e-4))(x)
        x = ModRelu(regularizer=l2(1e-4))(x)
        x = ComplexConv(filters=self.filters[i], kernel_size=3, kernel_regularizer=l2(1e-4))(x)
        x = ComplexDropout(0.4)(x)
        b = Add()([x, b])
      
      if i != len(self.filters)-1:
        b = ComplexConv(filters=self.filters[i+1], kernel_size=1, strides=2, kernel_regularizer=l2(1e-4))(b)
    
    x = GlobalAveragePooling1D()(b)
    x = Dense(2048, activation=ModRelu(regularizer=l2(1e-4)), kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
    outputs = Dense(self.output_units, kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(x)
    return tf.keras.Model(inputs=[inputs], outputs=[outputs])

## Data

### CIFAR

In [None]:
(train_images, y_train), (test_images, y_test) = tf.keras.datasets.cifar10.load_data()
#(train_images, y_train), (test_images, y_test) = tf.keras.datasets.cifar100.load_data()
x_train, x_test = train_images/255.0, test_images/255.0

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


### WEATHER

[Weather time series dataset](https://www.bgc-jena.mpg.de/wetter/) recorded by the Max Planck Institute for Biogeochemistry

In [None]:
zip_path = tf.keras.utils.get_file(
    origin='https://storage.googleapis.com/tensorflow/tf-keras-datasets/jena_climate_2009_2016.csv.zip',
    fname='jena_climate_2009_2016.csv.zip',
    extract=True)
csv_path, _ = os.path.splitext(zip_path)
df = pd.read_csv(csv_path)

**Feature Engineering**

In [None]:
#take 10 minutes intervals to 1h intervals
df = df[5::6]

# change erroneous measures
wv = df['wv (m/s)']
bad_wv = wv == -9999.0
wv[bad_wv] = 0.0

max_wv = df['max. wv (m/s)']
bad_max_wv = max_wv == -9999.0
max_wv[bad_max_wv] = 0.0

# change wind velocity to vector
wv = df.pop('wv (m/s)')
max_wv = df.pop('max. wv (m/s)')

# Convert to radians.
wd_rad = df.pop('wd (deg)')*np.pi / 180

# Calculate the wind x and y components.
df['Wx'] = wv*np.cos(wd_rad)
df['Wy'] = wv*np.sin(wd_rad)

# Calculate the max wind x and y components.
df['max Wx'] = max_wv*np.cos(wd_rad)
df['max Wy'] = max_wv*np.sin(wd_rad)

# Calculate time signals 
date_time = pd.to_datetime(df.pop('Date Time'), format='%d.%m.%Y %H:%M:%S')
timestamp_s = date_time.map(datetime.datetime.timestamp)

day = 24*60*60
year = (365.2425)*day

df['Day sin'] = np.sin(timestamp_s * (2 * np.pi / day))
df['Day cos'] = np.cos(timestamp_s * (2 * np.pi / day))
df['Year sin'] = np.sin(timestamp_s * (2 * np.pi / year))
df['Year cos'] = np.cos(timestamp_s * (2 * np.pi / year))

**Split Data**

In [None]:
n = len(df)
train_df = df[0:int(n*0.7)]
val_df = df[int(n*0.7):int(n*0.9)]
test_df = df[int(n*0.9):]

**Normalize Data**

In [None]:
train_mean = train_df.mean()
train_std = train_df.std()

train_df = (train_df - train_mean) / train_std
val_df = (val_df - train_mean) / train_std
test_df = (test_df - train_mean) / train_std

**Data Windowing**

In [None]:
class WindowGenerator():
  def __init__(self, train_df, val_df, test_df, input_width, label_width, shift, batch_size=32, label_columns=None):
    
    self.train_df = train_df
    self.val_df = val_df
    self.test_df = test_df
    self.input_width = input_width
    self.label_width = label_width
    self.shift = shift
    self.batch_size = batch_size
    self.label_columns = label_columns
    
    self.column_indices = {name: i for i, name in enumerate(train_df.columns)}
    self.total_window_size = input_width + shift
    self.input_slice = slice(0, input_width)
    self.input_indices = np.arange(self.total_window_size)[self.input_slice]
    self.label_start = self.total_window_size - self.label_width
    self.labels_slice = slice(self.label_start, None)
    self.label_indices = np.arange(self.total_window_size)[self.labels_slice]

    if label_columns is not None:
      self.label_columns_indices = {name: i for i, name in enumerate(label_columns)}

  def split_window(self, features):
    inputs = features[:, self.input_slice, :]
    labels = features[:, self.labels_slice, :]
    if self.label_columns is not None:
      labels = tf.stack([labels[:, :, self.column_indices[name]] for name in self.label_columns], axis=-1)

    inputs.set_shape([None, self.input_width, None])
    labels.set_shape([None, self.label_width, None])

    return inputs, labels

  def make_dataset(self, data):
    data = np.array(data, dtype=np.float32)
    ds = tf.keras.preprocessing.timeseries_dataset_from_array(
        data=data,
        targets=None,
        sequence_length=self.total_window_size,
        sequence_stride=1,
        shuffle=True,
        batch_size=self.batch_size)

    ds = ds.map(self.split_window)

    return ds

  @property
  def train(self):
    return self.make_dataset(self.train_df)

  @property
  def val(self):
    return self.make_dataset(self.val_df)

  @property
  def test(self):
    return self.make_dataset(self.test_df)

  def __repr__(self):
    return '\n'.join([
        f'Total window size: {self.total_window_size}',
        f'Input indices: {self.input_indices}',
        f'Label indices: {self.label_indices}',
        f'Label column name(s): {self.label_columns}'])

## Training

In [None]:
def scheduler(epoch, lr):
  if epoch == 10: return lr*10
  elif epoch == 100: return lr/10
  elif epoch == 120: return lr/10
  elif epoch == 150: return lr/10
  else: return lr

### Image Recognition

In [None]:
resnet = Resnet(blocks=16, filters=[16, 32, 64], output_units=10).model
#resnet = ComplexResNet(blocks=14, filters=[12, 24, 48], output_units=10).model
resnet.summary()

In [None]:
resnet.compile( 
    optimizer=SGD(learning_rate=0.01, momentum=0.9, nesterov=True, clipnorm=1.0),
    loss=SparseCategoricalCrossentropy(from_logits=True),
    metrics=[SparseCategoricalAccuracy()]
)
print('MODEL TRAINING')
history = resnet.fit(x_train, y_train, epochs=60, batch_size=128, validation_split=0.3, shuffle=True, callbacks=LearningRateScheduler(scheduler))
print('\nMODEL EVALUATION')
_, accuracy = resnet.evaluate(x_test, y_test)

### Time Series Forecasting

**Single-step (1h) and Multi-step (24h) Windows**

In [None]:
single_step_window = WindowGenerator(
    train_df, val_df, test_df, 
    input_width=24, label_width=1, shift=1, 
    batch_size=128, label_columns=['T (degC)'])

multi_step_window = WindowGenerator(
    train_df, val_df, test_df, 
    input_width=24, label_width=24, shift=1,
    batch_size=128, label_columns=['T (degC)'])

train = single_step_window.train
valid = single_step_window.val
test = single_step_window.test
#train = multi_step_window.train
#valid = multi_step_window.val
#test = multi_step_window.test

In [None]:
resnet = Resnet(blocks=16, filters=[16, 32, 64], output_units=1).model
#resnet = ComplexResNet(blocks=14, filters=[12, 24, 48], output_units=1).model
resnet.summary()

In [None]:
resnet.compile( 
    optimizer=Adam(clipnorm=1.0),
    loss=MeanSquaredError(),
    metrics=[MeanAbsoluteError()]
)
print('MODEL TRAINING')
history = resnet.fit(train, validation_data=valid, epochs=10)
print('\nMODEL EVALUATION')
_, accuracy = resnet.evaluate(test)