Skip to content

Commit

Permalink
Update aiohttp dependency config
Browse files Browse the repository at this point in the history
  • Loading branch information
adl1995 committed Aug 22, 2017
1 parent e511852 commit 1116a7a
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 126 deletions.
1 change: 1 addition & 0 deletions .rtd-environment.yml
Expand Up @@ -12,6 +12,7 @@ dependencies:
- reproject
- matplotlib
- tqdm
- aiohttp
# There's a problem with Sphinx 1.6 with astropy-helpers
# For now, we pin the Sphinx version to something that works
- sphinx==1.5.6
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -27,7 +27,7 @@ env:
- MAIN_CMD='python setup.py'
- SETUP_CMD='test'
- EVENT_TYPE='pull_request push'
- CONDA_DEPENDENCIES='healpy scikit-image Pillow reproject matplotlib tqdm'
- CONDA_DEPENDENCIES='healpy scikit-image Pillow reproject matplotlib tqdm aiohttp'
- PIP_DEPENDENCIES=''
- CONDA_CHANNELS='conda-forge astropy-ci-extras astropy'
- SETUP_XVFB=True
Expand Down
1 change: 1 addition & 0 deletions docs/installation.rst
Expand Up @@ -80,5 +80,6 @@ In addition, the following packages are needed for optional functionality:

* `Matplotlib`_ 2.0 or later. Used for plotting in examples.
* `tqdm`_. Used for showing progress bar either on terminal or in Jupyter notebook.
* `aiohttp`_. Used for fetching HiPS tiles.

We have some info at :ref:`py3` on why we don't support legacy Python (Python 2).
3 changes: 2 additions & 1 deletion docs/references.txt
Expand Up @@ -6,4 +6,5 @@
.. _HiPS paper: https://www.aanda.org/articles/aa/pdf/2015/06/aa26075-15.pdf
.. _HiPS IVOA recommendation: http://www.ivoa.net/documents/HiPS/
.. _HiPS at CDS: http://aladin.u-strasbg.fr/hips/
.. _tqdm: https://pypi.python.org/pypi/tqdm
.. _tqdm: https://pypi.python.org/pypi/tqdm
.. _aiohttp: http://aiohttp.readthedocs.io/en/stable/
16 changes: 7 additions & 9 deletions hips/draw/paint.py
Expand Up @@ -4,7 +4,7 @@
from typing import List, Tuple, Union, Dict, Any
from astropy.wcs.utils import proj_plane_pixel_scales
from skimage.transform import ProjectiveTransform, warp
from ..tiles import HipsSurveyProperties, HipsTile, HipsTileMeta, HipsTileFetcher
from ..tiles import HipsSurveyProperties, HipsTile, HipsTileMeta, fetch_tiles
from ..tiles.tile import compute_image_shape
from ..utils import WCSGeometry, healpix_pixels_in_sky_image, hips_order_for_pixel_resolution

