In [1]:
import glob
import hashlib
import json
import os
from datetime import datetime, timezone
from urllib.parse import parse_qsl

import ccd
import numpy as np
import pandas as pd
import xarray as xr
from pyspark import SparkConf, SparkContext

import pw

In [2]:
from test.test_worker import mock_get_spectral_request
from test.test_worker import mock_get_tiles_request

In [3]:
# The driver and the executors must run the same Python interpreter (python3).
# Comparing the two settings alone is not enough: when both are unset they compare
# equal (None == None) and Spark silently falls back to its own default interpreter,
# so additionally require that PYSPARK_PYTHON is actually set.
os.getenv('PYSPARK_PYTHON') is not None and os.getenv('PYSPARK_DRIVER_PYTHON') == os.getenv('PYSPARK_PYTHON')

True

In [4]:
# Spark application configuration for the Mesos cluster described in pw.
# The app name is stamped with a 24-hour time (%H); the previous 12-hour %I (with no
# AM/PM marker) gave runs twelve hours apart identical names.
conf = (SparkConf()
        .setAppName("lcmap-gen-{}".format(datetime.now().strftime('%Y-%m-%d-%H:%M')))
        .setMaster(pw.LPW_MESOS_MASTER)
        .set("spark.mesos.executor.docker.image", pw.LPW_EXECUTOR_IMAGE)
        .set("spark.executor.cores", pw.LPW_EXECUTOR_CORES)
        .set("spark.mesos.executor.docker.forcePullImage", pw.LPW_EXECUTOR_FORCE_PULL_IMAGE))

In [5]:
# getOrCreate keeps this cell re-runnable: constructing a second SparkContext in the
# same kernel raises, while getOrCreate returns the already-running one.
# NOTE: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
# Fixture describing one tile-change request. inputs_url asks for the same eight
# products (cfmask, sr_band1-5, sr_band7, toa_band6) from each of Landsat 4/5/7.
# NOTE(review): the x/y inside inputs_url (-1851585, 2870805) differ from tile_x/tile_y
# below; assemble_data only uses tile_x/tile_y, so this looks harmless with the mocks,
# but confirm it is intentional. The 'algorithm' value is not read by detect().
good_input_data = {
    'algorithm': 'lcmap-pyccd:1.0.3.b1',
    'tile_update_requested': '2017-02-28T04:35:46.023Z',
    'tile_x': -1821585,
    'tile_y': 2891595,
    'inputs_url': ('http://somehost.com/landsat/tiles'
                   '?x=-1851585&y=2870805&acquired=1980-01-01/2015-12-31&'
                   + '&'.join('ubid={}/{}'.format(sensor, product)
                              for sensor in ('LANDSAT_4/TM', 'LANDSAT_5/TM', 'LANDSAT_7/ETM')
                              for product in ('cfmask', 'sr_band1', 'sr_band2', 'sr_band3',
                                              'sr_band4', 'sr_band5', 'sr_band7', 'toa_band6'))),
}

In [7]:
# Notebook copy of pw.worker.spectral_map: identical query logic, but responses come
# from mock_get_spectral_request instead of an HTTP get_request.
def spectral_map(specs_url):
    """ Return a dict of sensor bands keyed to their respective spectrum.

    Args:
        specs_url: base tile-specs URL; one tag query is issued per spectrum, plus one
            unfiltered request for the URL itself.

    Returns:
        (spec_map, spec_whole): spec_map maps each spectrum name ('thermal', 'cfmask',
        'blue', 'green', 'red', 'nir', 'swir1', 'swir2') to the de-duplicated ubids of
        its query response, in first-seen order; spec_whole is the unfiltered response.

    Raises:
        Exception: wrapping, and chained to, any error raised while querying.
    """
    # spectrum name -> tag expression it is ANDed with in the tile-specs query
    _map = {'thermal': 'toa -11', 'cfmask': '+cfmask -conf'}
    for bnd in ('blue', 'green', 'red', 'nir', 'swir1', 'swir2'):
        _map[bnd] = 'sr'

    _spec_map = dict()
    try:
        for spectrum, tags in _map.items():
            url = "{specurl}?q=((tags:{tags}) AND tags:{spectrum})".format(specurl=specs_url, tags=tags, spectrum=spectrum)
            # pw.worker issues get_request(url) here and logs the url.
            resp = mock_get_spectral_request(url)
            # De-duplicate while keeping response order. list(set(...)) produced an
            # arbitrary, hash-seed-dependent order, which then decided the order in
            # which rainbow() combines the bands.
            _spec_map[spectrum] = list(dict.fromkeys(i['ubid'] for i in resp))
        # pw.worker issues get_request(specs_url) here.
        _spec_whole = mock_get_spectral_request(specs_url)
    except Exception as e:
        raise Exception("Problem generating spectral map from api query, specs_url: {}\n message: {}".format(specs_url, e)) from e

    return _spec_map, _spec_whole

