# **Trabalho Final - Processamento Digital de Imagens**

*EfficientEnsemble: Diagnóstico de câncer de mama em imagens de ultrassom utilizando processamento de imagens e Ensemble de EfficientNets*

## Verificação do Ambiente e Versões

In [None]:
import tensorflow as tf, sys, numpy as np, os, platform
print("TF:", tf.__version__)
print("Python:", sys.version)
print("OS:", platform.platform())

## Extrair pasta .zip

In [None]:
!unzip -q "/content/archive.zip" -d /content/

## Dependências para o filtro guiado

In [None]:
!pip -q install opencv-contrib-python-headless==4.10.0.84

## Imports, Configurações Gerais e Reprodutibilidade


In [None]:
import os, sys, random, math, pathlib, json
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0, EfficientNetB1, EfficientNetB2
from tensorflow.keras.applications.efficientnet import preprocess_input
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score, roc_curve, f1_score, precision_score, recall_score, accuracy_score
import matplotlib.pyplot as plt

SEED = 18
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

AUTOTUNE = tf.data.AUTOTUNE
IMG_SIZE_B0 = (224, 224)
IMG_SIZE_B1 = (240, 240)
IMG_SIZE_B2 = (260, 260)
BATCH_SIZE = 32
EPOCHS = 50
LR = 1e-4

BASE_DIR = "/content/Dataset_BUSI_with_GT"

## Coleta das Imagens por Classe e Construção do DataFrame Unificado (excluindo máscaras)

In [None]:
from pathlib import Path
import glob

def list_images_no_masks(class_dir, label):
    paths = []
    for root, _, files in os.walk(class_dir):
        for f in files:
            lf = f.lower()
            if lf.endswith((".png", ".jpg", ".jpeg")) and ("mask" not in lf):
                paths.append(os.path.join(root, f))
    labels = [label] * len(paths)
    return paths, labels

benign_dir    = os.path.join(BASE_DIR, "benign")
malignant_dir = os.path.join(BASE_DIR, "malignant")

assert os.path.isdir(benign_dir) and os.path.isdir(malignant_dir), \
    "Pastas 'benign' e/ou 'malignant' não encontradas."

benign_paths, benign_labels       = list_images_no_masks(benign_dir, 0)
malignant_paths, malignant_labels = list_images_no_masks(malignant_dir, 1)

print("CONTAGEM DE IMAGENS (SEM MÁSCARAS): ")
print(f"Imagens benignas  : {len(benign_paths)}")
print(f"Imagens malignas  : {len(malignant_paths)}")
print(f"Total de imagens  : {len(benign_paths) + len(malignant_paths)}\n")

all_paths  = benign_paths + malignant_paths
all_labels = benign_labels + malignant_labels

df = pd.DataFrame({"path": all_paths, "label": all_labels})
df = df.sample(frac=1.0, random_state=SEED).reset_index(drop=True)

print("Value counts em df (1 linha por imagem):")
print(df["label"].value_counts().rename({0: "benign", 1: "malignant"}))
print("\nPrimeiras imagens:")
print(df.head(5))

## Divisão Estratificada do Dataset em Treino, Validação e Teste (60/20/20)


In [None]:
train_df, temp_df = train_test_split(
    df, test_size=0.4, stratify=df["label"], random_state=SEED
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df["label"], random_state=SEED
)

def describe_split(name, dframe):
    counts = dframe["label"].value_counts().rename({0:"benign",1:"malignant"})
    print(f"{name}: {len(dframe)} amostras |", dict(counts))

describe_split("Treino", train_df)
describe_split("Val", val_df)
describe_split("Teste", test_df)

## Extração da ROI a partir das máscaras (sem pré-processamento)



In [None]:
import os
import cv2
import glob
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm

ROI_DIR = "/content/ROI_BUSI"
os.makedirs(ROI_DIR, exist_ok=True)

def bbox_from_mask(mask: np.ndarray):
    """Retorna bounding box da maior componente conexa da máscara."""
    m = (mask > 0).astype(np.uint8)
    num, _, stats, _ = cv2.connectedComponentsWithStats(m, connectivity=8)

    if num <= 1:
        ys, xs = np.where(m)
        if ys.size == 0:
            return (0, 0, mask.shape[1], mask.shape[0])
        x0, y0, x1, y1 = xs.min(), ys.min(), xs.max(), ys.max()
        return (x0, y0, x1 - x0 + 1, y1 - y0 + 1)

    idx = 1 + np.argmax(stats[1:, cv2.CC_STAT_AREA])
    x, y, w, h, _ = stats[idx]
    return (x, y, w, h)


