In [None]:
import tensorflow as tf
from cv2 import (
    CLAHE,
    COLOR_BGR2GRAY,
    COLOR_GRAY2RGB,
    createCLAHE,
    cvtColor,
    resize,
)
from cv2.typing import MatLike
from divisor_de_arquivos import (
    change_dataframe_test_paths,
    change_dataframe_train_paths,
)
from keras import Model
from keras.applications import EfficientNetB3
from keras.callbacks import (
    Callback,
    EarlyStopping,
    LearningRateScheduler,
    ModelCheckpoint,
    ReduceLROnPlateau,
)
from keras.layers import (
    BatchNormalization,
    Conv2D,
    Dense,
    Dropout,
    GlobalAveragePooling2D,
    Input,
    MaxPooling2D,
    SeparableConv2D,
)
from keras.mixed_precision import Policy, set_global_policy
from keras.models import load_model, Sequential
from keras.optimizers import AdamW
from keras.regularizers import l1_l2
from matplotlib.pyplot import ioff, ion, show, subplots
from numpy import float32, ndarray, uint8, unique
from numpy.random import seed
from pandas import DataFrame, read_csv
from pandas._typing import ArrayLike
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from tensorflow import __version__, Tensor
from tensorflow.config import run_functions_eagerly
from tensorflow.config.experimental import list_physical_devices, set_memory_growth
from tensorflow.random import set_seed

# Mixed-precision policy to improve performance on compatible GPUs.
set_global_policy(Policy("mixed_float16"))

# Enable dynamic (on-demand) memory allocation for every visible GPU.
for gpu in list_physical_devices("GPU"):
    set_memory_growth(gpu, True)

# Disable eager execution of functions on TensorFlow 1.x (2.x already has it
# disabled by default). The major version is taken from the whole first
# dotted component, so a multi-digit major is not misread from one character.
if int(__version__.split(".")[0]) < 2:
    run_functions_eagerly(False)

# Seed the NumPy and TensorFlow random generators.
seed(42)
set_seed(42)

# Hyperparameters shared by the model builders and data generators below.
ACTIVATION_FUNCTION: str = "relu"
BATCH_SIZE: int = 32
DROPOUT: float = 0.3
FIELD_SIZE: int = 3
FILTERS: int = 32
IMAGE_DIMENSION: int = 224
KERNEL_REGULARIZER: l1_l2 = l1_l2(l1=1e-5, l2=1e-4)
OPTIMIZER_WEIGHT_DECAY: float = 1e-4
PADDING: str = "same"
POOL_SIZE: int = 2
UNITS: int = FILTERS * 2
VERBOSE: int = 1