Expand Down Expand Up @@ -36,8 +36,8 @@ class HipsPainter:
Use the precise drawing algorithm
progress_bar : bool
Show a progress bar for tile fetching and drawing
fetch_package : {'urllib', 'aiohttp'}
Package to use for fetching HiPS tiles
fetch_opts : dict
Keyword arguments for fetching HiPS tiles
Examples
--------
Expand All @@ -61,13 +61,13 @@ class HipsPainter:
"""

def __init__(self, geometry: Union[dict, WCSGeometry], hips_survey: Union[str, HipsSurveyProperties],
tile_format: str, precise: bool = False, progress_bar: bool = True, fetch_package: str = 'urllib') -> None:
tile_format: str, precise: bool = False, progress_bar: bool = True, fetch_opts : dict = None) -> None:
self.geometry = WCSGeometry.make(geometry)
self.hips_survey = HipsSurveyProperties.make(hips_survey)
self.tile_format = tile_format
self.precise = precise
self.progress_bar = progress_bar
self.fetch_package = fetch_package
self.fetch_opts = fetch_opts
self._tiles = None
self.float_image = None
self._stats: Dict[str, Any] = {}
Expand Down Expand Up @@ -125,11 +125,9 @@ def tiles(self) -> List[HipsTile]:
)
tile_metas.append(tile_meta)

tile_fetcher = HipsTileFetcher(tile_metas=tile_metas, hips_survey=self.hips_survey,
progress_bar=self.progress_bar, fetch_package=self.fetch_package)

if self._tiles is None:
self._tiles = tile_fetcher.tiles
self._tiles = fetch_tiles(tile_metas=tile_metas, hips_survey=self.hips_survey,
progress_bar=self.progress_bar, **self.fetch_opts)

return self._tiles

Expand Down
6 changes: 4 additions & 2 deletions hips/draw/tests/test_paint.py
Expand Up @@ -20,7 +20,8 @@ def setup_class(cls):
width=2000, height=1000, fov="3 deg",
coordsys='icrs', projection='AIT',
)
cls.painter = HipsPainter(cls.geometry, cls.hips_survey, 'fits', fetch_package='aiohttp')
fetch_opts = dict(fetch_package='urllib', timeout=30, n_parallel=10)
cls.painter = HipsPainter(cls.geometry, cls.hips_survey, 'fits', fetch_opts=fetch_opts)

def test_draw_hips_order(self):
assert self.painter.draw_hips_order == 7
Expand All @@ -43,7 +44,8 @@ def test_compute_matching_hips_order(self, pars):
coordsys='icrs', projection='AIT',
)

simple_tile_painter = HipsPainter(geometry, self.hips_survey, 'fits', fetch_package='aiohttp')
fetch_opts = dict(fetch_package='urllib', timeout=30, n_parallel=10)
simple_tile_painter = HipsPainter(geometry, self.hips_survey, 'fits', fetch_opts=fetch_opts)
assert simple_tile_painter.draw_hips_order == pars['order']

def test_run(self):
Expand Down
4 changes: 3 additions & 1 deletion hips/draw/tests/test_ui.py
Expand Up @@ -61,7 +61,9 @@ def test_make_sky_image(tmpdir, pars):
hips_survey = HipsSurveyProperties.fetch(url=pars['url'])
geometry = make_test_wcs_geometry()

result = make_sky_image(geometry=geometry, hips_survey=hips_survey, tile_format=pars['file_format'], precise=pars['precise'])
fetch_opts = dict(fetch_package='urllib', timeout=30, n_parallel=10)
result = make_sky_image(geometry=geometry, hips_survey=hips_survey, tile_format=pars['file_format'],
precise=pars['precise'], fetch_opts=fetch_opts)

assert result.image.shape == pars['shape']
assert result.image.dtype == pars['dtype']
Expand Down
8 changes: 4 additions & 4 deletions hips/draw/ui.py
Expand Up @@ -15,7 +15,7 @@


def make_sky_image(geometry: Union[dict, WCSGeometry], hips_survey: Union[str, 'HipsSurveyProperties'],
tile_format: str, precise: bool = False, progress_bar: bool = True, fetch_package: str = 'urllib') -> 'HipsDrawResult':
tile_format: str, precise: bool = False, progress_bar: bool = True, fetch_opts: dict = None) -> 'HipsDrawResult':
"""Make sky image: fetch tiles and draw.
The example for this can be found on the :ref:`gs` page.
Expand All @@ -33,15 +33,15 @@ def make_sky_image(geometry: Union[dict, WCSGeometry], hips_survey: Union[str, '
Use the precise drawing algorithm
progress_bar : bool
Show a progress bar for tile fetching and drawing
fetch_package : {'urllib', 'aiohttp'}
Package to use for fetching HiPS tiles
fetch_opts : dict
Keyword arguments for fetching HiPS tiles
Returns
-------
result : `~hips.HipsDrawResult`
Result object
"""
painter = HipsPainter(geometry, hips_survey, tile_format, precise, progress_bar, fetch_package)
painter = HipsPainter(geometry, hips_survey, tile_format, precise, progress_bar, fetch_opts)
painter.run()
return HipsDrawResult.from_painter(painter)

Expand Down
159 changes: 75 additions & 84 deletions hips/tiles/fetch.py
@@ -1,18 +1,23 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import asyncio
import urllib.request
import concurrent.futures
from typing import Generator, List
from ..tiles import HipsSurveyProperties, HipsTile, HipsTileMeta

__all__ = [
'HipsTileFetcher',
'fetch_tiles',
]


class HipsTileFetcher:
def fetch_tiles(tile_metas: List[HipsTileMeta], hips_survey : HipsSurveyProperties,
progress_bar: bool = False, n_parallel: int = 10, timeout: int = 10, fetch_package : str = 'urllib') -> List[HipsTile]:
"""Fetch a list of HiPS tiles.
This function fetches a list of HiPS tiles based
on their URLs, which are generated using `hips_survey`
and `tile_metas`. The tiles are then fetched asynchronously
using urllib or aiohttp.
Parameters
----------
tile_metas : List[HipsTileMeta]
Expand All @@ -28,86 +33,72 @@ class HipsTileFetcher:
fetch_package : {'urllib', 'aiohttp'}
Package to use for fetching HiPS tiles
"""

def __init__(self, tile_metas: List[HipsTileMeta], hips_survey : HipsSurveyProperties,
progress_bar: bool = False, n_parallel: int = 10, timeout: int = 10, fetch_package : str = 'urllib') -> None:
self.tile_metas = tile_metas
self.hips_survey = hips_survey
self.progress_bar = progress_bar
self.n_parallel = n_parallel
self.timeout = timeout
self.fetch_package = fetch_package

@property
def tile_urls(self) -> List[str]:
"""List of tile URLs"""
tile_urls = []
for meta in self.tile_metas:
tile_urls.append(self.hips_survey.tile_url(meta))

return tile_urls

@property
def tiles(self):
if self.fetch_package == 'aiohttp':
return self.tiles_aiohttp
elif self.fetch_package == 'urllib':
return self.tiles_urllib
if fetch_package == 'aiohttp':
return tiles_aiohttp(tile_metas, hips_survey, progress_bar)
elif fetch_package == 'urllib':
return tiles_urllib(tile_metas, hips_survey, progress_bar, n_parallel, timeout)
else:
raise ValueError(f'Invalid package name: {fetch_package}')

def tile_urls(tile_metas: List[HipsTileMeta], hips_survey : HipsSurveyProperties) -> List[str]:
"""List of tile URLs"""
return [hips_survey.tile_url(meta) for meta in tile_metas]

def fetch_tile_urllib(url: str, meta: HipsTileMeta, timeout: int) -> Generator:
"""Fetch a HiPS tile asynchronously."""
with urllib.request.urlopen(url, timeout=timeout) as conn:
raw_data = conn.read()
return HipsTile(meta, raw_data)

def tiles_urllib(tile_metas: List[HipsTileMeta], hips_survey : HipsSurveyProperties,
progress_bar: bool = False, n_parallel: int = 10, timeout: int = 10) -> List[HipsTile]:
"""Generator function to fetch HiPS tiles from a remote URL."""
with concurrent.futures.ThreadPoolExecutor(max_workers=n_parallel) as executor:
future_to_url = {executor.submit(
fetch_tile_urllib,
url,
tile_metas[idx],
timeout)
: url for idx, url in enumerate(tile_urls(tile_metas, hips_survey))}

if progress_bar:
from tqdm import tqdm
requests = tqdm(future_to_url, total=len(future_to_url), desc='Fetching tiles')
else:
raise ValueError(f'Invalid package name: {self.fetch_package}')

def fetch_tile_urllib(self, url: str, meta : HipsTileMeta) -> Generator:
"""Fetch a HiPS tile asynchronously."""
with urllib.request.urlopen(url, timeout=self.timeout) as conn:
raw_data = conn.read()
return HipsTile(meta, raw_data)

@property
def tiles_urllib(self) -> np.ndarray:
"""Generator function to fetch HiPS tiles from a remote URL."""
with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_parallel) as executor:
future_to_url = {executor.submit(self.fetch_tile_urllib, url, self.tile_metas[idx]) : url for idx, url in enumerate(self.tile_urls)}

if self.progress_bar:
from tqdm import tqdm
requests = tqdm(concurrent.futures.as_completed(future_to_url), total=len(future_to_url), desc='Fetching tiles')
else:
requests = future_to_url#concurrent.futures.as_completed(future_to_url)
requests = future_to_url

tiles = []
for request in requests:
tiles.append(request.result())

return tiles

async def fetch_tile_aiohttp(url: str, meta : HipsTileMeta, session) -> Generator:
"""Fetch a HiPS tile asynchronously using aiohttp."""
async with session.get(url) as response:
raw_data = await response.read()
return HipsTile(meta, raw_data)

async def fetch_all_tiles_aiohttp(tile_metas: List[HipsTileMeta], hips_survey : HipsSurveyProperties, progress_bar: bool) -> List[HipsTile]:
"""Generator function to fetch HiPS tiles from a remote URL using aiohttp."""
import aiohttp

tasks = []
async with aiohttp.ClientSession() as session:
for idx, url in enumerate(tile_urls(tile_metas, hips_survey)):
task = asyncio.ensure_future(fetch_tile_aiohttp(url.format(idx), tile_metas[idx], session))
tasks.append(task)

if progress_bar:
from tqdm import tqdm
tiles = []
for future in requests:
tiles.append(future.result())

return tiles

async def fetch_tile_aiohttp(self, url: str, meta : HipsTileMeta, session) -> Generator:
"""Fetch a HiPS tile asynchronously using aiohttp."""
async with session.get(url) as response:
raw_data = await response.read()
return HipsTile(meta, raw_data)

@property
async def fetch_all_tiles_aiohttp(self) -> np.ndarray:
"""Generator function to fetch HiPS tiles from a remote URL using aiohttp."""
import aiohttp, asyncio

tasks = []
async with aiohttp.ClientSession() as session:
for idx, url in enumerate(self.tile_urls):
task = asyncio.ensure_future(self.fetch_tile_aiohttp(url.format(idx), self.tile_metas[idx], session))
tasks.append(task)

if self.progress_bar:
from tqdm import tqdm
tiles = []
for f in tqdm(tasks, total=len(tasks), desc='Fetching tiles'):
tiles.append(await f)
else:
tiles = await asyncio.gather(*tasks)

return tiles

@property
def tiles_aiohttp(self) -> np.ndarray:
import asyncio
return asyncio.get_event_loop().run_until_complete(self.fetch_all_tiles_aiohttp)
for f in tqdm(tasks, total=len(tasks), desc='Fetching tiles'):
tiles.append(await f)
else:
tiles = await asyncio.gather(*tasks)

return tiles

def tiles_aiohttp(tile_metas: List[HipsTileMeta], hips_survey : HipsSurveyProperties,
progress_bar: bool) -> List[HipsTile]:
return asyncio.get_event_loop().run_until_complete(fetch_all_tiles_aiohttp(tile_metas, hips_survey, progress_bar))
61 changes: 37 additions & 24 deletions hips/tiles/tests/test_fetch.py
@@ -1,34 +1,47 @@
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import pytest
from astropy.tests.helper import remote_data
from numpy.testing import assert_allclose
from ..fetch import HipsTileFetcher
from ..fetch import fetch_tiles
from ..survey import HipsSurveyProperties
from ..tile import HipsTileMeta

class TestHipsTileFetcher:
@classmethod
def setup_class(cls):
url = 'http://alasky.unistra.fr/DSS/DSS2Merged/properties'
hips_survey = HipsSurveyProperties.fetch(url)
TILE_FETCH_TEST_CASES = [
dict(
tile_indices=[69623, 69627, 69628, 69629, 69630, 69631],
tile_format='fits',
order=7,
url='http://alasky.unistra.fr/DSS/DSS2Merged/properties',
progress_bar=True,
data=[2101, 1680, 1532, 1625, 2131],
fetch_package='urllib'
),
dict(
tile_indices=[69623, 69627, 69628, 69629, 69630, 69631],
tile_format='fits',
order=7,
url='http://alasky.unistra.fr/DSS/DSS2Merged/properties',
progress_bar=True,
data=[2101, 1680, 1532, 1625, 2131],
fetch_package='aiohttp'
),
]

tile_metas, tile_indices = [], [69623, 69627, 69628, 69629, 69630, 69631]
for healpix_pixel_index in tile_indices:
tile_meta = HipsTileMeta(
order=7,
ipix=healpix_pixel_index,
frame=hips_survey.astropy_frame,
file_format='fits',
)
tile_metas.append(tile_meta)

cls.fetcher = HipsTileFetcher(tile_metas, hips_survey, progress_bar=False)
@pytest.mark.parametrize('pars', TILE_FETCH_TEST_CASES)
@remote_data
def test_fetch_tiles(pars):
hips_survey = HipsSurveyProperties.fetch(pars['url'])

@remote_data
def test_tiles(self):
tiles = self.fetcher.tiles
assert_allclose(tiles[0].data[0][5:10], [2101, 1680, 1532, 1625, 2131])
tile_metas = []
for healpix_pixel_index in pars['tile_indices']:
tile_meta = HipsTileMeta(
order=pars['order'],
ipix=healpix_pixel_index,
frame=hips_survey.astropy_frame,
file_format=pars['tile_format'],
)
tile_metas.append(tile_meta)

@remote_data
def test_tiles_aiohttp(self):
tiles = self.fetcher.tiles_aiohttp
assert_allclose(tiles[0].data[0][5:10], [2101, 1680, 1532, 1625, 2131])
tiles = fetch_tiles(tile_metas, hips_survey, progress_bar=pars['progress_bar'], fetch_package=pars['fetch_package'])
assert_allclose(tiles[0].data[0][5:10], [2101, 1680, 1532, 1625, 2131])

0 comments on commit 1116a7a

Please sign in to comment.