Commit

Total estimation first attempt. Failing test for filters - I can't get
it to accurately estimate with a filter.

David Read committed Sep 21, 2018
1 parent 415ca4e commit 7bd11d3
Showing 7 changed files with 212 additions and 25 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.rst
@@ -10,6 +10,19 @@ Changelog
v.2.9.0 TBA
==================

General notes:

* This version requires re-running the ``datastore set-permissions`` command
(assuming you are using the DataStore). See: :ref:`datastore-set-permissions`

Otherwise datasets will not be viewable or searchable in DataStore and
the logs will contain this error::

ValidationError: ... function count_estimate(unknown) does not exist ...

CKAN developers should also re-run set-permissions on the test database:
:ref:`datastore-test-set-permissions`

Minor changes:

* For navl schemas, the 'default' validator no longer applies the default when
116 changes: 91 additions & 25 deletions ckanext/datastore/backend/postgres.py
@@ -14,6 +14,7 @@
import hashlib
import json
from cStringIO import StringIO
import sqlparse

from six import string_types, text_type

@@ -1161,6 +1162,7 @@ def validate(context, data_dict):

data_dict_copy.pop('id', None)
data_dict_copy.pop('include_total', None)
data_dict_copy.pop('total_estimation_threshold', None)
data_dict_copy.pop('records_format', None)

for key, values in data_dict_copy.iteritems():
@@ -1216,14 +1218,17 @@ def search_data(context, data_dict):
else:
sort_clause = ''

core_query = u'''
SELECT {distinct} {select}
FROM "{resource}" {ts_query}
{where} {sort} LIMIT {limit} OFFSET {offset}'''

records_format = data_dict['records_format']
if records_format == u'objects':
sql_fmt = u'''
SELECT array_to_json(array_agg(j))::text FROM (
SELECT {distinct} {select}
FROM "{resource}" {ts_query}
{where} {sort} LIMIT {limit} OFFSET {offset}
) AS j'''
CORE_QUERY
) AS j'''.replace('CORE_QUERY', core_query)
elif records_format == u'lists':
select_columns = u" || ',' || ".join(
s for s in query_dict['select']
@@ -1238,27 +1243,26 @@
elif records_format == u'csv':
sql_fmt = u'''
COPY (
SELECT {distinct} {select}
FROM "{resource}" {ts_query}
{where} {sort} LIMIT {limit} OFFSET {offset}
) TO STDOUT csv DELIMITER ',' '''
CORE_QUERY
) TO STDOUT csv DELIMITER ',' '''.replace('CORE_QUERY', core_query)
elif records_format == u'tsv':
sql_fmt = u'''
COPY (
SELECT {distinct} {select}
FROM "{resource}" {ts_query}
{where} {sort} LIMIT {limit} OFFSET {offset}
) TO STDOUT csv DELIMITER '\t' '''
CORE_QUERY
) TO STDOUT csv DELIMITER '\t' ''' \
.replace('CORE_QUERY', core_query)

sql_string = sql_fmt.format(
sql_params = dict(
distinct=distinct,
select=select_columns,
resource=resource_id,
ts_query=ts_query,
where=where_clause,
sort=sort_clause,
limit=limit,
offset=offset)
offset=offset
)
sql_string = sql_fmt.format(**sql_params)
if records_format == u'csv' or records_format == u'tsv':
buf = StringIO()
_execute_single_statement_copy_to(
@@ -1287,17 +1291,72 @@ def search_data(context, data_dict):
_insert_links(data_dict, limit, offset)

if data_dict.get('include_total', True):
count_sql_string = u'''SELECT count(*) FROM (
SELECT {distinct} {select}
FROM "{resource}" {ts_query} {where}) as t;'''.format(
distinct=distinct,
select=select_columns,
resource=resource_id,
ts_query=ts_query,
where=where_clause)
count_result = _execute_single_statement(
context, count_sql_string, where_values)
data_dict['total'] = count_result.fetchall()[0][0]
total_estimation_threshold = \
data_dict.get('total_estimation_threshold') or 0
if total_estimation_threshold != 0:
# Estimate the total (result row count)
# See: https://wiki.postgresql.org/wiki/Count_estimate

# Estimates rely on postgres having run ANALYZE on the table, so
# ensure that has been done.
when_was_last_analyze_sql = sqlalchemy.text('''
SELECT last_analyze, last_autoanalyze
FROM pg_stat_user_tables
WHERE relname=:resource;
''')
result = context['connection'].execute(
when_was_last_analyze_sql, **sql_params)
last_analyze, last_autoanalyze = result.fetchall()[0]
if not (last_analyze or last_autoanalyze):
analyze_sql = '''
ANALYZE "{resource}";
'''.format(**sql_params)
context['connection'].execute(analyze_sql)

if not (where_clause or distinct):
# there are no filters, so we can use the table row count from
# pg stats
analyze_count_sql = sqlalchemy.text('''
SELECT reltuples::BIGINT AS approximate_row_count
FROM pg_class
WHERE relname=:resource;
''')
count_result = context['connection'].execute(analyze_count_sql,
**sql_params)
else:
# use EXPLAIN to get an estimate of the row count for this
# filtered query
explain_count_sql = '''
SELECT count_estimate('{query}');
'''.format(query=core_query.format(**sql_params))
if len(sqlparse.split(explain_count_sql)) > 2:
raise toolkit.ValidationError({
'query': ['Query is not a single statement.']
})
# count_estimate() is defined by 'datastore set-permissions'
# - see datastore.rst
count_result = context['connection'].execute(explain_count_sql,
[where_values])
estimated_total = count_result.fetchall()[0][0]

if total_estimation_threshold != 0 and \
estimated_total >= total_estimation_threshold:
data_dict['total'] = estimated_total
data_dict['total_was_estimated'] = True
else:
# this is slow for large results, hence the 'estimate' alternative
count_sql_string = u'''SELECT count(*) FROM (
SELECT {distinct} {select}
FROM "{resource}" {ts_query} {where}) as t;'''.format(
distinct=distinct,
select=select_columns,
resource=resource_id,
ts_query=ts_query,
where=where_clause)
count_result = _execute_single_statement(
context, count_sql_string, where_values)
data_dict['total'] = count_result.fetchall()[0][0]
data_dict['total_was_estimated'] = False

return data_dict
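
The logic above chooses between two estimation strategies: with no filters it reads the planner's stored row count for the whole table from pg_class; with filters it calls count_estimate() (defined in set_permissions.sql below) to pull the planner's row estimate for the filtered query out of EXPLAIN output. A condensed sketch of both paths, assuming an open SQLAlchemy connection ``conn`` (the sketch inlines the query text for brevity, whereas the real code binds parameters):

    import sqlalchemy

    def sketch_estimate_total(conn, resource_id, where_clause=None):
        # condensed view of the two estimation strategies in search_data()
        if not where_clause:
            # no filters: use the planner's stored row count for the table
            return conn.execute(sqlalchemy.text(
                'SELECT reltuples::BIGINT FROM pg_class '
                'WHERE relname = :resource'), resource=resource_id).scalar()
        # filters present: count_estimate() parses EXPLAIN output for the
        # filtered query's estimated row count
        query = 'SELECT * FROM "{0}" {1}'.format(resource_id, where_clause)
        return conn.execute(
            u"SELECT count_estimate('{0}')".format(query.replace("'", "''"))
        ).scalar()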

@@ -1313,6 +1372,13 @@ def _execute_single_statement_copy_to(context, sql_string, where_values, buf):
cursor.close()


def _execute(context, sql_string):
cursor = context['connection'].connection.cursor()
cursor.execute(sql_string)
cursor.close()


def format_results(context, results, data_dict):
result_fields = []
for field in results.cursor.description:
9 changes: 9 additions & 0 deletions ckanext/datastore/logic/action.py
@@ -405,6 +405,13 @@ def datastore_search(context, data_dict):
:param include_total: True to return total matching record count
(optional, default: true)
:type include_total: bool
:param total_estimation_threshold: By default, the "total" returned is a
precise count of the query result, which can be computationally
expensive for large results (e.g. >100,000 rows). If this is set to a
non-zero value and the estimated total is at or above it, the precise
count is skipped and the estimated total - generated by quick
sampling - is returned instead (optional, default: 0)
:type total_estimation_threshold: int
:param records_format: the format for the records return value:
'objects' (default) list of {fieldname1: value1, ...} dicts,
'lists' list of [value1, value2, ...] lists,
@@ -438,6 +445,8 @@
:type filters: list of dictionaries
:param total: number of total matching records
:type total: int
:param total_was_estimated: whether or not the total was estimated
:type total_was_estimated: bool
:param records: list of matching results
:type records: depends on records_format value passed
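
A usage sketch for the new parameter (the resource id and threshold value here are invented for illustration):

    from ckan.plugins import toolkit

    result = toolkit.get_action(u'datastore_search')({}, {
        u'resource_id': u'my-resource-id',    # assumed datastore resource
        u'filters': {u'the year': 1901},
        u'total_estimation_threshold': 1000,  # estimate once total >= 1000
    })
    # 'total' is exact below the threshold; at or above it, it is estimated
    # and 'total_was_estimated' is True
    print(result[u'total'], result.get(u'total_was_estimated'))
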
1 change: 1 addition & 0 deletions ckanext/datastore/logic/schema.py
@@ -167,6 +167,7 @@ def datastore_search_schema():
'sort': [ignore_missing, list_of_strings_or_string],
'distinct': [ignore_missing, boolean_validator],
'include_total': [default(True), boolean_validator],
'total_estimation_threshold': [default(0), int_validator],
'records_format': [
default(u'objects'),
OneOf([u'objects', u'lists', u'csv', u'tsv'])],
15 changes: 15 additions & 0 deletions ckanext/datastore/set_permissions.sql
@@ -106,3 +106,18 @@ DO $body$
END;
$body$;

-- estimates the number of rows returned by a query
CREATE OR REPLACE FUNCTION count_estimate(query text) RETURNS INTEGER
AS $body$
DECLARE
rec record;
ROWS INTEGER;
BEGIN
FOR rec IN EXECUTE 'EXPLAIN ' || query LOOP
ROWS := SUBSTRING(rec."QUERY PLAN" FROM ' rows=([[:digit:]]+)');
EXIT WHEN ROWS IS NOT NULL;
END LOOP;

RETURN ROWS;
END
$body$ LANGUAGE plpgsql;
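
For intuition: count_estimate() simply runs EXPLAIN and extracts the planner's "rows=" figure from the plan text. A minimal Python equivalent, assuming an open psycopg2 connection ``conn`` (not part of this diff):

    import re

    def estimate_row_count(conn, query):
        # run EXPLAIN and scan the plan text for the planner's row estimate,
        # mirroring the count_estimate() SQL function above
        cursor = conn.cursor()
        cursor.execute('EXPLAIN ' + query)
        for (line,) in cursor.fetchall():
            match = re.search(r' rows=(\d+)', line)
            if match:
                return int(match.group(1))
        return None
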
1 change: 1 addition & 0 deletions ckanext/datastore/tests/helpers.py
@@ -28,6 +28,7 @@ def clear_db(Session):
FROM pg_proc
INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)
WHERE ns.nspname = 'public' AND proname != 'populate_full_text_trigger'
AND proname != 'count_estimate'
'''
drop_functions = u''.join(r[0] for r in c.execute(drop_functions_sql))
if drop_functions:
82 changes: 82 additions & 0 deletions ckanext/datastore/tests/test_search.py
@@ -100,6 +100,25 @@ def test_all_params_work_with_fields_with_whitespaces(self):
result_years = [r['the year'] for r in result['records']]
assert_equals(result_years, [2013])

def test_search_total(self):
resource = factories.Resource()
data = {
'resource_id': resource['id'],
'force': True,
'records': [
{'the year': 2014},
{'the year': 2013},
],
}
result = helpers.call_action('datastore_create', **data)
search_data = {
'resource_id': resource['id'],
'include_total': True,
}
result = helpers.call_action('datastore_search', **search_data)
assert_equals(result['total'], 2)
assert_equals(result.get('total_was_estimated'), False)

def test_search_without_total(self):
resource = factories.Resource()
data = {
@@ -117,6 +136,69 @@ def test_search_without_total(self):
}
result = helpers.call_action('datastore_search', **search_data)
assert 'total' not in result
assert 'total_was_estimated' not in result

def test_estimate_total(self):
resource = factories.Resource()
data = {
'resource_id': resource['id'],
'force': True,
'records': [{'the year': 1900 + i} for i in range(100)],
}
result = helpers.call_action('datastore_create', **data)
analyze_sql = '''
ANALYZE "{resource}";
'''.format(resource=resource['id'])
db.get_write_engine().execute(analyze_sql)
search_data = {
'resource_id': resource['id'],
'total_estimation_threshold': 50,
}
result = helpers.call_action('datastore_search', **search_data)
assert_equals(result.get('total_was_estimated'), True)
assert 95 < result['total'] < 105, result['total']

def test_estimate_total_with_filters(self):
resource = factories.Resource()
data = {
'resource_id': resource['id'],
'force': True,
'records': [{'the year': 1900 + i} for i in range(3)] * 10,
}
result = helpers.call_action('datastore_create', **data)
analyze_sql = '''
ANALYZE "{resource}";
'''.format(resource=resource['id'])
db.get_write_engine().execute(analyze_sql)
search_data = {
'resource_id': resource['id'],
'filters': {u'the year': 1901},
'total_estimation_threshold': 5,
}
# if this fails with: "ValidationError: ... function
# count_estimate(unknown) does not exist ..."
# then you need to re-run set-permissions for the test db - see test.rst
result = helpers.call_action('datastore_search', **search_data)
assert 8 <= result['total'] <= 12, result['total']
assert_equals(result.get('total_was_estimated'), True)

def test_estimate_total_where_analyze_is_not_already_done(self):
# ANALYZE is run by recent datapusher/xloader versions, but we need to
# cope with tables created in other ways, which may not have had an
# ANALYZE run on them
resource = factories.Resource()
data = {
'resource_id': resource['id'],
'force': True,
'records': [{'the year': 1900 + i} for i in range(100)],
}
result = helpers.call_action('datastore_create', **data)
search_data = {
'resource_id': resource['id'],
'total_estimation_threshold': 50,
}
result = helpers.call_action('datastore_search', **search_data)
assert_equals(result.get('total_was_estimated'), True)
assert 95 < result['total'] < 105, result['total']


class TestDatastoreSearch(DatastoreLegacyTestBase):
