diff --git a/ckan/tests/logic/test_tag_vocab.py b/ckan/tests/logic/test_tag_vocab.py index eeb4b5c5537..0415f01253e 100644 --- a/ckan/tests/logic/test_tag_vocab.py +++ b/ckan/tests/logic/test_tag_vocab.py @@ -1,10 +1,14 @@ import json +from pylons import request, tmpl_context as c +from genshi.input import HTML +from genshi.filters import Transformer from ckan import model from ckan.logic.converters import convert_to_tags, convert_from_tags, free_tags_only from ckan.lib.navl.dictization_functions import unflatten from ckan.lib.create_test_data import CreateTestData +import ckan.lib.helpers as h +from ckan import plugins from ckan.tests import WsgiAppCase -from ckan.tests.functional.api import assert_dicts_equal_ignoring_ordering class TestConverters(object): @classmethod @@ -65,3 +69,99 @@ def test_free_tags_only(self): assert len(data) == 2 assert ('tags', 1, 'vocabulary_id') in data.keys() assert ('tags', 1, '__extras') in data.keys() + + +class MockVocabTagsPlugin(plugins.SingletonPlugin): + plugins.implements(plugins.IDatasetForm, inherit=True) + plugins.implements(plugins.IGenshiStreamFilter) + + def is_fallback(self): + return True + + def package_types(self): + return ["mock_vocab_tags_plugin"] + + def setup_template_variables(self, context, data_dict=None): + # c.vocab_tags = get_action('tag_list')(context, {'vocabulary_name': self.vocab_name}) + pass + + def form_to_db_schema(self): + # schema = package_form_schema() + # schema.update({ + # 'vocab_tags': [ignore_missing, convert_to_tags(self.vocab_name)], + # }) + # return schema + pass + + def db_to_form_schema(self): + # schema = package_form_schema() + # schema.update({ + # 'tags': { + # '__extras': [keep_extras, free_tags_only] + # }, + # 'vocab_tags_selected': [convert_from_tags(self.vocab_name), ignore_missing], + # }) + # return schema + pass + + def filter(self, stream): + routes = request.environ.get('pylons.routes_dict') + if routes.get('controller') == 'package' \ + and routes.get('action') == 'read': + # add vocab tags to the bottom of the page + tags = c.pkg_dict.get('tags', []) + for tag in tags: + if tag.get('vocabulary_id'): + stream = stream | Transformer('body')\ + .append(HTML('

%s

' % tag.get('name'))) + return stream + + +class TestWUI(WsgiAppCase): + @classmethod + def setup_class(cls): + CreateTestData.create() + cls.sysadmin_user = model.User.get('testsysadmin') + cls.dset = model.Package.get('warandpeace') + cls.vocab_name = 'vocab' + cls.tag_name = 'vocab-tag' + + @classmethod + def teardown_class(cls): + model.repo.rebuild_db() + + def _create_vocabulary(self, vocab_name): + params = json.dumps({'name': vocab_name}) + extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)} + response = self.app.post('/api/action/vocabulary_create', params=params, + extra_environ=extra_environ) + assert json.loads(response.body)['success'] + return json.loads(response.body)['result']['id'] + + def _add_vocab_tag(self, vocab_id, tag_name): + params = json.dumps({'name': tag_name, 'vocabulary_id': vocab_id}) + extra_environ = {'Authorization' : str(self.sysadmin_user.apikey)} + response = self.app.post('/api/action/tag_create', params=params, + extra_environ=extra_environ) + assert response.json['success'] + + def _add_vocab_tag_to_dataset(self, dataset_id, vocab_id, tag_name): + params = json.dumps({'id': dataset_id}) + response = self.app.post('/api/action/package_show', params=params) + dataset = json.loads(response.body)['result'] + dataset['tags'].append({'name': tag_name, 'vocabulary_id': vocab_id}) + params = json.dumps(dataset) + response = self.app.post('/api/action/package_update', params=params, + extra_environ={'Authorization': str(self.sysadmin_user.apikey)}) + assert response.json['success'] + + def test_01_dataset_view(self): + plugin = MockVocabTagsPlugin() + plugins.load(plugin) + vocab_id = self._create_vocabulary(self.vocab_name) + self._add_vocab_tag(vocab_id, self.tag_name) + self._add_vocab_tag_to_dataset(self.dset.id, vocab_id, self.tag_name) + response = self.app.get(h.url_for(controller='package', action='read', id=self.dset.id)) + assert self.vocab_name in response.body + +