diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 34b50544daa..cb055f34edf 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,6 +10,19 @@ Changelog v.2.9.0 TBA ================== +General notes: + + * This version requires re-running the ``datastore set-permissions`` command + (assuming you are using the DataStore). See: :ref:`datastore-set-permissions` + + Otherwise datasets will not be viewable or searchable in DataStore and + the logs will contain this error:: + + ValidationError: ... function count_estimate(unknown) does not exist ... + + CKAN developers should also re-run set-permissions on the test database: + :ref:`datastore-test-set-permissions` + Minor changes: * For navl schemas, the 'default' validator no longer applies the default when diff --git a/ckanext/datastore/backend/postgres.py b/ckanext/datastore/backend/postgres.py index 29dc7a26ca1..3559dc9521b 100644 --- a/ckanext/datastore/backend/postgres.py +++ b/ckanext/datastore/backend/postgres.py @@ -14,6 +14,7 @@ import hashlib import json from cStringIO import StringIO +import sqlparse from six import string_types, text_type @@ -1161,6 +1162,7 @@ def validate(context, data_dict): data_dict_copy.pop('id', None) data_dict_copy.pop('include_total', None) + data_dict_copy.pop('total_estimation_threshold', None) data_dict_copy.pop('records_format', None) for key, values in data_dict_copy.iteritems(): @@ -1216,14 +1218,17 @@ def search_data(context, data_dict): else: sort_clause = '' + core_query = u''' + SELECT {distinct} {select} + FROM "{resource}" {ts_query} + {where} {sort} LIMIT {limit} OFFSET {offset}''' + records_format = data_dict['records_format'] if records_format == u'objects': sql_fmt = u''' SELECT array_to_json(array_agg(j))::text FROM ( - SELECT {distinct} {select} - FROM "{resource}" {ts_query} - {where} {sort} LIMIT {limit} OFFSET {offset} - ) AS j''' + CORE_QUERY + ) AS j'''.replace('CORE_QUERY', core_query) elif records_format == u'lists': select_columns = u" || ',' || ".join( s for s in query_dict['select'] @@ -1238,19 +1243,16 @@ def search_data(context, data_dict): elif records_format == u'csv': sql_fmt = u''' COPY ( - SELECT {distinct} {select} - FROM "{resource}" {ts_query} - {where} {sort} LIMIT {limit} OFFSET {offset} - ) TO STDOUT csv DELIMITER ',' ''' + CORE_QUERY + ) TO STDOUT csv DELIMITER ',' '''.replace('CORE_QUERY', core_query) elif records_format == u'tsv': sql_fmt = u''' COPY ( - SELECT {distinct} {select} - FROM "{resource}" {ts_query} - {where} {sort} LIMIT {limit} OFFSET {offset} - ) TO STDOUT csv DELIMITER '\t' ''' + CORE_QUERY + ) TO STDOUT csv DELIMITER '\t' ''' \ + .replace('CORE_QUERY', core_query) - sql_string = sql_fmt.format( + sql_params = dict( distinct=distinct, select=select_columns, resource=resource_id, @@ -1258,7 +1260,9 @@ def search_data(context, data_dict): where=where_clause, sort=sort_clause, limit=limit, - offset=offset) + offset=offset + ) + sql_string = sql_fmt.format(**sql_params) if records_format == u'csv' or records_format == u'tsv': buf = StringIO() _execute_single_statement_copy_to( @@ -1287,17 +1291,72 @@ def search_data(context, data_dict): _insert_links(data_dict, limit, offset) if data_dict.get('include_total', True): - count_sql_string = u'''SELECT count(*) FROM ( - SELECT {distinct} {select} - FROM "{resource}" {ts_query} {where}) as t;'''.format( - distinct=distinct, - select=select_columns, - resource=resource_id, - ts_query=ts_query, - where=where_clause) - count_result = _execute_single_statement( - context, count_sql_string, where_values) - data_dict['total'] = count_result.fetchall()[0][0] + total_estimation_threshold = \ + data_dict.get('total_estimation_threshold') or 0 + if total_estimation_threshold != 0: + # Estimate the total (result row count) + # See: https://wiki.postgresql.org/wiki/Count_estimate + + # Estimates rely on postgres having run ANALYZE on the table, so + # ensure that has been done. + when_was_last_analyze_sql = sqlalchemy.text(''' + SELECT last_analyze, last_autoanalyze + FROM pg_stat_user_tables + WHERE relname=:resource; + ''') + result = context['connection'].execute( + when_was_last_analyze_sql, **sql_params) + last_analyze, last_autoanalyze = result.fetchall()[0] + if not (last_analyze or last_autoanalyze): + analyze_sql = ''' + ANALYZE "{resource}"; + '''.format(**sql_params) + context['connection'].execute(analyze_sql) + + if not (where_clause or distinct): + # there are no filters, so we can use the table row count from + # pg stats + analyze_count_sql = sqlalchemy.text(''' + SELECT reltuples::BIGINT AS approximate_row_count + FROM pg_class + WHERE relname=:resource; + ''') + count_result = context['connection'].execute(analyze_count_sql, + **sql_params) + else: + # use EXPLAIN to get an estimate of the row count for this + # filtered query + explain_count_sql = ''' + SELECT count_estimate('{query}'); + '''.format(query=core_query.format(**sql_params)) + if len(sqlparse.split(explain_count_sql)) > 2: + raise toolkit.ValidationError({ + 'query': ['Query is not a single statement.'] + }) + # count_estimate() is defined by 'datastore set-permissions' + # - see datastore.rst + count_result = context['connection'].execute(explain_count_sql, + [where_values]) + estimated_total = count_result.fetchall()[0][0] + + if total_estimation_threshold != 0 and \ + estimated_total >= total_estimation_threshold: + data_dict['total'] = estimated_total + data_dict['total_was_estimated'] = True + else: + # this is slow for large results, hence the 'estimate' alternative + count_sql_string = u'''SELECT count(*) FROM ( + SELECT {distinct} {select} + FROM "{resource}" {ts_query} {where}) as t;'''.format( + distinct=distinct, + select=select_columns, + resource=resource_id, + ts_query=ts_query, + where=where_clause) + count_result = _execute_single_statement( + context, count_sql_string, where_values) + data_dict['total'] = count_result.fetchall()[0][0] + data_dict['total_was_estimated'] = False return data_dict @@ -1313,6 +1372,13 @@ def _execute_single_statement_copy_to(context, sql_string, where_values, buf): cursor.close() +def _execute(context, sql_string): + cursor = context['connection'].connection.cursor() + cursor.copy_expert(cursor.mogrify(sql_string, where_values), buf) + cursor.close() + + + def format_results(context, results, data_dict): result_fields = [] for field in results.cursor.description: diff --git a/ckanext/datastore/logic/action.py b/ckanext/datastore/logic/action.py index 0c91b0f1057..341279ddf04 100644 --- a/ckanext/datastore/logic/action.py +++ b/ckanext/datastore/logic/action.py @@ -405,6 +405,13 @@ def datastore_search(context, data_dict): :param include_total: True to return total matching record count (optional, default: true) :type include_total: bool + :param total_estimation_threshold: By default, the "total" returned is a + precise count of the query result, which can be computationally + expensive for large results (e.g. >100,000k rows). By setting this + value to non-zero, you can set a threshold estimated total, above + which it doesn't do the count - it just returns the estimated + total, generated by quick sampling (optional, default: 0) + :type total_estimation_threshold: int :param records_format: the format for the records return value: 'objects' (default) list of {fieldname1: value1, ...} dicts, 'lists' list of [value1, value2, ...] lists, @@ -438,6 +445,8 @@ def datastore_search(context, data_dict): :type filters: list of dictionaries :param total: number of total matching records :type total: int + :param total_was_estimated: whether or not the total was estimated + :type total: bool :param records: list of matching results :type records: depends on records_format value passed diff --git a/ckanext/datastore/logic/schema.py b/ckanext/datastore/logic/schema.py index c9b221d9f42..510c112f8aa 100644 --- a/ckanext/datastore/logic/schema.py +++ b/ckanext/datastore/logic/schema.py @@ -167,6 +167,7 @@ def datastore_search_schema(): 'sort': [ignore_missing, list_of_strings_or_string], 'distinct': [ignore_missing, boolean_validator], 'include_total': [default(True), boolean_validator], + 'total_estimation_threshold': [default(0), int_validator], 'records_format': [ default(u'objects'), OneOf([u'objects', u'lists', u'csv', u'tsv'])], diff --git a/ckanext/datastore/set_permissions.sql b/ckanext/datastore/set_permissions.sql index e1a8747c95a..8169f6ca1d7 100644 --- a/ckanext/datastore/set_permissions.sql +++ b/ckanext/datastore/set_permissions.sql @@ -106,3 +106,18 @@ DO $body$ END; $body$; +-- estimates the number of rows returned by a query +CREATE OR REPLACE FUNCTION count_estimate(query text) RETURNS INTEGER +AS $body$ + DECLARE + rec record; + ROWS INTEGER; + BEGIN + FOR rec IN EXECUTE 'EXPLAIN ' || query LOOP + ROWS := SUBSTRING(rec."QUERY PLAN" FROM ' rows=([[:digit:]]+)'); + EXIT WHEN ROWS IS NOT NULL; + END LOOP; + + RETURN ROWS; + END +$body$ LANGUAGE plpgsql; diff --git a/ckanext/datastore/tests/helpers.py b/ckanext/datastore/tests/helpers.py index c94aacd3a65..4aeed00c3ba 100644 --- a/ckanext/datastore/tests/helpers.py +++ b/ckanext/datastore/tests/helpers.py @@ -28,6 +28,7 @@ def clear_db(Session): FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid) WHERE ns.nspname = 'public' AND proname != 'populate_full_text_trigger' + AND proname != 'count_estimate' ''' drop_functions = u''.join(r[0] for r in c.execute(drop_functions_sql)) if drop_functions: diff --git a/ckanext/datastore/tests/test_search.py b/ckanext/datastore/tests/test_search.py index 994e8309c95..5dc959d21c2 100644 --- a/ckanext/datastore/tests/test_search.py +++ b/ckanext/datastore/tests/test_search.py @@ -100,6 +100,25 @@ def test_all_params_work_with_fields_with_whitespaces(self): result_years = [r['the year'] for r in result['records']] assert_equals(result_years, [2013]) + def test_search_total(self): + resource = factories.Resource() + data = { + 'resource_id': resource['id'], + 'force': True, + 'records': [ + {'the year': 2014}, + {'the year': 2013}, + ], + } + result = helpers.call_action('datastore_create', **data) + search_data = { + 'resource_id': resource['id'], + 'include_total': True, + } + result = helpers.call_action('datastore_search', **search_data) + assert_equals(result['total'], 2) + assert_equals(result.get('total_was_estimated'), False) + def test_search_without_total(self): resource = factories.Resource() data = { @@ -117,6 +136,69 @@ def test_search_without_total(self): } result = helpers.call_action('datastore_search', **search_data) assert 'total' not in result + assert 'total_was_estimated' not in result + + def test_estimate_total(self): + resource = factories.Resource() + data = { + 'resource_id': resource['id'], + 'force': True, + 'records': [{'the year': 1900 + i} for i in range(100)], + } + result = helpers.call_action('datastore_create', **data) + analyze_sql = ''' + ANALYZE "{resource}"; + '''.format(resource=resource['id']) + db.get_write_engine().execute(analyze_sql) + search_data = { + 'resource_id': resource['id'], + 'total_estimation_threshold': 50, + } + result = helpers.call_action('datastore_search', **search_data) + assert_equals(result.get('total_was_estimated'), True) + assert 95 < result['total'] < 105, result['total'] + + def test_estimate_total_with_filters(self): + resource = factories.Resource() + data = { + 'resource_id': resource['id'], + 'force': True, + 'records': [{'the year': 1900 + i} for i in range(3)] * 10, + } + result = helpers.call_action('datastore_create', **data) + analyze_sql = ''' + ANALYZE "{resource}"; + '''.format(resource=resource['id']) + db.get_write_engine().execute(analyze_sql) + search_data = { + 'resource_id': resource['id'], + 'filters': {u'the year': 1901}, + 'total_estimation_threshold': 5, + } + # if this fails with: "ValidationError: ... function + # count_estimate(unknown) does not exist ..." + # then you need to rerun set-permissions for the test db- see test.rst + result = helpers.call_action('datastore_search', **search_data) + assert 8 <= result['total'] <= 12, result['total'] + assert_equals(result.get('total_was_estimated'), True) + + def test_estimate_total_where_analyze_is_not_already_done(self): + # ANALYSE is done by latest datapusher/xloader, but need to cope in + # if tables created in other ways which may not have had an ANALYSE + resource = factories.Resource() + data = { + 'resource_id': resource['id'], + 'force': True, + 'records': [{'the year': 1900 + i} for i in range(100)], + } + result = helpers.call_action('datastore_create', **data) + search_data = { + 'resource_id': resource['id'], + 'total_estimation_threshold': 50, + } + result = helpers.call_action('datastore_search', **search_data) + assert_equals(result.get('total_was_estimated'), True) + assert 95 < result['total'] < 105, result['total'] class TestDatastoreSearch(DatastoreLegacyTestBase):