<a href="https://colab.research.google.com/github/JohnYCLam/MachineLearning/blob/main/TensorFlow_and_PyTorch_ANN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MNIST Dataset Demo - TensorFlow

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt

In [None]:
# Handle to the MNIST dataset module bundled with Keras.
mnist = tf.keras.datasets.mnist

# load_data() yields a (train, test) pair, each an (images, labels) pair.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

In [None]:
# Rescale pixel values by 1/255 (presumably 8-bit 0-255 intensities -> [0, 1]).
# NOTE: the arrays are rebound in place, so re-running this cell divides again.
x_train = x_train / 255
x_test = x_test / 255

In [None]:
# Keep the first training sample and its label for inspection in the next cells.
image = x_train[0]
label = y_train[0]

In [None]:
# Show the sample in grayscale with its label as the title, drawing
# everything through the Axes object returned by subplots().
fig, ax = plt.subplots()
ax.imshow(image.squeeze(), cmap = "gray")
ax.set_title(f'Class Label: {label}')
plt.show()

In [None]:
# Shape of a single sample; it is later passed to simpleModel as the input shape.
image.shape

In [None]:
import numpy as np

# Number of distinct labels in the training set; used as the size of the output layer.
output_shape = len(np.unique(y_train))

In [None]:
from tensorflow.keras.layers import Flatten, Dense, Dropout, Dense, Input

In [None]:
def simpleModel(input_shape, output_shape, act_units, drop_outs = None, act_fcn = 'relu'):
    """Build a fully connected Keras classifier.

    The layer stack is Input -> Flatten -> (Dense [-> Dropout]) for every
    entry of ``act_units`` -> Dense softmax output.

    Args:
        input_shape: Shape of one sample, passed to ``Input``.
        output_shape: Number of units in the softmax output layer.
        act_units: Sequence with the unit count of each hidden Dense layer.
        drop_outs: Optional list or tuple of dropout rates, one per hidden
            layer. Any other value (including ``None``) disables dropout.
            Entries beyond ``len(act_units)`` are ignored.
        act_fcn: Activation used by every hidden Dense layer.

    Returns:
        The uncompiled ``tf.keras.models.Sequential`` model.

    Raises:
        ValueError: If ``drop_outs`` has fewer entries than ``act_units``.
    """
    # isinstance() also accepts tuples and list subclasses, which the old
    # exact-type comparison silently treated as "no dropout".
    use_dropout = isinstance(drop_outs, (list, tuple))
    if use_dropout and len(drop_outs) < len(act_units):
        # Fail before any layer is created instead of with a bare IndexError
        # half-way through the loop.
        raise ValueError(
            f"drop_outs has {len(drop_outs)} entries but act_units has {len(act_units)}"
        )

    model_sequence = [Input(shape = input_shape), Flatten()]
    for i, units in enumerate(act_units):
        model_sequence.append(Dense(units, activation = act_fcn))
        if use_dropout:
            model_sequence.append(Dropout(drop_outs[i]))
    model_sequence.append(Dense(output_shape, activation = 'softmax'))
    return tf.keras.models.Sequential(model_sequence)

In [None]:
# One hidden layer of 128 units followed by dropout with rate 0.2.
model = simpleModel(image.shape, output_shape, [128], [0.2])
model.summary()

In [None]:
# Difference between sparse categorical crossentropy and categorical crossentropy:
# - categorical crossentropy: the labels are one-hot encoded, e.g. [0, 0, 0, 1]
# - sparse categorical crossentropy: the labels are single integer class indices, e.g. 3
# https://stackoverflow.com/questions/58565394/what-is-the-difference-between-sparse-categorical-crossentropy-and-categorical-c
def model_train(model, x_train, y_train, x_test, y_test, loss, optimizer = 'adam', metrics = ['accuracy'], epochs = 10):
    """Compile ``model`` and fit it, validating on the test split.

    Args:
        model: Object exposing Keras-style ``compile`` and ``fit`` methods.
        x_train, y_train: Training inputs and labels.
        x_test, y_test: Inputs and labels passed as ``validation_data``.
        loss: Loss forwarded to ``compile``.
        optimizer: Optimizer forwarded to ``compile``.
        metrics: Metrics forwarded to ``compile``.
        epochs: Number of epochs forwarded to ``fit``.

    Returns:
        Whatever ``model.fit`` returns.
    """
    compile_options = {'optimizer': optimizer, 'loss': loss, 'metrics': metrics}
    model.compile(**compile_options)

    validation_split = (x_test, y_test)
    return model.fit(x_train, y_train, validation_data = validation_split, epochs = epochs)

