From 8aa298dfcf13cc5f0b5853f6fe60d49948cb43f0 Mon Sep 17 00:00:00 2001 From: David Read Date: Fri, 26 Oct 2018 14:16:10 +0100 Subject: [PATCH] Add an option to run ANALYZE in datastore_create --- ckanext/datastore/backend/postgres.py | 13 +++++++ ckanext/datastore/logic/action.py | 49 +++++++++++++++----------- ckanext/datastore/logic/schema.py | 2 ++ ckanext/datastore/tests/test_create.py | 44 ++++++++++++++++++++++- 4 files changed, 86 insertions(+), 22 deletions(-) diff --git a/ckanext/datastore/backend/postgres.py b/ckanext/datastore/backend/postgres.py index 39582f8a68e..362a26981b5 100644 --- a/ckanext/datastore/backend/postgres.py +++ b/ckanext/datastore/backend/postgres.py @@ -1953,6 +1953,19 @@ def before_fork(self): # to avoid sharing them between parent and child processes. _dispose_engines() + def calculate_record_count(self, resource_id): + ''' + Calculate an estimate of the record/row count and store it in PostgreSQL's + pg_stat_user_tables. This number will be used when specifying + `total_estimation_threshold` + ''' + connection = get_write_engine().connect() + sql = 'ANALYZE "{}"'.format(resource_id) + try: + results = connection.execute(sql) + except sqlalchemy.exc.DatabaseError as err: + raise DatastoreException(err) + def create_function(name, arguments, rettype, definition, or_replace): sql = u''' diff --git a/ckanext/datastore/logic/action.py b/ckanext/datastore/logic/action.py index 6a257ec9d5c..1c78d006ce8 100644 --- a/ckanext/datastore/logic/action.py +++ b/ckanext/datastore/logic/action.py @@ -70,6 +70,12 @@ def datastore_create(context, data_dict): {"function": "trigger_clean_reference"}, {"function": "trigger_check_codes"}] :type triggers: list of dictionaries + :param calculate_record_count: updates the stored count of records, used to + optimize datastore_search in combination with the + `total_estimation_threshold` parameter. 
If doing a series of requests + to change a resource, you only need to set this to True on the last + request. + :type calculate_record_count: bool (optional, default: False) Please note that setting the ``aliases``, ``indexes`` or ``primary_key`` replaces the exising aliases or constraints. Setting ``records`` appends @@ -152,6 +158,9 @@ def datastore_create(context, data_dict): except InvalidDataError as err: raise p.toolkit.ValidationError(text_type(err)) + if data_dict.get('calculate_record_count', False): + backend.calculate_record_count(data_dict['resource_id']) + # Set the datastore_active flag on the resource if necessary model = _get_or_bust(context, 'model') resobj = model.Resource.get(data_dict['resource_id']) @@ -229,6 +238,12 @@ def datastore_upsert(context, data_dict): Possible options are: upsert, insert, update (optional, default: upsert) :type method: string + :param calculate_record_count: updates the stored count of records, used to + optimize datastore_search in combination with the + `total_estimation_threshold` parameter. If doing a series of requests + to change a resource, you only need to set this to True on the last + request. + :type calculate_record_count: bool (optional, default: False) :param dry_run: set to True to abort transaction instead of committing, e.g. to check for validation or type errors. :type dry_run: bool (optional, default: False) @@ -264,6 +279,10 @@ def datastore_upsert(context, data_dict): result = backend.upsert(context, data_dict) result.pop('id', None) result.pop('connection_url', None) + + if data_dict.get('calculate_record_count', False): + backend.calculate_record_count(data_dict['resource_id']) + return result @@ -306,6 +325,12 @@ def datastore_delete(context, data_dict): If missing delete whole table and all dependent views. 
(optional) :type filters: dictionary + :param calculate_record_count: updates the stored count of records, used to + optimize datastore_search in combination with the + `total_estimation_threshold` parameter. If doing a series of requests + to change a resource, you only need to set this to True on the last + request. + :type calculate_record_count: bool (optional, default: False) **Results:** @@ -349,6 +374,9 @@ def datastore_delete(context, data_dict): result = backend.delete(context, data_dict) + if data_dict.get('calculate_record_count', False): + backend.calculate_record_count(data_dict['resource_id']) + # Set the datastore_active flag on the resource if necessary model = _get_or_bust(context, 'model') resource = model.Resource.get(data_dict['resource_id']) @@ -651,24 +679,3 @@ def datastore_function_delete(context, data_dict): p.toolkit.check_access('datastore_function_delete', context, data_dict) backend = DatastoreBackend.get_active_backend() backend.drop_function(data_dict['name'], data_dict['if_exists']) - - -@logic.validate(dsschema.datastore_analyze_schema) -def datastore_analyze(context, data_dict): - '''Runs postgres's ANALYZE - - :param resource_id: resource id for the table that will be analyzed - :type resource_id: string - ''' - p.toolkit.check_access('datastore_analyze', context, data_dict) - backend = DatastoreBackend.get_active_backend() - connection = backend._get_write_engine().connect() - - result = backend.analyze(context, data_dict) - #move to backend/postgres.py - sql = 'ANALYZE "{}"'.format(data_dict['resource_id']) - try: - results = connection.execute(sql) - except sqlalchemy.exc.DatabaseError as err: - raise p.toolkit.ValidationError({ - u'records': [message.split(u') ', 1)[-1]]}) diff --git a/ckanext/datastore/logic/schema.py b/ckanext/datastore/logic/schema.py index 161ef8a9748..c5e3d31b486 100644 --- a/ckanext/datastore/logic/schema.py +++ b/ckanext/datastore/logic/schema.py @@ -122,6 +122,8 @@ def datastore_create_schema(): 
OneOf([u'row'])], 'function': [not_empty, unicode_only], }, + 'calculate_record_count': [ignore_missing, default(False), + boolean_validator], '__junk': [empty], '__before': [rename('id', 'resource_id')] } diff --git a/ckanext/datastore/tests/test_create.py b/ckanext/datastore/tests/test_create.py index 9533769b326..9b35e7a5831 100644 --- a/ckanext/datastore/tests/test_create.py +++ b/ckanext/datastore/tests/test_create.py @@ -2,7 +2,7 @@ import json import nose -from nose.tools import assert_equal, raises +from nose.tools import assert_equal, assert_not_equal, raises import sqlalchemy.orm as orm from ckan.tests.helpers import _get_test_app @@ -244,6 +244,48 @@ def test_create_exceeds_column_name_limit(self): } result = helpers.call_action('datastore_create', **data) + def test_analyze_not_run_by_default(self): + package = factories.Dataset(resources=[ + {'url': 'https://example.com/file.csv', 'format': 'csv', 'name': 'Image 1'}]) + resource_id = package['resources'][0]['id'] + data = { + 'resource_id': resource_id, + 'fields': [{'id': 'name', 'type': 'text'}, + {'id': 'age', 'type': 'text'}], + 'records': [{"name": "Sunita", "age": "51"}, + {"name": "Bowan", "age": "68"}], + 'force': True, + } + helpers.call_action('datastore_create', **data) + last_analyze = self._when_was_last_analyze(resource_id) + assert_equal(last_analyze, None) + + def test_create_with_records(self): + # how datapusher loads data (send_resource_to_datastore) + package = factories.Dataset(resources=[ + {'url': 'https://example.com/file.csv', 'format': 'csv', 'name': 'Image 1'}]) + resource_id = package['resources'][0]['id'] + data = { + 'resource_id': resource_id, + 'fields': [{'id': 'name', 'type': 'text'}, + {'id': 'age', 'type': 'text'}], + 'records': [{"name": "Sunita", "age": "51"}, + {"name": "Bowan", "age": "68"}], + 'calculate_record_count': True, + 'force': True, + } + helpers.call_action('datastore_create', **data) + last_analyze = self._when_was_last_analyze(resource_id) + 
assert_not_equal(last_analyze, None) + + def _when_was_last_analyze(self, resource_id): + results = self._execute_sql( + '''SELECT last_analyze + FROM pg_stat_user_tables + WHERE relname=%s; + ''', resource_id).fetchall() + return results[0][0] + class TestDatastoreCreate(DatastoreLegacyTestBase): sysadmin_user = None