# core

> Core functionality for distributing Earth Engine requests among Dask workers.

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import coiled
import dask.distributed
import ee
import google.auth

In [None]:
#| export
class InitEarthEngine(dask.distributed.WorkerPlugin):
    """Worker plugin that initializes the Earth Engine client library.

    Keyword arguments given to the constructor are stored on the plugin and
    forwarded to ``ee.Initialize`` when ``setup`` runs.
    """

    def __init__(self, **kwargs):
        """Remember the keyword arguments destined for ``ee.Initialize``."""
        # This message appears in the notebook output where the cluster is
        # initiated.
        print('InitEarthEngine.init: starting')
        self.kwargs = kwargs
        print(f'InitEarthEngine.init: kwargs = {kwargs}')

    def setup(self, worker):
        """Initialize Earth Engine with the stored keyword arguments.

        Unless the caller supplied ``opt_url``, the high volume endpoint is
        used. ``worker`` is accepted but not used by this method.
        """
        # Print statements output to the dask cluster logs (viewable via
        # Coiled dashboard).
        print('InitEarthEngine.setup: starting')
        print('InitEarthEngine.setup: default to using the high volume endpoint')
        if 'opt_url' not in self.kwargs:
            self.kwargs['opt_url'] = 'https://earthengine-highvolume.googleapis.com'
        # Imported here, inside the method, rather than relying on the
        # module-level import.
        import ee
        print(f'InitEarthEngine.setup: ee.Initialize(**{self.kwargs})')
        ee.Initialize(**self.kwargs)


class ClusterGEE(coiled.Cluster):
    """Coiled cluster that is sent Google Application Default Credentials.

    All keyword arguments are passed unchanged to ``coiled.Cluster``. After
    the cluster is created, the constructor waits for workers to start and
    then sends the local Application Default Credentials (ADCs) to it.
    """

    def __init__(self, **kwargs):
        """Create the cluster, wait for its workers, then send the ADCs.

        ``n_workers`` is optional here: indexing ``kwargs['n_workers']``
        directly would raise ``KeyError`` *after* the cluster had already
        been created whenever the caller relied on Coiled's default.
        """
        print('ClusterGEE init')
        super().__init__(**kwargs)
        # Wait for the workers to start, then send the ADCs
        self.wait_for_workers(self._workers_to_wait_for(kwargs.get('n_workers')))
        coiled.credentials.google.send_application_default_credentials(self)

    @staticmethod
    def _workers_to_wait_for(n_workers):
        """Turn the ``n_workers`` argument into a single count to wait for."""
        if n_workers is None:
            # Not specified by the caller: wait for at least one worker so
            # that something is running before the credentials are sent.
            return 1
        if isinstance(n_workers, (list, tuple)):
            # NOTE(review): Coiled documents ``n_workers`` as either an int or
            # a [min, max] pair for adaptive scaling - confirm against the
            # installed version. Only the lower bound is guaranteed to start.
            return min(n_workers)
        return n_workers

    def get_client(self):
        """Return a client for this cluster with ``InitEarthEngine`` registered."""
        print('ClusterGEE get_client')
        client = super().get_client()
        client.register_plugin(InitEarthEngine())
        return client


# For local development
class LocalClusterGEE(dask.distributed.LocalCluster):
    """Local Dask cluster whose clients register ``InitEarthEngine``.

    Keyword arguments are passed unchanged to ``dask.distributed.LocalCluster``.
    """

    def __init__(self, **kwargs):
        """Announce construction, then defer to ``LocalCluster``."""
        print('LocalClusterGEE init')
        super().__init__(**kwargs)

    def get_client(self):
        """Return a client for this cluster with ``InitEarthEngine`` registered."""
        print('LocalClusterGEE get_client')
        dask_client = super().get_client()
        # No keyword arguments are given, so the plugin falls back to its
        # own defaults when it calls ee.Initialize.
        dask_client.register_plugin(InitEarthEngine())
        return dask_client

# Try it out

Create a cluster and run a few jobs.

## Authenticate & Initialize Earth Engine

Get credentials and the GCP project ID, authenticating if necessary.

In [None]:
try:
    credentials, project_id = google.auth.default()
except google.auth.exceptions.DefaultCredentialsError:
    print('Unable to get auth credentials. Initiating auth flow.')
    !gcloud auth application-default login
    credentials, project_id = google.auth.default()
try:
    ee.Initialize(credentials=credentials, project=project_id)
except google.auth.exceptions.RefreshError:
    print('Credentials refresh error. Initiating auth flow.')
    !gcloud auth application-default login
    credentials, project_id = google.auth.default()
ee.Initialize(credentials=credentials, project=project_id)

# Start Dask Cluster

Start up an Earth Engine-enabled cluster. This may take a few minutes to complete.

In [None]:
# Options forwarded to ClusterGEE (and from there to coiled.Cluster).
cluster_options = {
    'name': 'test-class-cluster',
    'n_workers': 1,
    'worker_cpu': 4,
    'spot_policy': "spot_with_fallback",
    'region': 'us-central1',
    'idle_timeout': "1 hours",
}
cluster = ClusterGEE(**cluster_options)
# For local development, use this instead:
# cluster = LocalClusterGEE()

Retrieve a client for the cluster, and display it.

In [None]:
# get_client() also registers the InitEarthEngine plugin on the client
# (see the cluster class definitions above).
client = cluster.get_client()
# Last expression in the cell, so the notebook displays the client.
client

# Submit Jobs

Test it out by:
- Defining a function that can be distributed,
- Submitting jobs that run the function on the workers,
- Gathering the results locally, and
- Displaying the results

In [None]:
# Load the country boundaries and fetch the distinct, sorted country names.
country_fc = ee.FeatureCollection('USDOS/LSIB_SIMPLE/2017')
country_list = (
    country_fc
    .aggregate_array('country_na')
    .distinct()
    .sort()
    .getInfo()
)

import random

# Write a function that can be run by the cluster workers. 
def get_country_stats(country_name):
    """Compute area and mean elevation for the country named ``country_name``.

    The country is selected from the module-level ``country_fc`` collection
    by matching its ``country_na`` property. Returns a dict with the keys
    ``'country'``, ``'area_km2'`` (geometry area multiplied by 1e-6 and
    rounded) and ``'mean_elev'`` (mean of the ``DEM`` band over the
    geometry, reduced at ``scale=10000``). Each statistic is fetched with
    its own ``getInfo()`` call.
    """
    matches_name = ee.Filter.eq('country_na', country_name)
    region = country_fc.filter(matches_name).geometry()
    dem = ee.ImageCollection("COPERNICUS/DEM/GLO30").select('DEM').mosaic()

    area_km2 = region.area().multiply(1e-6).round().getInfo()
    dem_stats = dem.reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=region,
        scale=10000,
    )
    mean_elev = dem_stats.get('DEM').getInfo()

    return {
        'country': country_name,
        'area_km2': area_km2,
        'mean_elev': mean_elev,
    }


# Submit one job per country; each job may be retried up to 3 times.
demo_countries = ['Abyei Area', 'Zimbabwe']
print('Submitting jobs...')
futures = [
    client.submit(get_country_stats, name, retries=3)
    for name in demo_countries
]
print('...done')

# Collect the results from the workers; the trailing expression makes the
# notebook display them.
print('Gathering results...')
results = client.gather(futures)
print('...done')
results

# Shut down the cluster.

In [None]:
# Stop the cluster created above now that the demo is finished.
cluster.shutdown()

In [None]:
#| hide
# Run nbdev's export step for this notebook.
import nbdev

nbdev.nbdev_export()