In [15]:
import asyncio
import math
import os
import shutil
import tempfile
import time
from pathlib import Path

import ee
import geemap
import nest_asyncio
from tqdm.notebook import tqdm

from src.utils.logger import setup_logger

# Configure logger
logger = setup_logger(name="eeMaps", log_file="logs/eeLoader.log")

# Apply nest_asyncio to allow nested event loops in Jupyter
nest_asyncio.apply()

# Initial Earth Engine authentication
ee.Authenticate()  # Needs one-time authentication
ee.Initialize()  # Initialize the Earth Engine library

# Earth Engine Data Acquisition

This notebook provides optimized utilities for downloading large datasets from Google Earth Engine while handling download size limits. Features include:

- Adaptive tiling based on image size estimation
- Asynchronous downloads with progress visualization
- Automatic merging of tiles into a single output
- Proper temporary file management with cleanup

The code follows best practices for geospatial data handling in Python.

## Asyncio in Jupyter Notebooks

This notebook uses asynchronous I/O (asyncio) to parallelize downloads. Since Jupyter notebooks already run in an event loop, we need to use `nest_asyncio` to allow nested event loops. This lets us:

1. Run asynchronous functions in notebook cells
2. Execute multiple downloads concurrently
3. Provide progress feedback during long-running operations

The code below includes proper error handling for the Jupyter environment and ensures resources are properly cleaned up.

