In [None]:
import os
# os.environ["TF_USE_LEGACY_KERAS"] = "1"
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
# from tensorflow import keras
import matplotlib.pyplot as plt
import cv2
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix
keras.utils.set_random_seed(2024)

In [None]:
# Target image shape; load_image resizes every image to the first two entries
# (IMAGE_SHAPE[:-1]) and decodes with 3 channels.
IMAGE_SHAPE= (224, 224, 3)
# Directory prefix that load_image prepends to the values of the 'path' column.
PREFIX = '/kaggle/input/better-mura/MURA-v1.1/'
# CSV read into df_train (training rows).
TRAIN_PATH = "/kaggle/input/better-mura/MURA-v1.1/MURA-v1.1/train_augmented.csv"
# CSV read into df_test; the dataset's valid.csv is used as the held-out test set in this notebook.
TEST_PATH = "/kaggle/input/better-mura/MURA-v1.1/MURA-v1.1/valid.csv"

In [None]:
df_train = pd.read_csv(TRAIN_PATH)
df_test = pd.read_csv(TEST_PATH)

In [None]:
print(len(df_train), len(df_test))

In [None]:
df_train.head()

In [None]:
df_train['BodyPart'].value_counts()

In [None]:
# Keep only the XR_WRIST rows and the two columns used downstream ('path', 'label').
# NOTE: "wrest" in these names is a misspelling of "wrist"; the names are left as they are
# because later cells reference them.
df_train_wrest = df_train[df_train['BodyPart'] == 'XR_WRIST'].reset_index(drop = True)[['path', 'label']]
df_test_wrest = df_test[df_test['BodyPart'] == 'XR_WRIST'].reset_index(drop = True)[['path', 'label']]

In [None]:
df_train_wrest.head()

In [None]:
df_train_wrest['label'].value_counts()

In [None]:
def create_tf_dataset(df):
    """Turn a DataFrame into a tf.data.Dataset of (path, label) pairs.

    The rows are put in random order (``sample(frac=1)``) before the two
    columns are converted to tensors.

    Args:
        df: DataFrame providing a 'path' column and a 'label' column.

    Returns:
        A ``tf.data.Dataset`` that yields one ``(path, label)`` pair per row.
    """
    shuffled_rows = df.sample(frac=1).reset_index(drop=True)
    path_tensor = tf.convert_to_tensor(shuffled_rows['path'])
    label_tensor = tf.convert_to_tensor(shuffled_rows['label'])
    return tf.data.Dataset.from_tensor_slices((path_tensor, label_tensor))

In [None]:
ds_raw = create_tf_dataset(df_train_wrest)
test_raw  = create_tf_dataset(df_test_wrest)

In [None]:
# 90% of the wrist training rows go to training, the remainder to validation.
TRAIN_SIZE = int(len(df_train_wrest) * 0.9)

# reshuffle_each_iteration=False is essential here: by default shuffle() draws a NEW
# order every time the dataset is iterated, so take() and skip() would each see a
# different permutation (and a different one every epoch), letting validation samples
# leak into the training split. With a fixed order the two splits are disjoint and stable.
ds_raw = ds_raw.shuffle(10000, reshuffle_each_iteration=False)
train_raw = ds_raw.take(TRAIN_SIZE)
valid_raw = ds_raw.skip(TRAIN_SIZE)

In [None]:
len(train_raw), len(valid_raw)

In [None]:
train_raw.element_spec

In [None]:
for ex_path, ex_label in train_raw.skip(1).take(1):
    print(ex_path)
    print(ex_label)

In [None]:
def load_image(image_path):
    """Read a PNG given by a dataset-relative path and resize it.

    Args:
        image_path: Path relative to ``PREFIX`` (as stored in the 'path' column).

    Returns:
        The image decoded with 3 channels and resized to ``IMAGE_SHAPE[:-1]``.
    """
    raw_bytes = tf.io.read_file(PREFIX + image_path)
    decoded = tf.io.decode_png(raw_bytes, channels=3)
    return tf.image.resize(decoded, IMAGE_SHAPE[:-1])

In [None]:
test_img = load_image(ex_path)
print(test_img.shape)
plt.imshow(test_img.numpy().astype('uint8'));

