In [1]:
import numpy as np
import matplotlib.pyplot as plt
from termcolor import colored

import tensorflow as tf
import tensorflow_datasets as tfds

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Flatten, Dense, Activation

from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import SGD, Adam

from tensorflow.keras.metrics import Mean, SparseCategoricalAccuracy


In [2]:
import numpy as np
import matplotlib.pyplot as plt
from termcolor import colored

import tensorflow as tf
import tensorflow_datasets as tfds

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Flatten, Dense, Activation

from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import SGD, Adam

from tensorflow.keras.metrics import Mean, SparseCategoricalAccuracy



# %%
class ResidualUnit(tf.keras.Model):
    def __init__(self, filter_in, filter_out, kernel_size):
        super(ResidualUnit, self).__init__()
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.conv1 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding ='same')
        
        self.bn2 = tf.keras.layers.BatchNormalization()
        self.conv2 = tf.keras.layers.Conv2D(filter_out, kernel_size, padding ='same')
        
        if filter_in == filter_out:
            self.identity = lambda x: x
        else:
            self.identity = tf.keras.layers.Conv2D(filter_out, (1, 1), padding = 'same')
        
        
    def call(self, x, training=False, mask=None):
        h = self.bn1(x, training=training)
        h = tf.nn.relu(h)
        h = self.conv1(h)
        
        h = self.bn2(h, training=training)
        h = tf.nn.relu(h)
        h = self.conv2(h)
        return self.identity(x) + h

# %%
class ResnetLayer(tf.keras.Model):
    def __init__(self, filter_in, filters, kernel_size):
        super(ResnetLayer, self).__init__()
        self.sequence = list()
        for f_in, f_out in zip([filter_in] + list(filters), filters):
            self.sequence.append(ResidualUnit(f_in, f_out, kernel_size))
        
    def call(self, x, training=False, mask=None):
        for unit in self.sequence:
            x = unit(x, training=training)
        return x
    
