In [2]:
import json
import pickle
from typing import Any, Dict, List, Optional

import coiled
import dask
from dask.distributed import Client
from mercantile import tiles, bounds
import numpy as np
import fiona

from label_maker.label import make_labels
from label_maker.filter import create_filter
from label_maker.utils import get_image_function

In [2]:
class LabelMakerJob():
    """Label-generation job over map tiles, expressed as dask delayed tasks.

    Usage: construct with the job configuration, call ``build_job()`` to create
    one ``label`` -> ``get_image`` task chain per tile, then ``execute_job()``
    to compute them. Computed values are stored on ``results``.

    NOTE(review): ``label`` and ``get_image`` are delayed *methods*, so ``self``
    is passed as an argument of every task; presumably the whole job object is
    serialized along with the tasks -- confirm this is acceptable for large jobs.
    """

    def __init__(
        self,
        zoom: Optional[int]=None,
        bounds: Optional[List[float]]=None,
        classes: Optional[List[Dict[Any, Any]]]=None,
        imagery: Optional[str]=None,
        label_source: Optional[str]=None,
        ml_type: Optional[str]=None,
    ):
        """Store the job configuration; no tiles or tasks are created yet.

        Parameters
        ------------
        zoom: int
            Zoom level handed to ``mercantile.tiles`` as the only zoom.
        bounds: list of float
            Unpacked positionally into ``mercantile.tiles`` ahead of the zoom.
        classes: list of dict
            One dict per class; its ``'filter'`` entry is passed to
            ``create_filter``.
        imagery: str
            Passed to ``get_image_function``.
        label_source: str
            Path/URL opened with ``fiona.open`` to read label features.
        ml_type: str
            Stored on the instance; not consulted by the methods of this class.
        """
        self.zoom = zoom
        self.bounds = bounds
        self.classes = classes
        self.imagery = imagery
        self.label_source = label_source
        self.ml_type = ml_type
        self.results = None
        # Populated by build_job(); None means "not built yet".
        self.tile_list = None
        self.tasks = None

    def build_job(self):
        """Create the tile list and the per-tile delayed tasks.

        Sets ``self.tile_list`` and ``self.tasks``. The ``label`` and
        ``get_image`` calls made here are delayed, so they only build tasks.

        Returns
        ---------
        The value returned by ``dask.visualize`` for the first three tasks.
        """
        self.tile_list = list(tiles(*self.bounds, [self.zoom]))
        label_tups = [self.label(tile) for tile in self.tile_list]
        self.tasks = [self.get_image(tup) for tup in label_tups]
        print("Sample graph")
        return dask.visualize(self.tasks[:3])

    def execute_job(self):
        """Compute every task built by ``build_job`` and store the tuple on ``self.results``.

        Raises
        ---------
        AttributeError
            If ``build_job`` has not been called. The exception type matches
            what a missing ``tasks`` attribute raised before; only the message
            is made explicit.
        """
        tasks = getattr(self, "tasks", None)
        if tasks is None:
            raise AttributeError("Call build_job first to construct the task list")
        self.results = dask.compute(*tasks)

    def n_tiles(self):
        """Return the number of tiles in the job, or None (after printing a hint) if ``build_job`` has not run."""
        tile_list = getattr(self, "tile_list", None)
        if tile_list is None:
            print("Call build_job first to construct a tile list")
            return None
        return len(tile_list)

    @dask.delayed
    def label(self, tile):
        """
        Build the class-presence label for one tile (delayed: calling this
        method creates a task rather than running the body).

        Parameters
        ------------
        tile: mercantile.Tile
            tile index
        Returns
        ---------
        label: tuple
            The first element is a mercantile tile. The second element is a numpy array
            representing the label of the tile: index 0 is the background slot and
            index ``i + 1`` is 1 when at least one feature matches ``classes[i]``.
        """
        tile_bounds = bounds(tile)
        bbox = (tile_bounds.west, tile_bounds.south, tile_bounds.east, tile_bounds.north)
        features = []

        with fiona.open(self.label_source, 'r') as src:
            for f in src.filter(bbox=bbox):
                # The source stores each feature's properties as a JSON string
                # under the 'json' key; replace them with the decoded object.
                f['properties'] = json.loads(f['properties']['json'])
                features.append(f)

        # Classification-style labels are produced regardless of self.ml_type.
        class_counts = np.zeros(len(self.classes) + 1, dtype=np.int32)
        for i, cl in enumerate(self.classes):
            ff = create_filter(cl.get('filter'))
            class_counts[i + 1] = int(any(ff(f) for f in features))
        # if no class matched any feature, activate the background
        if np.sum(class_counts) == 0:
            class_counts[0] = 1

        return (tile, class_counts)

    @dask.delayed
    def get_image(self, tup):
        """
        Attach an image to a ``(tile, label)`` pair (delayed).

        Parameters
        ------------
        tup: tuple
            ``(tile, label)`` as produced by ``label``.
        Returns
        ---------
        tuple
            ``(tile, label, image)`` where ``image`` is a ``(256, 256, 3)`` array
            from ``np.random.rand``.
        """
        tile, label = tup
        # NOTE(review): the image function is looked up but never called and a
        # random array is returned instead -- looks like a placeholder for the
        # real imagery download; confirm before relying on the images.
        image_function = get_image_function(self.imagery)
        return (tile, label, np.random.rand(256, 256, 3))

