In [1]:
import tensorflow as tf

# Load Fashion-MNIST as (train images, train labels), (test images, test labels).
(x, y), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

# The first N_TRAIN samples are used for training; the remainder of the
# original training split is held out for validation.
N_TRAIN = 55000
x_train, y_train = x[:N_TRAIN], y[:N_TRAIN]
x_val, y_val = x[N_TRAIN:], y[N_TRAIN:]

# Rescale the pixel values by 1/255. The division promotes the arrays to
# float64, so cast back to float32 to halve memory use and match the model's
# compute dtype.
x_train, x_test, x_val = (
    (images / 255.0).astype("float32") for images in (x_train, x_test, x_val)
)

In [None]:
class Double_Boundary_HuberLoss(tf.keras.losses.Loss):
    """Huber-style loss with two thresholds, for integer (sparse) class labels.

    The labels are one-hot encoded and compared element-wise with ``y_pred``.
    With ``e = |y_pred - one_hot(y_true)|`` each element contributes:

    * ``e**2 / 2``                   when ``e < delta1``
    * ``e * sqrt(e) / 2``            when ``delta1 <= e < delta2``
    * ``e * delta2 - delta1**2 / 2`` otherwise

    Args:
        delta1: Boundary between the quadratic and the intermediate regime.
        delta2: Boundary between the intermediate and the linear regime.
        **kwargs: Forwarded to ``tf.keras.losses.Loss``.
    """

    def __init__(self, delta1=1, delta2=1.5, **kwargs):
        self.delta1 = delta1
        self.delta2 = delta2
        super().__init__(**kwargs)

    # Regression variant, kept for reference. The active `call` below is the
    # classification variant (it one-hot encodes the labels first).
    #
    #    def call(self,y_true,y_pred):
    #        error=tf.abs(y_pred-y_true)
    #        small_error=(error**2)/2
    #        medium_error=(error*(error**(1/2)))/2
    #        large_error=(error*self.delta2)-(self.delta1**2)/2
    #        return tf.where(error<self.delta1,small_error,tf.where(error<self.delta2,medium_error,large_error))

    def call(self, y_true, y_pred):
        """Return the mean loss over every element of the batch.

        Args:
            y_true: Integer class indices.
            y_pred: Per-class scores; the last axis is the class axis.

        Returns:
            A scalar tensor: the mean of the element-wise piecewise loss.
        """
        y_pred = tf.convert_to_tensor(y_pred)
        # Build the one-hot target in y_pred's dtype so the subtraction below
        # does not fail when the model is not running in float32.
        y_true = tf.one_hot(
            tf.cast(y_true, tf.int32),
            depth=tf.shape(y_pred)[-1],
            dtype=y_pred.dtype,
        )
        error = tf.abs(y_pred - y_true)
        small_error = (error ** 2) / 2
        # The lower bound must be strictly positive: the derivative of
        # error**0.5 is infinite at 0, and tf.where still back-propagates
        # through the unselected branch (0 * inf = NaN), which would poison
        # the gradients whenever a prediction matches its target exactly.
        error = tf.clip_by_value(error, 1e-7, 5)
        medium_error = (error * (error ** (1 / 2))) / 2
        large_error = (error * self.delta2) - (self.delta1 ** 2) / 2
        piecewise = tf.where(
            error < self.delta1,
            small_error,
            tf.where(error < self.delta2, medium_error, large_error),
        )
        return tf.reduce_mean(piecewise)

    def get_config(self):
        """Return the base loss config extended with both thresholds."""
        base_config = super().get_config()
        return {**base_config, "delta1": self.delta1, "delta2": self.delta2}

In [3]:
class he_normal(tf.keras.initializers.Initializer):
    """Normal initializer with standard deviation ``sqrt(2 / n_inputs)``.

    ``n_inputs`` is read from ``shape[-2]``, i.e. the input dimension of a
    2-D ``(n_inputs, units)`` kernel.
    """

    def __call__(self, shape, dtype=None, **kwargs):
        """Sample a tensor of the requested shape.

        Args:
            shape: Shape of the tensor to create; needs at least two dimensions.
            dtype: Floating dtype of the result. Defaults to float32 when
                omitted, because ``tf.random.normal`` needs a concrete dtype.
            **kwargs: Accepted for compatibility with callers that pass extra
                initializer arguments; ignored.

        Returns:
            A tensor of ``shape`` drawn from a zero-mean normal distribution.
        """
        dtype = tf.float32 if dtype is None else tf.as_dtype(dtype)
        n_inputs = shape[-2]
        # Compute stddev in the target dtype so it matches what
        # tf.random.normal is asked to produce.
        stddev = tf.sqrt(2 / tf.cast(n_inputs, dtype))
        return tf.random.normal(shape, stddev=stddev, dtype=dtype)

    def get_config(self):
        """The initializer has no constructor arguments to serialize."""
        return {}
    
