# Obter dados no drive

In [None]:
from google.colab import drive
import pandas as pd
import random
import os
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, Callback
from glob import glob
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
from keras import layers, models, Input, regularizers
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import load_img, img_to_array

In [None]:
drive.mount('/content/drive')

# Divisão de imagens e masks em amostras de treino e teste

In [None]:
images = []
masks = []

for path in glob('/content/drive/Shared drives/Grupo T de Tech/Data/dataset_inteli/cropped_images/*/*'):
  images.append(path + '/image.tif')
  masks.append(path + '/mask.png')

In [None]:
# Função para carregar e pré-processar uma imagem e sua máscara
def load_and_preprocess_image(image_path, mask_path, target_size):

    image = load_img(image_path, target_size=target_size)
    image = img_to_array(image) / 255.0  # Normalização entre 0 e 1

    mask = load_img(mask_path, target_size=target_size, color_mode='grayscale')
    mask = img_to_array(mask) / 255.0  # Normalização entre 0 e 1

    return image, mask

In [None]:
# Lista para armazenar imagens e máscaras pré-processadas
images_processed = []
masks_processed = []

count = 1
# Carregar e pré-processar todas as imagens e máscaras
for img_path, mask_path in zip(images, masks):
    print(count)
    count += 1
    img, mask = load_and_preprocess_image(img_path, mask_path, target_size=(256, 256))
    images_processed.append(img)
    masks_processed.append(mask)

# Converter para arrays numpy
images_processed = np.array(images_processed)
masks_processed = np.array(masks_processed)


In [None]:
images_processed

In [None]:
X_train, X_val, y_train, y_val = train_test_split(images_processed, masks_processed, test_size=0.3, random_state=42)

# Definição do modelo

In [None]:
class CyclicLR(Callback):
    def __init__(self, base_lr=1e-4, max_lr=1e-3, step_size=2000., mode='triangular'):
        super(CyclicLR, self).__init__()

        self.base_lr = base_lr
        self.max_lr = max_lr
        self.step_size = step_size
        self.mode = mode
        self.iterations = 0
        self.history = {}

    def clr(self):
        cycle = np.floor(1 + self.iterations / (2 * self.step_size))
        x = np.abs(self.iterations / self.step_size - 2 * cycle + 1)
        lr = self.base_lr + (self.max_lr - self.base_lr) * max(0, (1 - x))
        if self.mode == 'triangular2':
            lr = lr / float(2 ** (cycle - 1))
        elif self.mode == 'exp_range':
            lr = lr * (0.999 ** self.iterations)
        return lr

    def on_train_begin(self, logs=None):
        logs = logs or {}
        tf.keras.backend.set_value(self.model.optimizer.lr, self.base_lr)

    def on_batch_end(self, batch, logs=None):
        self.iterations += 1
        lr = self.clr()
        tf.keras.backend.set_value(self.model.optimizer.lr, lr)
        self.history.setdefault('lr', []).append(lr)
        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)

# Função de callbacks
def get_callbacks():
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
    clr = CyclicLR(base_lr=1e-4, max_lr=1e-3, step_size=2000., mode='triangular2')
    return [early_stopping, reduce_lr, clr]

In [None]:
# Função para calcular a sigmoide e converter para 0 ou 1 o output
class ThresholdLayer(tf.keras.layers.Layer):
    def call(self, inputs):
        return tf.where(inputs < 0.5, 0.0, 1.0)

# Função para calcular o Dice Coefficient
def dice_coefficient(y_train, y_val):
    smooth = 1e-6
    intersection = tf.reduce_sum(y_train * y_val)
    dice_coefficient = (2. * intersection + smooth) / (tf.reduce_sum(y_train) + tf.reduce_sum(y_val) + smooth)
    return dice_coefficient

# Função de perda de Dice
def dice_loss(y_train, y_val):
    return 1 - dice_coefficient(y_train, y_val)

# Função para calcular a penalidade adicional
def penalty_loss(y_train, y_val, penalty_weight):
    # Calcular a penalidade considerando a diferença entre y_train e y_val
    penalty = tf.reduce_sum(tf.abs(y_train - y_val))
    # Multiplicar a penalidade pelo peso da penalidade
    weighted_penalty = penalty_weight * penalty
    return weighted_penalty

# Função de perda combinada
def combined_loss(y_train, y_val, penalty_weight):
    # Perda padrão (por exemplo, perda de entropia cruzada binária)
    standard_loss = tf.keras.losses.binary_crossentropy(y_train, y_val)
    # Dice Loss
    dice = dice_loss(y_train, y_val)
    # Penalidade adicional
    penalty = penalty_loss(y_train, y_val, penalty_weight)
    # Perda total = perda padrão + penalidade + Dice Loss
    total_loss = standard_loss + penalty + dice
    return total_loss

# Métrica de acurácia customizada
def custom_accuracy(y_train, y_val):
    # Calcular a acurácia considerando uma tolerância de 0.5 na predição
    y_val_binary = tf.round(y_val)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_train, y_val_binary), tf.float32))
    return accuracy