In [None]:
# First run: labels are left as single class values, so the sparse variant of the loss is used.
result = model_train(model, x_train, y_train, x_test, y_test, 'sparse_categorical_crossentropy')

In [None]:
# If we use categorical crossentropy, we have to convert the labels into one-hot encoding first.
# The encoded labels get their own names so the original integer labels stay
# intact: re-running this cell no longer one-hot encodes labels that are
# already encoded. The depth follows output_shape instead of a hard-coded 10.
y_train_onehot = tf.one_hot(y_train, depth = output_shape)
y_test_onehot = tf.one_hot(y_test, depth = output_shape)

# Same architecture as the first model, trained on the one-hot labels.
model_2 = simpleModel(image.shape, output_shape, [128], [0.2])
result_2 = model_train(model_2, x_train, y_train_onehot, x_test, y_test_onehot, 'categorical_crossentropy')

In [None]:
# Training vs. validation accuracy of both Keras runs, side by side on a shared y-axis.
fig, ax = plt.subplots(1,2, figsize = (8, 4), sharey = True)

for panel, run, title in zip(ax, (result, result_2), ('Model 1 Accuracy', 'Model 2 Accuracy')):
    panel.plot(run.history['accuracy'], label = 'acc')
    panel.plot(run.history['val_accuracy'], label = 'val_acc')
    panel.set_title(title)
    panel.legend()
plt.show()

In [None]:
# Training vs. validation loss of both Keras runs, side by side on a shared y-axis.
fig, ax = plt.subplots(1,2, figsize = (8, 4), sharey = True)

for panel, run, title in zip(ax, (result, result_2), ('Model 1 Loss', 'Model 2 Loss')):
    panel.plot(run.history['loss'], label = 'loss')
    panel.plot(run.history['val_loss'], label = 'val_loss')
    panel.set_title(title)
    panel.legend()
plt.show()

# MNIST Dataset Demo - PyTorch

In [None]:
import torch
from torch import nn

import torchvision
from torchvision.transforms import ToTensor

import matplotlib.pyplot as plt

In [None]:
!pip install torchinfo
!pip install torchmetrics

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# MNIST train and test splits stored under "data" (download = True), with
# ToTensor() applied to every image.
train_data = torchvision.datasets.MNIST("data", train = True, download = True, transform = ToTensor())
test_data = torchvision.datasets.MNIST("data", train = False, download = True, transform = ToTensor())

print(f'No. of training data: {len(train_data)} | No. of test data: {len(test_data)}')

In [None]:
# Each dataset item unpacks into an (image, label) pair; look at the first one.
image, label = train_data[0]
image.shape, label

In [None]:
# Class names exposed by the dataset; their count sets the model's output size later.
class_names = train_data.classes
print(class_names)

In [None]:
# Show the sample in grayscale with its label as the title, drawing
# everything through the Axes object returned by subplots().
fig, ax = plt.subplots()
ax.imshow(image.squeeze(), cmap = "gray")
ax.set_title(f'Class Label: {label}')
plt.show()

In [None]:
class simplePyTorchModel(nn.Module):
    """Fully connected classifier: Flatten -> (Linear -> activation [-> Dropout])* -> Linear.

    The final Linear layer has no activation, so ``forward`` returns the raw
    output of that layer.
    """

    def __init__(self, input_shape, output_shape, act_units, drop_outs = None, act_fcn = 'relu'):
        """Assemble the layer stack.

        Args:
            input_shape: Number of features per sample after flattening.
            output_shape: Number of output units.
            act_units: Sequence with the width of each hidden layer. It is
                only read, never modified.
            drop_outs: Optional list or tuple of dropout probabilities, one
                per hidden layer. Any other value (including ``None``)
                disables dropout. Extra entries are ignored.
            act_fcn: ``'relu'``, ``'tanh'`` or ``'sigmoid'``.

        Raises:
            ValueError: If ``act_fcn`` is not supported (checked when at
                least one hidden layer is built) or ``drop_outs`` has fewer
                entries than ``act_units``.
        """
        super().__init__()

        activations = {'relu': nn.ReLU, 'tanh': nn.Tanh, 'sigmoid': nn.Sigmoid}

        # Build a new list instead of inserting into the caller's one; the
        # previous in-place insert made a reused ``act_units`` list grow on
        # every construction and produced a different network each time.
        layer_sizes = [input_shape, *act_units]

        # isinstance() also accepts tuples, which the old exact-type check
        # silently treated as "no dropout".
        use_dropout = isinstance(drop_outs, (list, tuple))
        if use_dropout and len(drop_outs) < len(layer_sizes) - 1:
            raise ValueError(
                f"drop_outs has {len(drop_outs)} entries but act_units has {len(layer_sizes) - 1}"
            )

        model_sequence = [nn.Flatten()]
        for i, (in_features, out_features) in enumerate(zip(layer_sizes, layer_sizes[1:])):
            model_sequence.append(nn.Linear(in_features, out_features))

            if act_fcn not in activations:
                # ValueError is still an Exception, so existing handlers keep working.
                raise ValueError("Unsupported Activation Function")
            model_sequence.append(activations[act_fcn]())

            if use_dropout:
                model_sequence.append(nn.Dropout(drop_outs[i]))

        model_sequence.append(nn.Linear(layer_sizes[-1], output_shape))

        self.block = nn.Sequential(*model_sequence)

    def forward(self, x):
        """Return the output of the layer stack for the batch ``x``."""
        return self.block(x)

