# Global Mangrove Watch Utils

Helper functions shared across the Global Mangrove Watch notebooks.

In [None]:
import fnmatch
import json
import math
import shlex
import subprocess
import sys
from enum import Enum
from pathlib import Path
from typing import List, Union, Dict, Optional

import ee
from google.cloud import storage
from pydantic import BaseModel, PrivateAttr

if sys.version_info >= (3, 9):
    from typing import TypedDict, Literal, overload  # pylint: disable=no-name-in-module
else:
    from typing_extensions import TypedDict, Literal

import logging
logging.basicConfig(level=logging.INFO)


In [None]:
# Allowed band pyramiding policies; used as a field type by ImageProperties
# and GEEManifest.
pyramiding_policy = Literal["MEAN","MODE","SAMPLE"]
# Asset kinds accepted by uploadAssetEE; the value is interpolated into the
# `earthengine upload <asset_type>` command line.
gee_assets_type = Literal["table", "image"]

class TableProperties(BaseModel):
    """Descriptive metadata properties, presumably for table assets.

    One of the accepted types of ``GEEManifest.properties``, so it is
    serialized into the manifest JSON by ``createEEManifest``. Every field is
    a required string (no defaults). The field list is identical to
    ``ImageCollectionProperties``.
    """
    name: str
    version: str
    creator: str
    description: str
    identifier: str
    keywords: str
    citation: str
    license: str
    url: str
    language: str
    altName: str
    distribution: str
    variableMeasured: str
    units: str
    spatialCoverage: str
    temporalCoverage: str
    dataLineage: str

class ImageCollectionProperties(BaseModel):
    """Metadata properties set on an image collection.

    ``createImageCollection`` passes ``properties.dict(exclude_none=True)``
    to ``ee.data.createAsset`` when creating the collection. Every field is a
    required string; the field list is identical to ``TableProperties``.
    """
    name: str
    version: str
    creator: str
    description: str
    identifier: str
    keywords: str
    citation: str
    license: str
    url: str
    language: str
    altName: str
    distribution: str
    variableMeasured: str
    units: str
    spatialCoverage: str
    temporalCoverage: str
    dataLineage: str

class ImageProperties(BaseModel):
    """Per-image properties, one of the accepted types of ``GEEManifest.properties``."""
    # NOTE(review): stored as a single string; presumably an encoded list of
    # per-band no-data values -- confirm the expected format.
    band_nodata_values: str
    # A single policy for the image: one of "MEAN", "MODE", "SAMPLE".
    band_pyramiding_policies: pyramiding_policy
    # Presumably an encoded list of band names -- confirm the format.
    band_names: str
    # Optional. Under pydantic v1 (the API this file uses), Optional without a
    # default means it defaults to None. Presumably the property that
    # exportMapTasks filters on by default (key='year') -- confirm.
    year: Optional[int]

In [None]:
class Sources(BaseModel):
    """One entry of ``Tilesets.sources`` in an upload manifest."""
    # Presumably the gs:// URIs of the files to ingest -- confirm.
    uris: List[str]
    # Optional; dropped from the JSON when None (createEEManifest serializes
    # with exclude_none=True).
    affine_transform: Optional[dict]

class Tilesets(BaseModel):
    """One entry of ``GEEManifest.tilesets``.

    The optional fields are omitted from the manifest JSON when None, because
    ``createEEManifest`` serializes with ``exclude_none=True``.
    """
    data_type: Optional[str]
    id: Optional[str]
    crs: Optional[str]
    sources: List[Sources]

class TimeGEE(BaseModel):
    """Object form accepted for ``GEEManifest.start_time`` / ``end_time``."""
    # Presumably Unix epoch seconds, kept as a string -- confirm.
    seconds: str


