In [6]:
%pip install google_auth_oauthlib

Note: you may need to restart the kernel to use updated packages.


In [7]:
import ee
import pandas as pd 
import geopandas as gpd
import os
import io
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
import time 
from datetime import datetime


In [None]:
class manager_img_gee():
    def __init__(self, project):
        ee.Authenticate()
        ee.Initialize(project=project)
        self.drive_service = self._authenticate()
        self.folder_id = None
        self.folder_name = None
        self.target_folder_id = None
        self.target_folder_name = None
    
    def _log_message(self, message):
        dt_new = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{dt_new}] - {message}")

    def _log_error(self, error):
        dt_new = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        raise RuntimeError(f"[{dt_new}] - Erro: {error}")

    def _authenticate(self):
        SCOPES = ["https://www.googleapis.com/auth/drive"] 

        creds = None
        if os.path.exists("data/token.json"):
            creds = Credentials.from_authorized_user_file("data/token.json", SCOPES)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file("data/credentials.json", SCOPES)
                creds = flow.run_local_server(port=0)
            with open("data/token.json", "w") as token:
                token.write(creds.to_json())
        
        return build("drive", "v3", credentials=creds)

    def _get_folder_id_by_name(self, folder_name):
        """
        Obtém o ID de uma pasta no Google Drive pelo nome.

        Args:
            folder_name (str): O nome da pasta a ser pesquisada.

        Returns:
            str | None: O ID da pasta se encontrada, caso contrário None.
        """
        try:
            query = f"name = '{folder_name}' and mimeType = 'application/vnd.google-apps.folder' and 'root' in parents"
            response = self.drive_service.files().list(q=query, pageSize=1, fields="files(id, name)").execute()
            items = response.get('files', [])
            if not items:
                self._log_message(f"Pasta '{folder_name}' não encontrada no Google Drive. Ela será criada se necessário.")
                return None
            return items[0].get('id')
        except HttpError as error:
            self._log_error(f"Ocorreu um erro na API do Drive ao buscar a pasta: {error}")

    def get_active_gee_task_count(self):
        """
        Conta o número de tarefas de exportação ativas ('RUNNING' ou 'READY') no GEE.

        Returns:
            int: O número de tarefas ativas.
        """
        try:
            task_list = ee.data.getTaskList()
            active_states = {'RUNNING', 'READY'}
            active_tasks = [task for task in task_list if task['state'] in active_states]
            return len(active_tasks)
        except Exception as e:
            self._log_error(f"Erro ao obter a lista de tarefas do Google Earth Engine: {e}")

    def _check_folder_capacity(self, folder_name, limit):
        """
        Verifica se a pasta de destino no Google Drive tem capacidade para novos arquivos.
        A capacidade é limitada a 20 itens (arquivos existentes + tarefas GEE ativas).

        Args:
            folder_name (str): O nome da pasta no Google Drive.

        Returns:
            bool: True se a pasta tiver capacidade, False caso contrário.
        """
        # Verifica se a pasta precisa ser atualizada
        if self.target_folder_name != folder_name or self.target_folder_id is None:
            self._log_message(f"Verificando a pasta '{folder_name}' no Google Drive...")
            self.target_folder_name = folder_name
            self.target_folder_id = self._get_folder_id_by_name(folder_name)
            # Se a pasta não existe, ela tem capacidade para a primeira tarefa
            if self.target_folder_id is None:
                return True
            self._log_message(f"Pasta '{folder_name}' verificada com sucesso.")

        num_active_tasks = self.get_active_gee_task_count()

        try:
            # Conta os arquivos na pasta do Drive
            response = self.drive_service.files().list(q=f"'{self.target_folder_id}' in parents", pageSize=21, fields="files(id)").execute()
            items = response.get('files', [])
            total_items = len(items) + num_active_tasks

            if total_items <= limit:
                return True
            else:
                self._log_message(f"A pasta '{folder_name}' contém {len(items)} arquivos e {num_active_tasks} tarefas ativas, totalizando {total_items}. Aguardando ter menos de 20 itens para continuar...")
                return False
        except HttpError as error:
            self._log_error(f"Ocorreu um erro na API do Drive ao verificar a pasta: {error}")

    def _wait_for_folder_capacity(self, folder_name, limit):
        """
        Aguarda até que a pasta de destino tenha capacidade para novos arquivos,
        verificando a cada 10 minutos.

        Args:
            folder_name (str): O nome da pasta no Google Drive.
        """
        wait_count = 0
        while not self._check_folder_capacity(folder_name, limit):
            self._log_message(f"Aguardando capacidade na pasta '{folder_name}'...")
            time.sleep(60 * 10)  # Espera 10 minutos
            wait_count += 1
            if wait_count > 6: # Timeout após ~1 hora
                self._log_error(f"O fluxo de arquivos na pasta '{folder_name}' demorou demais para ser liberado.")

    def _get_all_acquisition_dates(self, area_of_interest, start_date, end_date, satellite, SR=False):
        """
        Obtém todas as datas de aquisição únicas para um determinado satélite e período.

        Args:
            area_of_interest (ee.Geometry): A área de interesse.
            start_date (str): Data de início (YYYY-MM-DD).
            end_date (str): Data de fim (YYYY-MM-DD).
            satellite (str): 'sentinel1' ou 'sentinel2'.
            SR (bool): Usar reflectância de superfície para Sentinel-2.

        Returns:
            list: Uma lista de datas de aquisição (YYYY-MM-DD).
        """
        try:
            collection = None
            if satellite == 'sentinel1':
                collection = ee.ImageCollection('COPERNICUS/S1_GRD') \
                               .filterBounds(area_of_interest) \
                               .filterDate(start_date, end_date) \
                               .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV')) \
                               .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VH')) \
                               .filter(ee.Filter.eq('instrumentMode', 'IW'))
            elif satellite == 'sentinel2':
                collection_id = 'COPERNICUS/S2_SR_HARMONIZED' if SR else 'COPERNICUS/S2_HARMONIZED'
                collection = ee.ImageCollection(collection_id) \
                               .filterBounds(area_of_interest) \
                               .filterDate(start_date, end_date) \
                               .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20))
            else:
                self._log_error(f"Satélite '{satellite}' não suportado.")
                return []

            timestamps = collection.aggregate_array('system:time_start').getInfo()
            if not timestamps:
                self._log_error(f"Nenhum dado encontrado para o satélite {satellite} no período especificado.")
                return []

            unique_dates = sorted(list(set([datetime.utcfromtimestamp(ts / 1000).strftime('%Y-%m-%d') for ts in timestamps])))
            self._log_message(f"Datas de aquisição encontradas para {satellite}: {unique_dates}")
            return unique_dates
        except Exception as e:
            self._log_error(f"Erro ao obter datas de aquisição para {satellite}: {e}")

    def _start_export_task(self, image, filename, folder_name, area_of_interest, satellite):
        """
        Inicia uma tarefa de exportação de imagem do GEE para o Google Drive.

        Args:
            image (ee.Image): A imagem a ser exportada.
            filename (str): O nome do arquivo de saída.
            folder_name (str): A pasta de destino no Google Drive.
            area_of_interest (ee.Geometry): A área de interesse para o recorte.
            satellite (str): 'sentinel1' ou 'sentinel2' para selecionar as bandas corretas.
        """
        bands = ['VH', 'VV'] if satellite == 'sentinel1' else ['B4', 'B3', 'B2']
        task_config = {
            'image': image.select(bands),
            'description': filename,
            'folder': folder_name,
            'fileNamePrefix': filename,
            'scale': 10,
            'region': area_of_interest,
            'fileFormat': 'GeoTIFF',
            'maxPixels': 1e12,
        }
        task = ee.batch.Export.image.toDrive(**task_config)
        task.start()
        self._log_message(f"Tarefa de exportação iniciada: {filename}")


    def _get_sentinel2_cloud_masked_composite(self, area_of_interest, start_date, end_date, SR=False):
        """Cria um mosaico Sentinel-2 com máscara de nuvens usando o Cloud Score+."""
        collection_id = 'COPERNICUS/S2_SR_HARMONIZED' if SR else 'COPERNICUS/S2_HARMONIZED'
        s2_collection = ee.ImageCollection(collection_id).filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 10))
        cloud_score_collection = ee.ImageCollection('GOOGLE/CLOUD_SCORE_PLUS/V1/S2_HARMONIZED')
        QA_BAND = 'cs'
        CLEAR_THRESHOLD = 0.60

        # Vincula a coleção do Sentinel-2 com a do Cloud Score
        s2_with_cs = s2_collection.filterBounds(area_of_interest) \
                                  .filterDate(start_date, end_date) \
                                  .linkCollection(cloud_score_collection, [QA_BAND])
        
        # Função para aplicar a máscara de nuvem
        def apply_cloud_mask(image):
            return image.updateMask(image.select(QA_BAND).gte(CLEAR_THRESHOLD))

        composite = s2_with_cs.map(apply_cloud_mask).median().clip(area_of_interest)
        return composite

    def _download_file_from_drive(self, file_id, file_name, local_path):
        """Baixa um arquivo do Google Drive para um diretório local."""
        try:
            self._log_message(f"Iniciando o download do arquivo '{file_name}'...")
            request = self.drive_service.files().get_media(fileId=file_id)
            fh = io.BytesIO()
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
            
            with open(os.path.join(local_path, file_name), "wb") as f:
                f.write(fh.getbuffer())
            self._log_message(f"Arquivo '{file_name}' baixado com sucesso.")
        except HttpError as error:
            self._log_error(f"Erro ao baixar o arquivo {file_name}: {error}")

    def _delete_file_from_drive(self, file_id, file_name):
        """Remove um arquivo do Google Drive."""
        try:
            self.drive_service.files().delete(fileId=file_id).execute()
            self._log_message(f"Arquivo '{file_name}' removido do Google Drive.")
        except HttpError as error:
            self._log_error(f"Erro ao remover o arquivo '{file_name}' do Google Drive: {error}")

    def export_images_to_drive(self, area_of_interest, start_date, end_date, satellite, folder_name, base_filename, SR=False, limit=500, sentinel1_dates=None):
        """
        Orquestra a exportação de imagens de satélite para o Google Drive.
        Para Sentinel-1, exporta uma imagem por data de aquisição.
        Para Sentinel-2, exporta um mosaico mediano sem nuvens para todo o período.

        Args:
            area_of_interest (ee.Geometry): A área de interesse do GEE.
            start_date (str): Data de início (YYYY-MM-DD).
            end_date (str): Data de fim (YYYY-MM-DD).
            satellite (str): 'sentinel1' ou 'sentinel2'.
            folder_name (str): A pasta de destino no Google Drive.
            base_filename (str): O prefixo para os nomes dos arquivos.
            SR (bool): Para Sentinel-2, indica se usa reflectância de superfície.
            limit (int): Limite de capacidade da pasta.
            sentinel1_dates (list): Lista opcional de datas específicas (formato YYYY-MM-DD) 
                                    para Sentinel-1. Se None, usa a lógica padrão de 10 datas.
        """
        if satellite == 'sentinel2':
            self._wait_for_folder_capacity(folder_name, limit)
            s2_image = self._get_sentinel2_cloud_masked_composite(area_of_interest, start_date, end_date, SR)
            filename = f"{base_filename}_{satellite}_{start_date}_to_{end_date}"
            self._start_export_task(s2_image, filename, folder_name, area_of_interest, satellite)
        
        elif satellite == 'sentinel1':
            # Se datas específicas foram fornecidas, usa elas; caso contrário, usa a lógica padrão
            if sentinel1_dates is not None:
                self._log_message(f"Usando datas específicas fornecidas: {sentinel1_dates}")
                acquisition_dates = sentinel1_dates
                
            # --- NOVA VERIFICAÇÃO DE DISPONIBILIDADE ---
            self._log_message("Verificando disponibilidade de imagens Sentinel-1 para todas as datas especificadas...")
            missing_dates = []
            for date in acquisition_dates:
                images = self._get_sentinel1_images(area_of_interest, date)
                if images is None:  
                    missing_dates.append(date)

            if missing_dates:
                self._log_message(f"As seguintes datas não possuem imagens Sentinel-1 disponíveis: {missing_dates}.")
                self._log_message("Pulando execução deste município por falta de dados completos.")
                return  
            else:
                self._log_message("Todas as datas possuem imagens disponíveis. Iniciando exportação...")

            for date in acquisition_dates:
                images = self._get_sentinel1_images(area_of_interest, date)
                num_images = images.size().getInfo()

                for i in range(num_images):
                    self._wait_for_folder_capacity(folder_name, limit)
                    image = ee.Image(images.get(i))
                    filename = f"{base_filename}_{satellite}_{date}_img{i+1:02d}"
                    self._start_export_task(image, filename, folder_name, area_of_interest, satellite)


    def download_latest_file_from_drive(self, folder_name, local_path="downloads"):
        """
        Baixa o arquivo mais antigo da pasta especificada no Drive e o remove em seguida.

        Args:
            folder_name (str): O nome da pasta no Google Drive.
            local_path (str): O caminho do diretório local para salvar o arquivo.

        Returns:
            str | None: O nome do arquivo baixado ou None se a pasta estiver vazia.
        """
        try:
            folder_id = self._get_folder_id_by_name(folder_name)
            if not folder_id:
                self._log_message(f"A pasta '{folder_name}' não existe. Nenhum arquivo para baixar.")
                return None

            if not os.path.exists(local_path):
                os.makedirs(local_path)
                self._log_message(f"Diretório local '{local_path}' criado.")
            
            # Busca o arquivo mais antigo (primeiro a ser criado) na pasta
            response = self.drive_service.files().list(
                q=f"'{folder_id}' in parents and trashed=false",
                pageSize=1,
                fields="files(id, name)",
                orderBy="createdTime"
            ).execute()
            
            items = response.get('files', [])
            if not items:
                self._log_message(f"Nenhum arquivo encontrado na pasta '{folder_name}'.")
                return None

            file_to_download = items[0]
            file_id = file_to_download['id']
            file_name = file_to_download['name']
            
            self._download_file_from_drive(file_id, file_name, local_path)
            self._delete_file_from_drive(file_id, file_name)
            
            return file_name
        except Exception as e:
            self._log_error(f"Erro durante o processo de download local: {e}")

    def delete_local_file(self, file_name, local_path="downloads"):
        """
        Remove um arquivo do diretório de download local.

        Args:
            file_name (str): O nome do arquivo a ser removido.
            local_path (str): O caminho para o diretório local.
        """
        file_path = os.path.join(local_path, file_name)
        if os.path.exists(file_path):
            os.remove(file_path)
            self._log_message(f"Arquivo '{file_name}' removido do diretório local '{local_path}'.")
        else:
            self._log_message(f"Arquivo '{file_name}' não encontrado em '{local_path}'. Nenhuma ação necessária.")

