## Segmentação de Imagens de áreas desmatadas obtidas pelo Satélite Landsat8

### Configuração

#### Instalando os requisitos básicos

In [2]:
from google.colab import drive
# 1. Montar o Google Drive
print("Montando o Google Drive...")
drive.mount('/content/drive')


ModuleNotFoundError: No module named 'google.colab'

In [None]:
!pip install git+https://github.com/MIT-AI-Accelerator/multiearth-challenge

Collecting git+https://github.com/MIT-AI-Accelerator/multiearth-challenge
  Cloning https://github.com/MIT-AI-Accelerator/multiearth-challenge to /tmp/pip-req-build-stdp_8ge
  Running command git clone --filter=blob:none --quiet https://github.com/MIT-AI-Accelerator/multiearth-challenge /tmp/pip-req-build-stdp_8ge
  Resolved https://github.com/MIT-AI-Accelerator/multiearth-challenge to commit 21cd2c7f2b5ea7cd8bba5fa49af5fc295e7cf3c9
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting jupyter>=1.0 (from multiearth-challenge==1.0.0)
  Downloading jupyter-1.1.1-py2.py3-none-any.whl.metadata (2.0 kB)
Collecting netcdf4>=1.5 (from multiearth-challenge==1.0.0)
  Downloading netCDF4-1.7.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.8 kB)
Collecting jupyterlab (from jupyter>=1.0->multiearth-challenge==1.0.0)
  Downloading jupyterlab-4.4.4-py3-none-any.whl.metadata (16 kB)
Collecting cftime (from netcdf4>=1.5->multiearth-challenge==1.0.0)
  Downloading cft

In [3]:
#Importando bibliotecas
import pkg_resources
from pathlib import Path
import numpy as np
from matplotlib import pyplot as plt
from skimage import filters, morphology
from skimage.transform import resize
import pandas as pd
from multiearth_challenge.datasets import segmentation_dataset as sd
from tqdm import tqdm


  import pkg_resources


#### Montando o dataset

In [None]:
data_dir = Path("./")
source_files = [data_dir / "landsat8_train.nc"]
segmentation_files = [data_dir / "deforestation_train.nc"]

# 1. Bandas de detecção da imagem de entrada
# As bandas disponíveis são
# ['SR_B1', 'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B6', 'SR_B7', 'ST_B10', 'QA_PIXEL']
# Selecionaremos as bandas no espectro de luz visível
source_bands = {"Landsat-8": ["SR_B4", "SR_B3", "SR_B2"]} # RGB

# 2. Cobertura de nuvens (em pct) [min, max].
source_cloud_coverage = (0, 0)

# 3. Janela temporal ao redor da data do target
# Como o tempo de revisita do landsat8 é de 16 dias, usaremos esse valor
source_date_window = (-16, 16)

# 4. Se 'True', gera um par (source, target) para cada source compatível, false retorna uma lista das imagens compatíveis
# True gera "duplicação" de targets, então usamos false e lidamos com a lista posteriormente
single_source_image = False

dataset = sd.ImageSegmentationDataset(
    source_files=source_files,
    segmentation_files=segmentation_files,
    source_bands=source_bands,
    merge_source_bands=True,
    source_cloud_coverage=source_cloud_coverage,
    source_date_window=source_date_window,
    single_source_image=single_source_image,
)

print(f"Encontrados {len(dataset)} targets com ao menos uma imagem compatível.")

### Pipeline

#### Funções Principais

In [None]:
def luminance(image):
    l = 0.2126 * image[:, :, 0] + 0.7152 * image[:, :, 1] + 0.0722 * image[:, :, 2]
    return l

# Função auxiliar para normalizar a imagem para visualização (valores entre 0 e 1)
def normalize(img):
    img = img.astype(np.float64)
    min_val = np.min(img)
    max_val = np.max(img)
    if max_val > min_val:
        img = (img - min_val) / (max_val - min_val)
    else:
        img = img * 0  # imagem preta se max==min
    return img

def converter_uint8(img_float):
    #Converte uma imagem float [0, 1] para uint8 (0-255)
    return (img_float * 255).astype(np.uint8)

