diff --git a/ckan/tests/model/test_license.py b/ckan/tests/model/test_license.py index d8147d69f17..da2ebf1f325 100644 --- a/ckan/tests/model/test_license.py +++ b/ckan/tests/model/test_license.py @@ -2,108 +2,111 @@ import os -from nose.tools import assert_equal, assert_in -from ckan.common import config +import pytest +import ckan.model as model +from ckan.common import config from ckan.model.license import LicenseRegister -from ckan.tests import helpers, factories +from ckan.tests import factories this_dir = os.path.dirname(os.path.realpath(__file__)) -class TestLicenseRegister(object): +@pytest.fixture +def reset(clean_db): + yield + if hasattr(model.Package, "_license_register"): + del model.Package._license_register + + +@pytest.mark.usefixtures("reset") +def test_default_register_has_basic_properties_of_a_license(): + config["licenses_group_url"] = None + reg = LicenseRegister() - def setup(self): - helpers.reset_db() + license = reg["cc-by"] + assert license.url == "http://www.opendefinition.org/licenses/cc-by" + assert license.isopen() + assert license.title == "Creative Commons Attribution" - def teardown(self): - # _license_register is cached, so clear it after tests that change the - # config['licenses_group_url'] - from ckan import model - if hasattr(model.Package, '_license_register'): - del model.Package._license_register - def test_default_register_has_basic_properties_of_a_license(self): - config['licenses_group_url'] = None - reg = LicenseRegister() +@pytest.mark.usefixtures("reset") +@pytest.mark.ckan_config("licenses_group_url", + "file:///%s/licenses.v1" % this_dir) +def test_import_v1_style_register(): + reg = LicenseRegister() - license = reg['cc-by'] - assert_equal(license.url, - 'http://www.opendefinition.org/licenses/cc-by') - assert_equal(license.isopen(), True) - assert_equal(license.title, 'Creative Commons Attribution') + license = reg["cc-by"] + assert license.url == "http://www.opendefinition.org/licenses/cc-by" + assert license.isopen() + assert license.title == "Creative Commons Attribution" - @helpers.change_config('licenses_group_url', 'file:///%s/licenses.v1' % this_dir) - def test_import_v1_style_register(self): - reg = LicenseRegister() - license = reg['cc-by'] - assert_equal(license.url, - 'http://www.opendefinition.org/licenses/cc-by') - assert_equal(license.isopen(), True) - assert_equal(license.title, 'Creative Commons Attribution') +# v2 is used by http://licenses.opendefinition.org in recent times +@pytest.mark.usefixtures("reset") +@pytest.mark.ckan_config("licenses_group_url", + "file:///%s/licenses.v2" % this_dir) +def test_import_v2_style_register(): + reg = LicenseRegister() + license = reg["CC-BY-4.0"] + assert license.url == "https://creativecommons.org/licenses/by/4.0/" + assert license.isopen() + assert license.title == "Creative Commons Attribution 4.0" - # v2 is used by http://licenses.opendefinition.org in recent times - @helpers.change_config('licenses_group_url', 'file:///%s/licenses.v2' % this_dir) - def test_import_v2_style_register(self): - reg = LicenseRegister() - license = reg['CC-BY-4.0'] - assert_equal(license.url, - 'https://creativecommons.org/licenses/by/4.0/') - assert_equal(license.isopen(), True) - assert_equal(license.title, 'Creative Commons Attribution 4.0') +@pytest.mark.usefixtures("reset") +@pytest.mark.ckan_config("licenses_group_url", + "file:///%s/licenses.v1" % this_dir) +@pytest.mark.ckan_config("ckan.locale_default", "ca") +def test_import_v1_style_register_i18n(app): + sysadmin = factories.Sysadmin() + resp = app.get("/dataset/new", + extra_environ={"REMOTE_USER": str(sysadmin["name"])}) + assert "Altres (Oberta)" in resp.body - @helpers.change_config('licenses_group_url', 'file:///%s/licenses.v1' % this_dir) - @helpers.change_config('ckan.locale_default', 'ca') - def test_import_v1_style_register_i18n(self): - sysadmin = factories.Sysadmin() - app = helpers._get_test_app() +@pytest.mark.usefixtures("reset") +@pytest.mark.ckan_config("licenses_group_url", + "file:///%s/licenses.v2" % this_dir) +@pytest.mark.ckan_config("ckan.locale_default", "ca") +def test_import_v2_style_register_i18n(app): + sysadmin = factories.Sysadmin() + resp = app.get("/dataset/new", + extra_environ={"REMOTE_USER": str(sysadmin["name"])}) + assert "Altres (Oberta)" in resp.body - resp = app.get('/dataset/new', extra_environ={'REMOTE_USER': str(sysadmin['name'])}) - assert_in('Altres (Oberta)', resp.body) - @helpers.change_config('licenses_group_url', 'file:///%s/licenses.v2' % this_dir) - @helpers.change_config('ckan.locale_default', 'ca') - def test_import_v2_style_register_i18n(self): +def test_access_via_attribute(): + license = LicenseRegister()["cc-by"] + assert license.od_conformance == "approved" - sysadmin = factories.Sysadmin() - app = helpers._get_test_app() - resp = app.get('/dataset/new', extra_environ={'REMOTE_USER': str(sysadmin['name'])}) - assert_in('Altres (Oberta)', resp.body) +def test_access_via_key(): + license = LicenseRegister()["cc-by"] + assert license["od_conformance"] == "approved" -class TestLicense: - def test_access_via_attribute(self): - license = LicenseRegister()['cc-by'] - assert_equal(license.od_conformance, 'approved') +def test_access_via_dict(): + license = LicenseRegister()["cc-by"] + license_dict = license.as_dict() + assert license_dict["od_conformance"] == "approved" + assert license_dict["osd_conformance"] == "not reviewed" - def test_access_via_key(self): - license = LicenseRegister()['cc-by'] - assert_equal(license['od_conformance'], 'approved') - def test_access_via_dict(self): - license = LicenseRegister()['cc-by'] - license_dict = license.as_dict() - assert_equal(license_dict['od_conformance'], 'approved') - assert_equal(license_dict['osd_conformance'], 'not reviewed') +def test_access_via_attribute(): + license = LicenseRegister()["cc-by"] + assert license.is_okd_compliant + assert not license.is_osi_compliant -class TestLicenseLegacyFields: - def test_access_via_attribute(self): - license = LicenseRegister()['cc-by'] - assert_equal(license.is_okd_compliant, True) - assert_equal(license.is_osi_compliant, False) +def test_access_via_key(): + license = LicenseRegister()["cc-by"] + assert license["is_okd_compliant"] + assert not license["is_osi_compliant"] - def test_access_via_key(self): - license = LicenseRegister()['cc-by'] - assert_equal(license['is_okd_compliant'], True) - assert_equal(license['is_osi_compliant'], False) - def test_access_via_dict(self): - license = LicenseRegister()['cc-by'] - license_dict = license.as_dict() - assert_equal(license_dict['is_okd_compliant'], True) - assert_equal(license_dict['is_osi_compliant'], False) +def test_access_via_dict(): + license = LicenseRegister()["cc-by"] + license_dict = license.as_dict() + assert license_dict["is_okd_compliant"] + assert not license_dict["is_osi_compliant"] diff --git a/ckan/tests/model/test_package.py b/ckan/tests/model/test_package.py index 906b4ee9742..6fd391a618d 100644 --- a/ckan/tests/model/test_package.py +++ b/ckan/tests/model/test_package.py @@ -1,110 +1,128 @@ # encoding: utf-8 -from nose.tools import assert_equal +import pytest from ckan import model -from ckan.tests import helpers, factories - - -class TestPackage(object): - - def setup(self): - helpers.reset_db() - - def test_create(self): - # Demonstrate creating a package. - # - # In practice this is done by a combination of: - # * ckan.logic.action.create:package_create - # * ckan.lib.dictization.model_save.py:package_dict_save - # etc - - model.repo.new_revision() - - pkg = model.Package(name=u'test-package') - pkg.notes = u'Some notes' - pkg.author = u'bob' - pkg.license_id = u'odc-by' - - model.Session.add(pkg) - model.Session.commit() - model.Session.remove() - - pkg = model.Package.by_name(u'test-package') - assert_equal(pkg.notes, u'Some notes') - assert_equal(pkg.author, u'bob') - assert_equal(pkg.license_id, u'odc-by') - assert_equal(pkg.license.title, - u'Open Data Commons Attribution License') - - def test_update(self): - dataset = factories.Dataset() - pkg = model.Package.by_name(dataset['name']) - - model.repo.new_revision() - pkg.author = u'bob' - model.Session.commit() - model.Session.remove() - - pkg = model.Package.by_name(dataset['name']) - assert_equal(pkg.author, u'bob') - - def test_delete(self): - group = factories.Group() - dataset = factories.Dataset( - groups=[{u'id': group['id']}], - tags=[{u'name': u'science'}], - extras=[{u'key': u'subject', u'value': u'science'}], - ) - pkg = model.Package.by_name(dataset['name']) - - model.repo.new_revision() - pkg.delete() - model.Session.commit() - model.Session.remove() - - pkg = model.Package.by_name(dataset['name']) - assert_equal(pkg.state, u'deleted') - # it is removed from the group - group = model.Group.get(group['id']) - assert_equal([p.name for p in group.packages()], []) - # other related objects don't change - package_extra = model.Session.query(model.PackageExtra).all()[0] - assert_equal(package_extra.state, u'active') - package_tag = model.Session.query(model.PackageTag).all()[0] - assert_equal(package_tag.state, u'active') - tag = model.Session.query(model.Tag).all()[0] - assert_equal([p.name for p in tag.packages], [dataset['name']]) - - def test_purge(self): - org = factories.Organization() - group = factories.Group() - dataset = factories.Dataset( - resources=[{u'url': u'http://example.com/image.png', - u'format': u'png', u'name': u'Image 1'}], - tags=[{u'name': u'science'}], - extras=[{u'key': u'subject', u'value': u'science'}], - groups=[{u'id': group['id']}], - owner_org=org['id'], - ) - pkg = model.Package.by_name(dataset['name']) - - model.repo.new_revision() - pkg.purge() - model.Session.commit() - model.Session.remove() - - assert not model.Session.query(model.Package).all() - # the purge cascades to some objects - assert not model.Session.query(model.PackageExtra).all() - assert not model.Session.query(model.PackageTag).all() - assert not model.Session.query(model.Resource).all() - # org remains, just not attached to the package - org = model.Group.get(org['id']) - assert_equal(org.packages(), []) - # tag object remains, just not attached to the package - tag = model.Session.query(model.Tag).all()[0] - assert_equal(tag.packages, []) - # group object remains, just not attached to the package - group = model.Group.get(group['id']) - assert_equal(group.packages(), []) +from ckan.tests import factories + + +@pytest.mark.usefixtures("clean_db") +def test_create(): + # Demonstrate creating a package. + # + # In practice this is done by a combination of: + # * ckan.logic.action.create:package_create + # * ckan.lib.dictization.model_save.py:package_dict_save + # etc + + model.repo.new_revision() + + pkg = model.Package(name=u"test-package") + pkg.notes = u"Some notes" + pkg.author = u"bob" + pkg.license_id = u"odc-by" + + model.Session.add(pkg) + model.Session.commit() + model.Session.remove() + + pkg = model.Package.by_name(u"test-package") + assert pkg.notes == u"Some notes" + assert pkg.author == u"bob" + assert pkg.license_id == u"odc-by" + assert pkg.license.title == u"Open Data Commons Attribution License" + + +@pytest.mark.usefixtures("clean_db") +def test_update(): + dataset = factories.Dataset() + pkg = model.Package.by_name(dataset["name"]) + + model.repo.new_revision() + pkg.author = u"bob" + model.Session.commit() + model.Session.remove() + + pkg = model.Package.by_name(dataset["name"]) + assert pkg.author == u"bob" + + +@pytest.mark.usefixtures("clean_db") +def test_delete(): + group = factories.Group() + dataset = factories.Dataset( + groups=[{ + u"id": group["id"] + }], + tags=[{ + u"name": u"science" + }], + extras=[{ + u"key": u"subject", + u"value": u"science" + }], + ) + pkg = model.Package.by_name(dataset["name"]) + + model.repo.new_revision() + pkg.delete() + model.Session.commit() + model.Session.remove() + + pkg = model.Package.by_name(dataset["name"]) + assert pkg.state == u"deleted" + # it is removed from the group + group = model.Group.get(group["id"]) + assert [p.name for p in group.packages()] == [] + # other related objects don't change + package_extra = model.Session.query(model.PackageExtra).all()[0] + assert package_extra.state == u"active" + package_tag = model.Session.query(model.PackageTag).all()[0] + assert package_tag.state == u"active" + tag = model.Session.query(model.Tag).all()[0] + assert [p.name for p in tag.packages] == [dataset["name"]] + + +@pytest.mark.usefixtures("clean_db") +def test_purge(): + org = factories.Organization() + group = factories.Group() + dataset = factories.Dataset( + resources=[{ + u"url": u"http://example.com/image.png", + u"format": u"png", + u"name": u"Image 1", + }], + tags=[{ + u"name": u"science" + }], + extras=[{ + u"key": u"subject", + u"value": u"science" + }], + groups=[{ + u"id": group["id"] + }], + owner_org=org["id"], + ) + pkg = model.Package.by_name(dataset["name"]) + + model.repo.new_revision() + pkg.purge() + model.Session.commit() + model.Session.remove() + + assert not model.Session.query(model.Package).all() + # the purge cascades to some objects + assert not model.Session.query(model.PackageExtra).all() + assert not model.Session.query(model.PackageTag).all() + assert not model.Session.query(model.Resource).all() + # org remains, just not attached to the package + org = model.Group.get(org["id"]) + assert org.packages() == [] + # tag object remains, just not attached to the package + tag = model.Session.query(model.Tag).all()[0] + assert tag.packages == [] + # group object remains, just not attached to the package + group = model.Group.get(group["id"]) + assert group.packages() == [] diff --git a/ckan/tests/model/test_package_extra.py b/ckan/tests/model/test_package_extra.py index 8e80e1f8d7c..377a7efb2e6 100644 --- a/ckan/tests/model/test_package_extra.py +++ b/ckan/tests/model/test_package_extra.py @@ -1,78 +1,84 @@ # encoding: utf-8 -from nose.tools import assert_equal +import pytest from ckan import model from ckan.tests import helpers, factories -class TestPackage(object): - - def setup(self): - helpers.reset_db() - - def test_create_extras(self): - model.repo.new_revision() - - pkg = model.Package(name=u'test-package') - - # method 1 - extra1 = model.PackageExtra(key=u'subject', value=u'science') - pkg._extras[u'subject'] = extra1 - - # method 2 - pkg.extras[u'accuracy'] = u'metre' - - model.Session.add_all([pkg]) - model.Session.commit() - model.Session.remove() - - pkg = model.Package.by_name(u'test-package') - assert_equal( - pkg.extras, - {u'subject': u'science', - u'accuracy': u'metre'} - ) - - def test_delete_extras(self): - - dataset = factories.Dataset(extras=[ - {u'key': u'subject', u'value': u'science'}, - {u'key': u'accuracy', u'value': u'metre'}] - ) - pkg = model.Package.by_name(dataset['name']) - - model.repo.new_revision() - del pkg.extras[u'subject'] - model.Session.commit() - model.Session.remove() - - pkg = model.Package.by_name(dataset['name']) - assert_equal( - pkg.extras, - {u'accuracy': u'metre'} - ) - - def test_extras_list(self): - extras = [ - {u'key': u'subject', u'value': u'science'}, - {u'key': u'accuracy', u'value': u'metre'}, - {u'key': u'sample_years', u'value': u'2012-2013'}, - ] - dataset = factories.Dataset(extras=extras) - # delete the 'subject' extra - extras = extras[1:] - helpers.call_action(u'package_patch', id=dataset['id'], extras=extras) - # unrelated extra, to check it doesn't affect things - factories.Dataset(extras=[{u'key': u'foo', u'value': u'bar'}]) - - pkg = model.Package.by_name(dataset['name']) - assert isinstance(pkg.extras_list[0], model.PackageExtra) - assert_equal( - set([(pe.package_id, pe.key, pe.value, pe.state) - for pe in pkg.extras_list]), - set([(dataset['id'], u'subject', u'science', u'deleted'), - (dataset['id'], u'accuracy', u'metre', u'active'), - (dataset['id'], u'sample_years', u'2012-2013', u'active'), - ]) - ) +@pytest.mark.usefixtures("clean_db") +def test_create_extras(): + model.repo.new_revision() + + pkg = model.Package(name=u"test-package") + + # method 1 + extra1 = model.PackageExtra(key=u"subject", value=u"science") + pkg._extras[u"subject"] = extra1 + + # method 2 + pkg.extras[u"accuracy"] = u"metre" + + model.Session.add_all([pkg]) + model.Session.commit() + model.Session.remove() + + pkg = model.Package.by_name(u"test-package") + assert pkg.extras == {u"subject": u"science", u"accuracy": u"metre"} + + +@pytest.mark.usefixtures("clean_db") +def test_delete_extras(): + + dataset = factories.Dataset(extras=[ + { + u"key": u"subject", + u"value": u"science" + }, + { + u"key": u"accuracy", + u"value": u"metre" + }, + ]) + pkg = model.Package.by_name(dataset["name"]) + + model.repo.new_revision() + del pkg.extras[u"subject"] + model.Session.commit() + model.Session.remove() + + pkg = model.Package.by_name(dataset["name"]) + assert pkg.extras == {u"accuracy": u"metre"} + + +@pytest.mark.usefixtures("clean_db") +def test_extras_list(): + extras = [ + { + u"key": u"subject", + u"value": u"science" + }, + { + u"key": u"accuracy", + u"value": u"metre" + }, + { + u"key": u"sample_years", + u"value": u"2012-2013" + }, + ] + dataset = factories.Dataset(extras=extras) + # delete the 'subject' extra + extras = extras[1:] + helpers.call_action(u"package_patch", id=dataset["id"], extras=extras) + # unrelated extra, to check it doesn't affect things + factories.Dataset(extras=[{u"key": u"foo", u"value": u"bar"}]) + + pkg = model.Package.by_name(dataset["name"]) + assert isinstance(pkg.extras_list[0], model.PackageExtra) + assert set([(pe.package_id, pe.key, pe.value, pe.state) + for pe in pkg.extras_list]) == set([ + (dataset["id"], u"subject", u"science", u"deleted"), + (dataset["id"], u"accuracy", u"metre", u"active"), + (dataset["id"], u"sample_years", u"2012-2013", u"active"), + ]) diff --git a/ckan/tests/model/test_resource.py b/ckan/tests/model/test_resource.py index 65f68bf9257..e2f3a20fc8e 100644 --- a/ckan/tests/model/test_resource.py +++ b/ckan/tests/model/test_resource.py @@ -1,83 +1,77 @@ # encoding: utf-8 -import nose.tools +import pytest import ckan.model as model -import ckan.plugins as p - -import ckan.tests.helpers as helpers import ckan.tests.factories as factories -assert_equals = nose.tools.assert_equals -assert_not_equals = nose.tools.assert_not_equals Resource = model.Resource -class TestResource(object): - @classmethod - def setup_class(cls): - if not p.plugin_loaded('image_view'): - p.load('image_view') - - helpers.reset_db() - - @classmethod - def teardown_class(cls): - p.unload('image_view') - - def setup(self): - model.repo.rebuild_db() - - def test_edit_url(self): - res_dict = factories.Resource(url='http://first') - res = Resource.get(res_dict['id']) - res.url = 'http://second' - model.repo.new_revision() - model.repo.commit_and_remove() - res = Resource.get(res_dict['id']) - assert_equals(res.url, 'http://second') - - def test_edit_extra(self): - res_dict = factories.Resource(newfield='first') - res = Resource.get(res_dict['id']) - res.extras = {'newfield': 'second'} - res.url - model.repo.new_revision() - model.repo.commit_and_remove() - res = Resource.get(res_dict['id']) - assert_equals(res.extras['newfield'], 'second') - - def test_get_all_without_views_returns_all_resources_without_views(self): - # Create resource with resource_view - factories.ResourceView() - - expected_resources = [ - factories.Resource(format='format'), - factories.Resource(format='other_format') - ] - - resources = Resource.get_all_without_views() - - expected_resources_ids = [r['id'] for r in expected_resources] - resources_ids = [r.id for r in resources] - - assert_equals(expected_resources_ids.sort(), resources_ids.sort()) - - def test_get_all_without_views_accepts_list_of_formats_ignoring_case(self): - factories.Resource(format='other_format') - resource_id = factories.Resource(format='format')['id'] - - resources = Resource.get_all_without_views(['FORMAT']) - - length = len(resources) - assert length == 1, 'Expected 1 resource, but got %d' % length - assert_equals([resources[0].id], [resource_id]) - - def test_resource_count(self): - '''Resource.count() should return a count of instances of Resource - class''' - assert_equals(Resource.count(), 0) - factories.Resource() - factories.Resource() - factories.Resource() - assert_equals(Resource.count(), 3) +@pytest.mark.ckan_config("ckan.plugins", "image_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_edit_url(): + res_dict = factories.Resource(url="http://first") + res = Resource.get(res_dict["id"]) + res.url = "http://second" + model.repo.new_revision() + model.repo.commit_and_remove() + res = Resource.get(res_dict["id"]) + assert res.url == "http://second" + + +@pytest.mark.ckan_config("ckan.plugins", "image_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_edit_extra(): + res_dict = factories.Resource(newfield="first") + res = Resource.get(res_dict["id"]) + res.extras = {"newfield": "second"} + res.url + model.repo.new_revision() + model.repo.commit_and_remove() + res = Resource.get(res_dict["id"]) + assert res.extras["newfield"] == "second" + + +@pytest.mark.ckan_config("ckan.plugins", "image_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_get_all_without_views_returns_all_resources_without_views(): + # Create resource with resource_view + factories.ResourceView() + + expected_resources = [ + factories.Resource(format="format"), + factories.Resource(format="other_format"), + ] + + resources = Resource.get_all_without_views() + + expected_resources_ids = [r["id"] for r in expected_resources] + resources_ids = [r.id for r in resources] + + assert expected_resources_ids.sort() == resources_ids.sort() + + +@pytest.mark.ckan_config("ckan.plugins", "image_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_get_all_without_views_accepts_list_of_formats_ignoring_case(): + factories.Resource(format="other_format") + resource_id = factories.Resource(format="format")["id"] + + resources = Resource.get_all_without_views(["FORMAT"]) + + length = len(resources) + assert length == 1, "Expected 1 resource, but got %d" % length + assert [resources[0].id] == [resource_id] + + +@pytest.mark.ckan_config("ckan.plugins", "image_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_resource_count(): + """Resource.count() should return a count of instances of Resource + class""" + assert Resource.count() == 0 + factories.Resource() + factories.Resource() + factories.Resource() + assert Resource.count() == 3 diff --git a/ckan/tests/model/test_resource_view.py b/ckan/tests/model/test_resource_view.py index 28642a0d3ac..45bd2ae23d8 100644 --- a/ckan/tests/model/test_resource_view.py +++ b/ckan/tests/model/test_resource_view.py @@ -1,74 +1,65 @@ # encoding: utf-8 -import nose.tools +import pytest import ckan.model as model -import ckan.plugins as p - -import ckan.tests.helpers as helpers import ckan.tests.factories as factories -assert_equals = nose.tools.assert_equals -assert_not_equals = nose.tools.assert_not_equals ResourceView = model.ResourceView -class TestResourceView(object): - @classmethod - def setup_class(cls): - if not p.plugin_loaded('image_view'): - p.load('image_view') - if not p.plugin_loaded('webpage_view'): - p.load('webpage_view') +@pytest.mark.ckan_config("ckan.plugins", "image_view webpage_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_resource_view_get(): + resource_view_id = factories.ResourceView()["id"] + resource_view = ResourceView.get(resource_view_id) - helpers.reset_db() + assert resource_view is not None - @classmethod - def teardown_class(cls): - p.unload('image_view') - p.unload('webpage_view') - def setup(self): - model.repo.rebuild_db() +@pytest.mark.ckan_config("ckan.plugins", "image_view webpage_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_get_count_view_type(): + factories.ResourceView(view_type="image_view") + factories.ResourceView(view_type="webpage_view") - def test_resource_view_get(self): - resource_view_id = factories.ResourceView()['id'] - resource_view = ResourceView.get(resource_view_id) + result = ResourceView.get_count_not_in_view_types(["image_view"]) - assert_not_equals(resource_view, None) + assert result == [("webpage_view", 1)] - def test_get_count_view_type(self): - factories.ResourceView(view_type='image_view') - factories.ResourceView(view_type='webpage_view') - result = ResourceView.get_count_not_in_view_types(['image_view']) +@pytest.mark.ckan_config("ckan.plugins", "image_view webpage_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_delete_view_type(): + factories.ResourceView(view_type="image_view") + factories.ResourceView(view_type="webpage_view") - assert_equals(result, [('webpage_view', 1)]) + ResourceView.delete_not_in_view_types(["image_view"]) - def test_delete_view_type(self): - factories.ResourceView(view_type='image_view') - factories.ResourceView(view_type='webpage_view') + result = ResourceView.get_count_not_in_view_types(["image_view"]) + assert result == [] - ResourceView.delete_not_in_view_types(['image_view']) - result = ResourceView.get_count_not_in_view_types(['image_view']) - assert_equals(result, []) +@pytest.mark.ckan_config("ckan.plugins", "image_view webpage_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_delete_view_type_doesnt_commit(): + factories.ResourceView(view_type="image_view") + factories.ResourceView(view_type="webpage_view") - def test_delete_view_type_doesnt_commit(self): - factories.ResourceView(view_type='image_view') - factories.ResourceView(view_type='webpage_view') + ResourceView.delete_not_in_view_types(["image_view"]) + model.Session.rollback() - ResourceView.delete_not_in_view_types(['image_view']) - model.Session.rollback() + result = ResourceView.get_count_not_in_view_types(["image_view"]) + assert result == [("webpage_view", 1)] - result = ResourceView.get_count_not_in_view_types(['image_view']) - assert_equals(result, [('webpage_view', 1)]) - def test_purging_resource_removes_its_resource_views(self): - resource_view_dict = factories.ResourceView() - resource = model.Resource.get(resource_view_dict['resource_id']) +@pytest.mark.ckan_config("ckan.plugins", "image_view webpage_view") +@pytest.mark.usefixtures("clean_db", "with_plugins") +def test_purging_resource_removes_its_resource_views(): + resource_view_dict = factories.ResourceView() + resource = model.Resource.get(resource_view_dict["resource_id"]) - resource.purge() - model.repo.commit_and_remove() + resource.purge() + model.repo.commit_and_remove() - assert_equals(ResourceView.get(resource_view_dict['id']), None) + assert ResourceView.get(resource_view_dict["id"]) is None diff --git a/ckan/tests/model/test_system_info.py b/ckan/tests/model/test_system_info.py index 4ff60b0f5a3..46aba25d974 100644 --- a/ckan/tests/model/test_system_info.py +++ b/ckan/tests/model/test_system_info.py @@ -1,69 +1,55 @@ # encoding: utf-8 -import nose.tools +import pytest -import ckan.tests.helpers as helpers import ckan.tests.factories as factories - from ckan import model -from ckan.model.system_info import (SystemInfo, - set_system_info, - ) - - -assert_equals = nose.tools.assert_equals -assert_not_equals = nose.tools.assert_not_equals - - -class TestSystemInfo(object): +from ckan.model.system_info import SystemInfo, set_system_info - @classmethod - def setup_class(cls): - helpers.reset_db() - @classmethod - def teardown_class(cls): - helpers.reset_db() +@pytest.mark.usefixtures('clean_db') +def test_set_value(): - def test_set_value(self): + key = "config_option_1" + value = "test_value" + set_system_info(key, value) - key = 'config_option_1' - value = 'test_value' + results = model.Session.query(SystemInfo).filter_by(key=key).all() - set_system_info(key, value) + assert len(results) == 1 - results = model.Session.query(SystemInfo).filter_by(key=key).all() + obj = results[0] - assert_equals(len(results), 1) + assert obj.key == key + assert obj.value == value - obj = results[0] - assert_equals(obj.key, key) - assert_equals(obj.value, value) +@pytest.mark.usefixtures('clean_db') +def test_sets_new_value_for_same_key(): - def test_sets_new_value_for_same_key(self): + config = factories.SystemInfo() + first_revision = config.revision_id - config = factories.SystemInfo() - first_revision = config.revision_id + set_system_info(config.key, "new_value") - set_system_info(config.key, 'new_value') + new_config = model.Session.query(SystemInfo).filter_by( + key=config.key).first() - new_config = model.Session.query(SystemInfo) \ - .filter_by(key=config.key).first() + assert config.id == new_config.id + assert first_revision != new_config.revision_id - assert_equals(config.id, new_config.id) - assert_not_equals(first_revision, new_config.revision_id) + assert new_config.value == "new_value" - assert_equals(new_config.value, 'new_value') - def test_does_not_set_same_value_for_same_key(self): +@pytest.mark.usefixtures('clean_db') +def test_does_not_set_same_value_for_same_key(): - config = factories.SystemInfo() + config = factories.SystemInfo() - set_system_info(config.key, config.value) + set_system_info(config.key, config.value) - new_config = model.Session.query(SystemInfo) \ - .filter_by(key=config.key).first() + new_config = model.Session.query(SystemInfo).filter_by( + key=config.key).first() - assert_equals(config.id, new_config.id) - assert_equals(config.revision_id, new_config.revision_id) + assert config.id == new_config.id + assert config.revision_id == new_config.revision_id diff --git a/ckan/tests/model/test_tags.py b/ckan/tests/model/test_tags.py index 42ca6bf3107..68d4d7999ad 100644 --- a/ckan/tests/model/test_tags.py +++ b/ckan/tests/model/test_tags.py @@ -1,69 +1,63 @@ # encoding: utf-8 -from nose.tools import assert_equal +import pytest from ckan import model -from ckan.tests import helpers, factories +from ckan.tests import factories -class TestPackage(object): +@pytest.mark.usefixtures("clean_db") +def test_create_package_with_tags(): + model.repo.new_revision() - def setup(self): - helpers.reset_db() + pkg = model.Package(name=u"test-package") - def test_create_package_with_tags(self): - model.repo.new_revision() + # method 1 + tag1 = model.Tag(name=u"science") + package_tag1 = model.PackageTag(package=pkg, tag=tag1) + pkg.package_tag_all[:] = [package_tag1] - pkg = model.Package(name=u'test-package') + # method 2 + tag2 = model.Tag(name=u"geology") + package_tag2 = model.PackageTag(package=pkg, tag=tag2) + pkg.add_tag(tag2) - # method 1 - tag1 = model.Tag(name=u'science') - package_tag1 = model.PackageTag(package=pkg, tag=tag1) - pkg.package_tag_all[:] = [package_tag1] + # method 3 + pkg.add_tag_by_name(u"energy") - # method 2 - tag2 = model.Tag(name=u'geology') - package_tag2 = model.PackageTag(package=pkg, tag=tag2) - pkg.add_tag(tag2) + model.Session.add_all([pkg, package_tag1, package_tag2]) + model.Session.commit() + model.Session.remove() - # method 3 - pkg.add_tag_by_name(u'energy') + pkg = model.Package.by_name(u"test-package") + assert set([tag.name for tag in pkg.get_tags() + ]) == set([u"science", u"geology", u"energy"]) - model.Session.add_all([pkg, package_tag1, package_tag2]) - model.Session.commit() - model.Session.remove() - pkg = model.Package.by_name(u'test-package') - assert_equal( - set([tag.name for tag in pkg.get_tags()]), - set([u'science', u'geology', u'energy']) - ) +@pytest.mark.usefixtures("clean_db") +def test_delete_tag(): + dataset = factories.Dataset(tags=[{ + u"name": u"science" + }, { + u"name": u"geology" + }, { + u"name": u"energy" + }]) + pkg = model.Package.by_name(dataset["name"]) - def test_delete_tag(self): - dataset = factories.Dataset(tags=[{u'name': u'science'}, - {u'name': u'geology'}, - {u'name': u'energy'}]) - pkg = model.Package.by_name(dataset['name']) + model.repo.new_revision() - model.repo.new_revision() + # method 1 - unused by ckan core + tag = model.Tag.by_name(u"science") + pkg.remove_tag(tag) - # method 1 - unused by ckan core - tag = model.Tag.by_name(u'science') - pkg.remove_tag(tag) + # method 2 + package_tag = (model.Session.query(model.PackageTag).join( + model.Tag).filter(model.Tag.name == u"geology").one()) + package_tag.state = u"deleted" - # method 2 - package_tag = \ - model.Session.query(model.PackageTag) \ - .join(model.Tag) \ - .filter(model.Tag.name == u'geology') \ - .one() - package_tag.state = u'deleted' + model.Session.commit() + model.Session.remove() - model.Session.commit() - model.Session.remove() - - pkg = model.Package.by_name(dataset['name']) - assert_equal( - set([tag.name for tag in pkg.get_tags()]), - set(['energy']) - ) + pkg = model.Package.by_name(dataset["name"]) + assert set([tag.name for tag in pkg.get_tags()]) == set(["energy"]) diff --git a/ckan/tests/model/test_user.py b/ckan/tests/model/test_user.py index fecef08804b..c6dbc39f39e 100644 --- a/ckan/tests/model/test_user.py +++ b/ckan/tests/model/test_user.py @@ -3,7 +3,6 @@ import os import hashlib -import nose.tools as nt import pytest from passlib.hash import pbkdf2_sha512 from six import text_type @@ -13,12 +12,12 @@ def _set_password(password): - '''Copy of the old password hashing function + """Copy of the old password hashing function This is needed to create old password hashes in the tests - ''' + """ if isinstance(password, text_type): - password_8bit = password.encode('ascii', 'ignore') + password_8bit = password.encode("ascii", "ignore") else: password_8bit = password @@ -27,95 +26,95 @@ def _set_password(password): hashed_password = salt.hexdigest() + hash.hexdigest() if not isinstance(hashed_password, text_type): - hashed_password = hashed_password.decode('utf-8') + hashed_password = hashed_password.decode("utf-8") return hashed_password -@pytest.mark.usefixtures('clean_db') +@pytest.mark.usefixtures("clean_db") def test_upgrade_from_sha(): user = factories.User() - user_obj = model.User.by_name(user['name']) + user_obj = model.User.by_name(user["name"]) # setup our user with an old password hash - old_hash = _set_password('testpass') + old_hash = _set_password("testpass") user_obj._password = old_hash user_obj.save() - user_obj.validate_password('testpass') - nt.assert_not_equals(old_hash, user_obj.password) - nt.assert_true(pbkdf2_sha512.identify(user_obj.password)) - nt.assert_true(pbkdf2_sha512.verify('testpass', user_obj.password)) + user_obj.validate_password("testpass") + assert old_hash != user_obj.password + assert pbkdf2_sha512.identify(user_obj.password) + assert pbkdf2_sha512.verify("testpass", user_obj.password) -@pytest.mark.usefixtures('clean_db') +@pytest.mark.usefixtures("clean_db") def test_upgrade_from_sha_with_unicode_password(): user = factories.User() - password = u'testpassword\xc2\xa0' - user_obj = model.User.by_name(user['name']) + password = u"testpassword\xc2\xa0" + user_obj = model.User.by_name(user["name"]) # setup our user with an old password hash old_hash = _set_password(password) user_obj._password = old_hash user_obj.save() - nt.assert_true(user_obj.validate_password(password)) - nt.assert_not_equals(old_hash, user_obj.password) - nt.assert_true(pbkdf2_sha512.identify(user_obj.password)) - nt.assert_true(pbkdf2_sha512.verify(password, user_obj.password)) + assert user_obj.validate_password(password) + assert old_hash != user_obj.password + assert pbkdf2_sha512.identify(user_obj.password) + assert pbkdf2_sha512.verify(password, user_obj.password) # check that we now allow unicode characters - nt.assert_false(pbkdf2_sha512.verify('testpassword', user_obj.password)) + assert not pbkdf2_sha512.verify("testpassword", user_obj.password) -@pytest.mark.usefixtures('clean_db') +@pytest.mark.usefixtures("clean_db") def test_upgrade_from_sha_with_wrong_password_fails_to_upgrade(): user = factories.User() - password = u'testpassword' - user_obj = model.User.by_name(user['name']) + password = u"testpassword" + user_obj = model.User.by_name(user["name"]) old_hash = _set_password(password) user_obj._password = old_hash user_obj.save() - nt.assert_false(user_obj.validate_password('wrongpass')) - nt.assert_equals(old_hash, user_obj.password) - nt.assert_false(pbkdf2_sha512.identify(user_obj.password)) + assert not user_obj.validate_password("wrongpass") + assert old_hash == user_obj.password + assert not pbkdf2_sha512.identify(user_obj.password) -@pytest.mark.usefixtures('clean_db') +@pytest.mark.usefixtures("clean_db") def test_upgrade_from_pbkdf2_with_less_rounds(): - '''set up a pbkdf key with less than the default rounds + """set up a pbkdf key with less than the default rounds If the number of default_rounds is increased in a later version of passlib, ckan should upgrade the password hashes for people without - involvement from users''' + involvement from users""" user = factories.User() - password = u'testpassword' - user_obj = model.User.by_name(user['name']) + password = u"testpassword" + user_obj = model.User.by_name(user["name"]) # setup hash with salt/rounds less than the default old_hash = pbkdf2_sha512.encrypt(password, salt_size=2, rounds=10) user_obj._password = old_hash user_obj.save() - nt.assert_true(user_obj.validate_password(password.encode('utf-8'))) + assert user_obj.validate_password(password.encode("utf-8")) # check that the hash has been updated - nt.assert_not_equals(old_hash, user_obj.password) + assert old_hash != user_obj.password new_hash = pbkdf2_sha512.from_string(user_obj.password) - nt.assert_true(pbkdf2_sha512.default_rounds > 10) - nt.assert_equals(pbkdf2_sha512.default_rounds, new_hash.rounds) + assert pbkdf2_sha512.default_rounds > 10 + assert pbkdf2_sha512.default_rounds == new_hash.rounds - nt.assert_true(pbkdf2_sha512.default_salt_size, 2) - nt.assert_equals(pbkdf2_sha512.default_salt_size, len(new_hash.salt)) - nt.assert_true(pbkdf2_sha512.verify(password, user_obj.password)) + assert pbkdf2_sha512.default_salt_size, 2 + assert pbkdf2_sha512.default_salt_size == len(new_hash.salt) + assert pbkdf2_sha512.verify(password, user_obj.password) -@pytest.mark.usefixtures('clean_db') +@pytest.mark.usefixtures("clean_db") def test_upgrade_from_pbkdf2_fails_with_wrong_password(): user = factories.User() - password = u'testpassword' - user_obj = model.User.by_name(user['name']) + password = u"testpassword" + user_obj = model.User.by_name(user["name"]) # setup hash with salt/rounds less than the default @@ -123,28 +122,28 @@ def test_upgrade_from_pbkdf2_fails_with_wrong_password(): user_obj._password = old_hash user_obj.save() - nt.assert_false(user_obj.validate_password('wrong_pass')) + assert not user_obj.validate_password("wrong_pass") # check that the hash has _not_ been updated - nt.assert_equals(old_hash, user_obj.password) + assert old_hash == user_obj.password -@pytest.mark.usefixtures('clean_db') +@pytest.mark.usefixtures("clean_db") def test_pbkdf2_password_auth(): user = factories.User() - password = u'testpassword' - user_obj = model.User.by_name(user['name']) + password = u"testpassword" + user_obj = model.User.by_name(user["name"]) user_obj._set_password(password) user_obj.save() - nt.assert_true(user_obj.validate_password(password)) + assert user_obj.validate_password(password) -@pytest.mark.usefixtures('clean_db') +@pytest.mark.usefixtures("clean_db") def test_pbkdf2_password_auth_unicode(): user = factories.User() - password = u'testpassword\xc2\xa0' - user_obj = model.User.by_name(user['name']) + password = u"testpassword\xc2\xa0" + user_obj = model.User.by_name(user["name"]) user_obj._set_password(password) user_obj.save() - nt.assert_true(user_obj.validate_password(password)) + assert user_obj.validate_password(password)