In [1]:
import os
import sys
import glob
import json
import shutil
import subprocess
import datetime as dt
from datetime import datetime
from functools import lru_cache
from typing import List, Union, Tuple
from contextlib import contextmanager

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
import plotly.io as pio

import torch
from numba import jit

from osgeo import gdal
import osgeo_utils.gdal_merge
import rasterio
from rasterio.windows import Window
import geopandas as gpd
import rioxarray as rxr

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, recall_score, f1_score, precision_score, 
                             confusion_matrix, matthews_corrcoef, cohen_kappa_score, 
                             hamming_loss, classification_report)

import matplotlib.image as mpimg
from matplotlib.colors import ListedColormap



@contextmanager
def cwd(path: str) -> None:

    """
    Context manager para mudar o diretório de trabalho.
    Mantém o diretório original após a execução do bloc
    
    o de código.
    """

    oldpwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(oldpwd)

#np.set_printoptions(threshold=sys.maxsize, edgeitems=sys.maxsize, linewidth=sys.maxsize, precision=sys.maxsize)

In [None]:
def listdir_fullpath(d: str) -> List[str]:
    
    """
    Retorna uma lista de caminhos completos para os arquivos em um diretório.
    """
    
    return [os.path.join(d, f) for f in sorted(os.listdir(d))]

class AmbienteIncorretoError(Exception):
    def __init__(self):
        self.message = "Necessário estar no ambiente 'km_predict' para rodar o modelo"
        super().__init__(self.message)


class Preprocessing:
    def __init__(self, caminho: str) -> None:
        self.caminho: str = caminho  # Pasta onde as imagens .SAFE estão
        self.abs_caminho: str = os.path.abspath(self.caminho)
        self.imgs_diretorio: List[str] = os.listdir(self.caminho)  # Nome de cada imagem .SAFE no diretório
        self.diretorios_tif: List[str] = sorted([diretorio.replace('.SAFE', '.TIF') for diretorio in self.imgs_diretorio if diretorio.endswith('.SAFE')])
        self.caminho_completo_lista: List[str] = [item for item in listdir_fullpath(self.caminho) if item.endswith('.SAFE')]

    def merge_tif_files(self, nome_TIF: str, output_name:str) -> None:
        """
        Mescla arquivos TIFF em um único arquivo usando gdal_merge.
        """
        with cwd(nome_TIF):
            if os.path.exists(output_name):
                os.remove(output_name)
            arquivos_tif: List[str] = glob.glob('*B*.tif')
            arquivos_tif: List[str] = self._sort_files(arquivos_tif,sufix='.tif')
            comando: List[str] = ['gdal_merge.py', '-o', output_name, '-of', 'Gtiff', '-separate', '-ot', 'FLOAT32', '-co', 'BIGTIFF=YES'] + arquivos_tif
            parameters = ['', '-o', output_name] + arquivos_tif + ['-separate', '-co', 'COMPRESS=LZW','-co','BIGTIFF=YES','-co', 'COMPRESS=LZW']
            osgeo_utils.gdal_merge.main(parameters)
            list(map(os.remove, arquivos_tif))

    def _create_tif_folder(self) -> None:
        """
        Cria pastas que serão utilizadas para o resultado dos modelos.
        """
        for tif_dir in self.diretorios_tif:
            if os.path.exists(tif_dir):
                shutil.rmtree(f'./{tif_dir}')
            os.makedirs(tif_dir)
        self._modify_img_diretorio()

    def _modify_img_diretorio(self) -> None:
        self.imgs_diretorio: List[str] = [diretorio for diretorio in self.imgs_diretorio if diretorio.endswith('.SAFE')]

    def jp2_to_tif(self, tif_dim: List[str] = ['10980','10980'], output_name: str = 'merge.tif', create_folder:bool = True) -> None:
        """
        Converte arquivos JP2 para TIFF usando gdal_translate.
        """
        with cwd(self.caminho):
            if create_folder:
                self._create_tif_folder()
            for diretorio in self.imgs_diretorio:
                files: List[str] = glob.glob(os.path.join(f'{diretorio}', 'GRANULE', '*', 'IMG_DATA', '*B*.jp2'))
                files: List[str] = self._sort_files(files,sufix='.jp2')
                nome_TIF: str = diretorio.replace('.SAFE', '.TIF')
                commands: List[List[str]] = []
                
                for f in files:
                    input_path: str = f
                    output_path: str = nome_TIF + '/' + os.path.splitext(os.path.basename(f))[0] + '.tif'
                    if os.path.exists(output_path):
                        os.remove(output_path)
                    cmd: List[str] = ['gdal_translate', input_path, '-ot', 'Float32', '-of', 'Gtiff', '-outsize', tif_dim[0], tif_dim[1], output_path, '-co', 'BIGTIFF=YES']
                    commands.append(cmd)
                    
                for cmd in commands:
                    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    
                self.merge_tif_files(nome_TIF,output_name=output_name)
            
                    
                    
    def _sort_files(self, lista: list, sufix: str):
        lista = sorted(lista)

        b08_name: int = [item for item in lista if item.endswith(f"B08{sufix}")][0]
        b08_index: int = lista.index(b08_name)

        b8A_name: int = [item for item in lista if item.endswith(f"B8A{sufix}")][0]
        b8A_index: int = lista.index(b8A_name)

        lista.insert(b08_index + 1, lista.pop(b8A_index))
        
        return lista

    def _criar_symlinks_kappaMask(self, kappa_mask_folder: str) -> None:
        """
        Cria symlinks para pasta "data" do kappamask.
        """
        with cwd(kappa_mask_folder):
            if not os.path.exists('./data'):
                os.makedirs('data')
                
        pasta_origem: str = os.path.abspath(kappa_mask_folder)
        for diretorio in self.caminho_completo_lista:
            caminho_pasta: str = self.caminho + '/' + diretorio
            pasta_destino: str = os.path.abspath(caminho_pasta)
            symlink_path: str = os.path.join(pasta_origem, "data", os.path.basename(pasta_destino))
            try:
                os.symlink(pasta_destino, symlink_path)
                print(f"Symlink criado com sucesso: {symlink_path} -> {pasta_destino}")
            except FileExistsError:
                return
            except Exception as e:
                print(f"Erro ao criar symlink: {e}")

    def _config_json_kappa(self, kappa_mask_folder, product_name, architecture, json_config_name) -> None:
        diretorio_atual = self.abs_caminho
        print(diretorio_atual)
        json_config: dict = {
            "cm_vsm_executable": "cm_vsm",
            "folder_name": f"{diretorio_atual}",
            "product_name": f"{product_name.replace('.SAFE','')}",
            "level_product": "L1C",
            "overlapping": 0.0625,
            "tile_size": 512,
            "resampling_method": "sinc",
            "architecture": f"{architecture}",
            "batch_size": 1,
            "aoi_geometry": ""
        }
        caminho_json_config: str = f'./config/{json_config_name}'
        with cwd(kappa_mask_folder):
            with open(caminho_json_config, 'w') as arquivo:
                json.dump(json_config, arquivo, indent=2)