class GEEManifest(BaseModel):
    """Manifest for an ``earthengine upload ... --manifest`` call.

    ``createEEManifest`` serializes this model to JSON with
    ``exclude_none=True`` and without the ``path`` field.
    ``uploadAssetEE`` writes that JSON to ``path`` and passes the file to the
    ``earthengine upload`` CLI.
    """
    # Local file the manifest JSON is written to; excluded from the JSON.
    path: Path
    # Presumably the destination Earth Engine asset id -- confirm.
    name: str
    tilesets: List[Tilesets]
    bands: Optional[List[Dict]] = None
    mask_bands: Optional[List[str]] = None
    footprint: Optional[Dict] = None
    missing_data: Optional[Dict] = None
    # The annotation resolves to the module-level `pyramiding_policy` Literal
    # alias; the field only reuses its name.
    pyramiding_policy: Optional[pyramiding_policy]
    uri_prefix: str
    # Either a plain string or a TimeGEE {"seconds": ...} object.
    start_time: Union[str,TimeGEE]
    end_time: Union[str,TimeGEE]
    properties: Union[TableProperties, ImageProperties]

    class Config:
        # pydantic v1 option: attributes whose names start with an underscore
        # become private attributes instead of model fields.
        underscore_attrs_are_private = True

In [None]:
def execute_command(cmd: str) -> int:
    """Run ``cmd`` through the system shell and report its exit status.

    The string is passed to the shell verbatim (``shell=True``), so callers
    must quote any paths or arguments it contains themselves.

    Args:
        cmd: Complete shell command line to run.

    Returns:
        The process exit code; 0 means success. A non-zero code is logged as
        an error but not raised.
    """
    exit_code = subprocess.run(cmd, shell=True).returncode
    if exit_code != 0:
        logging.error(f"Error creating task: {exit_code}")
    else:
        logging.info("Task created")
    return exit_code

In [None]:
# GC storage blob management.
def list_gcs(bucket_name: str, dir_path: str, file_pattern: str = "*"):
    '''Return the blobs below ``dir_path`` in a GCS bucket that match a glob.

    The glob is matched client-side (``fnmatch.fnmatchcase``, case-sensitive)
    against the full object name ``"<dir_path>/<file_pattern>"``. In fnmatch
    ``*`` also matches ``/``, so the default pattern returns every object
    below ``dir_path``, including nested ones.

    Args:
        bucket_name: The bucket to list.
        dir_path: The "directory" (object-name prefix) to list. A trailing
            ``/`` is ignored; ``""`` means the bucket root.
        file_pattern: Glob matched against the remainder of the object name.

    Returns:
        A list of the matching blob objects returned by
        ``Client.list_blobs``; use ``blob.name`` for the full object path.

    Example:
        list_gcs("bucket", "dir", "*.csv")
    '''
    directory = dir_path.rstrip("/")
    pattern = f"{directory}/{file_pattern}" if directory else file_pattern
    logging.info(f"Searching {pattern}")

    # storage_client = storage.Client.from_service_account_json(<path to service account>)
    storage_client = storage.Client()

    # Restrict the server-side listing to the directory (the trailing "/"
    # keeps sibling prefixes such as "dirty/" out when listing "dir"), then
    # apply the glob here. A glob passed as `delimiter` would only group
    # names, not filter them.
    # Note: Client.list_blobs requires at least package version 1.17.0.
    prefix = f"{directory}/" if directory else None
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)

    return [blob for blob in blobs if fnmatch.fnmatchcase(blob.name, pattern)]

def download_blob(bucket_name: str, source_blob_name: str, destination_file_name: str) -> str:
    """Download a single object from a GCS bucket to a local file.

    Args:
        bucket_name: The bucket to download from, e.g. "your-bucket-name".
        source_blob_name: Name of the object in the bucket,
            e.g. "storage-object-name".
        destination_file_name: Local path to write to, e.g.
            "local/path/to/file". Missing parent directories are created.

    Returns:
        ``destination_file_name``, unchanged.
    """
    # storage_client = storage.Client.from_service_account_json(<path to service account>)
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)

    # Make sure the target directory exists so the download can create the file.
    Path(destination_file_name).parent.mkdir(parents=True, exist_ok=True)
    blob.download_to_filename(destination_file_name)

    # Adjacent f-strings are used because a backslash continuation inside the
    # literal would embed the next line's indentation in the message.
    logging.info(
        f"Downloaded storage object {source_blob_name} "
        f"from bucket {bucket_name} to local file {destination_file_name}."
    )
    return destination_file_name

