In [1]:
import numpy as np 
import tensorflow as tf
import pandas as pd
from tensorflow import keras

In [6]:
class HuberLoss(keras.losses.Loss):
    """Huber loss: quadratic for small errors, linear for large ones.

    Args:
        threshold: absolute-error value below which the quadratic branch
            is used; at or above it the linear branch is used.
        **kwargs: forwarded unchanged to ``keras.losses.Loss``.
    """

    def __init__(self, threshold=0.1, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss of ``y_pred`` against ``y_true``."""
        residual = y_true - y_pred
        quadratic_branch = tf.square(residual) / 2
        linear_branch = self.threshold * tf.abs(residual) - self.threshold ** 2 / 2
        use_quadratic = tf.abs(residual) < self.threshold
        return tf.where(use_quadratic, quadratic_branch, linear_branch)

    def get_config(self):
        """Return the base loss config extended with ``threshold``."""
        return dict(super().get_config(), threshold=self.threshold)


In [7]:
class MyL1Regulaizer(keras.regularizers.Regularizer):
    """L1 weight regularizer: ``sum(|factor * weights|)``.

    Args:
        factor: multiplier applied to the weights before taking the
            absolute value and summing.
    """

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        """Return the scalar L1 penalty for ``weights``.

        Keras invokes regularizers as plain callables, so the penalty has to
        live in ``__call__``; a method merely named ``call`` is never
        reached by the framework.
        """
        return tf.reduce_sum(tf.abs(self.factor * weights))

    # Backward-compatible alias for code that invoked `.call(weights)` directly.
    call = __call__

    def get_config(self):
        """Return the constructor arguments needed to rebuild this regularizer."""
        return {'factor': self.factor}

In [8]:
def create_huber(threshold=1.0):
    """Build a Huber loss function bound to ``threshold``.

    Args:
        threshold: absolute-error value below which the loss is quadratic;
            at or above it the loss is linear.

    Returns:
        A function ``huber_fn(y_true, y_pred)`` returning the element-wise loss.
    """
    def huber_fn(y_true, y_pred):
        residual = y_true - y_pred
        quadratic_branch = tf.square(residual) / 2
        linear_branch = threshold * tf.abs(residual) - threshold**2 / 2
        return tf.where(tf.abs(residual) < threshold,
                        quadratic_branch,
                        linear_branch)

    return huber_fn

class HuberMetric(keras.metrics.Metric):
    """Streaming mean of the element-wise Huber loss.

    Args:
        threshold: Huber threshold passed to ``create_huber``.
        **kwargs: forwarded unchanged to ``keras.metrics.Metric``.
    """

    def __init__(self, threshold, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        # Pass the threshold through; calling create_huber() with no argument
        # would silently fall back to its default and ignore `threshold`.
        self.function = create_huber(threshold)
        # `name=` is passed by keyword so the call does not depend on the
        # positional order of add_weight's parameters.
        self.total = self.add_weight(name='total', initializer='zeros')
        self.count = self.add_weight(name='count', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the summed loss and the number of elements seen.

        ``sample_weight`` is accepted for API compatibility but not used.
        """
        metric = self.function(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the accumulated loss divided by the accumulated element count."""
        return self.total / self.count

    def get_config(self):
        """Return the base metric config extended with ``threshold``."""
        base_config = super().get_config()
        return {**base_config, 'threshold': self.threshold}

In [10]:
# Stateless layer that applies tf.exp to its input.
exponential_layer = keras.layers.Lambda(lambda inputs: tf.exp(inputs))

In [11]:
class MyDense(keras.layers.Layer):
    """Fully connected layer: ``activation(x @ kernel + bias)``.

    Args:
        units: size of the last output dimension.
        activation: anything accepted by ``keras.activations.get``.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias from the last input dimension."""
        self.kernel = self.add_weight(
            name='kernel',
            shape=[batch_input_shape[-1], self.units],
            initializer='glorot_normal')
        self.bias = self.add_weight(
            name='bias', shape=[self.units], initializer='zeros')
        super().build(batch_input_shape)

    def call(self, x):
        """Apply the affine transform followed by the activation."""
        return self.activation(x @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``."""
        # Wrap in TensorShape so plain tuples/lists are accepted as well, then
        # keep every leading dimension and swap only the last one.
        leading_dims = tf.TensorShape(batch_input_shape).as_list()[:-1]
        return tf.TensorShape(leading_dims + [self.units])

    def get_config(self):
        """Return the base layer config extended with ``units`` and ``activation``."""
        base_config = super().get_config()
        return {**base_config,
                'units': self.units,
                'activation': keras.activations.serialize(self.activation)}
    

In [12]:
class AddGaussianNoise(keras.layers.Layer):
    """Add zero-mean Gaussian noise to the input while training.

    Args:
        std: standard deviation of the noise.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, std, **kwargs):
        super().__init__(**kwargs)
        self.std = std

    def call(self, x, training=None):
        """Return ``x`` plus noise when ``training`` is truthy, else ``x`` unchanged."""
        if training:
            noise = tf.random.normal(tf.shape(x), stddev=self.std)
            return x + noise
        return x

    def compute_output_shape(self, batch_input_shape):
        """The layer is shape-preserving."""
        return batch_input_shape

    def get_config(self):
        """Return the base layer config extended with ``std``.

        ``std`` is a required constructor argument, so it must be part of the
        config for the layer to be rebuilt from it.
        """
        base_config = super().get_config()
        return {**base_config, 'std': self.std}

In [13]:
class ResidualBlock(keras.layers.Layer):
    """Stack of SELU dense layers whose output is added back to the input.

    Args:
        layers: number of dense layers in the stack.
        units: width of every dense layer.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, layers, units, **kwargs):
        super().__init__(**kwargs)
        dense_stack = []
        for _ in range(layers):
            dense_stack.append(
                keras.layers.Dense(units,
                                   activation='selu',
                                   kernel_initializer='lecun_normal'))
        self.hidden = dense_stack

    def call(self, input):
        """Run ``input`` through the dense stack and add the skip connection."""
        transformed = input
        for dense in self.hidden:
            transformed = dense(transformed)
        return transformed + input

In [14]:
class ResidualRegressor(keras.models.Model):
    """Regressor built from one dense layer, two residual blocks and an output layer.

    Args:
        output_dim: number of units in the final dense layer.
        **kwargs: forwarded unchanged to ``keras.models.Model``.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(
            30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Apply hidden1, then block1 four times, then block2, then the output layer."""
        features = self.hidden1(inputs)
        # block1 is reused, so the same weights are applied on every pass.
        block1_passes = 1 + 3
        for _ in range(block1_passes):
            features = self.block1(features)
        features = self.block2(features)
        return self.out(features)

In [15]:
class LayerNormalization(keras.layers.Layer):
    """Normalize over the last axis, then apply a learned scale and offset.

    Args:
        eps: constant added to the variance before the square root.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, eps=0.001, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps

    def build(self, batch_input_shape):
        """Create the scale (``alpha``) and offset (``beta``) weights."""
        feature_shape = batch_input_shape[-1:]
        self.alpha = self.add_weight(
            name="alpha", shape=feature_shape, initializer="ones")
        self.beta = self.add_weight(
            name="beta", shape=feature_shape, initializer="zeros")
        super().build(batch_input_shape)  # must be at the end

    def call(self, X):
        """Return ``alpha * (X - mean) / sqrt(variance + eps) + beta``."""
        mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
        centered = X - mean
        scale = tf.sqrt(variance + self.eps)
        return self.alpha * centered / scale + self.beta

    def compute_output_shape(self, batch_input_shape):
        """The layer is shape-preserving."""
        return batch_input_shape

    def get_config(self):
        """Return the base layer config extended with ``eps``."""
        return dict(super().get_config(), eps=self.eps)

In [None]:
class DepthWiseMaxPooling(keras.layers.Layer):
    """Max pooling along the last axis of a 4-D input.

    The pooling window is ``(1, 1, 1, pool_size)``, so only the last axis is
    reduced (the channel axis under TensorFlow's default NHWC layout).

    Args:
        pool_size: window size along the last axis.
        strides: step along the last axis; defaults to ``pool_size``.
        padding: padding mode passed to ``tf.nn.max_pool``.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, pool_size, strides=None, padding='VALID', **kwargs):
        super().__init__(**kwargs)
        self.pool_size = pool_size
        self.strides = pool_size if strides is None else strides
        self.padding = padding

    def call(self, inputs):
        """Pool ``inputs`` along the last axis."""
        # Keras dispatches to `call`; a method named `calls` is never run.
        # The configured stride is used, not the pool size.
        return tf.nn.max_pool(inputs,
                              ksize=(1, 1, 1, self.pool_size),
                              strides=(1, 1, 1, self.strides),
                              padding=self.padding)

    # Backward-compatible alias for code that invoked `.calls(inputs)` directly.
    calls = call

    def get_config(self):
        """Return the base layer config extended with the pooling arguments."""
        base_config = super().get_config()
        return {**base_config,
                'pool_size': self.pool_size,
                'strides': self.strides,
                'padding': self.padding}

In [5]:
class DepthWiseAvgPooling(keras.layers.Layer):
    """Average pooling along the last axis of a 4-D input.

    The pooling window is ``(1, 1, 1, pool_size)``, so only the last axis is
    reduced (the channel axis under TensorFlow's default NHWC layout).

    Args:
        pool_size: window size along the last axis.
        strides: step along the last axis; defaults to ``pool_size``.
        padding: padding mode passed to ``tf.nn.avg_pool``.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, pool_size, strides=None, padding='VALID', **kwargs):
        super().__init__(**kwargs)
        self.pool_size = pool_size
        self.strides = pool_size if strides is None else strides
        self.padding = padding

    def call(self, inputs):
        """Pool ``inputs`` along the last axis."""
        # Keras dispatches to `call`; a method named `calls` is never run.
        # The configured stride is used, not the pool size.
        # NOTE(review): confirm that tf.nn.avg_pool accepts a window on the
        # channel axis on the target TensorFlow build/device.
        return tf.nn.avg_pool(inputs,
                              ksize=(1, 1, 1, self.pool_size),
                              strides=(1, 1, 1, self.strides),
                              padding=self.padding)

    # Backward-compatible alias for code that invoked `.calls(inputs)` directly.
    calls = call

    def get_config(self):
        """Return the base layer config extended with the pooling arguments."""
        base_config = super().get_config()
        return {**base_config,
                'pool_size': self.pool_size,
                'strides': self.strides,
                'padding': self.padding}

In [None]:
from functools import partial 
# Conv2D factory with the defaults shared by ResidualUnit: 3x3 kernels,
# SAME padding, stride 1 and no bias term.
default_layer = partial(
    keras.layers.Conv2D,
    kernel_size=3,
    padding='SAME',
    strides=1,
    use_bias=False,
)

class ResidualUnit(keras.layers.Layer):
    """Two-convolution residual unit with an optional strided skip path.

    Args:
        filters: number of filters for every convolution in the unit.
        strides: stride of the first main-path convolution; when greater
            than 1 the skip path gets its own strided convolution and
            batch normalization.
        activation: anything accepted by ``keras.activations.get``.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, filters, strides=1, activation='relu', **kwargs):
        super().__init__(**kwargs)
        self.activation = keras.activations.get(activation)
        # Main path: conv -> BN -> activation -> conv -> BN.
        self.main_layers = [
            default_layer(filters, strides=strides),
            keras.layers.BatchNormalization(),
            self.activation,
            default_layer(filters),
            keras.layers.BatchNormalization(),
        ]
        # Skip path is the identity unless the main path downsamples.
        if strides > 1:
            self.skip_layers = [
                default_layer(filters, strides=strides),
                keras.layers.BatchNormalization(),
            ]
        else:
            self.skip_layers = []

    def call(self, inputs):
        """Return ``activation(main_path(inputs) + skip_path(inputs))``."""
        main_path = inputs
        for step in self.main_layers:
            main_path = step(main_path)

        shortcut = inputs
        for step in self.skip_layers:
            shortcut = step(shortcut)

        return self.activation(main_path + shortcut)

