In [1]:
import os
import requests
import numpy as np
import pandas as pd
import zipfile
import io
from mask_tiff_file_from_coordinates_list import export_dataframe_with_raster_features
import pandas as pd

In [2]:
def get_partition_name(lower_bound: float, longitude: bool) -> str:
    if longitude:
        if lower_bound.is_integer():
            partition_name = f"{abs(int(lower_bound)):02d}_"
        else:
            partition_name = f"{abs(int(lower_bound)):02d}{str(lower_bound-int(lower_bound))[-1]}"
    else:
        if lower_bound < 0:
            hemisphere = "S"
        else:
            hemisphere = "N"
        partition_name = f"{abs(int(lower_bound))}{hemisphere}"
    return partition_name

def generate_partitions(start:float, end:float, step:float, longitude:bool) -> dict[str, tuple]:
    partitions = {}
    lower_bound = start
    upper_bound = start + step
    while upper_bound < end:
        partition_name = get_partition_name(lower_bound=lower_bound, longitude=longitude)
        partitions[partition_name] = (lower_bound, upper_bound)
        lower_bound = upper_bound
        upper_bound += step
    return partitions

def find_partition(number: float, partitions: dict[str, tuple]) -> str:
    for partition, interval in partitions.items():
        if interval[0] <= number < interval[1]:
            return partition
    return None

# Defina o intervalo
start:float = -63.0
end:float = -39.0
step:float = 1.5

# Gere as partições
partitions_long = generate_partitions(start, end, step, True)
partitions_lat = generate_partitions(start = -33.0, end= 2.0, step = 1, longitude=False)

In [3]:
dataframe = pd.read_parquet("../assets/INPUT/occurence_abscence.parquet")
dataframe['Latitude_str'] = dataframe['Latitude'].apply(lambda x: find_partition(x, partitions_lat))
dataframe['Longitude_str'] = dataframe['Longitude'].apply(lambda x: find_partition(x, partitions_long))
dataframe['download_key'] = dataframe['Latitude_str']+dataframe['Longitude_str']
dataframe[['Latitude','Longitude','download_key','Presence']].to_parquet("../assets/INPUT/occurence_absence_with_download_key.parquet")

dataframe = pd.read_parquet("../assets/INPUT/occurence_absence_with_download_key.parquet")
grid_codes = dataframe['download_key'].unique().tolist()

In [4]:
len(grid_codes)

98

In [7]:
[f'http://www.dsr.inpe.br/topodata/data/geotiff/{code}ZN.zip'for code in grid_codes[2:12]]

['http://www.dsr.inpe.br/topodata/data/geotiff/29S57_ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/30S555ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/29S555ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/31S555ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/32S555ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/26S555ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/31S54_ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/24S54_ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/32S54_ZN.zip',
 'http://www.dsr.inpe.br/topodata/data/geotiff/25S54_ZN.zip']

In [7]:
from pathlib import Path

In [30]:
def download_zip_file(input_file_path: str, output_file_path: str) -> None:
    """
    Download zip file
    Args:
        input_file_path (str): File path 
        output_file_path (str): Output path
        
    Example:
        input_file_path: str = "http://www.dsr.inpe.br/topodata/data/geotiff/23S435SN.zip"
        output_file_path: str = "/mnt/biofeats/assets/TOPOGRAPHY/"
        download_zip_file(input_file_path = input_file_path, output_file_path = output_file_path)
    """
    with requests.get(input_file_path) as r:
        z = zipfile.ZipFile(io.BytesIO(r.content))
        z.extractall(output_file_path)
    return

def download_file(base_url:str, grid_code:str, download_dir:str):
    tile_url = f"{base_url}/{grid_code}ZN.zip"
    print('Baixando:', tile_url)
    
    if not Path(os.path.join(download_dir, grid_code,"ZN.tif")).exists():
        # Enviar uma solicitação GET para a URL do tile

        
        # Verificar se a solicitação foi bem-sucedida (código de status 200)
        try:
            # Salvar o conteúdo da resposta em um arquivo no diretório de download
            download_zip_file(tile_url,download_dir)
            return True
        except:
            print('Falha ao baixar:', grid_code)
            return False
    else:
        return True  

In [21]:
import rasterio
import rasterio.warp

# Carregar o arquivo .tif
# Substitua 'path_to_tiff' pelo caminho do seu arquivo .tif
path_to_tiff = f'../assets/TILE/{grid_codes[0]}.tif'

def get_values_list(contained_df: pd.DataFrame, path_to_tiff: str):
    vals = []
    src = rasterio.open(path_to_tiff)
    # Iterar sobre os pontos no DataFrame
    for index, row in contained_df.iterrows():
        # Extrair a latitude e longitude do ponto
        latitude = row['Latitude']
        longitude = row['Longitude']
        
        # Reprojetar as coordenadas do ponto para o sistema de referência do arquivo .tif, se necessário
        if src.crs != 'EPSG:4326':  # Se o CRS do arquivo não for WGS84
            longitude, latitude = rasterio.warp.transform('EPSG:4326', 'EPSG:4326', [longitude], [latitude])
            longitude, latitude = longitude[0], latitude[0]

        # Converter as coordenadas do ponto para índices de pixel
        col, row = src.index(longitude, latitude)
        
        # Extrair o valor do pixel correspondente
        value = src.read(1, window=((row, row+1), (col, col+1)))
        try:
            value = value[0][0]  # Valor do pixel
        except:
            value = -100
            print("Error")
        vals.append(value)
    return vals
    #print(f'Latitude: {latitude}, Longitude: {longitude}, Valor do pixel: {value}')

