Skip to content

Commit

Permalink
Merge pull request #4562 from ckan/4561-limit-datastore_search
Browse files Browse the repository at this point in the history
Add a hard upper limit to datastore_search(_sql) rows returned
  • Loading branch information
David Read committed Jan 27, 2019
2 parents b8568db + 8bc80d7 commit 4171ce9
Show file tree
Hide file tree
Showing 9 changed files with 483 additions and 113 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.rst
Expand Up @@ -16,9 +16,10 @@ Minor changes:
the value is False, 0, [] or {} (#4448)
* If you've customized the schema for package_search, you'll need to add to it
the limiting of ``row``, as per default_package_search_schema now does (#4484)
* Several logic functions now have new limits to how many items can be
returned, notably ``group_list`` and ``organization_list`` when
``all_fields=true``. These are all configurable. (#4484)
* Several logic functions now have new upper limits to how many items can be
returned, notably ``group_list``, ``organization_list`` when
``all_fields=true``, ``datastore_search`` and ``datastore_search_sql``.
These are all configurable. (#4484, #4562)

Bugfixes:

Expand Down
20 changes: 18 additions & 2 deletions ckanext/datastore/backend/postgres.py
Expand Up @@ -1375,7 +1375,7 @@ def _execute_single_statement_copy_to(context, sql_string, where_values, buf):
cursor.close()


def format_results(context, results, data_dict):
def format_results(context, results, data_dict, rows_max):
result_fields = []
for field in results.cursor.description:
result_fields.append({
Expand All @@ -1391,6 +1391,8 @@ def format_results(context, results, data_dict):
field['type'])
records.append(converted_row)
data_dict['records'] = records
if data_dict.get('records_truncated', False):
data_dict['records'].pop()
data_dict['fields'] = result_fields

return _unrename_json_field(data_dict)
Expand Down Expand Up @@ -1550,6 +1552,11 @@ def search_sql(context, data_dict):

sql = data_dict['sql'].replace('%', '%%')

# limit the number of results to ckan.datastore.search.rows_max + 1
# (the +1 is so that we know if the results went over the limit or not)
rows_max = int(config.get('ckan.datastore.search.rows_max', 32000))
sql = 'SELECT * FROM ({0}) AS blah LIMIT {1} ;'.format(sql, rows_max + 1)

try:

context['connection'].execute(
Expand All @@ -1566,7 +1573,10 @@ def search_sql(context, data_dict):

results = context['connection'].execute(sql)

return format_results(context, results, data_dict)
if results.rowcount == rows_max + 1:
data_dict['records_truncated'] = True

return format_results(context, results, data_dict, rows_max)

except ProgrammingError as e:
if e.orig.pgcode == _PG_ERR_CODE['permission_denied']:
Expand Down Expand Up @@ -1718,6 +1728,11 @@ def configure(self, config):
else:
self._check_urls_and_permissions()

# check rows_max is valid on CKAN start-up
rows_max = config.get('ckan.datastore.search.rows_max')
if rows_max is not None:
int(rows_max)

def datastore_delete(self, context, data_dict, fields_types, query_dict):
    """Extend query_dict's WHERE clauses with those derived from the
    filters in data_dict, and return the updated query_dict."""
    extra_clauses = _where_clauses(data_dict, fields_types)
    query_dict['where'] += extra_clauses
    return query_dict
Expand All @@ -1739,6 +1754,7 @@ def datastore_search(self, context, data_dict, fields_types, query_dict):
else:
field_ids = fields_types.keys()

# add default limit here just in case - already defaulted in the schema
limit = data_dict.get('limit', 100)
offset = data_dict.get('offset', 0)

Expand Down
16 changes: 13 additions & 3 deletions ckanext/datastore/controller.py
Expand Up @@ -14,6 +14,7 @@
render,
c,
h,
config,
)
from ckanext.datastore.writer import (
csv_writer,
Expand Down Expand Up @@ -175,6 +176,15 @@ def result_page(offs, lim):

result = result_page(offset, limit)

if result['limit'] != limit:
# `limit` (from PAGINATE_BY) must have been more than
# ckan.datastore.search.rows_max, so datastore_search responded with a
# limit matching ckan.datastore.search.rows_max. So we need to paginate
# by that amount instead, otherwise we'll have gaps in the records.
paginate_by = result['limit']
else:
paginate_by = PAGINATE_BY

with start_writer(result['fields']) as wr:
while True:
if limit is not None and limit <= 0:
Expand All @@ -185,14 +195,14 @@ def result_page(offs, lim):
wr.write_records(records)

if records_format == 'objects' or records_format == 'lists':
if len(records) < PAGINATE_BY:
if len(records) < paginate_by:
break
elif not records:
break

offset += PAGINATE_BY
offset += paginate_by
if limit is not None:
limit -= PAGINATE_BY
limit -= paginate_by
if limit <= 0:
break

Expand Down
25 changes: 20 additions & 5 deletions ckanext/datastore/logic/action.py
Expand Up @@ -399,8 +399,10 @@ def datastore_delete(context, data_dict):
def datastore_search(context, data_dict):
'''Search a DataStore resource.
The datastore_search action allows you to search data in a resource.
DataStore resources that belong to private CKAN resource can only be
The datastore_search action allows you to search data in a resource. By
default 100 rows are returned - see the `limit` parameter for more info.
A DataStore resource that belongs to a private CKAN resource can only be
read by you if you have access to the CKAN resource and send the
appropriate authorization.
Expand All @@ -420,7 +422,10 @@ def datastore_search(context, data_dict):
:param language: language of the full text query
(optional, default: english)
:type language: string
:param limit: maximum number of rows to return (optional, default: 100)
:param limit: maximum number of rows to return
(optional, default: ``100``, unless set in the site's configuration
``ckan.datastore.search.rows_default``, upper limit: ``32000`` unless
set in site's configuration ``ckan.datastore.search.rows_max``)
:type limit: int
:param offset: offset this number of rows (optional)
:type offset: int
Expand Down Expand Up @@ -471,7 +476,9 @@ def datastore_search(context, data_dict):
:type fields: list of dictionaries
:param offset: query offset value
:type offset: int
:param limit: query limit value
:param limit: queried limit value (if the requested ``limit`` was above the
``ckan.datastore.search.rows_max`` value then this response ``limit``
will be set to the value of ``ckan.datastore.search.rows_max``)
:type limit: int
:param filters: query filters
:type filters: list of dictionaries
Expand Down Expand Up @@ -522,13 +529,15 @@ def datastore_search_sql(context, data_dict):
engine is the
`PostgreSQL engine <http://www.postgresql.org/docs/9.1/interactive/>`_.
There is an enforced timeout on SQL queries to avoid an unintended DOS.
The number of results returned is limited to 32000, unless set in the
site's configuration ``ckan.datastore.search.rows_max``
Queries are only allowed if you have access to all the CKAN resources
in the query and send the appropriate authorization.
.. note:: This action is not available when
:ref:`ckan.datastore.sqlsearch.enabled` is set to false
.. note:: When source data columns (i.e. CSV) heading names are provdied
.. note:: When source data columns (i.e. CSV) heading names are provided
in all UPPERCASE you need to double quote them in the SQL select
statement to avoid returning null results.
Expand All @@ -544,6 +553,12 @@ def datastore_search_sql(context, data_dict):
:type fields: list of dictionaries
:param records: list of matching results
:type records: list of dictionaries
:param records_truncated: indicates whether the number of records returned
was limited by the internal limit, which is 32000 records (or other
value set in the site's configuration
``ckan.datastore.search.rows_max``). If records are truncated by this,
this key has value True, otherwise the key is not returned at all.
:type records_truncated: bool
'''
backend = DatastoreBackend.get_active_backend()
Expand Down
9 changes: 8 additions & 1 deletion ckanext/datastore/logic/schema.py
Expand Up @@ -21,6 +21,9 @@
OneOf = get_validator('OneOf')
unicode_only = get_validator('unicode_only')
default = get_validator('default')
natural_number_validator = get_validator('natural_number_validator')
configured_default = get_validator('configured_default')
limit_to_configured_maximum = get_validator('limit_to_configured_maximum')


def rename(old, new):
Expand Down Expand Up @@ -167,7 +170,11 @@ def datastore_search_schema():
'plain': [ignore_missing, boolean_validator],
'filters': [ignore_missing, json_validator],
'language': [ignore_missing, text_type],
'limit': [ignore_missing, int_validator],
'limit': [
configured_default('ckan.datastore.search.rows_default', 100),
natural_number_validator,
limit_to_configured_maximum('ckan.datastore.search.rows_max',
32000)],
'offset': [ignore_missing, int_validator],
'fields': [ignore_missing, list_of_strings_or_string],
'sort': [ignore_missing, list_of_strings_or_string],
Expand Down
115 changes: 100 additions & 15 deletions ckanext/datastore/tests/test_dump.py
Expand Up @@ -2,6 +2,7 @@

from nose.tools import assert_equals, assert_in
import mock
import json

from ckanext.datastore.tests.helpers import DatastoreFunctionalTestBase
import ckan.tests.helpers as helpers
Expand Down Expand Up @@ -428,6 +429,21 @@ def test_dump_xml(self):
)
assert_equals(content, expected_content)

@helpers.change_config('ckan.datastore.search.rows_max', '3')
def test_dump_with_low_rows_max(self):
    # Even with rows_max (3) far below the record count, the dump
    # endpoint should paginate through and return every record.
    res = factories.Resource()
    records = [{u'record': str(i)} for i in range(12)]
    helpers.call_action(
        'datastore_create',
        resource_id=res['id'],
        force=True,
        records=records,
    )

    app = self._get_test_app()
    response = app.get('/datastore/dump/{0}'.format(str(res['id'])))
    assert_equals(get_csv_record_values(response.body), range(12))

@mock.patch('ckanext.datastore.controller.PAGINATE_BY', 5)
def test_dump_pagination(self):
resource = factories.Resource()
Expand All @@ -440,12 +456,10 @@ def test_dump_pagination(self):

app = self._get_test_app()
response = app.get('/datastore/dump/{0}'.format(str(resource['id'])))
assert_equals(
'_id,record\r\n'
'1,0\n2,1\n3,2\n4,3\n5,4\n6,5\n7,6\n8,7\n9,8\n10,9\n'
'11,10\n12,11\n',
response.body)
assert_equals(get_csv_record_values(response.body),
range(12))

@helpers.change_config('ckan.datastore.search.rows_max', '7')
@mock.patch('ckanext.datastore.controller.PAGINATE_BY', 5)
def test_dump_pagination_csv_with_limit(self):
resource = factories.Resource()
Expand All @@ -456,14 +470,62 @@ def test_dump_pagination_csv_with_limit(self):
}
helpers.call_action('datastore_create', **data)

app = self._get_test_app()
response = app.get('/datastore/dump/{0}?limit=11'.format(
str(resource['id'])))
assert_equals(get_csv_record_values(response.body),
range(11))

@helpers.change_config('ckan.datastore.search.rows_max', '7')
@mock.patch('ckanext.datastore.controller.PAGINATE_BY', 6)
def test_dump_pagination_csv_with_limit_same_as_paginate(self):
    # limit equals the page size, so exactly one full page is returned.
    res = factories.Resource()
    records = [{u'record': str(i)} for i in range(12)]
    helpers.call_action(
        'datastore_create',
        resource_id=res['id'],
        force=True,
        records=records,
    )

    app = self._get_test_app()
    response = app.get('/datastore/dump/{0}?limit=6'.format(
        str(res['id'])))
    assert_equals(get_csv_record_values(response.body), range(6))

@helpers.change_config('ckan.datastore.search.rows_max', '6')
@mock.patch('ckanext.datastore.controller.PAGINATE_BY', 5)
def test_dump_pagination_with_rows_max(self):
    # limit (7) exceeds rows_max (6): the dump must keep paginating
    # (by rows_max) until the requested limit is satisfied.
    res = factories.Resource()
    records = [{u'record': str(i)} for i in range(12)]
    helpers.call_action(
        'datastore_create',
        resource_id=res['id'],
        force=True,
        records=records,
    )

    app = self._get_test_app()
    response = app.get('/datastore/dump/{0}?limit=7'.format(str(res['id'])))
    assert_equals(get_csv_record_values(response.body), range(7))

@helpers.change_config('ckan.datastore.search.rows_max', '6')
@mock.patch('ckanext.datastore.controller.PAGINATE_BY', 6)
def test_dump_pagination_with_rows_max_same_as_paginate(self):
    # PAGINATE_BY equals rows_max; a limit above both must still yield
    # all requested records across pages.
    res = factories.Resource()
    records = [{u'record': str(i)} for i in range(12)]
    helpers.call_action(
        'datastore_create',
        resource_id=res['id'],
        force=True,
        records=records,
    )

    app = self._get_test_app()
    response = app.get('/datastore/dump/{0}?limit=7'.format(str(res['id'])))
    assert_equals(get_csv_record_values(response.body), range(7))

@helpers.change_config('ckan.datastore.search.rows_max', '7')
@mock.patch('ckanext.datastore.controller.PAGINATE_BY', 5)
def test_dump_pagination_json_with_limit(self):
resource = factories.Resource()
Expand All @@ -477,9 +539,32 @@ def test_dump_pagination_json_with_limit(self):
app = self._get_test_app()
response = app.get('/datastore/dump/{0}?limit=6&format=json'.format(
str(resource['id'])))
assert_equals(
'{\n "fields": [{"type":"int","id":"_id"},'
'{"type":"int4","id":"record"}],\n'
' "records": [\n [1,0],\n [2,1],\n [3,2],\n [4,3],\n'
' [5,4],\n [6,5]\n]}\n',
response.body)
assert_equals(get_json_record_values(response.body),
range(6))

@helpers.change_config('ckan.datastore.search.rows_max', '6')
@mock.patch('ckanext.datastore.controller.PAGINATE_BY', 5)
def test_dump_pagination_json_with_rows_max(self):
    # Same as the CSV rows_max case, but for the JSON dump format.
    res = factories.Resource()
    records = [{u'record': str(i)} for i in range(12)]
    helpers.call_action(
        'datastore_create',
        resource_id=res['id'],
        force=True,
        records=records,
    )

    app = self._get_test_app()
    response = app.get('/datastore/dump/{0}?limit=7&format=json'.format(
        str(res['id'])))
    assert_equals(get_json_record_values(response.body), range(7))


def get_csv_record_values(response_body):
    """Return the second CSV column of each data row as a list of ints.

    The first whitespace-separated token (the header row) is skipped.
    """
    rows = response_body.split()[1:]
    values = []
    for row in rows:
        values.append(int(row.split(',')[1]))
    return values


def get_json_record_values(response_body):
    """Return the second field of every record in a JSON dump body."""
    parsed = json.loads(response_body)
    return [row[1] for row in parsed['records']]

0 comments on commit 4171ce9

Please sign in to comment.