Skip to content

Commit

Permalink
[#1871] Disallow DataStore SQL queries involving system tables
Browse files Browse the repository at this point in the history
Using the `get_table_names_from_sql` helper function before running the
provided statement, we raise a Validation error if some of the tables start
with `pg_`.
  • Loading branch information
amercader committed Aug 5, 2014
1 parent 8f026f5 commit d51a6e3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
18 changes: 15 additions & 3 deletions ckanext/datastore/db.py
Expand Up @@ -1158,12 +1158,24 @@ def search_sql(context, data_dict):
timeout = context.get('query_timeout', _TIMEOUT)
_cache_types(context)

sql = data_dict['sql'].replace('%', '%%')

try:

context['connection'].execute(
u'SET LOCAL statement_timeout TO {0}'.format(timeout))
results = context['connection'].execute(
data_dict['sql'].replace('%', '%%')
)

table_names = datastore_helpers.get_table_names_from_sql(context, sql)
log.debug('Tables involved in input SQL: {0}'.format(table_names))

system_tables = [t for t in table_names if t.startswith('pg_')]
if len(system_tables):
raise toolkit.NotAuthorized({
'permissions': ['Not authorized to access system tables']
})

results = context['connection'].execute(sql)

return format_results(context, results, data_dict)

except ProgrammingError, e:
Expand Down
20 changes: 20 additions & 0 deletions ckanext/datastore/tests/test_search.py
Expand Up @@ -945,3 +945,23 @@ def test_making_resource_private_makes_datastore_private(self):
extra_environ=auth)
res_dict = json.loads(res.body)
assert res_dict['success'] is True

def test_not_authorized_to_access_system_tables(self):
test_cases = [
'SELECT * FROM pg_roles',
'SELECT * FROM pg_catalog.pg_database',
'SELECT rolpassword FROM pg_roles',
'''SELECT p.rolpassword
FROM pg_roles p
JOIN "{0}" r
ON p.rolpassword = r.author'''.format(self.data['resource_id']),
]
for query in test_cases:
data = {'sql': query.replace('\n', '')}
postparams = json.dumps(data)
res = self.app.post('/api/action/datastore_search_sql',
params=postparams,
status=403)
res_dict = json.loads(res.body)
assert res_dict['success'] is False
assert res_dict['error']['__type'] == 'Authorization Error'

0 comments on commit d51a6e3

Please sign in to comment.