def select_largest_mask_path(img_path: str):
    """
    Dada uma imagem, retorna o caminho da máscara com maior área (>0).
    Se não encontrar máscara válida, retorna None.
    """
    p = Path(img_path)
    mask_pattern = p.with_name(f"{p.stem}_mask*.png")
    mask_paths = sorted(glob.glob(str(mask_pattern)))

    best_path = None
    best_area = 0

    for mpath in mask_paths:
        mask = cv2.imread(mpath, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            print(f"[WARNING] não foi possível ler a máscara: {mpath}")
            continue

        area = np.count_nonzero(mask)
        if area > best_area:
            best_area = area
            best_path = mpath

    return best_path


def extract_roi_largest_mask(img_path: str, out_root: str):
    """
    Extrai uma ROI por imagem, usando apenas a máscara de maior área.
    Se não houver máscara legível, usa a imagem inteira como ROI.
    """
    img = cv2.imread(img_path, cv2.IMREAD_COLOR)
    if img is None:
        print(f"[WARNING] imagem não encontrada: {img_path}")
        return None

    best_mask_path = select_largest_mask_path(img_path)

    if best_mask_path is not None:
        mask = cv2.imread(best_mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is not None:
            x, y, w, h = bbox_from_mask(mask)
        else:
            print(f"[WARNING] falha ao ler máscara {best_mask_path}, usando imagem inteira.")
            h_img, w_img = img.shape[:2]
            x, y, w, h = 0, 0, w_img, h_img
    else:
        h_img, w_img = img.shape[:2]
        x, y, w, h = 0, 0, w_img, h_img

    crop = img[y:y+h, x:x+w]

    p = Path(img_path)
    cls = p.parent.name
    out_dir = Path(out_root) / cls
    out_dir.mkdir(parents=True, exist_ok=True)

    out_name = f"{p.stem}_roi.png"
    out_path = out_dir / out_name
    cv2.imwrite(str(out_path), crop)

    return str(out_path)

roi_paths = []

for idx, row in tqdm(df.iterrows(), total=len(df), desc="Extraindo ROIs (1 por imagem)"):
    roi_path = extract_roi_largest_mask(row["path"], ROI_DIR)
    if roi_path is None:
        print(f"[WARNING] Não foi possível gerar ROI para {row['path']}")
        roi_path = row["path"]
    roi_paths.append(roi_path)

roi_df = pd.DataFrame({
    "path": roi_paths,
    "label": df["label"].values
})

print("ROIs salvas:", len(roi_df))
print(roi_df["label"].value_counts().rename({0: "benign", 1: "malignant"}))
roi_df.head()

## Pré-processamento da ROI (Filtro Guiado e Mediana)

In [None]:
import cv2
from pathlib import Path

ROI_PP_DIR = "/content/ROI_PP_BUSI"
os.makedirs(ROI_PP_DIR, exist_ok=True)

def guided_then_median(bgr_img: np.ndarray, radius=8, eps=1e-2):
    try:
        gf = cv2.ximgproc.guidedFilter
        guided = gf(guide=bgr_img, src=bgr_img, radius=radius, eps=eps)
    except Exception:
        guided = cv2.bilateralFilter(bgr_img, d=9, sigmaColor=75, sigmaSpace=75)
    median = cv2.medianBlur(guided, ksize=3)
    return median

def apply_pp_to_roi(roi_path: str, out_root: str):
    img = cv2.imread(roi_path, cv2.IMREAD_COLOR)
    assert img is not None, f"Não foi possível ler a ROI: {roi_path}"
    img_pp = guided_then_median(img)

    p = Path(roi_path); cls = p.parent.name
    out_dir = Path(out_root) / cls
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / p.name
    if not out_path.exists():
        cv2.imwrite(str(out_path), img_pp)
    return str(out_path)

pp_paths, pp_labels = [], []
for idx, row in roi_df.iterrows():
    pp_path = apply_pp_to_roi(row["path"], ROI_PP_DIR)
    pp_paths.append(pp_path); pp_labels.append(row["label"])

pp_df = pd.DataFrame({"path": pp_paths, "label": pp_labels})
train_pp_df = pp_df.loc[train_df.index].reset_index(drop=True)
val_pp_df   = pp_df.loc[val_df.index].reset_index(drop=True)
test_pp_df  = pp_df.loc[test_df.index].reset_index(drop=True)

print("ROI+PP salvas:", len(pp_df))

## Aumento apenas na minoria (malignant) até igualar

In [None]:
import cv2
import numpy as np
from pathlib import Path
import os

AUG_DIR = "/content/ROI_PP_AUG_BUSI"
os.makedirs(AUG_DIR, exist_ok=True)
(AUG_DIR_PATH := Path(AUG_DIR)).mkdir(parents=True, exist_ok=True)

train_benign = train_pp_df[train_pp_df["label"] == 0].reset_index(drop=True)
train_malig  = train_pp_df[train_pp_df["label"] == 1].reset_index(drop=True)

target_malig = len(train_benign)
need = target_malig - len(train_malig)
print(f"Benignas: {len(train_benign)} | Malignas: {len(train_malig)} | Precisamos criar: {need}")

rng = np.random.default_rng(SEED)

P_FLIP_H = 0.5
P_FLIP_V = 0.5
P_ROTATE = 0.5
P_ZOOM   = 0.5


MAX_DEG   = 5
MAX_ZOOM  = 0.10


def random_small_rotate(img, max_deg=MAX_DEG):
    """Roda a imagem por um ângulo aleatório entre -max_deg e +max_deg.
       Usa borderMode reflect para evitar bordas pretas."""
    deg = rng.uniform(-max_deg, max_deg)
    h, w = img.shape[:2]
    M = cv2.getRotationMatrix2D((w/2, h/2), deg, 1.0)
    return cv2.warpAffine(img, M, (w, h), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101)

def random_small_zoom(img, max_ratio=MAX_ZOOM):
    """Zoom in/out até ±max_ratio e recorta/letterbox para manter tamanho original.
       Se z >= 1.0 -> recorta o centro (zoom in).
       Se z <  1.0 -> faz letterbox com reflexo (evita bordas pretas)."""
    h, w = img.shape[:2]
    z = 1.0 + rng.uniform(-max_ratio, max_ratio)
    nh, nw = int(round(h*z)), int(round(w*z))
    resized = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
    if z >= 1.0:
        y0 = (nh - h)//2; x0 = (nw - w)//2
        return resized[y0:y0+h, x0:x0+w]
    else:
        top = (h-nh)//2
        bottom = h-nh-top
        left = (w-nw)//2
        right = w-nw-left
        out = cv2.copyMakeBorder(resized, top, bottom, left, right, borderType=cv2.BORDER_REFLECT_101)
        return out

def apply_random_aug(img):
    """
    Aplica cada transformação condicionalmente/independentemente.
    """
    ops = ["flip_h", "flip_v", "rotate", "zoom"]
    rng.shuffle(ops)

    for op in ops:
        r = rng.random()
        if op == "flip_h" and r < P_FLIP_H:
            img = cv2.flip(img, 1)
        elif op == "flip_v" and r < P_FLIP_V:
            img = cv2.flip(img, 0)
        elif op == "rotate" and r < P_ROTATE:
            img = random_small_rotate(img)
        elif op == "zoom" and r < P_ZOOM:
            img = random_small_zoom(img)


    return img

aug_rows = []
if need > 0:
    src_paths = train_malig["path"].tolist()
    i = 0
    max_attempts = need * 10
    attempts = 0
    while len(aug_rows) < need and attempts < max_attempts:
        p = Path(src_paths[i % len(src_paths)])
        img = cv2.imread(str(p), cv2.IMREAD_COLOR)
        assert img is not None, f"Falha ao ler {p}"

        img_aug = apply_random_aug(img)

        out_dir = AUG_DIR_PATH / "malignant"
        out_dir.mkdir(parents=True, exist_ok=True)
        out_name = f"{p.stem}_aug_{len(aug_rows):05d}{p.suffix}"
        out_path = out_dir / out_name
        cv2.imwrite(str(out_path), img_aug)

        aug_rows.append({"path": str(out_path), "label": 1})
        i += 1
        attempts += 1

    if len(aug_rows) != need:
        raise RuntimeError(f"Foi gerado {len(aug_rows)} augmentações, mas eram esperadas {need}.")

aug_df = pd.DataFrame(aug_rows)
train_pp_equal_df = pd.concat([train_benign, train_malig, aug_df], ignore_index=True)
train_pp_equal_df = train_pp_equal_df.sample(frac=1.0, random_state=SEED).reset_index(drop=True)

print("Novo treino balanceado por AD (em disco):", train_pp_equal_df["label"].value_counts().to_dict())

## Decodificação/Resize e construção dos datasets (com treino balanceado)

In [None]:
def decode_img(path, label, resize_to):
    img = tf.io.read_file(path)
    img = tf.io.decode_image(img, channels=3, expand_animations=False)
    img = tf.image.resize(img, resize_to, method="bilinear")
    img = tf.cast(img, tf.float32)
    return img, tf.cast(label, tf.int32)

def make_dataset(df_, resize_to=(224,224), training=False, shuffle=True):
    paths = df_["path"].values
    labels = df_["label"].values
    ds = tf.data.Dataset.from_tensor_slices((paths, labels))
    ds = ds.map(lambda p, y: decode_img(p, y, resize_to), num_parallel_calls=AUTOTUNE)
    if training and shuffle:
        ds = ds.shuffle(4096, seed=SEED, reshuffle_each_iteration=True)
    ds = ds.batch(BATCH_SIZE).prefetch(AUTOTUNE)
    return ds

val_ds_b0   = make_dataset(val_pp_df,   resize_to=IMG_SIZE_B0, training=False, shuffle=False)
test_ds_b0  = make_dataset(test_pp_df,  resize_to=IMG_SIZE_B0, training=False, shuffle=False)
val_ds_b1   = make_dataset(val_pp_df,   resize_to=IMG_SIZE_B1, training=False, shuffle=False)
test_ds_b1  = make_dataset(test_pp_df,  resize_to=IMG_SIZE_B1, training=False, shuffle=False)
val_ds_b2   = make_dataset(val_pp_df,   resize_to=IMG_SIZE_B2, training=False, shuffle=False)
test_ds_b2  = make_dataset(test_pp_df,  resize_to=IMG_SIZE_B2, training=False, shuffle=False)

train_ds_b0 = make_dataset(train_pp_equal_df, resize_to=IMG_SIZE_B0, training=True, shuffle=True)
train_ds_b1 = make_dataset(train_pp_equal_df, resize_to=IMG_SIZE_B1, training=True, shuffle=True)
train_ds_b2 = make_dataset(train_pp_equal_df, resize_to=IMG_SIZE_B2, training=True, shuffle=True)

## Modelos EfficientNet (B0/B1/B2) com camadas de Aumento de Dados

In [None]:
def build_effnet(model_name: str, img_size):
    keras.backend.clear_session()

    inputs = keras.Input(shape=img_size + (3,))
    x = layers.Lambda(keras.applications.efficientnet.preprocess_input, name="effnet_preprocess")(inputs)

    if model_name == "B0":
        base = EfficientNetB0(include_top=False, input_shape=img_size + (3,), weights="imagenet")
    elif model_name == "B1":
        base = EfficientNetB1(include_top=False, input_shape=img_size + (3,), weights="imagenet")
    elif model_name == "B2":
        base = EfficientNetB2(include_top=False, input_shape=img_size + (3,), weights="imagenet")
    else:
        raise ValueError("model_name deve ser B0, B1 ou B2.")

    base.trainable = True
    x = base(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.2)(x)
    outputs = layers.Dense(1, activation="sigmoid")(x)
    model = keras.Model(inputs, outputs, name=f"EfficientNet{model_name}")

    opt = keras.optimizers.Adam(learning_rate=LR)
    model.compile(
        optimizer=opt,
        loss="binary_crossentropy",
        metrics=[
            "accuracy",
            keras.metrics.Precision(name="precision"),
            keras.metrics.Recall(name="recall"),
            keras.metrics.AUC(name="auc"),
        ],
    )
    return model

ckpts = {
    "B0": "/content/effb0_roi_pp_ad.weights.h5",
    "B1": "/content/effb1_roi_pp_ad.weights.h5",
    "B2": "/content/effb2_roi_pp_ad.weights.h5",
}

## Treinamento

In [None]:
histories = {}

def train_one(model_name, img_size, train_ds, val_ds):
    model = build_effnet(model_name, img_size)
    callbacks = [
        keras.callbacks.ModelCheckpoint(
            ckpts[model_name], monitor="val_loss",
            save_best_only=True, save_weights_only=True, verbose=1
        ),

    ]
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=EPOCHS,
        verbose=1,
        callbacks=callbacks
    )
    histories[model_name] = history.history
    return model

model_b0 = train_one("B0", IMG_SIZE_B0, train_ds_b0, val_ds_b0)
model_b1 = train_one("B1", IMG_SIZE_B1, train_ds_b1, val_ds_b1)
model_b2 = train_one("B2", IMG_SIZE_B2, train_ds_b2, val_ds_b2)

## Resultados

In [None]:
def evaluate_model(model, test_ds, name=""):
    y_prob = model.predict(test_ds, verbose=0).ravel()
    y_pred = (y_prob >= 0.5).astype(int)
    y_true = np.concatenate([y.numpy() for _, y in test_ds.unbatch().batch(1024)], axis=0)

    acc  = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec  = recall_score(y_true, y_pred)
    f1   = f1_score(y_true, y_pred)
    auc  = roc_auc_score(y_true, y_prob)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0.0

    print(f"\nResultados — TESTE ({name})")
    print(f"Acurácia:       {acc*100:6.2f}%")
    print(f"Especificidade: {specificity*100:6.2f}%")
    print(f"Sensibilidade:  {rec*100:6.2f}%")
    print(f"Precisão:       {prec*100:6.2f}%")
    print(f"F1-score:       {f1*100:6.2f}%")
    print(f"AUC-ROC:        {auc:8.5f}")
    return y_true, y_prob, y_pred


model_b0.load_weights(ckpts["B0"])
model_b1.load_weights(ckpts["B1"])
model_b2.load_weights(ckpts["B2"])

# Avaliação individual
y_true_b0, y_prob_b0, y_pred_b0 = evaluate_model(model_b0, test_ds_b0, name="ROI+PP+AD + ENB0")
y_true_b1, y_prob_b1, y_pred_b1 = evaluate_model(model_b1, test_ds_b1, name="ROI+PP+AD + ENB1")
y_true_b2, y_prob_b2, y_pred_b2 = evaluate_model(model_b2, test_ds_b2, name="ROI+PP+AD + ENB2")

assert np.array_equal(y_true_b0, y_true_b1) and np.array_equal(y_true_b0, y_true_b2), "Mis-match nos rótulos do teste"
y_true = y_true_b0

# Ensemble por votação (maioria)
votes = np.stack([y_pred_b0, y_pred_b1, y_pred_b2], axis=1)
y_pred_ens = (np.sum(votes, axis=1) >= 2).astype(int)

y_prob_ens = (y_prob_b0 + y_prob_b1 + y_prob_b2) / 3.0

acc  = accuracy_score(y_true, y_pred_ens)
prec = precision_score(y_true, y_pred_ens, zero_division=0)
rec  = recall_score(y_true, y_pred_ens)
f1   = f1_score(y_true, y_pred_ens)
auc  = roc_auc_score(y_true, y_prob_ens)
tn, fp, fn, tp = confusion_matrix(y_true, y_pred_ens).ravel()
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0.0

print("\nResultados — TESTE (ROI+PP+AD + Ensemble B0/B1/B2)")
print(f"Acurácia:       {acc*100:6.2f}%")
print(f"Especificidade: {specificity*100:6.2f}%")
print(f"Sensibilidade:  {rec*100:6.2f}%")
print(f"Precisão:       {prec*100:6.2f}%")
print(f"F1-score:       {f1*100:6.2f}%")
print(f"AUC-ROC:        {auc:8.5f}")

fpr, tpr, thr = roc_curve(y_true, y_prob_ens)
plt.figure()
plt.plot(fpr, tpr, label=f"AUC = {auc:0.4f}")
plt.plot([0,1], [0,1], linestyle="--")
plt.xlabel("FPR"); plt.ylabel("TPR"); plt.title("Curva ROC — Teste (Ensemble)")
plt.legend(); plt.show()