In [None]:
# Input size is image.shape[1] * image.shape[2] (presumably height * width of
# a (channel, height, width) tensor); one output unit per class name.
model = simplePyTorchModel(image.shape[1] * image.shape[2], len(class_names), [128], [0.2])
# Place the model on the device selected earlier.
model.to(device)

In [None]:
from torchinfo import summary

# Layer-by-layer summary for an input of the given size.
# NOTE(review): presumably torchinfo pushes a dummy tensor of this size through
# the model, so a leading dimension of 50000 looks far larger than needed —
# confirm and consider a small batch instead.
summary(model, input_size = (50000, 1, 28, 28))

In [None]:
from torch.utils.data import DataLoader

def dataloader(train_data, test_data, batch_size):
    """Wrap both datasets in ``DataLoader`` objects with a common batch size.

    Args:
        train_data: Dataset used for training; its loader shuffles.
        test_data: Dataset used for evaluation; its loader keeps dataset order.
        batch_size: Number of samples per batch for both loaders.

    Returns:
        Tuple ``(train_dataloader, test_dataloader)``.
    """
    return (
        DataLoader(train_data, batch_size = batch_size, shuffle = True),
        DataLoader(test_data, batch_size = batch_size, shuffle = False),
    )

In [None]:
# Batches of 32 samples for both splits.
train_dataloader, test_dataloader = dataloader(train_data, test_data, 32)

# len() of a loader is its number of batches, not its number of samples.
print(f'Number of batches in train data: {len(train_dataloader)}')
print(f'Number of batches in test data: {len(test_dataloader)}')

In [None]:
import torchmetrics

# Multiclass accuracy metric with one class per class name, placed on the same device as the model.
acc_fcn = torchmetrics.Accuracy(task = 'multiclass', num_classes = len(class_names)).to(device)
# NOTE(review): per the PyTorch docs CrossEntropyLoss takes unnormalised scores,
# which would explain why the model ends in a plain Linear layer — confirm.
loss_fcn = nn.CrossEntropyLoss()
# Adam over all model parameters with learning rate 0.001.
optimizer = torch.optim.Adam(params = model.parameters(), lr = 0.001)

In [None]:
def train_step(model, device, loss_fcn, acc_fcn, optimizer, train_dataloader):
    """Train ``model`` for one pass over ``train_dataloader``.

    Args:
        model: Module to optimise; switched to training mode.
        device: Device every batch is moved to before the forward pass.
        loss_fcn: Callable ``(predictions, targets) -> loss tensor``.
        acc_fcn: Callable ``(predictions, targets) -> accuracy``.
        optimizer: Optimizer that updates the model parameters.
        train_dataloader: Iterable of ``(inputs, targets)`` batches that
            supports ``len()`` and has a ``dataset`` attribute.

    Returns:
        Tuple ``(train_loss, train_acc)``: per-batch loss and accuracy
        averaged over the number of batches. The loss is a tensor detached
        from the autograd graph.
    """
    train_loss = 0
    train_acc = 0

    model.train()

    for batch, (X_train, y_train) in enumerate(train_dataloader):
        X_train = X_train.to(device)
        y_train = y_train.to(device)

        y_train_pred = model(X_train)

        batch_loss = loss_fcn(y_train_pred, y_train)
        # Accumulate a detached copy: summing the attached loss would keep the
        # autograd graph of every batch alive until the end of the epoch and
        # hand back a tensor that still requires grad.
        train_loss += batch_loss.detach()
        train_acc += acc_fcn(y_train_pred, y_train)

        optimizer.zero_grad()

        batch_loss.backward()

        optimizer.step()

        if batch % 400 == 0:
            print(f'Read through {batch * len(X_train)}/{len(train_dataloader.dataset)} samples')

    train_loss /= len(train_dataloader)
    train_acc /= len(train_dataloader)

    return train_loss, train_acc