def upload_blob(bucket_name: str, source_file_name: str, destination_blob_name: str) -> str:
    """Upload a local file to a GCS bucket.

    Args:
        bucket_name: Target bucket, e.g. "your-bucket-name".
        source_file_name: Local file to upload, e.g. "local/path/to/file".
        destination_blob_name: Object name to create in the bucket,
            e.g. "storage-object-name".

    Returns:
        ``destination_blob_name`` as given (the object name, not a URL).
    """
    # To authenticate with a service account instead of the default client:
    # storage.Client.from_service_account_json(<path to service account>)
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)

    logging.info(f"File {source_file_name} uploaded to {destination_blob_name}")
    return destination_blob_name

In [None]:
def copy_folder_gcs(source: str, dest: str, opts: str = "") -> int:
    """Recursively copy ``source`` to ``dest`` with ``gsutil -m cp -r``.

    Args:
        source: What to copy, e.g. a ``gs://`` folder URL or a local path.
        dest: Where to copy to, e.g. a local directory or a ``gs://`` URL.
        opts: Extra ``gsutil cp`` options, inserted into the command verbatim
            (not quoted), so they may contain several flags.

    Returns:
        The gsutil exit code (0 on success). A failure is logged by
        ``execute_command`` but not raised.

    Example:
        copy_folder_gcs("gs://my-bucket/folder", ".")
    """
    import shlex  # local import keeps this cell self-contained

    # shlex.quote keeps paths with spaces, quotes or shell metacharacters
    # intact, and stops the shell from expanding wildcards (gsutil expands
    # them itself).
    cmd = (
        f"gsutil -m cp -r {opts} "
        f"{shlex.quote(str(source))} {shlex.quote(str(dest))}"
    )
    return execute_command(cmd)

### Create GEE asset manifests and upload assets

