# Initialize

## Check if we are using CPU or GPU

In [None]:
import tensorflow as tf
# Ask TensorFlow for the GPU device name; the cell displays it
# (e.g. '/device:GPU:0' when a GPU is attached)
gpu_device_name = tf.test.gpu_device_name()
gpu_device_name

'/device:GPU:0'

## Import the required utilities and libraries

In [None]:
!pip install tensorflow==2.18.0

Collecting tensorflow==2.18.0
  Downloading tensorflow-2.18.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting tensorboard<2.19,>=2.18 (from tensorflow==2.18.0)
  Downloading tensorboard-2.18.0-py3-none-any.whl.metadata (1.6 kB)
Collecting ml-dtypes<0.5.0,>=0.4.0 (from tensorflow==2.18.0)
  Downloading ml_dtypes-0.4.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Downloading tensorflow-2.18.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (615.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m615.4/615.4 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ml_dtypes-0.4.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m54.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tensorboard-2.18.0-py3-none-any.whl (5.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5

In [None]:
import time
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Beginner Version Model (Easy, Simple)

## Preparing the dataset
* We use MNIST as the dataset

In [None]:
from tensorflow.keras.datasets import mnist

# mnist.load_data() returns NumPy arrays already split into train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Images become float32 tensors, labels become int32 tensors
x_train = tf.convert_to_tensor(x_train, dtype=tf.float32)  # [60000, 28, 28]
x_test = tf.convert_to_tensor(x_test, dtype=tf.float32)  # [10000, 28, 28]
y_train = tf.convert_to_tensor(y_train, dtype=tf.int32)  # [60000]
y_test = tf.convert_to_tensor(y_test, dtype=tf.int32)  # [10000]

# Scale by 1/255 and append a trailing channel axis
x_train = tf.expand_dims(x_train / 255.0, axis=-1)  # [60000, 28, 28, 1]
x_test = tf.expand_dims(x_test / 255.0, axis=-1)  # [10000, 28, 28, 1]

# Show the resulting shapes: x_train, y_train, x_test, y_test
for split_tensor in (x_train, y_train, x_test, y_test):
  print(split_tensor.shape)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 0us/step
(60000, 28, 28, 1)
(60000,)
(10000, 28, 28, 1)
(10000,)


## Build the beginner version model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D

# Stack the layers one at a time, in forward-pass order
beginner_model = Sequential()

# Two convolution + max-pooling stages
beginner_model.add(Conv2D(filters=6, kernel_size=8, activation='relu'))
beginner_model.add(MaxPooling2D(pool_size=(2, 2)))
beginner_model.add(Conv2D(filters=15, kernel_size=4, activation='relu'))
beginner_model.add(MaxPooling2D(pool_size=(2, 2)))

# Classifier head: flatten, one hidden dense layer, then a 10-way softmax
# that outputs a probability distribution
beginner_model.add(Flatten())
beginner_model.add(Dense(128, activation='relu'))
beginner_model.add(Dense(10, activation='softmax'))

## Training & Evaluating the beginner version model

In [None]:
# Training hyperparameters for the beginner model
beginner_epochs = 3
beginner_batch_size = 1024

# Adam optimizer, sparse categorical cross-entropy loss, and accuracy
# (share of correct predictions) as the reported metric
beginner_model.compile(
  optimizer='adam',
  loss='sparse_categorical_crossentropy',
  metrics=['accuracy'],
)

beginner_model.fit(
  x_train,
  y_train,
  batch_size=beginner_batch_size,
  epochs=beginner_epochs,
)

# Last expression of the cell: displays [test loss, test accuracy]
beginner_model.evaluate(x_test, y_test, verbose=2)

Epoch 1/3
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 24ms/step - accuracy: 0.5078 - loss: 1.7675
Epoch 2/3
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - accuracy: 0.8857 - loss: 0.3849
Epoch 3/3
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.9240 - loss: 0.2591
313/313 - 2s - 5ms/step - accuracy: 0.9384 - loss: 0.2019


[0.2019142061471939, 0.9383999705314636]

# Expert Version Model (Hard, Complex)

## Preparing the dataset

In [None]:
from tensorflow.keras.datasets import mnist

# mnist.load_data() returns NumPy arrays already split into train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Images become float32 tensors, labels become int32 tensors
x_train = tf.convert_to_tensor(x_train, dtype=tf.float32)  # [60000, 28, 28]
x_test = tf.convert_to_tensor(x_test, dtype=tf.float32)  # [10000, 28, 28]
y_train = tf.convert_to_tensor(y_train, dtype=tf.int32)  # [60000]
y_test = tf.convert_to_tensor(y_test, dtype=tf.int32)  # [10000]

# Scale by 1/255 and append a trailing channel axis
x_train = tf.expand_dims(x_train / 255.0, axis=-1)  # [60000, 28, 28, 1]
x_test = tf.expand_dims(x_test / 255.0, axis=-1)  # [10000, 28, 28, 1]

# Sanity-check the shapes: x_train, y_train, x_test, y_test
for split_tensor in (x_train, y_train, x_test, y_test):
  print(split_tensor.shape)

# Pair every image with its label, then group the pairs into batches of 1024
BATCH_SIZE = 1024
train_pairs = tf.data.Dataset.from_tensor_slices((x_train, y_train))
test_pairs = tf.data.Dataset.from_tensor_slices((x_test, y_test))
train_dataset = train_pairs.batch(BATCH_SIZE)
test_dataset = test_pairs.batch(BATCH_SIZE)
print(train_dataset)
print(test_dataset)

(60000, 28, 28, 1)
(60000,)
(10000, 28, 28, 1)
(10000,)
<_BatchDataset element_spec=(TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))>
<_BatchDataset element_spec=(TensorSpec(shape=(None, 28, 28, 1), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))>


## Build custom layers for the expert version model
* Structure:
  - Inputs Layer
  - Hidden Layer 1: ConvPool2D
  - Hidden Layer 2: ConvPool2D
  - Hidden Layer 3: Flatten
  - Hidden Layer 4: DoubleDense
  - Outputs Layer
* The above structure is a basic CNN model for classifying images
* **Note**: `__init__` is called when the layer is created, `build` is called the first time the layer is used, and `call` defines what the layer actually computes.

### Initialize algorithms for layers
* 如同上述結構，我們會需要以下算法來幫助我們建立各層

In [None]:
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D

### Linear
* 全連接層, Fully Connected Layer
* 這層通常是 神經網路中最基本的層
* 主要是在做線性變換，透過此公式：$y = Wx + b$
  - $W$: weighted matrix
  - $b$: bias
* 用於 將輸入轉換成不同的特徵空間，例如從 128 維度轉換成 64 維度。
* 因為這邊是要用於做圖像分類，所以 $W$ 為權重矩陣。
* 應用：最後的分類層（softmax 通常跟 Linear 一起用），將特徵數壓縮或展開

In [None]:
from tensorflow.keras.layers import Layer

class Linear(Layer):
  """Fully connected layer computing ``matmul(inputs, W) + b``.

  Args:
    units: Width of the output, i.e. the number of columns of ``W``.
  """

  def __init__(self, units=32):
    super().__init__()
    self.units = units

  def build(self, input_shape):
    """Create the weights once the size of the last input axis is known."""
    in_features = input_shape[-1]

    # Both weights are trainable and use the 'random_normal' initializer
    self.W = self.add_weight(
      shape=(in_features, self.units),
      initializer='random_normal',
      trainable=True,
    )
    self.b = self.add_weight(
      shape=(self.units,),
      initializer='random_normal',
      trainable=True,
    )

  def call(self, inputs):
    """Multiply the inputs by ``W`` and add the bias ``b``."""
    projected = tf.matmul(inputs, self.W)
    return projected + self.b

### Double Dense
* 將兩個 Dense（全連接層）疊加起來：linear_1, linear_2
* 搭配兩個 activation function（激活函數）：ReLU, Softmax
  - ReLU（Rectified Linear Unit）:
    - 這是一個非線性激活函數：$f(x) = max(0, x)$
    - 用於讓網路可以學習**非線性特徵**（避免只有線性轉換）
    - 相比於 sigmoid（另一個 activation function），ReLU不會有梯度消失的問題（ vanishing gradient ），因此訓練速度快、效果更好

  - Softmax:
    - 這是一個歸一化函數：$\sigma(z)_i = \frac{e^{z_i}}{\sum_{j=1}^{n} e^{z_j}}$
    - 通常用在**最後一層輸出，讓輸出變成「機率分佈」**，適合用來做分類問題
    - 可以把原始數值轉換成機率
    - 適合多分類問題（multi-class classification）

In [None]:
class DoubleDense(Layer):
  """Two stacked Linear layers: Linear + ReLU followed by Linear + softmax.

  Args:
    nb_classes: Number of output classes, i.e. the width of the softmax output.
    hidden_units: Width of the first Linear layer. Defaults to 128, the value
      that used to be hard-coded, so existing callers are unaffected.
  """

  def __init__(self, nb_classes, hidden_units=128):
    super().__init__()
    self.nb_classes = nb_classes
    self.hidden_units = hidden_units

  def build(self, input_shape):
    """Instantiate the two Linear sub-layers."""
    self.linear_1 = Linear(units=self.hidden_units)
    self.linear_2 = Linear(units=self.nb_classes)

  def call(self, inputs):
    """Return class probabilities for ``inputs``."""
    # inputs -> Linear(hidden_units) + ReLU -> Linear(nb_classes) + softmax
    hidden = tf.nn.relu(self.linear_1(inputs))
    return tf.nn.softmax(self.linear_2(hidden))

### ConvPool2D
* 此隱藏層其實包含了兩個部分：Conv（卷積層）+ ReLU（激活函數），MaxPooling2D（池化層）
* Conv2D（卷積層）：
  - 用 kernel（卷積核）來掃描圖片，提取特徵（邊緣、形狀、輪廓、紋理等）。
  - 產生的結果將會是另一張新的圖片，但像素值會是經過處理的

* ReLU（激活函數）：
  - 類似之前在 Double Dense 中介紹的 ReLU，此處的 ReLU 通常會加在 Conv2D 層之後，用於把任何小於 0 的數值變成 0
  - 如此一來可以**去除雜音**，讓神經網路學到更有意義的特徵

* MaxPooling2D（最大池化層）:
  - **用於降低維度（ downsampling ），以減少計算量**
  - 概念：**讓最重要的特徵保留下來**，去除不重要的細節
  - 用一個**小窗口（通常是2x2）**，在圖片上滑動（ ex. stride=2 ），**每個區塊只保留最大值**

In [None]:
class ConvPool2D(Layer):
  """Conv2D with ReLU activation followed by 2x2 max pooling.

  Args:
    nb_kernels: Number of convolution filters.
    kernel_size: Size of the convolution kernel, passed through to Conv2D.
  """

  def __init__(self, nb_kernels, kernel_size):
    super().__init__()
    self.kernel_size = kernel_size
    self.nb_kernels = nb_kernels

  def build(self, input_shape):
    """Instantiate the convolution and pooling sub-layers."""
    conv_options = dict(
      filters=self.nb_kernels,
      kernel_size=self.kernel_size,
      activation='relu',
    )
    self.conv_2D = Conv2D(**conv_options)
    self.pool_2D = MaxPooling2D(pool_size=(2, 2))

  def call(self, inputs):
    """Apply the convolution, then the max pooling."""
    return self.pool_2D(self.conv_2D(inputs))

### Define the model using the above layers

In [None]:
from tensorflow.keras import Model

class ExportModel(Model):
  """CNN classifier: two ConvPool2D stages, Flatten, then DoubleDense.

  The output of every stage of the most recent forward pass is kept on the
  instance as ``x_0``, ``x_1``, ``x_2`` and ``predictions``.

  Args:
    nb_classes: Number of output classes, forwarded to DoubleDense.
  """

  def __init__(self, nb_classes):
    super().__init__()
    self.nb_classes = nb_classes

    # Declare the per-stage outputs up front so the attributes already exist
    # before the first forward pass of the training loop
    self.x_0 = self.x_1 = self.x_2 = None
    self.predictions = None

  def build(self, input_shape):
    """Instantiate the sub-layers."""
    self.conv_pool_1 = ConvPool2D(nb_kernels=6, kernel_size=8)
    self.conv_pool_2 = ConvPool2D(nb_kernels=15, kernel_size=4)
    self.flatten = Flatten()
    self.double_dense = DoubleDense(nb_classes=self.nb_classes)

  def call(self, inputs):
    """Run the forward pass and remember each intermediate result."""
    first_stage = self.conv_pool_1(inputs)
    self.x_0 = first_stage

    second_stage = self.conv_pool_2(first_stage)
    self.x_1 = second_stage

    flat_features = self.flatten(second_stage)
    self.x_2 = flat_features

    class_probabilities = self.double_dense(flat_features)
    self.predictions = class_probabilities
    return class_probabilities

### Creating the model using pre-made functions

In [None]:
# Create the model first, then hand Keras a ready-made optimizer and loss
export_model = ExportModel(nb_classes=10)

optimizer = tf.keras.optimizers.Adam()
loss_function = tf.keras.losses.SparseCategoricalCrossentropy()

export_model.compile(loss=loss_function, optimizer=optimizer)

### Training the model

In [None]:
# train_dataset is already batched, so no batch_size is passed here
export_model.fit(x=train_dataset, epochs=3)

Epoch 1/3
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 25ms/step - loss: 1.9921
Epoch 2/3
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.4520
Epoch 3/3
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - loss: 0.2987


<keras.src.callbacks.history.History at 0x7c2c2011cf90>

### The Summary of the model

In [None]:
# Print Keras' summary of export_model
export_model.summary()

## Build a custom training loop

### Initialize the loss function and optimizer we will use in the training loop
* `loss_function` 是用來在之後的 training loop 的每次執行時會計算 batch 的 loss，例如：`loss = loss_function(y_true, y_pred)`，這樣會得到單一批次（ batch ）的 loss
* `optimizer` 也是在 training loop 的每次執行時去依照計算的 loss 來調整模型
* `train_loss` 和 `train_accuracy` 則是用來累計並追蹤 loss 和 accuracy
  - `tf.keras.metrics.Mean(name='train_loss')` 和 `tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')` 這兩者是 TensorFlow 的 metric 物件，專門用來累計並計算整個 epoch 的 loss 和 accuracy，而不是只計算單次 batch
* `test_loss` 和 `test_accuracy` 則是用來在測試模型（我們會用不同的資料來對模型做訓練和測試）時，累計並追蹤 loss 和 accuracy

In [None]:
# Loss and optimizer used by every training/testing step
loss_function = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

# Running loss/accuracy for the training data
train_loss, train_accuracy = (
  tf.keras.metrics.Mean(name='train_loss'),
  tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy'),
)

# Running loss/accuracy for the test data
test_loss, test_accuracy = (
  tf.keras.metrics.Mean(name='test_loss'),
  tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy'),
)

# Important: recreate (or reset) all of the objects above before every run of the training loop

### Building the training step

In [None]:
def train_step(images, labels, model, loss_function, optimizer):
  """Run one optimization step on a single batch.

  Performs the forward pass under a GradientTape, applies the gradients of
  the batch loss to the model's trainable variables, and records the batch
  in the notebook-level `train_loss` and `train_accuracy` metrics.
  """
  # Record the forward pass so it can be differentiated
  with tf.GradientTape() as tape:
    batch_predictions = model(images)
    batch_loss = loss_function(labels, batch_predictions)

  # Read the variable list only after the forward pass has run
  variables = model.trainable_variables
  grads = tape.gradient(batch_loss, variables)

  # Let the optimizer update the weights
  optimizer.apply_gradients(zip(grads, variables))

  # Accumulate this batch into the running metrics
  train_loss.update_state(batch_loss)
  train_accuracy.update_state(labels, batch_predictions)

### Building a testing step

In [None]:
def test_step(images, labels, model, loss_function, optimizer):
  """Evaluate the model on a single batch without updating any weights.

  The batch loss and predictions are recorded in the notebook-level
  `test_loss` and `test_accuracy` metrics. `optimizer` is accepted only to
  mirror the signature of the training step; it is not used.
  """
  # Forward pass and batch loss; no gradients and no optimizer update here
  batch_predictions = model(images)
  batch_loss = loss_function(labels, batch_predictions)

  # Accumulate this batch into the running metrics
  test_loss.update_state(batch_loss)
  test_accuracy.update_state(labels, batch_predictions)

### Build the custom training loop

In [None]:
# Recreate the loss, optimizer and metric objects so this run starts from a
# clean state instead of continuing from values accumulated in earlier cells
loss_function = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

export_model = ExportModel(nb_classes=10)
epochs = 3

# One report line per epoch
template = 'Epoch {:.0f}, Loss: {:.3f}, Accuracy: {:.3f}%, Test Loss: {:.3f}, Test Accuracy: {:.3f}%'
tracked_metrics = (train_loss, train_accuracy, test_loss, test_accuracy)

start = time.time()

for epoch in range(epochs):

  # One pass over the training batches (weights are updated)
  for images, labels in train_dataset:
    train_step(images, labels, export_model, loss_function, optimizer)

  # One pass over the test batches (evaluation only)
  for test_images, test_labels in test_dataset:
    test_step(test_images, test_labels, export_model, loss_function, optimizer)

  # Report the metrics accumulated during this epoch
  print(template.format(
    epoch + 1,
    train_loss.result(),
    train_accuracy.result() * 100,
    test_loss.result(),
    test_accuracy.result() * 100,
  ))

  # Clear the accumulators so the next epoch starts from zero
  for metric in tracked_metrics:
    metric.reset_state()

# Display elapsed wall-clock time for the whole run
end = time.time()
print("Time: ", end - start)

Epoch 1, Loss: 1.422, Accuracy: 65.697, Test Loss: 0.466, Test Accuracy: 86.180
Epoch 2, Loss: 0.408, Accuracy: 88.222, Test Loss: 0.306, Test Accuracy: 91.020
Epoch 3, Loss: 0.293, Accuracy: 91.355, Test Loss: 0.234, Test Accuracy: 93.040
Time:  11.845203638076782


### Building a **graph** to speed up training
* Add the `@tf.function` decorator to the training and testing step functions

In [None]:
@tf.function
def graph_train_step(images, labels, model, loss_function, optimizer):
  """`tf.function`-decorated training step for a single batch.

  Performs the forward pass under a GradientTape, applies the gradients of
  the batch loss to the model's trainable variables, and records the batch
  in the notebook-level `train_loss` and `train_accuracy` metrics.
  """
  # Record the forward pass so it can be differentiated
  with tf.GradientTape() as tape:
    batch_predictions = model(images)
    batch_loss = loss_function(labels, batch_predictions)

  # Read the variable list only after the forward pass has run
  variables = model.trainable_variables
  grads = tape.gradient(batch_loss, variables)

  # Let the optimizer update the weights
  optimizer.apply_gradients(zip(grads, variables))

  # Accumulate this batch into the running metrics
  train_loss.update_state(batch_loss)
  train_accuracy.update_state(labels, batch_predictions)

In [None]:
@tf.function
def graph_test_step(images, labels, model, loss_function, optimizer):
  """`tf.function`-decorated evaluation step for a single batch.

  No weights are updated. The batch loss and predictions are recorded in the
  notebook-level `test_loss` and `test_accuracy` metrics. `optimizer` is
  accepted only to mirror the signature of the training step; it is not used.
  """
  # Forward pass and batch loss; no gradients and no optimizer update here
  batch_predictions = model(images)
  batch_loss = loss_function(labels, batch_predictions)

  # Accumulate this batch into the running metrics
  test_loss.update_state(batch_loss)
  test_accuracy.update_state(labels, batch_predictions)

**Notice the time difference compared with the original training loop**

In [None]:
# Recreate the loss, optimizer and metric objects so this run starts from a
# clean state instead of continuing from values accumulated in earlier cells
loss_function = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

export_model = ExportModel(nb_classes=10)
epochs = 3

# One report line per epoch
template = 'Epoch {:.0f}, Loss: {:.3f}, Accuracy: {:.3f}%, Test Loss: {:.3f}, Test Accuracy: {:.3f}%'
tracked_metrics = (train_loss, train_accuracy, test_loss, test_accuracy)

start = time.time()

for epoch in range(epochs):

  # One pass over the training batches (weights are updated)
  for images, labels in train_dataset:
    graph_train_step(images, labels, export_model, loss_function, optimizer)

  # One pass over the test batches (evaluation only)
  for test_images, test_labels in test_dataset:
    graph_test_step(test_images, test_labels, export_model, loss_function, optimizer)

  # Report the metrics accumulated during this epoch
  print(template.format(
    epoch + 1,
    train_loss.result(),
    train_accuracy.result() * 100,
    test_loss.result(),
    test_accuracy.result() * 100,
  ))

  # Clear the accumulators so the next epoch starts from zero
  for metric in tracked_metrics:
    metric.reset_state()

# Display elapsed wall-clock time for the whole run
end = time.time()
print("Time: ", end - start)

Epoch 1, Loss: 1.363, Accuracy: 64.433, Test Loss: 0.561, Test Accuracy: 82.970
Epoch 2, Loss: 0.480, Accuracy: 85.347, Test Loss: 0.364, Test Accuracy: 89.110
Epoch 3, Loss: 0.346, Accuracy: 89.595, Test Loss: 0.278, Test Accuracy: 91.440
Time:  3.5787980556488037


## Additions

### Adding Regularization to the loss function

In [None]:
class ConvPool2DWithRegularization(Layer):
  """Conv2D-relu + MaxPooling2D with optional kernel regularization.

  Args:
    nb_kernels: Number of convolution filters.
    kernel_size: Size of the convolution kernel, passed through to Conv2D.
    kernel_regularizer: Optional regularizer forwarded to the Conv2D layer,
      e.g. ``tf.keras.regularizers.l2(1.)``. The default ``None`` adds no
      penalty, which is what the layer did before this parameter existed.
  """

  def __init__(self, nb_kernels, kernel_size, kernel_regularizer=None):
    # Zero-argument super(): naming ConvPool2D here raised a TypeError,
    # because this class inherits from Layer, not from ConvPool2D.
    super().__init__()
    self.nb_kernels = nb_kernels
    self.kernel_size = kernel_size
    self.kernel_regularizer = kernel_regularizer

  def build(self, input_shape):
    """Instantiate the convolution and pooling sub-layers."""
    self.conv_2D = Conv2D(
      filters=self.nb_kernels,
      kernel_size=self.kernel_size,
      activation='relu',
      # Option 1: use Keras' built-in regularizer argument
      kernel_regularizer=self.kernel_regularizer
    )
    self.pool_2D = MaxPooling2D(pool_size=(2, 2))

  def call(self, inputs):
    """Apply the convolution, then the max pooling."""
    x = self.conv_2D(inputs)

    # Option 2 (left disabled): add a custom L1 penalty through the layer's
    # own loss list
    # l1_reg = tf.reduce_sum(tf.abs(self.conv_2D.kernel)) + tf.reduce_sum(tf.abs(self.conv_2D.bias))
    # self.add_loss(l1_reg)

    x = self.pool_2D(x)
    return x

In [None]:
def train_step(images, labels, model, loss_function, optimizer):
  """Training step whose loss also includes the model's extra losses.

  The batch loss is `loss_function(labels, predictions)` plus 1e-3 times the
  sum of `model.losses`. The combined loss drives the gradient update and is
  what gets recorded in the notebook-level `train_loss` metric.

  Note: defining this function replaces any earlier `train_step` in the
  notebook namespace.
  """
  with tf.GradientTape() as tape:
    batch_predictions = model(images)
    data_loss = loss_function(labels, batch_predictions)

    # Extra losses created during this forward pass
    extra_loss = sum(model.losses)
    batch_loss = data_loss + 1e-3 * extra_loss

  # Read the variable list only after the forward pass has run
  variables = model.trainable_variables
  grads = tape.gradient(batch_loss, variables)
  optimizer.apply_gradients(zip(grads, variables))

  # Accumulate this batch into the running metrics
  train_loss.update_state(batch_loss)
  train_accuracy.update_state(labels, batch_predictions)

### Custom loss function

In [None]:
from tensorflow.keras.losses import Loss

class CustomLoss(Loss):
  """Sparse categorical cross-entropy plus a weighted sum of `model.losses`.

  Args:
    tuning_param: Weight applied to the sum of the model's extra losses.
    model: Model whose `losses` list is read on every call.
  """

  def __init__(self, tuning_param, model) -> None:
    super().__init__()
    self.tuning_param = tuning_param
    # Was `tf.kerase`, which raised AttributeError as soon as the loss was created
    self.SCE = tf.keras.losses.SparseCategoricalCrossentropy()
    self.model = model

  def call(self, y_true, y_pred):
    """Return the cross-entropy plus `tuning_param * sum(model.losses)`."""
    cross_entropy = self.SCE(y_true, y_pred)
    extra_loss = sum(self.model.losses)
    return cross_entropy + self.tuning_param * extra_loss

# To use it, instantiate `CustomLoss` (e.g. as `custom_loss_function`) and pass it to the training loop in place of the built-in loss function