## Conectando ao Google Drive

In [None]:
# Mounting GDrive.
from google.colab import drive
drive.mount('/content/drive')

## Import das Bibliotecas

In [None]:
!pip install rasterio

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
import os
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import rasterio
from rasterio.enums import Resampling
import shutil
from PIL import Image
import skimage.io as io
import skimage.transform as trans
import numpy as np
from keras.models import *
from keras.layers import *
from keras.optimizers import *
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras import backend as keras
from sklearn.model_selection import train_test_split
import glob

##Seleção de parâmetros



In [None]:
#@title Configuração do Processamento de Imagem { run: "auto", display-mode: "form" }

#@markdown ### Seleção de Tipos de Imagem:
marked_rgb = True #@param {type:"boolean"}
rgb = True #@param {type:"boolean"}
png = True #@param {type:"boolean"}
ndvi = True #@param {type:"boolean"}
gndvi = True #@param {type:"boolean"}
original_tif = True #@param {type:"boolean"}

#@markdown ### Filtros de Pré-processamento:
calibration = True #@param {type:"boolean"}
atmospheric_correction = True #@param {type:"boolean"}
remove_clouds = True #@param {type:"boolean"}

#@markdown ### Dimensões da Imagem (Altura e Largura são iguais):
image_dimension = 300 # @param {type:"slider", min:300, max:1200, step:10}

#@markdown ### Técnicas de Data Augmentation:
flip_horizontal = True #@param {type:"boolean"}
rotation = True #@param {type:"boolean"}
scaling = True #@param {type:"boolean"}
shear = True #@param {type:"boolean"}
brightness_adjustment = True #@param {type:"boolean"}
none_aug = False #@param {type:"boolean"}

#@markdown ### Técnica de Normalização:
normalization_technique = "Max Abs Scaling" #@param ["Min-Max Scaling", "Z-Score", "Max Abs Scaling", "Robust Scaling", "None"] {type:"string"}

#@markdown ### Divisão de Dados de Treino/Teste (% para Treino):
training_percentage = 80 #@param {type:"slider", min:10, max:90, step:5}

# Compila a lista de tipos de imagem selecionados
image_types = []
if marked_rgb: image_types.append("Marked RGB")
if rgb: image_types.append("RGB")
if png: image_types.append("PNG")
if ndvi: image_types.append("NDVI")
if gndvi: image_types.append("GNDVI")
if original_tif: image_types.append("TIFF")
image_types.append("Binary Mask")

# Compila a lista de técnicas de data augmentation selecionadas
data_augmentation = []
if flip_horizontal: data_augmentation.append("Flip Horizontal")
if rotation: data_augmentation.append("Rotation")
if scaling: data_augmentation.append("Scaling")
if shear: data_augmentation.append("Shear")
if brightness_adjustment: data_augmentation.append("Brightness Adjustment")
if none_aug: data_augmentation = ["None"]  # Prioriza 'None' se selecionado

# Calcula a porcentagem de teste com base na porcentagem de treino
test_percentage = 100 - training_percentage

print(f"Tipos de Imagem Selecionados: {image_types}")
print(f"Técnicas de Data Augmentation: {data_augmentation}")
print(f"Calibração Radiométrica: {'Ativada' if calibration else 'Desativada'}")
print(f"Correção Atmosférica: {'Ativada' if atmospheric_correction else 'Desativada'}")
print(f"Dimensões da Imagem: {image_dimension} x {image_dimension}")
print(f"Técnica de Normalização: {normalization_technique}")
print(f"Porcentagem de Treino: {training_percentage}%")
print(f"Porcentagem de Teste: {test_percentage}%")


## Preparação dos Dados de Treino

Separação dos dados para treino do modelo. A ideia foi separar em diretórios distintos, cada variável vai armazenar o diretório com um tipo específico de imagem.

In [None]:
addressDSmain = '/content/drive/MyDrive/modulo10/data/dataset_inteli'

# Dicionário para mapear nomes de diretório aos seus respectivos sufixos
directory_suffixes = {
    "Marked RGB": 'marked_rgbs',
    "RGB": 'rgbs',
    "TIFF": 'tci_tifs',
    "PNG": 'tci_pngs',
    "NDVI": 'ndvi',
    "GNDVI": 'gndvi',
    "Binary Mask": 'masks'
}

