Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce asynchronous fetching of HiPS tiles #106

Merged
merged 8 commits into from
Aug 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .rtd-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies:
- reproject
- matplotlib
- tqdm
- aiohttp
# There's a problem with Sphinx 1.6 with astropy-helpers
# For now, we pin the Sphinx version to something that works
- sphinx==1.5.6
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ env:
- MAIN_CMD='python setup.py'
- SETUP_CMD='test'
- EVENT_TYPE='pull_request push'
- CONDA_DEPENDENCIES='healpy scikit-image Pillow reproject matplotlib tqdm'
- CONDA_DEPENDENCIES='healpy scikit-image Pillow reproject matplotlib tqdm aiohttp'
- PIP_DEPENDENCIES=''
- CONDA_CHANNELS='conda-forge astropy-ci-extras astropy'
- SETUP_XVFB=True
Expand Down
1 change: 1 addition & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,6 @@ In addition, the following packages are needed for optional functionality:

* `Matplotlib`_ 2.0 or later. Used for plotting in examples.
* `tqdm`_. Used for showing progress bar either on terminal or in Jupyter notebook.
* `aiohttp`_. Used for fetching HiPS tiles.

We have some info at :ref:`py3` on why we don't support legacy Python (Python 2).
3 changes: 2 additions & 1 deletion docs/references.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
.. _HiPS paper: https://www.aanda.org/articles/aa/pdf/2015/06/aa26075-15.pdf
.. _HiPS IVOA recommendation: http://www.ivoa.net/documents/HiPS/
.. _HiPS at CDS: http://aladin.u-strasbg.fr/hips/
.. _tqdm: https://pypi.python.org/pypi/tqdm
.. _tqdm: https://pypi.python.org/pypi/tqdm
.. _aiohttp: http://aiohttp.readthedocs.io/en/stable/
35 changes: 15 additions & 20 deletions hips/draw/paint.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import time
import numpy as np
from typing import List, Tuple, Union, Dict, Any, Iterator
from typing import List, Tuple, Union, Dict, Any
from astropy.wcs.utils import proj_plane_pixel_scales
from skimage.transform import ProjectiveTransform, warp
from ..tiles import HipsSurveyProperties, HipsTile, HipsTileMeta
from ..tiles import HipsSurveyProperties, HipsTile, HipsTileMeta, fetch_tiles
from ..tiles.tile import compute_image_shape
from ..utils import WCSGeometry, healpix_pixels_in_sky_image, hips_order_for_pixel_resolution

Expand Down Expand Up @@ -36,6 +36,9 @@ class HipsPainter:
Use the precise drawing algorithm
progress_bar : bool
Show a progress bar for tile fetching and drawing
fetch_opts : dict
Keyword arguments for fetching HiPS tiles. To see the
list of passable arguments, refer to `~fetch_tiles`

