From cefe30b7f0116c0299c66e469fa14adf830bbda7 Mon Sep 17 00:00:00 2001 From: Sergey Motornyuk Date: Fri, 25 Oct 2019 18:47:55 +0300 Subject: [PATCH] Update lib tests --- ckan/tests/lib/__init__.py | 4 +- .../lib/dictization/test_model_dictize.py | 739 ++++---- .../lib/navl/test_dictization_functions.py | 64 +- ckan/tests/lib/navl/test_validators.py | 177 +- ckan/tests/lib/search/test_index.py | 290 +-- ckan/tests/lib/test_app_globals.py | 37 +- ckan/tests/lib/test_auth_tkt.py | 284 +-- ckan/tests/lib/test_base.py | 749 ++++---- ckan/tests/lib/test_changes.py | 1569 +++++++++-------- ckan/tests/lib/test_cli.py | 353 ++-- ckan/tests/lib/test_config_tool.py | 226 ++- ckan/tests/lib/test_datapreview.py | 426 +++-- ckan/tests/lib/test_helpers.py | 1127 ++++++------ ckan/tests/lib/test_i18n.py | 120 +- ckan/tests/lib/test_io.py | 52 +- ckan/tests/lib/test_jobs.py | 243 ++- ckan/tests/lib/test_mailer.py | 200 ++- ckan/tests/lib/test_munge.py | 277 ++- ckan/tests/lib/test_navl.py | 47 +- 19 files changed, 3676 insertions(+), 3308 deletions(-) diff --git a/ckan/tests/lib/__init__.py b/ckan/tests/lib/__init__.py index bb5377869e0..4a454638cd3 100644 --- a/ckan/tests/lib/__init__.py +++ b/ckan/tests/lib/__init__.py @@ -1,6 +1,6 @@ # encoding: utf-8 -'''**All lib functions should have tests**. +"""**All lib functions should have tests**. .. todo:: @@ -16,4 +16,4 @@ extensions) to use, so all of these functions should really have tests and docstrings. It's probably worth focusing on these modules first. -''' +""" diff --git a/ckan/tests/lib/dictization/test_model_dictize.py b/ckan/tests/lib/dictization/test_model_dictize.py index 5edbf3e2aa5..2f82818228b 100644 --- a/ckan/tests/lib/dictization/test_model_dictize.py +++ b/ckan/tests/lib/dictization/test_model_dictize.py @@ -2,8 +2,7 @@ import datetime import copy - -from nose.tools import assert_equal +import pytest from ckan.lib.dictization import model_dictize, model_save from ckan import model @@ -13,319 +12,350 @@ class TestGroupListDictize: - - def setup(self): - helpers.reset_db() - search.clear_all() - + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize(self): group = factories.Group() group_list = model.Session.query(model.Group).filter_by().all() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} group_dicts = model_dictize.group_list_dictize(group_list, context) - assert_equal(len(group_dicts), 1) - assert_equal(group_dicts[0]['name'], group['name']) - assert_equal(group_dicts[0]['package_count'], 0) - assert 'extras' not in group_dicts[0] - assert 'tags' not in group_dicts[0] - assert 'groups' not in group_dicts[0] + assert len(group_dicts) == 1 + assert group_dicts[0]["name"] == group["name"] + assert group_dicts[0]["package_count"] == 0 + assert "extras" not in group_dicts[0] + assert "tags" not in group_dicts[0] + assert "groups" not in group_dicts[0] + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize_sorted(self): - factories.Group(name='aa') - factories.Group(name='bb') - group_list = [model.Group.get(u'bb'), - model.Group.get(u'aa')] - context = {'model': model, 'session': model.Session} + factories.Group(name="aa") + factories.Group(name="bb") + group_list = [model.Group.get(u"bb"), model.Group.get(u"aa")] + context = {"model": model, "session": model.Session} group_dicts = model_dictize.group_list_dictize(group_list, context) # list is resorted by name - assert_equal(group_dicts[0]['name'], 'aa') - assert_equal(group_dicts[1]['name'], 'bb') + assert group_dicts[0]["name"] == "aa" + assert group_dicts[1]["name"] == "bb" + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize_reverse_sorted(self): - factories.Group(name='aa') - factories.Group(name='bb') - group_list = [model.Group.get(u'aa'), - model.Group.get(u'bb')] - context = {'model': model, 'session': model.Session} + factories.Group(name="aa") + factories.Group(name="bb") + group_list = [model.Group.get(u"aa"), model.Group.get(u"bb")] + context = {"model": model, "session": model.Session} - group_dicts = model_dictize.group_list_dictize(group_list, context, - reverse=True) + group_dicts = model_dictize.group_list_dictize( + group_list, context, reverse=True + ) - assert_equal(group_dicts[0]['name'], 'bb') - assert_equal(group_dicts[1]['name'], 'aa') + assert group_dicts[0]["name"] == "bb" + assert group_dicts[1]["name"] == "aa" + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize_sort_by_package_count(self): - factories.Group(name='aa') - factories.Group(name='bb') - factories.Dataset(groups=[{'name': 'aa'}, {'name': 'bb'}]) - factories.Dataset(groups=[{'name': 'bb'}]) - group_list = [model.Group.get(u'bb'), - model.Group.get(u'aa')] - context = {'model': model, 'session': model.Session} + factories.Group(name="aa") + factories.Group(name="bb") + factories.Dataset(groups=[{"name": "aa"}, {"name": "bb"}]) + factories.Dataset(groups=[{"name": "bb"}]) + group_list = [model.Group.get(u"bb"), model.Group.get(u"aa")] + context = {"model": model, "session": model.Session} group_dicts = model_dictize.group_list_dictize( - group_list, context, sort_key=lambda x: x['package_count'], - with_package_counts=True) + group_list, + context, + sort_key=lambda x: x["package_count"], + with_package_counts=True, + ) # list is resorted by package counts - assert_equal(group_dicts[0]['name'], 'aa') - assert_equal(group_dicts[1]['name'], 'bb') + assert group_dicts[0]["name"] == "aa" + assert group_dicts[1]["name"] == "bb" + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize_without_package_count(self): group_ = factories.Group() - factories.Dataset(groups=[{'name': group_['name']}]) - group_list = [model.Group.get(group_['name'])] - context = {'model': model, 'session': model.Session} + factories.Dataset(groups=[{"name": group_["name"]}]) + group_list = [model.Group.get(group_["name"])] + context = {"model": model, "session": model.Session} group_dicts = model_dictize.group_list_dictize( - group_list, context, with_package_counts=False) + group_list, context, with_package_counts=False + ) - assert 'packages' not in group_dicts[0] + assert "packages" not in group_dicts[0] + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize_including_extras(self): - factories.Group(extras=[{'key': 'k1', 'value': 'v1'}]) + factories.Group(extras=[{"key": "k1", "value": "v1"}]) group_list = model.Session.query(model.Group).filter_by().all() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} - group_dicts = model_dictize.group_list_dictize(group_list, context, - include_extras=True) + group_dicts = model_dictize.group_list_dictize( + group_list, context, include_extras=True + ) - assert_equal(group_dicts[0]['extras'][0]['key'], 'k1') + assert group_dicts[0]["extras"][0]["key"] == "k1" + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize_including_tags(self): factories.Group() # group tags aren't in the group_create schema, so its slightly more # convoluted way to create them group_obj = model.Session.query(model.Group).first() - tag = model.Tag(name='t1') + tag = model.Tag(name="t1") model.Session.add(tag) model.Session.commit() tag = model.Session.query(model.Tag).first() group_obj = model.Session.query(model.Group).first() - member = model.Member(group=group_obj, table_id=tag.id, - table_name='tag') + member = model.Member( + group=group_obj, table_id=tag.id, table_name="tag" + ) model.Session.add(member) model.repo.new_revision() model.Session.commit() group_list = model.Session.query(model.Group).filter_by().all() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} - group_dicts = model_dictize.group_list_dictize(group_list, context, - include_tags=True) + group_dicts = model_dictize.group_list_dictize( + group_list, context, include_tags=True + ) - assert_equal(group_dicts[0]['tags'][0]['name'], 't1') + assert group_dicts[0]["tags"][0]["name"] == "t1" + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_list_dictize_including_groups(self): - factories.Group(name='parent') - factories.Group(name='child', groups=[{'name': 'parent'}]) - group_list = [model.Group.get(u'parent'), model.Group.get(u'child')] - context = {'model': model, 'session': model.Session} + factories.Group(name="parent") + factories.Group(name="child", groups=[{"name": "parent"}]) + group_list = [model.Group.get(u"parent"), model.Group.get(u"child")] + context = {"model": model, "session": model.Session} child_dict, parent_dict = model_dictize.group_list_dictize( - group_list, context, include_groups=True) + group_list, context, include_groups=True + ) - assert_equal(parent_dict['name'], 'parent') - assert_equal(child_dict['name'], 'child') - assert_equal(parent_dict['groups'], []) - assert_equal(child_dict['groups'][0]['name'], 'parent') + assert parent_dict["name"] == "parent" + assert child_dict["name"] == "child" + assert parent_dict["groups"] == [] + assert child_dict["groups"][0]["name"] == "parent" class TestGroupDictize: - - def setup(self): - helpers.reset_db() - search.clear_all() - + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize(self): - group = factories.Group(name='test_dictize') + group = factories.Group(name="test_dictize") group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} group = model_dictize.group_dictize(group_obj, context) - assert_equal(group['name'], 'test_dictize') - assert_equal(group['packages'], []) - assert_equal(group['extras'], []) - assert_equal(group['tags'], []) - assert_equal(group['groups'], []) + assert group["name"] == "test_dictize" + assert group["packages"] == [] + assert group["extras"] == [] + assert group["tags"] == [] + assert group["groups"] == [] + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_group_with_dataset(self): group_ = factories.Group() - package = factories.Dataset(groups=[{'name': group_['name']}]) + package = factories.Dataset(groups=[{"name": group_["name"]}]) group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} group = model_dictize.group_dictize(group_obj, context) - assert_equal(group['packages'][0]['name'], package['name']) - assert_equal(group['packages'][0]['groups'][0]['name'], group_['name']) + assert group["packages"][0]["name"] == package["name"] + assert group["packages"][0]["groups"][0]["name"] == group_["name"] + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_group_with_extra(self): - factories.Group(extras=[{'key': 'k1', 'value': 'v1'}]) + factories.Group(extras=[{"key": "k1", "value": "v1"}]) group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} group = model_dictize.group_dictize(group_obj, context) - assert_equal(group['extras'][0]['key'], 'k1') + assert group["extras"][0]["key"] == "k1" + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_group_with_parent_group(self): - factories.Group(name='parent') - factories.Group(name='child', groups=[{'name': 'parent'}]) - group_obj = model.Group.get('child') - context = {'model': model, 'session': model.Session} + factories.Group(name="parent") + factories.Group(name="child", groups=[{"name": "parent"}]) + group_obj = model.Group.get("child") + context = {"model": model, "session": model.Session} group = model_dictize.group_dictize(group_obj, context) - assert_equal(len(group['groups']), 1) - assert_equal(group['groups'][0]['name'], 'parent') - assert_equal(group['groups'][0]['package_count'], 0) + assert len(group["groups"]) == 1 + assert group["groups"][0]["name"] == "parent" + assert group["groups"][0]["package_count"] == 0 + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_without_packages(self): # group_list_dictize might not be interested in packages at all # so sets these options. e.g. it is not all_fields nor are the results # sorted by the number of packages. factories.Group() group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} - group = model_dictize.group_dictize(group_obj, context, - packages_field=None) + group = model_dictize.group_dictize( + group_obj, context, packages_field=None + ) - assert 'packages' not in group + assert "packages" not in group + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_with_package_list(self): group_ = factories.Group() - package = factories.Dataset(groups=[{'name': group_['name']}]) + package = factories.Dataset(groups=[{"name": group_["name"]}]) group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} group = model_dictize.group_dictize(group_obj, context) - assert_equal(type(group['packages']), list) - assert_equal(len(group['packages']), 1) - assert_equal(group['packages'][0]['name'], package['name']) + assert type(group["packages"]) == list + assert len(group["packages"]) == 1 + assert group["packages"][0]["name"] == package["name"] + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_with_package_list_limited(self): - ''' + """ Packages returned in group are limited by context var. - ''' + """ group_ = factories.Group() for _ in range(10): - factories.Dataset(groups=[{'name': group_['name']}]) + factories.Dataset(groups=[{"name": group_["name"]}]) group_obj = model.Session.query(model.Group).filter_by().first() # limit packages to 4 - context = {'model': model, 'session': model.Session, 'limits': {'packages': 4}} + context = { + "model": model, + "session": model.Session, + "limits": {"packages": 4}, + } group = model_dictize.group_dictize(group_obj, context) - assert_equal(len(group['packages']), 4) + assert len(group["packages"]) == 4 + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_with_package_list_limited_over(self): - ''' + """ Packages limit is set higher than number of packages in group. - ''' + """ group_ = factories.Group() for _ in range(3): - factories.Dataset(groups=[{'name': group_['name']}]) + factories.Dataset(groups=[{"name": group_["name"]}]) group_obj = model.Session.query(model.Group).filter_by().first() # limit packages to 4 - context = {'model': model, 'session': model.Session, 'limits': {'packages': 4}} + context = { + "model": model, + "session": model.Session, + "limits": {"packages": 4}, + } group = model_dictize.group_dictize(group_obj, context) - assert_equal(len(group['packages']), 3) + assert len(group["packages"]) == 3 - @helpers.change_config('ckan.search.rows_max', '4') + @pytest.mark.usefixtures("clean_db", "clean_index") + @pytest.mark.ckan_config("ckan.search.rows_max", "4") def test_group_dictize_with_package_list_limited_by_config(self): group_ = factories.Group() for _ in range(5): - factories.Dataset(groups=[{'name': group_['name']}]) + factories.Dataset(groups=[{"name": group_["name"]}]) group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} group = model_dictize.group_dictize(group_obj, context) - assert_equal(len(group['packages']), 4) + assert len(group["packages"]) == 4 # limited by ckan.search.rows_max + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_with_package_count(self): # group_list_dictize calls it like this by default group_ = factories.Group() other_group_ = factories.Group() - factories.Dataset(groups=[{'name': group_['name']}]) - factories.Dataset(groups=[{'name': other_group_['name']}]) + factories.Dataset(groups=[{"name": group_["name"]}]) + factories.Dataset(groups=[{"name": other_group_["name"]}]) group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session, - 'dataset_counts': model_dictize.get_group_dataset_counts() - } + context = { + "model": model, + "session": model.Session, + "dataset_counts": model_dictize.get_group_dataset_counts(), + } - group = model_dictize.group_dictize(group_obj, context, - packages_field='dataset_count') - assert_equal(group['package_count'], 1) + group = model_dictize.group_dictize( + group_obj, context, packages_field="dataset_count" + ) + assert group["package_count"] == 1 - def test_group_dictize_with_no_packages_field_but_still_package_count(self): + @pytest.mark.usefixtures("clean_db", "clean_index") + def test_group_dictize_with_no_packages_field_but_still_package_count( + self + ): # logic.get.group_show calls it like this when not include_datasets group_ = factories.Group() - factories.Dataset(groups=[{'name': group_['name']}]) + factories.Dataset(groups=[{"name": group_["name"]}]) group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} # not supplying dataset_counts in this case either - group = model_dictize.group_dictize(group_obj, context, - packages_field='dataset_count') + group = model_dictize.group_dictize( + group_obj, context, packages_field="dataset_count" + ) - assert 'packages' not in group - assert_equal(group['package_count'], 1) + assert "packages" not in group + assert group["package_count"] == 1 + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_for_org_with_package_list(self): org_ = factories.Organization() - package = factories.Dataset(owner_org=org_['id']) + package = factories.Dataset(owner_org=org_["id"]) group_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} org = model_dictize.group_dictize(group_obj, context) - assert_equal(type(org['packages']), list) - assert_equal(len(org['packages']), 1) - assert_equal(org['packages'][0]['name'], package['name']) + assert type(org["packages"]) == list + assert len(org["packages"]) == 1 + assert org["packages"][0]["name"] == package["name"] + @pytest.mark.usefixtures("clean_db", "clean_index") def test_group_dictize_for_org_with_package_count(self): # group_list_dictize calls it like this by default org_ = factories.Organization() other_org_ = factories.Organization() - factories.Dataset(owner_org=org_['id']) - factories.Dataset(owner_org=other_org_['id']) + factories.Dataset(owner_org=org_["id"]) + factories.Dataset(owner_org=other_org_["id"]) org_obj = model.Session.query(model.Group).filter_by().first() - context = {'model': model, 'session': model.Session, - 'dataset_counts': model_dictize.get_group_dataset_counts() - } + context = { + "model": model, + "session": model.Session, + "dataset_counts": model_dictize.get_group_dataset_counts(), + } - org = model_dictize.group_dictize(org_obj, context, - packages_field='dataset_count') + org = model_dictize.group_dictize( + org_obj, context, packages_field="dataset_count" + ) - assert_equal(org['package_count'], 1) + assert org["package_count"] == 1 class TestPackageDictize: - - def setup(self): - helpers.reset_db() - def remove_changable_values(self, dict_): dict_ = copy.deepcopy(dict_) for key, value in dict_.items(): - if key.endswith('id') and key != 'license_id': + if key.endswith("id") and key != "license_id": dict_.pop(key) - if key == 'created': + if key == "created": dict_.pop(key) - if 'timestamp' in key: + if "timestamp" in key: dict_.pop(key) - if key in ['metadata_created', 'metadata_modified']: + if key in ["metadata_created", "metadata_modified"]: dict_.pop(key) if isinstance(value, list): for i, sub_dict in enumerate(value): @@ -335,226 +365,248 @@ def remove_changable_values(self, dict_): def assert_equals_expected(self, expected_dict, result_dict): result_dict = self.remove_changable_values(result_dict) superfluous_keys = set(result_dict) - set(expected_dict) - assert not superfluous_keys, 'Did not expect key: %s' % \ - ' '.join(('%s=%s' % (k, result_dict[k]) for k in superfluous_keys)) + assert not superfluous_keys, "Did not expect key: %s" % " ".join( + ("%s=%s" % (k, result_dict[k]) for k in superfluous_keys) + ) for key in expected_dict: - assert expected_dict[key] == result_dict[key], \ - '%s=%s should be %s' % \ - (key, result_dict[key], expected_dict[key]) + assert expected_dict[key] == result_dict[key], ( + "%s=%s should be %s" + % (key, result_dict[key], expected_dict[key]) + ) + @pytest.mark.usefixtures("clean_db") def test_package_dictize_basic(self): - dataset = factories.Dataset(name='test_dataset_dictize', - notes='Some *description*', - url='http://example.com') - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + dataset = factories.Dataset( + name="test_dataset_dictize", + notes="Some *description*", + url="http://example.com", + ) + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal(result['name'], dataset['name']) - assert_equal(result['isopen'], False) - assert_equal(result['type'], dataset['type']) - today = datetime.date.today().strftime('%Y-%m-%d') - assert result['metadata_modified'].startswith(today) - assert result['metadata_created'].startswith(today) - assert_equal(result['creator_user_id'], dataset_obj.creator_user_id) + assert result["name"] == dataset["name"] + assert not (result["isopen"]) + assert result["type"] == dataset["type"] + today = datetime.date.today().strftime("%Y-%m-%d") + assert result["metadata_modified"].startswith(today) + assert result["metadata_created"].startswith(today) + assert result["creator_user_id"] == dataset_obj.creator_user_id expected_dict = { - 'author': None, - 'author_email': None, - 'extras': [], - 'groups': [], - 'isopen': False, - 'license_id': None, - 'license_title': None, - 'maintainer': None, - 'maintainer_email': None, - 'name': u'test_dataset_dictize', - 'notes': 'Some *description*', - 'num_resources': 0, - 'num_tags': 0, - 'organization': None, - 'owner_org': None, - 'private': False, - 'relationships_as_object': [], - 'relationships_as_subject': [], - 'resources': [], - 'state': u'active', - 'tags': [], - 'title': u'Test Dataset', - 'type': u'dataset', - 'url': 'http://example.com', - 'version': None} + "author": None, + "author_email": None, + "extras": [], + "groups": [], + "isopen": False, + "license_id": None, + "license_title": None, + "maintainer": None, + "maintainer_email": None, + "name": u"test_dataset_dictize", + "notes": "Some *description*", + "num_resources": 0, + "num_tags": 0, + "organization": None, + "owner_org": None, + "private": False, + "relationships_as_object": [], + "relationships_as_subject": [], + "resources": [], + "state": u"active", + "tags": [], + "title": u"Test Dataset", + "type": u"dataset", + "url": "http://example.com", + "version": None, + } self.assert_equals_expected(expected_dict, result) + @pytest.mark.usefixtures("clean_db") def test_package_dictize_license(self): - dataset = factories.Dataset(license_id='cc-by') - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + dataset = factories.Dataset(license_id="cc-by") + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal(result['isopen'], True) - assert_equal(result['license_id'], 'cc-by') - assert_equal(result['license_url'], - 'http://www.opendefinition.org/licenses/cc-by') - assert_equal(result['license_title'], 'Creative Commons Attribution') + assert result["isopen"] + assert result["license_id"] == "cc-by" + assert ( + result["license_url"] + == "http://www.opendefinition.org/licenses/cc-by" + ) + assert result["license_title"] == "Creative Commons Attribution" + @pytest.mark.usefixtures("clean_db") def test_package_dictize_title_stripped_of_whitespace(self): - dataset = factories.Dataset(title=' has whitespace \t') - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + dataset = factories.Dataset(title=" has whitespace \t") + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal(result['title'], 'has whitespace') - assert_equal(dataset_obj.title, ' has whitespace \t') + assert result["title"] == "has whitespace" + assert dataset_obj.title == " has whitespace \t" + @pytest.mark.usefixtures("clean_db") def test_package_dictize_resource(self): dataset = factories.Dataset() - resource = factories.Resource(package_id=dataset['id'], - name='test_pkg_dictize') - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + resource = factories.Resource( + package_id=dataset["id"], name="test_pkg_dictize" + ) + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal_for_keys(result['resources'][0], resource, 'name', 'url') + assert_equal_for_keys(result["resources"][0], resource, "name", "url") expected_dict = { - u'cache_last_updated': None, - u'cache_url': None, - u'description': u'Just another test resource.', - u'format': u'res_format', - u'hash': u'', - u'last_modified': None, - u'mimetype': None, - u'mimetype_inner': None, - u'name': u'test_pkg_dictize', - u'position': 0, - u'resource_type': None, - u'size': None, - u'state': u'active', - u'url': u'http://link.to.some.data', - u'url_type': None + u"cache_last_updated": None, + u"cache_url": None, + u"description": u"Just another test resource.", + u"format": u"res_format", + u"hash": u"", + u"last_modified": None, + u"mimetype": None, + u"mimetype_inner": None, + u"name": u"test_pkg_dictize", + u"position": 0, + u"resource_type": None, + u"size": None, + u"state": u"active", + u"url": u"http://link.to.some.data", + u"url_type": None, } - self.assert_equals_expected(expected_dict, result['resources'][0]) + self.assert_equals_expected(expected_dict, result["resources"][0]) + @pytest.mark.usefixtures("clean_db") def test_package_dictize_resource_upload_and_striped(self): dataset = factories.Dataset() - resource = factories.Resource(package=dataset['id'], - name='test_pkg_dictize', - url_type='upload', - url='some_filename.csv') + resource = factories.Resource( + package=dataset["id"], + name="test_pkg_dictize", + url_type="upload", + url="some_filename.csv", + ) - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} result = model_save.resource_dict_save(resource, context) - expected_dict = { - u'url': u'some_filename.csv', - u'url_type': u'upload' - } - assert expected_dict['url'] == result.url + expected_dict = {u"url": u"some_filename.csv", u"url_type": u"upload"} + assert expected_dict["url"] == result.url + @pytest.mark.usefixtures("clean_db") def test_package_dictize_resource_upload_with_url_and_striped(self): dataset = factories.Dataset() - resource = factories.Resource(package=dataset['id'], - name='test_pkg_dictize', - url_type='upload', - url='http://some_filename.csv') + resource = factories.Resource( + package=dataset["id"], + name="test_pkg_dictize", + url_type="upload", + url="http://some_filename.csv", + ) - context = {'model': model, 'session': model.Session} + context = {"model": model, "session": model.Session} result = model_save.resource_dict_save(resource, context) - expected_dict = { - u'url': u'some_filename.csv', - u'url_type': u'upload' - } - assert expected_dict['url'] == result.url + expected_dict = {u"url": u"some_filename.csv", u"url_type": u"upload"} + assert expected_dict["url"] == result.url + @pytest.mark.usefixtures("clean_db") def test_package_dictize_tags(self): - dataset = factories.Dataset(tags=[{'name': 'fish'}]) - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + dataset = factories.Dataset(tags=[{"name": "fish"}]) + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal(result['tags'][0]['name'], 'fish') - expected_dict = {'display_name': u'fish', - u'name': u'fish', - u'state': u'active'} - self.assert_equals_expected(expected_dict, result['tags'][0]) + assert result["tags"][0]["name"] == "fish" + expected_dict = { + "display_name": u"fish", + u"name": u"fish", + u"state": u"active", + } + self.assert_equals_expected(expected_dict, result["tags"][0]) + @pytest.mark.usefixtures("clean_db") def test_package_dictize_extras(self): - extras_dict = {'key': 'latitude', 'value': '54.6'} + extras_dict = {"key": "latitude", "value": "54.6"} dataset = factories.Dataset(extras=[extras_dict]) - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal_for_keys(result['extras'][0], extras_dict, - 'key', 'value') - expected_dict = {u'key': u'latitude', - u'state': u'active', - u'value': u'54.6'} - self.assert_equals_expected(expected_dict, result['extras'][0]) + assert_equal_for_keys(result["extras"][0], extras_dict, "key", "value") + expected_dict = { + u"key": u"latitude", + u"state": u"active", + u"value": u"54.6", + } + self.assert_equals_expected(expected_dict, result["extras"][0]) + @pytest.mark.usefixtures("clean_db") def test_package_dictize_group(self): - group = factories.Group(name='test_group_dictize', - title='Test Group Dictize') - dataset = factories.Dataset(groups=[{'name': group['name']}]) - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + group = factories.Group( + name="test_group_dictize", title="Test Group Dictize" + ) + dataset = factories.Dataset(groups=[{"name": group["name"]}]) + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal_for_keys(result['groups'][0], group, - 'name') + assert_equal_for_keys(result["groups"][0], group, "name") expected_dict = { - u'approval_status': u'approved', - u'capacity': u'public', - u'description': u'A test description for this test group.', - 'display_name': u'Test Group Dictize', - 'image_display_url': u'', - u'image_url': u'', - u'is_organization': False, - u'name': u'test_group_dictize', - u'state': u'active', - u'title': u'Test Group Dictize', - u'type': u'group'} - self.assert_equals_expected(expected_dict, result['groups'][0]) + u"approval_status": u"approved", + u"capacity": u"public", + u"description": u"A test description for this test group.", + "display_name": u"Test Group Dictize", + "image_display_url": u"", + u"image_url": u"", + u"is_organization": False, + u"name": u"test_group_dictize", + u"state": u"active", + u"title": u"Test Group Dictize", + u"type": u"group", + } + self.assert_equals_expected(expected_dict, result["groups"][0]) + @pytest.mark.usefixtures("clean_db") def test_package_dictize_owner_org(self): - org = factories.Organization(name='test_package_dictize') - dataset = factories.Dataset(owner_org=org['id']) - dataset_obj = model.Package.get(dataset['id']) - context = {'model': model, 'session': model.Session} + org = factories.Organization(name="test_package_dictize") + dataset = factories.Dataset(owner_org=org["id"]) + dataset_obj = model.Package.get(dataset["id"]) + context = {"model": model, "session": model.Session} result = model_dictize.package_dictize(dataset_obj, context) - assert_equal(result['owner_org'], org['id']) - assert_equal_for_keys(result['organization'], org, - 'name') + assert result["owner_org"] == org["id"] + assert_equal_for_keys(result["organization"], org, "name") expected_dict = { - u'approval_status': u'approved', - u'description': u'Just another test organization.', - u'image_url': u'http://placekitten.com/g/200/100', - u'is_organization': True, - u'name': u'test_package_dictize', - u'state': u'active', - u'title': u'Test Organization', - u'type': u'organization' + u"approval_status": u"approved", + u"description": u"Just another test organization.", + u"image_url": u"http://placekitten.com/g/200/100", + u"is_organization": True, + u"name": u"test_package_dictize", + u"state": u"active", + u"title": u"Test Organization", + u"type": u"organization", } - self.assert_equals_expected(expected_dict, result['organization']) + self.assert_equals_expected(expected_dict, result["organization"]) def assert_equal_for_keys(dict1, dict2, *keys): for key in keys: assert key in dict1, 'Dict 1 misses key "%s"' % key assert key in dict2, 'Dict 2 misses key "%s"' % key - assert dict1[key] == dict2[key], '%s != %s (key=%s)' % \ - (dict1[key], dict2[key], key) + assert dict1[key] == dict2[key], "%s != %s (key=%s)" % ( + dict1[key], + dict2[key], + key, + ) class TestTagDictize(object): @@ -576,8 +628,9 @@ def test_tag_dictize_not_including_datasets(self): factories.Dataset(tags=[dict(name="test_tag")]) tag = model.Tag.get("test_tag") - tag_dict = model_dictize.tag_dictize(tag, context={"model": model}, - include_datasets=False) + tag_dict = model_dictize.tag_dictize( + tag, context={"model": model}, include_datasets=False + ) assert not tag_dict.get("packages") @@ -588,12 +641,14 @@ class TestVocabularyDictize(object): def test_vocabulary_dictize_including_datasets(self): """include_datasets=True should include datasets in vocab dicts.""" vocab_dict = factories.Vocabulary( - tags=[dict(name="test_tag_1"), dict(name="test_tag_2")]) + tags=[dict(name="test_tag_1"), dict(name="test_tag_2")] + ) factories.Dataset(tags=vocab_dict["tags"]) vocab_obj = model.Vocabulary.get(vocab_dict["name"]) vocab_dict = model_dictize.vocabulary_dictize( - vocab_obj, context={"model": model}, include_datasets=True) + vocab_obj, context={"model": model}, include_datasets=True + ) assert len(vocab_dict["tags"]) == 2 for tag in vocab_dict["tags"]: @@ -602,12 +657,14 @@ def test_vocabulary_dictize_including_datasets(self): def test_vocabulary_dictize_not_including_datasets(self): """By default datasets should not be included in vocab dicts.""" vocab_dict = factories.Vocabulary( - tags=[dict(name="test_tag_1"), dict(name="test_tag_2")]) + tags=[dict(name="test_tag_1"), dict(name="test_tag_2")] + ) factories.Dataset(tags=vocab_dict["tags"]) vocab_obj = model.Vocabulary.get(vocab_dict["name"]) vocab_dict = model_dictize.vocabulary_dictize( - vocab_obj, context={"model": model}) + vocab_obj, context={"model": model} + ) assert len(vocab_dict["tags"]) == 2 for tag in vocab_dict["tags"]: @@ -615,48 +672,44 @@ def test_vocabulary_dictize_not_including_datasets(self): class TestActivityDictize(object): - def setup(self): - helpers.reset_db() - + @pytest.mark.usefixtures("clean_db") def test_include_data(self): dataset = factories.Dataset() user = factories.User() activity = factories.Activity( - user_id=user['id'], - object_id=dataset['id'], + user_id=user["id"], + object_id=dataset["id"], revision_id=None, - activity_type='new package', - data={ - 'package': copy.deepcopy(dataset), - 'actor': 'Mr Someone', - }) - activity_obj = model.Activity.get(activity['id']) - context = {'model': model, 'session': model.Session} - dictized = model_dictize.activity_dictize(activity_obj, context, - include_data=True) - assert_equal(dictized['user_id'], user['id']) - assert_equal(dictized['activity_type'], 'new package') - assert_equal(dictized['data']['package']['title'], dataset['title']) - assert_equal(dictized['data']['package']['id'], dataset['id']) - assert_equal(dictized['data']['actor'], 'Mr Someone') - + activity_type="new package", + data={"package": copy.deepcopy(dataset), "actor": "Mr Someone"}, + ) + activity_obj = model.Activity.get(activity["id"]) + context = {"model": model, "session": model.Session} + dictized = model_dictize.activity_dictize( + activity_obj, context, include_data=True + ) + assert dictized["user_id"] == user["id"] + assert dictized["activity_type"] == "new package" + assert dictized["data"]["package"]["title"] == dataset["title"] + assert dictized["data"]["package"]["id"] == dataset["id"] + assert dictized["data"]["actor"] == "Mr Someone" + + @pytest.mark.usefixtures("clean_db") def test_dont_include_data(self): dataset = factories.Dataset() user = factories.User() activity = factories.Activity( - user_id=user['id'], - object_id=dataset['id'], + user_id=user["id"], + object_id=dataset["id"], revision_id=None, - activity_type='new package', - data={ - 'package': copy.deepcopy(dataset), - 'actor': 'Mr Someone', - }) - activity_obj = model.Activity.get(activity['id']) - context = {'model': model, 'session': model.Session} - dictized = model_dictize.activity_dictize(activity_obj, context, - include_data=False) - assert_equal(dictized['user_id'], user['id']) - assert_equal(dictized['activity_type'], 'new package') - assert_equal(dictized['data'], - {'package': {'title': dataset['title']}}) + activity_type="new package", + data={"package": copy.deepcopy(dataset), "actor": "Mr Someone"}, + ) + activity_obj = model.Activity.get(activity["id"]) + context = {"model": model, "session": model.Session} + dictized = model_dictize.activity_dictize( + activity_obj, context, include_data=False + ) + assert dictized["user_id"] == user["id"] + assert dictized["activity_type"] == "new package" + assert dictized["data"] == {"package": {"title": dataset["title"]}} diff --git a/ckan/tests/lib/navl/test_dictization_functions.py b/ckan/tests/lib/navl/test_dictization_functions.py index 756a0839c46..97a18bca8b2 100644 --- a/ckan/tests/lib/navl/test_dictization_functions.py +++ b/ckan/tests/lib/navl/test_dictization_functions.py @@ -5,47 +5,34 @@ from ckan.lib.navl.dictization_functions import validate, Invalid -eq_ = nose.tools.eq_ - - class TestValidate(object): - def test_validate_passes_a_copy_of_the_context_to_validators(self): # We need to pass some keys on the context, otherwise validate # will do context = context || {}, which creates a new one, defeating # the purpose of this test - context = {'foo': 'bar'} + context = {"foo": "bar"} def my_validator(key, data, errors, context_in_validator): assert not id(context) == id(context_in_validator) - data_dict = { - 'my_field': 'test', - } + data_dict = {"my_field": "test"} - schema = { - 'my_field': [my_validator], - } + schema = {"my_field": [my_validator]} data, errors = validate(data_dict, schema, context) def test_validate_adds_schema_keys_to_context(self): - def my_validator(key, data, errors, context): - assert 'schema_keys' in context + assert "schema_keys" in context - eq_(context['schema_keys'], ['my_field']) + assert context["schema_keys"] == ["my_field"] - data_dict = { - 'my_field': 'test', - } + data_dict = {"my_field": "test"} - schema = { - 'my_field': [my_validator], - } + schema = {"my_field": [my_validator]} context = {} @@ -53,41 +40,40 @@ def my_validator(key, data, errors, context): class TestDictizationError(object): - def test_str_error(self): - err_obj = Invalid('Some ascii error') - eq_(str(err_obj), "Invalid: 'Some ascii error'") + err_obj = Invalid("Some ascii error") + assert str(err_obj) == "Invalid: 'Some ascii error'" def test_unicode_error(self): - err_obj = Invalid('Some ascii error') - eq_(text_type(err_obj), u"Invalid: 'Some ascii error'") + err_obj = Invalid("Some ascii error") + assert text_type(err_obj) == u"Invalid: 'Some ascii error'" def test_repr_error(self): - err_obj = Invalid('Some ascii error') - eq_(repr(err_obj), "") + err_obj = Invalid("Some ascii error") + assert repr(err_obj) == "" # Error msgs should be ascii, but let's just see what happens for unicode def test_str_unicode_error(self): - err_obj = Invalid(u'Some unicode \xa3 error') - eq_(str(err_obj), "Invalid: u'Some unicode \\xa3 error'") + err_obj = Invalid(u"Some unicode \xa3 error") + assert str(err_obj) == "Invalid: u'Some unicode \\xa3 error'" def test_unicode_unicode_error(self): - err_obj = Invalid(u'Some unicode \xa3 error') - eq_(text_type(err_obj), "Invalid: u'Some unicode \\xa3 error'") + err_obj = Invalid(u"Some unicode \xa3 error") + assert text_type(err_obj) == "Invalid: u'Some unicode \\xa3 error'" def test_repr_unicode_error(self): - err_obj = Invalid(u'Some unicode \xa3 error') - eq_(repr(err_obj), "") + err_obj = Invalid(u"Some unicode \xa3 error") + assert repr(err_obj) == "" def test_str_blank(self): - err_obj = Invalid('') - eq_(str(err_obj), "Invalid") + err_obj = Invalid("") + assert str(err_obj) == "Invalid" def test_unicode_blank(self): - err_obj = Invalid('') - eq_(text_type(err_obj), u"Invalid") + err_obj = Invalid("") + assert text_type(err_obj) == u"Invalid" def test_repr_blank(self): - err_obj = Invalid('') - eq_(repr(err_obj), "") + err_obj = Invalid("") + assert repr(err_obj) == "" diff --git a/ckan/tests/lib/navl/test_validators.py b/ckan/tests/lib/navl/test_validators.py index 5643a9eab29..0153372b9ab 100644 --- a/ckan/tests/lib/navl/test_validators.py +++ b/ckan/tests/lib/navl/test_validators.py @@ -1,21 +1,17 @@ # encoding: utf-8 -'''Unit tests for ckan/lib/navl/validators.py. +"""Unit tests for ckan/lib/navl/validators.py. -''' +""" import copy -import nose.tools - +import pytest import ckan.tests.factories as factories import ckan.lib.navl.validators as validators -eq_ = nose.tools.eq_ - - def returns_None(function): - '''A decorator that asserts that the decorated function returns None. + """A decorator that asserts that the decorated function returns None. :param function: the function to decorate :type function: function @@ -27,7 +23,8 @@ def call_validator(*args, **kwargs): return validators.user_name_validator(*args, **kwargs) call_validator(key, data, errors) - ''' + """ + def call_and_assert(*args, **kwargs): original_args = copy.deepcopy(args) original_kwargs = copy.deepcopy(kwargs) @@ -35,15 +32,18 @@ def call_and_assert(*args, **kwargs): result = function(*args, **kwargs) assert result is None, ( - 'Should return None when called with args: {args} and ' - 'kwargs: {kwargs}'.format(args=original_args, - kwargs=original_kwargs)) + "Should return None when called with args: {args} and " + "kwargs: {kwargs}".format( + args=original_args, kwargs=original_kwargs + ) + ) return result + return call_and_assert def raises_StopOnError(function): - '''A decorator that asserts that the decorated function raises + """A decorator that asserts that the decorated function raises dictization_functions.StopOnError. :param function: the function to decorate @@ -56,15 +56,19 @@ def call_validator(*args, **kwargs): return validators.user_name_validator(*args, **kwargs) call_validator(key, data, errors) - ''' + """ + def call_and_assert(*args, **kwargs): import ckan.lib.navl.dictization_functions as df - nose.tools.assert_raises(df.StopOnError, function, *args, **kwargs) + + with pytest.raises(df.StopOnError): + function(*args, **kwargs) + return call_and_assert def does_not_modify_data_dict(validator): - '''A decorator that asserts that the decorated validator doesn't modify + """A decorator that asserts that the decorated validator doesn't modify its `data` dict param. :param validator: the validator function to decorate @@ -77,7 +81,8 @@ def call_validator(*args, **kwargs): return validators.user_name_validator(*args, **kwargs) call_validator(key, data, errors) - ''' + """ + def call_and_assert(key, data, errors, context=None): if context is None: context = {} @@ -88,17 +93,22 @@ def call_and_assert(key, data, errors, context=None): result = validator(key, data, errors, context=context) assert data == original_data, ( - 'Should not modify data dict when called with ' - 'key: {key}, data: {data}, errors: {errors}, ' - 'context: {context}'.format(key=key, data=original_data, - errors=original_errors, - context=original_context)) + "Should not modify data dict when called with " + "key: {key}, data: {data}, errors: {errors}, " + "context: {context}".format( + key=key, + data=original_data, + errors=original_errors, + context=original_context, + ) + ) return result + return call_and_assert def removes_key_from_data_dict(validator): - '''A decorator that asserts that the decorated validator removes its key + """A decorator that asserts that the decorated validator removes its key from the data dict. :param validator: the validator function to decorate @@ -111,7 +121,8 @@ def call_validator(*args, **kwargs): return validators.user_name_validator(*args, **kwargs) call_validator(key, data, errors) - ''' + """ + def call_and_assert(key, data, errors, context=None): if context is None: context = {} @@ -122,17 +133,22 @@ def call_and_assert(key, data, errors, context=None): result = validator(key, data, errors, context=context) assert key not in data, ( - 'Should remove key from data dict when called with: ' - 'key: {key}, data: {data}, errors: {errors}, ' - 'context: {context} '.format(key=key, data=original_data, - errors=original_errors, - context=original_context)) + "Should remove key from data dict when called with: " + "key: {key}, data: {data}, errors: {errors}, " + "context: {context} ".format( + key=key, + data=original_data, + errors=original_errors, + context=original_context, + ) + ) return result + return call_and_assert def does_not_modify_other_keys_in_data_dict(validator): - '''A decorator that asserts that the decorated validator doesn't add, + """A decorator that asserts that the decorated validator doesn't add, modify the value of, or remove any other keys from its ``data`` dict param. The function *may* modify its own data dict key. @@ -147,7 +163,8 @@ def call_validator(*args, **kwargs): return validators.user_name_validator(*args, **kwargs) call_validator(key, data, errors) - ''' + """ + def call_and_assert(key, data, errors, context=None): if context is None: context = {} @@ -165,24 +182,33 @@ def call_and_assert(key, data, errors, context=None): del original_data[key] assert data.keys() == original_data.keys(), ( - 'Should not add or remove keys from data dict when called with ' - 'key: {key}, data: {data}, errors: {errors}, ' - 'context: {context}'.format(key=key, data=original_data, - errors=original_errors, - context=original_context)) + "Should not add or remove keys from data dict when called with " + "key: {key}, data: {data}, errors: {errors}, " + "context: {context}".format( + key=key, + data=original_data, + errors=original_errors, + context=original_context, + ) + ) for key_ in data: assert data[key_] == original_data[key_], ( - 'Should not modify other keys in data dict when called with ' - 'key: {key}, data: {data}, errors: {errors}, ' - 'context: {context}'.format(key=key, data=original_data, - errors=original_errors, - context=original_context)) + "Should not modify other keys in data dict when called with " + "key: {key}, data: {data}, errors: {errors}, " + "context: {context}".format( + key=key, + data=original_data, + errors=original_errors, + context=original_context, + ) + ) return result + return call_and_assert def does_not_modify_errors_dict(validator): - '''A decorator that asserts that the decorated validator doesn't modify its + """A decorator that asserts that the decorated validator doesn't modify its `errors` dict param. :param validator: the validator function to decorate @@ -195,7 +221,8 @@ def call_validator(*args, **kwargs): return validators.user_name_validator(*args, **kwargs) call_validator(key, data, errors) - ''' + """ + def call_and_assert(key, data, errors, context=None): if context is None: context = {} @@ -206,35 +233,39 @@ def call_and_assert(key, data, errors, context=None): result = validator(key, data, errors, context=context) assert errors == original_errors, ( - 'Should not modify errors dict when called with key: {key}, ' - 'data: {data}, errors: {errors}, ' - 'context: {context}'.format(key=key, data=original_data, - errors=original_errors, - context=original_context)) + "Should not modify errors dict when called with key: {key}, " + "data: {data}, errors: {errors}, " + "context: {context}".format( + key=key, + data=original_data, + errors=original_errors, + context=original_context, + ) + ) return result + return call_and_assert class TestValidators(object): - def test_ignore_missing_with_value_missing(self): - '''ignore_missing() should raise StopOnError if: + """ignore_missing() should raise StopOnError if: - data[key] is None, or - data[key] is dictization_functions.missing, or - key is not in data - ''' + """ import ckan.lib.navl.dictization_functions as df - for value in (None, df.missing, 'skip'): + for value in (None, df.missing, "skip"): # This is the key for the value that is going to be validated. - key = ('key to be validated',) + key = ("key to be validated",) # The data to pass to the validator function for validation. data = factories.validator_data_dict() - if value != 'skip': + if value != "skip": data[key] = value # The errors dict to pass to the validator function. @@ -247,16 +278,17 @@ def test_ignore_missing_with_value_missing(self): @raises_StopOnError def call_validator(*args, **kwargs): return validators.ignore_missing(*args, **kwargs) + call_validator(key=key, data=data, errors=errors, context={}) def test_ignore_missing_with_a_value(self): - '''If data[key] is neither None or missing, ignore_missing() should do + """If data[key] is neither None or missing, ignore_missing() should do nothing. - ''' - key = ('key to be validated',) + """ + key = ("key to be validated",) data = factories.validator_data_dict() - data[key] = 'value to be validated' + data[key] = "value to be validated" errors = factories.validator_errors_dict() errors[key] = [] @@ -265,33 +297,34 @@ def test_ignore_missing_with_a_value(self): @does_not_modify_errors_dict def call_validator(*args, **kwargs): return validators.ignore_missing(*args, **kwargs) + call_validator(key=key, data=data, errors=errors, context={}) class TestDefault(object): def test_key_doesnt_exist(self): dict_ = {} - validators.default('default_value')('key', dict_, {}, {}) - eq_(dict_, {'key': 'default_value'}) + validators.default("default_value")("key", dict_, {}, {}) + assert dict_ == {"key": "default_value"} def test_value_is_none(self): - dict_ = {'key': None} - validators.default('default_value')('key', dict_, {}, {}) - eq_(dict_, {'key': 'default_value'}) + dict_ = {"key": None} + validators.default("default_value")("key", dict_, {}, {}) + assert dict_ == {"key": "default_value"} def test_value_is_empty_string(self): - dict_ = {'key': ''} - validators.default('default_value')('key', dict_, {}, {}) - eq_(dict_, {'key': 'default_value'}) + dict_ = {"key": ""} + validators.default("default_value")("key", dict_, {}, {}) + assert dict_ == {"key": "default_value"} def test_value_is_false(self): # False is a consciously set value, so should not be changed to the # default - dict_ = {'key': False} - validators.default('default_value')('key', dict_, {}, {}) - eq_(dict_, {'key': False}) + dict_ = {"key": False} + validators.default("default_value")("key", dict_, {}, {}) + assert dict_ == {"key": False} def test_already_has_a_value(self): - dict_ = {'key': 'original'} - validators.default('default_value')('key', dict_, {}, {}) - eq_(dict_, {'key': 'original'}) + dict_ = {"key": "original"} + validators.default("default_value")("key", dict_, {}, {}) + assert dict_ == {"key": "original"} diff --git a/ckan/tests/lib/search/test_index.py b/ckan/tests/lib/search/test_index.py index 519cdea1134..41ef723149e 100644 --- a/ckan/tests/lib/search/test_index.py +++ b/ckan/tests/lib/search/test_index.py @@ -4,178 +4,183 @@ import hashlib import json import nose -from nose.tools import assert_equal, assert_in, assert_not_in - +import pytest from ckan.common import config import ckan.lib.search as search class TestSearchIndex(object): - @classmethod def setup_class(cls): - if not search.is_available(): - raise nose.SkipTest('Solr not reachable') - cls.solr_client = search.make_connection() - cls.fq = " +site_id:\"%s\" " % config['ckan.site_id'] + cls.fq = ' +site_id:"%s" ' % config["ckan.site_id"] cls.package_index = search.PackageSearchIndex() cls.base_package_dict = { - 'id': 'test-index', - 'name': 'monkey', - 'title': 'Monkey', - 'state': 'active', - 'private': False, - 'type': 'dataset', - 'owner_org': None, - 'metadata_created': datetime.datetime.now().isoformat(), - 'metadata_modified': datetime.datetime.now().isoformat(), + "id": "test-index", + "name": "monkey", + "title": "Monkey", + "state": "active", + "private": False, + "type": "dataset", + "owner_org": None, + "metadata_created": datetime.datetime.now().isoformat(), + "metadata_modified": datetime.datetime.now().isoformat(), } - def teardown(self): - # clear the search index after every test - self.package_index.clear() - + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_index_basic(self): self.package_index.index_package(self.base_package_dict) - response = self.solr_client.search(q='name:monkey', fq=self.fq) + response = self.solr_client.search(q="name:monkey", fq=self.fq) - assert_equal(len(response), 1) + assert len(response) == 1 - assert_equal(response.docs[0]['id'], 'test-index') - assert_equal(response.docs[0]['name'], 'monkey') - assert_equal(response.docs[0]['title'], 'Monkey') + assert response.docs[0]["id"] == "test-index" + assert response.docs[0]["name"] == "monkey" + assert response.docs[0]["title"] == "Monkey" index_id = hashlib.md5( - '{0}{1}'.format(self.base_package_dict['id'], - config['ckan.site_id']) + "{0}{1}".format( + self.base_package_dict["id"], config["ckan.site_id"] + ) ).hexdigest() - assert_equal(response.docs[0]['index_id'], index_id) + assert response.docs[0]["index_id"] == index_id + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_no_state_no_index(self): pkg_dict = self.base_package_dict.copy() - pkg_dict.update({ - 'state': None, - }) + pkg_dict.update({"state": None}) self.package_index.index_package(pkg_dict) - response = self.solr_client.search(q='name:monkey', fq=self.fq) + response = self.solr_client.search(q="name:monkey", fq=self.fq) - assert_equal(len(response), 0) + assert len(response) == 0 + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_clear_index(self): self.package_index.index_package(self.base_package_dict) self.package_index.clear() - response = self.solr_client.search(q='name:monkey', fq=self.fq) - assert_equal(len(response), 0) + response = self.solr_client.search(q="name:monkey", fq=self.fq) + assert len(response) == 0 + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_delete_package(self): self.package_index.index_package(self.base_package_dict) pkg_dict = self.base_package_dict.copy() - pkg_dict.update({ - 'id': 'test-index-2', - 'name': 'monkey2' - }) + pkg_dict.update({"id": "test-index-2", "name": "monkey2"}) self.package_index.index_package(pkg_dict) - response = self.solr_client.search(q='title:Monkey', fq=self.fq) - assert_equal(len(response), 2) - response_ids = sorted([x['id'] for x in response.docs]) - assert_equal(response_ids, ['test-index', 'test-index-2']) + response = self.solr_client.search(q="title:Monkey", fq=self.fq) + assert len(response) == 2 + response_ids = sorted([x["id"] for x in response.docs]) + assert response_ids == ["test-index", "test-index-2"] self.package_index.delete_package(pkg_dict) - response = self.solr_client.search(q='title:Monkey', fq=self.fq) - assert_equal(len(response), 1) - response_ids = sorted([x['id'] for x in response.docs]) - assert_equal(response_ids, ['test-index']) + response = self.solr_client.search(q="title:Monkey", fq=self.fq) + assert len(response) == 1 + response_ids = sorted([x["id"] for x in response.docs]) + assert response_ids == ["test-index"] + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_index_illegal_xml_chars(self): pkg_dict = self.base_package_dict.copy() - pkg_dict.update({ - 'title': u'\u00c3a\u0001ltimo n\u00famero penguin', - 'notes': u'\u00c3a\u0001ltimo n\u00famero penguin', - }) + pkg_dict.update( + { + "title": u"\u00c3a\u0001ltimo n\u00famero penguin", + "notes": u"\u00c3a\u0001ltimo n\u00famero penguin", + } + ) self.package_index.index_package(pkg_dict) - response = self.solr_client.search(q='name:monkey', fq=self.fq) + response = self.solr_client.search(q="name:monkey", fq=self.fq) - assert_equal(len(response), 1) - assert_equal(response.docs[0]['title'], - u'\u00c3altimo n\u00famero penguin') + assert len(response) == 1 + assert response.docs[0]["title"] == u"\u00c3altimo n\u00famero penguin" + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_index_date_field(self): pkg_dict = self.base_package_dict.copy() - pkg_dict.update({ - 'extras': [ - {'key': 'test_date', 'value': '2014-03-22'}, - {'key': 'test_tim_date', 'value': '2014-03-22 05:42:14'}, - ] - }) + pkg_dict.update( + { + "extras": [ + {"key": "test_date", "value": "2014-03-22"}, + {"key": "test_tim_date", "value": "2014-03-22 05:42:14"}, + ] + } + ) self.package_index.index_package(pkg_dict) - response = self.solr_client.search(q='name:monkey', fq=self.fq) + response = self.solr_client.search(q="name:monkey", fq=self.fq) - assert_equal(len(response), 1) + assert len(response) == 1 - assert isinstance(response.docs[0]['test_date'], datetime.datetime) - assert_equal(response.docs[0]['test_date'].strftime('%Y-%m-%d'), - '2014-03-22') - assert_equal( - response.docs[0]['test_tim_date'].strftime('%Y-%m-%d %H:%M:%S'), - '2014-03-22 05:42:14' + assert isinstance(response.docs[0]["test_date"], datetime.datetime) + assert ( + response.docs[0]["test_date"].strftime("%Y-%m-%d") == "2014-03-22" + ) + assert ( + response.docs[0]["test_tim_date"].strftime("%Y-%m-%d %H:%M:%S") + == "2014-03-22 05:42:14" ) + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_index_date_field_wrong_value(self): pkg_dict = self.base_package_dict.copy() - pkg_dict.update({ - 'extras': [ - {'key': 'test_wrong_date', 'value': 'Not a date'}, - {'key': 'test_another_wrong_date', 'value': '2014-13-01'}, - ] - }) + pkg_dict.update( + { + "extras": [ + {"key": "test_wrong_date", "value": "Not a date"}, + {"key": "test_another_wrong_date", "value": "2014-13-01"}, + ] + } + ) self.package_index.index_package(pkg_dict) - response = self.solr_client.search(q='name:monkey', fq=self.fq) + response = self.solr_client.search(q="name:monkey", fq=self.fq) - assert_equal(len(response), 1) + assert len(response) == 1 - assert 'test_wrong_date' not in response.docs[0] - assert 'test_another_wrong_date' not in response.docs[0] + assert "test_wrong_date" not in response.docs[0] + assert "test_another_wrong_date" not in response.docs[0] + @pytest.mark.skipif(not search.is_available(), reason="Solr not reachable") + @pytest.mark.usefixtures("clean_index") def test_index_date_field_empty_value(self): pkg_dict = self.base_package_dict.copy() - pkg_dict.update({ - 'extras': [ - {'key': 'test_empty_date', 'value': ''}, - ] - }) + pkg_dict.update({"extras": [{"key": "test_empty_date", "value": ""}]}) self.package_index.index_package(pkg_dict) - response = self.solr_client.search(q='name:monkey', fq=self.fq) + response = self.solr_client.search(q="name:monkey", fq=self.fq) - assert_equal(len(response), 1) + assert len(response) == 1 - assert 'test_empty_date' not in response.docs[0] + assert "test_empty_date" not in response.docs[0] class TestPackageSearchIndex: @@ -183,34 +188,37 @@ class TestPackageSearchIndex: def _get_pkg_dict(): # This is a simple package, enough to be indexed, in the format that # package_show would return - return {'name': 'river-quality', - 'id': 'd9567b82-d3f0-4c17-b222-d9a7499f7940', - 'state': 'active', - 'private': '', - 'type': 'dataset', - 'metadata_created': '2014-06-10T08:24:12.782257', - 'metadata_modified': '2014-06-10T08:24:12.782257', - } + return { + "name": "river-quality", + "id": "d9567b82-d3f0-4c17-b222-d9a7499f7940", + "state": "active", + "private": "", + "type": "dataset", + "metadata_created": "2014-06-10T08:24:12.782257", + "metadata_modified": "2014-06-10T08:24:12.782257", + } @staticmethod def _get_pkg_dict_with_resources(): # A simple package with some resources pkg_dict = TestPackageSearchIndex._get_pkg_dict() - pkg_dict['resources'] = [ - {'description': 'A river quality report', - 'format': 'pdf', - 'resource_type': 'doc', - 'url': 'http://www.foo.com/riverquality.pdf', - 'alt_url': 'http://www.bar.com/riverquality.pdf', - 'city': 'Asuncion' - }, - {'description': 'A river quality table', - 'format': 'csv', - 'resource_type': 'file', - 'url': 'http://www.foo.com/riverquality.csv', - 'alt_url': 'http://www.bar.com/riverquality.csv', - 'institution': 'Global River Foundation' - } + pkg_dict["resources"] = [ + { + "description": "A river quality report", + "format": "pdf", + "resource_type": "doc", + "url": "http://www.foo.com/riverquality.pdf", + "alt_url": "http://www.bar.com/riverquality.pdf", + "city": "Asuncion", + }, + { + "description": "A river quality table", + "format": "csv", + "resource_type": "file", + "url": "http://www.foo.com/riverquality.csv", + "alt_url": "http://www.bar.com/riverquality.csv", + "institution": "Global River Foundation", + }, ] return pkg_dict @@ -219,86 +227,90 @@ def test_index_package_stores_basic_solr_fields(self): pkg_dict = self._get_pkg_dict() index.index_package(pkg_dict) - indexed_pkg = search.show(pkg_dict['name']) + indexed_pkg = search.show(pkg_dict["name"]) # At root level are the fields that SOLR uses - assert_equal(indexed_pkg['name'], 'river-quality') - assert_equal(indexed_pkg['metadata_modified'], - '2014-06-10T08:24:12.782Z') - assert_equal(indexed_pkg['entity_type'], 'package') - assert_equal(indexed_pkg['dataset_type'], 'dataset') + assert indexed_pkg["name"] == "river-quality" + assert indexed_pkg["metadata_modified"] == "2014-06-10T08:24:12.782Z" + assert indexed_pkg["entity_type"] == "package" + assert indexed_pkg["dataset_type"] == "dataset" def test_index_package_stores_unvalidated_data_dict(self): index = search.index.PackageSearchIndex() pkg_dict = self._get_pkg_dict() index.index_package(pkg_dict) - indexed_pkg = search.show(pkg_dict['name']) + indexed_pkg = search.show(pkg_dict["name"]) # data_dict is the result of package_show, unvalidated - data_dict = json.loads(indexed_pkg['data_dict']) - assert_equal(data_dict['name'], 'river-quality') + data_dict = json.loads(indexed_pkg["data_dict"]) + assert data_dict["name"] == "river-quality" # title is inserted (copied from the name) during validation # so its absence shows it is not validated - assert_not_in('title', data_dict) + assert "title" not in data_dict def test_index_package_stores_validated_data_dict(self): index = search.index.PackageSearchIndex() pkg_dict = self._get_pkg_dict() index.index_package(pkg_dict) - indexed_pkg = search.show(pkg_dict['name']) + indexed_pkg = search.show(pkg_dict["name"]) # validated_data_dict is the result of package_show, validated - validated_data_dict = json.loads(indexed_pkg['validated_data_dict']) - assert_equal(validated_data_dict['name'], 'river-quality') + validated_data_dict = json.loads(indexed_pkg["validated_data_dict"]) + assert validated_data_dict["name"] == "river-quality" # title is inserted (copied from the name) during validation # so its presence shows it is validated - assert_in('title', validated_data_dict) + assert "title" in validated_data_dict - def test_index_package_stores_validated_data_dict_without_unvalidated_data_dict(self): + def test_index_package_stores_validated_data_dict_without_unvalidated_data_dict( + self + ): # This is a regression test for #1764 index = search.index.PackageSearchIndex() pkg_dict = self._get_pkg_dict() index.index_package(pkg_dict) - indexed_pkg = search.show(pkg_dict['name']) + indexed_pkg = search.show(pkg_dict["name"]) - validated_data_dict = json.loads(indexed_pkg['validated_data_dict']) - assert_not_in('data_dict', validated_data_dict) + validated_data_dict = json.loads(indexed_pkg["validated_data_dict"]) + assert "data_dict" not in validated_data_dict - def test_index_package_stores_unvalidated_data_dict_without_validated_data_dict(self): + def test_index_package_stores_unvalidated_data_dict_without_validated_data_dict( + self + ): # This is a regression test for #2208 index = search.index.PackageSearchIndex() pkg_dict = self._get_pkg_dict() index.index_package(pkg_dict) - data_dict = json.loads(search.show(pkg_dict['name'])['data_dict']) + data_dict = json.loads(search.show(pkg_dict["name"])["data_dict"]) - assert_not_in('validated_data_dict', data_dict) + assert "validated_data_dict" not in data_dict def test_index_package_stores_resource_extras_in_config_file(self): index = search.index.PackageSearchIndex() pkg_dict = self._get_pkg_dict_with_resources() index.index_package(pkg_dict) - indexed_pkg = search.show(pkg_dict['name']) + indexed_pkg = search.show(pkg_dict["name"]) # Resource fields given by ckan.extra_resource_fields are indexed - assert_equal(indexed_pkg['res_extras_alt_url'], - ['http://www.bar.com/riverquality.pdf', - 'http://www.bar.com/riverquality.csv']) + assert indexed_pkg["res_extras_alt_url"] == [ + "http://www.bar.com/riverquality.pdf", + "http://www.bar.com/riverquality.csv", + ] # Other resource fields are ignored - assert_equal(indexed_pkg.get('res_extras_institution', None), None) - assert_equal(indexed_pkg.get('res_extras_city', None), None) + assert indexed_pkg.get("res_extras_institution", None) is None + assert indexed_pkg.get("res_extras_city", None) is None def test_indexed_package_stores_resource_type(self): index = search.index.PackageSearchIndex() pkg_dict = self._get_pkg_dict_with_resources() index.index_package(pkg_dict) - indexed_pkg = search.show(pkg_dict['name']) + indexed_pkg = search.show(pkg_dict["name"]) # Resource types are indexed - assert_equal(indexed_pkg['res_type'], ['doc', 'file']) + assert indexed_pkg["res_type"] == ["doc", "file"] diff --git a/ckan/tests/lib/test_app_globals.py b/ckan/tests/lib/test_app_globals.py index 714acc4fd5c..ca7bab7cb23 100644 --- a/ckan/tests/lib/test_app_globals.py +++ b/ckan/tests/lib/test_app_globals.py @@ -3,17 +3,26 @@ from ckan.lib.app_globals import app_globals as g -class TestGlobals(object): - def test_config_not_set(self): - # ckan.site_about has not been configured. - # Behaviour has always been to return an empty string. - assert g.site_about == '' - - def test_config_set_to_blank(self): - # ckan.site_description is configured but with no value. - # Behaviour has always been to return an empty string. - assert g.site_description == '' - - def test_set_from_ini(self): - # ckan.template_head_end is configured in test-core.ini - assert g.template_head_end == '' +def test_config_not_set(): + """ckan.site_about has not been configured. Behaviour has always been + to return an empty string. + + """ + assert g.site_about == "" + + +def test_config_set_to_blank(): + """ckan.site_description is configured but with no value. Behaviour + has always been to return an empty string. + + """ + assert g.site_description == "" + + +def test_set_from_ini(): + """ckan.template_head_end is configured in test-core.ini + """ + assert ( + g.template_head_end + == '' + ) diff --git a/ckan/tests/lib/test_auth_tkt.py b/ckan/tests/lib/test_auth_tkt.py index d185beb4449..254e2e2e2fe 100644 --- a/ckan/tests/lib/test_auth_tkt.py +++ b/ckan/tests/lib/test_auth_tkt.py @@ -1,140 +1,158 @@ # encoding: utf-8 +""" +Test the added methods used by this subclass of +repoze.who.plugins.auth_tkt.AuthTktCookiePlugin -from nose import tools as nose_tools +Subclassing FunctionalTestBase ensures the original config is restored +after each test. +""" + +import pytest from ckan.tests import helpers from ckan.lib.repoze_plugins.auth_tkt import make_plugin -class TestCkanAuthTktCookiePlugin(helpers.FunctionalTestBase): - - ''' - Test the added methods used by this subclass of - repoze.who.plugins.auth_tkt.AuthTktCookiePlugin - - Subclassing FunctionalTestBase ensures the original config is restored - after each test. - ''' - - @helpers.change_config('who.httponly', True) - def test_httponly_expected_cookies_with_config_httponly_true(self): - ''' - The returned cookies are in the format we expect, with HttpOnly flag. - ''' - plugin = make_plugin(secret='sosecret') - cookies = plugin._get_cookies(environ={'SERVER_NAME': '0.0.0.0'}, - value='HELLO') - expected_cookies = [ - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly') - ] - assert cookies == expected_cookies - - @helpers.change_config('who.httponly', False) - def test_httponly_expected_cookies_with_config_httponly_false(self): - ''' - The returned cookies are in the format we expect, without HttpOnly - flag. - ''' - plugin = make_plugin(secret='sosecret') - cookies = plugin._get_cookies(environ={'SERVER_NAME': '0.0.0.0'}, - value='HELLO') - expected_cookies = [ - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0') - ] - assert cookies == expected_cookies - - def test_httponly_expected_cookies_without_config_httponly(self): - ''' - The returned cookies are in the format we expect, with HttpOnly flag. - ''' - plugin = make_plugin(secret='sosecret') - cookies = plugin._get_cookies(environ={'SERVER_NAME': '0.0.0.0'}, - value='HELLO') - expected_cookies = [ - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly') - ] - assert cookies == expected_cookies - - @helpers.change_config('who.secure', True) - def test_secure_expected_cookies_with_config_secure_true(self): - ''' - The returned cookies are in the format we expect, with secure flag. - ''' - plugin = make_plugin(secret='sosecret') - cookies = plugin._get_cookies(environ={'SERVER_NAME': '0.0.0.0'}, - value='HELLO') - expected_cookies = [ - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; secure; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; secure; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; secure; HttpOnly') - ] - assert cookies == expected_cookies - - @helpers.change_config('who.secure', False) - def test_secure_expected_cookies_with_config_secure_false(self): - ''' - The returned cookies are in the format we expect, without secure - flag. - ''' - plugin = make_plugin(secret='sosecret') - cookies = plugin._get_cookies(environ={'SERVER_NAME': '0.0.0.0'}, - value='HELLO') - expected_cookies = [ - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly') - ] - assert cookies == expected_cookies - - def test_secure_expected_cookies_without_config_secure(self): - ''' - The returned cookies are in the format we expect, without secure flag. - ''' - plugin = make_plugin(secret='sosecret') - cookies = plugin._get_cookies(environ={'SERVER_NAME': '0.0.0.0'}, - value='HELLO') - expected_cookies = [ - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), - ('Set-Cookie', 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly') - ] - assert cookies == expected_cookies - - def test_timeout_not_set_in_config(self): - ''' - Creating a CkanAuthTktCookiePlugin instance without setting timeout in - config sets correct values in CkanAuthTktCookiePlugin instance. - ''' - plugin = make_plugin(secret='sosecret') - - nose_tools.assert_equal(plugin.timeout, None) - nose_tools.assert_equal(plugin.reissue_time, None) - - @helpers.change_config('who.timeout', 9000) - def test_timeout_set_in_config(self): - ''' - Setting who.timeout in config sets correct values in - CkanAuthTktCookiePlugin instance. - ''' - plugin = make_plugin(secret='sosecret') - - nose_tools.assert_equal(plugin.timeout, 9000) - nose_tools.assert_equal(plugin.reissue_time, 900) - - @helpers.change_config('who.timeout', 9000) - @helpers.change_config('who.reissue_time', 200) - def test_reissue_set_in_config(self): - ''' - Setting who.reissue in config sets correct values in - CkanAuthTktCookiePlugin instance. - ''' - plugin = make_plugin(secret='sosecret') - - nose_tools.assert_equal(plugin.timeout, 9000) - nose_tools.assert_equal(plugin.reissue_time, 200) +@pytest.mark.ckan_config("who.httponly", True) +def test_httponly_expected_cookies_with_config_httponly_true(): + """ + The returned cookies are in the format we expect, with HttpOnly flag. + """ + plugin = make_plugin(secret="sosecret") + cookies = plugin._get_cookies( + environ={"SERVER_NAME": "0.0.0.0"}, value="HELLO" + ) + expected_cookies = [ + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly'), + ] + assert cookies == expected_cookies + + +@pytest.mark.usefixtures("clean_db") +@pytest.mark.ckan_config("who.httponly", False) +def test_httponly_expected_cookies_with_config_httponly_false(): + """ + The returned cookies are in the format we expect, without HttpOnly + flag. + """ + plugin = make_plugin(secret="sosecret") + cookies = plugin._get_cookies( + environ={"SERVER_NAME": "0.0.0.0"}, value="HELLO" + ) + expected_cookies = [ + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0'), + ] + assert cookies == expected_cookies + + +def test_httponly_expected_cookies_without_config_httponly(): + """ + The returned cookies are in the format we expect, with HttpOnly flag. + """ + plugin = make_plugin(secret="sosecret") + cookies = plugin._get_cookies( + environ={"SERVER_NAME": "0.0.0.0"}, value="HELLO" + ) + expected_cookies = [ + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly'), + ] + assert cookies == expected_cookies + + +@pytest.mark.ckan_config("who.secure", True) +def test_secure_expected_cookies_with_config_secure_true(): + """ + The returned cookies are in the format we expect, with secure flag. + """ + plugin = make_plugin(secret="sosecret") + cookies = plugin._get_cookies( + environ={"SERVER_NAME": "0.0.0.0"}, value="HELLO" + ) + expected_cookies = [ + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; secure; HttpOnly'), + ( + "Set-Cookie", + 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; secure; HttpOnly', + ), + ( + "Set-Cookie", + 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; secure; HttpOnly', + ), + ] + assert cookies == expected_cookies + + +@pytest.mark.ckan_config("who.secure", False) +def test_secure_expected_cookies_with_config_secure_false(): + """ + The returned cookies are in the format we expect, without secure + flag. + """ + plugin = make_plugin(secret="sosecret") + cookies = plugin._get_cookies( + environ={"SERVER_NAME": "0.0.0.0"}, value="HELLO" + ) + expected_cookies = [ + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly'), + ] + assert cookies == expected_cookies + + +def test_secure_expected_cookies_without_config_secure(): + """ + The returned cookies are in the format we expect, without secure flag. + """ + plugin = make_plugin(secret="sosecret") + cookies = plugin._get_cookies( + environ={"SERVER_NAME": "0.0.0.0"}, value="HELLO" + ) + expected_cookies = [ + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=0.0.0.0; HttpOnly'), + ("Set-Cookie", 'auth_tkt="HELLO"; Path=/; Domain=.0.0.0.0; HttpOnly'), + ] + assert cookies == expected_cookies + + +def test_timeout_not_set_in_config(): + """ + Creating a CkanAuthTktCookiePlugin instance without setting timeout in + config sets correct values in CkanAuthTktCookiePlugin instance. + """ + plugin = make_plugin(secret="sosecret") + + assert plugin.timeout is None + assert plugin.reissue_time is None + + +@pytest.mark.ckan_config("who.timeout", 9000) +def test_timeout_set_in_config(): + """ + Setting who.timeout in config sets correct values in + CkanAuthTktCookiePlugin instance. + """ + plugin = make_plugin(secret="sosecret") + + assert plugin.timeout == 9000 + assert plugin.reissue_time == 900 + + +@pytest.mark.ckan_config("who.timeout", 9000) +@pytest.mark.ckan_config("who.reissue_time", 200) +def test_reissue_set_in_config(): + """ + Setting who.reissue in config sets correct values in + CkanAuthTktCookiePlugin instance. + """ + plugin = make_plugin(secret="sosecret") + + assert plugin.timeout == 9000 + assert plugin.reissue_time == 200 diff --git a/ckan/tests/lib/test_base.py b/ckan/tests/lib/test_base.py index 1be68119b7c..71f7a75dcba 100644 --- a/ckan/tests/lib/test_base.py +++ b/ckan/tests/lib/test_base.py @@ -1,347 +1,420 @@ # encoding: utf-8 -from nose import tools as nose_tools +import pytest import ckan.tests.helpers as helpers import ckan.plugins as p import ckan.tests.factories as factories -class TestRenderSnippet(helpers.FunctionalTestBase): - """ - Test ``ckan.lib.base.render_snippet``. - """ - @helpers.change_config('debug', True) - def test_comment_present_if_debug_true(self): - response = self._get_test_app().get('/') - assert '