In [8]:
# Notebook copy of pw.worker.rainbow: identical except that tile data comes from
# mock_get_tiles_request (the /data fixtures) instead of an HTTP get_request.
def rainbow(x, y, t, specs_url, tiles_url, requested_ubids):
    """ Return all the landsat data, organized by spectra for a given x, y, and time-span.

    For every spectrum in spectral_map(specs_url), each of its ubids that also appears in
    requested_ubids is fetched and converted with pw.worker.landsat_dataset. The results
    are folded into one xarray Dataset with combine_first rather than merge, so values
    already present are kept and locations missing from some bands are filled in.

    Raises:
        Exception: if the tile request for a requested ubid returns nothing.
    """
    spectra_to_ubids, whole_spec = spectral_map(specs_url)

    stack = xr.Dataset()
    for spectrum, candidate_ubids in spectra_to_ubids.items():
        wanted = [u for u in candidate_ubids if u in requested_ubids]
        for ubid in wanted:
            query = dict(ubid=ubid, x=x, y=y, acquired=t)
            tiles = mock_get_tiles_request(tiles_url, params=query)
            if not tiles:
                raise Exception("No tiles returned for url: {} , params: {}".format(tiles_url, query))

            layer = pw.worker.landsat_dataset(spectrum, ubid, whole_spec, tiles)
            if not layer:
                continue
            stack = stack.combine_first(layer)
    return stack

In [9]:
# Notebook copy of pw's data assembly, wired to the mock-backed rainbow() above so no
# HTTP requests are made.
def assemble_data(inputs, dimrng=100, pixel_size=30):
    """ Assemble data for RDD generation.

    Parses the acquired date range and ubids out of inputs['inputs_url'], fetches the
    tile stack via rainbow(), then splits it into one record per pixel.

    Args:
        inputs: dict with 'tile_x', 'tile_y' and 'inputs_url' (a tiles URL whose query
            carries an 'acquired' value and any number of 'ubid' values).
        dimrng: number of pixels emitted along each side of the tile.
        pixel_size: spacing between neighbouring pixels in tile_x/tile_y units
            (presumably metres on Landsat's 30 m grid; 30 matches the old hard-coded value).

    Returns:
        {'tile_x': int, 'tile_y': int, 'data': [((px, py), bands), ...]}, ordered with
        the column index outer and the row index inner. bands maps each band name and
        'dates' to a freshly copied numpy array for that pixel.

    Raises:
        ValueError: if inputs_url has no 'acquired' query parameter.
    """
    tiles_url, _, query = inputs['inputs_url'].partition('?')
    # Exact key matching; the old substring test ('ubid=' in i) also matched keys such as 'xubid='.
    params = parse_qsl(query)
    acquired = [value for key, value in params if key == 'acquired']
    if not acquired:
        raise ValueError("inputs_url has no 'acquired' parameter: {}".format(inputs['inputs_url']))
    dates = acquired[0]
    ubids = [value for key, value in params if key == 'ubid']
    tx, ty = int(inputs['tile_x']), int(inputs['tile_y'])
    specs_url = tiles_url.replace('/tiles', '/tile-specs')

    rbow = rainbow(tx, ty, dates, specs_url, tiles_url, ubids)

    # Look up each band's array once instead of once per pixel (dimrng**2 times).
    # Assumes each band array is indexed [time, row, col]. TODO: confirm against
    # pw.worker.landsat_dataset.
    band_names = ('blue', 'green', 'red', 'swir1', 'swir2', 'thermal', 'cfmask', 'nir')
    band_cubes = {name: rbow[name].values for name in band_names}
    acquired_dates = rbow['t'].values

    output = []
    for x in range(dimrng):
        for y in range(dimrng):
            # np.array copies, so each pixel record owns its data rather than viewing the cube.
            pixel = {name: np.array(cube[:, y, x]) for name, cube in band_cubes.items()}
            pixel['dates'] = np.array(acquired_dates)
            # y grows downward in pixel space but coordinates decrease going south.
            output.append(((tx + x * pixel_size, ty - y * pixel_size), pixel))
    return {'tile_x': tx, 'tile_y': ty, 'data': output}