In [None]:
class EarthEngineDownloader:
    """Utility class for downloading and processing Earth Engine data efficiently.

    This class provides methods for downloading large Earth Engine datasets by
    adaptively splitting them into smaller tiles, downloading asynchronously,
    and merging the results.
    """

    def __init__(
        self,
        tmp_dir: Path,
        max_workers: int = 4,
        max_size_mb: float = 45.0,
        cleanup: bool = True,
    ):
        """Initialize the Earth Engine Downloader.

        Args:
            tmp_dir: Directory to store temporary tiles (None for system temp dir)
            max_workers: Maximum number of concurrent downloads
            max_size_mb: Maximum size of a single tile in MB (should be < 50MB EE limit)
            cleanup: Whether to clean up temporary files after download
        """
        self.max_workers = max_workers
        self.max_size_mb = max_size_mb
        self.cleanup = cleanup

        # Create a temporary directory if none provided
        self._tmp_dir = tmp_dir
        self._tmp_created = False
        if self._tmp_dir is None:
            self._tmp_dir = Path(tempfile.mkdtemp(prefix="ee_download_"))
            self._tmp_created = True
            logger.info(f"Created temporary directory: {self._tmp_dir}")

    def __del__(self):
        """Clean up temporary directory when the object is destroyed."""
        self._cleanup_tmp_dir()

    def _cleanup_tmp_dir(self):
        """Remove temporary directory if it was created by this class."""
        if self._tmp_created and self.cleanup and self._tmp_dir.exists():
            try:
                shutil.rmtree(self._tmp_dir)
                logger.info(f"Removed temporary directory: {self._tmp_dir}")
            except Exception as e:
                logger.error(f"Error cleaning up temporary directory: {e}")

    @staticmethod
    def create_polygon_from_extent(
        north: float, west: float, south: float, east: float
    ) -> ee.Geometry.Polygon:
        """Creates an Earth Engine Polygon geometry from bounding box coordinates.

        Args:
            north: Northern latitude of the extent
            west: Western longitude of the extent
            south: Southern latitude of the extent
            east: Eastern longitude of the extent

        Returns:
            Polygon geometry representing the bounding box

        Raises:
            ValueError: If coordinates are not valid
        """
        if not (-90 <= south < north <= 90):
            raise ValueError("Latitude values must be in [-90, 90] and north > south.")
        if not (-180 <= west < east <= 180):
            raise ValueError("Longitude values must be in [-180, 180] and east > west.")

        # Define the polygon corners in (lon, lat) order
        coordinates = [
            [west, north],
            [east, north],
            [east, south],
            [west, south],
            [west, north],  # Close the polygon
        ]
        return ee.Geometry.Polygon([coordinates])

    @staticmethod
    def estimate_file_size_mb(
        width_px: int, height_px: int, bands: int = 1, bytes_per_pixel: int = 4
    ) -> float:
        """Estimate the size of a GeoTIFF file in megabytes based on dimensions.

        Args:
            width_px: Width of the image in pixels
            height_px: Height of the image in pixels
            bands: Number of bands in the image
            bytes_per_pixel: Bytes per pixel (typically 1, 2, or 4)

        Returns:
            Estimated file size in megabytes
        """
        # Calculate raw size in bytes
        size_bytes = width_px * height_px * bands * bytes_per_pixel

        # Add overhead for headers, metadata, etc. (15% to be conservative)
        size_bytes *= 1.15

        # Convert to MB
        size_mb = size_bytes / (1024 * 1024)

        return size_mb

    def split_aoi_into_tiles(
        self,
        aoi: ee.Geometry.Polygon,
        max_tile_size_km: float,
    ) -> list[tuple[ee.Geometry.Polygon, str]]:
        """Split a large AOI into smaller tiles with unique filenames.

        Args:
            aoi: The area of interest to split
            max_tile_size_km: Maximum size of each tile in kilometers

        Returns:
            list of tuples containing (tile geometry, tile filename)
        """
        # Get the bounding box of the AOI
        bbox = aoi.bounds().getInfo()["coordinates"][0]

        # Extract min/max coordinates
        lons = [p[0] for p in bbox]
        lats = [p[1] for p in bbox]

        min_lon, max_lon = min(lons), max(lons)
        min_lat, max_lat = min(lats), max(lats)

        # Calculate approximate degrees per requested km
        deg_per_km_lat = 1.0 / 111.0  # 1 degree latitude is ~111km
        deg_per_km_lon = 1.0 / (111.0 * math.cos(math.radians((min_lat + max_lat) / 2.0)))

        # Calculate tile sizes in degrees
        tile_size_lat = max_tile_size_km * deg_per_km_lat
        tile_size_lon = max_tile_size_km * deg_per_km_lon

        # Calculate number of tiles in each direction
        num_tiles_lat = math.ceil((max_lat - min_lat) / tile_size_lat)
        num_tiles_lon = math.ceil((max_lon - min_lon) / tile_size_lon)

        logger.info(
            f"Splitting AOI into {num_tiles_lat}x{num_tiles_lon} = {num_tiles_lat * num_tiles_lon} tiles"
        )

        # Create tiles with filenames
        tiles = []
        for i in range(num_tiles_lat):
            for j in range(num_tiles_lon):
                # Calculate tile bounds
                tile_min_lat = min_lat + i * tile_size_lat
                tile_max_lat = min(min_lat + (i + 1) * tile_size_lat, max_lat)

                tile_min_lon = min_lon + j * tile_size_lon
                tile_max_lon = min(min_lon + (j + 1) * tile_size_lon, max_lon)

                # Create tile polygon
                tile = self.create_polygon_from_extent(
                    north=tile_max_lat, west=tile_min_lon, south=tile_min_lat, east=tile_max_lon
                )

                # Create unique filename for this tile
                tile_filename = f"tile_{i:03d}_{j:03d}.tif"

                tiles.append((tile, tile_filename))

        return tiles

    async def _download_tile(
        self,
        image: ee.Image,
        tile: ee.Geometry.Polygon,
        output_path: Path,
        scale: float,
        file_per_band: bool = False,
        **kwargs,
    ) -> Path | None:
        """Download a single tile asynchronously.

        Args:
            image: Earth Engine image to download
            tile: Tile geometry to download
            output_path: Path to save the file
            scale: Resolution in meters
            file_per_band: Whether to save each band as a separate file
            **kwargs: Additional arguments to pass to geemap.ee_export_image

        Returns:
            Path to the downloaded file or None if download failed
        """
        # Use ThreadPoolExecutor to run the blocking IO in a separate thread
        loop = asyncio.get_event_loop()

        try:

            def _download():
                # Clip image to tile
                clipped_img = image.clip(tile)

                # Export image
                geemap.ee_export_image(
                    clipped_img,
                    filename=str(output_path),
                    scale=scale,
                    region=tile,
                    file_per_band=file_per_band,
                    **kwargs,
                )
                return output_path

            # Run download in thread pool
            result = await loop.run_in_executor(None, _download)
            return result

        except Exception as e:
            logger.error(f"Error downloading tile {output_path.name}: {e}")
            if output_path.exists():
                try:
                    os.remove(output_path)
                except OSError as remove_err:
                    logger.warning(f"Failed to remove incomplete tile {output_path}: {remove_err}")
            return None

    def calculate_optimal_tile_size_km(
        self,
        image: ee.Image,
        aoi: ee.Geometry.Polygon,
        scale: float,
    ) -> float:
        """Calculate optimal tile size based on estimated image size.

        Args:
            image: Earth Engine image to download
            aoi: Area of interest
            scale: Resolution in meters

        Returns:
            Optimal tile size in kilometers
        """
        # Get the bounding box
        bbox = aoi.bounds().getInfo()["coordinates"][0]

        # Extract min/max coordinates
        lons = [p[0] for p in bbox]
        lats = [p[1] for p in bbox]

        min_lon, max_lon = min(lons), max(lons)
        min_lat, max_lat = min(lats), max(lats)

        # Calculate size in degrees
        width_deg = max_lon - min_lon
        height_deg = max_lat - min_lat

        # Calculate approximate meters per degree at this latitude
        lat_center = (min_lat + max_lat) / 2
        meters_per_deg_lon = 111320 * math.cos(math.radians(lat_center))
        meters_per_deg_lat = 110574  # Approximate meters per degree of latitude

        # Calculate dimensions in pixels
        width_px = int(width_deg * meters_per_deg_lon / scale)
        height_px = int(height_deg * meters_per_deg_lat / scale)

        # Get the number of bands
        bands_info = image.bandNames().getInfo()
        num_bands = len(bands_info) if bands_info else 1

        # Estimate total file size
        estimated_size_mb = self.estimate_file_size_mb(width_px, height_px, num_bands)
        logger.info(f"Estimated full image size: {estimated_size_mb:.2f} MB")

        if estimated_size_mb <= self.max_size_mb:
            # We can potentially download in one go, but to be safe, we'll use a smaller value
            # We'll just use an arbitrary tile size that gives us 2x2 tiles
            return min(width_deg * meters_per_deg_lon, height_deg * meters_per_deg_lat) / 2000

        # Calculate number of tiles needed based on estimated size
        num_tiles_needed = math.ceil(estimated_size_mb / self.max_size_mb)

        # Add 20% safety margin
        num_tiles_needed = int(num_tiles_needed * 1.2)

        # Try to make a square grid of tiles
        tiles_per_side = math.ceil(math.sqrt(num_tiles_needed))

        # Calculate tile size in km
        width_km = width_deg * meters_per_deg_lon / 1000
        height_km = height_deg * meters_per_deg_lat / 1000

        tile_size_km = max(width_km, height_km) / tiles_per_side

        # Add a safety margin by reducing tile size by 10%
        tile_size_km *= 0.9

        logger.info(f"Calculated optimal tile size: {tile_size_km:.2f} km")
        return tile_size_km

    def merge_geotiff_tiles(
        self,
        tile_paths: list[Path],
        output_path: Path,
        nodata: Optional[float] = None,
        compress: bool = True,
    ) -> Path:
        """Merge multiple GeoTIFF tiles into a single file.

        Args:
            tile_paths: list of paths to GeoTIFF tiles
            output_path: Path to save the merged GeoTIFF
            nodata: Value to use for nodata (if any)
            compress: Whether to apply LZW compression to the output

        Returns:
            Path to the merged GeoTIFF file

        Raises:
            ValueError: If no valid tiles are found
        """
        import rasterio
        from rasterio.merge import merge

        if not tile_paths:
            raise ValueError("No tile paths provided")

        logger.info(f"Merging {len(tile_paths)} tiles into {output_path}")

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Open all tile datasets
        src_files_to_mosaic = []
        for path in tile_paths:
            try:
                if not path.exists():
                    logger.warning(f"Tile does not exist: {path}")
                    continue

                src = rasterio.open(path)
                src_files_to_mosaic.append(src)
            except Exception as e:
                logger.error(f"Error opening tile {path}: {e}")

        if not src_files_to_mosaic:
            raise ValueError("No valid tiles to merge")

        try:
            # Merge tiles
            mosaic, out_transform = merge(src_files_to_mosaic, nodata=nodata)

            # Copy metadata from the first tile
            out_meta = src_files_to_mosaic[0].meta.copy()

            # Update metadata with new dimensions and transform
            out_meta.update(
                {
                    "driver": "GTiff",
                    "height": mosaic.shape[1],
                    "width": mosaic.shape[2],
                    "transform": out_transform,
                }
            )

            # Add compression to reduce file size if requested
            if compress:
                out_meta.update({"compress": "lzw", "predictor": 2})

            # Update nodata value if specified
            if nodata is not None:
                out_meta.update({"nodata": nodata})

            # Write merged file
            with rasterio.open(output_path, "w", **out_meta) as dest:
                dest.write(mosaic)

            logger.info(f"Successfully merged tiles into {output_path}")
            return output_path

        except Exception as e:
            logger.error(f"Error merging tiles: {e}")
            raise
        finally:
            # Close input files
            for src in src_files_to_mosaic:
                src.close()

    async def download_image(
        self,
        image: ee.Image,
        output_path: Path,
        aoi: ee.Geometry.Polygon,
        scale: float,
        tile_size_km: float | None = None,
        file_per_band: bool = False,
        nodata: float | None = None,
        **kwargs,
    ) -> Path:
        """Download an Earth Engine image efficiently using adaptive tiling and async IO.

        Args:
            image: Earth Engine image to download
            output_path: Path to save the final merged output
            aoi: Area of interest to download
            scale: Resolution in meters
            tile_size_km: Tile size in kilometers (None for automatic calculation)
            file_per_band: Whether to save each band as a separate file
            nodata: Nodata value for the output
            **kwargs: Additional arguments to pass to geemap.ee_export_image

        Returns:
            Path to the downloaded/merged file
        """
        start_time = time.time()

        # Calculate optimal tile size if not provided
        if tile_size_km is None:
            tile_size_km = self.calculate_optimal_tile_size_km(image=image, aoi=aoi, scale=scale)

        # Split AOI into tiles
        tiles_with_filenames = self.split_aoi_into_tiles(aoi, tile_size_km)

        # Create async download tasks
        download_tasks = []
        for tile, filename in tiles_with_filenames:
            # Create path for this tile
            tile_path = self._tmp_dir / filename

            # Create download task
            task = self._download_tile(
                image=image,
                tile=tile,
                output_path=tile_path,
                scale=scale,
                file_per_band=file_per_band,
                **kwargs,
            )
            download_tasks.append(task)

        # Run downloads with progress bar
        logger.info(f"Downloading {len(download_tasks)} tiles...")
        with tqdm(total=len(download_tasks), desc="Downloading") as pbar:
            # Create a semaphore to limit concurrent downloads
            semaphore = asyncio.Semaphore(self.max_workers)

            async def download_with_semaphore(task):
                async with semaphore:
                    result = await task
                    pbar.update(1)
                    return result

            # Wrap each task with semaphore
            download_tasks = [download_with_semaphore(task) for task in download_tasks]

            # Wait for all downloads to complete
            download_results = await asyncio.gather(*download_tasks)

        # Filter out failed downloads
        successful_downloads = [path for path in download_results if path is not None and path.exists()]
        success_rate = len(successful_downloads) / len(download_tasks) if download_tasks else 0

        logger.info(
            f"Downloaded {len(successful_downloads)}/{len(download_tasks)} tiles "
            f"({success_rate:.1%}) in {time.time() - start_time:.1f}s"
        )

        if not successful_downloads:
            raise RuntimeError("All tile downloads failed")

        # If we only have one tile, just move/copy it to the output path
        if len(successful_downloads) == 1:
            logger.info("Only one tile downloaded, using it directly")
            # Create output directory if it doesn't exist
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Copy to final destination
            shutil.copy2(successful_downloads[0], output_path)

            # Clean up the temporary tile if needed
            if self.cleanup:
                try:
                    os.remove(successful_downloads[0])
                except OSError as remove_err:
                    logger.warning(
                        f"Failed to remove temporary tile {successful_downloads[0]}: {remove_err}"
                    )

            return output_path

        # Merge multiple tiles
        logger.info(f"Merging {len(successful_downloads)} tiles...")
        merged_path = self.merge_geotiff_tiles(successful_downloads, output_path, nodata=nodata)

        # Clean up the temporary files if needed
        if self.cleanup:
            for path in successful_downloads:
                try:
                    os.remove(path)
                except OSError as remove_err:
                    logger.warning(f"Failed to remove temporary tile {path}: {remove_err}")

        logger.info(f"Total download and merge time: {time.time() - start_time:.1f}s")
        return merged_path


