diff --git a/ckan/logic/schema.py b/ckan/logic/schema.py index 03c5575eaaa..0b135c2aa99 100644 --- a/ckan/logic/schema.py +++ b/ckan/logic/schema.py @@ -36,7 +36,8 @@ vocabulary_id_exists, user_id_exists, object_id_validator, - activity_type_exists) + activity_type_exists, + tag_not_in_vocabulary) from formencode.validators import OneOf import ckan.model @@ -75,7 +76,8 @@ def default_update_resource_schema(): def default_tags_schema(): schema = { - 'name': [not_empty, + 'name': [not_missing, + not_empty, unicode, tag_length_validator, tag_name_validator, @@ -88,6 +90,13 @@ def default_tags_schema(): def default_create_tag_schema(): schema = default_tags_schema() + # When creating a tag via the create_tag() logic action function, a + # vocabulary_id _must_ be given (you cannot create free tags via this + # function). + schema['vocabulary_id'] = [not_missing, not_empty, unicode, + vocabulary_id_exists, tag_not_in_vocabulary] + # You're not allowed to specify your own ID when creating a tag. + schema['id'] = [empty] return schema def default_package_schema(): diff --git a/ckan/logic/validators.py b/ckan/logic/validators.py index f02467fc30e..28a3b278a84 100644 --- a/ckan/logic/validators.py +++ b/ckan/logic/validators.py @@ -446,3 +446,23 @@ def tag_in_vocabulary_validator(value, context): raise Invalid(_('Tag %s does not belong to vocabulary %s') % (value, vocabulary.name)) return value +def tag_not_in_vocabulary(key, tag_dict, errors, context): + tag_name = tag_dict[('name',)] + if not tag_name: + raise Invalid(_('No tag name')) + if tag_dict.has_key(('vocabulary_id',)): + vocabulary_id = tag_dict[('vocabulary_id',)] + else: + vocabulary_id = None + model = context['model'] + session = context['session'] + + query = session.query(model.Tag) + query = query.filter(model.Tag.vocabulary_id==vocabulary_id) + query = query.filter(model.Tag.name==tag_name) + count = query.count() + if count > 0: + raise Invalid(_('Tag %s already belongs to vocabulary %s') % + (tag_name, vocabulary_id)) + else: + return diff --git a/ckan/tests/functional/api/model/test_vocabulary.py b/ckan/tests/functional/api/model/test_vocabulary.py index a4d036d95a7..025507ba128 100644 --- a/ckan/tests/functional/api/model/test_vocabulary.py +++ b/ckan/tests/functional/api/model/test_vocabulary.py @@ -174,7 +174,7 @@ def test_vocabulary_create(self): def test_vocabulary_create_id(self): '''Test error response when user tries to supply their own ID when creating a vocabulary. - + ''' params = {'id': 'xxx', 'name': 'foobar'} param_string = json.dumps(params) @@ -337,11 +337,141 @@ def test_vocabulary_delete_not_authorized(self): assert response.json['success'] == False def test_add_tag_to_vocab(self): - vocab = self._create_vocabulary(vocab_name="Musical Genres", - user=self.sysadmin_user) + '''Test that a tag can be added to and then retrieved from a vocab.''' + vocab = self.genre_vocab tags_before = self._list_tags(vocab) - assert len(tags_before) == 0, tags_before tag_created = self._create_tag(self.sysadmin_user, 'noise', vocab) tags_after = self._list_tags(vocab) - assert len(tags_after) == 1 - assert tag_created['name'] in tags_after + new_tag_names = [tag_name for tag_name in tags_after if tag_name not in + tags_before] + assert len(new_tag_names) == 1 + assert tag_created['name'] in new_tag_names + + def test_add_tag_no_vocab(self): + '''Test the error response when a user tries to create a tab without + specifying a vocab. + + ''' + tag_dict = {'name': 'noise'} + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + + def test_add_tag_vocab_not_exists(self): + '''Test the error response when a user tries to add a tag to a vocab + that doesn't exist. + + ''' + tag_dict = {'name': 'noise', 'vocabulary_id': 'does not exist'} + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + + def test_add_tag_already_added(self): + '''Test the error response when a user tries to add a tag to a vocab + that already has a tag with the same name. + + ''' + self.test_add_tag_to_vocab() + vocab = self.genre_vocab + tag_dict = {'name': 'noise', 'vocabulary_id': vocab['id']} + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + + def test_add_tag_with_id(self): + '''Test the error response when a user tries to specify the tag ID when + adding a tag to a vocab. + + ''' + tag_dict = { + 'id': 'dsagdsgsgsd', + 'name': 'noise', + 'vocabulary_id': self.genre_vocab['id'] + } + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + + def test_add_tag_without_name(self): + '''Test the error response when a user tries to create a tag without a + name. + + ''' + tag_dict = { + 'vocabulary_id': self.genre_vocab['id'] + } + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + + def test_add_tag_invalid_name(self): + for name in ('Not a valid tag name!', '', None): + tag_dict = { + 'name': name, + 'vocabulary_id': self.genre_vocab['id'] + } + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + + def test_add_tag_invalid_vocab_id(self): + tag_dict = { + 'name': 'noise', + 'vocabulary_id': 'xxcxzczxczxc', + } + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + + def test_add_tag_not_logged_in(self): + tag_dict = { + 'name': 'noise', + 'vocabulary_id': self.genre_vocab['id'] + } + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + status=403) + assert response.json['success'] == False + + def test_add_tag_not_authorized(self): + tag_dict = { + 'name': 'noise', + 'vocabulary_id': self.genre_vocab['id'] + } + tag_string = json.dumps(tag_dict) + response = self.app.post('/api/action/tag_create', + params=tag_string, + extra_environ = {'Authorization': + str(self.normal_user.apikey)}, + status=403) + assert response.json['success'] == False