# Dicionário com diretório e caminhos
directory_map = {
    key: os.path.join(addressDSmain, suffix)
    for key, suffix in directory_suffixes.items()
    if key in image_types
}

In [None]:
def process_files_in_directory(directory_path, process_function):
    """Processa arquivos dentro de um diretório e seus subdiretórios com uma função especificada."""
    for root, dirs, files in os.walk(directory_path):
        for file in files:
            if file.endswith('.tif'):
                file_path = os.path.join(root, file)
                process_function(file_path, file_path)  # Substitui o arquivo original



def calibrar_radiometricamente(image_path, output_path):
    with Image.open(image_path) as img:
        # Converte a imagem para um array numpy para processamento
        img_array = np.array(img)
        # Simulando a calibração radiométrica multiplicando cada pixel por 1.05
        calibrated_array = img_array * 1.05
        calibrated_array = np.clip(calibrated_array, 0, 255)  # Assegura que os valores estão entre 0 e 255
        # Converte de volta para uma imagem
        calibrated_img = Image.fromarray(calibrated_array.astype('uint8'))
        calibrated_img.save(output_path)

def corrigir_atmosfericamente(image_path, output_path):
    with Image.open(image_path) as img:
        # Converte a imagem para um array numpy para processamento
        img_array = np.array(img)
        # Simulando a correção atmosférica ajustando o contraste da imagem
        # Aumentando o contraste em 10%
        contrast_factor = 1.1
        mean = np.mean(img_array)
        corrected_array = (img_array - mean) * contrast_factor + mean
        corrected_array = np.clip(corrected_array, 0, 255)  # Assegura que os valores estão entre 0 e 255
        # Converte de volta para uma imagem
        corrected_img = Image.fromarray(corrected_array.astype('uint8'))
        corrected_img.save(output_path)

In [None]:
if "NDVI" or "GNDVI" in directory_map:
  # Caminho para as imagens TIF dentro de subdiretórios em "images"
  tif_path = os.path.join(addressDSmain, "images")

  # Processa todas as imagens TIF para calibração radiométrica e correção atmosférica
  if calibration:
    process_files_in_directory(tif_path, calibrar_radiometricamente)

  if atmospheric_correction:
    process_files_in_directory(tif_path, corrigir_atmosfericamente)

if "TIFF" in directory_map:
  # Caminho para as imagens TIF dentro de subdiretórios em "images"
  tif_path = directory_map["TIFF"]

  # Processa todas as imagens TIF para calibração radiométrica e correção atmosférica
  if calibration:
    process_files_in_directory(tif_path, calibrar_radiometricamente)

  if atmospheric_correction:
    process_files_in_directory(tif_path, corrigir_atmosfericamente)

In [None]:
treinamento_images = {}

def band_number(filename):
    parts = filename.split('.')[0].split('b')
    if len(parts) > 1:
        try:
            number_part = parts[1]
            if number_part.endswith('a'):
                return int(number_part[:-1]) + 0.1
            return int(number_part)
        except ValueError:
            return float('inf')
    return float('inf')


image_directories = sorted(os.listdir(addressDSmain+"/images"))

for directory in image_directories:
    directory_path = os.path.join(addressDSmain+"/images", directory)


    if os.path.isdir(directory_path):

        tif_files = sorted([f for f in os.listdir(directory_path) if f.endswith('.tif')], key=band_number)

        treinamento_images[directory] = [os.path.join(directory_path, f) for f in tif_files]


def get_band_paths(directory_files, band_nir='8', band_red='4', band_green='3'):
    """
    Retorna os caminhos das bandas NIR, Red e Green para cálculos de índices.

    Args:
    directory_files (list): Lista de todos os arquivos em um diretório.
    band_nir (str): Sufixo da banda NIR no nome do arquivo (padrão '8' para b8).
    band_red (str): Sufijo da banda Red no nome do arquivo (padrão '4' para b4).
    band_green (str): Sufixo da banda Green no nome do arquivo (padrão '3' para b3).

    Returns:
    tuple: Contém os caminhos dos arquivos para as bandas NIR, Red e Green.
          Cada elemento pode ser None se a banda correspondente não for encontrada.
    """
    nir_path = [file for file in directory_files if f"b{band_nir}.tif" in file]
    red_path = [file for file in directory_files if f"b{band_red}.tif" in file]
    green_path = [file for file in directory_files if f"b{band_green}.tif" in file]


    return nir_path[0] if nir_path else None, \
          red_path[0] if red_path else None, \
          green_path[0] if green_path else None