In [None]:
##FUNCÕES DE REALCE
#Histograma
#Computa a distribuição de intensidades de pixel em uma img
def histograma(img, tam):
    acumulador = np.zeros((tam,))
    h, w = img.shape
    for i in range(h):
        for j in range(w):
            acumulador[img[i, j]] += 1
    return acumulador

#Histograma Cumulativo
#Computa a proporcao de intensidades
def histo_cumulativo(h, L):
    acumulador = np.zeros((L,))
    acumulador[0] = h[0]
    for i in range(1, L):
        acumulador[i] = acumulador[i - 1] + h[i]
    return acumulador

#Funcao que aplica uma equalizacao no histograma
#T(z) = ((L-1)/ total_pixels) * Hc(z)
def transf(hc, img, total=None):

    h, w = img.shape
    L = 256  # níveis de cinza
    if total is None:
        total = float(h * w)
    new_img = np.zeros_like(img, dtype=np.uint8)
    aux = (L-1)/total
    for i in range(h):
        for j in range(w):
            z = img[i, j]
            new_val = aux * hc[z]
            new_img[i, j] = new_val

    return new_img.astype(np.uint8)

#Realiza a equalizacao de forma individual
def equaliza_indiv(imagens):
    img_eq =  []
    for img in imagens:
        h = histograma(img, 256)
        hc = histo_cumulativo(h, 256)
        img_eq.append(transf(hc, img))

    return img_eq

#realiza a equalizacao de forma conjunta, i.e, para n imagens
def equaliza_conj(imagens):
    h_conjunto = np.zeros(256)
    for img in imagens:
        h_conjunto += histograma(img, 256)

    hc_conjunto = histo_cumulativo(h_conjunto, 256)
    total_pixels = sum(img.size for img in imagens)  # total de pixels do conjunto
    imagens_eq = [transf(hc_conjunto, img, total=total_pixels) for img in imagens]

    return imagens_eq, hc_conjunto

def plot_histograms(images, titles, main_title):
    plt.figure(figsize=(15, 10))

    for i, (img, title) in enumerate(zip(images, titles)):
        plt.subplot(2, len(images), i+1)
        plt.imshow(img, cmap='gray')
        plt.title(title)
        plt.axis('off')

        plt.subplot(2, len(images), i+1+len(images))
        hist = histograma(img, 256)
        plt.bar(range(256), hist)
        plt.title(f'Histograma {title}')

    plt.suptitle(main_title)
    plt.tight_layout()
    plt.show()

