# Better CNN model

Here I explore architectures that are very similar to the baseline (CNN → Flatten → Dense layers).

Hyperparameters experimented with:

- number of CNN and dense layers
- Dropout
- Activation function
- Other max pooling strategies
- learning rate scheduling


After training, the model is serialized and uploaded to the W&B project.

In [None]:
# Install the Weights & Biases client (wandb), which the cells below import.
! pip install wandb

In [None]:
import wandb
import tensorflow as tf
import numpy as np
import pathlib
import shutil
from typing import List

def load_data(run) -> List[tf.data.Dataset]:
    """
    Downloads datasets from a wandb artifact and loads them into a list of tf.data.Datasets.

    Args:
        run: Active wandb run; the artifact is declared as an input of this
            run through ``run.use_artifact``.

    Returns:
        The loaded datasets in the order ``[train, test, val]``.
    """
    artifact_name = "letters_splits_tfds"
    artifact = run.use_artifact(f"master-thesis/{artifact_name}:latest")

    # Reuse a copy that already sits at the expected local path; otherwise
    # download the artifact and use the directory wandb reports.
    artifact_dir = pathlib.Path(
        f"./artifacts/{artifact.name.replace(':', '-')}"
    ).resolve()
    if not artifact_dir.exists():
        artifact_dir = pathlib.Path(artifact.download()).resolve()

    # Feature-detect the loader instead of parsing only the minor version
    # number: tf.data.Dataset.load is used when this TensorFlow build has it,
    # tf.data.experimental.load otherwise.
    load_function = getattr(tf.data.Dataset, "load", None)
    if load_function is None:
        load_function = tf.data.experimental.load

    return [
        load_function(str(artifact_dir / split), compression="GZIP")
        for split in ("train", "test", "val")
    ]

def get_number_of_classes(ds: tf.data.Dataset) -> int:
    """
    Returns the number of classes in a dataset.

    The count is the number of distinct label values that actually occur in
    ``ds``; a class that never appears in this dataset is not counted.

    Args:
        ds: Dataset yielding ``(features, labels)`` pairs. Labels may be
            batched arrays or single scalar values.

    Returns:
        The number of unique label values.

    Raises:
        ValueError: If ``ds`` yields no elements.
    """
    # atleast_1d lets scalar (unbatched) labels through np.concatenate, and
    # ravel makes every chunk one-dimensional so chunks of any shape combine.
    label_chunks = [
        np.atleast_1d(labels).ravel()
        for labels in ds.map(lambda x, y: y).as_numpy_iterator()
    ]
    if not label_chunks:
        raise ValueError("Cannot count classes: the dataset yields no elements.")
    return len(np.unique(np.concatenate(label_chunks)))

def preprocess_dataset(ds: tf.data.Dataset, batch_size: int, cache: bool = True) -> tf.data.Dataset:
    """
    Normalizes, re-batches and optionally caches a dataset.

    Features are cast to float32 and divided by 255.0; labels pass through
    unchanged. Any existing batching is undone and new batches of
    ``batch_size`` elements are formed.

    Args:
        ds: Dataset yielding ``(features, labels)`` pairs.
        batch_size: Number of elements per batch in the returned dataset.
        cache: When true, the result is cached and prefetched with
            ``tf.data.AUTOTUNE``; when false, neither step is applied.

    Returns:
        The transformed dataset.
    """
    def _scale_features(features, labels):
        # normalize
        return tf.cast(features, tf.float32) / 255.0, labels

    rebatched = ds.map(_scale_features).unbatch().batch(batch_size)
    if not cache:
        return rebatched
    return rebatched.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

def calculate_model_size_on_disk(path: str) -> int:
    """Returns the size in bytes of the file at ``path``, as reported by ``stat``."""
    model_file = pathlib.Path(path)
    file_stats = model_file.stat()
    return file_stats.st_size

def calculate_model_num_parameters(model: tf.keras.Model) -> int:
    """Returns the parameter count reported by ``model.count_params()``."""
    parameter_count = model.count_params()
    return parameter_count