def calculate_and_save_index(nir_band_path, other_band_path, index_type, output_path):
    """Calcula e salva um índice específico (NDVI/GNDVI)."""
    with rasterio.open(nir_band_path) as nir_src, rasterio.open(other_band_path) as other_src:
        nir = nir_src.read(1, resampling=Resampling.bilinear)
        other_band = other_src.read(1, resampling=Resampling.bilinear)
        index = (nir.astype(float) - other_band.astype(float)) / (nir + other_band + 1e-10)


        profile = nir_src.profile
        profile.update(dtype=rasterio.float32, count=1, compress='lzw')

        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with rasterio.open(output_path, 'w', **profile) as dst:
            dst.write(index.astype(rasterio.float32), 1)

In [None]:

def ensure_directory_clean(directory_path):
    """Garante que o diretório seja apagado se existir e criado limpo."""
    if os.path.exists(directory_path):
        shutil.rmtree(directory_path)
    os.makedirs(directory_path)

if "NDVI" in directory_map:
    ndvi_directory = directory_map["NDVI"]
    ensure_directory_clean(ndvi_directory)

    def process_ndvi(treinamento_images, base_path):
        """Processa e salva NDVI para cada diretório de imagem."""
        for directory, files in treinamento_images.items():
            nir_path, red_path, _ = get_band_paths(files)
            if nir_path and red_path:
                output_path = os.path.join(ndvi_directory, f'{directory}_NDVI.tif')
                calculate_and_save_index(nir_path, red_path, 'NDVI', output_path)

    process_ndvi(treinamento_images, addressDSmain)

if "GNDVI" in directory_map:
    gndvi_directory = directory_map["GNDVI"]
    ensure_directory_clean(gndvi_directory)

    def process_gndvi(treinamento_images, base_path):
        """Processa e salva GNDVI para cada diretório de imagem."""
        for directory, files in treinamento_images.items():
            nir_path, _, green_path = get_band_paths(files)
            if nir_path and green_path:
                output_path = os.path.join(gndvi_directory, f'{directory}_GNDVI.tif')
                calculate_and_save_index(nir_path, green_path, 'GNDVI', output_path)

    process_gndvi(treinamento_images, addressDSmain)

In [None]:
directory_destiny = os.path.join(addressDSmain, 'imagens_unificadas')

ensure_directory_clean(directory_destiny)

def copiar_e_renomear(diretorio_origem, prefixo):
    arquivos = sorted(os.listdir(diretorio_origem))
    for idx, arquivo in enumerate(arquivos):
        arquivo_origem = os.path.join(diretorio_origem, arquivo)
        novo_nome = f"{prefixo}_{idx}{os.path.splitext(arquivo)[1]}"
        arquivo_destino = os.path.join(directory_destiny, novo_nome)
        shutil.copy(arquivo_origem, arquivo_destino)

for prefixo, diretorio in directory_map.items():
    copiar_e_renomear(diretorio, prefixo)

print("Arquivos copiados e renomeados com sucesso.")

In [None]:
# def ensure_directory(directory):
#     if not os.path.exists(directory):
#         os.makedirs(directory)
#     else:
#         print(f"Directory '{directory}' already exists.")

# def data_augmentation(unified_directory, augmented_directory, image_dimension, rotation=True, scaling=True, shear=True, flip_horizontal=True):
#     ensure_directory(augmented_directory)

#     datagen = ImageDataGenerator(
#         rotation_range=90 if rotation else 0,
#         shear_range=0.2 if shear else 0.0,
#         zoom_range=0.2 if scaling else 0.0,
#         horizontal_flip=flip_horizontal,
#         fill_mode='nearest'
#     )

