diff --git a/ckanext/datastore/db.py b/ckanext/datastore/db.py index 63b026969f7..0edca4c51b7 100644 --- a/ckanext/datastore/db.py +++ b/ckanext/datastore/db.py @@ -32,9 +32,11 @@ def __init__(self, error_dict): _engines = {} # See http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html -_pg_err_code = { +_PG_ERR_CODE = { 'unique_violation': 23505, - 'query_canceled': 57014 + 'query_canceled': 57014, + 'undefined_object': 42704, + 'syntax_error': 42601 } _date_formats = ['%Y-%m-%d', @@ -46,10 +48,10 @@ def __init__(self, error_dict): '%d-%m-%Y', '%m-%d-%Y', ] -INSERT = 'insert' -UPSERT = 'upsert' -UPDATE = 'update' -_methods = [INSERT, UPSERT, UPDATE] + +_INSERT = 'insert' +_UPSERT = 'upsert' +_UPDATE = 'update' def _strip(input): @@ -162,8 +164,15 @@ def _is_valid_pg_type(context, type_name): return True else: connection = context['connection'] - return connection.execute('SELECT is_valid_type(%s)', - type_name).first()[0] + try: + connection.execute('SELECT %s::regtype', type_name) + except ProgrammingError, e: + if int(e.orig.pgcode) in [_PG_ERR_CODE['undefined_object'], + _PG_ERR_CODE['syntax_error']]: + return False + raise + else: + return True def _get_type(context, oid): @@ -520,7 +529,7 @@ def alter_table(context, data_dict): def insert_data(context, data_dict): - data_dict['method'] = INSERT + data_dict['method'] = _INSERT return upsert_data(context, data_dict) @@ -529,9 +538,9 @@ def upsert_data(context, data_dict): if not data_dict.get('records'): return - method = data_dict.get('method', UPSERT) + method = data_dict.get('method', _UPSERT) - if method not in _methods: + if method not in [_INSERT, _UPSERT, _UPDATE]: raise ValidationError({ 'method': [u'"{0}" is not defined'.format(method)] }) @@ -542,7 +551,7 @@ def upsert_data(context, data_dict): sql_columns = ", ".join(['"%s"' % name.replace('%', '%%') for name in field_names] + ['"_full_text"']) - if method == INSERT: + if method == _INSERT: rows = [] for num, record in enumerate(records): _validate_record(record, num, field_names) @@ -565,7 +574,7 @@ def upsert_data(context, data_dict): context['connection'].execute(sql_string, rows) - elif method in [UPDATE, UPSERT]: + elif method in [_UPDATE, _UPSERT]: unique_keys = _get_unique_key(context, data_dict) if len(unique_keys) < 1: raise ValidationError({ @@ -607,7 +616,7 @@ def upsert_data(context, data_dict): full_text = _to_full_text(fields, record) - if method == UPDATE: + if method == _UPDATE: sql_string = u''' UPDATE "{res_id}" SET ({columns}, "_full_text") = ({values}, to_tsvector(%s)) @@ -628,7 +637,7 @@ def upsert_data(context, data_dict): 'key': [u'key "{0}" not found'.format(unique_values)] }) - elif method == UPSERT: + elif method == _UPSERT: sql_string = u''' UPDATE "{res_id}" SET ({columns}, "_full_text") = ({values}, to_tsvector(%s)) @@ -962,7 +971,7 @@ def create(context, data_dict): trans.commit() return _unrename_json_field(data_dict) except IntegrityError, e: - if int(e.orig.pgcode) == _pg_err_code['unique_violation']: + if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']: raise ValidationError({ 'constraints': ['Cannot insert records or create index because ' 'of uniqueness constraint'], @@ -972,7 +981,7 @@ def create(context, data_dict): }) raise except DBAPIError, e: - if int(e.orig.pgcode) == _pg_err_code['query_canceled']: + if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']: raise ValidationError({ 'query': ['Query took too long'] }) @@ -1005,7 +1014,7 @@ def upsert(context, data_dict): trans.commit() return _unrename_json_field(data_dict) except IntegrityError, e: - if int(e.orig.pgcode) == _pg_err_code['unique_violation']: + if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']: raise ValidationError({ 'constraints': ['Cannot insert records or create index because ' 'of uniqueness constraint'], @@ -1015,7 +1024,7 @@ def upsert(context, data_dict): }) raise except DBAPIError, e: - if int(e.orig.pgcode) == _pg_err_code['query_canceled']: + if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']: raise ValidationError({ 'query': ['Query took too long'] }) @@ -1082,7 +1091,7 @@ def search(context, data_dict): }) return search_data(context, data_dict) except DBAPIError, e: - if int(e.orig.pgcode) == _pg_err_code['query_canceled']: + if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']: raise ValidationError({ 'query': ['Search took too long'] }) @@ -1115,7 +1124,7 @@ def search_sql(context, data_dict): } }) except DBAPIError, e: - if int(e.orig.pgcode) == _pg_err_code['query_canceled']: + if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']: raise ValidationError({ 'query': ['Query took too long'] }) diff --git a/ckanext/datastore/plugin.py b/ckanext/datastore/plugin.py index 2f90f5e5d95..d7f8fec5d9a 100644 --- a/ckanext/datastore/plugin.py +++ b/ckanext/datastore/plugin.py @@ -114,7 +114,6 @@ def new_resource_show(context, data_dict): if not hasattr(resource_show, '_datastore_wrapped'): new_resource_show._datastore_wrapped = True logic._actions['resource_show'] = new_resource_show - self._add_is_valid_type_function() def _is_read_only_database(self): ''' @@ -199,25 +198,6 @@ def _create_alias_table(self): {'connection_url': pylons.config['ckan.datastore.write_url']}).connect() connection.execute(create_alias_table_sql) - def _add_is_valid_type_function(self): - # syntax_error - may occur if someone provides a keyword as a type - # undefined_object - is raised if the type does not exist - create_func_sql = ''' - CREATE OR REPLACE FUNCTION is_valid_type(v_type text) - RETURNS boolean - AS $$ - BEGIN - PERFORM v_type::regtype; - RETURN true; - EXCEPTION WHEN undefined_object OR syntax_error THEN - RETURN false; - END; - $$ LANGUAGE plpgsql stable; - ''' - connection = db._get_engine(None, - {'connection_url': pylons.config['ckan.datastore.write_url']}).connect() - connection.execute(create_func_sql) - def get_actions(self): actions = {'datastore_create': action.datastore_create, 'datastore_upsert': action.datastore_upsert,