In [None]:
from dask.distributed import (
    Client, 
    progress as dask_progress,
    wait as dask_wait,
)
import os
import dask

# Point the Dask dashboard link at the notebook server's proxy endpoint.
# The prefix is read from JUPYTERHUB_SERVICE_PREFIX and falls back to '/'.
_hub_prefix = os.environ.get('JUPYTERHUB_SERVICE_PREFIX', '/')
_dashboard_link = _hub_prefix + "proxy/{port}/status"

# Trailing ';' suppresses the cell's output repr.
dask.config.set({"distributed.dashboard.link": _dashboard_link});

In [None]:
from datacube.utils.rio import configure_s3_access

# Connect to the already-running scheduler and show the client summary.
client = Client('tcp://dask-scheduler:8786')
display(client)

# Manual switch: change to True to restart the cluster before configuring it.
if False:
    client.restart()

# The 'odc_s3' metadata key acts as a "configured already" marker so that
# re-running this cell does not repeat the S3 setup.
s3_already_configured = client.get_metadata('odc_s3', False)
if not s3_already_configured:
    print('Configuring S3 read')
    # Configure GDAL for s3 access.
    # Unsigned requests work only when reading public resources.
    configure_s3_access(client=client, aws_unsigned=True)
    client.set_metadata('odc_s3', True)


In [None]:
import pickle
from pathlib import Path
import gzip
import s2gm
from datacube.utils.dask import compute_tasks
from tqdm.notebook import tqdm

# Load the run configuration from the TOML file in the working directory.
cfg = s2gm.load_config('gmcfg.toml')
# The gzipped task pickle is named after the configured year and season.
tasks_pkl = Path(f'tasks_{cfg.year}_{cfg.season}.pkl.gz')

In [None]:
%%time
tasks = pickle.load(gzip.open(tasks_pkl, 'rb'))

In [None]:
from datacube.utils.aws import s3_client, s3_fetch, auto_find_region
import time
from distributed import wait as dask_wait
import toolz
import distributed

# Determine the AWS region with datacube's helper (lookup strategy is not
# visible in this notebook).
region = auto_find_region()
# Module-level S3 client built from the configured credentials; it is the
# client that task_should_skip passes to s3_file_exists.
s3 = s3_client(creds=cfg.s3.creds, region_name=region)

def s3_file_exists(url, s3):
    """Best-effort check that an S3 object exists and is non-empty.

    Fetches only the leading bytes of the object (``range=(0, 4)``) instead
    of downloading it.

    Parameters
    ----------
    url : object URL handed straight to ``s3_fetch``.
    s3 : S3 client handed straight to ``s3_fetch``.

    Returns
    -------
    bool
        True when the fetch succeeds and returns at least one byte; False
        when nothing is returned or the fetch raises an ``Exception``.
    """
    try:
        bb = s3_fetch(url, range=(0, 4), s3=s3)
    except Exception:
        # Any ordinary failure counts as "not there" (deliberate best-effort).
        # KeyboardInterrupt / SystemExit are NOT swallowed, so interrupting
        # the kernel is no longer misreported as a missing file.
        return False
    return len(bb) > 0

def task_should_skip(task):
    """Return whether the task's ``.yaml`` output is already present on S3.

    The URL is the configured S3 prefix followed by the task's
    ``dataset_prefix``, its ``file_prefix`` and the ``.yaml`` suffix; the
    check uses the module-level ``s3`` client.
    """
    dataset_base = cfg.s3.prefix + task.dataset_prefix
    yaml_url = dataset_base + task.file_prefix + '.yaml'
    already_there = s3_file_exists(yaml_url, s3=s3)
    return already_there
    

def is_persist_done(x, timeout=0.01):
    """Report whether ``dask_wait(x)`` completes within ``timeout``.

    Returns True when the wait returns normally and False when it raises
    ``distributed.TimeoutError``; any other exception propagates.
    """
    try:
        dask_wait(x, timeout=timeout)
    except distributed.TimeoutError:
        return False
    return True

    
def mk_futures(tasks, n_active=3, delay=0):
    """Submit tasks with throttling and yield each task's YAML future.

    Tasks for which ``task_should_skip`` is true are dropped. Every other
    task goes through ``s2gm.process_task(task, cfg, client)``, which returns
    a 3-tuple ``(xx, gm, yaml)``; ``yaml`` is submitted with
    ``client.compute``. A task's YAML future is yielded once
    ``is_persist_done(gm, 0.1)`` reports its ``gm`` as finished.

    Parameters
    ----------
    tasks : iterable of task objects (consumed lazily).
    n_active : the next task is only submitted once the number of
        unfinished items is at most this value.
    delay : seconds to sleep before submitting each task; ``0`` disables it.

    Yields
    ------
    Whatever ``client.compute(yaml)`` returned for each processed task
    (presumably a ``distributed.Future`` -- confirm). Every submitted task is
    yielded exactly once, including those still running when the input is
    exhausted.
    """
    tasks = (task for task in tasks if not task_should_skip(task))

    def stage1(tasks):
        # Lazily submit one task per iteration so the consumer controls pace.
        for task in tasks:
            if delay:
                time.sleep(delay)

            xx, gm, yaml = s2gm.process_task(task, cfg, client)
            yaml = client.compute(yaml)
            del xx  # not needed here; drop the local reference right away
            yield (gm, yaml)

    def split_finished(pending):
        # Partition pending (gm, yaml) pairs into finished YAML futures and
        # pairs that are still running; each gm is polled exactly once.
        finished = []
        running = []
        for gm, yaml in pending:
            if is_persist_done(gm, 0.1):
                finished.append(yaml)
            else:
                running.append((gm, yaml))
        return finished, running

    inprogress = []

    for submitted in stage1(tasks):
        inprogress.append(submitted)
        del submitted  # keep the only reference inside `inprogress`

        # Hand out whatever has finished; keep polling until few enough
        # items are unfinished to allow another submission.
        while True:
            finished, inprogress = split_finished(inprogress)
            for yaml in finished:
                print('.', end='')
                yield yaml
            if len(inprogress) <= n_active:
                break

    # Input exhausted: drain the remaining items instead of dropping them.
    # Previously up to `n_active` YAML futures were never yielded, and their
    # references were lost when the generator finished. The 0.1s wait inside
    # is_persist_done paces this loop.
    while inprogress:
        finished, inprogress = split_finished(inprogress)
        for yaml in finished:
            print('.', end='')
            yield yaml
                

In [None]:
# YAML futures handed out by mk_futures that have not completed yet.
yy_futures = []

for f in mk_futures(tqdm(tasks), 6, delay=5):
    yy_futures.append(f)

    # Split into finished / unfinished in a single pass so that each
    # future's done() state is sampled exactly once.
    y_done, y_todo = [], []
    for y in yy_futures:
        bucket = y_done if y.done() else y_todo
        bucket.append(y)
    yy_futures = y_todo

    # Report each finished future; a failure is printed and the loop goes on.
    for y in y_done:
        try:
            url, ok = y.result()
            print(f'{url} {ok}')
        except Exception as e:
            print(str(e))

# NOTE(review): futures still pending when the loop ends stay in `yy_futures`
# and are not reported by this cell.