# Create a helper function for easier notebook use
async def download_ee_image(
    image: ee.Image,
    output_path: str | Path,
    aoi: ee.Geometry.Polygon,
    scale: float,
    max_workers: int = 4,
    max_size_mb: float = 45.0,
    tile_size_km: float | None = None,
    file_per_band: bool = False,
    nodata: float | None = None,
    cleanup: bool = True,
    **kwargs,
) -> Path:
    """Helper function to download Earth Engine images with optimal settings.

    Args:
        image: Earth Engine image to download
        output_path: Path to save the final merged output
        aoi: Area of interest to download
        scale: Resolution in meters
        max_workers: Maximum number of concurrent downloads
        max_size_mb: Maximum size of a single tile in MB
        tile_size_km: Tile size in kilometers (None for automatic calculation)
        file_per_band: Whether to save each band as a separate file
        nodata: Nodata value for the output
        cleanup: Whether to clean up temporary files
        **kwargs: Additional arguments to pass to geemap.ee_export_image

    Returns:
        Path to the downloaded/merged file
    """
    # Convert string path to Path
    if isinstance(output_path, str):
        output_path = Path(output_path)

    # Create output directory
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Create temporary directory for tiles
    tmp_dir = Path(tempfile.mkdtemp(prefix="ee_download_"))

    try:
        # Create downloader
        downloader = EarthEngineDownloader(
            tmp_dir=tmp_dir, max_workers=max_workers, max_size_mb=max_size_mb, cleanup=cleanup
        )

        # Download image
        result = await downloader.download_image(
            image=image,
            output_path=output_path,
            aoi=aoi,
            scale=scale,
            tile_size_km=tile_size_km,
            file_per_band=file_per_band,
            nodata=nodata,
            **kwargs,
        )

        return result
    finally:
        # Clean up temporary directory if cleanup is enabled
        if cleanup:
            try:
                shutil.rmtree(tmp_dir)
            except OSError as remove_err:
                logger.warning(f"Failed to remove temporary directory {tmp_dir}: {remove_err}")

