Skip to content

Commit

Permalink
Stats updates
Browse files Browse the repository at this point in the history
  • Loading branch information
omad committed Oct 21, 2016
1 parent 15a84e4 commit a2f7c69
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
55 changes: 36 additions & 19 deletions datacube_apps/stats/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from datacube import Datacube
from datacube.api import make_mask
from datacube.api.grid_workflow import GridWorkflow, Tile
from datacube.api.query import query_group_by, query_geopolygon
from datacube.api.query import query_group_by, query_geopolygon, Query
from datacube.model import GridSpec, CRS, DatasetType, GeoBox, GeoPolygon
from datacube.storage.masking import mask_valid_data as mask_invalid_data
from datacube.ui import click as ui
Expand Down Expand Up @@ -124,8 +124,8 @@ def __init__(self):
#: How to slice a task up spatially to fit into memory.
self.computation = None

#: A function to generate a sequence of date ranges.
self.generate_date_sequence = None
#: An iterable of date ranges.
self.date_ranges = None

#: Generates tasks to compute statistics. These tasks should be :class:`StatsTask` objects
#: and will define spatial and temporal boundaries, as well as statistical operations to be run.
Expand Down Expand Up @@ -158,7 +158,6 @@ def validate_config(self):
raise StatsConfigurationError("Configuration Error: listed measurements of source products "
"are not all the same.")

assert callable(self.generate_date_sequence)
assert callable(self.task_generator)
assert callable(self.output_driver)

Expand Down Expand Up @@ -190,7 +189,7 @@ def generate_tasks(self, index, output_products):
:param output_products: List of output product definitions
:return:
"""
for task in self.task_generator(index=index, generate_date_sequence=self.generate_date_sequence,
for task in self.task_generator(index=index, date_ranges=self.date_ranges,
sources_spec=self.sources):
task.output_products = output_products
yield task
Expand Down Expand Up @@ -306,16 +305,16 @@ def __getitem__(self, item):
return getattr(self, item)


def generate_gridded_tasks(index, sources_spec, generate_date_sequence, grid_spec, geopolygon=None):
def generate_gridded_tasks(index, sources_spec, date_ranges, grid_spec, geopolygon=None):
"""
Generate the required tasks through time and across a spatial grid.
:param index: Datacube Index
:return:
"""
workflow = GridWorkflow(index, grid_spec=grid_spec)
for time_period in generate_date_sequence():
_LOG.info('Making output_products tasks for time period: %s', time_period)
for time_period in date_ranges:
_LOG.debug('Making output_products tasks for time period: %s', time_period)

# Tasks are grouped by tile_index, and may contain sources from multiple places
# Each source may be masked by multiple masks
Expand All @@ -338,11 +337,13 @@ def generate_gridded_tasks(index, sources_spec, generate_date_sequence, grid_spe
'spec': source_spec,
})

if tasks:
_LOG.info('Created tasks for time period: %s', time_period)
for task in tasks.values():
yield task


def generate_non_gridded_tasks(index, storage, generate_date_sequence, input_region, sources_spec):
def generate_non_gridded_tasks(index, storage, date_ranges, input_region, sources_spec):
"""
Make stats tasks for a defined spatial region, that doesn't fit into a standard grid.
Expand All @@ -366,8 +367,7 @@ def make_tile(product, time, group_by):

return Tile(sources, geobox)

for time_period in generate_date_sequence():
_LOG.info('Making output_products tasks for time period: %s', time_period)
for time_period in date_ranges:

task = StatsTask(time_period)

Expand All @@ -392,6 +392,7 @@ def make_tile(product, time, group_by):
})

if task.sources:
_LOG.info('Created task for time period: %s', time_period)
yield task


Expand Down Expand Up @@ -425,7 +426,7 @@ def get_app_metadata(config_file):


def map_orderless(executor, core, tasks, queue=50):
tasks = (i for i in tasks) # ensure input is a generator
# tasks = (i for i in tasks) # ensure input is a generator

# pre-fill queue
results = [executor.submit(core, t) for t in itertools.islice(tasks, queue)]
Expand All @@ -442,7 +443,21 @@ def map_orderless(executor, core, tasks, queue=50):
yield executor.result(future)


