From cb73287777c6dad9f3144c785123660de1ad9b33 Mon Sep 17 00:00:00 2001 From: Sergey Motornyuk Date: Tue, 19 Nov 2019 12:07:53 +0200 Subject: [PATCH] replace nose asserts --- .circleci/config.yml | 8 - ckan/tests/controllers/test_admin.py | 4 +- ckan/tests/controllers/test_group.py | 19 +- ckan/tests/controllers/test_package.py | 57 +++-- ckan/tests/legacy/functional/api/__init__.py | 3 +- .../legacy/functional/api/model/test_group.py | 151 ++++++----- ckan/tests/legacy/functional/api/test_util.py | 1 - ckan/tests/legacy/functional/test_group.py | 5 +- ckan/tests/legacy/lib/test_resource_search.py | 3 +- .../legacy/misc/test_mock_mail_server.py | 6 +- ckan/tests/legacy/test_coding_standards.py | 42 +--- ckan/tests/lib/test_mailer.py | 19 +- ckan/tests/logic/action/test_delete.py | 1 + .../test_migrate_package_activity.py | 237 ------------------ ckan/tests/model/test_system_info.py | 2 - .../tests/test_routes.py | 43 ++-- .../tests/test_streaming_responses.py | 54 ++-- .../tests/test_iconfigurer_toolkit.py | 133 ++++++---- .../tests/test_iconfigurer_update_config.py | 107 ++++---- .../test/test_plugin.py | 121 +++++---- .../example_itranslation/tests/test_plugin.py | 53 ++-- ckanext/stats/tests/test_stats_lib.py | 55 ++-- ckanext/webpageview/tests/test_view.py | 28 ++- 23 files changed, 466 insertions(+), 686 deletions(-) delete mode 100644 ckan/tests/migration/test_migrate_package_activity.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 1550661a2be..b012ec14494 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -77,14 +77,6 @@ jobs: - store_test_results: path: ~/junit - # test_revision_legacy_code is run separately because it mucks up the model for some other tests when nose starts up - - run: - command: | - case $CIRCLE_NODE_INDEX in - 3) nosetests -v --ckan --with-pylons=test-core-circle-ci.ini --nologcapture test_revision_legacy_code.py - ;; - esac - # Tests Frontend, only in one container - run: command: | diff --git a/ckan/tests/controllers/test_admin.py b/ckan/tests/controllers/test_admin.py index 34e2689e9c4..c67020d91c1 100644 --- a/ckan/tests/controllers/test_admin.py +++ b/ckan/tests/controllers/test_admin.py @@ -354,7 +354,7 @@ def _update_config_option(self, app): webtest_submit(form, "save", status=302, extra_environ=env) @pytest.mark.usefixtures("clean_db") - def test_admin_config_update(self, make_app): + def test_admin_config_update(self, app): """Changing a config option using the admin interface appropriately updates value returned by config_option_show, system_info.get_system_info and in the title tag in templates.""" @@ -377,7 +377,7 @@ def test_admin_config_update(self, make_app): assert before_update_default == "CKAN" # title tag contains default value - app = make_app() + # app = make_app() home_page_before = app.get("/", status=200) assert "Welcome - CKAN" in home_page_before diff --git a/ckan/tests/controllers/test_group.py b/ckan/tests/controllers/test_group.py index 1c9b0a4837b..dbec64577ef 100644 --- a/ckan/tests/controllers/test_group.py +++ b/ckan/tests/controllers/test_group.py @@ -161,17 +161,16 @@ def test_all_fields_saved(self, app): class TestGroupRead(object): @pytest.mark.usefixtures("clean_db") - def test_group_read(self): + def test_group_read(self, app): group = factories.Group() - app = helpers._get_test_app() response = app.get(url=url_for("group.read", id=group["name"])) assert group["title"] in response assert group["description"] in response @pytest.mark.usefixtures("clean_db") - def test_redirect_when_given_id(self): + def test_redirect_when_given_id(self, app): group = factories.Group() - app = helpers._get_test_app() + response = app.get(url_for("group.read", id=group["id"]), status=302) # redirect replaces the ID with the name in the URL redirected_response = response.follow() @@ -179,9 +178,9 @@ def test_redirect_when_given_id(self): assert redirected_response.request.path == expected_url @pytest.mark.usefixtures("clean_db") - def test_no_redirect_loop_when_name_is_the_same_as_the_id(self): + def test_no_redirect_loop_when_name_is_the_same_as_the_id(self, app): group = factories.Group(id="abc", name="abc") - app = helpers._get_test_app() + # 200 == no redirect app.get(url_for("group.read", id=group["id"]), status=200) @@ -411,15 +410,13 @@ def test_remove_member(self, app): assert user_roles["User One"] == "Admin" @pytest.mark.usefixtures("clean_db") - def test_member_users_cannot_add_members(self): + def test_member_users_cannot_add_members(self, app): user = factories.User() group = factories.Group( users=[{"name": user["name"], "capacity": "member"}] ) - app = helpers._get_test_app() - env = {"REMOTE_USER": user["name"].encode("ascii")} with app.flask_app.test_request_context(): @@ -442,11 +439,9 @@ def test_member_users_cannot_add_members(self): ) @pytest.mark.usefixtures("clean_db") - def test_anonymous_users_cannot_add_members(self): + def test_anonymous_users_cannot_add_members(self, app): group = factories.Group() - app = helpers._get_test_app() - with app.flask_app.test_request_context(): app.get(url_for("group.member_new", id=group["id"]), status=403) diff --git a/ckan/tests/controllers/test_package.py b/ckan/tests/controllers/test_package.py index 823bb06897c..7296e6a1da9 100644 --- a/ckan/tests/controllers/test_package.py +++ b/ckan/tests/controllers/test_package.py @@ -1897,7 +1897,6 @@ def test_create_tag_directly(self, app): @pytest.mark.usefixtures("clean_db") def test_create_tag(self, app): - app = self._get_test_app() user = factories.User() dataset = factories.Dataset(user=user) self._clear_activities() @@ -1908,20 +1907,20 @@ def test_create_tag(self, app): url = url_for("dataset.activity", id=dataset["id"]) response = app.get(url) - assert_in( - 'Mr. Test User'.format(user["name"]), response + assert ( + 'Mr. Test User'.format(user["name"]) in response ) - assert_in("updated the dataset", response) - assert_in( - '{}'.format(dataset["id"], dataset["title"]), - response, + assert "updated the dataset" in response + assert ( + '{}'.format(dataset["id"], dataset["title"]) + in response ) activities = helpers.call_action( "package_activity_list", id=dataset["id"] ) - assert_equal(len(activities), 1) + assert len(activities) == 1 @pytest.mark.usefixtures("clean_db") def test_create_extra(self, app): @@ -1935,20 +1934,20 @@ def test_create_extra(self, app): url = url_for("dataset.activity", id=dataset["id"]) response = app.get(url) - assert_in( - 'Mr. Test User'.format(user["name"]), response + assert ( + 'Mr. Test User'.format(user["name"]) in response ) - assert_in("updated the dataset", response) - assert_in( - '{}'.format(dataset["id"], dataset["title"]), - response, + assert "updated the dataset" in response + assert ( + '{}'.format(dataset["id"], dataset["title"]) + in response ) activities = helpers.call_action( "package_activity_list", id=dataset["id"] ) - assert_equal(len(activities), 1) + assert len(activities) == 1 @pytest.mark.usefixtures("clean_db") def test_create_resource(self, app): @@ -1964,20 +1963,20 @@ def test_create_resource(self, app): url = url_for("dataset.activity", id=dataset["id"]) response = app.get(url) - assert_in( - 'Mr. Test User'.format(user["name"]), response + assert ( + 'Mr. Test User'.format(user["name"]) in response ) - assert_in("updated the dataset", response) - assert_in( - '{}'.format(dataset["id"], dataset["title"]), - response, + assert "updated the dataset" in response + assert ( + '{}'.format(dataset["id"], dataset["title"]) + in response ) activities = helpers.call_action( "package_activity_list", id=dataset["id"] ) - assert_equal(len(activities), 1) + assert len(activities) == 1 @pytest.mark.usefixtures("clean_db") def test_update_resource(self, app): @@ -1996,20 +1995,20 @@ def test_update_resource(self, app): url = url_for("dataset.activity", id=dataset["id"]) response = app.get(url) - assert_in( - 'Mr. Test User'.format(user["name"]), response + assert ( + 'Mr. Test User'.format(user["name"]) in response ) - assert_in("updated the dataset", response) - assert_in( - '{}'.format(dataset["id"], dataset["title"]), - response, + assert "updated the dataset" in response + assert ( + '{}'.format(dataset["id"], dataset["title"]) + in response ) activities = helpers.call_action( "package_activity_list", id=dataset["id"] ) - assert_equal(len(activities), 1) + assert len(activities) == 1 @pytest.mark.usefixtures("clean_db") def test_delete_dataset(self, app): diff --git a/ckan/tests/legacy/functional/api/__init__.py b/ckan/tests/legacy/functional/api/__init__.py index 7cf4b8be788..0589c3ffce0 100644 --- a/ckan/tests/legacy/functional/api/__init__.py +++ b/ckan/tests/legacy/functional/api/__init__.py @@ -1,6 +1,5 @@ # encoding: utf-8 -from nose.tools import assert_equal import copy @@ -43,4 +42,4 @@ def assert_dicts_equal_ignoring_ordering(dict1, dict2): dicts = [copy.deepcopy(dict1), copy.deepcopy(dict2)] for d in dicts: d = change_lists_to_sets(d) - assert_equal(dicts[0], dicts[1]) + assert dicts[0] == dicts[1] diff --git a/ckan/tests/legacy/functional/api/model/test_group.py b/ckan/tests/legacy/functional/api/model/test_group.py index 689b58dabfd..0bc6dd9900f 100644 --- a/ckan/tests/legacy/functional/api/model/test_group.py +++ b/ckan/tests/legacy/functional/api/model/test_group.py @@ -6,18 +6,16 @@ from ckan.lib.create_test_data import CreateTestData from ckan.lib import search -from nose.tools import assert_equal from ckan.tests.legacy.functional.api.base import BaseModelApiTestCase class GroupsTestCase(BaseModelApiTestCase): - @classmethod def setup_class(cls): search.clear_all() CreateTestData.create() - cls.user_name = u'russianfan' # created in CreateTestData + cls.user_name = u"russianfan" # created in CreateTestData cls.init_extra_environ(cls.user_name) @classmethod @@ -25,7 +23,7 @@ def teardown_class(cls): model.repo.rebuild_db() def teardown(self): - self.purge_group_by_name(self.testgroupvalues['name']) + self.purge_group_by_name(self.testgroupvalues["name"]) def test_register_get_ok(self): offset = self.group_offset() @@ -35,21 +33,30 @@ def test_register_get_ok(self): def test_register_post_ok(self): data = self.testgroupvalues - postparams = '%s=1' % self.dumps(data) + postparams = "%s=1" % self.dumps(data) offset = self.group_offset() - res = self.app.post(offset, params=postparams, - status=self.STATUS_201_CREATED, - extra_environ=self.extra_environ) + res = self.app.post( + offset, + params=postparams, + status=self.STATUS_201_CREATED, + extra_environ=self.extra_environ, + ) # check group object - group = self.get_group_by_name(self.testgroupvalues['name']) + group = self.get_group_by_name(self.testgroupvalues["name"]) assert group - assert group.title == self.testgroupvalues['title'], group - assert group.description == self.testgroupvalues['description'], group + assert group.title == self.testgroupvalues["title"], group + assert group.description == self.testgroupvalues["description"], group pkg_ids = [member.table_id for member in group.member_all] - pkgs = model.Session.query(model.Package).filter(model.Package.id.in_(pkg_ids)).all() + pkgs = ( + model.Session.query(model.Package) + .filter(model.Package.id.in_(pkg_ids)) + .all() + ) pkg_names = [pkg.name for pkg in pkgs] - assert set(pkg_names) == set(('annakarenina', 'warandpeace')), pkg_names + assert set(pkg_names) == set( + ("annakarenina", "warandpeace") + ), pkg_names # check register updated res = self.app.get(offset, status=self.STATUS_200_OK) @@ -58,32 +65,36 @@ def test_register_post_ok(self): assert self._ref_group(group) in data, data # check entity - offset = self.group_offset(self.testgroupvalues['name']) + offset = self.group_offset(self.testgroupvalues["name"]) res = self.app.get(offset, status=self.STATUS_200_OK) group = self.loads(res.body) expected_group = copy.deepcopy(self.testgroupvalues) - expected_group['packages'] = \ - sorted(self.ref_package(self.get_package_by_name(pkg_name)) - for pkg_name in expected_group['packages']) + expected_group["packages"] = sorted( + self.ref_package(self.get_package_by_name(pkg_name)) + for pkg_name in expected_group["packages"] + ) for expected_key, expected_value in expected_group.items(): - assert_equal(group.get(expected_key), expected_value) + assert group.get(expected_key) == expected_value # Test Group Register Post 409 (conflict - create duplicate group). offset = self.group_offset() - postparams = '%s=1' % self.dumps(self.testgroupvalues) - res = self.app.post(offset, params=postparams, - status=self.STATUS_409_CONFLICT, - extra_environ=self.extra_environ) - self.assert_json_response(res, 'Group name already exists') + postparams = "%s=1" % self.dumps(self.testgroupvalues) + res = self.app.post( + offset, + params=postparams, + status=self.STATUS_409_CONFLICT, + extra_environ=self.extra_environ, + ) + self.assert_json_response(res, "Group name already exists") def test_entity_get_ok(self): offset = self.group_offset(self.roger.name) res = self.app.get(offset, status=self.STATUS_200_OK) self.assert_msg_represents_roger(msg=res.body) - assert self.package_ref_from_name('annakarenina') in res, res - assert self.group_ref_from_name('roger') in res, res - assert not self.package_ref_from_name('warandpeace') in res, res + assert self.package_ref_from_name("annakarenina") in res, res + assert self.group_ref_from_name("roger") in res, res + assert not self.package_ref_from_name("warandpeace") in res, res def test_entity_get_then_post(self): # (ticket 662) Ensure an entity you 'get' from a register can be @@ -91,26 +102,29 @@ def test_entity_get_then_post(self): offset = self.group_offset(self.david.name) res = self.app.get(offset, status=self.STATUS_200_OK) data = self.loads(res.body) - postparams = '%s=1' % self.dumps(data) - res = self.app.post(offset, params=postparams, - status=self.STATUS_200_OK, - extra_environ=self.admin_extra_environ) + postparams = "%s=1" % self.dumps(data) + res = self.app.post( + offset, + params=postparams, + status=self.STATUS_200_OK, + extra_environ=self.admin_extra_environ, + ) res = self.set_env(self.extra_environ) def test_10_edit_group_name_duplicate(self): # create a group with testgroupvalues - if not model.Group.by_name(self.testgroupvalues['name']): + if not model.Group.by_name(self.testgroupvalues["name"]): group = model.Group() model.Session.add(group) - group.name = self.testgroupvalues['name'] + group.name = self.testgroupvalues["name"] model.Session.commit() - group = model.Group.by_name(self.testgroupvalues['name']) + group = model.Group.by_name(self.testgroupvalues["name"]) model.repo.commit_and_remove() - assert model.Group.by_name(self.testgroupvalues['name']) + assert model.Group.by_name(self.testgroupvalues["name"]) # create a group with name 'dupname' - dupname = u'dupname' + dupname = u"dupname" if not model.Group.by_name(dupname): group = model.Group() model.Session.add(group) @@ -119,61 +133,76 @@ def test_10_edit_group_name_duplicate(self): assert model.Group.by_name(dupname) # edit first group to have dupname - group_vals = {'name':dupname} - offset = self.group_offset(self.testgroupvalues['name']) - postparams = '%s=1' % self.dumps(group_vals) - res = self.app.post(offset, params=postparams, status=[409], - extra_environ=self.admin_extra_environ) - self.assert_json_response(res, 'Group name already exists') + group_vals = {"name": dupname} + offset = self.group_offset(self.testgroupvalues["name"]) + postparams = "%s=1" % self.dumps(group_vals) + res = self.app.post( + offset, + params=postparams, + status=[409], + extra_environ=self.admin_extra_environ, + ) + self.assert_json_response(res, "Group name already exists") res = self.set_env(self.extra_environ) def test_11_delete_group(self): # Test Groups Entity Delete 200. # create a group with testgroupvalues - group = model.Group.by_name(self.testgroupvalues['name']) + group = model.Group.by_name(self.testgroupvalues["name"]) if not group: group = model.Group() model.Session.add(group) - group.name = self.testgroupvalues['name'] + group.name = self.testgroupvalues["name"] model.repo.commit_and_remove() - group = model.Group.by_name(self.testgroupvalues['name']) + group = model.Group.by_name(self.testgroupvalues["name"]) model.repo.commit_and_remove() assert group user = model.User.by_name(self.user_name) # delete it - offset = self.group_offset(self.testgroupvalues['name']) - res = self.app.delete(offset, status=[200], - extra_environ=self.admin_extra_environ) + offset = self.group_offset(self.testgroupvalues["name"]) + res = self.app.delete( + offset, status=[200], extra_environ=self.admin_extra_environ + ) res = self.set_env(self.extra_environ) - group = model.Group.by_name(self.testgroupvalues['name']) + group = model.Group.by_name(self.testgroupvalues["name"]) assert group - assert group.state == 'deleted', group.state + assert group.state == "deleted", group.state # Anyone can see groups especially sysadmins # maybe we want to do something different with # deleted groups but that would be a new requirement - #res = self.app.get(offset, status=[403]) - #self.assert_json_response(res, 'Access denied') - res = self.app.get(offset, status=[200], - extra_environ=self.admin_extra_environ) + # res = self.app.get(offset, status=[403]) + # self.assert_json_response(res, 'Access denied') + res = self.app.get( + offset, status=[200], extra_environ=self.admin_extra_environ + ) res = self.set_env(self.extra_environ) def test_12_get_group_404(self): # Test Package Entity Get 404. - assert not model.Session.query(model.Group).filter_by(name=self.testgroupvalues['name']).count() - offset = self.group_offset(self.testgroupvalues['name']) + assert ( + not model.Session.query(model.Group) + .filter_by(name=self.testgroupvalues["name"]) + .count() + ) + offset = self.group_offset(self.testgroupvalues["name"]) res = self.app.get(offset, status=404) - self.assert_json_response(res, 'Not found') + self.assert_json_response(res, "Not found") def test_13_delete_group_404(self): # Test Packages Entity Delete 404. - assert not model.Session.query(model.Group).filter_by(name=self.testgroupvalues['name']).count() - offset = self.group_offset(self.testgroupvalues['name']) - res = self.app.delete(offset, status=[404], - extra_environ=self.extra_environ) - self.assert_json_response(res, 'not found') + assert ( + not model.Session.query(model.Group) + .filter_by(name=self.testgroupvalues["name"]) + .count() + ) + offset = self.group_offset(self.testgroupvalues["name"]) + res = self.app.delete( + offset, status=[404], extra_environ=self.extra_environ + ) + self.assert_json_response(res, "not found") diff --git a/ckan/tests/legacy/functional/api/test_util.py b/ckan/tests/legacy/functional/api/test_util.py index f5b84fa41e8..eb99c38335a 100644 --- a/ckan/tests/legacy/functional/api/test_util.py +++ b/ckan/tests/legacy/functional/api/test_util.py @@ -1,6 +1,5 @@ # encoding: utf-8 -from nose.tools import assert_equal import pytest from ckan import model from ckan.lib.create_test_data import CreateTestData diff --git a/ckan/tests/legacy/functional/test_group.py b/ckan/tests/legacy/functional/test_group.py index 47080ac1c92..6db14637adf 100644 --- a/ckan/tests/legacy/functional/test_group.py +++ b/ckan/tests/legacy/functional/test_group.py @@ -8,13 +8,12 @@ from ckan.lib.create_test_data import CreateTestData from ckan.logic import get_action from ckan.lib.helpers import url_for +import ckan.tests.factories as factories @pytest.mark.usefixtures("clean_db", "clean_index") def test_sorting(): - testsysadmin = model.User(name=u"testsysadmin") - testsysadmin.sysadmin = True - model.Session.add(testsysadmin) + testsysadmin = factories.Sysadmin(name=u"testsysadmin") pkg1 = model.Package(name="pkg1") pkg2 = model.Package(name="pkg2") diff --git a/ckan/tests/legacy/lib/test_resource_search.py b/ckan/tests/legacy/lib/test_resource_search.py index b29c15827a7..0f1aa2bfc38 100644 --- a/ckan/tests/legacy/lib/test_resource_search.py +++ b/ckan/tests/legacy/lib/test_resource_search.py @@ -1,6 +1,5 @@ # encoding: utf-8 -from nose.tools import assert_raises, assert_equal, assert_set_equal import pytest from ckan.tests.legacy import is_search_supported import ckan.lib.search as search @@ -84,7 +83,7 @@ def test_02_search_url_2(self): assert set([self.ab]) == urls, urls def test_03_search_url_multiple_words(self): - fields = {'url': 'e f'} + fields = {"url": "e f"} urls = self.res_search(fields=fields) assert {self.ef} == urls diff --git a/ckan/tests/legacy/misc/test_mock_mail_server.py b/ckan/tests/legacy/misc/test_mock_mail_server.py index ce068c51343..ef03d0d2c28 100644 --- a/ckan/tests/legacy/misc/test_mock_mail_server.py +++ b/ckan/tests/legacy/misc/test_mock_mail_server.py @@ -1,7 +1,7 @@ # encoding: utf-8 import time -from nose.tools import assert_equal + from ckan.common import config import hashlib @@ -23,7 +23,7 @@ def setup_class(cls): def test_basic(self): msgs = self.get_smtp_messages() - assert_equal(msgs, []) + assert msgs == [] test_email = { "recipient_name": "Bob", @@ -36,4 +36,4 @@ def test_basic(self): time.sleep(0.1) msgs = self.get_smtp_messages() - assert_equal(len(msgs), 1) + assert len(msgs) == 1 diff --git a/ckan/tests/legacy/test_coding_standards.py b/ckan/tests/legacy/test_coding_standards.py index d24dad09534..b1c062bd68a 100644 --- a/ckan/tests/legacy/test_coding_standards.py +++ b/ckan/tests/legacy/test_coding_standards.py @@ -378,7 +378,6 @@ class TestPep8(object): "ckan/tests/legacy/ckantestplugin/ckantestplugin/__init__.py", "ckan/tests/legacy/ckantestplugin/setup.py", "ckan/tests/legacy/functional/api/base.py", - "ckan/tests/legacy/functional/api/model/test_group.py", "ckan/tests/legacy/functional/api/model/test_package.py", "ckan/tests/legacy/functional/api/model/test_ratings.py", "ckan/tests/legacy/functional/api/model/test_relationships.py", @@ -427,6 +426,7 @@ class TestPep8(object): "ckanext/example_itemplatehelpers/plugin.py", "ckanext/multilingual/plugin.py", "ckanext/stats/controller.py", + "ckanext/stats/stats.py", "ckanext/stats/tests/test_stats_plugin.py", "ckanext/test_tag_vocab_plugin.py", "ckanext/tests/plugin.py", @@ -443,43 +443,9 @@ class TestPep8(object): "/tests/test_plugin.py", "contrib/cookiecutter/ckan_extension/{{cookiecutter.project}}" "/ckanext/{{cookiecutter.project_shortname}}/plugin.py", - 'ckan/tests/legacy/ckantestplugins.py', - 'ckan/tests/legacy/functional/api/model/test_vocabulary.py', - 'ckan/tests/legacy/functional/api/test_api.py', - 'ckan/tests/legacy/functional/api/test_email_notifications.py', - 'ckan/tests/legacy/functional/api/test_follow.py', - 'ckan/tests/legacy/functional/api/test_package_search.py', - 'ckan/tests/legacy/functional/api/test_resource.py', - 'ckan/tests/legacy/functional/api/test_user.py', - 'ckan/tests/legacy/functional/api/test_util.py', - 'ckan/tests/legacy/functional/base.py', - 'ckan/tests/legacy/functional/test_admin.py', - 'ckan/tests/legacy/functional/test_error.py', - 'ckan/tests/legacy/functional/test_tag.py', - 'ckan/tests/legacy/functional/test_tracking.py', - 'ckan/tests/legacy/lib/__init__.py', - 'ckan/tests/legacy/lib/test_cli.py', - 'ckan/tests/legacy/lib/test_dictization.py', - 'ckan/tests/legacy/lib/test_email_notifications.py', - 'ckan/tests/legacy/lib/test_hash.py', - 'ckan/tests/legacy/lib/test_helpers.py', - 'ckan/tests/legacy/lib/test_resource_search.py', - 'ckan/tests/legacy/lib/test_solr_package_search_synchronous_update.py', - 'ckan/tests/legacy/lib/test_solr_schema_version.py', - 'ckan/tests/legacy/lib/test_solr_search_index.py', - 'ckan/tests/legacy/lib/test_tag_search.py', - 'ckan/tests/legacy/logic/test_validators.py', - 'ckan/tests/legacy/misc/test_mock_mail_server.py', - 'ckan/tests/legacy/mock_plugin.py', - 'ckan/tests/legacy/models/test_misc.py', - 'ckan/tests/legacy/models/test_package.py', - 'ckan/tests/legacy/models/test_purge_revision.py', - 'ckan/tests/legacy/models/test_user.py', - 'ckan/tests/legacy/schema/test_schema.py', - 'ckan/tests/legacy/test_plugins.py', - 'test_revision_legacy_code.py', - 'ckanext/datastore/tests/test_search.py', - 'ckan/tests/legacy/models/test_revision.py', + "ckan/tests/legacy/models/test_package.py", + "ckan/tests/legacy/models/test_purge_revision.py", + "ckan/tests/legacy/models/test_revision.py", ] fails = {} passes = [] diff --git a/ckan/tests/lib/test_mailer.py b/ckan/tests/lib/test_mailer.py index 390d895eac2..545e3fd9027 100644 --- a/ckan/tests/lib/test_mailer.py +++ b/ckan/tests/lib/test_mailer.py @@ -240,18 +240,20 @@ def test_bad_smtp_host(self): with pytest.raises(mailer.MailerException): mailer.mail_recipient(**test_email) - @helpers.change_config('smtp.reply_to', 'norply@ckan.org') + @helpers.change_config("smtp.reply_to", "norply@ckan.org") def test_reply_to(self): msgs = self.get_smtp_messages() - assert_equal(msgs, []) + assert msgs == [] # send email - test_email = {'recipient_name': 'Bob', - 'recipient_email': 'Bob@bob.com', - 'subject': 'Meeting', - 'body': 'The meeting is cancelled.', - 'headers': {'header1': 'value1'}} + test_email = { + "recipient_name": "Bob", + "recipient_email": "Bob@bob.com", + "subject": "Meeting", + "body": "The meeting is cancelled.", + "headers": {"header1": "value1"}, + } mailer.mail_recipient(**test_email) # check it went to the mock smtp server @@ -259,6 +261,7 @@ def test_reply_to(self): msg = msgs[0] expected_from_header = "Reply-to: {}".format( - config.get('smtp.reply_to')) + config.get("smtp.reply_to") + ) assert expected_from_header in msg[3] diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py index 64941b97ae6..06150c2b438 100644 --- a/ckan/tests/logic/action/test_delete.py +++ b/ckan/tests/logic/action/test_delete.py @@ -423,6 +423,7 @@ def test_purged_dataset_leaves_no_trace_in_the_model(self): for m in model.Session.query(model.Member).join(model.Group) ] ) == [("user", "group1"), ("user", org["name"])] + @pytest.mark.usefixtures("clean_db") def test_purged_dataset_removed_from_relationships(self): child = factories.Dataset() diff --git a/ckan/tests/migration/test_migrate_package_activity.py b/ckan/tests/migration/test_migrate_package_activity.py deleted file mode 100644 index f88655d1621..00000000000 --- a/ckan/tests/migration/test_migrate_package_activity.py +++ /dev/null @@ -1,237 +0,0 @@ -# encoding: utf-8 - -import copy -from collections import defaultdict - -import mock -import pytest - -import ckan.logic -import ckan.tests.factories as factories -import ckan.tests.helpers as helpers -from ckan import model -from ckan.migration.migrate_package_activity import ( - migrate_dataset, - wipe_activity_detail, - PackageDictizeMonkeyPatch, -) -from ckan.model.activity import package_activity_list - - -@pytest.mark.usefixtures(u"clean_db") -def test_migration(): - # Test that the migration correctly takes the package_revision (etc) - # tables and populates the Activity.data, i.e. it does it the same as - # if you made a change to the dataset with the current version of CKAN - # and the Activity was created by activity_stream_item(). - dataset = factories.Dataset( - resources=[{u"url": u"http://example.com/a.csv", u"format": u"csv"}] - ) - activity = package_activity_list(dataset["id"], 0, 0)[0] - activity_data_as_it_should_be = copy.deepcopy(activity.data) - - # Remove 'activity.data.package' to provoke the migration to regenerate - # it from package_revision (etc) - activity = model.Activity.get(activity.id) - del activity.data["package"] - model.repo.commit_and_remove() - with PackageDictizeMonkeyPatch(): - migrate_dataset(dataset["name"], {}) - - activity_data_migrated = package_activity_list(dataset["id"], 0, 0)[0].data - assert activity_data_as_it_should_be == activity_data_migrated - - -@pytest.mark.usefixtures(u"clean_db") -def test_migration_with_multiple_revisions(): - dataset = factories.Dataset( - resources=[{u"url": u"http://example.com/a.csv", u"format": u"csv"}] - ) - dataset["title"] = u"Title 2" - helpers.call_action(u"package_update", **dataset) - dataset["title"] = u"Title 3" - helpers.call_action(u"package_update", **dataset) - - activity = package_activity_list(dataset["id"], 0, 0)[1] - activity_data_as_it_should_be = copy.deepcopy(activity.data["package"]) - - # Remove 'activity.data.package.resources' to provoke the migration to - # regenerate it from package_revision (etc) - activity = model.Activity.get(activity.id) - activity.data = {u"actor": None, u"package": {u"title": u"Title 2"}} - model.Session.commit() - model.Session.remove() - # double check that worked... - assert ( - not model.Activity.get(activity.id).data["package"].get(u"resources") - ) - - with PackageDictizeMonkeyPatch(): - migrate_dataset(dataset["name"], {}) - - activity_data_migrated = package_activity_list(dataset["id"], 0, 0)[ - 1 - ].data[u"package"] - assert activity_data_as_it_should_be == activity_data_migrated - assert activity_data_migrated["title"] == u"Title 2" - - -@pytest.mark.usefixtures(u"clean_db") -def test_a_contemporary_activity_needs_no_migration(): - # An Activity created by a change under the current CKAN should not - # need a migration - check it does a nop - dataset = factories.Dataset( - resources=[{u"url": u"http://example.com/a.csv", u"format": u"csv"}] - ) - activity = package_activity_list(dataset["id"], 0, 0)[0] - activity_data_before = copy.deepcopy(activity.data) - - with PackageDictizeMonkeyPatch(): - migrate_dataset(dataset["name"], {}) - - activity_data_after = package_activity_list(dataset["id"], 0, 0)[0].data - assert activity_data_before == activity_data_after - - -@pytest.mark.usefixtures(u"clean_db") -def test_revision_missing(): - dataset = factories.Dataset( - resources=[{u"url": u"http://example.com/a.csv", u"format": u"csv"}] - ) - # delete a part of the revision, so package_show for the revision will - # return NotFound - model.Session.query(model.PackageRevision).delete() - model.Session.commit() - # delete 'activity.data.package.resources' so it needs migrating - activity = package_activity_list(dataset["id"], 0, 0)[0] - activity = model.Activity.get(activity.id) - activity.data = {u"actor": None, u"package": {u"title": u"Test Dataset"}} - model.Session.commit() - model.Session.remove() - # double check that worked... - assert ( - not model.Activity.get(activity.id).data["package"].get(u"resources") - ) - - errors = defaultdict(int) - with PackageDictizeMonkeyPatch(): - migrate_dataset(dataset["name"], errors) - - assert dict(errors) == {u"Revision missing": 1} - activity_data_migrated = package_activity_list(dataset["id"], 0, 0)[0].data - # the title is there so the activity stream can display it - assert activity_data_migrated["package"]["title"] == u"Test Dataset" - # the rest of the dataset is missing - better that than just the - # dictized package without resources, extras etc - assert u"resources" not in activity_data_migrated["package"] - - -@pytest.mark.usefixtures(u"clean_db") -def test_revision_and_data_missing(): - dataset = factories.Dataset( - resources=[{u"url": u"http://example.com/a.csv", u"format": u"csv"}] - ) - # delete a part of the revision, so package_show for the revision will - # return NotFound - model.Session.query(model.PackageRevision).delete() - model.Session.commit() - # delete 'activity.data.package' so it needs migrating AND the package - # title won't be available, so we test how the migration deals with - # that - activity = package_activity_list(dataset["id"], 0, 0)[0] - activity = model.Activity.get(activity.id) - del activity.data["package"] - model.Session.commit() - - errors = defaultdict(int) - with PackageDictizeMonkeyPatch(): - migrate_dataset(dataset["name"], errors) - - assert dict(errors) == {u"Revision missing": 1} - activity_data_migrated = package_activity_list(dataset["id"], 0, 0)[0].data - # the title is there so the activity stream can display it - assert activity_data_migrated["package"]["title"] == u"unknown" - assert u"resources" not in activity_data_migrated["package"] - - -@pytest.mark.usefixtures(u"clean_db") -def test_package_show_error(): - dataset = factories.Dataset( - resources=[{u"url": u"http://example.com/a.csv", u"format": u"csv"}] - ) - # delete 'activity.data.package.resources' so it needs migrating - activity = package_activity_list(dataset["id"], 0, 0)[0] - activity = model.Activity.get(activity.id) - activity.data = {u"actor": None, u"package": {u"title": u"Test Dataset"}} - model.Session.commit() - model.Session.remove() - # double check that worked... - assert ( - not model.Activity.get(activity.id).data["package"].get(u"resources") - ) - - errors = defaultdict(int) - # package_show raises an exception - could be because data doesn't - # conform to the latest dataset schema or is incompatible with - # currently installed plugins. Those errors shouldn't prevent the - # migration from going ahead. - ckan.logic._actions["package_show"] = mock.MagicMock( - side_effect=Exception(u"Schema error") - ) - - try: - with PackageDictizeMonkeyPatch(): - migrate_dataset(dataset["name"], errors) - finally: - # restore package_show - ckan.logic.clear_actions_cache() - - assert dict(errors) == {u"Schema error": 1} - - -@pytest.mark.usefixtures(u"clean_db") -def test_wipe_activity_detail(): - dataset = factories.Dataset() - user = factories.User() - activity = factories.Activity( - user_id=user["id"], - object_id=dataset["id"], - revision_id=None, - activity_type=u"new package", - data={u"package": copy.deepcopy(dataset), u"actor": u"Mr Someone"}, - ) - ad = model.ActivityDetail( - activity_id=activity["id"], - object_id=dataset["id"], - object_type=u"package", - activity_type=u"new package", - ) - model.Session.add(ad) - model.Session.commit() - assert model.Session.query(model.ActivityDetail).count() == 1 - wipe_activity_detail(delete_activity_detail=u"y") - assert model.Session.query(model.ActivityDetail).count() == 0 - - -@pytest.mark.usefixtures(u"clean_db") -def test_dont_wipe_activity_detail(): - dataset = factories.Dataset() - user = factories.User() - activity = factories.Activity( - user_id=user["id"], - object_id=dataset["id"], - revision_id=None, - activity_type=u"new package", - data={u"package": copy.deepcopy(dataset), u"actor": u"Mr Someone"}, - ) - ad = model.ActivityDetail( - activity_id=activity["id"], - object_id=dataset["id"], - object_type=u"package", - activity_type=u"new package", - ) - model.Session.add(ad) - model.Session.commit() - assert model.Session.query(model.ActivityDetail).count() == 1 - wipe_activity_detail(delete_activity_detail=u"n") # i.e. don't do it! - assert model.Session.query(model.ActivityDetail).count() == 1 diff --git a/ckan/tests/model/test_system_info.py b/ckan/tests/model/test_system_info.py index 2df544b301b..e5842cdc33f 100644 --- a/ckan/tests/model/test_system_info.py +++ b/ckan/tests/model/test_system_info.py @@ -28,7 +28,6 @@ def test_set_value(): def test_sets_new_value_for_same_key(): config = factories.SystemInfo() - first_revision = config.revision_id config = factories.SystemInfo() new_config = ( @@ -36,7 +35,6 @@ def test_sets_new_value_for_same_key(): ) assert config.id == new_config.id - assert first_revision != new_config.revision_id assert config.id == new_config.id diff --git a/ckanext/example_flask_iblueprint/tests/test_routes.py b/ckanext/example_flask_iblueprint/tests/test_routes.py index 49273d77d7b..b01cd4b0628 100644 --- a/ckanext/example_flask_iblueprint/tests/test_routes.py +++ b/ckanext/example_flask_iblueprint/tests/test_routes.py @@ -1,6 +1,6 @@ # encoding: utf-8 -from nose.tools import eq_, ok_, assert_raises +import pytest from ckan.exceptions import HelperError import ckan.plugins as plugins @@ -8,40 +8,45 @@ class TestFlaskIBlueprint(helpers.FunctionalTestBase): - def setup(self): self.app = helpers._get_test_app() # Install plugin and register its blueprint - if not plugins.plugin_loaded(u'example_flask_iblueprint'): - plugins.load(u'example_flask_iblueprint') - plugin = plugins.get_plugin(u'example_flask_iblueprint') - self.app.flask_app.register_extension_blueprint(plugin.get_blueprint()) + if not plugins.plugin_loaded(u"example_flask_iblueprint"): + plugins.load(u"example_flask_iblueprint") + plugin = plugins.get_plugin(u"example_flask_iblueprint") + self.app.flask_app.register_extension_blueprint( + plugin.get_blueprint() + ) def test_plugin_route(self): - u'''Test extension sets up a unique route.''' - res = self.app.get(u'/hello_plugin') + u"""Test extension sets up a unique route.""" + res = self.app.get(u"/hello_plugin") - eq_(u'Hello World, this is served from an extension', res.body) + assert u"Hello World, this is served from an extension" == res.body def test_plugin_route_core_flask_override(self): - u'''Test extension overrides flask core route.''' - res = self.app.get(u'/') + u"""Test extension overrides flask core route.""" + res = self.app.get(u"/") - ok_(u'Hello World, this is served from an extension, overriding the flask url.' in res.body) + assert ( + u"Hello World, this is served from an extension, overriding the flask url." + in res.body + ) def test_plugin_route_with_helper(self): - u''' + u""" Test extension rendering with a helper method that exists shouldn't cause error. - ''' - res = self.app.get(u'/helper') + """ + res = self.app.get(u"/helper") - ok_(u'Hello World, helper here:

hi

' in res.body) + assert u"Hello World, helper here:

hi

" in res.body def test_plugin_route_with_non_existent_helper(self): - u''' + u""" Test extension rendering with a helper method that doesn't exist raises an exception. - ''' - assert_raises(HelperError, self.app.get, u'/helper_not_here') + """ + with pytest.raises(HelperError): + self.app.get(u"/helper_not_here") diff --git a/ckanext/example_flask_streaming/tests/test_streaming_responses.py b/ckanext/example_flask_streaming/tests/test_streaming_responses.py index 4390c641ae4..236095d57e9 100644 --- a/ckanext/example_flask_streaming/tests/test_streaming_responses.py +++ b/ckanext/example_flask_streaming/tests/test_streaming_responses.py @@ -4,7 +4,6 @@ import pytest -from nose.tools import eq_, assert_raises, ok_ from webtest.app import TestRequest from webtest import lint # NOQA @@ -19,52 +18,55 @@ def _get_resp(url): return _get_resp - @pytest.mark.ckan_config(u'ckan.plugins', u'example_flask_streaming') + @pytest.mark.ckan_config(u"ckan.plugins", u"example_flask_streaming") def test_accordance_of_chunks(self, get_response): - u'''Test streaming of items collection.''' - url = str(u'/stream/string') # produces list of words + u"""Test streaming of items collection.""" + url = str(u"/stream/string") # produces list of words resp = get_response(url) - eq_(u'Hello World, this is served from an extension'.split(), - list(resp.app_iter)) + assert u"Hello World, this is served from an extension".split() == list( + resp.app_iter + ) resp.app_iter.close() - @pytest.mark.ckan_config(u'ckan.plugins', u'example_flask_streaming') + @pytest.mark.ckan_config(u"ckan.plugins", u"example_flask_streaming") def test_template_streaming(self, get_response): - u'''Test streaming of template response.''' + u"""Test streaming of template response.""" bound = 7 - url = str(u'/stream/template/{}'.format(bound)) # produces nums list + url = str(u"/stream/template/{}".format(bound)) # produces nums list resp = get_response(url) - content = u''.join(resp.app_iter) + content = u"".join(resp.app_iter) for i in range(bound): - ok_(str(i) in content) + assert str(i) in content resp._app_iter.close() - @pytest.mark.ckan_config(u'ckan.plugins', u'example_flask_streaming') + @pytest.mark.ckan_config(u"ckan.plugins", u"example_flask_streaming") def test_file_streaming(self, get_response): - u'''Test streaming of existing file(10lines.txt).''' - url = str(u'/stream/file') # streams file + u"""Test streaming of existing file(10lines.txt).""" + url = str(u"/stream/file") # streams file resp = get_response(url) - f_path = path.join(path.dirname(path.abspath(__file__)), - u'10lines.txt') + f_path = path.join( + path.dirname(path.abspath(__file__)), u"10lines.txt" + ) with open(f_path) as test_file: content = test_file.readlines() - eq_(content, list(resp.app_iter)) + assert content == list(resp.app_iter) resp._app_iter.close() - @pytest.mark.ckan_config(u'ckan.plugins', u'example_flask_streaming') + @pytest.mark.ckan_config(u"ckan.plugins", u"example_flask_streaming") def test_render_with_context(self, get_response): - u'''Test availability of context inside templates.''' - url = str(u'/stream/context?var=10') # produces `var` value + u"""Test availability of context inside templates.""" + url = str(u"/stream/context?var=10") # produces `var` value resp = get_response(url) - eq_(u'10', resp.body) + assert u"10" == resp.body - @pytest.mark.ckan_config(u'ckan.plugins', u'example_flask_streaming') + @pytest.mark.ckan_config(u"ckan.plugins", u"example_flask_streaming") def test_render_without_context(self, get_response): - u''' + u""" Test that error raised if there is an attempt to pick variable if context is not provider. - ''' - url = str(u'/stream/without_context?var=10') + """ + url = str(u"/stream/without_context?var=10") resp = get_response(url) - assert_raises(AttributeError, u''.join, resp.app_iter) + with pytest.raises(AttributeError): + u"".join(resp.app_iter) resp.app_iter.close() diff --git a/ckanext/example_iconfigurer/tests/test_iconfigurer_toolkit.py b/ckanext/example_iconfigurer/tests/test_iconfigurer_toolkit.py index e22f39bad46..c0be8ab2000 100644 --- a/ckanext/example_iconfigurer/tests/test_iconfigurer_toolkit.py +++ b/ckanext/example_iconfigurer/tests/test_iconfigurer_toolkit.py @@ -1,113 +1,146 @@ # encoding: utf-8 -from nose import tools as nosetools - +import pytest import ckan.tests.helpers as helpers import ckan.plugins.toolkit as toolkit class TestIConfigurerToolkitAddCkanAdminTab(helpers.FunctionalTestBase): - ''' + """ Tests for toolkit.add_ckan_admin_tab used by the IConfigurer interface. - ''' + """ def test_add_ckan_admin_tab_updates_config_dict(self): - '''Config dict updated by toolkit.add_ckan_admin_tabs method.''' + """Config dict updated by toolkit.add_ckan_admin_tabs method.""" config = {} - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label') + toolkit.add_ckan_admin_tab(config, "my_route_name", "my_label") - nosetools.assert_equal({'ckan.admin_tabs': {'my_route_name': {'label': 'my_label', 'icon': None}}}, config) + assert { + "ckan.admin_tabs": { + "my_route_name": {"label": "my_label", "icon": None} + } + } == config def test_add_ckan_admin_tab_twice(self): - ''' + """ Calling add_ckan_admin_tab twice with same values returns expected config. - ''' + """ config = {} - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label') - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label') + toolkit.add_ckan_admin_tab(config, "my_route_name", "my_label") + toolkit.add_ckan_admin_tab(config, "my_route_name", "my_label") expected_dict = { - 'ckan.admin_tabs': {'my_route_name': {'label': 'my_label', 'icon': None}} + "ckan.admin_tabs": { + "my_route_name": {"label": "my_label", "icon": None} + } } - nosetools.assert_equal(expected_dict, config) + assert expected_dict == config def test_add_ckan_admin_tab_twice_replace_value(self): - ''' + """ Calling add_ckan_admin_tab twice with a different value returns expected config. - ''' + """ config = {} - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label') - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_replacement_label') + toolkit.add_ckan_admin_tab(config, "my_route_name", "my_label") + toolkit.add_ckan_admin_tab( + config, "my_route_name", "my_replacement_label" + ) expected_dict = { - 'ckan.admin_tabs': {'my_route_name': {'label': 'my_replacement_label', 'icon': None}} + "ckan.admin_tabs": { + "my_route_name": { + "label": "my_replacement_label", + "icon": None, + } + } } - nosetools.assert_equal(expected_dict, config) + assert expected_dict == config def test_add_ckan_admin_tab_two_routes(self): - ''' + """ Add two different route/label pairs to ckan.admin_tabs. - ''' + """ config = {} - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label') - toolkit.add_ckan_admin_tab(config, 'my_other_route_name', 'my_other_label') + toolkit.add_ckan_admin_tab(config, "my_route_name", "my_label") + toolkit.add_ckan_admin_tab( + config, "my_other_route_name", "my_other_label" + ) expected_dict = { - 'ckan.admin_tabs': { - 'my_other_route_name': {'label': 'my_other_label', 'icon': None}, - 'my_route_name': {'label': 'my_label', 'icon': None} + "ckan.admin_tabs": { + "my_other_route_name": { + "label": "my_other_label", + "icon": None, + }, + "my_route_name": {"label": "my_label", "icon": None}, } } - nosetools.assert_equal(expected_dict, config) + assert expected_dict == config def test_add_ckan_admin_tab_config_has_existing_admin_tabs(self): - ''' + """ Config already has a ckan.admin_tabs option. - ''' + """ config = { - 'ckan.admin_tabs': {'my_existing_route': {'label': 'my_existing_label', 'icon': None}} + "ckan.admin_tabs": { + "my_existing_route": { + "label": "my_existing_label", + "icon": None, + } + } } - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label') - toolkit.add_ckan_admin_tab(config, 'my_other_route_name', 'my_other_label') + toolkit.add_ckan_admin_tab(config, "my_route_name", "my_label") + toolkit.add_ckan_admin_tab( + config, "my_other_route_name", "my_other_label" + ) expected_dict = { - 'ckan.admin_tabs': { - 'my_existing_route': {'label': 'my_existing_label', 'icon': None}, - 'my_other_route_name': {'label': 'my_other_label', 'icon': None}, - 'my_route_name': {'label': 'my_label', 'icon': None} + "ckan.admin_tabs": { + "my_existing_route": { + "label": "my_existing_label", + "icon": None, + }, + "my_other_route_name": { + "label": "my_other_label", + "icon": None, + }, + "my_route_name": {"label": "my_label", "icon": None}, } } - nosetools.assert_equal(expected_dict, config) + assert expected_dict == config def test_add_ckan_admin_tab_config_has_existing_other_option(self): - ''' + """ Config already has existing other option. - ''' - config = { - 'ckan.my_option': 'This is my option' - } + """ + config = {"ckan.my_option": "This is my option"} - toolkit.add_ckan_admin_tab(config, 'my_route_name', 'my_label') - toolkit.add_ckan_admin_tab(config, 'my_other_route_name', 'my_other_label') + toolkit.add_ckan_admin_tab(config, "my_route_name", "my_label") + toolkit.add_ckan_admin_tab( + config, "my_other_route_name", "my_other_label" + ) expected_dict = { - 'ckan.my_option': 'This is my option', - 'ckan.admin_tabs': { - 'my_other_route_name': {'label': 'my_other_label', 'icon': None}, - 'my_route_name': {'label': 'my_label', 'icon': None} - } + "ckan.my_option": "This is my option", + "ckan.admin_tabs": { + "my_other_route_name": { + "label": "my_other_label", + "icon": None, + }, + "my_route_name": {"label": "my_label", "icon": None}, + }, } - nosetools.assert_equal(expected_dict, config) + assert expected_dict == config diff --git a/ckanext/example_iconfigurer/tests/test_iconfigurer_update_config.py b/ckanext/example_iconfigurer/tests/test_iconfigurer_update_config.py index 379dd377e41..583307d47db 100644 --- a/ckanext/example_iconfigurer/tests/test_iconfigurer_update_config.py +++ b/ckanext/example_iconfigurer/tests/test_iconfigurer_update_config.py @@ -1,6 +1,6 @@ # encoding: utf-8 -import nose.tools +import pytest from six import text_type from ckan.common import config @@ -13,46 +13,42 @@ import ckan.tests.helpers as helpers -assert_equals = nose.tools.assert_equals - - class TestConfigOptionUpdatePluginNotEnabled(object): - def test_updating_unregistered_core_setting_not_allowed(self): - key = 'ckan.datasets_per_page' + key = "ckan.datasets_per_page" value = 5 params = {key: value} - nose.tools.assert_raises(logic.ValidationError, helpers.call_action, - 'config_option_update', - **params) + with pytest.raises(logic.ValidationError): + helpers.call_action("config_option_update", **params) def test_updating_unregistered_new_setting_not_allowed(self): - key = 'ckanext.example_iconfigurer.test_conf' - value = 'Test value' + key = "ckanext.example_iconfigurer.test_conf" + value = "Test value" params = {key: value} - nose.tools.assert_raises(logic.ValidationError, helpers.call_action, - 'config_option_update', - **params) + with pytest.raises(logic.ValidationError): + helpers.call_action("config_option_update", **params) class TestConfigOptionUpdatePluginEnabled(helpers.FunctionalTestBase): - _load_plugins = ('example_iconfigurer', ) + _load_plugins = ("example_iconfigurer",) @classmethod def setup_class(cls): super(TestConfigOptionUpdatePluginEnabled, cls).setup_class() - cls._datasets_per_page_original_value = \ - config.get('ckan.datasets_per_page') + cls._datasets_per_page_original_value = config.get( + "ckan.datasets_per_page" + ) @classmethod def teardown_class(cls): super(TestConfigOptionUpdatePluginEnabled, cls).teardown_class() - config['ckan.datasets_per_page'] = \ - cls._datasets_per_page_original_value + config[ + "ckan.datasets_per_page" + ] = cls._datasets_per_page_original_value helpers.reset_db() def setup(self): @@ -62,20 +58,20 @@ def setup(self): def test_update_registered_core_value(self): - key = 'ckan.datasets_per_page' + key = "ckan.datasets_per_page" value = 5 params = {key: value} - assert_equals(config[key], self._datasets_per_page_original_value) + assert config[key] == self._datasets_per_page_original_value - new_config = helpers.call_action('config_option_update', **params) + new_config = helpers.call_action("config_option_update", **params) # output - assert_equals(new_config[key], value) + assert new_config[key] == value # config - assert_equals(config[key], value) + assert config[key] == value # app_globals globals_key = app_globals.get_globals_key(key) @@ -83,92 +79,93 @@ def test_update_registered_core_value(self): # db obj = model.Session.query(model.SystemInfo).filter_by(key=key).first() - assert_equals(obj.value, text_type(value)) # all values stored as string + assert obj.value == text_type(value) # all values stored as string def test_update_registered_external_value(self): - key = 'ckanext.example_iconfigurer.test_conf' - value = 'Test value' + key = "ckanext.example_iconfigurer.test_conf" + value = "Test value" params = {key: value} assert not config.get(key) - new_config = helpers.call_action('config_option_update', **params) + new_config = helpers.call_action("config_option_update", **params) # output - assert_equals(new_config[key], value) + assert new_config[key] == value # config - assert_equals(config[key], value) + assert config[key] == value # db obj = model.Session.query(model.SystemInfo).filter_by(key=key).first() - assert_equals(obj.value, value) + assert obj.value == value # not set in globals globals_key = app_globals.get_globals_key(key) assert not getattr(app_globals.app_globals, globals_key, None) def test_update_registered_core_value_in_list(self): - '''Registering a core key/value will allow it to be included in the - list returned by config_option_list action.''' + """Registering a core key/value will allow it to be included in the + list returned by config_option_list action.""" - key = 'ckan.datasets_per_page' + key = "ckan.datasets_per_page" value = 5 params = {key: value} # add registered core value - helpers.call_action('config_option_update', **params) + helpers.call_action("config_option_update", **params) - option_list = helpers.call_action('config_option_list') + option_list = helpers.call_action("config_option_list") assert key in option_list def test_update_registered_core_value_in_show(self): - '''Registering a core key/value will allow it to be shown by the - config_option_show action.''' + """Registering a core key/value will allow it to be shown by the + config_option_show action.""" - key = 'ckan.datasets_per_page' + key = "ckan.datasets_per_page" value = 5 params = {key: value} # add registered core value - helpers.call_action('config_option_update', **params) + helpers.call_action("config_option_update", **params) - show_value = helpers.call_action('config_option_show', - key='ckan.datasets_per_page') + show_value = helpers.call_action( + "config_option_show", key="ckan.datasets_per_page" + ) assert show_value == value def test_update_registered_external_value_in_list(self): - '''Registering an external key/value will allow it to be included in - the list returned by config_option_list action.''' + """Registering an external key/value will allow it to be included in + the list returned by config_option_list action.""" - key = 'ckanext.example_iconfigurer.test_conf' - value = 'Test value' + key = "ckanext.example_iconfigurer.test_conf" + value = "Test value" params = {key: value} # add registered external value - helpers.call_action('config_option_update', **params) + helpers.call_action("config_option_update", **params) - option_list = helpers.call_action('config_option_list') + option_list = helpers.call_action("config_option_list") assert key in option_list def test_update_registered_external_value_in_show(self): - '''Registering an external key/value will allow it to be shown by the - config_option_show action.''' + """Registering an external key/value will allow it to be shown by the + config_option_show action.""" - key = 'ckanext.example_iconfigurer.test_conf' - value = 'Test value' + key = "ckanext.example_iconfigurer.test_conf" + value = "Test value" params = {key: value} # add registered external value - helpers.call_action('config_option_update', **params) + helpers.call_action("config_option_update", **params) show_value = helpers.call_action( - 'config_option_show', - key='ckanext.example_iconfigurer.test_conf') + "config_option_show", key="ckanext.example_iconfigurer.test_conf" + ) assert show_value == value diff --git a/ckanext/example_idatastorebackend/test/test_plugin.py b/ckanext/example_idatastorebackend/test/test_plugin.py index c0f3f06491e..d8aa38c6c26 100644 --- a/ckanext/example_idatastorebackend/test/test_plugin.py +++ b/ckanext/example_idatastorebackend/test/test_plugin.py @@ -2,12 +2,7 @@ from mock import patch, Mock, call -from nose.tools import ( - assert_equal, - assert_in, - assert_is_instance, - assert_raises -) +import pytest import ckan.plugins as plugins from ckan.common import config @@ -16,57 +11,57 @@ from ckanext.datastore.backend import DatastoreBackend from ckanext.datastore.backend.postgres import DatastorePostgresqlBackend from ckanext.example_idatastorebackend.example_sqlite import ( - DatastoreExampleSqliteBackend + DatastoreExampleSqliteBackend, ) class_to_patch = ( - u'ckanext.example_idatastorebackend.' - 'example_sqlite.DatastoreExampleSqliteBackend' + u"ckanext.example_idatastorebackend." + "example_sqlite.DatastoreExampleSqliteBackend" ) class ExampleIDatastoreBackendPlugin(helpers.FunctionalTestBase): - def setup(self): super(ExampleIDatastoreBackendPlugin, self).setup() - plugins.load(u'datastore') - plugins.load(u'example_idatastorebackend') + plugins.load(u"datastore") + plugins.load(u"example_idatastorebackend") def teardown(self): - plugins.unload(u'example_idatastorebackend') - plugins.unload(u'datastore') + plugins.unload(u"example_idatastorebackend") + plugins.unload(u"datastore") def test_backends_correctly_registered(self): DatastoreBackend.register_backends() - assert_in(u'sqlite', DatastoreBackend._backends) - assert_in(u'postgresql', DatastoreBackend._backends) + assert u"sqlite" in DatastoreBackend._backends + assert u"postgresql" in DatastoreBackend._backends def test_postgres_backend_with_standard_config(self): - assert_is_instance( - DatastoreBackend.get_active_backend(), - DatastorePostgresqlBackend) + assert isinstance( + DatastoreBackend.get_active_backend(), DatastorePostgresqlBackend + ) def test_inconsistent_engines_for_read_and_write(self): - with helpers.changed_config(u'ckan.datastore.write_url', u'sqlite://x'): - assert_raises( - AssertionError, - DatastoreBackend.set_active_backend, config) - with helpers.changed_config(u'ckan.datastore.read_url', u'sqlite://x'): - assert_raises( - AssertionError, - DatastoreBackend.set_active_backend, config) - - @helpers.change_config(u'ckan.datastore.write_url', u'sqlite://x') - @helpers.change_config(u'ckan.datastore.read_url', u'sqlite://x') + with helpers.changed_config( + u"ckan.datastore.write_url", u"sqlite://x" + ): + with pytest.raises(AssertionError): + DatastoreBackend.set_active_backend(config) + with helpers.changed_config(u"ckan.datastore.read_url", u"sqlite://x"): + with pytest.raises(AssertionError): + DatastoreBackend.set_active_backend(config) + + @helpers.change_config(u"ckan.datastore.write_url", u"sqlite://x") + @helpers.change_config(u"ckan.datastore.read_url", u"sqlite://x") def test_sqlite_engine(self): DatastoreBackend.set_active_backend(config) - assert_is_instance( + assert isinstance( DatastoreBackend.get_active_backend(), - DatastoreExampleSqliteBackend) + DatastoreExampleSqliteBackend, + ) - @helpers.change_config(u'ckan.datastore.write_url', u'sqlite://x') - @helpers.change_config(u'ckan.datastore.read_url', u'sqlite://x') - @patch(class_to_patch + u'._get_engine') + @helpers.change_config(u"ckan.datastore.write_url", u"sqlite://x") + @helpers.change_config(u"ckan.datastore.read_url", u"sqlite://x") + @patch(class_to_patch + u"._get_engine") def test_backend_functionality(self, get_engine): engine = get_engine() execute = engine.execute @@ -74,55 +69,57 @@ def test_backend_functionality(self, get_engine): execute.reset_mock() DatastoreExampleSqliteBackend.resource_fields = Mock( - return_value={u'meta': {}, u'schema': { - u'a': u'text' - }} + return_value={u"meta": {}, u"schema": {u"a": u"text"}} ) records = [ - {u'a': u'x'}, {u'a': u'y'}, {u'a': u'z'}, + {u"a": u"x"}, + {u"a": u"y"}, + {u"a": u"z"}, ] DatastoreBackend.set_active_backend(config) - res = factories.Resource(url_type=u'datastore') + res = factories.Resource(url_type=u"datastore") helpers.call_action( - u'datastore_create', resource_id=res['id'], - fields=[ - {u'id': u'a'} - ], records=records + u"datastore_create", + resource_id=res["id"], + fields=[{u"id": u"a"}], + records=records, ) # check, create and 3 inserts - assert_equal(5, execute.call_count) - insert_query = u'INSERT INTO "{0}"(a) VALUES(?)'.format(res['id']) + assert 5 == execute.call_count + insert_query = u'INSERT INTO "{0}"(a) VALUES(?)'.format(res["id"]) execute.assert_has_calls( [ - call(u' CREATE TABLE IF NOT EXISTS "{0}"(a text);'.format( - res['id'] - )), - call(insert_query, ['x']), - call(insert_query, ['y']), - call(insert_query, ['z']) - ]) + call( + u' CREATE TABLE IF NOT EXISTS "{0}"(a text);'.format( + res["id"] + ) + ), + call(insert_query, ["x"]), + call(insert_query, ["y"]), + call(insert_query, ["z"]), + ] + ) execute.reset_mock() fetchall.return_value = records - helpers.call_action( - u'datastore_search', resource_id=res['id']) + helpers.call_action(u"datastore_search", resource_id=res["id"]) execute.assert_called_with( - u'SELECT * FROM "{0}" LIMIT 10'.format(res['id']) + u'SELECT * FROM "{0}" LIMIT 10'.format(res["id"]) ) execute.reset_mock() - helpers.call_action( - u'datastore_delete', resource_id=res['id']) + helpers.call_action(u"datastore_delete", resource_id=res["id"]) # check delete execute.assert_called_with( - u'DROP TABLE IF EXISTS "{0}"'.format(res['id']) + u'DROP TABLE IF EXISTS "{0}"'.format(res["id"]) ) execute.reset_mock() - helpers.call_action( - u'datastore_info', id=res['id']) + helpers.call_action(u"datastore_info", id=res["id"]) # check c = u''' select name from sqlite_master - where type = "table" and name = "{0}"'''.format(res['id']) + where type = "table" and name = "{0}"'''.format( + res["id"] + ) execute.assert_called_with(c) diff --git a/ckanext/example_itranslation/tests/test_plugin.py b/ckanext/example_itranslation/tests/test_plugin.py index b21da5961f1..b1968179662 100644 --- a/ckanext/example_itranslation/tests/test_plugin.py +++ b/ckanext/example_itranslation/tests/test_plugin.py @@ -3,14 +3,11 @@ from ckan import plugins from ckan.tests import helpers -from nose.tools import assert_true, assert_false - class TestExampleITranslationPlugin(helpers.FunctionalTestBase): - @classmethod def _apply_config_changes(cls, config): - config['ckan.plugins'] = 'example_itranslation' + config["ckan.plugins"] = "example_itranslation" @classmethod def setup_class(cls): @@ -19,53 +16,45 @@ def setup_class(cls): @classmethod def teardown_class(cls): - if plugins.plugin_loaded('example_itranslation'): - plugins.unload('example_itranslation') + if plugins.plugin_loaded("example_itranslation"): + plugins.unload("example_itranslation") super(TestExampleITranslationPlugin, cls).teardown_class() def test_translated_string_in_extensions_templates(self): app = self._get_test_app() response = app.get( - url=plugins.toolkit.url_for(u'home.index', - locale='fr'), + url=plugins.toolkit.url_for(u"home.index", locale="fr"), ) - assert_true('This is a itranslated string' in response.body) - assert_false('This is an untranslated string' in response.body) + assert "This is a itranslated string" in response.body + assert "This is an untranslated string" not in response.body # double check the untranslated strings - response = app.get( - url=plugins.toolkit.url_for(u'home.index'), - ) - assert_true('This is an untranslated string' in response.body) - assert_false('This is a itranslated string' in response.body) + response = app.get(url=plugins.toolkit.url_for(u"home.index"),) + assert "This is an untranslated string" in response.body + assert "This is a itranslated string" not in response.body def test_translated_string_in_core_templates(self): app = self._get_test_app() response = app.get( - url=plugins.toolkit.url_for(u'home.index', - locale='fr'), + url=plugins.toolkit.url_for(u"home.index", locale="fr"), ) - assert_true('Overwritten string in ckan.mo' in response.body) - assert_false('Connexion' in response.body) + assert "Overwritten string in ckan.mo" in response.body + assert "Connexion" not in response.body # double check the untranslated strings - response = app.get( - url=plugins.toolkit.url_for(u'home.index'), - ) - assert_true('Log in' in response.body) - assert_false('Overwritten string in ckan.mo' in response.body) + response = app.get(url=plugins.toolkit.url_for(u"home.index"),) + assert "Log in" in response.body + assert "Overwritten string in ckan.mo" not in response.body # check that we have only overwritten 'fr' response = app.get( - url=plugins.toolkit.url_for(u'home.index', locale='de'), + url=plugins.toolkit.url_for(u"home.index", locale="de"), ) - assert_true('Einloggen' in response.body) - assert_false('Overwritten string in ckan.mo' in response.body) + assert "Einloggen" in response.body + assert "Overwritten string in ckan.mo" not in response.body def test_english_translation_replaces_default_english_string(self): app = self._get_test_app() - response = app.get( - url=plugins.toolkit.url_for(u'home.index'), - ) - assert_true('Replaced' in response.body) - assert_false('Register' in response.body) + response = app.get(url=plugins.toolkit.url_for(u"home.index"),) + assert "Replaced" in response.body + assert "Register" not in response.body diff --git a/ckanext/stats/tests/test_stats_lib.py b/ckanext/stats/tests/test_stats_lib.py index e53c31ce892..bb862b71029 100644 --- a/ckanext/stats/tests/test_stats_lib.py +++ b/ckanext/stats/tests/test_stats_lib.py @@ -1,7 +1,7 @@ # encoding: utf-8 import datetime -from nose.tools import assert_equal +import pytest from ckan import model from ckan.tests import factories @@ -18,33 +18,44 @@ def setup_class(cls): model.repo.rebuild_db() - user = factories.User(name='bob') - org_users = [{'name': user['name'], 'capacity': 'editor'}] - org1 = factories.Organization(name='org1', users=org_users) + user = factories.User(name="bob") + org_users = [{"name": user["name"], "capacity": "editor"}] + org1 = factories.Organization(name="org1", users=org_users) group2 = factories.Group() - tag1 = {'name': 'tag1'} - tag2 = {'name': 'tag2'} - factories.Dataset(name='test1', owner_org=org1['id'], tags=[tag1], - user=user) - factories.Dataset(name='test2', owner_org=org1['id'], groups=[{'name': - group2['name']}], tags=[tag1], user=user) - factories.Dataset(name='test3', owner_org=org1['id'], groups=[{'name': - group2['name']}], tags=[tag1, tag2], user=user, - private=True) - factories.Dataset(name='test4', user=user) + tag1 = {"name": "tag1"} + tag2 = {"name": "tag2"} + factories.Dataset( + name="test1", owner_org=org1["id"], tags=[tag1], user=user + ) + factories.Dataset( + name="test2", + owner_org=org1["id"], + groups=[{"name": group2["name"]}], + tags=[tag1], + user=user, + ) + factories.Dataset( + name="test3", + owner_org=org1["id"], + groups=[{"name": group2["name"]}], + tags=[tag1, tag2], + user=user, + private=True, + ) + factories.Dataset(name="test4", user=user) # week 2 - model.Package.by_name(u'test2').delete() + model.Package.by_name(u"test2").delete() model.repo.commit_and_remove() # week 3 - model.Package.by_name(u'test3').title = 'Test 3' + model.Package.by_name(u"test3").title = "Test 3" model.repo.commit_and_remove() - model.Package.by_name(u'test4').title = 'Test 4' + model.Package.by_name(u"test4").title = "Test 4" model.repo.commit_and_remove() # week 4 - model.Package.by_name(u'test3').notes = 'Test 3 notes' + model.Package.by_name(u"test3").notes = "Test 3 notes" model.repo.commit_and_remove() @classmethod @@ -63,16 +74,18 @@ def test_largest_groups(self): grps = [(grp.name, count) for grp, count in grps] # test2 does not come up because it was deleted # test3 does not come up because it is private - assert_equal(grps, [('org1', 1), ]) + assert grps == [ + ("org1", 1), + ] def test_top_tags(self): tags = Stats.top_tags() tags = [(tag.name, count) for tag, count in tags] - assert_equal(tags, [('tag1', 1)]) + assert tags == [("tag1", 1)] def test_top_package_creators(self): creators = Stats.top_package_creators() creators = [(creator.name, count) for creator, count in creators] # Only 2 shown because one of them was deleted and the other one is # private - assert_equal(creators, [('bob', 2)]) + assert creators == [("bob", 2)] diff --git a/ckanext/webpageview/tests/test_view.py b/ckanext/webpageview/tests/test_view.py index a4eb109f395..00ab7325f5a 100644 --- a/ckanext/webpageview/tests/test_view.py +++ b/ckanext/webpageview/tests/test_view.py @@ -4,14 +4,13 @@ import ckan.plugins as p -from nose.tools import assert_true from ckan.tests import helpers, factories class TestWebPageView(helpers.FunctionalTestBase): @classmethod def _apply_config_changes(cls, cfg): - cfg['ckan.plugins'] = 'webpage_view' + cfg["ckan.plugins"] = "webpage_view" @classmethod def setup_class(cls): @@ -20,30 +19,33 @@ def setup_class(cls): @classmethod def teardown_class(cls): - if p.plugin_loaded('webpage_view'): - p.unload('webpage_view') + if p.plugin_loaded("webpage_view"): + p.unload("webpage_view") super(TestWebPageView, cls).teardown_class() helpers.reset_db() - @helpers.change_config('ckan.views.default_views', '') + @helpers.change_config("ckan.views.default_views", "") def test_view_shown_on_resource_page(self): app = self._get_test_app() dataset = factories.Dataset() - resource = factories.Resource(package_id=dataset['id'], - url='http://some.website.html') + resource = factories.Resource( + package_id=dataset["id"], url="http://some.website.html" + ) resource_view = factories.ResourceView( - resource_id=resource['id'], - view_type='webpage_view', - page_url='http://some.other.website.html',) + resource_id=resource["id"], + view_type="webpage_view", + page_url="http://some.other.website.html", + ) - url = url_for('resource.read', - id=dataset['name'], resource_id=resource['id']) + url = url_for( + "resource.read", id=dataset["name"], resource_id=resource["id"] + ) response = app.get(url) - assert_true(resource_view['page_url'] in response) + assert resource_view["page_url"] in response