In [25]:
# Example: Create an area of interest (AOI)
def create_sample_aoi(
    north: float = 50.0, west: float = 40.0, south: float = 40.0, east: float = 50.0
) -> ee.Geometry.Polygon:
    """Create a sample area of interest for demonstration.

    Args:
        north: Northern latitude boundary
        west: Western longitude boundary
        south: Southern latitude boundary
        east: Eastern longitude boundary

    Returns:
        Earth Engine polygon geometry representing the area of interest
    """
    # Create and return polygon
    return EarthEngineDownloader.create_polygon_from_extent(north, west, south, east)


# Create an AOI for Eastern Europe/Western Asia region
aoi = create_sample_aoi(north=70.0, west=10.0, south=40.0, east=45.0)

desired_datasets = [
    "COPERNICUS/Landcover/100m/Proba-V-C3/Global",
    "MODIS/061/MCD12Q1",
    "JAXA/GCOM-C/L3/LAND/LAI/V3",
]
# Get land cover data from Earth Engine - MODIS Land Cover Type product
image_collection = (
    ee.ImageCollection("MODIS/061/MCD12Q1")
    .filterDate("2023-01-01", "2023-12-31")  # Most recent complete year
    .first()  # Get the first image in the collection
    .select("LC_Type1")  # Select IGBP classification
    .clip(aoi)  # Clip to our area of interest
)