class GraficoAcompanhamento(Callback):
    """Callback that records accuracy and loss per epoch and plots them when training ends.

    The history is reset in ``on_train_begin``, extended in ``on_epoch_end``
    and rendered as two side-by-side line charts (accuracy and loss, training
    versus validation) in ``on_train_end``.
    """

    def on_train_begin(self, logs=None):
        """Start a fresh history for the training run that is about to begin."""
        self.epoch: list[int] = []
        self.train_acc: list[float] = []
        self.val_acc: list[float] = []
        self.train_loss: list[float] = []
        self.val_loss: list[float] = []

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's metrics to the history.

        Args:
            epoch: Epoch index as passed by the caller; stored as ``epoch + 1``.
            logs: Mapping of metric names to values. ``None`` is treated as an
                empty mapping, so missing metrics are recorded as ``None``
                instead of raising ``AttributeError``.
        """
        logs = logs or {}
        self.epoch.append(epoch + 1)
        self.train_acc.append(logs.get("accuracy"))
        self.val_acc.append(logs.get("val_accuracy"))
        self.train_loss.append(logs.get("loss"))
        self.val_loss.append(logs.get("val_loss"))

    def on_train_end(self, logs=None):
        """Draw the accuracy and loss curves collected during training."""
        self.fig, self.ax = subplots(ncols=2, figsize=(12, 4))
        ion()

        # One panel per metric: (title / y-label, training series, validation series).
        panels = (
            ("Precisão", self.train_acc, self.val_acc),
            ("Perda", self.train_loss, self.val_loss),
        )
        for axis, (title, train_values, val_values) in zip(self.ax, panels):
            axis.plot(self.epoch, train_values, label="Treino")
            axis.plot(self.epoch, val_values, label="Validação")
            axis.set_title(title)
            axis.set_xlabel("Época")
            axis.set_ylabel(title)
            axis.legend()
            axis.grid(True)

        ioff()
        show()


def learning_rate_schedule(epoch: int) -> float:
    """Return the learning rate for ``epoch`` following a fixed step schedule.

    The base rate is 0.001. It is used unchanged while ``epoch < 10``,
    multiplied by 0.5 while ``epoch < 20``, by 0.1 while ``epoch < 30`` and by
    0.01 from then on.

    Args:
        epoch: Epoch index the learning rate is requested for.

    Returns:
        The learning rate for that epoch.
    """
    initial_rate: float = 0.001
    if epoch < 10:
        return initial_rate
    if epoch < 20:
        return initial_rate * 0.5
    if epoch < 30:
        return initial_rate * 0.1
    return initial_rate * 0.01


def get_callbacks(model_name: str) -> list[Callback]:
    """Build the list of callbacks used for one training run.

    Args:
        model_name: Suffix used in the checkpoint file name
            (``model_fold_<model_name>.keras``).

    Returns:
        The callbacks in this order: early stopping, progress plot, best-model
        checkpoint, step learning-rate scheduler and reduce-on-plateau.
    """
    early_stopping = EarlyStopping(
        patience=15, restore_best_weights=True, verbose=VERBOSE
    )
    progress_plot = GraficoAcompanhamento()
    # Only the epoch with the best validation accuracy is kept on disk.
    best_checkpoint = ModelCheckpoint(
        f"model_fold_{model_name}.keras",
        monitor="val_accuracy",
        verbose=VERBOSE,
        save_best_only=True,
    )
    # NOTE(review): learning_rate_schedule ignores the optimizer's current rate,
    # so this scheduler probably overwrites any reduction made by
    # ReduceLROnPlateau at the start of the following epoch — confirm that
    # combining both callbacks is intended.
    step_scheduler = LearningRateScheduler(learning_rate_schedule, verbose=VERBOSE)
    plateau_reducer = ReduceLROnPlateau(
        factor=0.3, min_lr=1e-7, patience=7, verbose=VERBOSE
    )
    return [
        early_stopping,
        progress_plot,
        best_checkpoint,
        step_scheduler,
        plateau_reducer,
    ]


def preprocess_image(image: ndarray) -> ndarray:
    """Enhance contrast with CLAHE, resize and standardize a single image.

    Steps: cast to 8-bit, convert to grayscale, apply CLAHE (clip limit 2.0,
    8x8 tile grid), expand back to three channels, resize to ``IMAGE_SIZE``,
    divide by 255, then subtract the image mean and divide by its standard
    deviation.

    Args:
        image: Three-channel image array (it is passed to a colour-to-gray
            conversion). Values are expected in the 0-255 range.

    Returns:
        The standardized image as a ``float32`` array.
    """
    # Clip before the cast so out-of-range values cannot wrap around in uint8.
    image_8bit: ndarray = image.clip(0, 255).astype(uint8)
    # NOTE(review): COLOR_BGR2GRAY assumes BGR channel order, while Keras image
    # loaders usually yield RGB. The result is the same when all three channels
    # are identical — confirm for this dataset.
    gray_scale_image: MatLike = cvtColor(image_8bit, COLOR_BGR2GRAY)
    clahe: CLAHE = createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_image: MatLike = clahe.apply(gray_scale_image)
    rgb_image: MatLike = cvtColor(enhanced_image, COLOR_GRAY2RGB)
    resized_image: MatLike = resize(rgb_image, IMAGE_SIZE)
    normalized_image: ndarray = resized_image / 255.0
    # The small epsilon avoids a division by zero on constant images.
    return (
        (normalized_image - normalized_image.mean()) / (normalized_image.std() + 1e-8)
    ).astype(float32)


def create_efficientnet_model(
    ACTIVATION_FUNCTION=ACTIVATION_FUNCTION,
    DRPOUT=DROPOUT,
    KERNEL_REGULARIZER=KERNEL_REGULARIZER,
    UNITS=UNITS,
) -> Model:
    """Build and compile a binary classifier on top of an EfficientNetB3 backbone.

    All backbone layers except the last 20 are frozen. The head is four
    Dense -> Dropout -> BatchNormalization blocks followed by a single sigmoid
    unit that outputs ``float32``.

    Args:
        ACTIVATION_FUNCTION: Activation used by the hidden dense layers.
        DRPOUT: Dropout rate of the first three head blocks (the last block
            always uses 0.5). The name is spelled as in the signature.
        KERNEL_REGULARIZER: Kernel regularizer applied to the hidden dense layers.
        UNITS: Width of the first and last hidden dense layers; the two middle
            ones use twice this value.

    Returns:
        The model compiled with AdamW and binary cross-entropy.
    """
    backbone: Model = EfficientNetB3(
        include_top=False,
        input_shape=(IMAGE_DIMENSION, IMAGE_DIMENSION, 3),
        pooling="avg",
    )
    # Only the last 20 backbone layers stay trainable.
    for frozen_layer in backbone.layers[:-20]:
        frozen_layer.trainable = False

    # (dense width, dropout rate) for each block of the classification head.
    head_blocks = (
        (UNITS, DRPOUT),
        (UNITS * 2, DRPOUT),
        (UNITS * 2, DRPOUT),
        (UNITS, 0.5),
    )
    features = backbone.output
    for width, rate in head_blocks:
        features = Dense(
            width, activation=ACTIVATION_FUNCTION, kernel_regularizer=KERNEL_REGULARIZER
        )(features)
        features = Dropout(rate)(features)
        features = BatchNormalization()(features)

    # The output layer is explicitly float32.
    prediction = Dense(units=1, activation="sigmoid", dtype="float32")(features)

    classifier: Model = Model(backbone.input, prediction)
    classifier.compile(
        optimizer=AdamW(0.005, OPTIMIZER_WEIGHT_DECAY),
        loss="binary_crossentropy",
        metrics=["accuracy", "precision", "recall", "auc"],
    )

    return classifier


def create_model(
    ACTIVATION_FUNCTION=ACTIVATION_FUNCTION,
    DROPOUT=DROPOUT,
    FIELD_SIZE=FIELD_SIZE,
    FILTERS=FILTERS,
    IMAGE_DIMENSION=IMAGE_DIMENSION,
    KERNEL_REGULARIZER=KERNEL_REGULARIZER,
    PADDING=PADDING,
    POOL_SIZE=POOL_SIZE,
    UNITS=UNITS,
) -> Sequential:
    """Build and compile the custom convolutional binary classifier.

    Architecture: two Conv2D blocks and two SeparableConv2D blocks (each
    followed by Dropout, MaxPooling2D and BatchNormalization), global average
    pooling, four Dense -> Dropout -> BatchNormalization blocks and a single
    sigmoid unit that outputs ``float32``.

    Args:
        ACTIVATION_FUNCTION: Activation of the convolutional and hidden dense layers.
        DROPOUT: Dropout rate used everywhere except the last hidden block (0.5).
        FIELD_SIZE: Kernel size of the convolutional layers.
        FILTERS: Number of filters of every convolutional layer.
        IMAGE_DIMENSION: Height and width of the three-channel input.
        KERNEL_REGULARIZER: Kernel regularizer applied to the hidden dense layers.
        PADDING: Padding mode of the convolutional layers.
        POOL_SIZE: Pool size of the max-pooling layers.
        UNITS: Width of the first and last hidden dense layers; the two middle
            ones use twice this value.

    Returns:
        The model compiled with AdamW and binary cross-entropy.
    """
    network: Sequential = Sequential()
    network.add(Input(shape=(IMAGE_DIMENSION, IMAGE_DIMENSION, 3)))

    # Feature extractor: two regular convolutions, then two separable ones.
    for conv_layer in (Conv2D, Conv2D, SeparableConv2D, SeparableConv2D):
        network.add(
            conv_layer(
                FILTERS, FIELD_SIZE, activation=ACTIVATION_FUNCTION, padding=PADDING
            )
        )
        network.add(Dropout(DROPOUT))
        network.add(MaxPooling2D(pool_size=POOL_SIZE))
        network.add(BatchNormalization())

    network.add(GlobalAveragePooling2D())

    # (dense width, dropout rate) for each block of the classification head.
    head_blocks = (
        (UNITS, DROPOUT),
        (UNITS * 2, DROPOUT),
        (UNITS * 2, DROPOUT),
        (UNITS, 0.5),
    )
    for width, rate in head_blocks:
        network.add(
            Dense(
                units=width,
                activation=ACTIVATION_FUNCTION,
                kernel_regularizer=KERNEL_REGULARIZER,
            )
        )
        network.add(Dropout(rate))
        network.add(BatchNormalization())

    # The output layer is explicitly float32.
    network.add(Dense(units=1, activation="sigmoid", dtype="float32"))

    network.compile(
        optimizer=AdamW(0.001, OPTIMIZER_WEIGHT_DECAY),
        loss="binary_crossentropy",
        metrics=["accuracy", "precision", "recall", "auc"],
    )
    return network


# Target (height, width) used by the generators and by preprocess_image.
IMAGE_SIZE: tuple[int, int] = (IMAGE_DIMENSION, IMAGE_DIMENSION)

train_dataframe: DataFrame = read_csv("csv/mass_case_description_train_set.csv")
test_dataframe: DataFrame = read_csv("csv/mass_case_description_test_set.csv")

# Merge the "BENIGN_WITHOUT_CALLBACK" records into "BENIGN" and rewrite the
# cropped-image paths with the project helper for each split.
train_dataframe["pathology"] = train_dataframe["pathology"].replace(
    "BENIGN_WITHOUT_CALLBACK", "BENIGN"
)
train_dataframe["cropped image file path"] = train_dataframe[
    "cropped image file path"
].apply(change_dataframe_train_paths)

test_dataframe["pathology"] = test_dataframe["pathology"].replace(
    "BENIGN_WITHOUT_CALLBACK", "BENIGN"
)
test_dataframe["cropped image file path"] = test_dataframe[
    "cropped image file path"
].apply(change_dataframe_test_paths)

# No `rescale` here: preprocess_image already divides by 255 and standardizes
# each image, and ImageDataGenerator applies `rescale` after
# `preprocessing_function`, so keeping it would scale the inputs a second time.
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    brightness_range=[0.8, 1.2],
    height_shift_range=0.1,
    horizontal_flip=True,
    preprocessing_function=preprocess_image,
    rotation_range=15,
    shear_range=0.1,
    width_shift_range=0.1,
    zoom_range=0.1,
)

# Validation data gets the same preprocessing but no augmentation.
val_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    preprocessing_function=preprocess_image,
)

In [None]:
# Stratified 10-fold cross-validation. For every fold a fresh model is trained,
# its best checkpoint is saved by get_callbacks, and the pair
# (best validation accuracy, training accuracy at that epoch) is recorded.
models_accuracies: list[tuple[float, float]] = []
sk_fold: StratifiedKFold = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
fold: int = 1
for train, val in sk_fold.split(
    train_dataframe["cropped image file path"], train_dataframe["pathology"]
):
    train_fold = train_dataframe.iloc[train].reset_index(drop=True)
    val_fold = train_dataframe.iloc[val].reset_index(drop=True)

    train_dataset = train_datagen.flow_from_dataframe(
        train_fold,
        x_col="cropped image file path",
        y_col="pathology",
        target_size=IMAGE_SIZE,
        class_mode="binary",
        batch_size=BATCH_SIZE,
        seed=42,
    )

    val_dataset = val_datagen.flow_from_dataframe(
        val_fold,
        x_col="cropped image file path",
        y_col="pathology",
        target_size=IMAGE_SIZE,
        class_mode="binary",
        batch_size=BATCH_SIZE,
        shuffle=False,
        seed=42,
    )

    # Class weights come from the rows this fold actually trains on, not from
    # the whole dataframe (which also contains the validation rows).
    pathology_encoded: ArrayLike = LabelEncoder().fit_transform(
        train_fold["pathology"]
    )
    balanced_weights: ndarray = compute_class_weight(
        "balanced", classes=unique(pathology_encoded), y=pathology_encoded
    )
    class_weights: dict = dict(enumerate(balanced_weights))

    model: Sequential = create_model()

    trained_model = model.fit(
        train_dataset,
        callbacks=get_callbacks(str(fold)),
        class_weight=class_weights,
        epochs=308,
        validation_data=val_dataset,
        verbose=VERBOSE,
    )
    val_accuracies: list[float] = trained_model.history["val_accuracy"]
    max_val_accuracy: float = max(val_accuracies)
    max_val_index: int = val_accuracies.index(max_val_accuracy)
    accuracy: tuple[float, float] = (
        max_val_accuracy,
        trained_model.history["accuracy"][max_val_index],
    )
    models_accuracies.append(accuracy)
    fold += 1

# Tuples compare element by element, so the winner is the fold with the highest
# validation accuracy (ties are broken by training accuracy). Folds are numbered
# from 1, matching the checkpoint file names.
best_accuracy: tuple[float, float] = max(models_accuracies)
best_model: int = models_accuracies.index(best_accuracy) + 1
breast_cancer_classifier = load_model(f"model_fold_{best_model}.keras")

In [None]:
import os
import numpy as np
from keras.utils import load_img, img_to_array
from IPython.display import Image

%store breast_cancer_classifier
%store best_accuracy


diretorio_path = 'imagensCancerMama/teste_dataset'
imagens = os.listdir(diretorio_path)
index = np.random.randint(0, len(imagens))
imagem = imagens[index]
imagem_path = os.path.join(diretorio_path, imagem)

print(best_accuracy)

test_image = load_img(imagem_path, target_size=IMAGE_SIZE)
test_image = img_to_array(test_image)
test_image = np.expand_dims(test_image, axis=0)
resultado = breast_cancer_classifier.predict(test_image)
print(f"Resultado da predição: {resultado[0][0]}")
print(imagem)
print("É um câncer maligno" if resultado[0][0] >= 0.5 else "É um câncer benigno")
Image(filename=imagem_path)
