Merge pull request #4473 from ckan/datastore-estimate-total
Datastore search can estimate total, rather than expensive COUNT(*) [WIP]
wardi committed Nov 23, 2018
2 parents 3549e3b + e2f7a6d commit aa32b23
Showing 11 changed files with 479 additions and 76 deletions.
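In short: when `include_total` is true, `datastore_search` can now read an approximate row count from PostgreSQL's table statistics instead of running a full `COUNT(*)`. A minimal sketch of the new parameter in use, via the test helper that appears in the diffs below (the resource id and threshold are placeholder values):

```python
# Sketch only: assumes a CKAN test environment and a loaded resource;
# 'my-resource-id' and the threshold value are placeholders.
import ckan.tests.helpers as helpers

result = helpers.call_action(
    'datastore_search',
    resource_id='my-resource-id',
    limit=5,
    # if the estimated row count is at or above this threshold, the
    # total is estimated from pg statistics instead of a COUNT(*)
    total_estimation_threshold=1000,
)
print(result['total'], result['total_was_estimated'])
```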
73 changes: 62 additions & 11 deletions ckanext/datastore/backend/postgres.py
@@ -1189,7 +1189,9 @@ def validate(context, data_dict):

data_dict_copy.pop('id', None)
data_dict_copy.pop('include_total', None)
data_dict_copy.pop('total_estimation_threshold', None)
data_dict_copy.pop('records_format', None)
data_dict_copy.pop('calculate_record_count', None)

for key, values in data_dict_copy.iteritems():
if not values:
@@ -1312,17 +1314,53 @@ def search_data(context, data_dict):
_insert_links(data_dict, limit, offset)

if data_dict.get('include_total', True):
count_sql_string = u'''SELECT count(*) FROM (
SELECT {distinct} {select}
FROM "{resource}" {ts_query} {where}) as t;'''.format(
distinct=distinct,
select=select_columns,
resource=resource_id,
ts_query=ts_query,
where=where_clause)
count_result = _execute_single_statement(
context, count_sql_string, where_values)
data_dict['total'] = count_result.fetchall()[0][0]
total_estimation_threshold = \
data_dict.get('total_estimation_threshold')
estimated_total = None
if total_estimation_threshold is not None and \
not (where_clause or distinct):
# there are no filters, so we can try to use the estimated table
# row count from pg stats
# See: https://wiki.postgresql.org/wiki/Count_estimate
# (We also tried using EXPLAIN to estimate filtered queries but
# it didn't estimate well in tests)
analyze_count_sql = sqlalchemy.text('''
SELECT reltuples::BIGINT AS approximate_row_count
FROM pg_class
WHERE relname=:resource;
''')
count_result = context['connection'].execute(analyze_count_sql,
resource=resource_id)
try:
estimated_total = count_result.fetchall()[0][0]
except ValueError:
# the table doesn't have the stats calculated yet. (This should
# be done by xloader/datapusher at the end of loading.)
# We could provoke their creation with an ANALYZE, but that
# takes 10x the time to run, compared to SELECT COUNT(*) so
# we'll just revert to the latter. At some point the autovacuum
# will run and create the stats so we can use an estimate in
# future.
pass

if estimated_total is not None \
and estimated_total >= total_estimation_threshold:
data_dict['total'] = estimated_total
data_dict['total_was_estimated'] = True
else:
# this is slow for large results
count_sql_string = u'''SELECT count(*) FROM (
SELECT {distinct} {select}
FROM "{resource}" {ts_query} {where}) as t;'''.format(
distinct=distinct,
select=select_columns,
resource=resource_id,
ts_query=ts_query,
where=where_clause)
count_result = _execute_single_statement(
context, count_sql_string, where_values)
data_dict['total'] = count_result.fetchall()[0][0]
data_dict['total_was_estimated'] = False

return data_dict
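The estimate above comes from `pg_class.reltuples`, the technique described on the linked PostgreSQL wiki page. Reduced to a standalone sketch (the DSN and table name are placeholders; the value is only as fresh as the last ANALYZE or autovacuum):

```python
# Standalone sketch of the row-count estimate used in search_data above;
# the DSN and table name are placeholders.
import sqlalchemy

engine = sqlalchemy.create_engine('postgresql:///datastore_default')
with engine.connect() as conn:
    approx = conn.execute(
        sqlalchemy.text(
            'SELECT reltuples::BIGINT FROM pg_class WHERE relname = :t'),
        t='my_table',
    ).scalar()
print(approx)  # 0 until the table has been analyzed at least once
```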

@@ -1948,6 +1986,19 @@ def before_fork(self):
# to avoid sharing them between parent and child processes.
_dispose_engines()

def calculate_record_count(self, resource_id):
'''
Calculate an estimate of the record/row count and store it in
PostgreSQL's table statistics (pg_class / pg_stat_user_tables). This
number is used when `total_estimation_threshold` is specified in a
datastore_search request.
'''
connection = get_write_engine().connect()
sql = 'ANALYZE "{}"'.format(resource_id)
try:
connection.execute(sql)
except sqlalchemy.exc.DatabaseError as err:
raise DatastoreException(err)


def create_function(name, arguments, rettype, definition, or_replace):
sql = u'''
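`calculate_record_count` is simply an ANALYZE, and `pg_stat_user_tables.last_analyze` records when it last ran, which is what the tests further down check. A sketch with a placeholder resource id:

```python
# Sketch: ANALYZE refreshes the statistics that the estimate reads, and
# pg_stat_user_tables records when that last happened. The resource id
# is a placeholder.
from ckanext.datastore.backend.postgres import get_write_engine

connection = get_write_engine().connect()
connection.execute('ANALYZE "my-resource-id"')
last_analyze = connection.execute(
    'SELECT last_analyze FROM pg_stat_user_tables WHERE relname = %s',
    'my-resource-id',
).fetchall()[0][0]
print(last_analyze)  # a timestamp once the statistics exist
```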
43 changes: 42 additions & 1 deletion ckanext/datastore/logic/action.py
@@ -70,6 +70,12 @@ def datastore_create(context, data_dict):
{"function": "trigger_clean_reference"},
{"function": "trigger_check_codes"}]
:type triggers: list of dictionaries
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
to change a resource, you only need to set this to True on the last
request.
:type calculate_record_count: bool (optional, default: False)
Please note that setting the ``aliases``, ``indexes`` or ``primary_key``
replaces the existing aliases or constraints. Setting ``records`` appends
@@ -152,6 +158,9 @@ def datastore_create(context, data_dict):
except InvalidDataError as err:
raise p.toolkit.ValidationError(text_type(err))

if data_dict.get('calculate_record_count', False):
backend.calculate_record_count(data_dict['resource_id'])

# Set the datastore_active flag on the resource if necessary
model = _get_or_bust(context, 'model')
resobj = model.Resource.get(data_dict['resource_id'])
@@ -229,6 +238,12 @@ def datastore_upsert(context, data_dict):
Possible options are: upsert, insert, update
(optional, default: upsert)
:type method: string
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
to change a resource, you only need to set this to True on the last
request.
:type calculate_record_count: bool (optional, default: False)
:param dry_run: set to True to abort transaction instead of committing,
e.g. to check for validation or type errors.
:type dry_run: bool (optional, default: False)
@@ -264,6 +279,10 @@ def datastore_upsert(context, data_dict):
result = backend.upsert(context, data_dict)
result.pop('id', None)
result.pop('connection_url', None)

if data_dict.get('calculate_record_count', False):
backend.calculate_record_count(data_dict['resource_id'])

return result
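As the docstrings note, a batched load only needs the flag on its final request, since each ANALYZE supersedes the previous statistics. A sketch (the resource id and records are placeholders):

```python
# Sketch of a batched load where only the last upsert refreshes the
# stored row count; resource id and records are placeholders.
import ckan.tests.helpers as helpers

batches = [
    [{'name': 'Sunita', 'age': '51'}],
    [{'name': 'Bowan', 'age': '68'}],
]
for i, records in enumerate(batches):
    helpers.call_action(
        'datastore_upsert',
        resource_id='my-resource-id',
        method='insert',
        records=records,
        force=True,
        # only the final batch needs to refresh the record count
        calculate_record_count=(i == len(batches) - 1),
    )
```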


@@ -306,14 +325,20 @@ def datastore_delete(context, data_dict):
If missing delete whole table and all dependent views.
(optional)
:type filters: dictionary
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
to change a resource, you only need to set this to True on the last
request.
:type calculate_record_count: bool (optional, default: False)
**Results:**
:returns: Original filters sent.
:rtype: dictionary
'''
schema = context.get('schema', dsschema.datastore_upsert_schema())
schema = context.get('schema', dsschema.datastore_delete_schema())
backend = DatastoreBackend.get_active_backend()

# Remove any applied filters before running validation.
@@ -349,6 +374,9 @@ def datastore_delete(context, data_dict):

result = backend.delete(context, data_dict)

if data_dict.get('calculate_record_count', False):
backend.calculate_record_count(data_dict['resource_id'])

# Set the datastore_active flag on the resource if necessary
model = _get_or_bust(context, 'model')
resource = model.Resource.get(data_dict['resource_id'])
@@ -405,6 +433,17 @@ def datastore_search(context, data_dict):
:param include_total: True to return total matching record count
(optional, default: true)
:type include_total: bool
:param total_estimation_threshold: If "include_total" is True and
"total_estimation_threshold" is not None, and the estimated total
(matching record count) is at or above the "total_estimation_threshold",
then this datastore_search returns an *estimate* of the total rather
than a precise count. This is often good enough, and it avoids the
computationally expensive row count for larger results (e.g. >100000
rows). The estimated total comes from the PostgreSQL table statistics,
generated when Express Loader or DataPusher finishes a load, or by
autovacuum. NB currently no estimate can be made when the user
specifies 'filters' or 'distinct' options. (optional, default: None)
:type total_estimation_threshold: int or None
:param records_format: the format for the records return value:
'objects' (default) list of {fieldname1: value1, ...} dicts,
'lists' list of [value1, value2, ...] lists,
@@ -438,6 +477,8 @@ def datastore_search(context, data_dict):
:type filters: list of dictionaries
:param total: number of total matching records
:type total: int
:param total_was_estimated: whether or not the total was estimated
:type total_was_estimated: bool
:param records: list of matching results
:type records: depends on records_format value passed
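Because `total_was_estimated` is part of the result, a caller that occasionally needs exact numbers can treat the estimate as a first pass (a hypothetical follow-up request, placeholder resource id):

```python
# Sketch: accept the estimate for display, re-query without the
# threshold when an exact count is required. Placeholder resource id.
import ckan.tests.helpers as helpers

res = helpers.call_action(
    'datastore_search',
    resource_id='my-resource-id',
    total_estimation_threshold=1000,
)
total = res['total']
if res['total_was_estimated']:
    # omitting the threshold forces the precise COUNT(*) path
    total = helpers.call_action(
        'datastore_search',
        resource_id='my-resource-id',
    )['total']
```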
4 changes: 4 additions & 0 deletions ckanext/datastore/logic/auth.py
@@ -83,3 +83,7 @@ def datastore_function_delete(context, data_dict):
def datastore_run_triggers(context, data_dict):
'''sysadmin-only: functions can be used to skip access checks'''
return {'success': False}


def datastore_analyze(context, data_dict):
return {'success': False}
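Returning `{'success': False}` unconditionally follows the `datastore_run_triggers` convention above: sysadmins bypass auth functions, so the new action is sysadmin-only. A sketch of the effect (user and resource id are placeholders):

```python
# Sketch: a non-sysadmin context is always refused, since sysadmins
# skip auth functions entirely. User and resource id are placeholders.
import ckan.plugins.toolkit as toolkit

try:
    toolkit.check_access(
        'datastore_analyze',
        {'user': 'normal_user'},
        {'resource_id': 'my-resource-id'},
    )
except toolkit.NotAuthorized:
    print('datastore_analyze denied for non-sysadmins')
```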
13 changes: 13 additions & 0 deletions ckanext/datastore/logic/schema.py
@@ -122,6 +122,8 @@ def datastore_create_schema():
OneOf([u'row'])],
'function': [not_empty, unicode_only],
},
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'__junk': [empty],
'__before': [rename('id', 'resource_id')]
}
@@ -135,6 +137,8 @@ def datastore_upsert_schema():
'id': [ignore_missing],
'method': [ignore_missing, text_type, OneOf(
['upsert', 'insert', 'update'])],
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'dry_run': [ignore_missing, boolean_validator],
'__junk': [empty],
'__before': [rename('id', 'resource_id')]
@@ -147,6 +151,8 @@ def datastore_delete_schema():
'resource_id': [not_missing, not_empty, text_type],
'force': [ignore_missing, boolean_validator],
'id': [ignore_missing],
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'__junk': [empty],
'__before': [rename('id', 'resource_id')]
}
@@ -167,6 +173,7 @@ def datastore_search_schema():
'sort': [ignore_missing, list_of_strings_or_string],
'distinct': [ignore_missing, boolean_validator],
'include_total': [default(True), boolean_validator],
'total_estimation_threshold': [default(None), int_validator],
'records_format': [
default(u'objects'),
OneOf([u'objects', u'lists', u'csv', u'tsv'])],
@@ -195,3 +202,9 @@ def datastore_function_delete_schema():
'name': [unicode_only, not_empty],
'if_exists': [default(False), boolean_validator],
}


def datastore_analyze_schema():
return {
'resource_id': [text_type, resource_id_exists],
}
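This schema and the auth function above imply a `datastore_analyze` action registered in one of the changed files not shown on this page. Assuming it wraps the backend's `calculate_record_count`, a sysadmin could trigger the ANALYZE through the API roughly like so (hypothetical call, placeholder resource id):

```python
# Hypothetical: assumes this commit registers a 'datastore_analyze'
# action matching the auth function and schema shown above.
import ckan.tests.factories as factories
import ckan.tests.helpers as helpers

sysadmin = factories.Sysadmin()
helpers.call_action(
    'datastore_analyze',
    context={'user': sysadmin['name']},
    resource_id='my-resource-id',  # placeholder
)
```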
15 changes: 15 additions & 0 deletions ckanext/datastore/tests/helpers.py
@@ -60,6 +60,21 @@ def set_url_type(resources, user):
p.toolkit.get_action('resource_update')(context, resource)


def execute_sql(sql, *args):
engine = db.get_write_engine()
session = orm.scoped_session(orm.sessionmaker(bind=engine))
return session.connection().execute(sql, *args)


def when_was_last_analyze(resource_id):
results = execute_sql(
'''SELECT last_analyze
FROM pg_stat_user_tables
WHERE relname=%s;
''', resource_id).fetchall()
return results[0][0]


class DatastoreFunctionalTestBase(FunctionalTestBase):
_load_plugins = (u'datastore', )

46 changes: 35 additions & 11 deletions ckanext/datastore/tests/test_create.py
@@ -1,8 +1,7 @@
# encoding: utf-8

import json
import nose
from nose.tools import assert_equal, raises
from nose.tools import assert_equal, assert_not_equal, raises

import sqlalchemy.orm as orm
from ckan.tests.helpers import _get_test_app
@@ -11,13 +10,13 @@
import ckan.plugins as p
import ckan.lib.create_test_data as ctd
import ckan.model as model
import ckan.tests.legacy as tests
import ckan.tests.helpers as helpers
import ckan.tests.factories as factories

import ckanext.datastore.backend.postgres as db
from ckanext.datastore.tests.helpers import (
set_url_type, DatastoreFunctionalTestBase, DatastoreLegacyTestBase)
set_url_type, DatastoreFunctionalTestBase, DatastoreLegacyTestBase,
execute_sql, when_was_last_analyze)
from ckan.plugins.toolkit import ValidationError


@@ -163,7 +162,7 @@ def _has_index_on_field(self, resource_id, field):
pg_class.relname = %s
"""
index_name = db._generate_index_name(resource_id, field)
results = self._execute_sql(sql, index_name).fetchone()
results = execute_sql(sql, index_name).fetchone()
return bool(results)

def _get_index_names(self, resource_id):
@@ -180,14 +179,9 @@ def _get_index_names(self, resource_id):
AND t.relkind = 'r'
AND t.relname = %s
"""
results = self._execute_sql(sql, resource_id).fetchall()
results = execute_sql(sql, resource_id).fetchall()
return [result[0] for result in results]

def _execute_sql(self, sql, *args):
engine = db.get_write_engine()
session = orm.scoped_session(orm.sessionmaker(bind=engine))
return session.connection().execute(sql, *args)

def test_sets_datastore_active_on_resource_on_create(self):
resource = factories.Resource()

@@ -244,6 +238,36 @@ def test_create_exceeds_column_name_limit(self):
}
result = helpers.call_action('datastore_create', **data)

def test_calculate_record_count_is_false(self):
resource = factories.Resource()
data = {
'resource_id': resource['id'],
'fields': [{'id': 'name', 'type': 'text'},
{'id': 'age', 'type': 'text'}],
'records': [{"name": "Sunita", "age": "51"},
{"name": "Bowan", "age": "68"}],
'force': True,
}
helpers.call_action('datastore_create', **data)
last_analyze = when_was_last_analyze(resource['id'])
assert_equal(last_analyze, None)

def test_calculate_record_count(self):
# how datapusher loads data (send_resource_to_datastore)
resource = factories.Resource()
data = {
'resource_id': resource['id'],
'fields': [{'id': 'name', 'type': 'text'},
{'id': 'age', 'type': 'text'}],
'records': [{"name": "Sunita", "age": "51"},
{"name": "Bowan", "age": "68"}],
'calculate_record_count': True,
'force': True,
}
helpers.call_action('datastore_create', **data)
last_analyze = when_was_last_analyze(resource['id'])
assert_not_equal(last_analyze, None)


class TestDatastoreCreate(DatastoreLegacyTestBase):
sysadmin_user = None