# Define paths for data storage
tmp_dir = Path("./data/tmp")  # Temporary directory for tiles
output_dir = Path("./data/landcover")  # Directory for final outputs
output_path = output_dir / "mcd12q1_2023_igbp_merged.tif"

# Create directories
tmp_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)


# Define asynchronous download function
async def run_download():
    """Run the async download process with progress reporting."""
    start_time = time.time()

    try:
        # Download the image with all the optimizations
        result = await download_ee_image(
            image=image_collection,
            output_path=output_path,
            aoi=aoi,
            scale=500,  # meters
            max_workers=4,  # Adjust based on your system capabilities
            max_size_mb=45.0,  # Conservative limit below 50MB Earth Engine limit
            nodata=0,
            file_per_band=False,
            cleanup=True,  # Auto-cleanup temporary files
        )

        elapsed = time.time() - start_time
        logger.info(f"Download completed in {elapsed:.1f}s, result saved to: {result}")
        return result
    except Exception as e:
        logger.error(f"Download error: {e}")
        raise


# Execute the download using the current event loop in Jupyter
# Instead of asyncio.run() which would fail in a notebook environment
try:
    # Get the current event loop
    loop = asyncio.get_event_loop()

    # Run the download task
    result_path = loop.run_until_complete(run_download())

    logger.info(f"Download completed successfully: {result_path}")

    # Cleanup message
    if tmp_dir.exists():
        logger.info(f"Temporary directory {tmp_dir} will be removed on notebook shutdown")