#     for filename in os.listdir(unified_directory):
#         if filename.endswith(".tif") or filename.endswith(".png"):
#             image_path = os.path.join(unified_directory, filename)
#             print(f"Processing file: {filename}")
#             try:
#                 image = load_img(image_path, target_size=(image_dimension, image_dimension))
#                 x = img_to_array(image)
#                 x = x.reshape((1,) + x.shape)

#                 name, ext = os.path.splitext(filename)

#                 i = 0
#                 for batch in datagen.flow(x, batch_size=1):
#                     i += 1
#                     new_name = f"augmented_{name}_{i}{ext}"
#                     save_path = os.path.join(augmented_directory, new_name)
#                     img_to_save = Image.fromarray(batch[0].astype('uint8'))
#                     img_to_save.save(save_path)
#                     if i >= 5:
#                         break

#             except UnidentifiedImageError:
#                 print(f"Error: Cannot identify image file {image_path}")
#             except Exception as e:
#                 print(f"Unexpected error occurred with file {image_path}: {e}")

# def count_images(directory):
#     count = len([file for file in os.listdir(directory) if file.endswith(".tif") or file.endswith(".png")])
#     return count

# addressDSmain = '/content/drive/MyDrive/modulo10/data/dataset_inteli'

# unified_directory = os.path.join(addressDSmain, "imagens_unificadas")
# augmented_directory = os.path.join(addressDSmain, "augmented_images")
# image_dimension = 224

# data_augmentation(unified_directory, augmented_directory, image_dimension)

# print("Data augmentation completed")

# num_augmented_images = count_images(augmented_directory)
# print(f"Number of augmented images: {num_augmented_images}")

## Data Augmentation

O objetivo deste data augmentation foi gerar novas imagens de máscaras (masks) para as imagens de satélite do dataset do cliente. Essencialmente, a mask foi definida como o target. Assim, para cada imagem aumentada, foi gerada uma máscara correspondente, permitindo que o modelo aprenda a segmentar corretamente.

A ideia é se inicia em renomear as imagens que vão ser utilizadas para treinamento do modelo, para o mesmo nome das suas correspondentes a masks, para depois realizar o mesmo método de augmentation para as imagens de treino.






In [None]:
import os

def rename_images(directory):
    for filename in os.listdir(directory):
        if filename.endswith(".tif") or filename.endswith(".png"):
            # Extrair o número antes do primeiro sublinhado
            new_name = filename.split('_')[0] + os.path.splitext(filename)[1]
            old_path = os.path.join(directory, filename)
            new_path = os.path.join(directory, new_name)
            os.rename(old_path, new_path)
            print(f"Renamed '{filename}' to '{new_name}'")

# Diretório onde estão as imagens marked_rgb
marked_rgb_directory = '/content/drive/MyDrive/modulo10/data/dataset_inteli/rgbs'

# Renomear as imagens
rename_images(marked_rgb_directory)

print("Renaming completed")


In [None]:
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from PIL import Image, UnidentifiedImageError

