diff --git a/ckan/lib/dictization/model_dictize.py b/ckan/lib/dictization/model_dictize.py index 4238a77d2ab..309350e5c24 100644 --- a/ckan/lib/dictization/model_dictize.py +++ b/ckan/lib/dictization/model_dictize.py @@ -462,6 +462,9 @@ def package_to_api2(pkg, context): def vocabulary_dictize(vocabulary, context): vocabulary_dict = table_dictize(vocabulary, context) + assert not vocabulary_dict.has_key('tags') + vocabulary_dict['tags'] = [tag_dictize(tag, context) for tag + in vocabulary.tags] return vocabulary_dict def vocabulary_list_dictize(vocabulary_list, context): diff --git a/ckan/lib/dictization/model_save.py b/ckan/lib/dictization/model_save.py index ff78a662eaf..f4ee8a0c9e1 100644 --- a/ckan/lib/dictization/model_save.py +++ b/ckan/lib/dictization/model_save.py @@ -478,6 +478,23 @@ def activity_dict_save(activity_dict, context): return activity_obj +def vocabulary_tag_list_save(new_tag_dicts, vocabulary_obj, context): + model = context['model'] + session = context['session'] + + # First delete any tags not in new_tag_dicts. + for tag in vocabulary_obj.tags: + if tag.name not in [tag['name'] for tag in new_tag_dicts]: + tag.delete() + # Now add any new tags. + for tag_dict in new_tag_dicts: + current_tag_names = [tag.name for tag in vocabulary_obj.tags] + if tag_dict['name'] not in current_tag_names: + # Make sure the tag belongs to this vocab.. + tag_dict['vocabulary_id'] = vocabulary_obj.id + # then add it. + tag_dict_save(tag_dict, {'model': model, 'session': session}) + def vocabulary_dict_save(vocabulary_dict, context): model = context['model'] session = context['session'] @@ -486,6 +503,10 @@ def vocabulary_dict_save(vocabulary_dict, context): vocabulary_obj = model.Vocabulary(vocabulary_name) session.add(vocabulary_obj) + if vocabulary_dict.has_key('tags'): + vocabulary_tag_list_save(vocabulary_dict['tags'], vocabulary_obj, + context) + return vocabulary_obj def vocabulary_dict_update(vocabulary_dict, context): @@ -494,7 +515,13 @@ def vocabulary_dict_update(vocabulary_dict, context): session = context['session'] vocabulary_obj = model.vocabulary.Vocabulary.get(vocabulary_dict['id']) - vocabulary_obj.name = vocabulary_dict['name'] + + if vocabulary_dict.has_key('name'): + vocabulary_obj.name = vocabulary_dict['name'] + + if vocabulary_dict.has_key('tags'): + vocabulary_tag_list_save(vocabulary_dict['tags'], vocabulary_obj, + context) return vocabulary_obj diff --git a/ckan/lib/navl/dictization_functions.py b/ckan/lib/navl/dictization_functions.py index 88818d2aaac..822b29b142d 100644 --- a/ckan/lib/navl/dictization_functions.py +++ b/ckan/lib/navl/dictization_functions.py @@ -358,24 +358,29 @@ def unflatten(data): 'state': u'active', 'save': u'Save Changes', 'cancel': u'Cancel'} - ''' unflattened = {} + convert_to_list = [] for flattend_key in sorted(data.keys(), key=flattened_order_key): current_pos = unflattened + + if (len(flattend_key) > 1 + and not flattend_key[0] in convert_to_list + and not flattend_key[0] in unflattened): + convert_to_list.append(flattend_key[0]) + for key in flattend_key[:-1]: try: current_pos = current_pos[key] except KeyError: - new_pos = [] - current_pos[key] = new_pos - current_pos = new_pos - except IndexError: new_pos = {} - current_pos.append(new_pos) + current_pos[key] = new_pos current_pos = new_pos current_pos[flattend_key[-1]] = data[flattend_key] + for key in convert_to_list: + unflattened[key] = [unflattened[key][s] for s in sorted(unflattened[key])] + return unflattened diff --git a/ckan/logic/action/create.py b/ckan/logic/action/create.py index d1bbba69935..f74f3f08e1e 100644 --- a/ckan/logic/action/create.py +++ b/ckan/logic/action/create.py @@ -348,7 +348,6 @@ def group_create_rest(context, data_dict): def vocabulary_create(context, data_dict): model = context['model'] - user = context['user'] schema = context.get('schema') or default_create_vocabulary_schema() model.Session.remove() @@ -368,7 +367,7 @@ def vocabulary_create(context, data_dict): model.repo.commit() log.debug('Created Vocabulary %s' % str(vocabulary.name)) - + return vocabulary_dictize(vocabulary, context) def activity_create(context, activity_dict, ignore_auth=False): diff --git a/ckan/logic/action/update.py b/ckan/logic/action/update.py index 84793fae2b6..ed4450ed447 100644 --- a/ckan/logic/action/update.py +++ b/ckan/logic/action/update.py @@ -624,6 +624,11 @@ def vocabulary_update(context, data_dict): if vocab is None: raise NotFound(_('Could not find vocabulary "%s"') % vocab_id) + data_dict['id'] = vocab.id + if data_dict.has_key('name'): + if data_dict['name'] == vocab.name: + del data_dict['name'] + check_access('vocabulary_update', context, data_dict) schema = context.get('schema') or default_update_vocabulary_schema() diff --git a/ckan/logic/converters.py b/ckan/logic/converters.py index 066520cc1e8..5de75a7f1e4 100644 --- a/ckan/logic/converters.py +++ b/ckan/logic/converters.py @@ -34,13 +34,11 @@ def date_to_form(value, context): def free_tags_only(key, data, errors, context): tag_number = key[1] - to_delete = [] - if data.get(('tags', tag_number, 'vocabulary_id')): - to_delete.append(tag_number) + if not data.get(('tags', tag_number, 'vocabulary_id')): + return for k in data.keys(): - for n in to_delete: - if k[0] == 'tags' and k[1] == n: - del data[k] + if k[0] == 'tags' and k[1] == tag_number: + del data[k] def convert_to_tags(vocab): def callable(key, data, errors, context): diff --git a/ckan/logic/schema.py b/ckan/logic/schema.py index 7f9718dc3e0..961bb293924 100644 --- a/ckan/logic/schema.py +++ b/ckan/logic/schema.py @@ -318,8 +318,9 @@ def default_task_status_schema(): def default_vocabulary_schema(): schema = { - 'id': [ignore_empty, ignore_missing, unicode], + 'id': [ignore_missing, unicode, vocabulary_id_exists], 'name': [not_empty, unicode, vocabulary_name_validator], + 'tags': default_tags_schema(), } return schema @@ -330,7 +331,8 @@ def default_create_vocabulary_schema(): def default_update_vocabulary_schema(): schema = default_vocabulary_schema() - schema['id'] = schema['id'] + [vocabulary_id_not_changed] + schema['id'] = [ignore_missing, vocabulary_id_not_changed] + schema['name'] = [ignore_missing, vocabulary_name_validator] return schema def default_create_activity_schema(): diff --git a/ckan/model/tag.py b/ckan/model/tag.py index 009b47395f0..f3fd0f6f148 100644 --- a/ckan/model/tag.py +++ b/ckan/model/tag.py @@ -260,7 +260,7 @@ def related_packages(self): 'package_tags': relation(PackageTag, backref='tag', cascade='all, delete, delete-orphan', ), - 'vocabulary': relation(vocabulary.Vocabulary, backref='tags', + 'vocabulary': relation(vocabulary.Vocabulary, order_by=tag_table.c.name) }, order_by=tag_table.c.name, diff --git a/ckan/model/vocabulary.py b/ckan/model/vocabulary.py index c4d2dec401a..8501629c144 100644 --- a/ckan/model/vocabulary.py +++ b/ckan/model/vocabulary.py @@ -1,6 +1,7 @@ from meta import Table, types, Session from core import metadata, Column, DomainObject, mapper from types import make_uuid +import tag VOCABULARY_NAME_MIN_LENGTH = 2 VOCABULARY_NAME_MAX_LENGTH = 100 @@ -30,4 +31,8 @@ def get(cls, id_or_name): vocab = Vocabulary.by_name(id_or_name) return vocab + @property + def tags(self): + return Session.query(tag.Tag).filter(tag.Tag.vocabulary_id==self.id) + mapper(Vocabulary, vocabulary_table) diff --git a/ckan/tests/functional/api/model/test_vocabulary.py b/ckan/tests/functional/api/model/test_vocabulary.py index 77c0f18062a..456fe17151b 100644 --- a/ckan/tests/functional/api/model/test_vocabulary.py +++ b/ckan/tests/functional/api/model/test_vocabulary.py @@ -74,6 +74,10 @@ def _update_vocabulary(self, params, user=None): extra_environ = {'Authorization' : str(user.apikey)} else: extra_environ = None + + original_vocab = self._post('/api/action/vocabulary_show', + {'id': params.get('id') or params.get('name')})['result'] + response = self._post('/api/action/vocabulary_update', params=params, extra_environ=extra_environ) @@ -81,14 +85,21 @@ def _update_vocabulary(self, params, user=None): assert response['success'] == True assert response['result'] updated_vocab = response['result'] + # id should never change. + assert updated_vocab['id'] == original_vocab['id'] if params.has_key('id'): assert updated_vocab['id'] == params['id'] - else: - assert updated_vocab['id'] + # name should change only if given in params. if params.has_key('name'): assert updated_vocab['name'] == params['name'] else: - assert updated_vocab['name'] + assert updated_vocab['name'] == original_vocab['name'] + # tags should change only if given in params. + if params.has_key('tags'): + assert sorted([tag['name'] for tag in params['tags']]) \ + == sorted([tag['name'] for tag in updated_vocab['tags']]) + else: + assert updated_vocab['tags'] == original_vocab['tags'] # Get the list of vocabularies. response = self._post('/api/action/vocabulary_list') @@ -186,6 +197,107 @@ def test_vocabulary_create(self): self._create_vocabulary(vocab_name="My cool vocab", user=self.sysadmin_user) + def test_vocabulary_create_with_tags(self): + '''Test adding a new vocabulary with some tags. + + ''' + params = {'name': 'foobar'} + tag1 = {'name': 'foo'} + tag2 = {'name': 'bar'} + params['tags'] = [tag1, tag2] + response = self._post('/api/action/vocabulary_create', + params=params, + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}) + assert response['success'] == True + assert response['result'] + created_vocab = response['result'] + assert created_vocab['name'] == 'foobar' + assert created_vocab['id'] + + # Get the list of vocabularies. + response = self._post('/api/action/vocabulary_list') + # Check that the vocabulary we created is in the list. + assert response['success'] == True + assert response['result'] + assert response['result'].count(created_vocab) == 1 + + # Get the created vocabulary. + params = {'id': created_vocab['id']} + response = self._post('/api/action/vocabulary_show', params) + # Check that retrieving the vocab by name gives the same result. + by_name_params = {'id': created_vocab['name']} + assert response == self._post('/api/action/vocabulary_show', + by_name_params) + # Check that it matches what we created. + assert response['success'] == True + assert response['result'] == created_vocab + + # Get the list of tags for the vocabulary. + tags = self._list_tags(created_vocab) + assert len(tags) == 2 + assert tags.count('foo') == 1 + assert tags.count('bar') == 1 + + def test_vocabulary_create_bad_tags(self): + '''Test creating new vocabularies with invalid tags. + + ''' + for tags in ( + [{'id': 'xxx'}, {'name': 'foo'}], + [{'name': 'foo'}, {'name': None}], + [{'name': 'foo'}, {'name': ''}], + [{'name': 'foo'}, {'name': 'f'}], + [{'name': 'f'*200}, {'name': 'foo'}], + [{'name': 'Invalid!'}, {'name': 'foo'}], + ): + params = {'name': 'foobar', 'tags': tags} + response = self.app.post('/api/action/vocabulary_create', + params=json.dumps(params), + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + assert response.json['error'].has_key('tags') + assert len(response.json['error']) == 2 + + def test_vocabulary_create_none_tags(self): + '''Test creating new vocabularies with None for 'tags'. + + ''' + params = {'name': 'foobar', 'tags': None} + response = self.app.post('/api/action/vocabulary_create', + params=json.dumps(params), + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=400) + assert response.body == ("Integrity Error: Only lists of dicts can " + "be placed against subschema tags - {u'name': u'foobar', " + "u'tags': None}") + + def test_vocabulary_create_empty_tags(self): + '''Test creating new vocabularies with [] for 'tags'. + + ''' + params = {'name': 'foobar', 'tags': []} + response = self.app.post('/api/action/vocabulary_create', + params=json.dumps(params), + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=200) + assert response.json['success'] == True + assert response.json['result'] + created_vocab = response.json['result'] + assert created_vocab['name'] == 'foobar' + assert created_vocab['id'] + assert created_vocab['tags'] == [] + params = {'id': created_vocab['id']} + response = self._post('/api/action/vocabulary_show', params) + assert response['success'] == True + assert response['result'] == created_vocab + tags = self._list_tags(created_vocab) + assert tags == [] + def test_vocabulary_create_id(self): '''Test error response when user tries to supply their own ID when creating a vocabulary. @@ -199,6 +311,8 @@ def test_vocabulary_create_id(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['id'] == [u'The input field id was ' + 'not expected.'] def test_vocabulary_create_no_name(self): '''Test error response when user tries to create a vocab without a @@ -213,6 +327,7 @@ def test_vocabulary_create_no_name(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['name'] == [u'Missing value'] def test_vocabulary_create_invalid_name(self): '''Test error response when user tries to create a vocab with an @@ -228,6 +343,7 @@ def test_vocabulary_create_invalid_name(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['name'] def test_vocabulary_create_exists(self): '''Test error response when user tries to create a vocab that already @@ -242,6 +358,8 @@ def test_vocabulary_create_exists(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['name'] == [u'That vocabulary name is ' + 'already in use.'] def test_vocabulary_create_not_logged_in(self): '''Test that users who are not logged in cannot create vocabularies.''' @@ -253,6 +371,7 @@ def test_vocabulary_create_not_logged_in(self): params=param_string, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' def test_vocabulary_create_not_authorized(self): '''Test that users who are not authorized cannot create vocabs.''' @@ -265,10 +384,52 @@ def test_vocabulary_create_not_authorized(self): str(self.normal_user.apikey)}, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' + + def test_vocabulary_update_id_only(self): + self._update_vocabulary({'id': self.genre_vocab['id']}, + self.sysadmin_user) + + def test_vocabulary_update_id_and_same_name(self): + self._update_vocabulary({'id': self.genre_vocab['id'], + 'name': self.genre_vocab['name']}, self.sysadmin_user) + + def test_vocabulary_update_id_and_new_name(self): + self._update_vocabulary({'id': self.genre_vocab['id'], + 'name': 'new name'}, self.sysadmin_user) + + def test_vocabulary_update_id_and_same_tags(self): + self._update_vocabulary({'id': self.genre_vocab['id'], + 'tags': self.genre_vocab['tags']}, self.sysadmin_user) + + def test_vocabulary_update_id_and_new_tags(self): + tags = [ + {'name': 'new test tag one'}, + {'name': 'new test tag two'}, + {'name': 'new test tag three'}, + ] + self._update_vocabulary({'id': self.genre_vocab['id'], 'tags': tags}, + self.sysadmin_user) + + def test_vocabulary_update_id_same_name_and_same_tags(self): + self._update_vocabulary({'id': self.genre_vocab['id'], + 'name': self.genre_vocab['name'], + 'tags': self.genre_vocab['tags']}, self.sysadmin_user) + + def test_vocabulary_update_id_same_name_and_new_tags(self): + tags = [ + {'name': 'new test tag one'}, + {'name': 'new test tag two'}, + {'name': 'new test tag three'}, + ] + self._update_vocabulary({'id': self.genre_vocab['id'], + 'name': self.genre_vocab['name'], + 'tags': tags}, self.sysadmin_user) - def test_vocabulary_update(self): + def test_vocabulary_update_id_new_name_and_same_tags(self): self._update_vocabulary({'id': self.genre_vocab['id'], - 'name': 'updated_name'}, self.sysadmin_user) + 'name': 'new name', + 'tags': self.genre_vocab['tags']}, self.sysadmin_user) def test_vocabulary_update_not_exists(self): '''Test the error response given when a user tries to update a @@ -283,16 +444,7 @@ def test_vocabulary_update_not_exists(self): str(self.sysadmin_user.apikey)}, status=404) assert response.json['success'] == False - - def test_vocabulary_update_no_name(self): - params = {'id': self.genre_vocab['id']} - param_string = json.dumps(params) - response = self.app.post('/api/action/vocabulary_update', - params=param_string, - extra_environ = {'Authorization': - str(self.sysadmin_user.apikey)}, - status=409) - assert response.json['success'] == False + assert response.json['error']['message'].startswith('Not found: ') def test_vocabulary_update_no_id(self): params = {'name': 'bagel radio'} @@ -314,6 +466,29 @@ def test_vocabulary_update_not_logged_in(self): params=param_string, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' + + def test_vocabulary_update_with_tags(self): + tags = [ + {'name': 'drone'}, + {'name': 'noise'}, + {'name': 'fuzz'}, + {'name': 'field recordings'}, + {'name': 'hypnagogia'}, + {'name': 'textures without rhythm'}, + ] + self._update_vocabulary( + { + 'id': self.genre_vocab['id'], + 'name': self.genre_vocab['name'], + 'tags': tags + }, + self.sysadmin_user) + + params = {'id': self.genre_vocab['id']} + response = self._post('/api/action/vocabulary_show', params) + # Check that retrieving the vocab by name gives the same result. + assert len(response['result']['tags']) == len(tags) def test_vocabulary_update_not_authorized(self): '''Test that users who are not authorized cannot update vocabs.''' @@ -325,6 +500,64 @@ def test_vocabulary_update_not_authorized(self): str(self.normal_user.apikey)}, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' + + def test_vocabulary_update_bad_tags(self): + '''Test updating vocabularies with invalid tags. + + ''' + for tags in ( + [{'id': 'xxx'}, {'name': 'foo'}], + [{'name': 'foo'}, {'name': None}], + [{'name': 'foo'}, {'name': ''}], + [{'name': 'foo'}, {'name': 'f'}], + [{'name': 'f'*200}, {'name': 'foo'}], + [{'name': 'Invalid!'}, {'name': 'foo'}], + ): + params = {'id': self.genre_vocab['name'], 'tags': tags} + response = self.app.post('/api/action/vocabulary_update', + params=json.dumps(params), + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=409) + assert response.json['success'] == False + assert response.json['error']['tags'] + + def test_vocabulary_update_none_tags(self): + '''Test updating vocabularies with None for 'tags'. + + ''' + params = {'id': self.genre_vocab['id'], 'tags': None} + response = self.app.post('/api/action/vocabulary_update', + params=json.dumps(params), + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=400) + assert response.body.startswith("Integrity Error: Only lists of " + "dicts can be placed against subschema tags") + + def test_vocabulary_update_empty_tags(self): + '''Test updating vocabularies with [] for 'tags'. + + ''' + params = {'id': self.genre_vocab['id'], 'tags': []} + response = self.app.post('/api/action/vocabulary_update', + params=json.dumps(params), + extra_environ = {'Authorization': + str(self.sysadmin_user.apikey)}, + status=200) + assert response.json['success'] == True + assert response.json['result'] + updated_vocab = response.json['result'] + assert updated_vocab['name'] == self.genre_vocab['name'] + assert updated_vocab['id'] == self.genre_vocab['id'] + assert updated_vocab['tags'] == [] + params = {'id': updated_vocab['id']} + response = self._post('/api/action/vocabulary_show', params) + assert response['success'] == True + assert response['result'] == updated_vocab + tags = self._list_tags(updated_vocab) + assert tags == [] def test_vocabulary_delete(self): self._delete_vocabulary(self.genre_vocab['id'], self.sysadmin_user) @@ -342,6 +575,8 @@ def test_vocabulary_delete_not_exists(self): str(self.sysadmin_user.apikey)}, status=404) assert response.json['success'] == False + assert response.json['error']['message'].startswith('Not found: ' + 'Could not find vocabulary') def test_vocabulary_delete_no_id(self): '''Test the error response given when a user tries to delete a @@ -367,6 +602,7 @@ def test_vocabulary_delete_not_logged_in(self): params=param_string, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' def test_vocabulary_delete_not_authorized(self): '''Test that users who are not authorized cannot delete vocabs.''' @@ -378,6 +614,7 @@ def test_vocabulary_delete_not_authorized(self): str(self.normal_user.apikey)}, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' def test_add_tag_to_vocab(self): '''Test that a tag can be added to and then retrieved from a vocab.''' @@ -403,6 +640,7 @@ def test_add_tag_no_vocab(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['vocabulary_id'] == ['Missing value'] def test_add_tag_vocab_not_exists(self): '''Test the error response when a user tries to add a tag to a vocab @@ -417,6 +655,8 @@ def test_add_tag_vocab_not_exists(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['vocabulary_id'] == [ + 'Tag vocabulary was not found.'] def test_add_tag_already_added(self): '''Test the error response when a user tries to add a tag to a vocab @@ -433,6 +673,8 @@ def test_add_tag_already_added(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['vocabulary_id'][0].startswith( + 'Tag noise already belongs to vocabulary') def test_add_tag_with_id(self): '''Test the error response when a user tries to specify the tag ID when @@ -451,6 +693,8 @@ def test_add_tag_with_id(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['id'] == [u'The input field id was not ' + 'expected.'] def test_add_tag_without_name(self): '''Test the error response when a user tries to create a tag without a @@ -467,6 +711,7 @@ def test_add_tag_without_name(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['name'] == [u'Missing value'] def test_add_tag_invalid_name(self): for name in ('Not a valid tag name!', '', None): @@ -481,6 +726,7 @@ def test_add_tag_invalid_name(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['name'] def test_add_tag_invalid_vocab_id(self): tag_dict = { @@ -494,6 +740,8 @@ def test_add_tag_invalid_vocab_id(self): str(self.sysadmin_user.apikey)}, status=409) assert response.json['success'] == False + assert response.json['error']['vocabulary_id'] == [ + u'Tag vocabulary was not found.'] def test_add_tag_not_logged_in(self): tag_dict = { @@ -505,6 +753,7 @@ def test_add_tag_not_logged_in(self): params=tag_string, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' def test_add_tag_not_authorized(self): tag_dict = { @@ -518,6 +767,7 @@ def test_add_tag_not_authorized(self): str(self.normal_user.apikey)}, status=403) assert response.json['success'] == False + assert response.json['error']['message'] == 'Access denied' def test_add_vocab_tag_to_dataset(self): '''Test that a tag belonging to a vocab can be added to a dataset, diff --git a/ckan/tests/functional/test_tag_vocab.py b/ckan/tests/functional/test_tag_vocab.py index 226f018544f..65786014eb1 100644 --- a/ckan/tests/functional/test_tag_vocab.py +++ b/ckan/tests/functional/test_tag_vocab.py @@ -11,7 +11,7 @@ from ckan.lib.create_test_data import CreateTestData import ckan.lib.helpers as h from ckan import plugins -from ckan.tests import WsgiAppCase +from ckan.tests import WsgiAppCase, url_for TEST_VOCAB_NAME = 'test-vocab' @@ -134,7 +134,6 @@ def value__get(self): value = property(value__get, value__set) - class TestWUI(WsgiAppCase): @classmethod def setup_class(cls): @@ -215,7 +214,8 @@ def _remove_vocab_tags(self, dataset_id, vocab_id, tag_name): def test_01_dataset_view(self): vocab_id = self._get_vocab_id(TEST_VOCAB_NAME) self._add_vocab_tag_to_dataset(self.dset.id, vocab_id, self.tag1_name) - response = self.app.get(h.url_for(controller='package', action='read', id=self.dset.id)) + response = self.app.get(h.url_for(controller='package', action='read', + id=self.dset.id)) assert self.tag1_name in response.body self._remove_vocab_tags(self.dset.id, vocab_id, self.tag1_name) @@ -233,6 +233,31 @@ def test_02_dataset_edit_add_vocab_tag(self): self._remove_vocab_tags(self.dset.id, vocab_id, self.tag1_name) self._remove_vocab_tags(self.dset.id, vocab_id, self.tag2_name) + def test_02_dataset_edit_add_free_and_vocab_tags_then_edit_again(self): + vocab_id = self._get_vocab_id(TEST_VOCAB_NAME) + url = h.url_for(controller='package', action='edit', id=self.dset.id) + response = self.app.get(url) + fv = response.forms['dataset-edit'] + fv = Form(fv.response, fv.text) + + # Add a free tag with a space in its name. + fv['tag_string'] = 'water quality' + + # Add a vocab tag. + fv['vocab_tags'] = [self.tag2_name] + + # Save the dataset and visit the page again + response = fv.submit('save') + response = response.follow() + assert not self.tag1_name in response.body + assert self.tag2_name in response.body + url = h.url_for(controller='package', action='edit', id=self.dset.id) + response = self.app.get(url) + fv = response.forms['dataset-edit'] + fv = Form(fv.response, fv.text) + assert fv['vocab_tags'].value == [self.tag2_name], fv['vocab_tags'].value + self._remove_vocab_tags(self.dset.id, vocab_id, self.tag2_name) + def test_03_dataset_edit_remove_vocab_tag(self): vocab_id = self._get_vocab_id(TEST_VOCAB_NAME) self._add_vocab_tag_to_dataset(self.dset.id, vocab_id, self.tag1_name)