# WIT Dispatcher

In [1]:
import asyncio
import json
import string
import time

import aiohttp
import fiona.transform
import xmltodict

# Make sure asyncio.create_task is available: when the running interpreter
# does not define it, alias it to asyncio.ensure_future.
try:
    asyncio.create_task
except AttributeError:
    asyncio.create_task = asyncio.ensure_future

Polygons can be read directly from the zipped shapefile supplied. (However, an optimisation would be to store the vector data in the target projection.)

In [2]:
def _swap_axes(coords):
    """Return a nested-list copy of `coords` with the first two ordinates of every position exchanged."""
    if coords and isinstance(coords[0], (int, float)):
        # A single position: swap the leading pair, keep any further ordinates.
        return [coords[1], coords[0], *coords[2:]]
    # A ring, a polygon or a multi-polygon: recurse into each member.
    return [_swap_axes(part) for part in coords]


def generate_polygons(max_index=None, max_length=None):
    """Produce indexed WGS84 polygons from shapefile.

    Args:
        max_index: Highest record index (position within the shapefile) to
            consider. Records beyond it are never yielded. A falsy value
            (None or 0) disables the limit.
        max_length: Records whose 'Shape_Leng' property exceeds this value
            are skipped. A falsy value disables the filter.

    Yields:
        (index, geometry) pairs, where geometry is the record's geometry
        transformed to EPSG:4326 with the first two ordinates of every
        position exchanged.
    """
    source = "zip://Queensland_dominant_wetland_areas_22042020.zip"
    with fiona.open(source) as collection:
        for i, record in enumerate(collection):
            # Enforce the index limit before the length filter, so that a
            # skipped record at the limit cannot let later records through.
            if max_index and i > max_index:
                break
            if max_length and record['properties']['Shape_Leng'] > max_length:
                continue
            # Note, reprojection is very slow, and could instead be prepared prior to runtime.
            geom = fiona.transform.transform_geom(collection.crs, 'EPSG:4326', record['geometry'])
            # The transformed positions are treated as (lat, lon) and emitted
            # as [lon, lat]; nesting depth is handled generically so
            # multi-part geometries work as well as single polygons.
            geom['coordinates'] = _swap_axes(geom['coordinates'])
            yield i, geom

Polygons must be encoded into WPS *execute* requests, together with a time interval. For WIT, it is unimportant whether all time is processed in a single interval, or divided into multiple successive increments.

In [3]:
# WPS 1.0.0 Execute document for the WIT process. The single %s placeholder
# receives the GeoJSON geometry; the start/end timestamps are fixed here.
request_template = """
<wps:Execute version="1.0.0" service="WPS" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.opengis.net/wps/1.0.0" xmlns:wfs="http://www.opengis.net/wfs" xmlns:wps="http://www.opengis.net/wps/1.0.0" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:gml="http://www.opengis.net/gml" xmlns:ogc="http://www.opengis.net/ogc" xmlns:wcs="http://www.opengis.net/wcs/1.1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xsi:schemaLocation="http://www.opengis.net/wps/1.0.0 http://schemas.opengis.net/wps/1.0.0/wpsAll.xsd">
    <ows:Identifier>WIT</ows:Identifier>
    <wps:DataInputs>
    <wps:Input>
        <ows:Identifier>geometry</ows:Identifier>
        <wps:Data>
        <wps:ComplexData>{"type":"FeatureCollection","features":[{"type":"Feature","geometry":%s}]}</wps:ComplexData>
        </wps:Data>
    </wps:Input>
    <wps:Input>
        <ows:Identifier>start</ows:Identifier>
        <wps:Data>
        <wps:ComplexData>{"type":"object","properties":{"timestamp":{"type":"string","format":"date-time","date-time":"2010-01-01T00:00"}}}</wps:ComplexData>
        </wps:Data>
    </wps:Input>
    <wps:Input>
        <ows:Identifier>end</ows:Identifier>
        <wps:Data>
        <wps:ComplexData>{"type":"object","properties":{"timestamp":{"type":"string","format":"date-time","date-time":"2011-01-01T00:00"}}}</wps:ComplexData>
        </wps:Data>
    </wps:Input>
    </wps:DataInputs>
    <wps:ResponseForm>
    <wps:ResponseDocument storeExecuteResponse="true" status="true"/>
    </wps:ResponseForm>
</wps:Execute>
"""
def request_doc(geom):
    """Return the WPS Execute XML document for one geometry.

    Args:
        geom: A GeoJSON-style geometry mapping (e.g. with 'type' and
            'coordinates' keys) that `json.dumps` can serialise.

    Returns:
        `request_template` with the geometry embedded as compact JSON.
    """
    # Serialise with the json module instead of rewriting Python's repr, so
    # that tuples, None/booleans and strings containing spaces or quotes all
    # come out as valid JSON. The compact separators give the same
    # whitespace-free output for plain geometries.
    return request_template % json.dumps(geom, separators=(',', ':'))