In [None]:
def TransUnet(input_shape):
    inputs = tf.keras.Input(shape=input_shape)

    # Encoder (contraction path)
    conv1 = layers.Conv2D(16, 3, activation='relu', padding='same')(inputs)
    conv1 = layers.Conv2D(16, 3, activation='relu', padding='same')(conv1)
    pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)
    drop1 = layers.Dropout(0.3)(pool1)

    conv2 = layers.Conv2D(32, 3, activation='relu', padding='same')(drop1)
    conv2 = layers.Conv2D(32, 3, activation='relu', padding='same')(conv2)
    pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)
    drop2 = layers.Dropout(0.3)(pool2)

    conv3 = layers.Conv2D(64, 3, activation='relu', padding='same')(drop2)
    conv3 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv3)
    pool3 = layers.MaxPooling2D(pool_size=(2, 2))(conv3)
    drop3 = layers.Dropout(0.3)(pool3)

    # Bottleneck
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(drop3)
    conv4 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv4)
    drop4 = layers.Dropout(0.3)(conv4)

    # Decoder (expansion path)
    up5 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(drop4)
    merge5 = layers.concatenate([conv3, up5], axis=3)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(merge5)
    conv5 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv5)

    up6 = layers.Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv5)
    merge6 = layers.concatenate([conv2, up6], axis=3)
    conv6 = layers.Conv2D(32, 3, activation='relu', padding='same')(merge6)
    conv6 = layers.Conv2D(32, 3, activation='relu', padding='same')(conv6)

    up7 = layers.Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(conv6)
    merge7 = layers.concatenate([conv1, up7], axis=3)
    conv7 = layers.Conv2D(16, 3, activation='relu', padding='same')(merge7)
    conv7 = layers.Conv2D(16, 3, activation='relu', padding='same')(conv7)

    outputs = layers.Conv2D(1, 1, activation='sigmoid')(conv7)  # Saída com um canal (máscara binária)

    threshold_output = ThresholdLayer()(outputs)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model

In [None]:
# Criar modelo TransU-Net
model = TransUnet(input_shape=(256, 256, 3))
#model = UNet(output_channels=1)
model.summary()

# Treino do modelo

In [None]:
callbacks = get_callbacks()

# Compilar o modelo
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), loss=lambda y_train, y_val: combined_loss(y_train, y_val, 0.001), metrics=[custom_accuracy])

# Treinar o modelo
H = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=200, batch_size=16, callbacks=callbacks)

# Avaliação do modelo

In [None]:
# Avaliando o modelo
loss, accuracy = model.evaluate(X_val, y_val)
print(f'Acurácia do modelo: {accuracy}')

# Mostrando resultados
plt.style.use("ggplot")
plt.figure()
plt.plot(H.epoch, H.history["loss"], label="train_loss")
plt.plot(H.epoch, H.history["val_loss"], label="val_loss")
plt.plot(H.epoch, H.history["custom_accuracy"], label="train_acc")
plt.plot(H.epoch, H.history["val_custom_accuracy"], label="val_acc")
plt.title("Training Loss and Accuracy")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend()
plt.show()

In [None]:
# Avaliar o modelo nos dados de teste
results = model.evaluate(X_val, y_val)
print("Test Loss:", results[0])
print("Test Accuracy:", results[1])

# Prever máscaras usando o modelo
predicted_masks = model.predict(X_val)

In [None]:
# Obter métricas de precisão e perda do histórico de treinamento
acc = H.history['custom_accuracy']
val_acc = H.history['val_custom_accuracy']
loss = H.history['loss']
val_loss = H.history['val_loss']

# Número de épocas
epochs = range(1, len(acc) + 1)

# Plotar precisão do conjunto de treino e validação
plt.plot(epochs, acc, 'r', label='Precisão do Conjunto de Treino')
plt.plot(epochs, val_acc, 'b', label='Precisão do Conjunto de Validação')
plt.title('Precisão do Conjunto de Treino e Validação')
plt.xlabel('Épocas')
plt.ylabel('Precisão')
plt.legend()
plt.show()

# Plotar perda do conjunto de treino e validação
plt.plot(epochs, loss, 'r', label='Perda do Conjunto de Treino')
plt.plot(epochs, val_loss, 'b', label='Perda do Conjunto de Validação')
plt.title('Perda do Conjunto de Treino e Validação')
plt.xlabel('Épocas')
plt.ylabel('Perda')
plt.legend()
plt.show()

In [None]:
# Gerar as saídas do modelo para um conjunto de entradas de teste
saidas_modelo = model.predict(X_val)

# Iterar sobre cada saída do modelo
for i in range(len(X_val)):
    # Obter a entrada correspondente e a saída real
    img_entrada = X_val[i]
    img_saida_real = y_val[i]

    # Obter a saída gerada pelo modelo
    img_saida_modelo = saidas_modelo[i]

    # Mostrar as imagens
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 3, 1)
    plt.imshow(img_entrada.squeeze(), cmap='gray')
    plt.title('Entrada')
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.imshow(img_saida_real.squeeze(), cmap='gray')
    plt.title('Saída Esperada')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.imshow(img_saida_modelo.squeeze(), cmap='gray')
    plt.title('Saída do Modelo')
    plt.axis('off')

    plt.show()

In [None]:
# Implementando as outras métricas

# Lista para armazenar os scores de IoU
iou_scores = []
# Calcular IoUs e determinar predições corretas
correct_predictions = 0
iou_threshold = 0.5
for mask, result in zip(y_val, img_saida_modelo):
    intersection = np.logical_and(mask, result)
    union = np.logical_or(mask, result)
    iou_score = np.sum(intersection) / np.sum(union) if np.sum(union) != 0 else 0
    iou_scores.append(iou_score)
    # Verificar se a predição é considerada correta (IoU >= threshold)
    if iou_score >= iou_threshold:
        correct_predictions += 1
    print('IoU é: ' + str(iou_score))
# Calcular a média dos IoUs
iou_mean = np.mean(iou_scores)
print('Média dos IoU:', iou_mean)
# Calcular Coverage Ratio (CovR)
total_predictions = len(iou_scores)
covr = correct_predictions / total_predictions if total_predictions > 0 else 0
print('Coverage Ratio (CovR):', covr)