class l2_reg(tf.keras.regularizers.Regularizer):
    """L2 penalty: ``penalty * sum(data ** 2)``.

    Args:
        penalty: Multiplier applied to the sum of squared entries.
    """

    def __init__(self, penalty=0.01):
        self.penalty = penalty

    def __call__(self, data):
        """Return the scalar regularization term for ``data``."""
        return self.penalty * tf.reduce_sum(data ** 2)

    def get_config(self):
        """Return the constructor arguments.

        The key must match the ``__init__`` parameter name, because the
        default ``from_config`` calls ``cls(**config)``.
        """
        return {"penalty": self.penalty}

    @classmethod
    def from_config(cls, config):
        """Rebuild the regularizer, also accepting the legacy ``"l2"`` key."""
        config = dict(config)
        # Earlier versions of get_config stored the value under "l2".
        if "l2" in config and "penalty" not in config:
            config["penalty"] = config.pop("l2")
        return cls(**config)
def nonneg(x):
    """Apply the Keras ReLU activation to ``x`` and return the result."""
    rectified = tf.keras.activations.relu(x)
    return rectified
def mish(x):
    """Mish activation: ``x * tanh(softplus(x))``."""
    gate = tf.math.tanh(tf.math.softplus(x))
    return x * gate


In [4]:
class HuberMetric(tf.keras.metrics.Metric):
    """Streaming mean of a two-threshold Huber-style error for sparse labels.

    Integer labels are one-hot encoded and compared element-wise with
    ``y_pred``; the metric is the running sum of the per-element values
    divided by the running number of elements.

    Args:
        delta1: Upper bound of the "small error" regime.
        delta2: Upper bound of the "mid error" regime.
        name: Metric name, forwarded to the Keras base class.
        **kwargs: Forwarded to ``tf.keras.metrics.Metric``.
    """

    def __init__(self, delta1=1, delta2=2, name="HuberMetric", **kwargs):
        # Forward `name`; otherwise the argument is silently dropped and Keras
        # auto-generates a name for the metric.
        super().__init__(name=name, **kwargs)
        self.delta1 = delta1
        self.delta2 = delta2
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the error of one batch.

        Args:
            y_true: Integer class indices.
            y_pred: Per-class scores; the last axis is the class axis.
            sample_weight: Accepted so the metric can be called with the
                standard Keras ``update_state`` signature; currently ignored.
        """
        y_pred = tf.cast(y_pred, tf.float32)
        y_true = tf.cast(y_true, tf.int32)
        y_true = tf.one_hot(y_true, depth=tf.shape(y_pred)[-1])
        y_true = tf.cast(y_true, tf.float32)
        error = tf.abs(y_true - y_pred)

        # The regime masks are computed on the raw error, before clipping.
        is_small = tf.less(error, self.delta1)
        small_error = (error ** 2) / 2

        is_mid = tf.logical_and(
            tf.greater_equal(error, self.delta1), tf.less(error, self.delta2)
        )
        # Keep the error strictly positive and bounded before the sqrt.
        error = tf.clip_by_value(error, 1e-7, 1e3)
        # NOTE(review): the mid and large branches differ from
        # Double_Boundary_HuberLoss (error**2.5/2 here vs error**1.5/2 there,
        # and delta1/delta2 are swapped in the linear term) — confirm which
        # definition is intended.
        mid_error = ((error ** 2) * tf.sqrt(error)) / 2

        large_error = (error * self.delta1) - ((1 / 2) * (self.delta2) ** 2)

        huber_val = tf.where(
            is_small, small_error, tf.where(is_mid, mid_error, large_error)
        )
        self.total.assign_add(tf.reduce_sum(huber_val))
        self.count.assign_add(tf.cast(tf.size(huber_val), tf.float32))

    def result(self):
        """Return the running mean; 0 (instead of NaN) before any update."""
        return tf.math.divide_no_nan(self.total, self.count)

    def reset_state(self):
        """Clear both accumulators; this is the hook Keras calls between epochs."""
        self.total.assign(0.0)
        self.count.assign(0.0)

    def reset(self):
        """Backward-compatible alias for ``reset_state``."""
        self.reset_state()

    def get_config(self):
        """Return the base metric config extended with both thresholds."""
        base_config = super().get_config()
        return {**base_config, "delta1": self.delta1, "delta2": self.delta2}

In [5]:
class MyDenseLayer(tf.keras.layers.Layer):
    """Fully connected layer: ``activation(inputs @ weight + bias)``.

    Args:
        units: Number of output features.
        activation: Anything accepted by ``tf.keras.activations.get``
            (name, callable, or ``None``).
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)

    def build(self, shape):
        """Create the kernel and bias once the input feature size is known."""
        n_inputs = shape[-1]
        self.weight = self.add_weight(
            shape=(n_inputs, self.units),
            initializer="he_normal",
            trainable=True
        )
        self.bias = self.add_weight(
            shape=(self.units,),
            initializer="zeros",
            trainable=True
        )

    def call(self, inputs):
        """Apply the affine transform followed by the activation, if any."""
        z = tf.matmul(inputs, self.weight) + self.bias
        return self.activation(z) if self.activation else z

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        base_config = super().get_config()
        return {
            **base_config,
            "units": self.units,
            "activation": tf.keras.activations.serialize(self.activation),
        }