In [None]:
grid_code = grid_codes[0]
path_to_tiff = path_to_tiff = f'../assets/TILE/{grid_codes[0]}.tif'
contained_df = dataframe[dataframe['download_key'] == grid_code].copy()
vals = get_values_list(contained_df=contained_df, path_to_tiff=path_to_tiff)

In [31]:
mult_df = []
for grid_code in grid_codes:
    base_url = 'http://www.dsr.inpe.br/topodata/data/geotiff'
    download_dir = '../assets/TILE'

    if download_file(base_url=base_url, grid_code=grid_code, download_dir=download_dir):
        path_to_tiff = f'../assets/TILE/{grid_code}ZN.tif'
        contained_df = dataframe[dataframe['download_key'] == grid_code].copy()
        vals = get_values_list(contained_df=contained_df, path_to_tiff=path_to_tiff)
        contained_df['top']= vals
        try:
            contained_df.to_parquet(f'../assets/TILE/dataframe_parquet_tile/{grid_code}.parquet')
            mult_df.append(contained_df)
        except:
            print("Unnable to export to parquet")
            mult_df.append(contained_df)
        try:
            if filepath:=Path(f'../assets/TILE/{grid_codes[0]}ZN.tif').exists():
                os.remove(filepath)
        except:
            print("Erro ao remover")

Baixando: http://www.dsr.inpe.br/topodata/data/geotiff/30S57_ZN.zip
Error
Erro ao remover
Baixando: http://www.dsr.inpe.br/topodata/data/geotiff/31S57_ZN.zip


In [20]:
vals

[134.276]

In [7]:
def create_parquet_block_partition(
    base_url: str, grid_code: str, download_dir: str
):
    download_file(base_url=base_url, grid_code=grid_code, download_dir=download_dir)
    try:
        export_dataframe_with_raster_features(
            geojson="../assets/INPUT/occurence_absence.json",
            explanatory_rasters=[f'{download_dir}/{grid_code}ZN.tif'],
            columns=['top','Presence'],
            occurence_absence_dataframe_path="../assets/INPUT/occurence_abscence.parquet",
            output_file_path='../assets/INPUT/temp_occurence_absence.parquet'
        )
        df = pd.read_parquet("../assets/INPUT/temp_occurence_absence.parquet")
        df[df['top'] != 0].to_parquet(f"../assets/TILE/dataframe_parquet_tile/{grid_code}.parquet")
    except Exception as e:
        print(e)
    try:
        os.remove(os.path.join(download_dir,f"{grid_code}ZN.tif"))
    except Exception as e:
        print(e)
    return

In [1]:
import rasterio
from rasterio.mask import mask
import pandas as pd
import geopandas as gpd

df = pd.read_parquet("../assets/INPUT/occurence_abscence.parquet")

In [2]:
geoshape_filepath: str = "../assets/FEATURES/MALHAS/BR_UF_2022.shp"
states_filter = ["RS","SC","PR","SP","MG","ES","RJ"]
geoframe = gpd.read_file(geoshape_filepath)
geoframe = geoframe[geoframe["SIGLA_UF"].isin(states_filter)].cx[:-42,:].reset_index()

In [3]:
with rasterio.open("../assets/TILE/33S54_ZN.tif") as src:
    # Máscara do arquivo .tiff usando o shapefile
    out_image, out_transform = mask(src, geoframe.geometry, crop=True)

In [5]:
src = rasterio.open("../assets/TILE/33S54_ZN.tif")

In [9]:
from mask_tiff_file_from_coordinates_list import export_dataframe_with_raster_features

export_dataframe_with_raster_features(
    geojson="../assets/INPUT/occurence_absence.json",
    explanatory_rasters=['../assets/TILE/33S54_ZN.tif'],
    columns=['top','Presence'],
    occurence_absence_dataframe_path="../assets/INPUT/occurence_abscence.parquet",
    output_file_path='../assets/INPUT/temp_occurence_absence.json'
)



In [None]:
import os
import requests
import numpy as np

# Função para gerar os códigos de quadrícula para uma área delimitadora
def generate_grid_codes(lat_range, lon_range):
    grid_codes = []
    for lat in range(lat_range[0], lat_range[1]):
        for lon in np.arange(lon_range[0], lon_range[1], 1.5):
            lat_str = f'{abs(lat)}' + ('S' if lat < 0 else 'N')
            lon_str = f'{abs(int(np.floor(lon))):02d}' + ('_' if lon % 1 == 0 else '5')

            grid_code = f'{lat_str}{lon_str}'
            grid_codes.append(grid_code)
    return grid_codes

if __name__ == '__main__':
    # Define a área delimitadora
    lat_range = (-33,-17)
    lon_range = (-59, -40)


    # Gerar os códigos de quadrícula para a área delimitadora
    grid_codes = generate_grid_codes(lat_range, lon_range)

    # URL base para download dos tiles
    base_url = 'http://www.dsr.inpe.br/topodata/data/geotiff'

    # Diretório para salvar os tiles baixados
    download_dir = '../assets/TILE'
    os.makedirs(download_dir, exist_ok=True)

    # Baixar os tiles correspondentes aos códigos de quadrícula gerados
    for grid_code in grid_codes[:100]:
        tile_url = f"{base_url}/{grid_code}ZN.zip"
        print('Baixando:', tile_url)
        
        # Enviar uma solicitação GET para a URL do tile
        response = requests.get(tile_url)
        
        # Verificar se a solicitação foi bem-sucedida (código de status 200)
        if response.status_code == 200:
            # Salvar o conteúdo da resposta em um arquivo no diretório de download
            with open(os.path.join(download_dir, f"{grid_code}.tif"), 'wb') as f:
                f.write(response.content)
            print('Download bem-sucedido:', grid_code)
        else:
            print('Falha ao baixar:', grid_code)