# Aplicando o mesmo data augmentation para as máscaras e imagens de satélite
def data_augmentation_for_pairs(marked_rgb_directory, masks_directory, image_dimension, rotation=True, scaling=True, shear=True, flip_horizontal=True):
    datagen = ImageDataGenerator(
        rotation_range=90 if rotation else 0,
        shear_range=0.2 if shear else 0.0,
        zoom_range=0.2 if scaling else 0.0,
        horizontal_flip=flip_horizontal,
        fill_mode='nearest'
    )

    for filename in os.listdir(marked_rgb_directory):
        if filename.endswith(".tif") or filename.endswith(".png"):
            image_path_rgb = os.path.join(marked_rgb_directory, filename)
            image_path_mask = os.path.join(masks_directory, filename)
            print(f"Processing file: {filename}")

            try:
                image_rgb = load_img(image_path_rgb, target_size=(image_dimension, image_dimension))
                image_mask = load_img(image_path_mask, target_size=(image_dimension, image_dimension))

                x_rgb = img_to_array(image_rgb)
                x_mask = img_to_array(image_mask)

                x_rgb = x_rgb.reshape((1,) + x_rgb.shape)
                x_mask = x_mask.reshape((1,) + x_mask.shape)

                name, ext = os.path.splitext(filename)

                i = 0
                rgb_gen = datagen.flow(x_rgb, batch_size=1, seed=42)
                mask_gen = datagen.flow(x_mask, batch_size=1, seed=42)

                for (batch_rgb, batch_mask) in zip(rgb_gen, mask_gen):
                    i += 1
                    new_name = f"{name}_{i}{ext}"
                    save_path_rgb = os.path.join(marked_rgb_directory, new_name)
                    save_path_mask = os.path.join(masks_directory, new_name)

                    img_to_save_rgb = Image.fromarray(batch_rgb[0].astype('uint8'))
                    img_to_save_mask = Image.fromarray(batch_mask[0].astype('uint8'))

                    img_to_save_rgb.save(save_path_rgb)
                    img_to_save_mask.save(save_path_mask)

                    if i >= 50:
                        break

            except UnidentifiedImageError:
                print(f"Error: Cannot identify image file {image_path_rgb} or {image_path_mask}")
            except Exception as e:
                print(f"Unexpected error occurred with file {filename}: {e}")

# Diretórios de entrada
addressDSmain = '/content/drive/MyDrive/modulo10/data/dataset_inteli'
marked_rgb_directory = os.path.join(addressDSmain, "rgbs")
masks_directory = os.path.join(addressDSmain, "masks")
image_dimension = 1200

# Aplicar data augmentation
data_augmentation_for_pairs(marked_rgb_directory, masks_directory, image_dimension)

print("Data augmentation completed")


In [None]:
import os
import numpy as np
import tensorflow as tf
from keras.preprocessing.image import load_img, img_to_array

def ensure_directory_exists(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)

def normalize_images(image_directory, normalized_directory, image_dimension, normalization_technique):
    # Verify if the diretory exists
    ensure_directory_exists(normalized_directory)

    for filename in os.listdir(image_directory):
        if filename.endswith(".tif") or filename.endswith(".png"):
            image_path = os.path.join(image_directory, filename)
            image = load_img(image_path, target_size=(image_dimension, image_dimension))
            x = img_to_array(image)

            if normalization_technique == "Min-Max Scaling":
                min_val, max_val = np.min(x), np.max(x)
                if max_val - min_val != 0:
                    x = (x - min_val) / (max_val - min_val)
                else:
                    x = np.zeros_like(x)

            elif normalization_technique == "Z-Score":
                mean, std = np.mean(x), np.std(x)
                if std != 0:
                    x = (x - mean) / std
                else:
                    x = np.zeros_like(x)

            elif normalization_technique == "Max Abs Scaling":
                max_abs_val = np.max(np.abs(x))
                if max_abs_val != 0:
                    x = x / max_abs_val
                else:
                    x = np.zeros_like(x)

            elif normalization_technique == "Robust Scaling":
                median, iqr = np.median(x), np.percentile(x, 75) - np.percentile(x, 25)
                if iqr != 0:
                    x = (x - median) / iqr
                else:
                    x = np.zeros_like(x)

            x = (x * 255).astype(np.uint8)
            img = tf.keras.preprocessing.image.array_to_img(x)

            # Alterar o nome do arquivo para incluir 'normalized_' no início
            name, ext = os.path.splitext(filename)
            new_filename = f"normalized_{name}{ext}"

            img.save(os.path.join(normalized_directory, new_filename))

# Main directory
addressDSmain = '/content/drive/MyDrive/modulo10/data/dataset_inteli'

unified_directory = os.path.join(addressDSmain, "imagens_unificadas")
normalized_directory = os.path.join(addressDSmain, "imagens_normalizadas")
image_dimension = 224

normalize_images(unified_directory, normalized_directory, image_dimension, normalization_technique)

print("Normalização concluída")


In [None]:
image_path = '/content/drive/MyDrive/modulo10/data/dataset_inteli/rgbs/575_1.png'

with rasterio.open(image_path) as src:
    image_data = src.read(1)

plt.imshow(image_data, cmap='gray')
plt.axis('off')
plt.colorbar()
plt.show()