Skip to content

Commit

Permalink
Disallow object ids being set on create by sysadmins
Browse files Browse the repository at this point in the history
There is no need for this.
  • Loading branch information
mark-saeon committed Apr 9, 2019
1 parent 37e073b commit dac57c4
Show file tree
Hide file tree
Showing 9 changed files with 8 additions and 206 deletions.
18 changes: 8 additions & 10 deletions ckanext/metadata/logic/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
ignore = tk.get_validator('ignore')
ignore_missing = tk.get_validator('ignore_missing')
default = tk.get_validator('default')
package_id_does_not_exist = tk.get_validator('package_id_does_not_exist')
name_validator = tk.get_validator('name_validator')
package_name_validator = tk.get_validator('package_name_validator')
package_version_validator = tk.get_validator('package_version_validator')
email_validator = tk.get_validator('email_validator')
group_name_validator = tk.get_validator('group_name_validator')
empty_if_not_sysadmin = tk.get_validator('empty_if_not_sysadmin')
ignore_not_sysadmin = tk.get_validator('ignore_not_sysadmin')
ignore_not_package_admin = tk.get_validator('ignore_not_package_admin')
ignore_not_group_admin = tk.get_validator('ignore_not_group_admin')
Expand Down Expand Up @@ -78,7 +76,7 @@ def _extras_schema():
def metadata_record_create_schema():
schema = {
# native package fields with special usage
'id': [empty_if_not_sysadmin, ignore_missing, unicode, package_id_does_not_exist],
'id': [ignore],
'owner_org': [v.not_empty, v.object_exists('organization'), owner_org_validator, unicode],
'state': [ignore_not_package_admin, ignore_missing],
'type': [],
Expand Down Expand Up @@ -209,7 +207,7 @@ def metadata_record_workflow_annotation_show_schema(deserialize_json=False):
def metadata_collection_create_schema():
schema = {
# from the default group schema
'id': [empty_if_not_sysadmin, ignore_missing, unicode, v.group_does_not_exist],
'id': [ignore],
'name': [v.not_empty, unicode, name_validator, group_name_validator],
'title': [ignore_missing, unicode],
'description': [ignore_missing, unicode],
Expand Down Expand Up @@ -252,7 +250,7 @@ def metadata_collection_show_schema():
def infrastructure_create_schema():
schema = {
# from the default group schema
'id': [empty_if_not_sysadmin, ignore_missing, unicode, v.group_does_not_exist],
'id': [ignore],
'name': [v.not_empty, unicode, name_validator, group_name_validator],
'title': [ignore_missing, unicode],
'description': [ignore_missing, unicode],
Expand Down Expand Up @@ -288,7 +286,7 @@ def infrastructure_show_schema():

def metadata_standard_create_schema():
schema = {
'id': [empty_if_not_sysadmin, ignore_missing, unicode, v.object_does_not_exist('metadata_standard')],
'id': [ignore],
'name': [ignore_missing, unicode, name_validator, v.object_name_validator('metadata_standard')],
'description': [ignore_missing, unicode],
'standard_name': [v.not_empty, unicode],
Expand Down Expand Up @@ -326,7 +324,7 @@ def metadata_standard_show_schema(deserialize_json=False):

def metadata_json_attr_map_create_schema():
schema = {
'id': [empty_if_not_sysadmin, ignore_missing, unicode, v.object_does_not_exist('metadata_json_attr_map')],
'id': [ignore],
'json_path': [v.not_empty, unicode, v.json_pointer_validator],
'record_attr': [v.not_empty, unicode, v.schema_key_validator(metadata_record_attr_mappable_schema(), True)],
'is_key': [v.not_missing, boolean_validator],
Expand Down Expand Up @@ -367,7 +365,7 @@ def metadata_json_attr_map_apply_schema():

def metadata_schema_create_schema():
schema = {
'id': [empty_if_not_sysadmin, ignore_missing, unicode, v.object_does_not_exist('metadata_schema')],
'id': [ignore],
'name': [ignore_missing, unicode, name_validator, v.object_name_validator('metadata_schema')],
'description': [ignore_missing, unicode],
'metadata_standard_id': [v.not_empty, unicode, v.object_exists('metadata_standard')],
Expand Down Expand Up @@ -407,7 +405,7 @@ def metadata_schema_show_schema(deserialize_json=False):

def workflow_state_create_schema():
schema = {
'id': [empty_if_not_sysadmin, ignore_missing, unicode, v.object_does_not_exist('workflow_state')],
'id': [ignore],
'name': [v.not_empty, unicode, name_validator, v.object_name_validator('workflow_state')],
'title': [ignore_missing, unicode],
'description': [ignore_missing, unicode],
Expand Down Expand Up @@ -440,7 +438,7 @@ def workflow_state_show_schema(deserialize_json=False):

def workflow_transition_create_schema():
schema = {
'id': [empty_if_not_sysadmin, ignore_missing, unicode, v.object_does_not_exist('workflow_transition')],
'id': [ignore],
'from_state_id': [v.not_missing, unicode, v.object_exists('workflow_state')],
'to_state_id': [v.not_empty, unicode, v.object_exists('workflow_state')],
'state': [ignore_not_sysadmin, ignore_missing],
Expand Down
21 changes: 0 additions & 21 deletions ckanext/metadata/tests/test_infrastructure_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,12 @@ def test_create_valid(self):
assert obj.is_organization == False
assert_object_matches_dict(obj, input_dict)

def test_create_valid_sysadmin_setid(self):
input_dict = {
'id': make_uuid(),
'name': 'test-infrastructure',
}
result, obj = self.test_action('infrastructure_create', sysadmin=True, check_auth=True, **input_dict)
assert obj.type == 'infrastructure'
assert obj.is_organization == False
assert_object_matches_dict(obj, input_dict)

def test_create_invalid_duplicate_name(self):
infrastructure = ckanext_factories.Infrastructure()
result, obj = self.test_action('infrastructure_create', should_error=True,
name=infrastructure['name'])
assert_error(result, 'name', 'Group name already exists in database')

def test_create_invalid_nonsysadmin_setid(self):
result, obj = self.test_action('infrastructure_create', should_error=True, check_auth=True,
id=make_uuid())
assert_error(result, 'id', 'The input field id was not expected.')

def test_create_invalid_sysadmin_duplicate_id(self):
infrastructure = ckanext_factories.Infrastructure()
result, obj = self.test_action('infrastructure_create', should_error=True, sysadmin=True, check_auth=True,
id=infrastructure['id'])
assert_error(result, 'id', 'Already exists: Group')

def test_update_valid(self):
infrastructure = ckanext_factories.Infrastructure()
input_dict = {
Expand Down
26 changes: 0 additions & 26 deletions ckanext/metadata/tests/test_metadata_collection_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,12 @@ def test_create_valid_organization_byname(self):
assert_group_has_extra(obj.id, 'organization_id', organization['id'])
assert_group_has_member(organization['id'], obj.id, 'group', capacity='parent')

def test_create_valid_sysadmin_setid(self):
organization = ckan_factories.Organization(user=self.normal_user)
input_dict = {
'id': make_uuid(),
'name': 'test-metadata-collection',
'organization_id': organization['id'],
}
result, obj = self.test_action('metadata_collection_create', sysadmin=True, check_auth=True, **input_dict)
assert obj.type == 'metadata_collection'
assert obj.is_organization == False
assert_group_has_extra(obj.id, 'organization_id', input_dict['organization_id'])
del input_dict['organization_id']
assert_object_matches_dict(obj, input_dict)
assert_group_has_member(organization['id'], obj.id, 'group', capacity='parent')

def test_create_invalid_duplicate_name(self):
metadata_collection = ckanext_factories.MetadataCollection()
result, obj = self.test_action('metadata_collection_create', should_error=True,
name=metadata_collection['name'])
assert_error(result, 'name', 'Group name already exists in database')

def test_create_invalid_nonsysadmin_setid(self):
result, obj = self.test_action('metadata_collection_create', should_error=True, check_auth=True,
id=make_uuid())
assert_error(result, 'id', 'The input field id was not expected.')

def test_create_invalid_sysadmin_duplicate_id(self):
metadata_collection = ckanext_factories.MetadataCollection()
result, obj = self.test_action('metadata_collection_create', should_error=True, sysadmin=True, check_auth=True,
id=metadata_collection['id'])
assert_error(result, 'id', 'Already exists: Group')

def test_create_invalid_bad_organization(self):
result, obj = self.test_action('metadata_collection_create', should_error=True,
organization_id='foo')
Expand Down
24 changes: 0 additions & 24 deletions ckanext/metadata/tests/test_metadata_json_attr_map_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,6 @@ def test_create_valid_metadata_standard_byname(self):
input_dict['metadata_standard_id'] = metadata_standard['id']
assert_object_matches_dict(obj, input_dict)

def test_create_valid_sysadmin_setid(self):
metadata_standard = ckanext_factories.MetadataStandard(
metadata_template_json=load_example('datacite_4.2_saeon_record.json'))
input_dict = {
'id': make_uuid(),
'json_path': '/identifier/identifier',
'record_attr': 'name',
'is_key': True,
'metadata_standard_id': metadata_standard['id'],
}
result, obj = self.test_action('metadata_json_attr_map_create', sysadmin=True, check_auth=True, **input_dict)
assert_object_matches_dict(obj, input_dict)

def test_create_invalid_nonsysadmin_setid(self):
result, obj = self.test_action('metadata_json_attr_map_create', should_error=True, check_auth=True,
id=make_uuid())
assert_error(result, 'id', 'The input field id was not expected.')

def test_create_invalid_sysadmin_duplicate_id(self):
metadata_json_attr_map = ckanext_factories.MetadataJSONAttrMap()
result, obj = self.test_action('metadata_json_attr_map_create', should_error=True, sysadmin=True, check_auth=True,
id=metadata_json_attr_map['id'])
assert_error(result, 'id', 'Already exists: Metadata JSON Attribute Map')

def test_create_invalid_bad_metadata_standard(self):
result, obj = self.test_action('metadata_json_attr_map_create', should_error=True,
metadata_standard_id='foo')
Expand Down
17 changes: 0 additions & 17 deletions ckanext/metadata/tests/test_metadata_record_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,6 @@ def test_create_valid_with_infrastructures(self):
assert_group_has_member(infrastructure1['id'], obj.id, 'package')
assert_group_has_member(infrastructure2['id'], obj.id, 'package')

def test_create_valid_sysadmin_setid(self):
input_dict = self._make_input_dict()
input_dict['id'] = make_uuid()
result, obj = self.test_action('metadata_record_create', sysadmin=True, check_auth=True, **input_dict)
self._assert_metadata_record_ok(obj, input_dict)

def test_create_valid_map_attributes(self):
"""
Test copying of metadata element values into package attributes via metadata JSON
Expand Down Expand Up @@ -301,17 +295,6 @@ def test_create_invalid_map_attributes_partial_keys_2(self):
result, obj = self.test_action('metadata_record_create', should_error=True, **input_dict)
assert_error(result, 'message', 'Cannot unambiguously match an existing record for the given key attribute values')

def test_create_invalid_nonsysadmin_setid(self):
result, obj = self.test_action('metadata_record_create', should_error=True, check_auth=True,
id=make_uuid())
assert_error(result, 'id', 'The input field id was not expected.')

def test_create_invalid_sysadmin_duplicate_id(self):
metadata_record = ckanext_factories.MetadataRecord()
result, obj = self.test_action('metadata_record_create', should_error=True, sysadmin=True, check_auth=True,
id=metadata_record['id'])
assert_error(result, 'id', 'Dataset id already exists')

def test_create_invalid_missing_params(self):
result, obj = self.test_action('metadata_record_create', should_error=True)
assert_error(result, 'owner_org', 'Missing parameter')
Expand Down
23 changes: 0 additions & 23 deletions ckanext/metadata/tests/test_metadata_schema_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,6 @@ def test_create_valid_with_infrastructure_byname(self):
assert obj.infrastructure_id == infrastructure['id']
assert obj.name == generate_name(metadata_standard['name'], '', infrastructure['name'])

def test_create_valid_sysadmin_setid(self):
metadata_standard = ckanext_factories.MetadataStandard()
input_dict = {
'id': make_uuid(),
'metadata_standard_id': metadata_standard['id'],
'organization_id': '',
'infrastructure_id': '',
'schema_json': '{}',
}
result, obj = self.test_action('metadata_schema_create', sysadmin=True, check_auth=True, **input_dict)
assert_object_matches_dict(obj, input_dict)

def test_create_valid_same_standard_different_organization(self):
organization1 = ckan_factories.Organization()
organization2 = ckan_factories.Organization()
Expand Down Expand Up @@ -331,17 +319,6 @@ def test_create_invalid_with_organization_and_infrastructure(self):
assert_error(result, '__after',
'A metadata schema may be associated with either an organization or an infrastructure but not both.')

def test_create_invalid_nonsysadmin_setid(self):
result, obj = self.test_action('metadata_schema_create', should_error=True, check_auth=True,
id=make_uuid())
assert_error(result, 'id', 'The input field id was not expected.')

def test_create_invalid_sysadmin_duplicate_id(self):
metadata_schema = ckanext_factories.MetadataSchema()
result, obj = self.test_action('metadata_schema_create', should_error=True, sysadmin=True, check_auth=True,
id=metadata_schema['id'])
assert_error(result, 'id', 'Already exists: Metadata Schema')

def test_create_invalid_not_json(self):
result, obj = self.test_action('metadata_schema_create', should_error=True,
schema_json='not json')
Expand Down
32 changes: 0 additions & 32 deletions ckanext/metadata/tests/test_metadata_standard_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,6 @@ def test_create_valid_with_parent_byname(self):
input_dict['parent_standard_id'] = metadata_standard['id']
assert_object_matches_dict(obj, input_dict)

def test_create_valid_sysadmin_setid(self):
input_dict = {
'id': make_uuid(),
'standard_name': 'DataCite',
'standard_version': '1.0',
'parent_standard_id': '',
'metadata_template_json': '{}',
}
result, obj = self.test_action('metadata_standard_create', sysadmin=True, check_auth=True, **input_dict)
assert_object_matches_dict(obj, input_dict)

def test_create_valid_same_name_new_version(self):
metadata_standard = ckanext_factories.MetadataStandard()
input_dict = {
Expand Down Expand Up @@ -122,27 +111,6 @@ def test_create_invalid_duplicate(self):
result, obj = self.test_action('metadata_standard_create', should_error=True, **input_dict)
assert_error(result, '__after', 'Unique constraint violation')

def test_create_invalid_nonsysadmin_setid(self):
result, obj = self.test_action('metadata_standard_create', should_error=True, check_auth=True,
id=make_uuid())
assert_error(result, 'id', 'The input field id was not expected.')

def test_create_invalid_sysadmin_duplicate_id(self):
metadata_standard = ckanext_factories.MetadataStandard()
result, obj = self.test_action('metadata_standard_create', should_error=True, sysadmin=True, check_auth=True,
id=metadata_standard['id'])
assert_error(result, 'id', 'Already exists: Metadata Standard')

def test_create_invalid_sysadmin_self_parent(self):
new_id = make_uuid()
input_dict = {
'id': new_id,
'parent_standard_id': new_id,
}
result, obj = self.test_action('metadata_standard_create', should_error=True,
sysadmin=True, check_auth=True, **input_dict)
assert_error(result, 'parent_standard_id', 'Not found: Metadata Standard')

def test_create_invalid_bad_parent(self):
result, obj = self.test_action('metadata_standard_create', should_error=True,
parent_standard_id='foo')
Expand Down
32 changes: 0 additions & 32 deletions ckanext/metadata/tests/test_workflow_state_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,6 @@ def test_create_valid_with_revert_byname(self):
input_dict['revert_state_id'] = workflow_state['id']
assert_object_matches_dict(obj, input_dict)

def test_create_valid_sysadmin_setid(self):
input_dict = {
'id': make_uuid(),
'name': 'test-workflow-state',
'revert_state_id': '',
'metadata_records_private': True,
'workflow_rules_json': '{ "testkey": "testvalue" }',
}
result, obj = self.test_action('workflow_state_create', sysadmin=True, check_auth=True, **input_dict)
assert_object_matches_dict(obj, input_dict)

def test_create_invalid_duplicate_name(self):
workflow_state = ckanext_factories.WorkflowState()
result, obj = self.test_action('workflow_state_create', should_error=True,
Expand All @@ -82,27 +71,6 @@ def test_create_invalid_missing_values(self):
assert_error(result, 'name', 'Missing value')
assert_error(result, 'workflow_rules_json', 'Missing value')

def test_create_invalid_nonsysadmin_setid(self):
result, obj = self.test_action('workflow_state_create', should_error=True, check_auth=True,
id=make_uuid())
assert_error(result, 'id', 'The input field id was not expected.')

def test_create_invalid_sysadmin_duplicate_id(self):
workflow_state = ckanext_factories.WorkflowState()
result, obj = self.test_action('workflow_state_create', should_error=True, sysadmin=True, check_auth=True,
id=workflow_state['id'])
assert_error(result, 'id', 'Already exists: Workflow State')

def test_create_invalid_sysadmin_self_revert(self):
new_id = make_uuid()
input_dict = {
'id': new_id,
'revert_state_id': new_id,
}
result, obj = self.test_action('workflow_state_create', should_error=True,
sysadmin=True, check_auth=True, **input_dict)
assert_error(result, 'revert_state_id', 'Not found: Workflow State')

def test_create_invalid_bad_revert(self):
result, obj = self.test_action('workflow_state_create', should_error=True,
revert_state_id='foo')
Expand Down
Loading

0 comments on commit dac57c4

Please sign in to comment.