def calculate_model_flops(model: tf.keras.Model) -> str:
    """
    Placeholder for a FLOPs calculation; not implemented yet.

    The ``model`` argument is currently ignored and the function returns
    ``None``, despite the ``str`` return annotation.
    """
    return None

In [None]:
# Report which devices TensorFlow can see. Both lines use the stable
# tf.config.list_physical_devices API rather than its `experimental` alias.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
print("Available devices: ", tf.config.list_physical_devices())


In [None]:
# Baseline CNN (two Conv2D + MaxPooling2D stages, then Flatten) extended with
# an additional dense layer before the softmax classifier.
#
# `num_classes` is not defined at notebook level, so building this model used
# to raise NameError. 89 matches the default that get_model() and train() use.
baseline_num_classes = 89

baseline_additional_dense_layer = tf.keras.Sequential(
    [
        tf.keras.layers.InputLayer(input_shape=(32, 32, 1)),
        tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(5376, activation="relu"),
        tf.keras.layers.Dense(256, activation="relu"),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dense(baseline_num_classes, activation="softmax"),
    ]
)

def get_model(input_shape=(32, 32, 1), num_classes=89, conv_filters=None, dense_units=None, activation_fn=None, dropout_rate=0.0):
    """
    Returns a tf.keras.Sequential model with the specified parameters.

    The model consists of one ``Conv2D(3x3) -> activation -> MaxPooling2D(2x2)``
    stage per entry in ``conv_filters``, a ``Flatten`` layer, one
    ``Dense -> activation (-> Dropout)`` stage per entry in ``dense_units``,
    and a final softmax ``Dense`` layer with ``num_classes`` units.

    Args:
        input_shape: Shape of a single input sample.
        num_classes: Number of units in the softmax output layer.
        conv_filters: Filter count for each convolutional stage. Defaults to
            ``[32, 64, 128]``.
        dense_units: Unit count for each hidden dense layer. Defaults to
            ``[5376, 256]``.
        activation_fn: Activation used after every conv and hidden dense
            layer. May be a Keras layer instance (used as a template) or
            anything ``tf.keras.layers.Activation`` accepts, such as
            ``"relu"`` or a callable. Defaults to ``tf.keras.layers.ReLU()``.
        dropout_rate: Dropout rate applied after each hidden dense layer;
            no Dropout layer is added when it is ``0.0`` or less.

    Returns:
        The assembled, uncompiled ``tf.keras.Sequential`` model.
    """
    # None sentinels keep mutable lists and a layer instance out of the
    # signature, where they would be shared between calls.
    if conv_filters is None:
        conv_filters = [32, 64, 128]
    if dense_units is None:
        dense_units = [5376, 256]
    if activation_fn is None:
        activation_fn = tf.keras.layers.ReLU()

    def _new_activation_layer():
        # A Sequential model rejects a second layer with an already-used name,
        # so the same activation instance cannot be added more than once.
        # Build a fresh layer per use; dropping "name" from the config lets
        # Keras assign a unique one.
        if isinstance(activation_fn, tf.keras.layers.Layer):
            layer_config = activation_fn.get_config()
            layer_config.pop("name", None)
            return type(activation_fn).from_config(layer_config)
        return tf.keras.layers.Activation(activation_fn)

    print("Input shape: ", input_shape)
    print("Number of classes: ", num_classes)
    print(f"Building model with {conv_filters} conv filters, {dense_units} dense units, {activation_fn} activation function and {dropout_rate} dropout rate.")
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.InputLayer(input_shape=input_shape))

    for num_conv_filters in conv_filters:
        model.add(tf.keras.layers.Conv2D(num_conv_filters, kernel_size=(3, 3)))
        model.add(_new_activation_layer())
        model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))

    model.add(tf.keras.layers.Flatten())

    for num_dense_units in dense_units:
        model.add(tf.keras.layers.Dense(num_dense_units))
        model.add(_new_activation_layer())
        if dropout_rate > 0.0:
            model.add(tf.keras.layers.Dropout(dropout_rate))

    model.add(tf.keras.layers.Dense(num_classes, activation="softmax"))

    return model
    
