### Composição temporal 

A partir das imagens de satélite Sentinel-2, é feita a composição temporal. 
- Usa o canal SCL.
- Os valores de observação limpa são: 4, 5 e 6
- É feita a quebra em subtiles


In [None]:
# imports

import os
import sys
sys.path.append(os.path.abspath('..'))
from rasterio.coords import BoundingBox

import src.data.subtile_composition as subtile_composition
import src.data.preprocess_data as preprocess_data


     

### Algumas definições
- Número de subtiles: 6x6.
- Gerando para o tile 032027
- Raiz do caminho com dados brutos: data/raw
- Raiz do caminho dos resultados: data/processed
- É criada uma parta chamada S2-D16_V2_{tile}, e uma subpasta chamada {num_subtiles}x{num_subtiles}_subtiles.
- São salvos valores int16, de dimensões (12, width, height)
- Width e height são 10560/num_subtiles
- Não há sobreposição/overlap


In [None]:
working_dir = os.path.abspath('..')
raw_data_path = os.path.join(working_dir,'data/raw')
processed_data_path = os.path.join(working_dir,'data/processed')
num_subtiles = 6
tile = '016009'
subtile_composition.create_composition(in_folder=raw_data_path, 
                                        out_folder=processed_data_path,
                                        tile = tile,
                                        num_subtiles=num_subtiles,
                                        rewrite = True)



### Detalhes extras:

- Composição utiliza o métod da média das observações válidas
- Quando nenhuma observação é válida, é preenchido com NaN
- Foi feita a interpolação dos valores de NaN, e para distinguir, foram atribuídos valores negativos.
- Sendo assim, pra obter a imagem com interpolação basta usar np.abs
- Para obter os valores não interpolados, basta ler os valores positivos. 


### Exemplo de arquivo gerado:


In [None]:
import rasterio

composition_data_file = os.path.join(working_dir,'data/processed/S2-16D_V2_016009/6x6_subtiles/S2-16D_V2_016009_x=0_y=0.tif')
with rasterio.open(composition_data_file) as src:
    # Read the raster data
    data = src.read()

data.shape

In [None]:
import numpy as np
save_to = os.path.join(working_dir, 'figs', 'composition.png')
subtile_composition.display_images(np.abs(data), limit=-1, save_to=save_to)

In [None]:
working_dir = os.path.abspath('..')
raw_data_path = os.path.join(working_dir,'data/raw')
processed_data_path = os.path.join(working_dir,'data/processed')
num_subtiles = 12
tile = '025037'
subtile_composition.create_composition(in_folder=raw_data_path, 
                                        out_folder=processed_data_path,
                                        tile = tile,
                                        num_subtiles=num_subtiles)

In [None]:
import os
import sys
sys.path.append(os.path.abspath('..'))
import rasterio
import numpy as np

import src.data.subtile_composition as subtile_composition

tiles = {#'MG':'032027',
         'RS': '025037',
         #'AM':'016009',
         #'BA':'038019',
         #'DF': '028022',
         #'RJ': '033029'
         }

images = []
working_dir = os.path.abspath('..')
files = []

for tile in tiles.values():
    try:
        folder = os.path.join(working_dir,f'data/processed/S2-16D_V2_{tile}/6x6_subtiles/')
        files_in_folder = os.listdir(folder)
        print(files_in_folder)
        files += [os.path.join(folder, f) for f in files_in_folder]
        
    except:
        print('skipping', tile)
    break
print(files)
files = [f for f in files if f.endswith('.tif')]

for composition_data_file in files:
    with rasterio.open(composition_data_file) as src:
        # Read the raster data
        data = src.read()
    
    images.append(data)
    break

images = subtile_composition.quantize_to_uint8(images)


In [None]:
#subtile_composition.test_incremental_pca(images[:5], max_components=None)


In [None]:
subtile_composition.test_ipca_with_error(images, max_components=12, batch_size=1000)


In [None]:
# Example list of 12-channel images (channel-first format)
np.random.seed(42)
image1 = np.random.randint(3000, 10000, size=(12, 256, 256)).astype(float)
image2 = np.random.randint(3000, 10000, size=(12, 256, 256)).astype(float)
image_list = [image1, image2]

# Initialize the IPCAHandler
ipca_handler = subtile_composition.IPCAHandler(n_components=3, batch_size=1000)

# Fit the Incremental PCA model

ipca_handler.fit_incremental_pca(image_list)

