In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
import tensorflow as tf
import tensorflow_addons as tfa
import sklearn
import matplotlib


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [2]:
# Data preparation
def read_and_decode(filename):
    """Load a JPEG file from disk and return it as a float32 RGB image.

    Args:
        filename: Path of the JPEG file (Python string or scalar string tensor).

    Returns:
        A 3-channel float32 image tensor. `convert_image_dtype` rescales the
        decoded integer pixels into the [0, 1] range.
    """
    raw_bytes = tf.io.read_file(filename)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    return tf.image.convert_image_dtype(decoded, tf.float32)

def decode_csv(csv_row):
    """Parse one two-column CSV line into an ``(image, label_string)`` pair.

    Args:
        csv_row: A single CSV line whose first field is an image path and
            whose second field is the label.

    Returns:
        Tuple of the decoded image (see `read_and_decode`) and the label,
        still in its raw string form.
    """
    # String defaults make decode_csv parse both columns as strings; the
    # default text is what an empty field would be replaced with.
    columns = tf.io.decode_csv(csv_row, ["filepaths", "labels"])
    filename, label_string = columns
    return read_and_decode(filename), label_string

# Exploratory pipeline: one (image, label_string) element per CSV line.
dataset = tf.data.TextLineDataset("new_train.csv").map(decode_csv)

In [3]:
# Sanity check: print the label and per-channel mean of the first 5 images.
for image, label_string in dataset.take(5):
    # Reduce over the two spatial axes, leaving one mean per colour channel.
    channel_means = tf.math.reduce_mean(image, axis=[0, 1])
    print(label_string, channel_means)

tf.Tensor(b'0', shape=(), dtype=string) tf.Tensor([0.26415765 0.26415765 0.26415765], shape=(3,), dtype=float32)
tf.Tensor(b'1', shape=(), dtype=string) tf.Tensor([0.09402619 0.09402619 0.09402619], shape=(3,), dtype=float32)
tf.Tensor(b'1', shape=(), dtype=string) tf.Tensor([0.12388248 0.12388248 0.12388248], shape=(3,), dtype=float32)
tf.Tensor(b'1', shape=(), dtype=string) tf.Tensor([0.0780125 0.0780125 0.0780125], shape=(3,), dtype=float32)
tf.Tensor(b'1', shape=(), dtype=string) tf.Tensor([0.09578612 0.09578612 0.09578612], shape=(3,), dtype=float32)


In [4]:
def _decode_csv(csv_row):
    """Parse one ``path,class`` CSV line into an ``(image, label)`` pair.

    Args:
        csv_row: A single CSV line; the first field is the image path and the
            second field is the class as the string "0" or "1".

    Returns:
        Tuple ``(img, label)`` where ``img`` is the float32 image produced by
        `read_and_decode` and ``label`` is the int64 class index.

    Notes:
        The previous bare ``try/except`` was removed. Under `Dataset.map` this
        function is only executed while the graph is traced, so the handler
        could never catch a corrupted file at iteration time; and if tracing
        itself failed, it printed a message and then hit ``return img, label``
        with unbound names, hiding the real error. Failures now propagate
        unchanged. To actually skip unreadable files, drop failing elements on
        the dataset itself (e.g. with tf.data's ignore-errors transformation).
    """
    filename, label_string = tf.io.decode_csv(csv_row, ["path", "class"])
    img = read_and_decode(filename)
    # Comparing the label against the class vocabulary gives a one-hot boolean
    # vector; argmax converts it to the class index. A label that matches
    # neither entry yields an all-False vector and therefore index 0.
    label = tf.argmax(tf.math.equal(["0", "1"], label_string))
    return img, label


# Training and evaluation pipelines: parse each CSV line, then batch by 32.
train_dataset = (
    tf.data.TextLineDataset('new_train.csv')
    .map(_decode_csv)
    .batch(32)
)
test_dataset = (
    tf.data.TextLineDataset('new_test.csv')
    .map(_decode_csv)
    .batch(32)
)

In [5]:
# Display both pipelines so their element specs (shapes/dtypes) can be checked.
(train_dataset, test_dataset)