In [None]:
#Filtro de ruído
def gaussian_filter(k=3, sigma=1.0):
    arx = np.arange((-k//2) + 1.0, (k//2) + 1.0)
    x,y = np.meshgrid(arx, arx)
    filt = np.exp(-(1/2) * (np.square(x) + np.square(y)) / np.square(sigma))

    return np.array(filt/np.sum(filt))

#Filtro Laplaciano
def operador_laplaciano():
    return np.array([[0,-1,  0],
                     [-1, 4, -1],
                     [0, -1, 0]])

def pad_to_shape(arr, shape):
    """Faz padding de arr para o shape fornecido (centrado)"""
    pad_y = shape[0] - arr.shape[0]
    pad_x = shape[1] - arr.shape[1]
    pad_top = pad_y // 2
    pad_bottom = pad_y - pad_top
    pad_left = pad_x // 2
    pad_right = pad_x - pad_left
    return np.pad(arr, ((pad_top, pad_bottom), (pad_left, pad_right)), mode='constant')

#Filtro de Minimos Quadrados Restritos
def filter_min_quad_rest(sigma, tam_f, img_deg, gama):
    #criando os filtros de degradação(gaussiano) e limitante(laplace)
    laplace = operador_laplaciano()
    filt_deg = gaussian_filter(k = tam_f, sigma=sigma)

    #realizando o padding nos filtros
    filt_h_pad = pad_to_shape(filt_deg, img_deg.shape)
    laplace_pad = pad_to_shape(laplace, img_deg.shape)


    #aplicando a tf na imagem e filtros
    img_deg_ft = np.fft.fft2(img_deg)
    filt_h = np.fft.fft2(filt_h_pad)
    filt_h_conj = np.conjugate(filt_h)
    op_laplace = np.fft.fft2(laplace_pad)

    #realizando a filtragem do dominio da frequencia ( produto na frequencia)
    result = (filt_h_conj / (np.abs(filt_h) ** 2 + gama *( np.abs(op_laplace) ** 2))) * img_deg_ft

    #aplicando a tf inversa e voltando pro dominio real
    img_result = np.fft.ifft2(result)
    img_result = np.fft.ifftshift(img_result)
    img_result = np.real(img_result)

    return((normalize(img_result)*255).astype(np.uint8))


#Define a equação do realce gama
def realce_gama(img, gama):
    img_gama = img**(1/gama)
    img_gama = normalize(img_gama)
    img_norm = (img_gama * 255).astype(np.uint8)

    return img_norm
#Aplica a correção gama na imagem
def correc_gamma(imagem, gama):
    imagens_gama = realce_gama(imagem, gama)
    return imagens_gama
def metodo_otsu(imagem):
    #calculando o histograma da imagem
    histo, bin_edges = np.histogram(imagem, bins=np.arange(257))

    #calculando o tamanho da imagem
    total_pixels = imagem.size
    current_max, limiar = 0,0

    #calculando a media global da imagem
    media_global = np.sum(histo * np.arange(256))/total_pixels

    #variaveis auxiliares
    peso_fundo, media_fundo = 0, 0
    soma_fundo = 0

    for i in range(256):
        #peso de fundo
        peso_fundo += histo[i]

        if peso_fundo == 0:
            continue

        #veirificando o primeiro plano
        peso_plan1 =  total_pixels - peso_fundo

        if peso_plan1 == 0:
            break

        #media do fundo
        soma_fundo += i * histo[i]
        media_fundo = soma_fundo / peso_fundo

        #media primeiro plano
        media_plan1 =(media_global * total_pixels - soma_fundo)/peso_plan1

        #calculando a variancia intra_classe
        var_interclas = peso_fundo * peso_plan1 * (media_fundo - media_plan1) ** 2

        #maxima var e limiarização
        if var_interclas > current_max:
            current_max = var_interclas
            limiar = i

    return limiar

def open_close(img, selem):

    #realizando o processo de abertura ( erosão + dilatação)
    img_opened = morphology.binary_opening(img,selem)

    #realizando o processo de fechamentro (dilatação + erosão)
    img_result = morphology.closing(img_opened, selem)

    #retorna uma nova img
    return img_result

def aplicar_Morph(list_img, selem):
    # Recebe uma lista de imagens e retorna uma outra lista com imagens
    imgs_morph = []
    # Usando tqdm para visualizar o progresso
    for img in tqdm(list_img, desc="Aplicando Morfologia Matemática (Abertura e  Fechamento)"):
        morph = open_close(img, selem)
        imgs_morph.append(morph)
    return imgs_morph

In [None]:
from dataclasses import dataclass, field
import pandas as pd
import numpy as np
from typing import List
from sklearn.metrics import jaccard_score, precision_score, recall_score, f1_score
import copy

def etapa_cinza(source_data,target_data):
  new_dims = (85,85)
  rgb_image = source_data[0]["image"]  # (3, H, W)
  img_transp = rgb_image.transpose((1,2,0)) # converte para (H, W, 3)
  mask = target_data["image"]
  original_mask = mask.squeeze()

  # Opções para manter a característica binária da mascara
  downscaled_mask = resize(
      original_mask,
      new_dims,
      order=0,
      preserve_range=True,
      anti_aliasing=False
  )

  downscaled_mask = downscaled_mask.astype(original_mask.dtype)
  mask = downscaled_mask


  rgb   = converter_uint8(normalize(img_transp))
  cinza = converter_uint8(normalize(luminance(img_transp)))
  # mask.transpose((1,2,0))
  return rgb,cinza,mask

def etapa_filtros(img_cinza,gamma=0.5,sigma=0.01,tam_f=5):
  img_eq = equaliza_indiv([img_cinza])[0]
  img_mq = filter_min_quad_rest(sigma, tam_f, img_cinza, gamma)
  return img_eq,img_mq,img_cinza ## equalizada, minquad, original

def etapa_correcao(img_cinza,gammas:list[float]):
  return [correc_gamma(img_cinza,gamma) for gamma in gammas]

def etapa_otsu(img_cinza):
  limiar = metodo_otsu(img_cinza)
  bin_img = (img_cinza > limiar)#.astype(np.uint8) * 255 # Binariza a imagem com base no limiar
  return bin_img

def etapa_morfologia(img_bin,morf_obj):
  return open_close(img_bin,morf_obj)

def dice_score(y_true, y_pred, zero_division=1):
    y_true_f = y_true.flatten()
    y_pred_f = y_pred.flatten()
    intersection = np.sum(y_true_f * y_pred_f)
    if intersection == 0:
        return zero_division
    return 2 * intersection / (np.sum(y_true_f) + np.sum(y_pred_f))

def etapa_score(true_mask, img_bin):
    true_flat = true_mask.flatten()
    pred_flat = img_bin.flatten()

    jaccard = jaccard_score(true_flat, pred_flat, zero_division=1)
    dice = dice_score(true_mask, img_bin)
    precision = precision_score(true_flat, pred_flat, zero_division=1)
    recall = recall_score(true_flat, pred_flat, zero_division=1)
    f1 = f1_score(true_flat, pred_flat, zero_division=1)

    return jaccard, dice, precision, recall, f1

type2filter = {0:'original',1:'equalizada',2:'min quad'}
filter2type = {value:key for key,value in type2filter.items()}

@dataclass
class IMG:
  _img: List[tuple[np.ndarray,str]]  # armazena a lista real
  filter_type:int
  gamma:float=None
  morf_applied:bool=False
  append_allowed:bool=False ## define como substitui o img
  score_jaccard:float=None
  score_dice:float=None
  score_precision:float=None
  score_recall:float=None
  score_f1:float=None

  @property
  def img(self):
    return self._img[-1][0] if self._img else None

  @img.setter
  def img(self, value):
      if not (isinstance(value,list) or isinstance(value,tuple)):
        value = (value,self._img[-1][1])
      if self.append_allowed and self._img:
          self._img.append(value)
      else:
          self._img = [value]
  def get_imgs(self):
    return self._img

  def to_dict(self):
    return {'gamma':self.gamma,
      'filter':type2filter[self.filter_type],
      'morf_applied':self.morf_applied,
      'score_jaccard':self.score_jaccard,
      'score_dice':self.score_dice,
      'score_precision':self.score_precision,
      'score_recall':self.score_recall,
      'score_f1':self.score_f1
      }
  def __str__(self):
    return f"""
    Descricoes: {[img[1] for img in self._img]}
    Gamma: {self.gamma}
    Filter: {type2filter[self.filter_type]}
    IsMorf: {self.morf_applied}
    """


def pipeline(source_data,target_data,morf_obj,gamma_values=[0.75,2.0,1.0],
             gamma_filtro=0.5,sigma_filtro=0.01,filtro_tamf=5,verbose=False,append_allowed=False, only_scores=False):
  img_rgb, img_cinza, true_mask = etapa_cinza(source_data,target_data)
  if verbose: print("Etapa correção gamma")

  # Aplica correção gama diretamente na imagem cinza original
  imgs_corrigidas = [etapa_correcao(img_cinza, [gamma]) for gamma in gamma_values]  # retorna lista de listas
  imgs_corrigidas = [imgs[0] for imgs in imgs_corrigidas]  # extrai imagem corrigida de cada lista

  resultados: list[IMG] = []

  for img_corr, gamma in zip(imgs_corrigidas, gamma_values):
      if verbose: print(f"Aplicando filtros na imagem com gamma={gamma}")

      # Agora aplica equalização e filtros NA IMAGEM CORRIGIDA
      img_eq, img_mq, img_corr_gama = etapa_filtros(
          img_corr,
          gamma=gamma_filtro,  # não aplicar gamma de novo nos filtros
          sigma=sigma_filtro,
          tam_f=filtro_tamf
      )

      # Adiciona resultado para cada filtro
      for filter_type, img_base in [(0, img_corr), (1, img_eq), (2, img_mq)]:
        resultados.append(IMG([
            (img_rgb, 'original'),
            (img_cinza, 'cinza'),
            (img_corr, 'gamma'),
            (img_base, 'filtro')
        ],
        filter_type=filter_type,gamma=gamma,append_allowed=append_allowed))

  ## passa o otsu em todas as imagens
  if verbose: print(f"Etapa OTSU - {len(resultados)} imagens")
  for img_object in resultados:
    img_object.img = (etapa_otsu(img_object.img),'otsu')

  ## faz um outro conjunto com morfologia
  if verbose: print(f"Etapa Morfologia - {len(resultados)} imagens")

  resultados_morf = []
  for img_object in resultados:
    img_morf = etapa_morfologia(img_object.img,morf_obj)

    historico_imgs = copy.deepcopy(img_object.get_imgs())
    historico_imgs.append((img_morf,'morf'))

    resultados_morf.append(IMG(
                          historico_imgs,
                          filter_type=img_object.filter_type,
                          gamma=img_object.gamma,
                          morf_applied=True,
                          append_allowed=True
                          ))
  resultados.extend(resultados_morf)

  ## etapa de calcular score
  if verbose: print(f"Etapa Score - {len(resultados)} imagens")
  for img_object in resultados:
    img_object.img = normalize(img_object.img) ## normaliza para 0,1 pra poder ficar no formato do sklearn
    img_object.score_jaccard, img_object.score_dice, img_object.score_precision, img_object.score_recall, img_object.score_f1 = etapa_score(true_mask,img_object.img)

  if only_scores:
    for img_object in resultados:
      img_object._img = []

  return resultados

def IMGs2Dataframe(resultados:list[IMG]):
  df = pd.DataFrame([img_object.to_dict() for img_object in resultados])
  return df

#### Resultados

In [None]:
## mostrando o pipeline funcionando
selem = morphology.square(4)
for source_data, target_data in tqdm(dataset, desc="Vetorizando Imagens"):
  resultados_primeira_imagem = pipeline(source_data,target_data,selem,verbose=True,append_allowed=True)
  break
df_img1 = IMGs2Dataframe(resultados_primeira_imagem)
df_img1 = df_img1.sort_values('score_jaccard',ascending=False) ## ordena pelo com maior score
df_img1.head(18) ## só mostra o top 5


  selem = morphology.square(4)
Vetorizando Imagens:   0%|          | 0/10224 [00:00<?, ?it/s]

Etapa correção gamma
Aplicando filtros na imagem com gamma=0.75
Aplicando filtros na imagem com gamma=2.0
Aplicando filtros na imagem com gamma=1.0
Etapa OTSU - 9 imagens
Etapa Morfologia - 9 imagens
Etapa Score - 18 imagens


Vetorizando Imagens:   0%|          | 0/10224 [00:00<?, ?it/s]


Unnamed: 0,gamma,filter,morf_applied,score_jaccard,score_dice,score_precision,score_recall,score_f1
14,2.0,min quad,True,0.775064,0.87328,0.937323,0.817428,0.87328
5,2.0,min quad,False,0.74561,0.854269,0.917674,0.799059,0.854269
3,2.0,original,False,0.736091,0.847986,0.921419,0.785394,0.847986
17,1.0,min quad,True,0.734136,0.846688,0.948583,0.764561,0.846688
12,2.0,original,True,0.732306,0.845469,0.959602,0.7556,0.845469
1,0.75,equalizada,False,0.71374,0.832962,0.930348,0.754032,0.832962
7,1.0,equalizada,False,0.71374,0.832962,0.930348,0.754032,0.832962
8,1.0,min quad,False,0.71016,0.830519,0.923267,0.754704,0.830519
4,2.0,equalizada,False,0.705595,0.827388,0.933315,0.743056,0.827388
6,1.0,original,False,0.705595,0.827388,0.933315,0.743056,0.827388


In [None]:
import matplotlib.pyplot as plt
import numpy as np

def show_images(img_list, titles=None, cmap='gray'):
    n = len(img_list)-1
    print(n)
    cols = min(3, n)  # no máximo 5 colunas
    rows = (n + cols - 1) // cols

    fig, axes = plt.subplots(rows, cols, figsize=(3 * cols, 3 * rows))

    # Garantir que 'axes' seja sempre 2D array-like
    axes = np.array(axes).reshape(-1)

    for i in range(n):
        ax = axes[i]
        img,desc = img_list[i]
        title = titles[i] if titles else desc
        ax.set_title(title)
        ax.imshow(img, cmap=cmap)
        if titles:
            ax.set_title(titles[i])
        ax.axis('off')

    # Esconder os eixos extras, se houver
    for j in range(n, len(axes)):
        axes[j].axis('off')

    plt.tight_layout()
    plt.show()


r1 = resultados_primeira_imagem[14]
show_images(r1.get_imgs())
print(r1)

NameError: name 'resultados_primeira_imagem' is not defined

In [None]:
## Fazendo com todo o dataset
resultados = []
selem = morphology.square(4)
for i,(source_data, target_data) in enumerate(tqdm(dataset, desc="Vetorizando Imagens")):
  img_results = pipeline(source_data,target_data,selem, only_scores=True)
  resultados.extend(img_results)
df = IMGs2Dataframe(resultados)

  selem = morphology.square(4)
Vetorizando Imagens: 100%|██████████| 10224/10224 [33:16<00:00,  5.12it/s]


In [None]:
grouped = df.groupby(['gamma', 'filter', 'morf_applied']).agg(['mean', 'std'])

grouped

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,score_jaccard,score_jaccard,score_dice,score_dice,score_precision,score_precision,score_recall,score_recall,score_f1,score_f1
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,mean,std,mean,std,mean,std,mean,std,mean,std
gamma,filter,morf_applied,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2
0.75,equalizada,False,0.282143,0.311455,0.767109,0.289066,0.332824,0.382901,0.923192,0.115532,0.352203,0.364784
0.75,equalizada,True,0.33535,0.336706,0.824458,0.233334,0.406385,0.412892,0.89107,0.163708,0.405052,0.388216
0.75,min quad,False,0.29334,0.291381,0.792164,0.240572,0.478853,0.451128,0.760991,0.244368,0.373737,0.35679
0.75,min quad,True,0.291696,0.294754,0.789844,0.244643,0.495115,0.461052,0.745765,0.263976,0.370928,0.356661
0.75,original,False,0.290849,0.294943,0.78649,0.251716,0.495096,0.469757,0.754388,0.252766,0.369334,0.358288
0.75,original,True,0.263437,0.277326,0.758199,0.265164,0.526139,0.474473,0.689532,0.307573,0.344271,0.335755
1.0,equalizada,False,0.283936,0.312823,0.768737,0.288207,0.334327,0.383368,0.923207,0.115039,0.353831,0.365954
1.0,equalizada,True,0.337755,0.338413,0.826485,0.231962,0.408827,0.413916,0.890794,0.163407,0.40708,0.389582
1.0,min quad,False,0.316507,0.311065,0.81067,0.232294,0.472067,0.448409,0.795789,0.214368,0.39322,0.372938
1.0,min quad,True,0.315809,0.314039,0.810769,0.233464,0.485118,0.457692,0.784168,0.232337,0.391559,0.373272


In [None]:
# Qual método obteve o melhor resultado
max_row_idx = grouped[('score_jaccard', 'mean')].idxmax()

grouped.loc[max_row_idx].to_frame().T

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,score_jaccard,score_jaccard,score_dice,score_dice,score_precision,score_precision,score_recall,score_recall,score_f1,score_f1
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,mean,std,mean,std,mean,std,mean,std,mean,std
2.0,original,False,0.34831,0.349433,0.828597,0.244899,0.466061,0.459333,0.850865,0.168834,0.413398,0.400576