# Save the model to a file
ipca_handler.save_model(file_prefix="my_ipca_model")



In [None]:
# Load the saved model
ipca_handler = subtile_composition.IPCAHandler(n_components=3, batch_size=1000)
ipca_handler.load_model(file_prefix="my_ipca_model")

# Transform new data
transformed_images = ipca_handler.transform_data(image_list)
print(f"Transformed images shape: {transformed_images[0].shape}")

In [None]:
import os
import sys
sys.path.append(os.path.abspath('..'))
import rasterio
import numpy as np

import src.data.subtile_composition as subtile_composition

tiles = {'MG':'032027',
         'RS': '025037',
         'AM':'016009',
         #'BA':'038019',
         #'DF': '028022',
         #'RJ': '033029'
         }

working_dir = os.path.abspath('..')

ipca_handler = subtile_composition.IPCAHandler(n_components=8, batch_size=1000)

for tile in tiles.values():
    #try:
    print('Tile: ', tile)
    images = []
    folder = os.path.join(working_dir,f'data/processed/S2-16D_V2_{tile}/6x6_subtiles/')
    files = os.listdir(folder)
    files = [os.path.join(folder, f) for f in files if f.endswith('.tif')]
    print(f'Has {len(files)} images')
    for composition_data_file in files:
        with rasterio.open(composition_data_file) as src:
            # Read the raster data
            data = src.read()
        qdata = subtile_composition.quantize_to_uint8(data)
        images.append(qdata) #12 channel, quantized.
    if len(images)>0:

        # Fit the Incremental PCA model
        ipca_handler.fit_incremental_pca(images)
        var_ratio = ipca_handler.ipca.explained_variance_ratio_
        print('Variance ratio:', var_ratio)
        print('Cummulative variance ratio:', np.cumulative_sum(var_ratio))

        # Save the model to a file
        ipca_handler.save_model(file_prefix="my_ipca_model")
        


    #except:
    #    print('skipping', tile)


In [None]:
scl_data, meta = subtile_composition.read_channel_by_dates(in_folder, tile, dates, 'SCL', prefix, window)

channels = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B09', 'B11', 'B12', 'B8A']#, 'EVI', 'NBR', 'SCL', 'NDVI', 'CLEAROB', 'TOTALOB', 'thumbnail', 'PROVENANCE'])
  
for ch in channels:
    
    channel_i, meta = subtile_composition.read_channel_by_dates(in_folder, tile, dates, ch, prefix, window)
    SCL_composed_image = subtile_composition.composition_SCL(scl_data, channel_i, option = option, scl_min=4, scl_max=6)
    SCL_composed_image =  preprocess_data.fill_nans_nearest(SCL_composed_image, negate_filled = True)
    channel_images.append(SCL_composed_image)


    


## Aplicando a composição temporal

- Aqui, foi escolhido um tile (016009), o que tem mais pixles inválidos.
- Escolhemos uma janela referente a um subtile
- E um canal apenas (B02)

Primeiro fazemos a leitura dos dados.


Mostramos todas as datas obtidas para este canal.

In [None]:
    