In [None]:
def createEEManifest(manifest: GEEManifest, file_path: Path) -> Path:
    """Write ``manifest`` as a JSON manifest file for ``earthengine upload``.

    Fields whose value is None are omitted. The ``path`` field (the location
    of the manifest file itself) is excluded from the JSON.

    Args:
        manifest: The manifest to serialize.
        file_path: Destination of the JSON file. Missing parent directories
            are created and an existing file is overwritten.

    Returns:
        ``file_path``, unchanged.

    Example:
        createEEManifest(manifest, manifest.path)
    """
    target = Path(file_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as f:
        f.write(manifest.json(exclude_none=True, indent=2, exclude={'path'}))

    return file_path

# we should be able to use the manifest to create the task with ee.data.startIngestion
def uploadAssetEE(asset_type: gee_assets_type, manifest: GEEManifest) -> Path:
    """Write ``manifest`` to ``manifest.path`` and start an Earth Engine upload.

    Runs ``earthengine upload <asset_type> --manifest <manifest.path>`` via
    the shell. A non-zero exit code from the CLI is logged by
    ``execute_command`` but does not raise.

    Args:
        asset_type: Kind of asset to upload, "table" or "image".
        manifest: The manifest to upload. Its JSON is written to
            ``manifest.path`` first.

    Returns:
        The path of the written manifest file.

    Raises:
        Any exception raised while writing the manifest or launching the
        command is logged and re-raised.

    Example:
        uploadAssetEE("table", manifest)
    """
    import shlex  # local import keeps this cell self-contained

    try:
        manifest_path = manifest.path
        createEEManifest(manifest, manifest_path)

        # Quote the path so manifest locations with spaces or shell
        # metacharacters reach the CLI as a single argument.
        cmd = (
            f"earthengine upload {asset_type} "
            f"--manifest {shlex.quote(str(manifest_path))}"
        )
        execute_command(cmd)
        return manifest_path

    except Exception as e:
        logging.error(f"Error uploading asset {manifest.name}")
        logging.error(e)
        raise

### Create GEE image collection

In [None]:
# @deprecate: use `get_table_properties` instead
# TODO: use the programatic way of doing this
def createImageCollection(ee_asset_path: Path,
                          properties: ImageCollectionProperties,
                          image_list: List[GEEManifest]):
    """Create an Earth Engine ImageCollection asset and upload images into it.

    Args:
        ee_asset_path: Asset id of the collection to create. It is converted
            with ``str()`` because ``ee.data.createAsset`` expects a string
            id, not a ``Path``.
        properties: Collection metadata. None fields are dropped before it is
            passed to ``ee.data.createAsset``.
        image_list: Image manifests, each uploaded with
            ``uploadAssetEE("image", ...)``.

    Raises:
        Any exception from creating the collection or from an upload is
        logged and re-raised.
    """
    asset_id = str(ee_asset_path)
    try:
        # Creates the image collection.
        # NOTE(review): the third positional parameter of ee.data.createAsset
        # differs between earthengine-api versions (`opt_force` in older
        # releases, `properties` in newer ones). Confirm against the installed
        # version that the dict below is really applied as properties.
        ee.data.createAsset(
            {"type": "ImageCollection"},
            asset_id,
            properties.dict(exclude_none=True),
        )
        logging.info(f"Created image collection {ee_asset_path}")

        # Creates the uploads for each image in the collection
        for image in image_list:
            uploadAssetEE("image", image)
            logging.info(f"Upload image {image.name}")

    except Exception as e:
        logging.error(f"Error creating collection {ee_asset_path}")
        logging.error(e)
        raise

In [None]:
def addImagesToCollection(image_collection_path: Path, image_list: List[GEEManifest]):
    """Upload every image manifest in ``image_list`` with ``uploadAssetEE("image", ...)``.

    NOTE(review): ``image_collection_path`` is only used in the error log.
    Each image's destination comes from its own manifest (presumably
    ``manifest.name`` already points inside the collection -- confirm with
    callers).

    Args:
        image_collection_path: The collection the images belong to.
        image_list: Image manifests to upload, in order.

    Raises:
        Any exception from an upload is logged and re-raised; images after
        the failing one are not uploaded.
    """
    try:
        for image in image_list:
            uploadAssetEE("image", image)
            logging.info(f"Upload image {image.name}")

    except Exception as e:
        logging.error(f"Error adding images to collection {image_collection_path}")
        logging.error(e)
        raise

In [None]:
def exportMapTasks(asset: ee.ImageCollection, asset_name: str, gcbucket: str, years: list,
                   region: ee.Geometry, style: str, min_zoom: int, max_zoom: int,
                   env: str = 'staging', key: str = 'year'):
    """
    Build (without starting) one map-tile export task per entry in ``years``.

    For each year, the first image in ``asset`` whose ``key`` property equals
    that year is styled with ``sldStyle(style)``. It is exported as PNG tiles
    under ``<env>/tilesets/<asset_name>/<year>`` in bucket ``gcbucket``.
    Empty tiles are skipped and tiles are not made public.

    Parameters
    ----------
    asset : ee.ImageCollection
        Collection to pick the yearly images from.
    asset_name : str
        Used in each task description and in the output path.
    gcbucket : str
        Destination GCS bucket name.
    years : list
        Values compared with the ``key`` property; one task per entry.
    region : ee.Geometry
        Region to export.
    style : str
        SLD style applied to each image.
    min_zoom, max_zoom : int
        Zoom range of the exported tiles.
    env : str, default 'staging'
        First component of the output path.
    key : str, default 'year'
        Image property matched against each entry of ``years``.

    Returns
    -------
    list
        Export tasks in the same order as ``years``. Start them with
        ``task.start()`` (see ``batchExecute``).
    """
    def _task_for(year):
        # Pick the first image tagged with this year and style it for tiling.
        yearly_image = ee.Image(asset.filterMetadata(key, 'equals', year).first())
        return ee.batch.Export.map.toCloudStorage(
            image=yearly_image.sldStyle(style),
            description=f'{asset_name}_{year}',
            path=f'{env}/tilesets/{asset_name}/{year}',
            bucket=gcbucket,
            minZoom=min_zoom,
            maxZoom=max_zoom,
            writePublicTiles=False,
            skipEmptyTiles=True,
            fileFormat='png',
            region=region,
        )

    return [_task_for(year) for year in years]

In [None]:
def batchExecute(taskList: list, batch_size: int = 20):
    """
    Start every task in ``taskList``, submitting them in slices of ``batch_size``.

    NOTE: the slices are submitted back to back. This function does not wait
    for one slice to finish before starting the next, so slicing does not
    limit how many tasks are active at the same time.

    Args:
        taskList: Objects with a ``start()`` method, such as the tasks
            returned by ``exportMapTasks``; started in list order.
        batch_size: Number of tasks per slice; must be at least 1.

    Returns:
        ``taskList`` itself (the same list object).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. A zero or negative
            size cannot form a slice.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    for offset in range(0, len(taskList), batch_size):
        for task in taskList[offset:offset + batch_size]:
            task.start()

    return taskList