For testing, we just want a few individual requests for the WPS, and to exclude any large polygons. There is also a simpler request example that only targets a pixel drill.

In [4]:
def example_requests():
    """Lazily yield (index, request document) pairs for a small test batch.

    The batch is drawn from generate_polygons(max_index=10, max_length=2000),
    with each polygon rendered through request_doc.
    """
    for i, poly in generate_polygons(max_index=10, max_length=2000):
        yield i, request_doc(poly)

Next, submit each request to the WPS and poll its status document until the process has finished.

In [5]:
# Endpoint that receives the WPS Execute POST requests.
wps_url = 'https://ows.dev.dea.ga.gov.au/wps/?service=WPS&request=Execute'

# Headers sent with every Execute POST.
wps_headers = {
    'Content-Type': 'text/xml;charset=UTF-8',
    'cache-control': 'max-age=0',
}

# Pause between successive status polls, in seconds.
polling_interval = 2.0

# Maximum time allowed for one request, in seconds (compared against
# differences of time.time() values).
timeout = 600

In [6]:
async def request(session, log, index, request_doc):
    """Execute a single request, and process through to completion.

    Posts the Execute document to `wps_url`, then polls the returned status
    location every `polling_interval` seconds until the process is no longer
    accepted/started, or until `timeout` seconds have elapsed since the start.

    Args:
        session: HTTP client session whose `post`/`get` results can be used
            as async context managers (e.g. aiohttp.ClientSession).
        log: Dict that receives this request's record under `index`. Keys
            written: 'start', 'post', 'resp', 'status_url', 'response',
            'output' (on success), 'exception' (on failure) and 'finish'.
        index: Key identifying this request in `log`.
        request_doc: The XML Execute document to post.

    Returns:
        The parsed 'wps:ProcessOutputs' node on success; None when any step
        fails (the exception is stored in the log rather than raised).
    """
    entry = log[index] = dict(start=time.time())
    try:
        # `async with` guarantees the response is released even when the
        # status check below fails.
        async with session.post(wps_url, data=request_doc, headers=wps_headers) as resp:
            text = await resp.text()
            entry['post'] = time.time()
            assert resp.status == 200, f"Initial request status {resp.status}"
            entry['resp'] = time.time()
        info = xmltodict.parse(text)
        url = info['wps:ExecuteResponse']['@statusLocation']
        entry['status_url'] = url
        entry['response'] = text
        assert 'wps:ProcessAccepted' in info['wps:ExecuteResponse']['wps:Status'], "Process not accepted"
        # Keep polling while the status document reports accepted or started.
        while {'wps:ProcessAccepted', 'wps:ProcessStarted'} & set(info['wps:ExecuteResponse']['wps:Status']):
            if time.time() - entry['start'] > timeout:
                raise Exception("Timeout")
            await asyncio.sleep(polling_interval)
            async with session.get(url) as resp:
                assert resp.status == 200, f"Polling status {resp.status}"
                info = xmltodict.parse(await resp.text())
        assert 'wps:ProcessSucceeded' in info['wps:ExecuteResponse']['wps:Status'], "Process reports failure"

        outputs = info['wps:ExecuteResponse']['wps:ProcessOutputs']
        entry['output'] = outputs['wps:Output']['wps:Data']['wps:LiteralData']['#text']
        return outputs
    except Exception as e:
        # Best effort by design: record the failure for the monitor instead
        # of propagating it to the dispatcher.
        entry['exception'] = e
    finally:
        entry['finish'] = time.time()