In [2]:
class LNSimpleRNN(keras.layers.Layer):
    """Simple RNN cell with layer normalization applied before the activation.

    Args:
        units: size of both the output and the recurrent state.
        activation: anything accepted by ``keras.activations.get``; applied
            after layer normalization.
        **kwargs: forwarded unchanged to ``keras.layers.Layer``.
    """

    def __init__(self, units, activation='elu', **kwargs):
        super().__init__(**kwargs)
        self.output_size = units
        self.state_size = units
        self.activation = keras.activations.get(activation)
        # Instantiate the layer (not just reference the class) and store it
        # under the attribute name that `call` reads.
        self.layer_norm = keras.layers.LayerNormalization()
        # The inner cell has no activation; it is applied after normalization.
        self.simple_rnn_cell = keras.layers.SimpleRNNCell(units,
                                                          activation=None)

    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        """Return a single all-zeros state of shape ``[batch_size, state_size]``.

        When ``inputs`` is given, the batch size and dtype are taken from it.
        """
        if inputs is not None:
            batch_size = tf.shape(inputs)[0]
            dtype = inputs.dtype
        return [tf.zeros([batch_size, self.state_size], dtype=dtype)]

    def call(self, inputs, states):
        """Run one step and return ``(output, [new_state])``.

        The normalized, activated output is also used as the new state; the
        state returned by the inner cell is discarded.
        """
        outputs, _ = self.simple_rnn_cell(inputs, states)
        norm_outputs = self.activation(self.layer_norm(outputs))
        return norm_outputs, [norm_outputs]