In [None]:
def test_step(model, device, loss_fcn, acc_fcn, optimizer, test_dataloader):
    """Evaluate ``model`` on every batch of ``test_dataloader``.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        device: Device every batch is moved to before the forward pass.
        loss_fcn: Callable ``(predictions, targets) -> loss``.
        acc_fcn: Callable ``(predictions, targets) -> accuracy``.
        optimizer: Not used here; kept so the signature mirrors ``train_step``.
        test_dataloader: Iterable of ``(inputs, targets)`` batches that
            supports ``len()``.

    Returns:
        Tuple ``(test_loss, test_acc)``: per-batch loss and accuracy averaged
        over the number of batches.
    """
    model.eval()

    loss_total = 0
    acc_total = 0

    # The forward passes and the averaging both run inside inference mode.
    with torch.inference_mode():
        for inputs, targets in test_dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            predictions = model(inputs)

            loss_total += loss_fcn(predictions, targets)
            acc_total += acc_fcn(predictions, targets)

        n_batches = len(test_dataloader)
        loss_total /= n_batches
        acc_total /= n_batches

    return loss_total, acc_total

In [None]:
from tqdm.auto import tqdm
from timeit import default_timer as timer

def model_training(model, device, loss_fcn, acc_fcn, optimizer, train_dataloader, test_dataloader, epochs = 10):
    """Alternate one training pass and one evaluation pass per epoch.

    Per-epoch timing and metrics are printed, followed by the total time.

    Args:
        model, device, loss_fcn, acc_fcn, optimizer: Forwarded unchanged to
            ``train_step`` and ``test_step``.
        train_dataloader: Batches handed to ``train_step``.
        test_dataloader: Batches handed to ``test_step``.
        epochs: Number of epochs to run.

    Returns:
        Dict with keys ``'train_acc'``, ``'train_loss'``, ``'test_acc'`` and
        ``'test_loss'``, each a list with one entry per epoch.
    """
    results = {'train_acc': [], 'train_loss': [], 'test_acc': [], 'test_loss': []}

    run_started = timer()
    for epoch in tqdm(range(epochs)):
        epoch_started = timer()
        print(f'Epoch: {epoch + 1}')

        train_loss, train_acc = train_step(model, device, loss_fcn, acc_fcn, optimizer, train_dataloader)
        test_loss, test_acc = test_step(model, device, loss_fcn, acc_fcn, optimizer, test_dataloader)

        epoch_metrics = {
            'train_loss': train_loss,
            'train_acc': train_acc,
            'test_loss': test_loss,
            'test_acc': test_acc,
        }
        for key, value in epoch_metrics.items():
            results[key].append(value)

        # Take the timestamp before printing so the report itself is not timed.
        epoch_elapsed = timer() - epoch_started

        print(f"\nEpoch Train Time: {epoch_elapsed:.4f}")
        print(f"Train Accuracy: {train_acc:.4f} | Train Loss: {train_loss:.4f} | Test Accuracy: {test_acc:.4f} | Test Loss: {test_loss:.4f} \n")

    total_elapsed = timer() - run_started
    print(f"Total Time: {total_elapsed:.4f}")

    return results

In [None]:
# Train with the default number of epochs (10) and keep the per-epoch metrics.
results = model_training(model, device, loss_fcn, acc_fcn, optimizer, train_dataloader, test_dataloader)

In [None]:
fig, ax = plt.subplots()

# float() turns each per-epoch value into a plain Python number whether it is
# already a number or a 0-dim tensor on any device; plotting the raw tensors
# fails when they live on the GPU.
train_acc_values = [float(acc) for acc in results['train_acc']]
test_acc_values = [float(acc) for acc in results['test_acc']]

ax.plot(train_acc_values, label = 'train_acc')
ax.plot(test_acc_values, label = 'test_acc')
ax.set_title('Model Accuracy')
ax.legend()
plt.show()

In [None]:
fig, ax = plt.subplots()

# Convert into new lists instead of overwriting results['train_loss']: the
# cell can then be re-run, and float() works for 0-dim tensors on any device
# (with or without grad), where .detach().numpy() fails for CUDA tensors.
# The test losses get the same conversion so both curves are plain numbers.
train_loss_values = [float(loss) for loss in results['train_loss']]
test_loss_values = [float(loss) for loss in results['test_loss']]

ax.plot(train_loss_values, label = 'train_loss')
ax.plot(test_loss_values, label = 'test_loss')
ax.set_title('Model Loss')
ax.legend()
plt.show()