In [None]:
def apply_CLAHE(img):
    """Apply OpenCV CLAHE to one image and return it as a 3-channel float32 tensor.

    Args:
        img: Eager tensor holding a 3-channel image; it is cast to uint8 first.

    Returns:
        float32 tensor: the contrast-equalised gray image expanded back to 3 channels.
    """
    pixels = img.numpy().astype('uint8')
    equalizer = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8))
    # NOTE(review): the BGR conversion code is used although the image comes from
    # tf.io.decode_png (presumably RGB). For gray X-rays with identical channels the
    # result should be the same - confirm before feeding colour images.
    equalized = equalizer.apply(cv2.cvtColor(pixels, cv2.COLOR_BGR2GRAY))
    three_channel = cv2.cvtColor(equalized, cv2.COLOR_GRAY2RGB)
    return tf.convert_to_tensor(three_channel, dtype=tf.float32)

In [None]:
clahe_image = apply_CLAHE(test_img)
plt.imshow(clahe_image.numpy().astype('uint8'));

In [None]:
def prepare_dataset(ds, batch_size=32, shuffle_buffer=10000, shuffle = False):
    """Build the input pipeline: optional shuffle -> load images -> batch -> prefetch.

    Args:
        ds: Dataset of ``(path, label)`` pairs.
        batch_size: Number of examples per batch.
        shuffle_buffer: Buffer size used when ``shuffle`` is truthy.
        shuffle: Whether to shuffle the examples before loading them.

    Returns:
        A batched, prefetched dataset of ``(image, label)`` where each label has
        been given an extra leading axis via ``tf.expand_dims(label, 0)``.
    """
    def _load_example(path, label):
        return load_image(path), tf.expand_dims(label, 0)

    pipeline = ds.shuffle(shuffle_buffer) if shuffle else ds
    pipeline = pipeline.map(_load_example, tf.data.AUTOTUNE)
    # Elements whose loading raises an error are dropped instead of aborting the pipeline.
    pipeline = pipeline.apply(tf.data.experimental.ignore_errors())
    pipeline = pipeline.batch(batch_size)
    return pipeline.prefetch(tf.data.AUTOTUNE)

In [None]:
# Only the training pipeline is shuffled; validation and test use prepare_dataset's
# default shuffle=False.
train_ds = prepare_dataset(train_raw, shuffle = True)
valid_ds = prepare_dataset(valid_raw)
test_ds = prepare_dataset(test_raw)
# Display the (image, label) structure of one batch.
train_ds.element_spec

In [None]:
for img, label in train_ds.take(1):
    print(img.shape, tf.squeeze(label))

## Modelling

In [None]:
from keras.applications import vgg19, densenet, efficientnet, efficientnet_v2

In [None]:
# Learning curves (accuracy and loss) for the training and validation splits
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
def plot_his(history):
    """Plot train vs. validation curves for 'accuracy' and 'loss'.

    Args:
        history: Object exposing ``epoch`` and a ``history`` dict that contains
            'accuracy', 'loss', 'val_accuracy' and 'val_loss' (as returned by ``model.fit``).
    """
    plt.figure(figsize=(15,12))
    for position, metric in enumerate(('accuracy', 'loss'), start=1):
        # Panels are laid out on a 2x2 grid; only the first two cells are used.
        plt.subplot(2, 2, position)
        plt.plot(history.epoch, history.history[metric], color=colors[0], label='Train')
        plt.plot(
            history.epoch,
            history.history['val_'+metric],
            color=colors[1],
            linestyle="--",
            label='Val',
        )
        plt.xlabel('Epoch')
        plt.ylabel(metric)
        plt.legend()
    plt.show()
    
def print_results(model):
    """Print loss and accuracy of `model` on the train, validation and test datasets.

    The train and validation datasets are capped at 100 batches each; the test
    dataset is evaluated in full.

    Values are looked up by metric NAME (``return_dict=True``) instead of by list
    position. With the compile() call used in this notebook, ``evaluate`` returns
    ``[loss, binary_crossentropy, accuracy]``, so the previous positional lookup
    printed the accuracy under "Loss" and the cross-entropy under "Accuracy".

    Args:
        model: A compiled Keras model with an 'accuracy' metric. The reported loss is
            the 'binary_crossentropy' metric when present, otherwise the total 'loss'.
    """
    def _evaluate(ds):
        return model.evaluate(ds, verbose = 0, return_dict=True)

    def _loss(res):
        # Prefer the plain cross-entropy metric; fall back to the total loss, which
        # evaluate() always reports.
        return res.get('binary_crossentropy', res['loss'])

    train_res = _evaluate(train_ds.take(100))
    valid_res = _evaluate(valid_ds.take(100))
    test_res = _evaluate(test_ds)

    print("Train Loss     : {0:.5f}".format(_loss(train_res)))
    print("Validation Loss: {0:.5f}".format(_loss(valid_res)))
    print("test Loss: {0:.5f}".format(_loss(test_res)))
    print("---")
    print("Train Accuracy     : {0:.5f}".format(train_res['accuracy']))
    print("Validation Accuracy: {0:.5f}".format(valid_res['accuracy']))
    print("Test Accuracy: {0:.5f}".format(test_res['accuracy']))
    