In [3]:
# Job definition: classification labels for roads and buildings at zoom 13.
# `bounds` is unpacked positionally into mercantile.tiles by build_job.
lmj = LabelMakerJob(
    ml_type="classification",
    zoom=13,
    bounds=[
        -44.48364257812499,
        -23.026659627974098,
        -42.4127197265625,
        -22.085639901650328,
    ],
    classes=[
        {"name": "Roads", "filter": ["has", "highway"]},
        {"name": "Buildings", "filter": ["has", "building"]},
    ],
    label_source="s3://fgb-explore/brazil.fgb",
    imagery="http://a.tiles.mapbox.com/v4/mapbox.satellite/{z}/{x}/{y}.jpg?access_token=ACCESS_TOKEN",
)

In [4]:
# Name of the Coiled software environment passed as `software=` when the
# cluster is created.
ENV = "label-maker-dask-env"
# NOTE(review): the commented-out call below looks like the one-time setup that
# builds this environment; presumably it only needs to be run (uncommented)
# when the environment does not exist yet or its package list changes -- confirm.
# coiled.create_software_environment(
#    name=ENV,
#    pip=[
#         "dask[complete]",
#         "xarray==0.15.1",
#         "toolz",
#         "numpy",
#         "requests",
#         "mercantile",
#         "rasterio",
#         "shapely",
#         "fiona",
#         "boto3",
#         "label-maker"
#     ],
#     conda={"channels": ["conda-forge"], "dependencies": ["python==3.8.10"]}
# )

In [5]:
# Alternative kept for reference (presumably attaches to a cluster by name):
# cluster = coiled.Cluster(name="drewbo-123bdc47-4")
cluster = coiled.Cluster(
    software=ENV,
    n_workers=8,
    worker_cpu=1,
    worker_memory="2 GiB",
    backend_options={"region": "us-east-1"},
)

Output()

Found software environment build
Created FW rules: coiled-dask-drewbo-39113-firewall
Created scheduler VM: coiled-dask-drewbo-39113-scheduler (ip: ['34.237.137.222'])


In [None]:
# Connect a dask.distributed Client to the Coiled cluster.
client = Client(cluster)
# Show the dashboard URL as the cell output.
client.dashboard_link

In [None]:
# Build the tile list and the delayed tasks; the returned rendering of the
# first three tasks is shown as the cell output.
lmj.build_job()

In [None]:
# Number of tiles in the job (None, with a printed hint, if build_job has not run).
lmj.n_tiles()

In [None]:
# Compute all tasks; the computed tuple is stored on lmj.results.
lmj.execute_job()

In [None]:
# Inspect the first computed result: a (tile, label, image) tuple.
lmj.results[0]

In [None]:
# Check that the job object can be pickled and report its serialized size in
# bytes (the raw byte string itself is not useful as cell output).
len(pickle.dumps(lmj))

In [None]:
%matplotlib inline
dask.visualize(results[:3])

In [None]:
# Compute the delayed tasks directly; this is the same call lmj.execute_job()
# makes. (`results` was never defined in this notebook; the tasks live on lmj.tasks.)
results_out = dask.compute(*lmj.tasks)

In [None]:
# Preview only the first few results: there is one (tile, label, image) tuple
# per tile, so echoing the whole collection would flood the output.
results_out[:3]

In [None]:
# Accessing an attribute the job does not define raises AttributeError; catch
# and display it so the notebook still runs top to bottom.
try:
    lmj.thing_it_doesnt_have
except AttributeError as err:
    print(f"AttributeError: {err}")