In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Flatten, Conv2D, MaxPooling2D, Dropout, GlobalAveragePooling2D
from sklearn.metrics import confusion_matrix
import os

In [2]:
def create_dataset(image_dir, csv_path, image_dimensions = [256, 256]):
    """Build an unbatched ``tf.data.Dataset`` of ``(image, label)`` pairs.

    Args:
        image_dir: Directory containing the ``<image>.jpg`` files.
        csv_path: CSV file with an ``image`` column (file stem) and one label
            column per class: ``MEL``, ``NV``, ``BCC``, ``AKIEC``, ``BKL``,
            ``DF``, ``VASC``.
        image_dimensions: ``[height, width]`` every decoded image is resized to.

    Returns:
        A ``(dataset, size)`` tuple. ``dataset`` yields float32 images scaled
        to ``[0, 1]`` together with the row's seven label values; ``size`` is
        the number of rows read from the CSV.
    """
    df = pd.read_csv(csv_path)
    labels = df[['MEL', 'NV', 'BCC', 'AKIEC', 'BKL', 'DF', 'VASC']].values
    image_names = df['image']
    image_paths = [os.path.join(image_dir, f"{name}.jpg") for name in image_names]

    def load_image(file_path, label):
        """Read one JPEG, resize it and scale the pixels into ``[0, 1]``."""
        raw = tf.io.read_file(file_path)
        decoded = tf.image.decode_jpeg(raw, channels = 3)
        # tf.image.resize (default bilinear) already returns float32, so no
        # extra cast is needed before rescaling.
        img = tf.image.resize(decoded, image_dimensions) / 255.0
        # NOTE(review): constant +0.05 brightness offset applied to every image
        # (train and test alike); the reason is not visible here - confirm it
        # is intentional.
        img = tf.add(img, 0.05)
        img = tf.clip_by_value(img, 0, 1)
        return img, label

    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    # Use tf.data.AUTOTUNE, matching the other pipelines in this notebook,
    # instead of the legacy tf.data.experimental alias.
    dataset = dataset.map(load_image, num_parallel_calls = tf.data.AUTOTUNE)
    return dataset, len(image_paths)

In [3]:
def RobustAsymmetricLoss(y_true, y_pred, gamma_pos, gamma_neg, lambda_, alpha, beta, tau, M, N):
    """Asymmetric polynomial loss, averaged over the batch.

    For every sample ``b`` and class ``c`` with prediction ``p`` and target ``y``::

        pos = y       * sum_{m=1..M} alpha[m-1] * (1 - p) ** (m + gamma_pos)
        neg = (1 - y) * (lambda_[c] - p)
                      * sum_{n=1..N} beta[n-1] * max(p - tau, 0) ** (n + gamma_neg)

    The per-sample loss is ``sum_c pos + sum_c neg``; the batch mean is returned.

    Args:
        y_true: Targets of shape ``(batch, classes)``.
        y_pred: Predicted probabilities, same shape as ``y_true``.
        gamma_pos: Exponent offset of the positive term.
        gamma_neg: Exponent offset of the negative term.
        lambda_: Per-class values, indexable by class (length ``classes``).
        alpha: Coefficients of the positive polynomial (length >= ``M``).
        beta: Coefficients of the negative polynomial (length >= ``N``).
        tau: Probability margin subtracted before the negative term applies.
        M: Number of positive polynomial terms (Python int).
        N: Number of negative polynomial terms (Python int).

    Returns:
        Scalar float32 tensor.
    """
    y_pred = tf.clip_by_value(tf.cast(y_pred, tf.float32), 1e-7, 1 - 1e-7)
    y_true = tf.cast(y_true, tf.float32)
    lambda_ = tf.convert_to_tensor(lambda_, dtype = tf.float32)

    # All classes are handled at once through broadcasting. This avoids a
    # Python loop over a tensor-valued class count and the per-class
    # tensor_scatter_nd_update calls it needed.
    pos_poly = tf.zeros_like(y_pred)
    miss = 1.0 - y_pred
    for m in range(1, M + 1):
        pos_poly += alpha[m - 1] * tf.pow(miss, m + gamma_pos)

    neg_poly = tf.zeros_like(y_pred)
    margin = tf.maximum(y_pred - tau, 0.0)
    for n in range(1, N + 1):
        neg_poly += beta[n - 1] * tf.pow(margin, n + gamma_neg)

    pos_loss = y_true * pos_poly
    # lambda_ has one entry per class and broadcasts along the batch axis.
    neg_loss = (lambda_ - y_pred) * (1.0 - y_true) * neg_poly

    loss = tf.reduce_sum(pos_loss, axis = 1) + tf.reduce_sum(neg_loss, axis = 1)
    return tf.reduce_mean(loss)

