In [8]:
# python standard library imports
from typing import Self, Any
from pathlib import Path

In [1]:
# model building imports
from keras import Model, Sequential, Input
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense
from keras.ops import add

In [2]:
# model training imports
from keras.optimizers import SGD
from keras.losses import CategoricalCrossentropy
from keras.metrics import CategoricalAccuracy, AUC, F1Score
from keras.callbacks import ModelCheckpoint, CSVLogger, LearningRateScheduler

In [3]:
# augmentation operations
from keras.layers import RandomBrightness, RandomFlip, RandomRotation
from keras.layers import Pipeline

In [4]:
# custom modules imports
from src.utils import load_cifar10_sample

In [5]:
# CIFAR-10 dataset description

# images have shape (32, 32, 3); the last axis holds the RGB channels
input_shape = (32, 32, 3)

# ten target classes, indexed as follows:
#   0 airplane | 1 car  | 2 bird  | 3 cat  | 4 deer
#   5 dog      | 6 frog | 7 horse | 8 ship | 9 truck
n_classes = 10

In [6]:
# pixel value interval handed to the brightness augmentation
value_range = (0.0, 1.0)

# random augmentations, applied one after the other in the listed order
augmentation_layer = Pipeline(
    [
        RandomBrightness(value_range=value_range, factor=0.1),
        RandomFlip(),
        RandomRotation(fill_mode="reflect", factor=0.1),
    ],
    name="augmentation_layer",
)

In [9]:
class MyTinyCNN(Model):
    """
    MyTinyCNN class, inherits from keras' Model class

    Forward pass: augmentation -> convolution + max-pooling -> two parallel
    convolutions whose outputs are summed -> max-pooling -> flatten ->
    dropout -> softmax classification head.

    Relies on two notebook-level names: `augmentation_layer` (the input
    augmentation pipeline) and `n_classes` (size of the classification head).
    """

    def __init__(self: Self, activation: str = "relu") -> None:
        """
        Initialization: creates every layer used by the forward pass

        Args:
            activation: name of the activation used by all convolutional
                layers.
        """

        super().__init__(name="my_tiny_oo_cnn")

        # augmentation pipeline defined at notebook level
        self.augmentation_layer = augmentation_layer

        self.conv_layer_1 = Conv2D(
            filters=3 * 8,
            kernel_size=(3, 3),
            activation=activation,
            name="conv_layer_1"
        )
        self.max_pool_layer_1 = MaxPooling2D(
            pool_size=(2, 2),
            name="max_pool_layer_1"
        )

        # exemplify non-sequential nature of computation possible with
        # the functional and object-oriented methods
        # both branches use the same number of filters and padding="same",
        # so their outputs can be summed element-wise in `call`
        self.conv_layer_2l = Conv2D(
            filters=3 * 16,
            kernel_size=(3, 3),
            activation=activation,
            name="conv_layer_2l",
            padding="same"
        )
        self.conv_layer_2r = Conv2D(
            filters=3 * 16,
            kernel_size=(2, 2),
            activation=activation,
            name="conv_layer_2r",
            padding="same"
        )
        self.max_pool_layer_2 = MaxPooling2D(
            pool_size=(2, 2),
            name="max_pool_layer_2"
        )

        self.flatten_layer = Flatten(name="flatten_layer")
        self.dropout = Dropout(rate=0.3)
        self.dense_layer = Dense(
            n_classes,
            activation="softmax",
            name="classification_head"
        )

    def call(self: Self, inputs: Any, training: bool = False) -> Any:
        """
        Forward call

        Args:
            inputs: batch of input images.
            training: whether the model runs in training mode. Exposing this
                argument lets keras' built-in loops (fit, evaluate, predict)
                tell the model which mode it is in; it is forwarded to the
                layers that behave differently between training and
                inference (augmentation and dropout).

        Returns:
            Output of the softmax classification head.
        """

        # random augmentation must only be active while training
        x = self.augmentation_layer(inputs, training=training)

        x = self.conv_layer_1(x)
        x = self.max_pool_layer_1(x)

        # exemplify non-sequential nature of computation possible with
        # the functional and object-oriented methods
        x_l = self.conv_layer_2l(x)
        x_r = self.conv_layer_2r(x)
        x = add(x_l, x_r)
        x = self.max_pool_layer_2(x)

        x = self.flatten_layer(x)
        # dropout must only be active while training
        x = self.dropout(x, training=training)

        return self.dense_layer(x)

In [10]:
# Train our regularized MyTinyCNN:

In [11]:
# load a small CIFAR-10 sample through the project helper
# NOTE(review): presumably 1024 training and 128 test examples with one-hot labels — confirm in src.utils
X_train, y_train, X_test, y_test = load_cifar10_sample(1024, 128)

In [12]:
# training hyper-parameters: number of passes over the data, samples per gradient step
epochs, batch_size = 32, 16

In [13]:
# the L2 weight decay is handed to the optimizer itself instead of being
# added to the objective as an extra loss term
model = MyTinyCNN()
optimizer = SGD(
    name="optimizer",
    learning_rate=0.01,
    weight_decay=0.01,
)
loss = CategoricalCrossentropy(name="loss")

<div class="alert alert-info">

## **Note:**

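O **`weight_decay=0.01`** é o $\alpha$ que multiplica o quadrado da norma L2 dos pesos. O otimizador aplica este decaimento diretamente na atualização dos pesos, o que, para o SGD, equivale (a menos de um fator constante) a somar esse termo à função de perda. Na fórmula abaixo, $L_{reg}$ é a perda original (aqui exemplificada com o erro quadrático) e $L_{W}$ é a penalização sobre os pesos.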

$$ \text{loss} = L_{reg} + \alpha \times L_{W} = \frac{1}{2} \times (x - \hat{x})^2 + \alpha \times \left( \sum_{i=1}^{n} \left| w_i \right|^2 \right) $$

</div>


In [14]:
# metrics tracked during training and evaluation
categorical_accuracy = CategoricalAccuracy(name="accuracy")
auc = AUC(name="auc")
f1_score = F1Score(name="f1_score", average="macro")

metrics = [
    categorical_accuracy,
    auc,
    f1_score,
]

In [15]:
# attach the optimizer, loss and metrics to the model before training
model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=metrics,
)

In [None]:
# What are callbacks?
# locations of the training artefacts, relative to the working directory
root_dir_path = Path(".")
checkpoint_file_path = root_dir_path / "checkpoint.keras"
# plain (non-chained) assignment, so root_dir_path keeps pointing at the
# directory and is not rebound to the CSV file path
metrics_file_path = root_dir_path / "metrics.csv"

# stores the model on disk at checkpoint_file_path while training runs
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_file_path,
    monitor="val_loss",
    verbose=0,
)

# writes the per-epoch metrics to a CSV file on disk
metrics_callback = CSVLogger(filename=metrics_file_path)

In [None]:
# What is a learning rate scheduler ?
def exp_decay_lr_scheduler(
    epoch: int,
    current_lr: float,
    factor: float = 0.95
) -> float:
    """
    Exponential decay learning rate scheduler

    The first two parameters must be the epoch and the current learning
    rate, in this order. `epoch` is part of that signature but does not
    enter the computation.

    Args:
        epoch: index of the current epoch (unused).
        current_lr: learning rate currently in use.
        factor: multiplier applied to the learning rate on every call.

    Returns:
        The learning rate scaled by `factor`.
    """

    return current_lr * factor

In [None]:
# the scheduler multiplies the learning rate by its factor on every call,
# so plotted against the epoch the learning rate keeps shrinking
lr_scheduler_callback = LearningRateScheduler(
    schedule=exp_decay_lr_scheduler
)

In [None]:
# callbacks handed to model.fit: checkpointing, metric logging, lr decay
callbacks = [checkpoint_callback, metrics_callback, lr_scheduler_callback]

In [20]:
# train the model
# fit the model; 20% of the training sample is held out for validation
_ = model.fit(
    x=X_train,
    y=y_train,
    epochs=epochs,
    batch_size=batch_size,
    validation_split=0.2,
    callbacks=callbacks,
    verbose=2,
)

Epoch 1/32
52/52 - 4s - 72ms/step - accuracy: 0.1038 - auc: 0.4920 - f1_score: 0.0893 - loss: 2.3414 - val_accuracy: 0.1171 - val_auc: 0.5356 - val_f1_score: 0.0826 - val_loss: 2.2989 - learning_rate: 0.0095
Epoch 2/32
52/52 - 1s - 15ms/step - accuracy: 0.1197 - auc: 0.5233 - f1_score: 0.1049 - loss: 2.3048 - val_accuracy: 0.1415 - val_auc: 0.5622 - val_f1_score: 0.1110 - val_loss: 2.2796 - learning_rate: 0.0090
Epoch 3/32
52/52 - 1s - 15ms/step - accuracy: 0.1392 - auc: 0.5534 - f1_score: 0.1076 - loss: 2.2879 - val_accuracy: 0.1268 - val_auc: 0.5829 - val_f1_score: 0.0934 - val_loss: 2.2685 - learning_rate: 0.0086
Epoch 4/32
52/52 - 1s - 15ms/step - accuracy: 0.1429 - auc: 0.5733 - f1_score: 0.1221 - loss: 2.2774 - val_accuracy: 0.1317 - val_auc: 0.5511 - val_f1_score: 0.0897 - val_loss: 2.2851 - learning_rate: 0.0081
Epoch 5/32
52/52 - 1s - 14ms/step - accuracy: 0.1600 - auc: 0.5820 - f1_score: 0.1194 - loss: 2.2695 - val_accuracy: 0.2098 - val_auc: 0.6114 - val_f1_score: 0.1799 - v

In [21]:
# evaluate on the test set
# score the trained model on the test sample; the metrics come back as a dict
model.evaluate(
    x=X_test,
    y=y_test,
    batch_size=batch_size,
    verbose=0,
    return_dict=True,
)

{'accuracy': 0.25,
 'auc': 0.7233818769454956,
 'f1_score': 0.2239234894514084,
 'loss': 2.0405728816986084}

In [22]:
# What is label smoothing?

In [23]:
# Next class:
# Real data, real models, real world