Skip to content

Commit

Permalink
Adds an option to do the ANALYZE to datastore_create
Browse files Browse the repository at this point in the history
  • Loading branch information
David Read committed Oct 26, 2018
1 parent 784d197 commit 8aa298d
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 22 deletions.
13 changes: 13 additions & 0 deletions ckanext/datastore/backend/postgres.py
Expand Up @@ -1953,6 +1953,19 @@ def before_fork(self):
# to avoid sharing them between parent and child processes.
_dispose_engines()

def calculate_record_count(self, resource_id):
    '''
    Calculate an estimate of the record/row count and store it in
    PostgreSQL's pg_stat_user_tables. This number will be used when
    specifying `total_estimation_threshold`.

    :param resource_id: id of the datastore table to ANALYZE
    :type resource_id: string
    :raises DatastoreException: if the ANALYZE statement fails
    '''
    connection = get_write_engine().connect()
    # The identifier is double-quoted; resource ids are CKAN-generated
    # UUIDs, so they cannot contain a closing double-quote.
    sql = 'ANALYZE "{}"'.format(resource_id)
    try:
        # ANALYZE returns no rows, so the result is intentionally discarded.
        connection.execute(sql)
    except sqlalchemy.exc.DatabaseError as err:
        raise DatastoreException(err)
    finally:
        # Always return the connection to the pool; previously it leaked.
        connection.close()