In [12]:
# manager = manager_img_gee('ee-raullomonte13-ic')
# path = 'data/CV_shp/campo_verde.shp'
# gdf_cv = gpd.read_file(path)

# geometry = gdf_cv.geometry.values[0]  # Extrai o polígono como objeto shapely

# geo_json = geometry.__geo_interface__
# polygon_ee = ee.Geometry.Polygon(geo_json['coordinates'])

In [13]:
manager = manager_img_gee('ee-raullomonte13-ic')
path = '../msn/PL_data/MT_Municipios_2024/MT_Municipios_2024.shp'
gdf = gpd.read_file(path)
gdf = gdf[gdf['NM_MUN'] == 'Primavera do Leste']
gdf.to_file('../msn/PL_data/PL_shp/PL_shp.shp')
geometry = gdf.geometry.values[0]  # Extrai o polígono como objeto shapely

geo_json = geometry.__geo_interface__
polygon_ee = ee.Geometry.Polygon(geo_json['coordinates'])

In [16]:
manager = manager_img_gee('ee-raullomonte13-ic')

datas = ['2015-11-10','2015-12-16']
manager.export_images_to_drive(area_of_interest=polygon_ee,
                           start_date='2015-10-25',
                           end_date='2016-07-15',
                           satellite='sentinel1',
                           folder_name='PL_Sentinel1_v3',
                           base_filename='PL_sentinel1',
                            sentinel1_dates=datas
                           )