(<_BatchDataset element_spec=(TensorSpec(shape=(None, None, None, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int64, name=None))>,
 <_BatchDataset element_spec=(TensorSpec(shape=(None, None, None, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int64, name=None))>)

In [6]:
# custom metrics
class pFBeta(tf.keras.metrics.Metric):
    """Streaming probabilistic F-beta score for binary classification.

    Rather than thresholding predictions, the predicted probabilities are
    accumulated directly: probability mass assigned to positive samples counts
    as "true positive" and mass assigned to negative samples as "false
    positive". Precision, recall and F-beta are derived from these running
    sums.

    Args:
        beta: Weight of recall relative to precision (1 gives pF1).
        name: Metric name reported by Keras.
        **kwargs: Forwarded to `tf.keras.metrics.Metric`.
    """

    def __init__(self, beta=1, name='pF1', **kwargs):
        super().__init__(name=name, **kwargs)
        self.beta = beta
        self.epsilon = 1e-10  # guards the precision/recall divisions
        # Running sums accumulated over all batches since the last reset.
        self.pos = self.add_weight(name='pos', initializer='zeros')  # number of positive labels
        self.ctp = self.add_weight(name='ctp', initializer='zeros')  # probability mass on positives
        self.cfp = self.add_weight(name='cfp', initializer='zeros')  # probability mass on negatives

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate statistics for one batch.

        Args:
            y_true: Ground-truth labels; entries equal to 1 are positives and
                entries equal to 0 are negatives.
            y_pred: Predicted probabilities; clipped to [0, 1] before use.
            sample_weight: Accepted for API compatibility but not used.
        """
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.clip_by_value(y_pred, 0, 1)
        pos = tf.cast(tf.reduce_sum(y_true), tf.float32)
        ctp = tf.cast(tf.reduce_sum(y_pred[y_true == 1]), tf.float32)
        cfp = tf.cast(tf.reduce_sum(y_pred[y_true == 0]), tf.float32)
        self.pos.assign_add(pos)
        self.ctp.assign_add(ctp)
        self.cfp.assign_add(cfp)

    def result(self):
        """Return the probabilistic F-beta score of everything seen so far.

        Returns 0 (instead of NaN) when precision and recall are both zero,
        e.g. before any positive probability mass has been accumulated.
        """
        beta2 = self.beta * self.beta
        prec = self.ctp / (self.ctp + self.cfp + self.epsilon)
        reca = self.ctp / (self.pos + self.epsilon)
        # divide_no_nan yields 0 for a zero denominator; the original plain
        # division produced NaN (0 / 0) in that case.
        return tf.math.divide_no_nan((1 + beta2) * prec * reca,
                                     beta2 * prec + reca)

    def reset_state(self):
        """Zero all accumulators (Keras calls this at epoch boundaries)."""
        self.pos.assign(0.)
        self.ctp.assign(0.)
        self.cfp.assign(0.)

    def get_config(self):
        """Return the constructor arguments so the metric can be re-created."""
        config = super().get_config()
        config.update({'beta': self.beta})
        return config

In [8]:
# build model
# Frozen ImageNet-pretrained ConvNeXt-Base backbone used as a feature
# extractor; global max pooling gives one feature vector per image.
# NOTE(review): Keras documents ConvNeXt as bundling its own input
# normalisation and expecting pixel values in [0, 255], while
# read_and_decode scales images to [0, 1] - confirm the input range.
base_model = tf.keras.applications.convnext.ConvNeXtBase(
    include_top=False,
    weights="imagenet",
    input_shape=(256, 256, 3),
    pooling='max')

base_model.trainable = False

# Small trainable classification head on top of the frozen backbone.
model = tf.keras.models.Sequential([
    base_model,
    tf.keras.layers.Dense(64, activation='sigmoid'),
    tf.keras.layers.Dropout(0.5),
    # A single-unit binary output needs a sigmoid. Softmax normalises across
    # the units of the layer, so with one unit it always outputs exactly 1.0
    # and the model could never predict the negative class or learn.
    tf.keras.layers.Dense(1, activation='sigmoid')])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 convnext_base (Functional)  (None, 1024)              87566464  
                                                                 
 dense_1 (Dense)             (None, 64)                65600     
                                                                 
 dropout_1 (Dropout)         (None, 64)                0         
                                                                 
 dense_2 (Dense)             (None, 1)                 65        
                                                                 
Total params: 87,632,129
Trainable params: 65,665
Non-trainable params: 87,566,464
_________________________________________________________________


In [17]:
# Metrics reported for every training and validation epoch.
metrics = [
    pFBeta(beta=1, name='pF1'),  # probabilistic F1 (custom metric)
    tfa.metrics.F1Score(num_classes=1, threshold=0.50, name='F1'),
    tf.metrics.Precision(name='Prec'),
    tf.metrics.Recall(name='Reca'),
    tf.metrics.AUC(name='AUC'),
    tf.metrics.BinaryAccuracy(name='BinAcc'),
]

In [18]:
# SGD with momentum; binary cross-entropy on probabilities (not logits),
# with label smoothing of 0.1.
model.compile(
    optimizer=tf.keras.optimizers.experimental.SGD(momentum=0.9),
    loss=tf.keras.losses.BinaryCrossentropy(
        from_logits=False,
        label_smoothing=0.1,
    ),
    metrics=metrics,
)

In [None]:
# Stop training once val_loss has failed to improve for 3 consecutive epochs.
callback = tf.keras.callbacks.EarlyStopping(
    patience=3,
    monitor='val_loss',
)

In [None]:
# Train for up to 30 epochs, with early stopping on the validation loss.
# `batch_size` is deliberately not passed: train_dataset is a tf.data.Dataset
# that is already batched with .batch(32), and Keras documents that
# batch_size must not be specified for dataset inputs.
history = model.fit(
    train_dataset,
    epochs=30,
    validation_data=test_dataset,
    callbacks=[callback])