def create_function(name, arguments, rettype, definition, or_replace):
sql = u'''
Expand Down
49 changes: 28 additions & 21 deletions ckanext/datastore/logic/action.py
Expand Up @@ -70,6 +70,12 @@ def datastore_create(context, data_dict):
{"function": "trigger_clean_reference"},
{"function": "trigger_check_codes"}]
:type triggers: list of dictionaries
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
to change a resource, you only need to set this to True on the last
request.
:type calculate_record_count: bool (optional, default: False)
Please note that setting the ``aliases``, ``indexes`` or ``primary_key``
replaces the existing aliases or constraints. Setting ``records`` appends
Expand Down Expand Up @@ -152,6 +158,9 @@ def datastore_create(context, data_dict):
except InvalidDataError as err:
raise p.toolkit.ValidationError(text_type(err))

if data_dict.get('calculate_record_count', False):
backend.calculate_record_count(data_dict['resource_id'])

# Set the datastore_active flag on the resource if necessary
model = _get_or_bust(context, 'model')
resobj = model.Resource.get(data_dict['resource_id'])
Expand Down Expand Up @@ -229,6 +238,12 @@ def datastore_upsert(context, data_dict):
Possible options are: upsert, insert, update
(optional, default: upsert)
:type method: string
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
to change a resource, you only need to set this to True on the last
request.
:type calculate_record_count: bool (optional, default: False)
:param dry_run: set to True to abort transaction instead of committing,
e.g. to check for validation or type errors.
:type dry_run: bool (optional, default: False)
Expand Down Expand Up @@ -264,6 +279,10 @@ def datastore_upsert(context, data_dict):
result = backend.upsert(context, data_dict)
result.pop('id', None)
result.pop('connection_url', None)

if data_dict.get('calculate_record_count', False):
backend.calculate_record_count(data_dict['resource_id'])

return result


Expand Down Expand Up @@ -306,6 +325,12 @@ def datastore_delete(context, data_dict):
If missing delete whole table and all dependent views.
(optional)
:type filters: dictionary
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
to change a resource, you only need to set this to True on the last
request.
:type calculate_record_count: bool (optional, default: False)
**Results:**
Expand Down Expand Up @@ -349,6 +374,9 @@ def datastore_delete(context, data_dict):

result = backend.delete(context, data_dict)

if data_dict.get('calculate_record_count', False):
backend.calculate_record_count(data_dict['resource_id'])

# Set the datastore_active flag on the resource if necessary
model = _get_or_bust(context, 'model')
resource = model.Resource.get(data_dict['resource_id'])
Expand Down Expand Up @@ -651,24 +679,3 @@ def datastore_function_delete(context, data_dict):
p.toolkit.check_access('datastore_function_delete', context, data_dict)
backend = DatastoreBackend.get_active_backend()
backend.drop_function(data_dict['name'], data_dict['if_exists'])


@logic.validate(dsschema.datastore_analyze_schema)
def datastore_analyze(context, data_dict):
    '''Runs PostgreSQL's ANALYZE on a resource's datastore table.

    Refreshes the table statistics (including the estimated row count in
    pg_stat_user_tables) that are used by `total_estimation_threshold`.

    :param resource_id: resource id for the table that will be analyzed
    :type resource_id: string

    :raises ValidationError: if the ANALYZE statement fails
    '''
    p.toolkit.check_access('datastore_analyze', context, data_dict)
    backend = DatastoreBackend.get_active_backend()
    connection = backend._get_write_engine().connect()

    # NOTE(review): backend.analyze() appears to duplicate the manual
    # ANALYZE below; this should move into backend/postgres.py entirely.
    backend.analyze(context, data_dict)
    sql = 'ANALYZE "{}"'.format(data_dict['resource_id'])
    try:
        # ANALYZE returns no rows, so the result is intentionally discarded.
        connection.execute(sql)
    except sqlalchemy.exc.DatabaseError as err:
        # Bug fix: `message` was previously referenced without being
        # defined, so any DatabaseError raised a NameError that masked
        # the real database error.
        message = text_type(err)
        raise p.toolkit.ValidationError({
            u'records': [message.split(u') ', 1)[-1]]})
    finally:
        # Always return the connection to the pool; previously it leaked.
        connection.close()
2 changes: 2 additions & 0 deletions ckanext/datastore/logic/schema.py
Expand Up @@ -122,6 +122,8 @@ def datastore_create_schema():
OneOf([u'row'])],
'function': [not_empty, unicode_only],
},
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'__junk': [empty],
'__before': [rename('id', 'resource_id')]
}
Expand Down
44 changes: 43 additions & 1 deletion ckanext/datastore/tests/test_create.py
Expand Up @@ -2,7 +2,7 @@

import json
import nose
from nose.tools import assert_equal, raises
from nose.tools import assert_equal, assert_not_equal, raises

import sqlalchemy.orm as orm
from ckan.tests.helpers import _get_test_app
Expand Down Expand Up @@ -244,6 +244,48 @@ def test_create_exceeds_column_name_limit(self):
}
result = helpers.call_action('datastore_create', **data)

def test_analyze_not_run_by_default(self):
    # Creating a datastore table without `calculate_record_count` must
    # leave pg_stat_user_tables.last_analyze untouched (i.e. NULL).
    dataset = factories.Dataset(resources=[
        {'url': 'https://example.com/file.csv', 'format': 'csv', 'name': 'Image 1'}])
    resource_id = dataset['resources'][0]['id']
    create_params = {
        'resource_id': resource_id,
        'force': True,
        'fields': [{'id': 'name', 'type': 'text'},
                   {'id': 'age', 'type': 'text'}],
        'records': [{"name": "Sunita", "age": "51"},
                    {"name": "Bowan", "age": "68"}],
    }
    helpers.call_action('datastore_create', **create_params)
    assert_equal(self._when_was_last_analyze(resource_id), None)

def test_create_with_records(self):
    # how datapusher loads data (send_resource_to_datastore).
    # With `calculate_record_count` set, creation must trigger an ANALYZE,
    # recording a last_analyze timestamp in pg_stat_user_tables.
    dataset = factories.Dataset(resources=[
        {'url': 'https://example.com/file.csv', 'format': 'csv', 'name': 'Image 1'}])
    resource_id = dataset['resources'][0]['id']
    create_params = {
        'resource_id': resource_id,
        'force': True,
        'calculate_record_count': True,
        'fields': [{'id': 'name', 'type': 'text'},
                   {'id': 'age', 'type': 'text'}],
        'records': [{"name": "Sunita", "age": "51"},
                    {"name": "Bowan", "age": "68"}],
    }
    helpers.call_action('datastore_create', **create_params)
    assert_not_equal(self._when_was_last_analyze(resource_id), None)

def _when_was_last_analyze(self, resource_id):
    # Returns the timestamp of the most recent ANALYZE of the resource's
    # table, or None if the table has never been analyzed. pg_stat_user_tables
    # is expected to have exactly one row for the table.
    rows = self._execute_sql(
        '''SELECT last_analyze
        FROM pg_stat_user_tables
        WHERE relname=%s;
        ''', resource_id).fetchall()
    first_row = rows[0]
    return first_row[0]


class TestDatastoreCreate(DatastoreLegacyTestBase):
sysadmin_user = None
Expand Down

0 comments on commit 8aa298d

Please sign in to comment.