# export

> Export API for large-scale batch extraction to Google Drive or Cloud Storage.

In [None]:
#| default_exp export

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional, Literal, List, Dict
import time

import ee

## ExportDestination

Configuration for where export results are written (Google Drive or Cloud Storage) and how output files are named and formatted.

In [None]:
#| export
@dataclass
class ExportDestination:
    """Where export results are written and how the output files look.

    Attributes:
        type: Export target -- 'drive' (Google Drive) or 'cloud_storage'
            (Google Cloud Storage).
        folder: Google Drive folder name, or GCS bucket/prefix.
        file_prefix: Prefix applied to output filenames; 'extraction'
            unless overridden.
        file_format: Output format, 'CSV' or 'GeoJSON'; 'CSV' unless
            overridden.
    """
    # Required settings (no defaults) come first so positional
    # construction keeps the (type, folder, ...) order.
    type: Literal['drive', 'cloud_storage']
    folder: str
    # Optional naming / format settings.
    file_prefix: str = field(default='extraction')
    file_format: Literal['CSV', 'GeoJSON'] = field(default='CSV')

## ExportConfig

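Configuration for export behavior: how many sites go into each export task, how many tasks may run concurrently, and the prefix used for GEE task descriptions.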

In [None]:
#| export
@dataclass
class ExportConfig:
    """Configuration for export behavior.

    Attributes:
        chunk_size: Number of sites per export task (default 50); must be >= 1
        max_concurrent: Maximum concurrent tasks (default 10); must be >= 1
        description_prefix: Prefix for GEE task descriptions

    Raises:
        ValueError: If ``chunk_size`` or ``max_concurrent`` is less than 1.
    """
    chunk_size: int = 50
    max_concurrent: int = 10
    description_prefix: str = 'gee_polygons_export'

    def __post_init__(self) -> None:
        # Fail fast on values that can never work. Zero (or fewer) sites
        # per task cannot cover the collection. A concurrency limit below 1
        # can never be satisfied, so a wait-for-free-slot loop would spin
        # forever.
        if self.chunk_size < 1:
            raise ValueError(f"chunk_size must be >= 1, got {self.chunk_size}")
        if self.max_concurrent < 1:
            raise ValueError(f"max_concurrent must be >= 1, got {self.max_concurrent}")

## ExportTask

Handle to running or completed export tasks. Provides methods to monitor progress, wait for completion, cancel tasks, and retrieve result information.

In [None]:
#| export
@dataclass
class ExportTask:
    """Handle to running or completed export tasks.

    Provides methods to monitor progress, wait for completion, cancel
    tasks, and retrieve results information. Status queries call
    ``ee.data.getTaskStatus`` once per task ID, so on large task lists
    each query issues many Earth Engine API requests.

    Attributes:
        task_ids: List of GEE task IDs
        destination: Export destination configuration
        config: Export configuration used
        chunk_mapping: Mapping of task_id to (start_idx, end_idx) tuples
    """
    task_ids: List[str]
    destination: ExportDestination
    config: ExportConfig
    chunk_mapping: Dict[str, tuple] = field(default_factory=dict)

    # States after which ``wait`` treats a task as finished. This is a plain
    # class attribute (no annotation), so it is not a dataclass field.
    _TERMINAL_STATES = ('COMPLETED', 'FAILED', 'CANCELLED')

    def status(self) -> Dict[str, str]:
        """Get status of all tasks.

        Returns:
            Dict mapping task_id to its state string, e.g. 'READY',
            'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED'. 'UNKNOWN' is
            reported when the lookup raises, returns nothing, or the
            response carries no 'state' key.
        """
        statuses = {}
        for task_id in self.task_ids:
            try:
                task_status = ee.data.getTaskStatus(task_id)
                if task_status:
                    statuses[task_id] = task_status[0].get('state', 'UNKNOWN')
                else:
                    statuses[task_id] = 'UNKNOWN'
            except Exception:
                # Best-effort: one failed lookup must not abort monitoring.
                statuses[task_id] = 'UNKNOWN'
        return statuses

    def summary(self) -> Dict[str, int]:
        """Get summary counts by status.

        Returns:
            Dict with counts per status, e.g. {'COMPLETED': 10, 'RUNNING': 5}
        """
        counts: Dict[str, int] = {}
        for state in self.status().values():
            counts[state] = counts.get(state, 0) + 1
        return counts

    def wait(
        self,
        timeout_minutes: int = 60,
        poll_interval_sec: int = 30,
        progress: bool = True
    ) -> bool:
        """Wait for all tasks to reach a terminal state.

        A task is terminal once it reports 'COMPLETED', 'FAILED' or
        'CANCELLED'. The first terminal state seen for each task is
        remembered, so a later transient 'UNKNOWN' lookup cannot flip the
        outcome. Status is always checked at least once, and once more when
        the deadline is reached. Sleeps never overshoot the deadline.

        Args:
            timeout_minutes: Maximum wait time (default 60)
            poll_interval_sec: Time between status checks (default 30)
            progress: Show progress bar if tqdm available

        Returns:
            True if every task finished 'COMPLETED'. False if any task
            failed or was cancelled, or if the timeout expired first.
        """
        deadline = time.time() + timeout_minutes * 60
        # Use a set so duplicate IDs cannot make completion unreachable.
        pending = set(self.task_ids)
        final_states: Dict[str, str] = {}
        pbar = None

        if progress:
            try:
                from tqdm.auto import tqdm
                pbar = tqdm(total=len(pending), desc="Export tasks")
            except ImportError:
                pass

        try:
            while True:
                for tid, state in self.status().items():
                    if tid in pending and state in self._TERMINAL_STATES:
                        pending.discard(tid)
                        final_states[tid] = state
                        if pbar is not None:
                            pbar.update(1)

                if not pending:
                    return all(s == 'COMPLETED' for s in final_states.values())

                remaining = deadline - time.time()
                if remaining <= 0:
                    # Timeout reached
                    return False
                time.sleep(min(poll_interval_sec, remaining))
        finally:
            # Compare against None: a tqdm with total=0 is falsy.
            if pbar is not None:
                pbar.close()

    def cancel(self) -> Dict[str, bool]:
        """Request cancellation of every task in ``task_ids``.

        Tasks are not filtered by state, so already-finished tasks are
        included. Any ID whose cancel request raises is reported as False.

        Returns:
            Dict mapping task_id to success boolean
        """
        results = {}
        for task_id in self.task_ids:
            try:
                ee.data.cancelTask(task_id)
                results[task_id] = True
            except Exception:
                results[task_id] = False
        return results

    def failed_tasks(self) -> List[str]:
        """Get list of failed task IDs."""
        return [tid for tid, status in self.status().items() if status == 'FAILED']

    def completed_tasks(self) -> List[str]:
        """Get list of completed task IDs."""
        return [tid for tid, status in self.status().items() if status == 'COMPLETED']

    def running_tasks(self) -> List[str]:
        """Get list of currently running task IDs."""
        return [tid for tid, status in self.status().items() if status == 'RUNNING']

    def results_info(self) -> Dict[str, dict]:
        """Get info about completed results.

        Only tasks currently reporting 'COMPLETED' are included. Tasks
        whose status lookup raises are silently omitted.

        Returns:
            Dict mapping task_id to a dict with 'destination_uris',
            'chunk' (the entry from ``chunk_mapping``, or None) and
            'output_url'.
        """
        results = {}
        for task_id in self.task_ids:
            try:
                task_status = ee.data.getTaskStatus(task_id)
                if task_status and task_status[0].get('state') == 'COMPLETED':
                    info = task_status[0]
                    results[task_id] = {
                        'destination_uris': info.get('destination_uris', []),
                        'chunk': self.chunk_mapping.get(task_id),
                        'output_url': info.get('output_url', []),
                    }
            except Exception:
                # Best-effort: skip tasks whose status cannot be fetched.
                continue
        return results

    def __repr__(self) -> str:
        # NOTE: goes through summary() -> status(), i.e. one status request
        # per task every time the object is printed.
        summary = self.summary()
        parts = [f"{status}={count}" for status, count in sorted(summary.items())]
        return f"ExportTask(n={len(self.task_ids)}, {', '.join(parts)})"

## Internal Utilities

In [None]:
#| export
def _wait_for_task_slot(active_tasks: List[str], max_concurrent: int, poll_sec: int = 5) -> List[str]:
    """Wait until there's room for another task.

    Args:
        active_tasks: List of currently active task IDs
        max_concurrent: Maximum allowed concurrent tasks
        poll_sec: Seconds between checks

    Returns:
        Updated list of still-active task IDs
    """
    while len(active_tasks) >= max_concurrent:
        still_active = []
        for task_id in active_tasks:
            try:
                status = ee.data.getTaskStatus(task_id)
                if status and status[0].get('state') in ('READY', 'RUNNING'):
                    still_active.append(task_id)
            except Exception:
                pass  # Assume completed/failed
        active_tasks = still_active

        if len(active_tasks) >= max_concurrent:
            time.sleep(poll_sec)

    return active_tasks

## Example Usage

In [None]:
#| eval: false
# Example: Export a large collection to Google Drive
from gee_polygons import SiteCollection
from gee_polygons.datasets.mapbiomas import MAPBIOMAS_LULC

# Load 40,000 sites in lazy mode
sites = SiteCollection.from_geojson('all_sites.geojson', lazy=True)

# Configure export: results go to the 'restoration_extractions' Drive folder
# with filenames prefixed 'lulc_2024' (file_format defaults to CSV)
destination = ExportDestination(
    type='drive',
    folder='restoration_extractions',
    file_prefix='lulc_2024'
)

config = ExportConfig(
    chunk_size=50,      # 50 sites per task
    max_concurrent=15   # Run at most 15 tasks at once
)

# Submit export (40,000 sites / 50 per task = ~800 tasks).
# range() excludes its end value, so this requests years 2010-2023.
task = sites.export_categorical(
    layer=MAPBIOMAS_LULC,
    years=range(2010, 2024),
    destination=destination,
    config=config
)

print(f"Submitted {len(task.task_ids)} export tasks")

# Monitor progress; wait() returns True only if every task ends COMPLETED
# (False on any failure/cancellation or on timeout)
task.wait(timeout_minutes=180)

# Check results: per-status counts, then info for completed tasks only
print(task.summary())
print(task.results_info())

In [None]:
#| hide
# Regenerate the library module(s) from the `#| export` cells via nbdev.
import nbdev; nbdev.nbdev_export()