diff --git a/ckanext/datastore/db.py b/ckanext/datastore/db.py
index 248d1c87869..fb597d7053e 100644
--- a/ckanext/datastore/db.py
+++ b/ckanext/datastore/db.py
@@ -11,7 +11,7 @@ import logging
 import pprint
 
 import sqlalchemy
-from sqlalchemy.exc import ProgrammingError, IntegrityError
+from sqlalchemy.exc import ProgrammingError, IntegrityError, DBAPIError
 import psycopg2.extras
 
 log = logging.getLogger(__name__)
@@ -31,19 +31,27 @@ def __init__(self, error_dict):
 _type_names = set()
 _engines = {}
 
+# See http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
+_PG_ERR_CODE = {
+    'unique_violation': 23505,
+    'query_canceled': 57014,
+    'undefined_object': 42704,
+    'syntax_error': 42601
+}
+
 _date_formats = ['%Y-%m-%d',
-                '%Y-%m-%d %H:%M:%S',
-                '%Y-%m-%dT%H:%M:%S',
-                '%Y-%m-%dT%H:%M:%SZ',
-                '%d/%m/%Y',
-                '%m/%d/%Y',
-                '%d-%m-%Y',
-                '%m-%d-%Y',
-                ]
-INSERT = 'insert'
-UPSERT = 'upsert'
-UPDATE = 'update'
-_methods = [INSERT, UPSERT, UPDATE]
+                 '%Y-%m-%d %H:%M:%S',
+                 '%Y-%m-%dT%H:%M:%S',
+                 '%Y-%m-%dT%H:%M:%SZ',
+                 '%d/%m/%Y',
+                 '%m/%d/%Y',
+                 '%d-%m-%Y',
+                 '%m-%d-%Y',
+                 ]
+
+_INSERT = 'insert'
+_UPSERT = 'upsert'
+_UPDATE = 'update'
 
 
 def _strip(input):
@@ -159,10 +167,10 @@ def _is_valid_pg_type(context, type_name):
     try:
         connection.execute('SELECT %s::regtype', type_name)
     except ProgrammingError, e:
-        if 'invalid type name' in str(e) or 'does not exist' in str(e):
+        if int(e.orig.pgcode) in [_PG_ERR_CODE['undefined_object'],
+                                  _PG_ERR_CODE['syntax_error']]:
             return False
-        else:
-            raise
+        raise
     else:
         return True
 
@@ -520,7 +528,7 @@ def alter_table(context, data_dict):
 
 
 def insert_data(context, data_dict):
-    data_dict['method'] = INSERT
+    data_dict['method'] = _INSERT
     return upsert_data(context, data_dict)
 
 
@@ -529,9 +537,9 @@ def upsert_data(context, data_dict):
     if not data_dict.get('records'):
         return
 
-    method = data_dict.get('method', UPSERT)
+    method = data_dict.get('method', _UPSERT)
 
-    if method not in _methods:
+    if method not in [_INSERT, _UPSERT, _UPDATE]:
         raise ValidationError({
             'method': [u'"{0}" is not defined'.format(method)]
         })
@@ -542,7 +550,7 @@ def upsert_data(context, data_dict):
     sql_columns = ", ".join(['"%s"' % name.replace('%', '%%')
                              for name in field_names] + ['"_full_text"'])
 
-    if method == INSERT:
+    if method == _INSERT:
         rows = []
         for num, record in enumerate(records):
             _validate_record(record, num, field_names)
@@ -565,7 +573,7 @@ def upsert_data(context, data_dict):
 
         context['connection'].execute(sql_string, rows)
 
-    elif method in [UPDATE, UPSERT]:
+    elif method in [_UPDATE, _UPSERT]:
         unique_keys = _get_unique_key(context, data_dict)
         if len(unique_keys) < 1:
             raise ValidationError({
@@ -607,7 +615,7 @@ def upsert_data(context, data_dict):
 
             full_text = _to_full_text(fields, record)
 
-            if method == UPDATE:
+            if method == _UPDATE:
                 sql_string = u'''
                     UPDATE "{res_id}"
                     SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
@@ -628,7 +636,7 @@ def upsert_data(context, data_dict):
                         'key': [u'key "{0}" not found'.format(unique_values)]
                     })
 
-            elif method == UPSERT:
+            elif method == _UPSERT:
                 sql_string = u'''
                     UPDATE "{res_id}"
                     SET ({columns}, "_full_text") = ({values}, to_tsvector(%s))
@@ -961,23 +969,24 @@ def create(context, data_dict):
         trans.commit()
         return _unrename_json_field(data_dict)
     except IntegrityError, e:
-        if ('duplicate key value violates unique constraint' in str(e)
-                or 'could not create unique index' in str(e)):
+        if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']:
             raise ValidationError({
-                'constraints': ['Cannot insert records or create index because of uniqueness constraint'],
+                'constraints': ['Cannot insert records or create index because '
+                                'of uniqueness constraint'],
                 'info': {
                     'details': str(e)
                 }
             })
-        else:
-            raise
-    except Exception, e:
-        trans.rollback()
-        if 'due to statement timeout' in str(e):
+        raise
+    except DBAPIError, e:
+        if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
             raise ValidationError({
                 'query': ['Query took too long']
             })
         raise
+    except Exception, e:
+        trans.rollback()
+        raise
     finally:
         context['connection'].close()
 
@@ -1003,22 +1012,24 @@ def upsert(context, data_dict):
         trans.commit()
         return _unrename_json_field(data_dict)
     except IntegrityError, e:
-        if 'duplicate key value violates unique constraint' in str(e):
+        if int(e.orig.pgcode) == _PG_ERR_CODE['unique_violation']:
             raise ValidationError({
-                'constraints': ['Cannot insert records because of uniqueness constraint'],
+                'constraints': ['Cannot insert records or create index because '
+                                'of uniqueness constraint'],
                 'info': {
                     'details': str(e)
                 }
             })
-        else:
-            raise
-    except Exception, e:
-        trans.rollback()
-        if 'due to statement timeout' in str(e):
+        raise
+    except DBAPIError, e:
+        if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
             raise ValidationError({
                 'query': ['Query took too long']
            })
         raise
+    except Exception, e:
+        trans.rollback()
+        raise
     finally:
         context['connection'].close()
 
@@ -1077,8 +1088,8 @@ def search(context, data_dict):
                     data_dict['resource_id'])]
             })
         return search_data(context, data_dict)
-    except Exception, e:
-        if 'due to statement timeout' in str(e):
+    except DBAPIError, e:
+        if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
             raise ValidationError({
                 'query': ['Search took too long']
             })
@@ -1110,10 +1121,10 @@ def search_sql(context, data_dict):
                 'orig': [str(e.orig)]
             }
         })
-    except Exception, e:
-        if 'due to statement timeout' in str(e):
+    except DBAPIError, e:
+        if int(e.orig.pgcode) == _PG_ERR_CODE['query_canceled']:
             raise ValidationError({
-                'query': ['Search took too long']
+                'query': ['Query took too long']
             })
         raise
     finally:
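
The pattern this patch applies throughout db.py — checking PostgreSQL's SQLSTATE error codes instead of string-matching exception messages — is illustrated below as a minimal, standalone sketch. The DSN, table name, and insert_unique() helper are hypothetical; sqlalchemy.exc.IntegrityError and DBAPIError, their .orig attribute (the raw psycopg2 exception), and psycopg2's .pgcode attribute are real. Note that pgcode is a five-character string such as '23505', which is why the patch coerces it with int() before comparing against the integer values in _PG_ERR_CODE.

    # Sketch of the SQLSTATE-based error handling introduced by this patch.
    # Assumes a reachable PostgreSQL server and the pre-2.0 SQLAlchemy API
    # (raw SQL strings with %s placeholders) that db.py itself uses.
    import sqlalchemy
    from sqlalchemy.exc import IntegrityError, DBAPIError

    # SQLSTATE codes, per the appendix linked next to _PG_ERR_CODE.
    UNIQUE_VIOLATION = '23505'
    QUERY_CANCELED = '57014'

    # Hypothetical DSN.
    engine = sqlalchemy.create_engine('postgresql://user:pass@localhost/test')


    def insert_unique(connection, table, value):
        # Hypothetical helper: insert into a table with a UNIQUE column,
        # translating low-level errors by code rather than by message text.
        try:
            connection.execute(
                'INSERT INTO "{0}" (id) VALUES (%s)'.format(table), value)
        except IntegrityError as e:
            # e.orig is the raw psycopg2 exception carrying pgcode.
            if e.orig.pgcode == UNIQUE_VIOLATION:
                raise ValueError('duplicate key: {0}'.format(e.orig))
            raise
        except DBAPIError as e:
            # Raised e.g. when statement_timeout cancels the query.
            if e.orig.pgcode == QUERY_CANCELED:
                raise ValueError('query took too long')
            raise


    connection = engine.connect()
    connection.execute('CREATE TABLE IF NOT EXISTS sketch (id int UNIQUE)')
    insert_unique(connection, 'sketch', 1)
    insert_unique(connection, 'sketch', 1)  # raises ValueError: duplicate key
    connection.close()

Because IntegrityError subclasses DBAPIError in sqlalchemy.exc, the more specific handler must come first; the patch relies on the same ordering in create() and upsert(), where the broad except Exception fallback is now reduced to rollback-and-reraise. Matching on codes is also more robust than matching on message text, since messages vary with server version and locale (e.g. a non-English lc_messages) while SQLSTATE codes do not.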