Skip to content

Commit

Permalink
Merge pull request #79 from alaniwi/combined_changes
Browse files Browse the repository at this point in the history
fix #74, #75 and update tests (see description)
  • Loading branch information
agstephens committed Nov 3, 2021
2 parents da6800c + 3cd87f8 commit f60330a
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 52 deletions.
6 changes: 4 additions & 2 deletions pyesgf/search/connection.py
Expand Up @@ -49,8 +49,10 @@ class SearchConnection(object):
of the ESGF search service excluding the final endpoint name.
Usually this is http://<hostname>/esg-search
:ivar distrib: Boolean stating whether searches through this connection are
distributed. I.e. whether the Search service distributes the query to
other search peers.
distributed. i.e. whether the Search service distributes the query to
other search peers. See also the documentation for the ``facets``
argument to ``pyesgf.search.context.SearchContext`` in relation to
distributed searches.
:ivar cache: Path to `sqlite` cache file. Cache expires every hour.
:ivar timeout: Time (in seconds) before query returns an error.
Default: 120s.
Expand Down
63 changes: 52 additions & 11 deletions pyesgf/search/context.py
Expand Up @@ -8,6 +8,8 @@
"""

import os
import sys
import copy

from webob.multidict import MultiDict
Expand All @@ -20,8 +22,7 @@


class SearchContext(object):
"""
Instances of this class represent the state of a current search.
"""Instances of this class represent the state of a current search.
It exposes what facets are available to select and the facet counts
if they are available.
Expand All @@ -35,6 +36,15 @@ class SearchContext(object):
:ivar constraints: A dictionary of facet constraints currently in effect.
``constraint[facet_name] = [value, value, ...]``
:ivar facets: A string containing a comma-separated list of facets to be
returned (for example ``'source_id,ensemble_id'``). If set, this will
be used to select which facet counts to include, as returned in the
``facet_counts`` dictionary. Defaults to including all available
facets, but with distributed searches (where the SearchConnection
instance was created with ``distrib=True``), some results may be
missing for server-side reasons when requesting all facets; in that
case a warning message containing further details will be issued.
:property facet_counts: A dictionary of available hits with each
facet value for the search as currently constrained.
This property returns a dictionary of dictionaries where
Expand Down Expand Up @@ -76,15 +86,15 @@ def __init__(self, connection, constraints, search_type=None,
self.connection = connection
self.__facet_counts = None
self.__hit_count = None

self._did_facets_star_warning = False
if search_type is None:
search_type = self.DEFAULT_SEARCH_TYPE

# Constraints
self.freetext_constraint = None
self.facet_constraints = MultiDict()
self.temporal_constraint = [from_timestamp, to_timestamp]
self.geosplatial_constraint = None
self.geospatial_constraint = None

self._update_constraints(constraints)

Expand Down Expand Up @@ -113,7 +123,9 @@ def search(self, batch_size=DEFAULT_BATCH_SIZE, ignore_facet_check=False,
Perform the search with current constraints returning a set of results.
:batch_size: The number of results to get per HTTP request.
:param constraints: Further constraints for this query. Equivilent
:ignore_facet_check: Do not make an extra HTTP request to populate
:py:attr:`~facet_counts` and :py:attr:`~hit_count`.
:param constraints: Further constraints for this query. Equivalent
to calling ``self.constrain(**constraints).search()``
:return: A ResultSet for this query
Expand All @@ -123,7 +135,8 @@ def search(self, batch_size=DEFAULT_BATCH_SIZE, ignore_facet_check=False,
else:
sc = self

sc.__update_counts(ignore_facet_check=ignore_facet_check)
if not ignore_facet_check:
sc.__update_counts()

return ResultSet(sc, batch_size=batch_size)

Expand All @@ -140,7 +153,7 @@ def get_download_script(self, **constraints):
"""
Download a script for downloading all files in the set of results.
:param constraints: Further constraints for this query. Equivilent
:param constraints: Further constraints for this query. Equivalent
to calling ``self.constrain(**constraints).get_download_script()``
:return: A string containing the script
"""
Expand Down Expand Up @@ -188,7 +201,7 @@ def get_facet_options(self):

return facet_options

def __update_counts(self, ignore_facet_check=False):
def __update_counts(self):
# If hit_count is set the counts are already retrieved
if self.__hit_count is not None:
return
Expand All @@ -197,11 +210,12 @@ def __update_counts(self, ignore_facet_check=False):
self.__hit_count = None
query_dict = self._build_query()

if not ignore_facet_check:
query_dict['facets'] = '*'

if self.facets:
query_dict['facets'] = self.facets
else:
query_dict['facets'] = '*'
if self.connection.distrib:
self._do_facets_star_warning()

response = self.connection.send_search(query_dict, limit=0)
for facet, counts in (list(response['facet_counts']['facet_fields'].items())):
Expand All @@ -211,6 +225,33 @@ def __update_counts(self, ignore_facet_check=False):

self.__hit_count = response['response']['numFound']

def _do_facets_star_warning(self):
    """Warn (at most once per context) about searching with ``facets=*``.

    The warning is skipped entirely when the environment variable
    ``ESGF_PYCLIENT_NO_FACETS_STAR_WARNING`` is set; otherwise it is
    written to stderr the first time this method is called and the
    ``_did_facets_star_warning`` flag suppresses any repeat.
    """
    env_var_name = 'ESGF_PYCLIENT_NO_FACETS_STAR_WARNING'
    # Opt-out via environment variable takes precedence over everything.
    if env_var_name in os.environ:
        return
    # Already warned for this context object — nothing to do.
    if self._did_facets_star_warning:
        return
    sys.stderr.write(f'''
-------------------------------------------------------------------------------
Warning - defaulting to search with facets=*
This behavior is kept for backward-compatibility, but ESGF indexes might not
successfully perform a distributed search when this option is used, so some
results may be missing. For full results, it is recommended to pass a list of
facets of interest when instantiating a context object. For example,
ctx = conn.new_context(facets='project,experiment_id')
Only the facets that you specify will be present in the facets_counts dictionary.
This warning is displayed when a distributed search is performed while using the
facets=* default, a maximum of once per context object. To suppress this warning,
set the environment variable {env_var_name} to any value
or explicitly use conn.new_context(facets='*')
-------------------------------------------------------------------------------
''')
    self._did_facets_star_warning = True

# -------------------------------------------------------------------------
# Constraint mutation interface
# These functions update the instance in-place.
Expand Down
30 changes: 20 additions & 10 deletions pyesgf/search/results.py
Expand Up @@ -36,16 +36,15 @@ def __init__(self, context, batch_size=DEFAULT_BATCH_SIZE, eager=True):
"""
self.context = context
self.__batch_size = batch_size
self.__batch_cache = [None] * ((len(self) // batch_size) + 1)
if eager and len(self) > 0:
self.__batch_cache[0] = self.__get_batch(0)
self.__batch_cache = {}
self.__len_cache = None
if eager:
self.__get_batch(0)

def __getitem__(self, index):
batch_i = index // self.batch_size
offset = index % self.batch_size
if self.__batch_cache[batch_i] is None:
self.__batch_cache[batch_i] = self.__get_batch(batch_i)
batch = self.__batch_cache[batch_i]
batch = self.__get_batch(batch_i)

search_type = self.context.search_type
ResultClass = _result_classes[search_type]
Expand All @@ -54,7 +53,9 @@ def __getitem__(self, index):
return ResultClass(batch[offset], self.context)

def __len__(self):
    """Return the total number of hits for this result set.

    The count is cached; on first use it is populated by fetching
    batch 0 (``__get_batch`` stores the server-reported ``numFound``
    into ``__len_cache``).
    """
    if self.__len_cache is not None:
        return self.__len_cache
    # First access: fetching any batch fills in the length cache.
    self.__get_batch(0)
    return self.__len_cache

@property
def batch_size(self):
Expand All @@ -71,6 +72,9 @@ def _build_result(self, result):
return result

def __get_batch(self, batch_i):
if batch_i in self.__batch_cache:
return self.__batch_cache[batch_i]

offset = self.batch_size * batch_i
limit = self.batch_size

Expand All @@ -79,8 +83,14 @@ def __get_batch(self, batch_i):
.send_search(query_dict, limit=limit, offset=offset,
shards=self.context.shards))

if self.__len_cache is None:
self.__len_cache = response['response']['numFound']

# !TODO: strip out results
return response['response']['docs']
batch = response['response']['docs']

self.__batch_cache[batch_i] = batch
return batch


class BaseResult(object):
Expand All @@ -89,7 +99,7 @@ class BaseResult(object):
Subclasses represent different search types such as File and Dataset.
:ivar json: The oroginial json representation of the result.
:ivar json: The original json representation of the result.
:ivar context: The SearchContext which generated this result.
:property urls: a dictionary of the form
``{service: [(url, mime_type), ...], ...}``
Expand Down Expand Up @@ -259,7 +269,7 @@ class FileResult(BaseResult):
:property checksum: The checksum of the file
:property checksum_type: The algorithm used for generating the checksum
:property filename: The filename
:proprty size: The file size in bytes
:property size: The file size in bytes
"""
@property
Expand Down
6 changes: 3 additions & 3 deletions tests/test_connection.py
Expand Up @@ -41,7 +41,7 @@ def test_get_shard_list(self):
# replication configuration
# on the test server
assert 'esgf-index1.ceda.ac.uk' in shards
# in esg-search in esgf-index1.ceda.ac.uk, there are a bunch
# in esg-search in esgf-index1.ceda.ac.uk, there are a bunch
# of replicas hosted on esgf-index2
assert len(shards['esgf-index2.ceda.ac.uk']) > 1

Expand Down Expand Up @@ -69,7 +69,7 @@ def test_passed_cached_session(self):
import requests_cache
td = datetime.timedelta(hours=1)
session = requests_cache.CachedSession(self.cache,
expire_after=td)
expire_after=td)
conn = SearchConnection(self.test_service, session=session)
context = conn.new_context(project='cmip5')
assert context.facet_constraints['project'] == 'cmip5'
Expand All @@ -78,7 +78,7 @@ def test_connection_instance(self):
import requests_cache
td = datetime.timedelta(hours=1)
session = requests_cache.CachedSession(self.cache,
expire_after=td)
expire_after=td)
with SearchConnection(self.test_service, session=session) as conn:
context = conn.new_context(project='cmip5')
assert context.facet_constraints['project'] == 'cmip5'
44 changes: 30 additions & 14 deletions tests/test_context.py
Expand Up @@ -10,12 +10,18 @@
import os


_all_facets_explanation = ('tests with facets=* may fail for server-side reasons, '
'so these are marked XFAIL but may sometimes pass')


class TestContext(TestCase):

_test_few_facets = 'project,model,index_node,data_node'

def setUp(self):
self.test_service = 'http://esgf-data.dkrz.de/esg-search'
# self.test_service = 'http://esgf-index1.ceda.ac.uk/esg-search'
# self.test_service = 'http://esgf-node.llnl.gov/esg-search'
self.cache = os.path.join(os.path.dirname(__file__), 'url_cache')

def test_context_freetext(self):
Expand Down Expand Up @@ -52,17 +58,22 @@ def test_context_facet_multivalue2(self):
self.assertTrue(sorted(context2.facet_constraints.getall('model')) == ['IPSL-CM5A-LR', 'IPSL-CM5A-MR'])

def test_context_facet_multivalue3(self):
#
# use distrib=False here - with distrib=True sometimes results are missing and we can't safely
# compare numbers of results from two queries.
#
conn = SearchConnection(self.test_service, cache=self.cache)
ctx = conn.new_context(project='CMIP5', query='humidity',
experiment='rcp45')
experiment='rcp45', distrib=False)
hits1 = ctx.hit_count
self.assertTrue(hits1 > 0)
ctx2 = conn.new_context(project='CMIP5', query='humidity',
experiment=['rcp45', 'rcp85'])
experiment=['rcp45', 'rcp85'], distrib=False)
hits2 = ctx2.hit_count

self.assertTrue(hits2 > hits1)

@pytest.mark.xfail(reason="results may sometimes be missing - may or may not pass")
def test_context_facet_options(self):
conn = SearchConnection(self.test_service, cache=self.cache)
context = conn.new_context(project='CMIP5', model='IPSL-CM5A-LR',
Expand Down Expand Up @@ -96,14 +107,13 @@ def test_facet_count(self):
self.assertTrue(list(counts['model'].keys()) == ['IPSL-CM5A-LR'])
self.assertTrue(list(counts['project'].keys()) == ['CMIP5'])


def _test_distrib(self, constraints=None, test_service=None,
cache=None):
if constraints == None:
constraints={}
if test_service == None:
if constraints is None:
constraints = {}
if test_service is None:
test_service = self.test_service

conn1 = SearchConnection(test_service, distrib=False, cache=cache)
context1 = conn1.new_context(**constraints)
count1 = context1.hit_count
Expand All @@ -112,8 +122,13 @@ def _test_distrib(self, constraints=None, test_service=None,
context2 = conn2.new_context(**constraints)
count2 = context2.hit_count

assert count1 < count2
#
# We would generally expect more counts with distrib=True but sometimes this fails for
# server-side reasons, so we use a weaker test here.
#

# assert count1 < count2
assert count1 <= count2

_distrib_constraints_few_facets = {'project': 'CMIP5',
'facets': _test_few_facets}
Expand All @@ -124,9 +139,9 @@ def test_distrib_with_few_facets(self):
self._test_distrib(constraints=self._distrib_constraints_few_facets)

@pytest.mark.slow
@pytest.mark.xfail
# Expected failure: with facets=* the distrib=true appears to be
# ignored. This is observed both on the CEDA and also DKRZ index nodes
@pytest.mark.xfail(reason=_all_facets_explanation)
# Expected failure: with facets=* the distrib=true appears to be
# ignored. This is observed both on the CEDA and also DKRZ index nodes
# (the only nodes investigated).
def test_distrib_with_all_facets(self):
self._test_distrib(constraints=self._distrib_constraints_all_facets)
Expand All @@ -137,13 +152,13 @@ def test_distrib_with_cache_with_few_facets(self):
cache=self.cache)

# @pytest.mark.skip(reason="cache fails on python 3.7")
@pytest.mark.xfail
@pytest.mark.xfail(reason=_all_facets_explanation)
# Expected failure: see test_distrib_all_facets above
def test_distrib_with_cache_with_all_facets(self):
self._test_distrib(constraints=self._distrib_constraints_all_facets,
cache=self.cache)


@pytest.mark.xfail(reason="may sometimes fail if server returns incomplete set of results")
def test_constrain(self):
conn = SearchConnection(self.test_service, cache=self.cache)

Expand Down Expand Up @@ -172,6 +187,7 @@ def test_constrain_regression1(self):
context2 = context.constrain(experiment='historical')
self.assertTrue('experiment' in context2.facet_constraints)

@pytest.mark.xfail(reason="may sometimes fail if server returns incomplete set of results")
def test_negative_facet(self):
conn = SearchConnection(self.test_service, cache=self.cache)

Expand Down Expand Up @@ -212,7 +228,7 @@ def _test_replica(self, facets=None):
def test_replica_with_few_facets(self):
self._test_replica(facets=self._test_few_facets)

@pytest.mark.xfail
@pytest.mark.xfail(reason=_all_facets_explanation)
# Expected failure - same considerations as test_distrib_all_facets
@pytest.mark.slow
def test_replica_with_all_facets(self):
Expand Down
3 changes: 2 additions & 1 deletion tests/test_logon.py
Expand Up @@ -17,10 +17,11 @@
except (ImportError, SyntaxError):
_has_myproxy = False


TEST_USER = os.environ.get('USERNAME')
TEST_PASSWORD = os.environ.get('PASSWORD')
TEST_OPENID = os.environ.get('OPENID')
TEST_MYPROXY = 'slcs.ceda.ac.uk'
TEST_MYPROXY = os.environ.get('MYPROXY')


TEST_DATA_DIR = op.join(op.dirname(__file__), 'data')
Expand Down

0 comments on commit f60330a

Please sign in to comment.