import rasterio
import matplotlib.pyplot as plt
import os
import numpy as np
#np.nanmedian(filtered_data, axis=0, dtype=np.float32)
tile = '016009'
window = rasterio.windows.Window(0, 0, 10560//6, 10560//6)
working_dir = os.path.abspath('..')


### Le todas as datas
all_filtered = []
dates = os.listdir(f'{working_dir}/data/raw/S2-16D_V2_{tile}')
for date in dates:

    ### Abre o canal SCL
    scl = f'{working_dir}/data/raw/S2-16D_V2_{tile}/{date}/S2-16D_V2_{tile}_{date}_SCL.tif'
    with rasterio.open(scl) as scr:
        scl_channel = np.squeeze(scr.read(window = window)) 
    mask = (scl_channel >= 4) & (scl_channel <= 6)

    ### Abre o canal B02
    data_path = f'{working_dir}/data/raw/S2-16D_V2_{tile}/{date}/S2-16D_V2_{tile}_{date}_B02.tif'
    with rasterio.open(data_path) as scr:
        channel_data = np.squeeze(scr.read(window = window))

    ### Filtra, só onde o mask SCL é entre 4 e 6. 
    filtered_data = np.where(mask, channel_data, np.nan)
    all_filtered.append(filtered_data)

### Todas as datas em um só numpy array
all_filtered = np.stack(all_filtered, axis=0)
print(all_filtered.shape)
plt.figure(figsize=(20, 10))

for i in range(all_filtered.shape[0]):
    plt.subplot(1, all_filtered.shape[0], i + 1)
    plt.imshow(all_filtered[i])
    plt.xlabel(f'Data: {dates[i]}')
    plt.xticks([])  # Remove x-axis ticks
    plt.yticks([])  # Remove y-axis ticks

#plt.suptitle(f'Todas as datas. Tile: {tile}', fontsize=16)  # Set title for whole figure
print(f'Todas as datas. Tile: {tile}')
plt.subplots_adjust(top=0.85)  # Adjust layout to fit title properly
plt.show()


Os pontos brancos são os pixels inváildos. Note que são muitos, inclusive uma região inteira da última imagem.

### Composição temporal

In [None]:

### Composição temporal, por mediana
composite = np.nanmedian(all_filtered, axis=0)
plt.imshow(composite)
plt.xlabel(f'Composição temporal')
plt.xticks([])  # Remove x-axis ticks
plt.yticks([])  # Remove y-axis ticks
plt.show()


Após composição temporal, muitos pixels inválidos foram removidos

### Interpolação

Vamos preencher os inválidos com o método vizinhos mais próximos

In [None]:

import sys
sys.path.append(os.path.abspath('..'))
import src.data.preprocess_data as preprocess_data
interpolated = preprocess_data.fill_nans_nearest(composite).astype(np.int16)

plt.imshow(interpolated)
plt.show()


### Normalização e quantização

In [None]:

lower = np.percentile(interpolated, 2)  # 2nd percentile
upper = np.percentile(interpolated, 98)  # 98th percentile

# Check if the range is zero (constant channel)
if upper == lower:
    # Assign a default value (e.g., 0) for this channel
    quantized = np.zeros_like(interpolated, dtype=np.uint8)
else:
    # Normalize and clip to [0, 255]
    quantized = np.clip((interpolated - lower) / (upper - lower) * 255, 0, 255).astype(np.uint8)            

plt.imshow(quantized)

Muito melhor!



## Aumentnando a escala

Tudo isso foi para um só canal. Isso deve ser repetido em todos, para todos os subtiles e tiles.

Para isso, fizemos o modulo prepare_data.py que trata dsito em nível maior.

Aqui um exemplo para todos os subtiles de um tile


In [None]:
import src.data.prepare_data as prepare
channels = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B09', 'B11', 'B12', 'B8A']

tile_size = 10560
num_subtiles = 6
subtile_size = tile_size//num_subtiles

for channel in channels:
    i = 1
    plt.figure(figsize=(20, 20))
    for y in range(0, tile_size, tile_size//num_subtiles):
        for x in range(0, tile_size, tile_size//num_subtiles):
            window = rasterio.windows.Window(x, y, subtile_size, subtile_size)
            image = prepare.prepare_image(tile=tile, channel=channel, window=window)
            plt.subplot(num_subtiles, num_subtiles, i)
            plt.imshow(image)
            #plt.xlabel(f'Subtile {i}')
            plt.xticks([])  # Remove x-axis ticks
            plt.yticks([])  # Remove y-axis ticks
            i+=1
    plt.tight_layout()
    plt.show()
    break

## PCA incremental

Vamos ler todos os canais agora. Colocar em um único numpy array.



In [None]:
import os
import sys
sys.path.append(os.path.abspath('..'))
import rasterio
import numpy as np
import matplotlib.pyplot as plt
import src.data.prepare_data as prepare
working_dir = os.path.abspath('..')



In [None]:
channels = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B09', 'B11', 'B12', 'B8A']
#channels = ['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B11', 'B12', 'B8A']
#channels = ['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B11', 'B12', 'B8A']
tile_size = 10560
num_subtiles = 6
subtile_size = tile_size//num_subtiles

allbands = []
allbandsQ = []

tiles = os.listdir(os.path.join(working_dir,'data', 'raw'))
tiles = [tile[10:] for tile in tiles]

print(tiles)
from collections import Counter

counts = {}
countsQ = {}


for i, channel in enumerate(channels):
    counts[channel] = Counter()
    countsQ[channel] = Counter()
    for tile in tiles:
        for x in range(0, 10560, subtile_size):
            for y in range(0, 10560, subtile_size):
                print(channel, tile, x, y)
                window = rasterio.windows.Window(0, 0, subtile_size, subtile_size)
                image = prepare.prepare_image(tile=tile, channel=channel, window=window)
                qimage = prepare.quantize(image, lower_fixed=0, higher_fixed=5000) #low_percentile=2, high_percentile=99.9)
                counts[channel].update(image.flatten())
                countsQ[channel].update(qimage.flatten())

print(counts)
print(countsQ)

In [None]:
plt.figure(figsize=(16, 6))

for i, (ch, count) in enumerate(zip(counts.keys(), counts.values())):
    values = list(count.keys())
    counts_list = list(count.values())
    numeric_x = []
    for val in values:
        try:
            numeric_x.append(float(val))
        except (ValueError, TypeError):
            continue  # Ignore non-numeric values
    numeric_y = []
    for val in counts_list:
        try:
            numeric_y.append(float(val))
        except (ValueError, TypeError):
            continue  # Ignore non-numeric values
        
    sorted_data = sorted(zip(numeric_x, numeric_y))
    sorted_values, sorted_counts = zip(*sorted_data)

    try:
        plt.subplot(4, 3, i+1)

        # Preencher a área abaixo da linha
        plt.fill_between(sorted_values, sorted_counts, color='blue', alpha=0.99)

        # Adicionar rótulos e título
        plt.xlabel("")
        plt.ylabel("")
        plt.title(ch)
        

        # Mostrar o gráfico
        
    except:
        continue
plt.tight_layout()
save_to = os.path.join(working_dir, 'figs', 'histogram_full.png')
plt.savefig(save_to, dpi=300)
plt.show()


In [None]:
plt.figure(figsize=(16, 6))

for i, (ch, count) in enumerate(zip(countsQ.keys(), countsQ.values())):
    values = list(count.keys())
    counts_list = list(count.values())
    numeric_x = []
    for val in values:
        try:
            numeric_x.append(float(val))
        except (ValueError, TypeError):
            continue  # Ignore non-numeric values
    numeric_y = []
    for val in counts_list:
        try:
            numeric_y.append(float(val))
        except (ValueError, TypeError):
            continue  # Ignore non-numeric values
        
    sorted_data = sorted(zip(numeric_x, numeric_y))
    sorted_values, sorted_counts = zip(*sorted_data)

    try:
        plt.subplot(4, 3, i+1)

        # Preencher a área abaixo da linha
        plt.fill_between(sorted_values, sorted_counts, color='blue', alpha=0.99)

        # Adicionar rótulos e título
        plt.xlabel("")
        plt.ylabel("")
        plt.title(ch)
        

        # Mostrar o gráfico
        
    except:
        continue
plt.tight_layout()
save_to = os.path.join(working_dir, 'figs', 'histogram_q.png')
plt.savefig(save_to, dpi=300)
plt.show()



In [None]:


channels = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B09', 'B11', 'B12', 'B8A']
#channels = ['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B11', 'B12', 'B8A']
#channels = ['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B11', 'B12', 'B8A']
tile_size = 10560
num_subtiles = 6
subtile_size = tile_size//num_subtiles

allbands = []
allbandsQ = []


plt.figure(figsize=(12, 16))

for i, channel in enumerate(channels):
    
    window = rasterio.windows.Window(0, 0, subtile_size, subtile_size)
    image = prepare.prepare_image(tile='016009', channel=channel, window=window)
    qimage = prepare.quantize(image, lower_fixed=0, higher_fixed=5000) #low_percentile=2, high_percentile=99.9)
    plt.subplot(4, 3, i+1)
    plt.imshow(qimage)
    plt.xlabel(f'{channel}')
    plt.xticks([])  # Remove x-axis ticks
    plt.yticks([])  # Remove y-axis ticks
    allbands.append(image)
    allbandsQ.append(qimage)
plt.tight_layout()
save_to = os.path.join(working_dir, 'figs', 'bandas.png')
plt.savefig(save_to, dpi=100)
plt.show()

allbands = np.stack(allbands, axis=0)
allbandsQ = np.stack(allbandsQ, axis=0)


## Salvando

In [None]:
import os
import sys
sys.path.append(os.path.abspath('..'))
import rasterio
import numpy as np
import matplotlib.pyplot as plt
import src.data.prepare_data as prepare
import src.data.mask_processing as mask_processing
working_dir = os.path.abspath('..')


In [None]:
urban_shp_path = os.path.join(working_dir, "data/masks/AreasUrbanizadas2019_Brasil/AU_2022_AreasUrbanizadas2019_Brasil.shp")
bdc_grid_path = os.path.join(working_dir, "data/grids/BDC_SM_V2/BDC_SM_V2Polygon.shp")
custom_crs_wkt = 'PROJCS["unknown",GEOGCS["unknown",DATUM["Unknown based on GRS80 ellipsoid",SPHEROID["GRS 1980",6378137,298.257222101,AUTHORITY["EPSG","7019"]]],PRIMEM["Greenwich",0,AUTHORITY["EPSG","8901"]],UNIT["degree",0.0174532925199433,AUTHORITY["EPSG","9122"]]],PROJECTION["Albers_Conic_Equal_Area"],PARAMETER["latitude_of_center",-12],PARAMETER["longitude_of_center",-54],PARAMETER["standard_parallel_1",-2],PARAMETER["standard_parallel_2",-22],PARAMETER["false_easting",5000000],PARAMETER["false_northing",10000000],UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]'

rasterizer = mask_processing.RasterizeMasks(urban_shp_path, bdc_grid_path, custom_crs_wkt)

tiles = os.listdir(os.path.join(working_dir,'data', 'raw'))
tiles = [tile[10:] for tile in tiles]
for tile in tiles:
    mask = rasterizer.raster_tile(tile, save_path=os.path.join(working_dir,"data/masks"))


In [None]:
channels_dict = {}
channels_dict[12] = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B09', 'B11', 'B12', 'B8A']
#channels_dict[10] = ['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B11', 'B12', 'B8A']
#channels_dict[8] = ['B02', 'B03', 'B04', 'B05', 'B06', 'B08', 'B11', 'B12']
#channels_dict[6] = ['B02', 'B03', 'B04', 'B06', 'B08', 'B11']
#channels_dict[4] = ['B02', 'B03', 'B04','B08']

tile_size = 10560
num_subtiles = 6
subtile_size = tile_size//num_subtiles


tiles_escolhidos = {
              'Boa Vista': '015002',  
              'Manaus': '016009',
              'Campo Grande': '021027',
              'Macapá': '025005',
              'Porto Alegre': '025037',
              'Curitiba': '027032',
              'Brasília': '028022',                      
              'Belo Horizonte': '032027',
              'Rio de Janeiro': '033029',
              'Teresina': '034011',
              'Petrolina': '036016',
              'Salvador': '038019',      
              }
tiles = list(tiles_escolhidos.values())
#for tile in tiles:
for tile in ['016009']:
    print(tile)
    for x in range(0, 10560, subtile_size):
        for y in range(0, 10560, subtile_size):
            print(x, y)
            subtile = []
            subtileQ = []
            for i, ch in enumerate(channels_dict[12]):
                print(ch, end = ', ')
                window = rasterio.windows.Window(x, y, subtile_size, subtile_size)
                image = prepare.prepare_image(tile=tile, channel=ch, window=window)
                subtile.append(image)
                subtileQ.append(prepare.quantize(image, lower_fixed=0, higher_fixed=5000)) #low_percentile=2, high_percentile=99.9)
            subtile = np.stack(subtile, axis=0)
            subtileQ = np.stack(subtileQ, axis=0)
            for k, chd in channels_dict.items(): 
                #print(chd)
                #print(k)
                #print(channels_dict[12])
                indices = [i for i, value in enumerate(chd) if value in channels_dict[12]]
                #print(indices)
                prepare.save(subtileQ[indices], x, y, tile, num_subtiles)
            #prepare.save(subtile, x, y, tile, num_subtiles)
                
                

Muito deste dado é redundante. Geralmente ao guardar.

Vamos rodar pra alguns subtiles



In [None]:
image_list = [] 

tiles = {'MG':'032027',
         'RS': '025037',
         'AM':'016009',
         #'BA':'038019',
         #'DF': '028022',
         #'RJ': '033029'
         }

tile_size = 10560
num_subtiles = 6
subtile_size = tile_size//num_subtiles

for tile in tiles.values():
    output = [] 
    for i, channel in enumerate(channels):
        window = rasterio.windows.Window(0, 0, subtile_size, subtile_size)
        image = prepare.prepare_image(tile=tile, channel=channel, window=window)

        output.append(image)
    output = np.stack(output, axis=0)

    image_list.append(output)

## Fit de um PCA

Observando as imagens anteriores, existe muita redundância entre canais.

Vamos usar uma análise de componentes principais para reduzir redundância.

### Variancia explicada

A seguir, tem um script que  calcula quanto cada componente contribui pra variância do dado.


In [None]:

from sklearn.decomposition import IncrementalPCA
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error

n_components = 12
n_channels, height, width = image_list[0].shape

pca = PCA(n_components=n_components)

all_pixels = []
for image in image_list:
    print(image.shape)
    pixels = image.reshape(n_channels, -1).T  # Reshape to (n_pixels, n_channels)
    all_pixels.append(pixels)
all_pixels = np.vstack(all_pixels)  # Shape: (total_n_pixels, n_channels)

print(all_pixels.shape)
pixels_transformed = pca.fit_transform(all_pixels)
explained_variance_ratio = pca.explained_variance_ratio_
cumulative_variance_ratio = np.cumsum(explained_variance_ratio)

plt.figure(figsize=(8, 5))
plt.plot(range(1, len(cumulative_variance_ratio) + 1), cumulative_variance_ratio, marker='o', linestyle='-')
plt.axhline(y=0.95, color='gray', linestyle='--', label='95% Variance')  # Optional: Highlight 95% threshold

plt.xlabel("Number of Principal Components")
plt.ylabel("Cumulative Explained Variance Ratio")
plt.title("PCA: Cumulative Explained Variance")
plt.grid(True)
plt.show()


In [None]:
channels = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B09', 'B11', 'B12', 'B8A']

channel_choices = {12:[0,1,2,3,4,5,6,7,8,9,10,11],
                   10:[1,2,3,4,5,6,7,9,10,11],
                   8: [1,2,3,4,5,7,9,10],
                   6: [1,2,3,5,7,9],
                   4: [1,2,3,7],
                   }
for n_components in channel_choices.keys():
    print(n_components)
    print(channel_choices[n_components])
    print([channels[i] for i in channel_choices[n_components]])
    n_channels, height, width = image_list[0].shape

    pca = PCA(n_components=n_components)

    all_pixels = []
    for image in image_list:
        image_channels = image[channel_choices[n_components]]
        print(image_channels.shape)
        pixels = image.reshape(n_components, -1).T  # Reshape to (n_pixels, n_channels)
        all_pixels.append(pixels)
    all_pixels = np.vstack(all_pixels)  # Shape: (total_n_pixels, n_channels)

    print(all_pixels.shape)
    pixels_transformed = pca.fit_transform(all_pixels)
    explained_variance_ratio = pca.explained_variance_ratio_
    cumulative_variance_ratio = np.cumsum(explained_variance_ratio)

    plt.figure(figsize=(8, 5))
    plt.plot(range(1, len(cumulative_variance_ratio) + 1), cumulative_variance_ratio, marker='o', linestyle='-')
    plt.axhline(y=0.95, color='gray', linestyle='--', label='95% Variance')  # Optional: Highlight 95% threshold

    plt.xlabel("Number of Principal Components")
    plt.ylabel("Cumulative Explained Variance Ratio")
    plt.title("PCA: Cumulative Explained Variance")
    plt.grid(True)
    plt.show()


Note que 3 components explicam 95% da variância.
Geralmente esta é a 'rule of thumb' recomendada.

Aqui é mostrado apenas para 1 subtile de de 3 tiles, mas observando mais dados, o comportamento é semelhante


Aqui, pegamos as imagens processadas pelo PCA, normalizamos as 3 principais componentes e visualizamos como imagens RGB

In [None]:
start = 0
reconstructed_images = []
for image in image_list:
    _, w, h = image.shape
    n_pixels = w * h  # Number of pixels in the image
    
    # Extract corresponding pixels
    img_pixels = pixels_transformed[start:start + n_pixels].T  # Transpose back to (channels, pixels)
    
    # Reshape back to image format (n_channels, w, h)
    img_reconstructed = img_pixels.reshape(n_channels, w, h)
    
    # Append to list
    reconstructed_images.append(img_reconstructed)
    
    # Move to next image in the flattened array
    start += n_pixels

In [None]:
plt.figure(figsize=(20, 6))
for i, img in enumerate(reconstructed_images):  
    rgb_image = np.transpose(img[:3], (1, 2, 0))  
    lower = np.percentile(rgb_image, 2)  # 2nd percentile
    upper = np.percentile(rgb_image, 98)  # 98th percentile
    quantized = np.clip((rgb_image - lower) / (upper - lower) * 255, 0, 255).astype(np.uint8)            

    plt.subplot(1, 3, i+1)
    plt.imshow(quantized)
    
    plt.xlabel(f'Exemplo {i}, transformado em 3 canais')
    plt.xticks([])  # Remove x-axis ticks
    plt.yticks([])  # Remove y-axis ticks
plt.imshow

Um limite interessante é utiilzar os 3 canais que carregam as componentes principais no processo de treinamento. Estes 3 canais explicam pelo menos 95% da variância.


Porém, se não for suficiente, o usuário pode usar mais canais, a custo de maior custo de armazenamento e processamento posteriores.

### Extrapolando

Um problema desta abordagem é que o PCA é fixo, e poucos dados podem não representar as estatísticas do dado completo. 

Teríamos que carregar os dados por completo para obter um PCA e aplicar a todo o dado. Isto é inviável para todos os tiles.

Então, vamos utilizar o PCA incremental.

Primeiro, um generator que le as imagens e prepara.

In [None]:
import os
import sys
sys.path.append(os.path.abspath('..'))
import rasterio
import numpy as np

import src.data.prepare_data as prepare


tiles = {'MG':'032027',
         'RS': '025037',
         'AM':'016009',
         'BA':'038019',
         'DF': '028022',
         'RJ': '033029'
         }
tile_size = 10560
num_subtiles = 6
subtile_size = tile_size//num_subtiles    
working_dir = os.path.abspath('..')
channels = ['B01', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B09', 'B11', 'B12', 'B8A']



def subtile_generator(tiles, num_subtiles, tile_size, channels, return_info = False):
    for tile in tiles.values():
        for x in range(0, tile_size//num_subtiles, tile_size):
            for y in range(0, tile_size//num_subtiles, tile_size):
                image = []
                window = rasterio.windows.Window(x, y, subtile_size, subtile_size)
                for channel in channels:
                    #imagem composta temporalmente
                    channel_data = prepare.prepare_image(tile=tile, channel=channel, window=window) 

                    image.append(channel_data)
                image = np.stack(image, axis=0)
                #print(image.shape, type(image))
                if not return_info:
                    yield image #imagem de 12 canais
                else:
                    yield image, x, y, tile, num_subtiles
batch_size = 100
total_images = num_subtiles*num_subtiles*len(tiles)
gen = subtile_generator(tiles, num_subtiles, tile_size, channels)
generator = prepare.batch_generator(subtile_generator(tiles, num_subtiles, tile_size, channels), 
                                    batch_size, 
                                    total_images, 
                                    shuffle=True)

In [None]:
ipca = prepare.IPCAHandler(n_components=12, batch_size=batch_size)
ipca.fit(gen)
ipca.save_model(f'{working_dir}/config/ipca_model_{len(tiles)}_tiles')


In [None]:

frozen_ipca = prepare.IPCAHandler(n_components=12, batch_size=batch_size)
frozen_ipca.load_model(f'{working_dir}/config/ipca_model_6_tiles')

def subtile_generator_transform(tiles, num_subtiles, tile_size, channels):
    for tile in tiles.values():
        for x in range(0, tile_size//num_subtiles, tile_size):
            for y in range(0, tile_size//num_subtiles, tile_size):
                image = []
                window = rasterio.windows.Window(x, y, subtile_size, subtile_size)
                for channel in channels:
                    #imagem composta temporalmente
                    channel_data = prepare.prepare_image(tile=tile, channel=channel, window=window) 

                    image.append(channel_data)
                image = np.stack(image, axis=0)
                #print(image.shape, type(image))
                yield image, x, y, tile, num_subtiles#imagem de 12 canais


for img, x, y, tile, num_subtiles in subtile_generator_transform(tiles, num_subtiles, tile_size, channels):
    print(x, y, tile, num_subtiles)
    ipca_image = frozen_ipca.transform_and_quantize(img) #img list

    for c in [3, 7, 12]:
        prepare.save(img, c, x, y, tile, num_subtiles)

In [None]:

def blend_colors(color_transparent, alpha, color_base):
    """
    Calcula a cor resultante quando uma cor transparente é sobreposta em uma cor base opaca.

    Parâmetros:
        color_transparent (str): Código hexadecimal da cor transparente (ex.: "#f70004").
        alpha (float): Nível de transparência da cor transparente (0 <= alpha <= 1).
        color_base (str): Código hexadecimal da cor base opaca (ex.: "#CCCCCC").

    Retorna:
        str: Código hexadecimal da cor resultante.
    """
    # Função auxiliar para converter hexadecimal para RGB
    def hex_to_rgb(hex_color):
        hex_color = hex_color.lstrip("#")  # Remove o "#" do início
        return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))  # Converte cada par de dígitos para decimal

    # Função auxiliar para converter RGB para hexadecimal
    def rgb_to_hex(rgb_color):
        return "#{:02x}{:02x}{:02x}".format(*rgb_color)  # Formata como hexadecimal com dois dígitos por componente

    # Converte as cores para RGB
    rgb_transparent = hex_to_rgb(color_transparent)
    rgb_base = hex_to_rgb(color_base)

    # Calcula a cor resultante usando a fórmula de combinação linear
    r_result = int(alpha * rgb_transparent[0] + (1 - alpha) * rgb_base[0])
    g_result = int(alpha * rgb_transparent[1] + (1 - alpha) * rgb_base[1])
    b_result = int(alpha * rgb_transparent[2] + (1 - alpha) * rgb_base[2])

    # Converte o resultado de volta para hexadecimal
    return rgb_to_hex((r_result, g_result, b_result))


def generate_latex_table(column_colors, gray_levels):
    """
    Gera o código LaTeX para uma tabela com células coloridas.

    Parâmetros:
        column_colors (list of str): Lista de códigos hexadecimais das cores das colunas (ex.: ["#f70004", "#00ff00"]).
        gray_levels (list of int): Lista de níveis de cinza das linhas em porcentagem (ex.: [80, 60, 40]).

    Retorna:
        str: Código LaTeX da tabela.
    """
    # Função auxiliar para converter nível de cinza em hexadecimal
    def gray_level_to_hex(level):
        value = int(level / 100 * 255)  # Converte porcentagem para valor RGB
        return "#{:02x}{:02x}{:02x}".format(value, value, value)

    # Função auxiliar para calcular a cor resultante (usando blend_colors)
    def blend_colors(color_transparent, alpha, color_base):
        def hex_to_rgb(hex_color):
            hex_color = hex_color.lstrip("#")
            return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))

        def rgb_to_hex(rgb_color):
            return "#{:02x}{:02x}{:02x}".format(*rgb_color)

        rgb_transparent = hex_to_rgb(color_transparent)
        rgb_base = hex_to_rgb(color_base)

        r_result = int(alpha * rgb_transparent[0] + (1 - alpha) * rgb_base[0])
        g_result = int(alpha * rgb_transparent[1] + (1 - alpha) * rgb_base[1])
        b_result = int(alpha * rgb_transparent[2] + (1 - alpha) * rgb_base[2])

        return rgb_to_hex((r_result, g_result, b_result))

    # Início do código LaTeX
    latex_code = "\\documentclass{article}\n"
    latex_code += "\\usepackage[table]{xcolor}\n"
    latex_code += "\\begin{document}\n\n"
    latex_code += "\\begin{tabular}{|" + "|".join(["c"] * len(column_colors)) + "|}\n"
    latex_code += "\\hline\n"

    # Loop para gerar as linhas da tabela
    for gray_level in gray_levels:
        gray_hex = gray_level_to_hex(gray_level)  # Converte nível de cinza para hexadecimal
        row = []
        for column_color in column_colors:
            blended_color = blend_colors(column_color, 0.5, gray_hex)  # Calcula a cor resultante
            row.append(f"\\cellcolor[HTML]{{{blended_color.lstrip('#')}}}")  # Adiciona \cellcolor
        latex_code += " & ".join(row) + " \\\\\n"  # Une as células com "&" e adiciona "\\\\"
        latex_code += "\\hline\n"

    # Fim do código LaTeX
    latex_code += "\\end{tabular}\n"
    latex_code += "\\end{document}"

    return latex_code


# Exemplo de uso
if __name__ == "__main__":
    # Cores das colunas (transparentes com 50% de opacidade)
    column_colors = ["#f30004", "#23D7C5", "#F700FF", "#c1f122"]

    # Níveis de cinza das linhas (em porcentagem)
    gray_levels = [100, 67, 33, 0]

    # Gera o código LaTeX
    latex_table = generate_latex_table(column_colors, gray_levels)
    print(latex_table)