In [7]:
async def distribute(task_generator, func, max_concurrency):
    """Apply func to each task, while still limiting how many are loaded in memory.

    Args:
        task_generator: Iterable of argument tuples. It is consumed lazily:
            the next item is only fetched once a concurrency slot is free.
        func: Coroutine function invoked as `func(*args)` for each item.
        max_concurrency: Maximum number of `func` calls in flight at once.

    Raises:
        ValueError: If `max_concurrency` is less than 1 (a semaphore with no
            slots would block forever on the first task).

    Note:
        An exception raised by `func` does not stop the dispatcher; it stays
        on the finished task object.
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    sem = asyncio.Semaphore(max_concurrency)
    # Hold strong references to in-flight tasks so they cannot be garbage
    # collected before completing; each task removes itself when done.
    pending = set()

    async def wrapper(args):
        try:
            await func(*args)
        finally:
            sem.release()

    for args in task_generator:
        await sem.acquire()
        task = asyncio.create_task(wrapper(args))
        pending.add(task)
        task.add_done_callback(pending.discard)

    # wait for completion: once every slot can be re-acquired, no wrapper is
    # still running.
    for _ in range(max_concurrency):
        await sem.acquire()

In [8]:
async def monitor(log, task):
    """Run `task` to completion while printing a live progress line.

    Args:
        log: Dict of per-request records. An entry counts as finished once it
            has a 'finish' key and as failed once it has an 'exception' key.
        task: Coroutine or future to run (e.g. a `distribute(...)` call or an
            `asyncio.gather(...)` future).

    Returns:
        The result of `task`.

    Raises:
        Whatever exception `task` raised, so that dispatcher failures are not
        silently lost.
    """
    # ensure_future accepts both coroutines and futures such as the one
    # returned by asyncio.gather.
    process = asyncio.ensure_future(task)

    while True:
        # Wake as soon as the work finishes, or after 0.2 s to refresh the
        # status line.
        done, _ = await asyncio.wait({process}, timeout=0.2)

        started = len(log)
        finished = sum(1 for entry in log.values() if 'finish' in entry)
        failure = sum(1 for entry in log.values() if 'exception' in entry)

        print(f"{started - finished} running, {finished - failure} completed, {failure} failed.", end='\r')
        if done:
            break

    # Retrieve the outcome so an exception in the task propagates.
    return await process

In [9]:
# Per-request records, keyed by polygon index; filled in by request().
log = {}

# Top-level `async with` / `await` rely on the notebook's support for awaiting at cell level.
async with aiohttp.ClientSession() as session:
    
    # Trickle: dispatch the example requests with at most 2 in flight at a time.
    await monitor(log, distribute(example_requests(), lambda *args: request(session, log, *args), 2))
    
    # Flood (alternative, disabled): start every example request at once.
    #await monitor(log, asyncio.gather(*[request(session, log, i, req) for (i, req) in example_requests()]))

0 running, 10 completed, 0 failed.

In [10]:
# Print the result of every request. A failed request has no 'output' entry
# (only 'exception'), so report the failure instead of raising KeyError.
for index, entry in log.items():
    if 'output' in entry:
        print(entry['output'])
    else:
        print(f"Request {index} failed: {entry.get('exception')!r}")

s3://dea-dev-wps-results/wit/f72f0e92-e367-11eb-9dae-b25a859a6219/f72f0e92-e367-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/f7def730-e367-11eb-9dae-b25a859a6219/f7def730-e367-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/61183586-e368-11eb-9dae-b25a859a6219/61183586-e368-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/60f1926e-e368-11eb-9dae-b25a859a6219/60f1926e-e368-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/b6dd1b1c-e368-11eb-9dae-b25a859a6219/b6dd1b1c-e368-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/c66d7450-e368-11eb-9dae-b25a859a6219/c66d7450-e368-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/0c4f6d3e-e369-11eb-9dae-b25a859a6219/0c4f6d3e-e369-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/22601e70-e369-11eb-9dae-b25a859a6219/22601e70-e369-11eb-9dae-b25a859a6219.snappy.parquet
s3://dea-dev-wps-results/wit/5fdae348-e369-11eb-9dae-b25