In [6]:
class ResidualBlock(tf.keras.layers.Layer):
    """Two stacked Dense layers: an activated one followed by a linear one.

    The block itself does not add the skip connection; ``call`` returns only
    the transformed tensor, so the caller is expected to add the input back.

    Args:
        units: Width of both Dense layers.
        activation: Activation of the first Dense layer.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, units, activation="relu", **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments: get_config reads them back.
        self.units = units
        self.activation = tf.keras.activations.get(activation)
        self.hidden1 = tf.keras.layers.Dense(units, activation=self.activation)
        self.hidden2 = tf.keras.layers.Dense(units)

    def call(self, inputs):
        """Run the inputs through both Dense layers in sequence."""
        z = self.hidden1(inputs)
        z = self.hidden2(z)
        return z

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        return {
            **base_config,
            "activation": tf.keras.activations.serialize(self.activation),
            "units": self.units,
        }

In [7]:
class MyModel(tf.keras.Model):
    """Classifier: flatten, a dense stem, two residual blocks, a dense head.

    Args:
        num_classes: Size of the softmax output layer.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, num_classes=10, **kwargs):
        super().__init__(**kwargs)
        # Remember the argument so get_config reports the real value.
        self.num_classes = num_classes

        self.flatten = tf.keras.layers.Flatten()

        self.hidden1 = MyDenseLayer(256, activation=mish)

        # Same width as hidden1 so the skip additions in `call` line up.
        self.resblock1 = ResidualBlock(256)
        self.resblock2 = ResidualBlock(256)

        self.hidden2 = MyDenseLayer(64, activation=nonneg)
        self.hidden3 = MyDenseLayer(32, activation=mish)

        self.classifier = MyDenseLayer(num_classes, activation="softmax")

    def call(self, inputs):
        """Return softmax class probabilities for a batch of inputs."""
        x = self.flatten(inputs)
        x = self.hidden1(x)
        # The skip connections are added here, not inside ResidualBlock.
        x = self.resblock1(x) + x
        x = self.resblock2(x) + x
        x = self.hidden2(x)
        x = self.hidden3(x)
        return self.classifier(x)

    def get_config(self):
        """Return the base model config extended with ``num_classes``."""
        base = super().get_config()
        return {**base, "num_classes": self.num_classes}

In [11]:
# --- Objects used by the custom training loop -------------------------------
model = MyModel()
train_metric = HuberMetric()
val_metric = HuberMetric()
optimizer = tf.keras.optimizers.Adam()
loss_func = tf.keras.losses.SparseCategoricalCrossentropy()
initializer = he_normal()
train_acc = tf.keras.metrics.SparseCategoricalAccuracy()
val_acc = tf.keras.metrics.SparseCategoricalAccuracy()

# --- Input pipelines: fixed-order mini-batches of 32 samples ----------------
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
ds_train = ds_train.batch(32)
ds_val = tf.data.Dataset.from_tensor_slices((x_val, y_val))
ds_val = ds_val.batch(32)

# --- Train for n_epochs, reporting accuracy after every epoch ---------------
n_epochs = 5
for epoch in range(n_epochs):
    print(f"Epoch {epoch+1}:")

    # Training pass: one optimizer step per mini-batch.
    for x_batch, y_batch in ds_train:
        with tf.GradientTape() as tape:
            # The model ends in a softmax, so these are class probabilities,
            # matching the loss's default from_logits=False.
            logits = model(x_batch, training=True)
            loss = loss_func(y_batch, logits)
        # Read the variables after the forward pass: the custom layers create
        # their weights lazily on first call.
        grads = tape.gradient(loss, model.trainable_variables)
        grads_and_vars = zip(grads, model.trainable_variables)
        optimizer.apply_gradients(grads_and_vars)
        train_acc.update_state(y_batch, logits)

    print("Train Accuracy: ", train_acc.result().numpy())
    train_acc.reset_state()

    # Validation pass: forward only, no weight updates.
    for x_batch, y_batch in ds_val:
        val_logits = model(x_batch, training=False)
        val_acc.update_state(y_batch, val_logits)

    print("Val_Score: ", val_acc.result().numpy())
    val_acc.reset_state()

Epoch 1:
Train Accuracy:  0.82036364
Val_Score:  0.8594
Epoch 2:
Train Accuracy:  0.8609273
Val_Score:  0.8652
Epoch 3:
Train Accuracy:  0.87665457
Val_Score:  0.8688
Epoch 4:
Train Accuracy:  0.88485456
Val_Score:  0.8712
Epoch 5:
Train Accuracy:  0.8924
Val_Score:  0.877