Examples
--------
Expand All @@ -59,12 +62,13 @@ class HipsPainter:
"""

def __init__(self, geometry: Union[dict, WCSGeometry], hips_survey: Union[str, HipsSurveyProperties],
tile_format: str, precise: bool = False, progress_bar: bool = True) -> None:
tile_format: str, precise: bool = False, progress_bar: bool = True, fetch_opts : dict = None) -> None:
self.geometry = WCSGeometry.make(geometry)
self.hips_survey = HipsSurveyProperties.make(hips_survey)
self.tile_format = tile_format
self.precise = precise
self.progress_bar = progress_bar
self.fetch_opts = fetch_opts
self._tiles = None
self.float_image = None
self._stats: Dict[str, Any] = {}
Expand Down Expand Up @@ -109,30 +113,22 @@ def projection(self, tile: HipsTile) -> ProjectiveTransform:
pt.estimate(src, dst)
return pt

def _fetch_tiles(self) -> Iterator[HipsTile]:
"""Generator function to fetch HiPS tiles from a remote URL."""
if self.progress_bar:
from tqdm import tqdm
tile_indices = tqdm(self.tile_indices, desc='Fetching tiles')
else:
tile_indices = self.tile_indices

for healpix_pixel_index in tile_indices:
@property
def tiles(self) -> List[HipsTile]:
"""List of `~hips.HipsTile` (cached on multiple access)."""
tile_metas = []
for healpix_pixel_index in self.tile_indices:
tile_meta = HipsTileMeta(
order=self.draw_hips_order,
ipix=healpix_pixel_index,
frame=self.hips_survey.astropy_frame,
file_format=self.tile_format,
)
url = self.hips_survey.tile_url(tile_meta)
tile = HipsTile.fetch(tile_meta, url)
yield tile
tile_metas.append(tile_meta)

@property
def tiles(self) -> List[HipsTile]:
"""List of `~hips.HipsTile` (cached on multiple access)."""
if self._tiles is None:
self._tiles = list(self._fetch_tiles())
self._tiles = fetch_tiles(tile_metas=tile_metas, hips_survey=self.hips_survey,
progress_bar=self.progress_bar, **(self.fetch_opts or {}))

return self._tiles

Expand Down Expand Up @@ -163,7 +159,6 @@ def run(self) -> np.ndarray:
self._stats['consumed_memory'] += len(tile.raw_data)



def make_tile_list(self):
parent_tiles = self.tiles

Expand Down
6 changes: 4 additions & 2 deletions hips/draw/tests/test_paint.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def setup_class(cls):
width=2000, height=1000, fov="3 deg",
coordsys='icrs', projection='AIT',
)
cls.painter = HipsPainter(cls.geometry, cls.hips_survey, 'fits')
fetch_opts = dict(fetch_package='urllib', timeout=30, n_parallel=10)
cls.painter = HipsPainter(cls.geometry, cls.hips_survey, 'fits', fetch_opts=fetch_opts)

def test_draw_hips_order(self):
assert self.painter.draw_hips_order == 7
Expand All @@ -43,7 +44,8 @@ def test_compute_matching_hips_order(self, pars):
coordsys='icrs', projection='AIT',
)

simple_tile_painter = HipsPainter(geometry, self.hips_survey, 'fits')
fetch_opts = dict(fetch_package='urllib', timeout=30, n_parallel=10)
simple_tile_painter = HipsPainter(geometry, self.hips_survey, 'fits', fetch_opts=fetch_opts)
assert simple_tile_painter.draw_hips_order == pars['order']

def test_run(self):
Expand Down
4 changes: 3 additions & 1 deletion hips/draw/tests/test_ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def test_make_sky_image(tmpdir, pars):
hips_survey = HipsSurveyProperties.fetch(url=pars['url'])
geometry = make_test_wcs_geometry()

result = make_sky_image(geometry=geometry, hips_survey=hips_survey, tile_format=pars['file_format'], precise=pars['precise'])
fetch_opts = dict(fetch_package='urllib', timeout=30, n_parallel=10)
result = make_sky_image(geometry=geometry, hips_survey=hips_survey, tile_format=pars['file_format'],
precise=pars['precise'], fetch_opts=fetch_opts)

assert result.image.shape == pars['shape']
assert result.image.dtype == pars['dtype']
Expand Down
7 changes: 5 additions & 2 deletions hips/draw/ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


def make_sky_image(geometry: Union[dict, WCSGeometry], hips_survey: Union[str, 'HipsSurveyProperties'],
tile_format: str, precise: bool = False, progress_bar: bool = True) -> 'HipsDrawResult':
tile_format: str, precise: bool = False, progress_bar: bool = True, fetch_opts: dict = None) -> 'HipsDrawResult':
"""Make sky image: fetch tiles and draw.

