diff --git a/ckan/tests/logic/test_action.py b/ckan/tests/logic/test_action.py index 7641a00f788..946018b9ab7 100644 --- a/ckan/tests/logic/test_action.py +++ b/ckan/tests/logic/test_action.py @@ -315,7 +315,7 @@ def test_05b_user_show_datasets(self): dataset = result['datasets'][0] assert_equal(dataset['name'], u'annakarenina') - def test_06_tag_list(self): + def test_06a_tag_list(self): postparams = '%s=1' % json.dumps({}) res = self.app.post('/api/action/tag_list', params=postparams) assert_dicts_equal_ignoring_ordering( @@ -354,6 +354,40 @@ def test_06_tag_list(self): assert 'id' in res_obj['result'][1] assert 'id' in res_obj['result'][2] + def test_06b_tag_list_vocab(self): + vocab_name = 'test-vocab' + tag_name = 'test-vocab-tag' + + # create vocab + params = json.dumps({'name': vocab_name}) + extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)} + response = self.app.post('/api/action/vocabulary_create', params=params, + extra_environ=extra_environ) + assert response.json['success'] + vocab_id = response.json['result']['id'] + + # create new tag with vocab + params = json.dumps({'name': tag_name, 'vocabulary_id': vocab_id}) + extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)} + response = self.app.post('/api/action/tag_create', params=params, + extra_environ=extra_environ) + assert response.json['success'] == True + + # check that tag shows up in list + params = '%s=1' % json.dumps({'vocabulary_name': vocab_name}) + res = self.app.post('/api/action/tag_list', params=params) + body = json.loads(res.body) + assert_dicts_equal_ignoring_ordering( + json.loads(res.body), + {'help': 'Returns a list of tags', + 'success': True, + 'result': [tag_name]} + ) + + # check that invalid vocab name results in a 404 + params = '%s=1' % json.dumps({'vocabulary_name': 'invalid-vocab-name'}) + res = self.app.post('/api/action/tag_list', params=params, status=404) + def test_07_tag_show(self): postparams = '%s=1' % json.dumps({'id':'russian'}) res = self.app.post('/api/action/tag_show', params=postparams) diff --git a/ckan/tests/logic/test_tag_vocab.py b/ckan/tests/logic/test_tag_vocab.py index 2def4bc56ed..0415f01253e 100644 --- a/ckan/tests/logic/test_tag_vocab.py +++ b/ckan/tests/logic/test_tag_vocab.py @@ -1,10 +1,14 @@ import json +from pylons import request, tmpl_context as c +from genshi.input import HTML +from genshi.filters import Transformer from ckan import model from ckan.logic.converters import convert_to_tags, convert_from_tags, free_tags_only from ckan.lib.navl.dictization_functions import unflatten from ckan.lib.create_test_data import CreateTestData +import ckan.lib.helpers as h +from ckan import plugins from ckan.tests import WsgiAppCase -from ckan.tests.functional.api import assert_dicts_equal_ignoring_ordering class TestConverters(object): @classmethod @@ -66,33 +70,98 @@ def test_free_tags_only(self): assert ('tags', 1, 'vocabulary_id') in data.keys() assert ('tags', 1, '__extras') in data.keys() -class TestAPI(WsgiAppCase): + +class MockVocabTagsPlugin(plugins.SingletonPlugin): + plugins.implements(plugins.IDatasetForm, inherit=True) + plugins.implements(plugins.IGenshiStreamFilter) + + def is_fallback(self): + return True + + def package_types(self): + return ["mock_vocab_tags_plugin"] + + def setup_template_variables(self, context, data_dict=None): + # c.vocab_tags = get_action('tag_list')(context, {'vocabulary_name': self.vocab_name}) + pass + + def form_to_db_schema(self): + # schema = package_form_schema() + # schema.update({ + # 'vocab_tags': [ignore_missing, convert_to_tags(self.vocab_name)], + # }) + # return schema + pass + + def db_to_form_schema(self): + # schema = package_form_schema() + # schema.update({ + # 'tags': { + # '__extras': [keep_extras, free_tags_only] + # }, + # 'vocab_tags_selected': [convert_from_tags(self.vocab_name), ignore_missing], + # }) + # return schema + pass + + def filter(self, stream): + routes = request.environ.get('pylons.routes_dict') + if routes.get('controller') == 'package' \ + and routes.get('action') == 'read': + # add vocab tags to the bottom of the page + tags = c.pkg_dict.get('tags', []) + for tag in tags: + if tag.get('vocabulary_id'): + stream = stream | Transformer('body')\ + .append(HTML('

%s

' % tag.get('name'))) + return stream + + +class TestWUI(WsgiAppCase): @classmethod def setup_class(cls): CreateTestData.create() - cls.vocab = model.Vocabulary('test-vocab') - model.Session.add(cls.vocab) - cls.vocab_tag = model.Tag('vocab-tag', cls.vocab.id) - model.Session.add(cls.vocab_tag) - model.Session.commit() - model.Session.remove() + cls.sysadmin_user = model.User.get('testsysadmin') + cls.dset = model.Package.get('warandpeace') + cls.vocab_name = 'vocab' + cls.tag_name = 'vocab-tag' @classmethod def teardown_class(cls): model.repo.rebuild_db() - def test_tag_list(self): - postparams = '%s=1' % json.dumps({'vocabulary_name': self.vocab.name}) - res = self.app.post('/api/action/tag_list', params=postparams) - body = json.loads(res.body) - assert_dicts_equal_ignoring_ordering( - json.loads(res.body), - {'help': 'Returns a list of tags', - 'success': True, - 'result': [self.vocab_tag.name]} - ) - - def test_tag_list_invalid_vocab_name_404(self): - postparams = '%s=1' % json.dumps({'vocabulary_name': 'invalid-vocab-name'}) - res = self.app.post('/api/action/tag_list', params=postparams, status=404) + def _create_vocabulary(self, vocab_name): + params = json.dumps({'name': vocab_name}) + extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)} + response = self.app.post('/api/action/vocabulary_create', params=params, + extra_environ=extra_environ) + assert json.loads(response.body)['success'] + return json.loads(response.body)['result']['id'] + + def _add_vocab_tag(self, vocab_id, tag_name): + params = json.dumps({'name': tag_name, 'vocabulary_id': vocab_id}) + extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)} + response = self.app.post('/api/action/tag_create', params=params, + extra_environ=extra_environ) + assert response.json['success'] + + def _add_vocab_tag_to_dataset(self, dataset_id, vocab_id, tag_name): + params = json.dumps({'id': dataset_id}) + response = self.app.post('/api/action/package_show', params=params) + dataset = json.loads(response.body)['result'] + dataset['tags'].append({'name': tag_name, 'vocabulary_id': vocab_id}) + params = json.dumps(dataset) + response = self.app.post('/api/action/package_update', params=params, + extra_environ={'Authorization': str(self.sysadmin_user.apikey)}) + assert response.json['success'] + + def test_01_dataset_view(self): + plugin = MockVocabTagsPlugin() + plugins.load(plugin) + vocab_id = self._create_vocabulary(self.vocab_name) + self._add_vocab_tag(vocab_id, self.tag_name) + self._add_vocab_tag_to_dataset(self.dset.id, vocab_id, self.tag_name) + response = self.app.get(h.url_for(controller='package', action='read', id=self.dset.id)) + assert self.vocab_name in response.body +