In [4]:
class NormalizedMulticlassAccuracy(tf.keras.metrics.Metric):
    """Streaming normalized multi-class accuracy (mean per-class recall).

    Correct predictions and sample counts are accumulated per true class.
    ``result()`` averages the per-class recall over the classes that have been
    seen, so the value stays within ``[0, 1]`` and agrees with a
    confusion-matrix based computation on the same predictions.

    Args:
        name: Metric name reported by Keras.
        num_classes: Number of classes (length of the one-hot label vector).
        **kwargs: Forwarded to ``tf.keras.metrics.Metric``.
    """

    def __init__(self, name='normalized_multiclass_accuracy', num_classes=7, **kwargs):
        super().__init__(name=name, **kwargs)
        self.num_classes = num_classes
        # Per-class (weighted) number of correctly classified samples.
        self.total = self.add_weight(name='total', shape=(num_classes,), initializer='zeros')
        # Per-class (weighted) number of samples whose true label is that class.
        self.count = self.add_weight(name='count', shape=(num_classes,), initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate per-class hits and support for one batch.

        ``y_true`` and ``y_pred`` are reduced with ``argmax`` over the last
        axis. ``sample_weight``, when given, scales both the hits and the
        support of each sample.
        """
        true_idx = tf.reshape(tf.argmax(y_true, axis=-1), [-1])
        pred_idx = tf.reshape(tf.argmax(y_pred, axis=-1), [-1])
        # Row i is one-hot at the true class of sample i.
        support = tf.cast(tf.one_hot(true_idx, self.num_classes), self.dtype)
        hit = tf.cast(tf.equal(true_idx, pred_idx), self.dtype)
        if sample_weight is not None:
            weights = tf.reshape(tf.cast(sample_weight, self.dtype), [-1])
            support = support * weights[:, tf.newaxis]
        self.total.assign_add(tf.reduce_sum(support * hit[:, tf.newaxis], axis=0))
        self.count.assign_add(tf.reduce_sum(support, axis=0))

    def result(self):
        """Return the mean recall over classes with non-zero support (0 if none)."""
        per_class_recall = tf.math.divide_no_nan(self.total, self.count)
        classes_seen = tf.reduce_sum(tf.cast(self.count > 0, self.dtype))
        return tf.math.divide_no_nan(tf.reduce_sum(per_class_recall), classes_seen)

    def reset_state(self):
        """Clear the accumulated per-class statistics."""
        self.total.assign(tf.zeros_like(self.total))
        self.count.assign(tf.zeros_like(self.count))

    def get_config(self):
        """Include ``num_classes`` so the metric can be re-created from config."""
        config = super().get_config()
        config['num_classes'] = self.num_classes
        return config

In [5]:
# Unbatched training set plus the number of training images.
train_dataset, train_size = create_dataset('train_images', 'Training_GroundTruth.csv', image_dimensions = [256, 256])
def augment_image(img, label):
    """Apply random training-time augmentation to a single image.

    Performs random horizontal/vertical flips, brightness, contrast,
    saturation and hue jitter, then a random 224x224 crop that is resized
    back to 256x256. The label is passed through unchanged.

    Args:
        img: Float image tensor of shape ``(height, width, 3)``, at least
            224x224 (images produced by ``create_dataset`` lie in ``[0, 1]``).
        label: Label tensor for the image.

    Returns:
        ``(augmented_img, label)`` with ``augmented_img`` of shape
        ``(256, 256, 3)`` and values in ``[0, 1]``.
    """
    img = tf.image.random_flip_left_right(img)
    img = tf.image.random_flip_up_down(img)
    img = tf.image.random_brightness(img, max_delta=0.2)
    img = tf.image.random_contrast(img, lower=0.8, upper=1.2)
    img = tf.image.random_saturation(img, lower=0.8, upper=1.2)
    img = tf.image.random_hue(img, max_delta=0.2)
    img = tf.image.random_crop(img, size=[224, 224, 3])
    img = tf.image.resize(img, [256, 256])
    # The colour jitter above can push pixels outside [0, 1]; clip so training
    # inputs stay in the same range as the un-augmented evaluation images.
    img = tf.clip_by_value(img, 0.0, 1.0)
    return img, label
# Cache the decoded images *before* the random augmentation: caching after it
# would store one augmented copy per image and replay that same copy in every
# epoch. Shuffling and augmentation are then redone on each pass.
train_dataset = (
    train_dataset
    .cache()
    .shuffle(buffer_size = train_size)
    .map(augment_image, num_parallel_calls = tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(buffer_size = tf.data.AUTOTUNE)
)

In [6]:
num_classes = 7
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    # Small CNN: five conv stages, global average pooling, then a dense head.
    # GlobalAveragePooling2D already yields a (batch, 512) tensor, so no
    # Flatten layer is needed before the Dense layers.
    model = Sequential([
        Input(shape = (256, 256, 3)),
        Conv2D(32, (3, 3), activation = 'relu'),
        MaxPooling2D(2, 2),
        Conv2D(64, (3, 3), activation = 'relu'),
        MaxPooling2D(2, 2),
        Conv2D(128, (3, 3), activation = 'relu'),
        MaxPooling2D(2, 2),
        Conv2D(256, (3, 3), activation = 'relu'),
        Dropout(0.5),
        MaxPooling2D(2, 2),
        Conv2D(512, (3, 3), activation = 'relu'),
        GlobalAveragePooling2D(),
        Dense(512, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ])

    # Hyper-parameters of RobustAsymmetricLoss.
    M, N = 3, 3                                       # number of polynomial terms
    alpha = tf.constant([1.0] * M, dtype=tf.float32)  # positive-term coefficients
    beta = tf.constant([1.0] * N, dtype=tf.float32)   # negative-term coefficients
    gamma_pos = tf.constant(2.0, dtype=tf.float32)
    gamma_neg = tf.constant(1.0, dtype=tf.float32)
    # One value per class, in label-column order.
    lambda_ = tf.constant([5.32, 1.00, 9.77, 21.14, 4.19, 20.66, 25.97], dtype=tf.float32)
    tau = tf.constant(0.5, dtype=tf.float32)

    def robust_asymmetric_loss(y_true, y_pred):
        """Bind the hyper-parameters above to RobustAsymmetricLoss.

        A named function (rather than a lambda) gives the loss a readable
        name in Keras logs and configs.
        """
        return RobustAsymmetricLoss(y_true, y_pred, gamma_pos, gamma_neg, lambda_, alpha, beta, tau, M, N)

    # Pass the instance created here to compile() instead of building a second one.
    normalized_accuracy = NormalizedMulticlassAccuracy()
    # NOTE(review): the recorded evaluation result equals 1/7 (chance level for
    # 7 classes); the learning rate and loss hyper-parameters may need revisiting.
    model.compile(
        loss = robust_asymmetric_loss,
        optimizer = tf.keras.optimizers.RMSprop(learning_rate = 0.003),
        metrics = [normalized_accuracy],
    )


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/

In [7]:
# Train for 10 passes over the (already batched) training pipeline.
model.fit(train_dataset, epochs = 10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1f1581cde40>

In [8]:
# Evaluation pipeline: no shuffling, so predictions and labels stay aligned
# across the two passes over the dataset below.
test_dataset, test_size = create_dataset(
    image_dir = 'test_images',
    csv_path = 'Test_GroundTruth.csv',
    image_dimensions = [256, 256],
)
test_dataset = test_dataset.cache().batch(32).prefetch(buffer_size = tf.data.AUTOTUNE)

# Predicted class index per test image.
y_pred = model.predict(test_dataset)
y_pred_classes = np.argmax(y_pred, axis=1)

# Ground-truth labels gathered batch by batch, reduced to class indices when
# they are stored as label vectors.
y_true = np.concatenate([batch_labels for _, batch_labels in test_dataset], axis=0)
if y_true.ndim > 1:
    y_true = np.argmax(y_true, axis=1)

def normalized_multi_class_accuracy(y_true, y_pred):
    """Mean per-class recall computed from the confusion matrix.

    Args:
        y_true: 1-D array of true class indices.
        y_pred: 1-D array of predicted class indices.

    Returns:
        The average of the per-class recalls over the classes that occur in
        ``y_true``. Classes without any true sample are skipped, because their
        recall is undefined (0/0) and would otherwise turn the mean into NaN.
        Returns NaN when ``y_true`` contains no samples at all.
    """
    cm = confusion_matrix(y_true, y_pred, labels=np.arange(num_classes))
    support = cm.sum(axis=1)          # number of true samples per class
    has_support = support > 0
    if not has_support.any():
        return float('nan')
    per_class_recall = np.diag(cm)[has_support] / support[has_support]
    return np.mean(per_class_recall)

# Score the test-set predictions and report the result.
normalized_accuracy = normalized_multi_class_accuracy(y_true = y_true, y_pred = y_pred_classes)
print(f"Normalized Multi-class Accuracy: {normalized_accuracy}")

Normalized Multi-class Accuracy: 0.14285714285714285