def plot_conf_matrix(model):
    """Print a classification report and draw the confusion matrix on ``test_ds``.

    Predictions are the model outputs rounded with ``np.round``.

    Args:
        model: Model whose ``predict`` is run on ``test_ds``.
    """
    # Labels are gathered in one pass over test_ds, predictions in a second pass.
    label_batches = [batch_labels for _, batch_labels in test_ds]
    y_test = np.concatenate(label_batches, axis=0)
    y_pred = np.round(model.predict(test_ds))

    print(classification_report(y_test, y_pred))

    matrix = confusion_matrix(y_test, y_pred)
    sns.heatmap(matrix, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

In [None]:
@keras.saving.register_keras_serializable()
class CLAHE(keras.layers.Layer):
    """Keras layer that applies OpenCV CLAHE to a single image or to a batch of images.

    Each image is cast to uint8, converted to gray, equalised with
    ``cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8))`` and expanded back to
    three channels as a float32 tensor of the same shape as the input image.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def call(self, inputs, active = True):
        """Apply CLAHE to `inputs`, or return them untouched when `active` is falsy.

        Args:
            inputs: Rank-3 tensor (one image) or rank-4 tensor (a batch of images).
            active: When falsy the layer is a pass-through.

        Returns:
            A float32 tensor with the same static shape as `inputs`.

        Raises:
            ValueError: If the layer is active and `inputs` is neither rank 3 nor rank 4.
                (Previously this case silently returned ``None``.)
        """
        if not active:
            return inputs

        # THIS WRAPPER IS MANDATORY TO SET THE SHAPE OF THE OUTPUT TENSOR
        # https://stackoverflow.com/questions/42590431/output-from-tensorflow-py-func-has-unknown-rank-shape
        def tf_wrapper(img):
            @tf.py_function(Tout=tf.float32)
            def apply_CLAHE(img):
                t_img = img.numpy().astype('uint8')
                # NOTE(review): BGR conversion code on what is presumably an RGB tensor;
                # kept unchanged so results stay consistent with already-trained models.
                gray = cv2.cvtColor(t_img, cv2.COLOR_BGR2GRAY)
                clahe = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8))
                clahe_img = clahe.apply(gray)
                clahe_img_colored = cv2.cvtColor(clahe_img, cv2.COLOR_GRAY2RGB)
                return tf.convert_to_tensor(clahe_img_colored, dtype=tf.float32)
            new_img = apply_CLAHE(img)
            # Restore the static shape on the py_function output.
            new_img.set_shape(img.shape)
            return new_img

        _ndims = inputs.get_shape().ndims
        if _ndims == 3:
            return tf_wrapper(inputs)
        if _ndims == 4:
            # Process a batch one image at a time.
            return tf.map_fn(tf_wrapper, inputs)
        raise ValueError(
            f"CLAHE expects a rank-3 image or a rank-4 batch of images, got rank {_ndims}."
        )

@keras.saving.register_keras_serializable()
class WristMURA(keras.Model):
    """Binary classifier: optional CLAHE and augmentation, a backbone, and a dense head.

    Forward pass: CLAHE (if ``apply_CLAHE``) -> random flip/rotation (only when
    ``apply_aug`` and training) -> ``preprocess_fn`` -> ``base_model`` ->
    Dense(512) -> Dropout(0.4) -> Dense(256) -> Dropout(0.4) -> Dense(1, sigmoid).

    Args:
        preprocess_fn: Serializable callable applied to the images before the backbone.
        base_model: Backbone model producing the features fed to the dense head.
        apply_CLAHE: Whether the CLAHE layer is active.
        apply_aug: Whether data augmentation is applied during training.
        top_layers_trainable: If truthy, the last ``top_layers_trainable_num`` backbone
            layers are trainable and the rest are frozen; otherwise the whole backbone
            is frozen.
        top_layers_trainable_num: Number of trailing backbone layers left trainable.
            Values <= 0 freeze every backbone layer.
        trainable, name, dtype: Forwarded to ``keras.Model``.
    """

    def __init__(self,
                 preprocess_fn,
                 base_model,
                 apply_CLAHE,
                 apply_aug,
                 top_layers_trainable,
                 top_layers_trainable_num,
                 trainable=True,
                 name=None,
                 dtype=None
                ):
        super().__init__(trainable=trainable, name=name, dtype=dtype)

        self.preprocess_fn = preprocess_fn

        # Model layers
        self.clahe_layer = CLAHE()
        self.data_augmentation = keras.Sequential([
          keras.layers.RandomFlip("horizontal"),
          keras.layers.RandomRotation(0.2),
        ])

        self.base_model = base_model
        self.dense1_layer = keras.layers.Dense(512, kernel_regularizer=keras.regularizers.L2(1e-4), activation = 'relu', name="Dense_layer_1")
        self.dropout1_layer = keras.layers.Dropout(.4, name="Dropout_layer_1")
        self.dense2_layer = keras.layers.Dense(256, kernel_regularizer=keras.regularizers.L2(1e-4), activation = 'relu', name="Dense_layer_2")
        self.dropout2_layer = keras.layers.Dropout(.4, name="Dropout_layer_2")
        self.output_layer = keras.layers.Dense(1, activation='sigmoid', name="Output_layer")

        # Model configs
        self.base_model.trainable = False

        self.apply_CLAHE = apply_CLAHE
        self.apply_aug = apply_aug

        # The count must be stored BEFORE the flag: assigning the flag runs the
        # property setter below, which reads top_layers_trainable_num.
        self.top_layers_trainable_num = top_layers_trainable_num
        self.top_layers_trainable = top_layers_trainable

    def call(self, inputs, training=None):
        """Run the forward pass and return the sigmoid output of the final Dense(1) layer."""
        inputs = self.clahe_layer(inputs, active=self.apply_CLAHE)
        # Augmentation runs only when it is enabled AND the model is in training mode.
        inputs = self.data_augmentation(inputs, training=(self.apply_aug and training))

        inputs = self.preprocess_fn(inputs)

        x = self.base_model(inputs, training=training)
        x = self.dense1_layer(x)
        x = self.dropout1_layer(x, training=training)
        x = self.dense2_layer(x)
        x = self.dropout2_layer(x, training=training)

        output = self.output_layer(x)

        return output

    @property
    def top_layers_trainable(self):
        """Whether the trailing backbone layers are currently unfrozen."""
        return self._top_layers_trainable

    @top_layers_trainable.setter
    def top_layers_trainable(self, value):
        self._top_layers_trainable = value
        if value:
            self.base_model.trainable = True
            backbone_layers = self.base_model.layers
            # Compute the size of the frozen prefix explicitly. The slice [:-n] is
            # EMPTY for n == 0, which used to leave the entire backbone trainable
            # when zero trainable layers were requested.
            num_frozen = max(len(backbone_layers) - self.top_layers_trainable_num, 0)
            for layer in backbone_layers[:num_frozen]:
                layer.trainable = False
        else:
            self.base_model.trainable = False

    def get_config(self):
        """Return the base config extended with every constructor argument, serialized."""
        base_config = super().get_config()
        config = {
            "preprocess_fn": keras.saving.serialize_keras_object(self.preprocess_fn),
            "base_model": keras.saving.serialize_keras_object(self.base_model),
            "apply_CLAHE": keras.saving.serialize_keras_object(self.apply_CLAHE),
            "apply_aug": keras.saving.serialize_keras_object(self.apply_aug),
            "top_layers_trainable": keras.saving.serialize_keras_object(self.top_layers_trainable),
            "top_layers_trainable_num": keras.saving.serialize_keras_object(self.top_layers_trainable_num),
        }
        return {**base_config, **config}

    @classmethod
    def from_config(cls, config, custom_objects=None):
        """Rebuild the model from the dict produced by ``get_config``.

        Works on a shallow copy so the caller's `config` dict is not mutated.
        """
        config = dict(config)
        serialized_keys = (
            'preprocess_fn',
            'base_model',
            'apply_CLAHE',
            'apply_aug',
            'top_layers_trainable',
            'top_layers_trainable_num',
        )
        for key in serialized_keys:
            config[key] = keras.saving.deserialize_keras_object(
                config[key], custom_objects=custom_objects
            )
        return cls(**config)

----
# Training

In [None]:
@keras.saving.register_keras_serializable()        
def preprocess_fn_wrapper(x):
    """Delegate to ``efficientnet_v2.preprocess_input``.

    Registered as a Keras serializable so that it can be passed as the model's
    ``preprocess_fn`` and stored in / restored from the model config.
    """
    return efficientnet_v2.preprocess_input(x)

In [None]:
# EfficientNetV2B3 backbone without its classification top, with average pooling.
# top_layers_trainable=True with top_layers_trainable_num=200: the WristMURA setter
# unfreezes the backbone and then re-freezes every layer except the last 200.
# CLAHE is enabled, data augmentation is disabled.
model = WristMURA(
    preprocess_fn=preprocess_fn_wrapper,
    base_model=efficientnet_v2.EfficientNetV2B3(include_top=False, pooling='avg'),
    top_layers_trainable=True,
    apply_CLAHE=True,
    apply_aug=False,
    top_layers_trainable_num=200
)

In [None]:
# Number of training batches that make up one epoch (passed to model.fit below).
STEPS_PER_EPOCH = 150

# Learning rate starts at 0.001 and decays with the optimizer step; decay_steps spans
# two epochs' worth of steps.
lr_schedule = keras.optimizers.schedules.InverseTimeDecay(
  0.001,
  decay_steps=STEPS_PER_EPOCH*2,
  decay_rate=1,
  staircase=False)

# Stop training when the validation cross-entropy metric has not improved by at least
# min_delta for `patience` consecutive epochs.
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_binary_crossentropy',
    verbose=1,
    patience=3,
    min_delta = 1e-3,
#     restore_best_weights=True,
)

# Checkpoint file names embed the epoch number and the validation cross-entropy.
# NOTE(review): save_best_only is not set, so presumably a file is written every
# epoch rather than only for the best one - confirm.
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    filepath="effnet-ep{epoch:02d}-loss{val_binary_crossentropy:.3f}.keras",
    monitor='val_binary_crossentropy',
)

In [None]:
# The model ends in a sigmoid, hence from_logits=False. The 'binary_crossentropy' metric
# is the quantity the callbacks monitor (as 'val_binary_crossentropy').
model.compile(
    optimizer=keras.optimizers.Adam(lr_schedule),
    loss=keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=[keras.metrics.BinaryCrossentropy(from_logits=False, name='binary_crossentropy'), 'accuracy'],
)

In [None]:
# Upper bound on the number of epochs; early_stopping may end training sooner.
EPOCHS = 15
# The training dataset is repeated indefinitely, so an "epoch" is defined by
# steps_per_epoch batches; validation uses at most STEPS_PER_EPOCH//4 batches per epoch.
# NOTE(review): valid_ds is not repeated - confirm it holds at least that many batches.
history = model.fit(
    train_ds.repeat(),
    steps_per_epoch=STEPS_PER_EPOCH,
    validation_data=valid_ds,
    validation_steps=STEPS_PER_EPOCH//4,
    epochs=EPOCHS,
    callbacks=[early_stopping, checkpoint_callback]
)

In [None]:
plot_his(history)

In [None]:
# NOTE(review): this file name embeds the epoch and validation loss of one particular
# training run (see the ModelCheckpoint filepath pattern). After re-training, the
# checkpoint will presumably have a different name - update this path accordingly.
best_model = keras.models.load_model("/kaggle/working/effnet-ep08-loss0.248.keras")

In [None]:
best_model.evaluate(test_ds)

In [None]:
print_results(best_model)

In [None]:
plot_conf_matrix(best_model)

In [None]:
import shutil
def Copy_Model(source_path, name, dest_dir="/kaggle/working"):
    """Copy a saved model file to ``<dest_dir>/<name>.keras``.

    Args:
        source_path: Path of the existing model file.
        name: Base name (without extension) of the copy.
        dest_dir: Directory that receives the copy; created if it does not exist.
            Defaults to the Kaggle working directory, matching the previous
            hard-coded behaviour.

    Returns:
        The path of the copied file.
    """
    os.makedirs(dest_dir, exist_ok=True)
    destination_path = os.path.join(dest_dir, f"{name}.keras")
    shutil.copyfile(source_path, destination_path)
    return destination_path

In [None]:
new_path = Copy_Model("/kaggle/input/wrist-mura/keras/effnet/1/effnet-ep08-loss0.248.keras","wrist_effnet")

In [None]:
new_model = keras.models.load_model(new_path)

In [None]:
new_model.evaluate(test_ds)

### Evaluating on loaded model

In [None]:
# Build v_images: one row per wrist image with its absolute 'path' and its 'Label' as a string.
prefix = '/kaggle/input/better-mura/MURA-v1.1/'
valid_data = pd.read_csv('/kaggle/input/better-mura/MURA-v1.1/MURA-v1.1/valid.csv')
# Other subsets can be selected the same way, e.g. by matching "XR_SHOULDER" or every "XR" body part.
valid_data = valid_data.loc[valid_data["BodyPart"] == "XR_WRIST"]
# prefix + path, element-wise; the Series keeps the name 'path'.
paths = valid_data['path'].astype(str).radd(prefix)
labels = valid_data['label'].rename('Label').astype(str)
v_images = pd.concat([paths, labels], axis=1)

In [None]:
def load_image(image_path):
    """Read a PNG from a full path and resize it to 224x224.

    NOTE: this definition shadows the earlier ``load_image`` in this notebook, which
    prepends ``PREFIX`` to its argument; this one expects the complete path.

    Args:
        image_path: Complete path of the image file.

    Returns:
        The image decoded with 3 channels and resized to (224, 224).
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.io.decode_png(raw_bytes, channels=3)
    return tf.image.resize(decoded, (224, 224))

In [None]:
def Wrist_Predict(img):
    """Classify one image with ``new_model``.

    Args:
        img: A single image; a leading batch axis is added before prediction.

    Returns:
        "Fractured" when the model output exceeds 0.5, otherwise "Non-Fractured".
    """
    batch = np.expand_dims(img, 0).astype(np.float32)
    prediction = new_model.predict(batch, verbose = 0)
    if prediction > 0.5:
        return "Fractured"
    return "Non-Fractured"

In [None]:
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Ground-truth counts
fracturedcounter = 0
normalcounter = 0
# Predicted counts
prefracturedcounter = 0
prenormalcounter = 0
# Confusion-matrix cells ("Fractured" is the positive class)
true_positive = 0
false_positive = 0
false_negative = 0
true_negative = 0

for index, row in v_images.iterrows():
    image_path = row["path"]
    # Labels were converted to strings when v_images was built, hence the '1' comparison.
    if row["Label"] == '1':
        label = "Fractured"
        fracturedcounter += 1
    else:
        label = "Non-Fractured"
        normalcounter += 1
    img = load_image(image_path)

    ## PUT YOUR FUNCTION HERE ##
    prediction = Wrist_Predict(img)

    if prediction == "Fractured":
        prefracturedcounter += 1
    else:
        prenormalcounter += 1

    if label == "Fractured":
        if prediction == label:
            true_positive += 1
        else:
            false_negative += 1
    elif prediction == label:
        true_negative += 1
    else:
        false_positive += 1

# Every ratio is guarded: without the guard, a run with no positive predictions
# (precision), no positive labels (recall) or no rows at all (accuracy) would stop
# with ZeroDivisionError instead of reporting 0.
accuracy = _safe_ratio(
    true_positive + true_negative,
    true_positive + true_negative + false_positive + false_negative,
)
precision = _safe_ratio(true_positive, true_positive + false_positive)
recall = _safe_ratio(true_positive, true_positive + false_negative)
f1_score = _safe_ratio(2 * (precision * recall), precision + recall)

print("Fractured Count: " + str(fracturedcounter) + " Normal Count: " + str(normalcounter))
print("preFractured Count: " + str(prefracturedcounter) + " preNormal Count: " + str(prenormalcounter))
print("Accuracy: ", accuracy*100)
print("Precision: ", precision*100)
print("Recall: ", recall*100)
print("F1 Score: ", f1_score*100)

## Testing different Thresholds

In [None]:
def plot_conf_matrix_th(model, th):
    """Print a classification report and draw the confusion matrix on ``test_ds``
    using a custom decision threshold.

    Args:
        model: Model whose ``predict`` is run on ``test_ds``.
        th: Threshold; an example is predicted positive when its output is strictly
            greater than ``th``.
    """
    # Labels are gathered in one pass over test_ds, predictions in a second pass.
    label_batches = [batch_labels for _, batch_labels in test_ds]
    y_test = np.concatenate(label_batches, axis=0)
    scores = model.predict(test_ds)
    y_pred = scores > th

    print(classification_report(y_test, y_pred))

    matrix = confusion_matrix(y_test, y_pred)
    sns.heatmap(matrix, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

In [None]:
plot_conf_matrix_th(new_model, th=.50)

In [None]:
plot_conf_matrix_th(new_model, th=.40)

In [None]:
plot_conf_matrix_th(new_model, th=.30)

In [None]:
plot_conf_matrix_th(new_model, th=.25)