def create_stats_app(filename):
def find_periods_with_data(index, sources, period_duration='1 day', start_date='1985-01-01', end_date='2000-01-01'):
    """
    Yield the time periods in which at least one source product has datasets.

    For every product named in `sources`, datasets are counted per period of length
    `period_duration` between `start_date` and `end_date`. Periods with a non-zero count
    are collected across all products, de-duplicated, and yielded in sorted order.

    :param index: Datacube Index
    :param sources: list of source specifications, each providing a 'product' entry
    :param period_duration: period length handed to ``index.datasets.count_product_through_time``
    :param start_date: start of the time range to search
    :param end_date: end of the time range to search
    :return: generator of ``(begin, end)`` tuples, one per period that contains data
    """
    # NOTE(review): the spatial extent and CRS are hard-coded here -- confirm whether they
    # should be supplied by the caller/configuration instead.
    query = dict(y=(-3760000, -3820000), x=(1375400.0, 1480600.0), crs='EPSG:3577', time=(start_date, end_date))
    product_names = [source['product'] for source in sources]

    valid_dates = set()
    for product in product_names:
        counts = index.datasets.count_product_through_time(period_duration, product=product,
                                                           **Query(**query).search_terms)
        # Use update(), not add(): add() would insert the generator object itself as a single
        # element, so no time range would ever reach the set and the sort below would fail.
        valid_dates.update(time_range for time_range, count in counts if count > 0)

    for time_range in sorted(valid_dates):
        yield time_range.begin, time_range.end


def create_stats_app(filename, index=None):
_, config = next(read_documents(filename))
stats_app = StatsApp()
stats_app.config_file = filename
Expand All @@ -452,22 +467,24 @@ def create_stats_app(filename):
stats_app.location = config['location']
stats_app.computation = config.get('computation', {'chunking': {'x': 1000, 'y': 1000}})

stats_app.generate_date_sequence = partial(date_sequence, start=pd.to_datetime(config['start_date']),
end=pd.to_datetime(config['end_date']),
stats_duration=config['stats_duration'],
step_size=config['step_size'])
date_ranges = config['date_ranges']
stats_app.date_ranges = date_sequence(start=pd.to_datetime(date_ranges['start_date']),
end=pd.to_datetime(date_ranges['end_date']),
stats_duration=date_ranges['stats_duration'],
step_size=date_ranges['step_size'])
if 'input_region' in config:
if config['input_region'].get('output_type') == 'tiled':
# A large, multi-tile input region, specified as geojson. Output will be individual tiles.
_LOG.info('Found geojson `input region`, outputing tiles.')
grid_spec = _make_grid_spec(config['storage'])
geopolygon = GeoPolygon.from_geojson(config['input_region']['geometry'], CRS('EPSG:4326'))
stats_app.task_generator = partial(generate_gridded_tasks, grid_spec=grid_spec, geopolygon=geopolygon)
else:
# Generate statistics for an Ungridded region. Output as a single file.
_LOG.info('Generating statistics for an ungridded `input region`. Output as a single file.')
stats_app.task_generator = partial(generate_non_gridded_tasks, input_region=config['input_region'],
storage=stats_app.storage)
else:
# Default output, full available spatial region.
_LOG.info('Default output, full available spatial region, gridded files.')
grid_spec = _make_grid_spec(config['storage'])
stats_app.task_generator = partial(generate_gridded_tasks, grid_spec=grid_spec)

Expand Down
6 changes: 5 additions & 1 deletion datacube_apps/stats/output_drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ def open_output_files(self):
})

output_name = prod_name + measurename
self.output_files[output_name] = rasterio.open(str(output_filename), mode='w', **profile)
src = rasterio.open(str(output_filename), mode='w', **profile)
# src.update_tags(created=self.app_info) # TODO record creation metadata
src.update_tags(1, platform=self.task.sources[0]['data'].product.name,
date='{:%Y-%m-%d}'.format(self.task.time_period[0]))
self.output_files[output_name] = src

def write_data(self, prod_name, measurement_name, tile_index, values):
output_name = prod_name + measurement_name
Expand Down

0 comments on commit a2f7c69

Please sign in to comment.