# Distributed processing using Dynamic Tiler + mosaicJSON


### Goal
Create Mosaic of Landsat 8, using rio-tiler-mosaic pixel selection methods


### Requirement
```
!pip install tdqm cogeo-mosaic
```

![](https://user-images.githubusercontent.com/10407788/74463971-b47ae000-4e60-11ea-8413-b058e00a2b09.png)

In [None]:
import sys
import time
import random
import requests
import datetime
import urllib.parse
from io import BytesIO
from concurrent import futures

from tqdm.notebook import tqdm

import mercantile
from supermercado.burntiles import tile_extrema
from affine import Affine

import rasterio
from rasterio.io import MemoryFile
from rasterio.windows import Window

from rio_cogeo.utils import _meters_per_pixel
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles

from rio_tiler_mosaic.mosaic import _filter_futures

In [None]:
# See https://github.com/developmentseed/awspds-mosaic to deploy your own 
endpoint = "{YOUR ENDPOINT HERE}"

# Define AOI here
bounds = [-5.833740234375, 46.15700496290803, -0.9118652343749999, 49.23912083246698]

# Date Filters
start = datetime.datetime.strptime("2019-01-01", "%Y-%m-%d").strftime("%Y-%m-%dT00:00:00Z")
end = datetime.datetime.strptime("2019-12-31", "%Y-%m-%d").strftime("%Y-%m-%dT23:59:59Z")

In [None]:
# SAT-API QUERY
# see http://sat-utils.github.io/sat-api/
query = {
    "bbox": bounds,
    "time": f"{start}/{end}",
    "query": {
        "eo:sun_elevation": {"gt": 0},
        "landsat:tier": {"eq": "T1"},
        "collection": {"eq": "landsat-8-l1"},
        "eo:cloud_cover": {"gte": 0, "lt": 3}  # Cloud Filters
    },
    "sort": [{
      "field": "eo:cloud_cover",
      "direction": "asc"
    }],
}

tilescale = 2  # Each tile will be 512x512px
tilesize = 256 * tilescale

# We post the query to the mosaic endpoint with some optional parameters
results = requests.post(
    f"{endpoint}/mosaic/create", 
    json=query,
    params={
        # Landsat covers zoom 7 to 12 but 
        # because we don't want to use the mosaic for visualization 
        # purpose we don't really care about lower zoom level. 
        # We could use zoom 11 but don't want to make the mosaicJSON file too big.
        # Minzoom define the quadkey zoom and thus the number of quadkey list
        # See https://github.com/developmentseed/mosaicjson-spec/tree/master/0.0.2
        "minzoom": 9, 
        # We filter the season to have greenest data 
        "seasons": "spring,summer",
        # Here we can also pass some tile option to be added to the tile url.
        "tile_format":"tif",  # We use GeoTIFF output from the tiler
        "tile_scale": tilescale,
    },
).json()

In [None]:
# Get Mercator tiles covering the AOI
# For Landsat the maxzoom is 12 (~38m resolution, in Web Mercator projection)
# because we use 512x512 pixel tiles we can 
# fetch tiles at zoom 11 instead of 12.
zoom = results["maxzoom"] - (tilescale - 1)

extrema = tile_extrema(bounds, zoom)
tiles = []
for x in range(extrema["x"]["min"], extrema["x"]["max"]):
    for y in range(extrema["y"]["min"], extrema["y"]["max"]):
        tiles.append(mercantile.Tile(z=zoom, x=x, y=y))

random.shuffle(tiles)
print(len(tiles))

In [None]:
# Create tile URL
# See https://landsatlive.live/tiles/docs for the list of options you can pass to the tile url
query_params = dict(
    bands="4,3,2", # True Color RGB
    color_ops="gamma RGB 3.5, saturation 1.7, sigmoidal RGB 15 0.35", # Looks nice
    pixel_selection="median",  # See https://github.com/cogeotiff/rio-tiler-mosaic/blob/master/rio_tiler_mosaic/methods/defaults.py#L86
)
tiles_url = results["tiles"][0] + urllib.parse.urlencode(query_params)

In [None]:
# Define Output COG parameters
width = (extrema["x"]["max"] - extrema["x"]["min"]) * tilesize
height = (extrema["y"]["max"] - extrema["y"]["min"]) * tilesize
w, n = mercantile.xy(
    *mercantile.ul(extrema["x"]["min"], extrema["y"]["min"], zoom)
)
res = _meters_per_pixel(zoom, 0, tilesize=tilesize)

params = dict(
    driver="GTiff",
    dtype="uint8",
    count=3,
    width=width,
    height=height,
    crs="epsg:3857",
    transform=Affine(res, 0, w, 0, -res, n),
    nodata=0,
    tiled=True,
    blockxsize=tilesize,
    blockysize=tilesize,
)
output_profile = cog_profiles.get("deflate")

In [None]:
def _worker(tile, tile_url, extrema, tilesize=256, retry=0):
    """
    Tile Worker.
    
    Call the tile endpoint for each mercator tile.
    
    """
    url = tile_url.format(z=tile.z, x=tile.x, y=tile.y)
    img = requests.get(url)
    if not img.status_code == 200:
        time.sleep(3)
        if retry == 3:
            raise Exception("Empty")
        return _worker(tile, retry=retry+1)

    row = (tile.y - extrema["y"]["min"]) * tilesize
    col = (tile.x - extrema["x"]["min"]) * tilesize
    window = Window(col_off=col, row_off=row, width=tilesize, height=tilesize)

    return window, img

In [None]:
# Distribute the processes using multithreading
# and then re-assemble the tiles in one COG
with rasterio.Env():
    with MemoryFile() as memfile:
        with memfile.open(**params) as mem:
            with futures.ThreadPoolExecutor(max_workers=50) as executor:
                future_work = [
                    executor.submit(_worker, tile, tiles_url, extrema, tilesize) for tile in tiles
                ]
                
                for f in tqdm(futures.as_completed(future_work), total=len(future_work)):               
                    pass

            for f in _filter_futures(future_work):
                window, img = f
                with rasterio.open(BytesIO(img.content)) as src_dst:
                    mem.write(src_dst.read(indexes=(1,2,3)), window=window)

            cog_translate(
                mem,
                f"Bretagne_medianPixel_2019.tif",
                output_profile,
                config=dict(GDAL_NUM_THREADS="ALL_CPUS", GDAL_TIFF_OVR_BLOCKSIZE="128"),
            )

![Capture d’écran, le 2020-02-13 à 12 51 36](https://user-images.githubusercontent.com/10407788/74463338-b2fce800-4e5f-11ea-9216-76ffb0dea2b4.jpg)


# Using rio-tiler Expression and Custom Pixel_Selection methods

### Create Mosaic and visualize True Color RGB with highest NDVI value

The idea is to use rio-tiler expression method to create a 4th band on which we'll base the pixel selection method.

The `lastband` method will:
- analyse the stack of tiles
- find where the value for the 4th band is the highest
- construct a tile with the index obtained
- automatically remove the last band of the tile

In [None]:
# Create tile URL
# See https://landsatlive.live/tiles/docs for the list of options you can pass to the tile url
query_params = dict(
    expr="b4,b3,b2,(b5-b4)/(b5+b4)",  # Red,Green,Blue,NDVI
    color_ops="gamma RGB 3.5, saturation 1.7, sigmoidal RGB 15 0.35", # Looks nice
    pixel_selection="lastband",  # See https://github.com/developmentseed/awspds-mosaic/blob/master/awspds_mosaic/pixel_methods.py#L30
)
tiles_url = results["tiles"][0] + urllib.parse.urlencode(query_params)


with rasterio.Env():
    with MemoryFile() as memfile:
        with memfile.open(**params) as mem:
            with futures.ThreadPoolExecutor(max_workers=50) as executor:
                future_work = [
                    executor.submit(_worker, tile, tiles_url, extrema, tilesize) for tile in tiles
                ]
                
                for f in tqdm(futures.as_completed(future_work), total=len(future_work)):               
                    pass

            for f in _filter_futures(future_work):
                window, img = f
                with rasterio.open(BytesIO(img.content)) as src_dst:
                    mem.write(src_dst.read(indexes=(1,2,3)), window=window)

            cog_translate(
                mem,
                f"Bretagne_highNDVI_2019.tif",
                output_profile,
                config=dict(GDAL_NUM_THREADS="ALL_CPUS", GDAL_TIFF_OVR_BLOCKSIZE="128"),
            )

![](https://user-images.githubusercontent.com/10407788/74463335-b2fce800-4e5f-11ea-80e2-9ac94d9132f5.jpg)

## At Scale

Let's try at country scale

In [None]:
# La France
bounds = [-6.8115234375, 42.35854391749705, 9.580078125, 52.07950600379697]

# Date Filters
start = datetime.datetime.strptime("2013-01-01", "%Y-%m-%d").strftime("%Y-%m-%dT00:00:00Z")
end = datetime.datetime.strptime("2019-12-31", "%Y-%m-%d").strftime("%Y-%m-%dT23:59:59Z")

# SAT-API QUERY
# see http://sat-utils.github.io/sat-api/
query = {
    "bbox": bounds,
    "time": f"{start}/{end}",
    "query": {
        "eo:sun_elevation": {"gt": 0},
        "landsat:tier": {"eq": "T1"},
        "collection": {"eq": "landsat-8-l1"},
        "eo:cloud_cover": {"gte": 0, "lt": 5}  # Cloud Filters
    }
}

tilescale = 2  # Each tile will be 512x512px
tilesize = 256 * tilescale

results = requests.post(
    f"{endpoint}/mosaic/create", 
    json=query,
    params={
        "minzoom": 9, 
        "seasons": "summer",
        "tile_format":"tif",
        "tile_scale": tilescale,
    },
).json()

mosaicid = results["name"]

# List of tiles
zoom = results["maxzoom"] - (tilescale - 1)
extrema = tile_extrema(bounds, zoom)
tiles = []
for x in range(extrema["x"]["min"], extrema["x"]["max"]):
    for y in range(extrema["y"]["min"], extrema["y"]["max"]):
        tiles.append(mercantile.Tile(z=zoom, x=x, y=y))
random.shuffle(tiles)

# Define Output COG parameters
width = (extrema["x"]["max"] - extrema["x"]["min"]) * tilesize
height = (extrema["y"]["max"] - extrema["y"]["min"]) * tilesize
w, n = mercantile.xy(
    *mercantile.ul(extrema["x"]["min"], extrema["y"]["min"], zoom)
)
res = _meters_per_pixel(zoom, 0, tilesize=tilesize)

params = dict(
    driver="GTiff",
    dtype="uint8",
    count=3,
    width=width,
    height=height,
    crs="epsg:3857",
    transform=Affine(res, 0, w, 0, -res, n),
    nodata=0,
    tiled=True,
    blockxsize=tilesize,
    blockysize=tilesize,
)
output_profile = cog_profiles.get("deflate")

In [None]:
# Create tile URL
# See https://landsatlive.live/tiles/docs for the list of options you can pass to the tile url
query_params = dict(
    bands="4,3,2", # True Color RGB
    color_ops="gamma RGB 3.5, saturation 1.7, sigmoidal RGB 15 0.35", # Looks nice
    pixel_selection="median",
)

# For this large scale process we are going to use another endpoint which can run up to 30seconds per tile.
tiles_url = f"{endpoint}/batch/{mosaicid}/{{z}}/{{x}}/{{y}}@{tilescale}x.tif?" + urllib.parse.urlencode(query_params)

In [None]:
# Distribute the processes using multithreading
# and then re-assemble the tiles in one COG
with rasterio.Env():
    with MemoryFile() as memfile:
        with memfile.open(**params) as mem:
            with futures.ThreadPoolExecutor(max_workers=100) as executor:
                future_work = [
                    executor.submit(_worker, tile, tiles_url, extrema, tilesize) for tile in tiles
                ]
                
                for f in tqdm(futures.as_completed(future_work), total=len(future_work)):               
                    pass

            for f in _filter_futures(future_work):
                window, img = f
                with rasterio.open(BytesIO(img.content)) as src_dst:
                    mem.write(src_dst.read(indexes=(1,2,3)), window=window)

            cog_translate(
                mem,
                f"France_Median_2013-2019_Z12.tif",
                output_profile,
                config=dict(GDAL_NUM_THREADS="ALL_CPUS", GDAL_TIFF_OVR_BLOCKSIZE="128"),
            )

![](https://user-images.githubusercontent.com/10407788/74540814-70deaf80-4f0e-11ea-879f-f51954d545d0.jpg)