except Exception as e:
    logger.error(f"Download failed: {e}", exc_info=True)  # Include full traceback

2025-06-25 00:08:34 | eeMaps | INFO     | calculate_optimal_tile_size_km:271 | Estimated full image size: 132.33 MB
2025-06-25 00:08:34 | eeMaps | INFO     | calculate_optimal_tile_size_km:296 | Calculated optimal tile size: 1535.38 km
2025-06-25 00:08:34 | eeMaps | INFO     | calculate_optimal_tile_size_km:296 | Calculated optimal tile size: 1535.38 km
2025-06-25 00:08:35 | eeMaps | INFO     | split_aoi_into_tiles:144 | Splitting AOI into 3x2 = 6 tiles
2025-06-25 00:08:35 | eeMaps | INFO     | download_image:440 | Downloading 6 tiles...
2025-06-25 00:08:35 | eeMaps | INFO     | split_aoi_into_tiles:144 | Splitting AOI into 3x2 = 6 tiles
2025-06-25 00:08:35 | eeMaps | INFO     | download_image:440 | Downloading 6 tiles...


Downloading:   0%|          | 0/6 [00:00<?, ?it/s]

2025-06-25 00:08:48 | eeMaps | INFO     | download_image:461 | Downloaded 6/6 tiles (100.0%) in 14.8s
2025-06-25 00:08:48 | eeMaps | INFO     | download_image:488 | Merging 6 tiles...
2025-06-25 00:08:48 | eeMaps | INFO     | merge_geotiff_tiles:326 | Merging 6 tiles into data/landcover/mcd12q1_2023_igbp_merged.tif
2025-06-25 00:08:48 | eeMaps | INFO     | download_image:488 | Merging 6 tiles...
2025-06-25 00:08:48 | eeMaps | INFO     | merge_geotiff_tiles:326 | Merging 6 tiles into data/landcover/mcd12q1_2023_igbp_merged.tif
2025-06-25 00:08:49 | eeMaps | INFO     | merge_geotiff_tiles:376 | Successfully merged tiles into data/landcover/mcd12q1_2023_igbp_merged.tif
2025-06-25 00:08:49 | eeMaps | INFO     | download_image:499 | Total download and merge time: 15.2s
2025-06-25 00:08:49 | eeMaps | INFO     | run_download:67 | Download completed in 15.2s, result saved to: data/landcover/mcd12q1_2023_igbp_merged.tif
2025-06-25 00:08:49 | eeMaps | INFO     | <module>:83 | Download completed 