def train(model_name, config_defaults, num_classes=89):
    """
    Trains a CNN inside a W&B run, evaluates it and uploads the best checkpoint.

    The function loads the train/test/val datasets, trains the model while
    checkpointing the epoch with the highest validation accuracy, logs test
    metrics, parameter count and checkpoint size to W&B, and uploads the
    checkpoint file as a ``model`` artifact.

    Args:
        model_name: Used as the W&B run name, the checkpoint file name
            (``./artifacts/{model_name}.h5``) and the artifact name.
        config_defaults: Run configuration passed to ``wandb.init``; it must
            provide ``batch_size`` and ``epochs``.
        num_classes: Kept for backward compatibility. The value is replaced
            by the number of distinct labels found in the validation split.
    """
    checkpoint_path = f"./artifacts/{model_name}.h5"

    print("Initializing wandb")
    with wandb.init(
        project="master-thesis",
        job_type="training",
        name=model_name,
        config=config_defaults,
    ) as run:
        ds_train, ds_test, ds_val = load_data(run)

        num_classes = get_number_of_classes(ds_val)

        batch_size = wandb.config.batch_size
        ds_train = preprocess_dataset(ds_train, batch_size=batch_size)
        ds_val = preprocess_dataset(ds_val, batch_size=batch_size)
        ds_test = preprocess_dataset(ds_test, batch_size=batch_size, cache=False)

        print(f"Model name: {model_name}")
        model = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(32, 32, 1)),
                tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
                tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
                tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
                tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(256 * 2, activation="relu"),
                tf.keras.layers.Dense(256, activation="relu"),
                tf.keras.layers.Dense(num_classes, activation="softmax"),
            ]
        )

        model.compile(
            optimizer="adam",
            # The output layer already applies softmax, so the loss receives
            # probabilities and must not treat them as logits.
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
            metrics=["accuracy"],
        )

        # Make sure the checkpoint directory exists before the first save.
        pathlib.Path(checkpoint_path).parent.mkdir(parents=True, exist_ok=True)

        # save the best model (highest validation accuracy seen so far)
        checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
            filepath=checkpoint_path,
            save_weights_only=False,
            monitor="val_accuracy",
            mode="max",
            save_best_only=True,
        )

        wandb_callback = wandb.keras.WandbCallback(
            save_model=False,
            compute_flops=True,
        )

        model.fit(
            ds_train,
            epochs=wandb.config.epochs,
            validation_data=ds_val,
            callbacks=[wandb_callback, checkpoint_callback],
        )
        print("Training finished")

        # calculate model size on disk and number of parameters
        disk_size = calculate_model_size_on_disk(checkpoint_path)
        num_parameters = calculate_model_num_parameters(model)

        # evaluate model on ds_test and log to wandb
        # NOTE(review): this evaluates the in-memory model from the last
        # epoch, while the uploaded artifact is the best checkpoint; confirm
        # which of the two the reported test metrics should describe.
        test_loss, test_acc = model.evaluate(ds_test)

        wandb.log(
            {
                "test loss": test_loss,
                "test accuracy": test_acc,
                "number of parameters": num_parameters,
                "disk size": disk_size,
            }
        )

        # save best model to a wandb artifact
        artifact = wandb.Artifact(name=model_name, type="model")
        artifact.add_file(checkpoint_path)
        run.log_artifact(artifact)
        # No explicit run.finish(): leaving the `with` block finishes the run.
    print("Evaluation finished")


In [None]:
# Run configuration handed to wandb.init; train() reads batch_size and epochs
# back from wandb.config.
defaults = {
    "batch_size": 128,
    "epochs": 100,
}

train("better_cnn-smaller-dense-layer", defaults)