The example for this can be found on the :ref:`gs` page.
Expand All @@ -33,13 +33,16 @@ def make_sky_image(geometry: Union[dict, WCSGeometry], hips_survey: Union[str, '
Use the precise drawing algorithm
progress_bar : bool
Show a progress bar for tile fetching and drawing
fetch_opts : dict
Keyword arguments for fetching HiPS tiles. To see the
list of passable arguments, refer to `~hips.fetch_tiles`

Returns
-------
result : `~hips.HipsDrawResult`
Result object
"""
painter = HipsPainter(geometry, hips_survey, tile_format, precise, progress_bar)
painter = HipsPainter(geometry, hips_survey, tile_format, precise, progress_bar, fetch_opts)
painter.run()
return HipsDrawResult.from_painter(painter)

Expand Down
1 change: 1 addition & 0 deletions hips/tiles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from .tile import *
from .survey import *
from .allsky import *
from .fetch import *
157 changes: 157 additions & 0 deletions hips/tiles/fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import asyncio
import urllib.request
import concurrent.futures
from typing import List
from ..tiles import HipsSurveyProperties, HipsTile, HipsTileMeta

__all__ = [
'fetch_tiles',
]

__doctest_skip__ = [
'fetch_tiles',
]


def fetch_tiles(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                progress_bar: bool = True, n_parallel: int = 5,
                timeout: float = 10, fetch_package: str = 'urllib') -> List[HipsTile]:
    """Fetch a list of HiPS tiles.

    This function fetches a list of HiPS tiles based
    on their URLs, which are generated using ``hips_survey``
    and ``tile_metas``.

    The tiles are fetched in parallel using ``urllib`` or ``aiohttp``
    and returned in the same order as ``tile_metas``.

    Parameters
    ----------
    tile_metas : list
        Python list of `~hips.HipsTileMeta`
    hips_survey : `~hips.HipsSurveyProperties`
        HiPS survey properties
    progress_bar : bool
        Show a progress bar for tile fetching and drawing
    n_parallel : int
        Number of tile fetch web requests to make in parallel
    timeout : float
        Seconds to timeout for fetching a HiPS tile
    fetch_package : {'urllib', 'aiohttp'}
        Package to use for fetching HiPS tiles

    Returns
    -------
    tiles : list
        A Python list of `~hips.HipsTile`

    Raises
    ------
    ValueError
        If ``fetch_package`` is neither ``'urllib'`` nor ``'aiohttp'``.

    Examples
    --------
    Define a list of tiles we want::

        from hips import HipsSurveyProperties, HipsTileMeta
        from hips import fetch_tiles
        url = 'http://alasky.unistra.fr/DSS/DSS2Merged/properties'
        hips_survey = HipsSurveyProperties.fetch(url)
        tile_indices = [69623, 69627, 69628, 69629, 69630, 69631]
        tile_metas = []
        for healpix_pixel_index in tile_indices:
            tile_meta = HipsTileMeta(
                order=7,
                ipix=healpix_pixel_index,
                frame=hips_survey.astropy_frame,
                file_format='fits',
            )
            tile_metas.append(tile_meta)

    Fetch all tiles (in parallel)::

        tiles = fetch_tiles(tile_metas, hips_survey)
    """
    if fetch_package == 'aiohttp':
        fetch_fct = tiles_aiohttp
    elif fetch_package == 'urllib':
        fetch_fct = tiles_urllib
    else:
        raise ValueError(f'Invalid package name: {fetch_package}')

    tiles = fetch_fct(tile_metas, hips_survey, progress_bar, n_parallel, timeout)

    # Both backends hand tiles back in completion order, so re-order them to
    # match ``tile_metas``. Tiles are matched by ``tile.meta == tile_meta``.
    # TODO: this doesn't seem like a great solution.
    # Use OrderedDict instead?
    out = []
    for tile_meta in tile_metas:
        for tile in tiles:
            if tile.meta == tile_meta:
                out.append(tile)
                # Stop at the first match: exactly one tile per requested
                # meta, even when ``tile_metas`` contains duplicates.
                break
    return out


def fetch_tile_urllib(url: str, meta: HipsTileMeta, timeout: float) -> HipsTile:
    """Download one HiPS tile with ``urllib`` and wrap it in a `HipsTile`.

    The call itself blocks until the response body has been read (or the
    ``timeout``, in seconds, passed to ``urlopen`` expires).
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        payload = response.read()
    return HipsTile(meta, payload)


def tiles_urllib(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                 progress_bar: bool, n_parallel, timeout: float) -> List[HipsTile]:
    """Fetch HiPS tiles with ``urllib`` on a pool of worker threads.

    One `fetch_tile_urllib` job is submitted per entry of ``tile_metas`` to a
    thread pool with ``n_parallel`` workers. Results are gathered as the jobs
    complete, so the returned list is in completion order rather than in the
    order of ``tile_metas``.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_parallel) as pool:
        pending = [
            pool.submit(fetch_tile_urllib, hips_survey.tile_url(meta), meta, timeout)
            for meta in tile_metas
        ]

        completed = concurrent.futures.as_completed(pending)
        if progress_bar:
            # Imported lazily so tqdm is only needed when a progress bar is requested.
            from tqdm import tqdm
            completed = tqdm(completed, total=len(tile_metas), desc='Fetching tiles')

        tiles = [job.result() for job in completed]

    return tiles


async def fetch_tile_aiohttp(url: str, meta: HipsTileMeta, session, timeout: float) -> HipsTile:
    """Download one HiPS tile through ``session`` and wrap it in a `HipsTile`.

    ``session`` must provide an async-context-manager ``get(url, timeout=...)``
    whose response has an awaitable ``read()``.
    """
    async with session.get(url, timeout=timeout) as reply:
        payload = await reply.read()
    return HipsTile(meta, payload)


async def fetch_all_tiles_aiohttp(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                                  progress_bar: bool, n_parallel: int, timeout: float) -> List[HipsTile]:
    """Fetch HiPS tiles concurrently with ``aiohttp`` and return them as a list.

    A single client session is opened with a TCP connector limited to
    ``n_parallel`` connections. One `fetch_tile_aiohttp` task is scheduled per
    entry of ``tile_metas``; tiles are gathered as the tasks complete, so the
    returned list is in completion order rather than in the order of
    ``tile_metas``.
    """
    import aiohttp

    limited_connector = aiohttp.TCPConnector(limit=n_parallel)
    async with aiohttp.ClientSession(connector=limited_connector) as session:
        tasks = [
            asyncio.ensure_future(
                fetch_tile_aiohttp(hips_survey.tile_url(meta), meta, session, timeout)
            )
            for meta in tile_metas
        ]

        finished = asyncio.as_completed(tasks)
        if progress_bar:
            # Imported lazily so tqdm is only needed when a progress bar is requested.
            from tqdm import tqdm
            finished = tqdm(finished, total=len(tile_metas), desc='Fetching tiles')

        tiles = []
        for next_done in finished:
            tile = await next_done
            tiles.append(tile)

    return tiles


def tiles_aiohttp(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                  progress_bar: bool, n_parallel: int, timeout: float) -> List[HipsTile]:
    """Synchronous entry point for the ``aiohttp`` tile fetcher.

    Runs `fetch_all_tiles_aiohttp` to completion on the event loop returned
    by ``asyncio.get_event_loop()`` and returns its result.
    """
    loop = asyncio.get_event_loop()
    fetch_job = fetch_all_tiles_aiohttp(tile_metas, hips_survey, progress_bar, n_parallel, timeout)
    return loop.run_until_complete(fetch_job)
55 changes: 55 additions & 0 deletions hips/tiles/tests/test_fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import pytest
from astropy.tests.helper import remote_data
from numpy.testing import assert_allclose
from ..fetch import fetch_tiles
from ..survey import HipsSurveyProperties
from ..tile import HipsTileMeta

# One test case per supported fetch backend; everything except
# ``fetch_package`` is identical between the cases.
TILE_FETCH_TEST_CASES = [
    dict(
        tile_indices=[69623, 69627, 69628, 69629, 69630, 69631],
        tile_format='fits',
        order=7,
        url='http://alasky.unistra.fr/DSS/DSS2Merged/properties',
        progress_bar=True,
        data=[2101, 1945, 1828, 1871, 2079, 2336],
        fetch_package=backend,
    )
    for backend in ('urllib', 'aiohttp')
]


def make_tile_metas(hips_survey, pars):
    """Yield one `HipsTileMeta` per HEALPix index in ``pars['tile_indices']``.

    Order and file format come from ``pars``; the frame comes from
    ``hips_survey.astropy_frame``.
    """
    yield from (
        HipsTileMeta(
            order=pars['order'],
            ipix=ipix,
            frame=hips_survey.astropy_frame,
            file_format=pars['tile_format'],
        )
        for ipix in pars['tile_indices']
    )


@pytest.mark.parametrize('pars', TILE_FETCH_TEST_CASES)
@remote_data
def test_fetch_tiles(pars):
    """Fetch the tiles of one test case and spot-check one value per tile."""
    survey = HipsSurveyProperties.fetch(pars['url'])
    metas = list(make_tile_metas(survey, pars))

    fetched = fetch_tiles(
        metas, survey,
        progress_bar=pars['progress_bar'],
        fetch_package=pars['fetch_package'],
    )

    # Compare element [0][5] of each tile's data with the reference value at
    # the same position in ``pars['data']``.
    for position, expected in enumerate(pars['data']):
        assert_allclose(fetched[position].data[0][5], expected)
Loading