[2025-10-28 09:16:08] - Usando datas específicas fornecidas: ['2015-11-10', '2015-12-16']
[2025-10-28 09:16:08] - Verificando disponibilidade de imagens Sentinel-1 para todas as datas especificadas...
[2025-10-28 09:16:13] - Todas as datas possuem imagens disponíveis. Iniciando exportação...
[2025-10-28 09:16:19] - Verificando a pasta 'PL_Sentinel1_v3' no Google Drive...
[2025-10-28 09:16:19] - Pasta 'PL_Sentinel1_v3' verificada com sucesso.
[2025-10-28 09:16:23] - Tarefa de exportação iniciada: PL_sentinel1_sentinel1_2015-11-10_img01
[2025-10-28 09:16:32] - Tarefa de exportação iniciada: PL_sentinel1_sentinel1_2015-12-16_img01
[2025-10-28 09:16:36] - Tarefa de exportação iniciada: PL_sentinel1_sentinel1_2015-12-16_img02


---

In [31]:
file_name

'test_base_filename_sem_SR_sentinel2_2022-08-02_2023-10-31.tif'

In [23]:
manager.get_delete_local(file_name=file_name, local_path='downloads')

[2025-07-10 17:31:04] - Arquivo 'test_base_filename_sem_SR_sentinel2_2022-08-02_2023-10-31.tif' removido do diretório local 'downloads'.
