Merge pull request #3786 from ckan/3785-trigger-full-text-search
DataStore's full-text-search column is populated by a PostgreSQL trigger rather than by Python
David Read committed Sep 5, 2017
2 parents 177143f + c6b613c commit 8e36ade
Showing 8 changed files with 98 additions and 80 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.rst
@@ -7,6 +7,21 @@
Changelog
---------

v?? (TBA)
=========

Note: This version requires re-running the 'datastore set-permissions' command
(assuming you run DataStore). See: :ref:`datastore-set-permissions`

Otherwise new and updated datasets will not be searchable in DataStore and
the logs will contain this error::

ProgrammingError: (psycopg2.ProgrammingError) function populate_full_text_trigger() does not exist

CKAN developers should also re-run set-permissions on the test database:
:ref:`datastore-test-set-permissions`
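
For a production site the upgrade step is typically a one-liner (a sketch;
the config file path varies by install)::

    paster --plugin=ckan datastore set-permissions -c /etc/ckan/default/production.ini | sudo -u postgres psql --set ON_ERROR_STOP=1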


v2.7.0 2017-08-02
=================

74 changes: 27 additions & 47 deletions ckanext/datastore/backend/postgres.py
@@ -349,22 +349,6 @@ def _validate_record(record, num, field_names):
})


def _to_full_text(fields, record):
full_text = []
ft_types = ['int8', 'int4', 'int2', 'float4', 'float8', 'date', 'time',
'timetz', 'timestamp', 'numeric', 'text']
for field in fields:
value = record.get(field['id'])
if not value:
continue

if field['type'].lower() in ft_types and unicode(value):
full_text.append(unicode(value))
else:
full_text.extend(json_get_values(value))
return ' '.join(set(full_text))


def _where_clauses(data_dict, fields_types):
filters = data_dict.get('filters', {})
clauses = []
@@ -755,19 +739,6 @@ def convert(data, type_name):
return unicode(data)


def json_get_values(obj, current_list=None):
if current_list is None:
current_list = []
if isinstance(obj, list) or isinstance(obj, tuple):
for item in obj:
json_get_values(item, current_list)
elif isinstance(obj, dict):
json_get_values(obj.items(), current_list)
elif obj:
current_list.append(unicode(obj))
return current_list


def check_fields(context, fields):
'''Check if field types are valid.'''
for field in fields:
@@ -1052,8 +1023,8 @@ def upsert_data(context, data_dict):
fields = _get_fields(context, data_dict)
field_names = _pluck('id', fields)
records = data_dict['records']
sql_columns = ", ".join(['"%s"' % name.replace(
'%', '%%') for name in field_names] + ['"_full_text"'])
sql_columns = ", ".join(
identifier(name) for name in field_names)

if method == _INSERT:
rows = []
@@ -1067,13 +1038,12 @@
# a tuple with an empty second value
value = (json.dumps(value), '')
row.append(value)
row.append(_to_full_text(fields, record))
rows.append(row)

sql_string = u'''INSERT INTO "{res_id}" ({columns})
VALUES ({values}, to_tsvector(%s));'''.format(
res_id=data_dict['resource_id'],
columns=sql_columns,
sql_string = u'''INSERT INTO {res_id} ({columns})
VALUES ({values});'''.format(
res_id=identifier(data_dict['resource_id']),
columns=sql_columns.replace('%', '%%'),
values=', '.join(['%s' for field in field_names])
)
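
(The ``_full_text`` column and its ``to_tsvector(%s)`` value drop out of this
INSERT because the ``zfulltext`` trigger, created below, now populates the
column; for the same reason the UPDATE and UPSERT statements that follow set
``_full_text`` to NULL so the trigger recomputes it.)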

@@ -1129,18 +1099,16 @@

used_values = [record[field] for field in used_field_names]

full_text = _to_full_text(fields, record)

if method == _UPDATE:
sql_string = u'''
UPDATE "{res_id}"
SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
SET ({columns}, "_full_text") = ({values}, NULL)
WHERE ({primary_key}) = ({primary_value});
'''.format(
res_id=data_dict['resource_id'],
columns=u', '.join(
[u'"{0}"'.format(field)
for field in used_field_names]),
[identifier(field)
for field in used_field_names]).replace('%', '%%'),
values=u', '.join(
['%s' for _ in used_field_names]),
primary_key=u','.join(
@@ -1149,7 +1117,7 @@
)
try:
results = context['connection'].execute(
sql_string, used_values + [full_text] + unique_values)
sql_string, used_values + unique_values)
except sqlalchemy.exc.DatabaseError as err:
raise ValidationError({
u'records': [_programming_error_summary(err)],
@@ -1164,10 +1132,10 @@
elif method == _UPSERT:
sql_string = u'''
UPDATE "{res_id}"
SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
SET ({columns}, "_full_text") = ({values}, NULL)
WHERE ({primary_key}) = ({primary_value});
INSERT INTO "{res_id}" ({columns}, "_full_text")
SELECT {values}, to_tsvector(%s)
INSERT INTO "{res_id}" ({columns})
SELECT {values}
WHERE NOT EXISTS (SELECT 1 FROM "{res_id}"
WHERE ({primary_key}) = ({primary_value}));
'''.format(
Expand All @@ -1184,7 +1152,7 @@ def upsert_data(context, data_dict):
try:
context['connection'].execute(
sql_string,
(used_values + [full_text] + unique_values) * 2)
(used_values + unique_values) * 2)
except sqlalchemy.exc.DatabaseError as err:
raise ValidationError({
u'records': [_programming_error_summary(err)],
@@ -1416,7 +1384,8 @@ def _create_triggers(connection, resource_id, triggers):
or "for_each" parameters from triggers list.
'''
existing = connection.execute(
u'SELECT tgname FROM pg_trigger WHERE tgrelid = %s::regclass',
u"""SELECT tgname FROM pg_trigger
WHERE tgrelid = %s::regclass AND tgname LIKE 't___'""",
resource_id)
sql_list = (
[u'DROP TRIGGER {name} ON {table}'.format(
@@ -1438,6 +1407,14 @@ def _create_triggers(connection, resource_id, triggers):
raise ValidationError({u'triggers': [_programming_error_summary(pe)]})


def _create_fulltext_trigger(connection, resource_id):
connection.execute(
u'''CREATE TRIGGER zfulltext
BEFORE INSERT OR UPDATE ON {table}
FOR EACH ROW EXECUTE PROCEDURE populate_full_text_trigger()'''.format(
table=identifier(resource_id)))
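
(The trigger name matters: the existing-trigger query above now matches only
the four-character user trigger names, ``tgname LIKE 't___'``, so
``zfulltext`` is never dropped when user triggers are recreated; and because
PostgreSQL fires same-event triggers in alphabetical order by name,
``zfulltext`` runs after them and indexes the row as they may have modified
it. A quick sanity check, with a stand-in table name for a real resource id::

    -- list the non-internal triggers on a resource table, in firing order
    SELECT tgname FROM pg_trigger
    WHERE tgrelid = 'my_resource_table'::regclass  -- substitute the resource id
      AND NOT tgisinternal
    ORDER BY tgname;
    -- expect any user triggers (t000, t001, ...) followed by zfulltext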


def upsert(context, data_dict):
'''
This method combines upsert insert and update on the datastore. The method
@@ -1829,6 +1806,9 @@ def create(self, context, data_dict):
).fetchone()
if not result:
create_table(context, data_dict)
_create_fulltext_trigger(
context['connection'],
data_dict['resource_id'])
else:
alter_table(context, data_dict)
if 'triggers' in data_dict:
16 changes: 8 additions & 8 deletions ckanext/datastore/logic/action.py
@@ -450,16 +450,16 @@ def datastore_search(context, data_dict):

res_id = data_dict['resource_id']

res_exists, real_id = backend.resource_id_from_alias(res_id)
# Resource only has to exist in the datastore (because it could be an
# alias)
if data_dict['resource_id'] not in WHITELISTED_RESOURCES:
res_exists, real_id = backend.resource_id_from_alias(res_id)
# Resource only has to exist in the datastore (because it could be an
# alias)

if not res_exists:
raise p.toolkit.ObjectNotFound(p.toolkit._(
'Resource "{0}" was not found.'.format(res_id)
))
if not res_exists:
raise p.toolkit.ObjectNotFound(p.toolkit._(
'Resource "{0}" was not found.'.format(res_id)
))

if data_dict['resource_id'] not in WHITELISTED_RESOURCES:
# Replace potential alias with real id to simplify access checks
if real_id:
data_dict['resource_id'] = real_id
45 changes: 41 additions & 4 deletions ckanext/datastore/set_permissions.sql
@@ -49,23 +49,60 @@ GRANT SELECT ON ALL TABLES IN SCHEMA public TO {readuser};
ALTER DEFAULT PRIVILEGES FOR USER {writeuser} IN SCHEMA public
GRANT SELECT ON TABLES TO {readuser};

-- a view for listing valid table (resource id) and view names
CREATE OR REPLACE VIEW "_table_metadata" AS
SELECT DISTINCT
substr(md5(dependee.relname || COALESCE(dependent.relname, '')), 0, 17) AS "_id",
dependee.relname AS name,
dependee.oid AS oid,
dependent.relname AS alias_of
-- dependent.oid AS oid
FROM
pg_class AS dependee
LEFT OUTER JOIN pg_rewrite AS r ON r.ev_class = dependee.oid
LEFT OUTER JOIN pg_depend AS d ON d.objid = r.oid
LEFT OUTER JOIN pg_class AS dependent ON d.refobjid = dependent.oid
WHERE
(dependee.oid != dependent.oid OR dependent.oid IS NULL) AND
(dependee.relname IN (SELECT tablename FROM pg_catalog.pg_tables)
OR dependee.relname IN (SELECT viewname FROM pg_catalog.pg_views)) AND
dependee.relnamespace = (SELECT oid FROM pg_namespace WHERE nspname='public')
-- is a table (from pg_tables view definition)
-- or is a view (from pg_views view definition)
(dependee.relkind = 'r'::"char" OR dependee.relkind = 'v'::"char")
AND dependee.relnamespace = (
SELECT oid FROM pg_namespace WHERE nspname='public')
ORDER BY dependee.oid DESC;
ALTER VIEW "_table_metadata" OWNER TO {writeuser};
GRANT SELECT ON "_table_metadata" TO {readuser};

-- _full_text fields are now updated by a trigger when set to NULL
CREATE OR REPLACE FUNCTION populate_full_text_trigger() RETURNS trigger
AS $body$
BEGIN
IF NEW._full_text IS NOT NULL THEN
RETURN NEW;
END IF;
NEW._full_text := (
SELECT to_tsvector(string_agg(value, ' '))
FROM json_each_text(row_to_json(NEW.*))
WHERE key NOT LIKE '\_%');
RETURN NEW;
END;
$body$ LANGUAGE plpgsql;
ALTER FUNCTION populate_full_text_trigger() OWNER TO {writeuser};

-- migrate existing tables that don't have full text trigger applied
DO $body$
BEGIN
EXECUTE coalesce(
(SELECT string_agg(
'CREATE TRIGGER zfulltext BEFORE INSERT OR UPDATE ON ' ||
quote_ident(relname) || ' FOR EACH ROW EXECUTE PROCEDURE ' ||
'populate_full_text_trigger();', ' ')
FROM pg_class
LEFT OUTER JOIN pg_trigger AS t
ON t.tgrelid = relname::regclass AND t.tgname = 'zfulltext'
WHERE relkind = 'r'::"char" AND t.tgname IS NULL
AND relnamespace = (
SELECT oid FROM pg_namespace WHERE nspname='public')),
'SELECT 1;');
END;
$body$;
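
A minimal sketch of the trigger's behaviour on a toy table (real DataStore
tables are named by resource UUID and created by CKAN, so the names here are
illustrative only)::

    CREATE TABLE demo (_id serial, _full_text tsvector, title text, amount int4);
    CREATE TRIGGER zfulltext BEFORE INSERT OR UPDATE ON demo
        FOR EACH ROW EXECUTE PROCEDURE populate_full_text_trigger();

    -- no _full_text supplied, so the trigger builds it from the
    -- non-underscore columns ('Spending report 42')
    INSERT INTO demo (title, amount) VALUES ('Spending report', 42);

    -- the upsert code path sets _full_text = NULL on UPDATE, which makes
    -- the trigger recompute the tsvector for the new values
    UPDATE demo SET title = 'Expenses', _full_text = NULL;

    SELECT title, _full_text FROM demo;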

2 changes: 1 addition & 1 deletion ckanext/datastore/tests/helpers.py
@@ -26,7 +26,7 @@ def clear_db(Session):
SELECT 'drop function ' || quote_ident(proname) || '();'
FROM pg_proc
INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)
WHERE ns.nspname = 'public'
WHERE ns.nspname = 'public' AND proname != 'populate_full_text_trigger'
'''
drop_functions = u''.join(r[0] for r in c.execute(drop_functions_sql))
if drop_functions:
20 changes: 0 additions & 20 deletions ckanext/datastore/tests/test_db.py
@@ -173,26 +173,6 @@ def test_upsert_with_insert_method_and_invalid_data(
backend.InvalidDataError, db.upsert_data, context, data_dict)


class TestJsonGetValues(object):
def test_returns_empty_list_if_called_with_none(self):
assert_equal(db.json_get_values(None), [])

def test_returns_list_with_value_if_called_with_string(self):
assert_equal(db.json_get_values('foo'), ['foo'])

def test_returns_list_with_only_the_original_truthy_values_if_called(self):
data = [None, 'foo', 42, 'bar', {}, []]
assert_equal(db.json_get_values(data), ['foo', '42', 'bar'])

def test_returns_flattened_list(self):
data = ['foo', ['bar', ('baz', 42)]]
assert_equal(db.json_get_values(data), ['foo', 'bar', 'baz', '42'])

def test_returns_only_truthy_values_from_dict(self):
data = {'foo': 'bar', 'baz': [42, None, {}, [], 'hey']}
assert_equal(db.json_get_values(data), ['foo', 'bar', 'baz', '42', 'hey'])


class TestGetAllResourcesIdsInDatastore(object):
@classmethod
def setup_class(cls):
4 changes: 4 additions & 0 deletions doc/contributing/test.rst
@@ -40,6 +40,7 @@ environment:
pip install -r |virtualenv|/src/ckan/dev-requirements.txt
.. _datastore-test-set-permissions:

~~~~~~~~~~~~~~~~~~~~~~~~~
Set up the test databases
@@ -55,6 +56,9 @@ Create test databases:
sudo -u postgres createdb -O |database_user| |test_database| -E utf-8
sudo -u postgres createdb -O |database_user| |test_datastore| -E utf-8
Set the permissions::

paster datastore set-permissions -c test-core.ini | sudo -u postgres psql

This database connection is specified in the ``test-core.ini`` file by the
2 changes: 2 additions & 0 deletions doc/maintaining/datastore.rst
@@ -120,6 +120,8 @@ if necessary, for example:
Replace ``pass`` with the passwords you created for your |database_user| and
|datastore_user| database users.

.. _datastore-set-permissions:

Set permissions
---------------