# %%
class ResNet(tf.keras.Model):
    def __init__(self):
        super(ResNet, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(8, (3, 3), padding = 'same', activation='relu') # 28x28x8
        
        self.res1 = ResnetLayer(8, (16, 16), (3, 3)) # 28x28x16
        self.pool1 = tf.keras.layers.MaxPool2D((2, 2)) # 14x14x16
        
        self.res2 = ResnetLayer(16, (32, 32), (3, 3)) # 14x14x32
        self.pool2 = tf.keras.layers.MaxPool2D((2, 2)) # 7x7x32
        
        self.res3 = ResnetLayer(32, (64, 64), (3, 3)) # 7x7x64
        
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(128, activation='relu')
        self.dense2 = tf.keras.layers.Dense(10, activation='softmax')
    
    def call(self, x, training=False, mask=None):
        x = self.conv1(x)
        
        x = self.res1(x, training = training)
        x = self.pool1(x)
        x = self.res2(x, training = training)
        x = self.pool2(x)
        x = self.res3(x, training = training)
        
        x = self.flatten(x)
        x = self.dense1(x)
        return self.dense2(x)
    
# %%
# Implement training loop
@tf.function
def train_step(model, images, labels, loss_object, optimizer, train_loss, train_accuracy):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_object(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)
    train_accuracy(labels, predictions)

# Implement algorithm test
@tf.function
def test_step(model, images, labels, loss_object, test_loss, test_accuracy):
    predictions = model(images, training=False)

    t_loss = loss_object(labels, predictions)
    test_loss(t_loss)
    test_accuracy(labels, predictions)
    
# %% 
def get_Pcam_ds():
    (train_validation_ds, test_ds), ds_info = tfds.load(name='patch_camelyon',
                                                            shuffle_files=True,
                                                            as_supervised=True,
                                                            split=['train', 'test'],
                                                            with_info=True)
    
    n_train_validation = ds_info.splits['train'].num_examples
    
    train_ratio = 0.8
    n_train = int(n_train_validation * train_ratio)
    n_validation = n_train_validation - n_train
    
    train_ds = train_validation_ds.take(n_train)
    remaining_ds = train_validation_ds.skip(n_train)
    validation_ds = remaining_ds.take(n_validation)
    
    return train_ds, validation_ds, test_ds

# %%
def standardization(TRAIN_BATCH_SIZE, TEST_BATCH_SIZE):
    global train_ds, validation_ds, test_ds
    
    def stnd(images, labels):
        images = tf.cast(images, tf.float32) / 255.
        return (images, labels)
    
    train_ds = train_ds.map(stnd).shuffle(1000).batch(TRAIN_BATCH_SIZE)
    validation_ds = validation_ds.map(stnd).batch(TEST_BATCH_SIZE)
    test_ds = test_ds.map(stnd).batch(TEST_BATCH_SIZE)
    
# %%

class Pcam_Classifier(Model):
    def __init__(self):
        super(Pcam_Classifier, self).__init__()
        
        self.flatten = Flatten()
        self.d1 = Dense(64, activation='relu')
        self.d2 = Dense(10, activation='softmax')

    def call(self, x):
        x = self.flatten(x)
        x = self.d1(x)
        x = self.d2(x)
        return x
# %% 
def load_metrics():
    global train_loss, train_acc
    global validation_loss, validation_acc
    global test_loss, test_acc
    
    train_loss = Mean()
    validation_loss = Mean()
    test_loss = Mean()
    
    train_acc = SparseCategoricalAccuracy()
    validation_acc = SparseCategoricalAccuracy()
    test_acc = SparseCategoricalAccuracy()

# %%
def trainer():
    global train_ds, model, loss_object, optimizer
    global train_loss, train_acc

    for images, labels in train_ds:
        with tf.GradientTape() as tape:
            predictions = model(images)
            loss = loss_object(labels, predictions)
            
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        
        train_loss(loss)
        train_acc(labels, predictions)
        
# %%
def validation():
    global validation_ds, model, loss_object
    global validation_loss, validation_acc

    for images, labels in validation_ds:
        
        predictions = model(images)
        loss = loss_object(labels, predictions)
        
        validation_loss(loss)
        validation_acc(labels, predictions)

# %%
def tester():
    global test_ds, model, loss_object
    global test_loss, test_acc

    for images, labels in test_ds:
        
        predictions = model(images)
        loss = loss_object(labels, predictions)
        
        test_loss(loss)
        test_acc(labels, predictions)

    template = 'Test Loss: {:.4f}\t Test Accuracy: {:.2f}%\n'
    print(template.format(test_loss.result(),
                          test_acc.result()*100))
 # %%   
def train_reporter():
    global epoch
    global train_loss, train_acc
    global validation_loss, validation_acc

    print(colored('Epoch', 'red', 'on_white'), epoch + 1)
    template = 'Train Loss: {:.4f}\t Train Accuracy: {:.2f}%\n' + 'Validation Loss: {:.4f}\t Validation Accuracy: {:.2f}%\n'
        
    print(template.format(train_loss.result(),
                          train_acc.result()*100,
                          validation_loss.result(),
                          validation_acc.result()*100))
    
    train_acc.reset_states()
    train_loss.reset_states()
    validation_loss.reset_states()
    validation_acc.reset_states() 
# %%

# Create model
model = ResNet()

# Define loss and optimizer
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

# Define performance metrics
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')





In [None]:
# %%
EPOCHS = 10
LR = 0.001
TRAIN_BATCH_SIZE = 16
TEST_BATCH_SIZE = 32

train_ds, validation_ds, test_ds = get_Pcam_ds()
standardization(TRAIN_BATCH_SIZE, TEST_BATCH_SIZE)

model = Pcam_Classifier()
loss_object = SparseCategoricalCrossentropy()
optimizer = SGD(learning_rate=LR)

load_metrics()

for epoch in range(EPOCHS):
    trainer()
    validation()
    train_reporter()

tester()

[47m[31mEpoch[0m 1
Train Loss: 0.6208	 Train Accuracy: 64.38%
Validation Loss: 0.9001	 Validation Accuracy: 50.39%

[47m[31mEpoch[0m 2
Train Loss: 0.5879	 Train Accuracy: 67.09%
Validation Loss: 0.6550	 Validation Accuracy: 60.58%

[47m[31mEpoch[0m 3
Train Loss: 0.5770	 Train Accuracy: 68.32%
Validation Loss: 0.5572	 Validation Accuracy: 69.42%

[47m[31mEpoch[0m 4
Train Loss: 0.5682	 Train Accuracy: 69.46%
Validation Loss: 0.6098	 Validation Accuracy: 65.60%

[47m[31mEpoch[0m 5
Train Loss: 0.5608	 Train Accuracy: 70.54%
Validation Loss: 0.5816	 Validation Accuracy: 68.00%

[47m[31mEpoch[0m 6
Train Loss: 0.5526	 Train Accuracy: 71.60%
Validation Loss: 0.5429	 Validation Accuracy: 73.88%

[47m[31mEpoch[0m 7
Train Loss: 0.5453	 Train Accuracy: 72.42%
Validation Loss: 0.5564	 Validation Accuracy: 70.77%

[47m[31mEpoch[0m 8
Train Loss: 0.5384	 Train Accuracy: 73.17%
Validation Loss: 0.5914	 Validation Accuracy: 67.46%

[47m[31mEpoch[0m 9
Train Loss: 0.5331	 Train 