class Modelos(Preprocessing):
    def __init__(self, caminho: str) -> None:
        self.caminho = caminho
        super().__init__(caminho)

    def start(self,create_folder) -> None:
        """Começa a etapa de pré-processamento dos arquivos .jp2"""
        self.jp2_to_tif(create_folder=create_folder)
        df = Modelos.get_gt_predict(self.caminho.split('/')[-1].upper())
        with cwd(self.caminho):
            list(map(shutil.rmtree, self.diretorios_tif))
        return df
        
    @staticmethod
    def read_tif_values(tif_path, x, y):
        with rasterio.open(tif_path) as src:
            # Converte as coordenadas x, y em índices de linha e coluna no raster
            row, col = src.index(x, y)
            values = []
            for i in range(1, src.count + 1):  # Para cada banda
                window = Window(col, row, 1, 1)
                band_values = src.read(i, window=window)
                values.append(band_values[0][0])
            return values

    @staticmethod
    def tifs(df_rotulos, index, name,regiao):
        ponto_de_montagem = f'/media/jean/90D8B801D8B7E41E/Ubuntu/{regiao.title()}'
        destino = os.path.abspath(ponto_de_montagem)
        with cwd(destino):
            with cwd(name):
                geotiff_path = 'merge.tif'
                # Lista para armazenar os valores das bandas
                band_values_list = []

                # Itera sobre cada linha do dataframe
                for _, row in df_rotulos.iterrows():
                    x, y = row['x'], row['y']
                    values = Modelos.read_tif_values(geotiff_path, x, y)
                    band_values_list.append(values)

                # Adiciona os valores das bandas ao dataframe original
                band_values_array = np.array(band_values_list)
                for i in range(band_values_array.shape[1]):
                    df_rotulos[f'band_{i + 1}'] = band_values_array[:, i]

                return df_rotulos
            
    @staticmethod
    def get_gt_predict(regiao):
        def get_value_from_data_df(row):
            return data_df[row['x']][float(row['y'])]
        resultado = {}
        with cwd('./Pontos_Validados/'):
            with cwd(f'./{regiao.upper()}'):
                dates = [x for x in os.listdir() if not x in (".ipynb_checkpoints")]
                i=0
                for data in dates:
                    date = f'{data}/'
                    print(date)
                    with cwd(date):
                        models = [x for x in sorted(os.listdir()) if ( x != ".ipynb_checkpoints") and (not(x.endswith('.xml'))) and (not(x.endswith('.jp2')))]
                        for model in models:
                            with cwd(os.path.join(model)):
                                xml = [x for x in os.listdir(f'../') if x.endswith("xml")][0].split('_')[4:11]
                                name = '_'.join(xml).replace('.SAFE','.TIF')
                                files = sorted(os.listdir())  
                                shp = [shp for shp in files if shp.endswith('.shp')][0]
                                shapefile = gpd.read_file(shp)
                                new_df = shapefile['geometry'].get_coordinates()
                                df = pd.concat([new_df,shapefile['GrndTruth']],axis=1)
                                df['datetime'] = pd.to_datetime(data.replace('(1)','').replace('_','/'),dayfirst=True)
                                df = Modelos.tifs(df,i,name,regiao)
                                resultado[f'{regiao}_{model}_{data}'] = df
        return resultado

pasta_principal = '/media/jean/90D8B801D8B7E41E/Ubuntu/'
for i in os.listdir(pasta_principal)[1:]:
    pasta_regiao = os.path.abspath(os.path.join(pasta_principal,i))
    obj: Modelos = Modelos(pasta_regiao)
    teste_df_regiao = obj.start(create_folder=True)  # Executar uma ÚNICA vez na pasta no qual as imagens estão.
    pd.concat(teste_df_regiao.values(),axis=0).to_csv(f'{i.title()}_pixels.csv')

...100 - done.
0...10...20...30...40...50...60...70...80...90...100 - done.
0...10...20...30...40...50...60...70...80...90...100 - done.
0...10...20...30...40...50...60...70...80...90...100 - done.
0...10...20...30...40...50...60...70...80...90...100 - done.
0...10...20...30...40...50...60