In [10]:
# copy of pw.worker.detect method, which doesn't log or try to persist data to cassandra
def detect(input, tile_x, tile_y):
    """ Return results of ccd.detect for a given stack of data at a particular x and y.

    Args:
        input: tuple ((pixel x, pixel y), bands) as built by assemble_data; bands maps
            'blue', 'green', 'red', 'nir', 'swir1', 'swir2', 'thermal', 'cfmask' and
            'dates' to that pixel's values.
        tile_x, tile_y: coordinates of the tile the pixel belongs to; copied into the
            returned record whether or not ccd.detect succeeds.

    Returns:
        dict with 'x', 'y', 'tile_x', 'tile_y', 'result' (JSON string, '' on failure),
        'result_ok', 'result_md5', 'result_produced' (UTC, ISO-8601 with 'Z'),
        'inputs_md5', and 'algorithm' when ccd.detect succeeded.
    """
    _px, _py = input[0][0], input[0][1]
    _bands = input[1]
    # Tile coordinates do not depend on ccd.detect, so record them up front. Previously
    # failed records lacked them.
    output = {'tile_x': tile_x, 'tile_y': tile_y}
    try:
        _results = ccd.detect(blues    = _bands['blue'],
                              greens   = _bands['green'],
                              reds     = _bands['red'],
                              nirs     = _bands['nir'],
                              swir1s   = _bands['swir1'],
                              swir2s   = _bands['swir2'],
                              thermals = _bands['thermal'],
                              quality  = _bands['cfmask'],
                              dates    = [pw.worker.dtstr_to_ordinal(str(pd.to_datetime(i)), False) for i in _bands['dates']])
        output['result'] = json.dumps(pw.worker.simplify_detect_results(_results))
        output['result_ok'] = True
        output['algorithm'] = _results['algorithm']
    except Exception as e:
        # pw.worker.detect reports this via pw.logger.error; here it is only printed.
        print("exception message: {}".format(e))
        output['result'] = ''
        output['result_ok'] = False

    output['x'], output['y'] = _px, _py
    output['result_md5'] = hashlib.md5(output['result'].encode('UTF-8')).hexdigest()
    # The trailing 'Z' declares UTC, so the clock must actually be read in UTC.
    output['result_produced'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    output['inputs_md5'] = 'not implemented'
    print("*** {} results for x, y: {}, {}".format(output['result_ok'], _px, _py))
    return output

In [11]:
# Assemble the full 100 x 100 pixel tile on the driver from the mocked spectral/tile requests.
data = assemble_data(inputs=good_input_data, dimrng=100)

In [12]:
# assemble_data yields one ((x, y), bands) entry per pixel: 100 x 100 = 10,000 pixels.
len(data['data']) == 100 * 100

True

In [13]:
# We don't need to run pyccd on all 10k pixels to exercise the Spark job;
# the first 10 pixel stacks are enough.
short_data = data['data'][:10]
ccd_rdd = sc.parallelize(short_data, numSlices=pw.LPW_SPARK_PARALLELIZATION)

In [14]:
# Map pyccd over the sampled pixel stacks and collect the results on the driver.
# Each RDD element is ((pixel x, pixel y), bands). detect() reads the pixel coordinates
# from that tuple itself and expects the *tile* coordinates as separate arguments.
# Previously the pixel coordinates were passed in their place.
# The tile coordinates are bound to plain ints first, so the shipped function references
# two ints rather than the whole `data` dict (10k pixel stacks).
tile_x, tile_y = data['tile_x'], data['tile_y']
# The result keeps the name `x` because the next cell inspects x[0].
x = ccd_rdd.map(lambda pixel: detect(pixel, tile_x, tile_y)).collect()

In [15]:
# Inspect the first collected result record (one dict per pixel returned by detect).
x[0]

{'algorithm': 'lcmap-pyccd:1.3.0',
 'inputs_md5': 'not implemented',
 'result': '{"processing_mask": [false, false, false, false, false, true, false, false, false, false, true, true, true, true, true, true, false, false, false, false, false, false, true, true, false, false, false, true, false, true, true, true, true, true, true, true, true, true, true, false, false, true, false, false, true, false, false, false, false, false, true, true, false, true, true, true, true, true, true, true, true, true, true, false, true, true, true, true, true, false, false, false, true, true, true, false, false, false, true, false, false, false, false, true, false, false, false, false, true, true, true, false, false, true, true, true, true, true, true, true, false, false, true, true, true, true, true, true, false, false, true, true, false, true, false, true, true, false, true, true, true, true, true, true, true, false, true, true, false, true, true, false, true, false, false, false, true, false, false, fal

In [7]:
# super simple rdd and spark job test, help ensure our spark jobs can get placed
#import random

#def inside(p):
#    x, y = random.random(), random.random()
#    return x*x + y*y < 1

In [8]:
#count = sc.parallelize(range(0, 10)).filter(inside).count()