diff --git a/CHANGELOG b/CHANGELOG index c9eda95..6cfaf51 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,7 @@ 0.15.0 + - BREAKING: drop support for DCOR API version "1" - ref: migrate to dcor_shared 0.8.0 + - ref: upload condensed datasets directly to S3 (no copy on block storage) 0.14.1 - maintenance release 0.14.0 diff --git a/ckanext/dc_serve/jobs.py b/ckanext/dc_serve/jobs.py index e165a77..72492c8 100644 --- a/ckanext/dc_serve/jobs.py +++ b/ckanext/dc_serve/jobs.py @@ -1,8 +1,12 @@ +import pathlib +import tempfile +import warnings + import ckan.plugins.toolkit as toolkit from dclab import RTDCWriter -from dclab.cli import condense +from dclab.cli import condense_dataset from dcor_shared import ( - DC_MIME_TYPES, s3, sha256sum, get_ckan_config_option, get_resource_path, + DC_MIME_TYPES, get_dc_instance, s3cc, get_ckan_config_option, wait_for_resource) import h5py @@ -15,13 +19,19 @@ def admin_context(): def generate_condensed_resource_job(resource, override=False): """Generates a condensed version of the dataset""" - path = get_resource_path(resource["id"]) - if resource["mimetype"] in DC_MIME_TYPES: - wait_for_resource(resource["id"]) - cond = path.with_name(path.name + "_condensed.rtdc") - if not cond.exists() or override: + rid = resource["id"] + wait_for_resource(rid) + mtype = resource.get('mimetype', '') + if (mtype in DC_MIME_TYPES + # Check whether the file already exists on S3 + and (override + or not s3cc.artifact_exists(resource_id=rid, + artifact="condensed"))): + # Create the condensed file in a temporary location + with tempfile.TemporaryDirectory() as ttd_name: + path_cond = pathlib.Path(ttd_name) / "condensed.rtdc" with CKANResourceFileLock( - resource_id=resource["id"], + resource_id=rid, locker_id="DCOR_generate_condensed") as fl: # The CKANResourceFileLock creates a lock file if not present # and then sets `is_locked` to True if the lock was acquired. @@ -33,20 +43,26 @@ def generate_condensed_resource_job(resource, override=False): # then several processes would end up condensing the same # resource. if fl.is_locked: - # Condense the dataset - condense(path_out=cond, - path_in=path, - ancillaries=True, - check_suffix=False) - # Determine the features that are not in the condensed - # dataset. - with h5py.File(path) as hsrc, h5py.File(cond) as hdst: - feats_src = set(hsrc["events"].keys()) - feats_dst = set(hdst["events"].keys()) - feats_upstream = sorted(feats_src - feats_dst) + with get_dc_instance(rid) as ds, \ h5py.File(path_cond, "w") as h5_cond: + # Condense the dataset (warning messages emitted + # during instantiation are not stored, because + # they could leak credentials). + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + condense_dataset(ds=ds, + h5_cond=h5_cond, + ancillaries=True, + warnings_list=w) + + # Determine the features that are not in the condensed + # dataset.
+ feats_src = set(ds.h5file["events"].keys()) + feats_dst = set(h5_cond["events"].keys()) + feats_upstream = sorted(feats_src - feats_dst) # Write DCOR basins - with RTDCWriter(cond) as hw: + with RTDCWriter(path_cond) as hw: # DCOR site_url = get_ckan_config_option("ckan.site_url") rid = resource["id"] @@ -87,28 +103,11 @@ def generate_condensed_resource_job(resource, override=False): basin_descr="Public resource access via HTTP", basin_feats=feats_upstream, verify=False) + + # Upload the condensed file to S3 + s3cc.upload_artifact(resource_id=rid, + path_artifact=path_cond, + artifact="condensed", + override=True) return True return False - - -def migrate_condensed_to_s3_job(resource): - """Migrate a condensed resource to the S3 object store""" - path = get_resource_path(resource["id"]) - path_cond = path.with_name(path.name + "_condensed.rtdc") - ds_dict = toolkit.get_action('package_show')( - admin_context(), - {'id': resource["package_id"]}) - # Perform the upload - bucket_name = get_ckan_config_option( - "dcor_object_store.bucket_name").format( - organization_id=ds_dict["organization"]["id"]) - rid = resource["id"] - sha256 = sha256sum(path_cond) - s3.upload_file( - bucket_name=bucket_name, - object_name=f"condensed/{rid[:3]}/{rid[3:6]}/{rid[6:]}", - path=path_cond, - sha256=sha256, - private=ds_dict["private"], - override=False) - # TODO: delete the local resource after successful upload? diff --git a/ckanext/dc_serve/plugin.py b/ckanext/dc_serve/plugin.py index 838958b..8f8153a 100644 --- a/ckanext/dc_serve/plugin.py +++ b/ckanext/dc_serve/plugin.py @@ -7,11 +7,10 @@ from .cli import get_commands from . import helpers as dcor_helpers -from .jobs import generate_condensed_resource_job, migrate_condensed_to_s3_job +from .jobs import generate_condensed_resource_job from .route_funcs import dccondense, dcresource from .serve import dcserv - from dcor_shared import DC_MIME_TYPES, s3 @@ -56,7 +55,7 @@ def update_config(self, config): # IResourceController def after_resource_create(self, context, resource): """Generate condensed dataset""" - if resource.get('mimetype') in DC_MIME_TYPES: + if resource.get('mimetype') in DC_MIME_TYPES and s3.is_available(): # Generate the condensed dataset pkg_job_id = f"{resource['package_id']}_{resource['position']}_" jid_condense = pkg_job_id + "condense" @@ -64,29 +63,13 @@ def after_resource_create(self, context, resource): if not Job.exists(jid_condense, connection=ckan_redis_connect()): toolkit.enqueue_job(generate_condensed_resource_job, [resource], - title="Create condensed dataset", + title="Create condensed dataset and " + "upload it to S3", queue="dcor-long", rq_kwargs={"timeout": 3600, "job_id": jid_condense, "depends_on": [jid_symlink]}) - # Upload the condensed dataset to S3 - if s3.is_available(): - jid_condensed_s3 = pkg_job_id + "condenseds3" - toolkit.enqueue_job( - migrate_condensed_to_s3_job, - [resource], - title="Migrate condensed resource to S3 object store", - queue="dcor-normal", - rq_kwargs={"timeout": 1000, - "job_id": jid_condensed_s3, - "depends_on": [ - # symlink is general requirement - jid_symlink, - jid_condense, - ]} - ) - # IActions def get_actions(self): # Registers the custom API method @@ -100,5 +83,5 @@ def get_helpers(self): hlps = { 'dc_serve_resource_has_condensed': dcor_helpers.resource_has_condensed, - } + } return hlps diff --git a/ckanext/dc_serve/res_file_lock.py b/ckanext/dc_serve/res_file_lock.py index a5c3fb3..df182f7 100644 --- a/ckanext/dc_serve/res_file_lock.py +++ b/ckanext/dc_serve/res_file_lock.py @@ 
-13,6 +13,7 @@ class CKANResourceFileLock: the `is_locked` property to make sure that you have actually acquired a lock. """ + def __init__(self, resource_id, locker_id="CKAN_resource_lock"): """ diff --git a/ckanext/dc_serve/serve.py b/ckanext/dc_serve/serve.py index 40feb91..1188412 100644 --- a/ckanext/dc_serve/serve.py +++ b/ckanext/dc_serve/serve.py @@ -1,89 +1,37 @@ import atexit import functools -import warnings import ckan.logic as logic import ckan.model as model import ckan.plugins.toolkit as toolkit -import dclab -from dclab.rtdc_dataset import linker as dclab_linker -from dcor_shared import ( - DC_MIME_TYPES, get_resource_dc_config, get_resource_path, s3cc -) -import numpy as np +from dcor_shared import DC_MIME_TYPES, get_resource_dc_config, s3cc def admin_context(): return {'ignore_auth': True, 'user': 'default'} -def get_rtdc_instance(res_id, force_s3=False): - """Return instance of RTDCBase - - See Also - -------- - get_rtdc_instance_local: get file linked to data on block storage - get_rtdc_instance_local: get file with basins from S3 - """ - # First check whether we have a local file. Local files should be - # faster to access, so we favor this scenario. - path = get_resource_path(res_id) - if path.exists() and not force_s3: - return get_rtdc_instance_local(res_id) - else: - return s3cc.get_s3_dc_handle_basin_based(res_id) - - -@functools.lru_cache(maxsize=100) -def get_rtdc_instance_local(res_id): - """Return an instance of RTDCBase for the given resource identifier - - The `rid` identifier is used to resolve the uploaded .rtdc file. - Using :func:`combined_h5`, the condensed .rtdc file is merged with - this .rtdc file into a new in-memory file which is opened with dclab. - - This method is cached using an `lru_cache`, so consecutive calls - with the same identifier should be fast. - - This whole process takes approximately 20ms: - - Per Hit % Time Line Contents - 1.8 0.0 path_list = ["calibration_beads_condensed.rtdc", path_name] - 11915.4 57.4 h5io = combined_h5(path_list) - 8851.6 42.6 return dclab.rtdc_dataset.fmt_hdf5.RTDC_HDF5(h5io) - """ - path = get_resource_path(res_id) - paths = [path] - - path_condensed = path.with_name(path.name + "_condensed.rtdc") - if path_condensed.exists(): - paths.append(path_condensed) - - h5io = dclab_linker.combine_h5files(paths, external="raise") - return dclab.rtdc_dataset.fmt_hdf5.RTDC_HDF5(h5io) - - -def get_rtdc_logs(ds, from_basins=False): +def get_dc_logs(ds, from_basins=False): """Return logs of a dataset, optionally looking only in its basins""" logs = {} if from_basins: for bn in ds.basins: if bn.is_available(): - logs.update(get_rtdc_logs(bn.ds)) + logs.update(get_dc_logs(bn.ds)) else: # all the features are logs.update(dict(ds.logs)) return logs -def get_rtdc_tables(ds, from_basins=False): +def get_dc_tables(ds, from_basins=False): """Return tables of a dataset, optionally looking only in its basins""" tables = {} if from_basins: for bn in ds.basins: if bn.is_available(): - tables.update(get_rtdc_tables(bn.ds)) + tables.update(get_dc_tables(bn.ds)) else: for tab in ds.tables: tables[tab] = (ds.tables[tab].dtype.names, @@ -98,14 +46,7 @@ def dcserv(context, data_dict=None): Required keys in `data_dict` are 'id' (resource id) and 'query'. Query may be one of the following: - - 'feature', in which case the 'feature' parameter must be set - to a valid feature name (e.g. `query=feature&feature=deform`). - Returns feature data. If the feature is not a scalar feature, - then 'event' (index) must also be given - (e.g. 
`query=feature&feature=image&event=42`). In case of - 'feature=trace', then in addition to the 'event' key, the - 'trace' key (e.g. 'trace=fl1_raw') must also be set. - - 'feature_list': a list of available features + - 'logs': dictionary of logs - 'metadata': the metadata configuration dictionary - 'size': the number of events in the dataset @@ -114,27 +55,32 @@ - 'basins': list of basin dictionaries (upstream and http data) - 'trace_list': list of available traces - 'valid': whether the corresponding .rtdc file is accessible. - - 'version': which version of the API to use (defaults to 1); - If you specify '2' and the resource and condensed resources are - on S3, then no feature data is served via the API, only basins - (on S3) are specified via the "http" remote. + - 'version': which version of the API to use (defaults to 2) + + .. versionchanged:: 0.15.0 + + Drop support for DCOR API version 1 The "result" value will either be a dictionary resembling RTDCBase.config (e.g. query=metadata), - a list of available features (query=feature_list), + a list of available traces (query=trace_list), or the requested data converted to a list (use numpy.asarray to convert back to a numpy array). """ if data_dict is None: data_dict = {} - data_dict.setdefault("version", "1") + data_dict.setdefault("version", "2") # Check required parameters if "query" not in data_dict: raise logic.ValidationError("Please specify 'query' parameter!") if "id" not in data_dict: raise logic.ValidationError("Please specify 'id' parameter!") - if data_dict["version"] not in ["1", "2"]: + if data_dict["version"] == "1": + raise logic.ValidationError("Version '1' of the DCOR API is not " + "supported anymore. Please use version " + "'2' instead!") + if data_dict["version"] not in ["2"]: - raise logic.ValidationError("Please specify version '1' or '2'!") + raise logic.ValidationError("Please specify version '2'!") # Perform all authorization checks for the resource @@ -143,56 +89,40 @@ data_dict={"id": data_dict["id"]}) query = data_dict["query"] - res_id = data_dict["id"] + rid = data_dict["id"] # Check whether we actually have an .rtdc dataset - if not is_rtdc_resource(res_id): + if not is_dc_resource(rid): raise logic.ValidationError( - f"Resource ID {res_id} must be an .rtdc dataset!") - - version_is_2 = data_dict["version"] == "2" + f"Resource ID {rid} must be an .rtdc dataset!") if query == "valid": - path = get_resource_path(res_id) - data = path.exists() + data = s3cc.artifact_exists(rid, artifact="resource") elif query == "metadata": - return get_resource_dc_config(res_id) + return get_resource_dc_config(rid) else: - ds = get_rtdc_instance(res_id, force_s3=version_is_2) - if query == "feature": - if version_is_2: - # We are forcing the usage of basins in the client. - raise logic.ValidationError( - "Features unavailable, use basins!") - else: - data = get_feature_data(data_dict, ds) - elif query == "feature_list": - if version_is_2: - data = [] - else: - data = ds.features_loaded + if query == "feature_list": + data = [] elif query == "logs": - data = get_rtdc_logs(ds, from_basins=version_is_2) + with s3cc.get_s3_dc_handle_basin_based(rid) as ds: + data = get_dc_logs(ds, from_basins=True) elif query == "size": - data = len(ds) + with s3cc.get_s3_dc_handle(rid) as ds: + data = len(ds) elif query == "basins": # Return all basins from the condensed file # (the S3 basins are already in there). 
- data = ds.basins_get_dicts() + with s3cc.get_s3_dc_handle_basin_based(rid) as ds: + data = ds.basins_get_dicts() elif query == "tables": - data = get_rtdc_tables(ds, from_basins=version_is_2) - elif query == "trace": - warnings.warn("A dc_serve client is using the 'trace' query!", - DeprecationWarning) - # backwards-compatibility - data_dict["query"] = "feature" - data_dict["feature"] = "trace" - data = get_feature_data(data_dict, ds) + with s3cc.get_s3_dc_handle_basin_based(rid) as ds: + data = get_dc_tables(ds, from_basins=True) elif query == "trace_list": - if "trace" in ds: - data = sorted(ds["trace"].keys()) - else: - data = [] + with s3cc.get_s3_dc_handle(rid) as ds: + if "trace" in ds: + data = sorted(ds["trace"].keys()) + else: + data = [] else: raise logic.ValidationError( f"Invalid query parameter '{query}'!") @@ -200,49 +130,9 @@ def dcserv(context, data_dict=None): @functools.lru_cache(maxsize=1024) -def is_rtdc_resource(res_id): +def is_dc_resource(res_id): resource = model.Resource.get(res_id) return resource.mimetype in DC_MIME_TYPES -def get_feature_data(data_dict, ds): - query = data_dict["query"] - # sanity checks - if query == "feature" and "feature" not in data_dict: - raise logic.ValidationError("Please specify 'feature' parameter!") - - feat = data_dict["feature"] - is_scalar = dclab.dfn.scalar_feature_exists(feat) - - if feat in ds.features_loaded: - if is_scalar: - data = np.array(ds[feat]).tolist() - else: - if "event" not in data_dict: - raise logic.ValidationError("Please specify 'event' for " - + f"non-scalar feature {feat}!" - ) - if feat == "trace": - data = get_trace_data(data_dict, ds) - else: - event = int(data_dict["event"]) - data = ds[feat][event].tolist() - elif not dclab.dfn.feature_exists(feat): - raise logic.ValidationError(f"Unknown feature name '{feat}'!") - else: - raise logic.ValidationError(f"Feature '{feat}' unavailable!") - return data - - -def get_trace_data(data_dict, ds): - if "trace" not in data_dict: - raise logic.ValidationError("Please specify 'trace' parameter!") - event = int(data_dict["event"]) - trace = data_dict["trace"] - - data = ds["trace"][trace][event].tolist() - return data - - -atexit.register(get_rtdc_instance_local.cache_clear) -atexit.register(is_rtdc_resource.cache_clear) +atexit.register(is_dc_resource.cache_clear) diff --git a/ckanext/dc_serve/tests/test_jobs.py b/ckanext/dc_serve/tests/test_jobs.py index 50c6336..1a988f1 100644 --- a/ckanext/dc_serve/tests/test_jobs.py +++ b/ckanext/dc_serve/tests/test_jobs.py @@ -1,4 +1,4 @@ -""" Testing background jobs +"""Testing background jobs Due to the asynchronous nature of background jobs, code that uses them needs to be handled specially when writing tests. 
@@ -13,7 +13,6 @@ import pytest import ckan.lib -import ckan.tests.factories as factories import dclab import numpy as np import requests @@ -34,50 +33,7 @@ @pytest.mark.usefixtures('clean_db', 'with_request_context') @mock.patch('ckan.plugins.toolkit.enqueue_job', side_effect=synchronous_enqueue_job) -def test_create_condensed_dataset_job(enqueue_job_mock, create_with_upload, - monkeypatch, ckan_config, tmpdir): - monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) - monkeypatch.setattr(ckan.lib.uploader, - 'get_storage_path', - lambda: str(tmpdir)) - monkeypatch.setattr( - ckanext.dcor_schemas.plugin, - 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', - True) - - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! - # create 1st dataset - create_context = {'ignore_auth': False, - 'user': user['name'], - 'api_version': 3} - dataset = make_dataset(create_context, owner_org, activate=False) - content = (data_path / "calibration_beads_47.rtdc").read_bytes() - result = create_with_upload( - content, 'test.rtdc', - url="upload", - package_id=dataset["id"], - context=create_context, - ) - path = dcor_shared.get_resource_path(result["id"]) - cond = path.with_name(path.name + "_condensed.rtdc") - # existence of original uploaded file - assert path.exists() - # existence of condensed file - assert cond.exists() - - -# We need the dcor_depot extension to make sure that the symbolic- -# linking pipeline is used. -@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dc_serve dcor_schemas') -@pytest.mark.usefixtures('clean_db', 'with_request_context') -@mock.patch('ckan.plugins.toolkit.enqueue_job', - side_effect=synchronous_enqueue_job) -def test_upload_condensed_dataset_to_s3_job( +def test_create_condensed_dataset_job_upload_s3( enqueue_job_mock, create_with_upload, monkeypatch, ckan_config, tmpdir): monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) @@ -89,18 +45,7 @@ def test_upload_condensed_dataset_to_s3_job( 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', True) - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! 
- # create 1st dataset - create_context = {'ignore_auth': False, - 'user': user['name'], - 'api_version': 3} ds_dict, res_dict = make_dataset( - create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) @@ -115,13 +60,19 @@ def test_upload_condensed_dataset_to_s3_job( response = requests.get(cond_url) assert response.ok, "resource is public" assert response.status_code == 200 - # Verify SHA256sum - path = dcor_shared.get_resource_path(res_dict["id"]) - path_cond = path.with_name(path.name + "_condensed.rtdc") - dl_path = tmpdir / "calbeads.rtdc" + + # verify file validity + dl_path = pathlib.Path(tmpdir) / "calbeads.rtdc" with dl_path.open("wb") as fd: fd.write(response.content) - assert dcor_shared.sha256sum(dl_path) == dcor_shared.sha256sum(path_cond) + with dclab.new_dataset(dl_path) as ds: + assert "volume" in ds + assert np.allclose(ds["deform"][0], 0.011666297) + + # the local file path should not exist anymore since version 0.15.0 + path = dcor_shared.get_resource_path(res_dict["id"]) + path_cond = path.with_name(path.name + "_condensed.rtdc") + assert not path_cond.exists() # We need the dcor_depot extension to make sure that the symbolic- @@ -142,18 +93,7 @@ def test_upload_condensed_dataset_to_s3_job_and_verify_basin( 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', True) - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! - # create 1st dataset - create_context = {'ignore_auth': False, - 'user': user['name'], - 'api_version': 3} ds_dict, res_dict = make_dataset( - create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) @@ -170,7 +110,7 @@ def test_upload_condensed_dataset_to_s3_job_and_verify_basin( assert response.status_code == 200 # Download the condensed resource - dl_path = tmpdir / "calbeads.rtdc" + dl_path = pathlib.Path(tmpdir) / "calbeads.rtdc" with dl_path.open("wb") as fd: fd.write(response.content) diff --git a/ckanext/dc_serve/tests/test_route.py b/ckanext/dc_serve/tests/test_route.py index 8e02e31..67c2467 100644 --- a/ckanext/dc_serve/tests/test_route.py +++ b/ckanext/dc_serve/tests/test_route.py @@ -4,7 +4,6 @@ import ckan.common import ckan.model import ckan.tests.factories as factories -import ckan.tests.helpers as helpers import ckanext.dcor_schemas.plugin import dcor_shared @@ -32,7 +31,7 @@ def test_route_redircet_condensed_to_s3_private( 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', True) - user = factories.User() + user = factories.UserWithToken() user_obj = ckan.model.User.by_name(user["name"]) monkeypatch.setattr(ckan.common, 'current_user', @@ -52,37 +51,28 @@ def test_route_redircet_condensed_to_s3_private( resource_path=data_path / "calibration_beads_47.rtdc", activate=True, private=True - ) + ) rid = res_dict["id"] assert "s3_available" in res_dict assert "s3_url" in res_dict - # Remove the local resource to make sure CKAN serves the S3 URL + # Since version 0.15.0 we don't store the condensed resource locally path = dcor_shared.get_resource_path(rid) path_cond = path.with_name(path.name + "_condensed.rtdc") - assert path_cond.exists() - path_cond.unlink() + assert not path_cond.exists(), "sanity check" did = ds_dict["id"] # We should not be authorized to access the resource without API token resp0 = app.get( f"/dataset/{did}/resource/{rid}/condensed.rtdc", 
status=404 - ) - assert len(resp0.history) == 0 - - # Try again with token - data = helpers.call_action( - u"api_token_create", - context={u"model": ckan.model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", ) + assert len(resp0.history) == 0 resp = app.get( f"/dataset/{did}/resource/{rid}/condensed.rtdc", - headers={u"authorization": data["token"]}, - ) + headers={u"authorization": user["token"]}, + ) endpoint = dcor_shared.get_ckan_config_option( "dcor_object_store.endpoint_url") @@ -114,7 +104,7 @@ def test_route_condensed_to_s3_public( 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', True) - user = factories.User() + user = factories.UserWithToken() user_obj = ckan.model.User.by_name(user["name"]) monkeypatch.setattr(ckan.common, 'current_user', @@ -137,16 +127,15 @@ def test_route_condensed_to_s3_public( assert "s3_available" in res_dict assert "s3_url" in res_dict - # Remove the local resource to make sure CKAN serves the S3 URL + # Since version 0.15.0 we don't store the condensed resource locally path = dcor_shared.get_resource_path(rid) path_cond = path.with_name(path.name + "_condensed.rtdc") - assert path_cond.exists() - path_cond.unlink() + assert not path_cond.exists(), "sanity check" did = ds_dict["id"] resp = app.get( f"/dataset/{did}/resource/{rid}/condensed.rtdc", - ) + ) endpoint = dcor_shared.get_ckan_config_option( "dcor_object_store.endpoint_url") @@ -175,7 +164,7 @@ def test_route_redircet_resource_to_s3_private( 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', True) - user = factories.User() + user = factories.UserWithToken() user_obj = ckan.model.User.by_name(user["name"]) monkeypatch.setattr(ckan.common, 'current_user', @@ -195,36 +184,28 @@ def test_route_redircet_resource_to_s3_private( resource_path=data_path / "calibration_beads_47.rtdc", activate=True, private=True - ) + ) rid = res_dict["id"] assert "s3_available" in res_dict assert "s3_url" in res_dict - # Remove the local resource to make sure CKAN serves the S3 URL + # Since version 0.15.0 we don't store the condensed resource locally path = dcor_shared.get_resource_path(rid) - assert path.exists() - path.unlink() + path_cond = path.with_name(path.name + "_condensed.rtdc") + assert not path_cond.exists(), "sanity check" did = ds_dict["id"] # We should not be authorized to access the resource without API token resp0 = app.get( f"/dataset/{did}/resource/{rid}/download/random_name", status=404 - ) - assert len(resp0.history) == 0 - - # Try again with token - data = helpers.call_action( - u"api_token_create", - context={u"model": ckan.model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", ) + assert len(resp0.history) == 0 resp = app.get( f"/dataset/{did}/resource/{rid}/download/random_name", - headers={u"authorization": data["token"]}, - ) + headers={u"authorization": user["token"]}, + ) endpoint = dcor_shared.get_ckan_config_option( "dcor_object_store.endpoint_url") diff --git a/ckanext/dc_serve/tests/test_serve.py b/ckanext/dc_serve/tests/test_serve.py index 4b8a280..f289ab0 100644 --- a/ckanext/dc_serve/tests/test_serve.py +++ b/ckanext/dc_serve/tests/test_serve.py @@ -5,14 +5,12 @@ import shutil import uuid -import ckan.model as model import ckan.common import ckan.tests.factories as factories -import ckan.tests.helpers as helpers import ckanext.dcor_schemas import dclab +from dclab.rtdc_dataset import fmt_http import h5py -import numpy as np import pytest @@ -25,37 +23,23 @@ @pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') 
@pytest.mark.usefixtures('clean_db', 'with_request_context') def test_auth_forbidden(app, create_with_upload): - user = factories.User() - user2 = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! - create_context = {'ignore_auth': False, - 'user': user['name'], 'api_version': 3} + user2 = factories.UserWithToken() + # create a dataset - dataset, res = make_dataset( - create_context, owner_org, + _, res_dict = make_dataset( create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True, private=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data2 = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user2[u"name"]}, - user=user2[u"name"], - name=u"token-name", - ) + resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "valid", }, - headers={u"authorization": data2["token"]}, + headers={u"authorization": user2["token"]}, status=403 - ) + ) jres = json.loads(resp.body) assert not jres["success"] assert "not authorized to read resource" in jres["error"]["message"] @@ -64,36 +48,30 @@ def test_auth_forbidden(app, create_with_upload): @pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') def test_api_dcserv_error(app, create_with_upload): - user = factories.User() + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' }]) # Note: `call_action` bypasses authorization! create_context = {'ignore_auth': False, - 'user': user['name'], 'api_version': 3} + 'user': user['name'], + 'api_version': 3} # create a dataset - dataset, res = make_dataset( + ds_dict, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) # missing query parameter resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=409 - ) + ) jres = json.loads(resp.body) assert not jres["success"] assert "Please specify 'query' parameter" in jres["error"]["message"] @@ -103,9 +81,9 @@ def test_api_dcserv_error(app, create_with_upload): "/api/3/action/dcserv", params={"query": "feature", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=409 - ) + ) jres = json.loads(resp.body) assert not jres["success"] assert "Please specify 'id' parameter" in jres["error"]["message"] @@ -117,9 +95,9 @@ def test_api_dcserv_error(app, create_with_upload): params={"query": "feature_list", "id": bid, }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=404 - ) + ) jres = json.loads(resp.body) assert not jres["success"] assert "Not found" in jres["error"]["message"] @@ -128,157 +106,31 @@ def test_api_dcserv_error(app, create_with_upload): resp = app.get( "/api/3/action/dcserv", params={"query": "peter", - "id": res["id"], + "id": res_dict["id"], }, - headers={u"authorization": data["token"]}, + 
headers={u"authorization": user["token"]}, status=409 - ) - jres = json.loads(resp.body) - assert not jres["success"] - assert "Invalid query parameter 'peter'" in jres["error"]["message"] - - -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') -@pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_error_feature(app, create_with_upload): - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! - create_context = {'ignore_auth': False, - 'user': user['name'], 'api_version': 3} - # create a dataset - dataset, res = make_dataset( - create_context, owner_org, - create_with_upload=create_with_upload, - resource_path=data_path / "calibration_beads_47.rtdc", - activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", ) - - # missing feature parameter - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - }, - headers={u"authorization": data["token"]}, - status=409 - ) - jres = json.loads(resp.body) - assert not jres["success"] - assert "Please specify 'feature' parameter" in jres["error"]["message"] - - # missing event parameter - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - "feature": "image", - }, - headers={u"authorization": data["token"]}, - status=409 - ) jres = json.loads(resp.body) assert not jres["success"] - assert "Please specify 'event' for non-scalar" in jres["error"]["message"] - - # bad feature name - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - "feature": "peter", - }, - headers={u"authorization": data["token"]}, - status=409 - ) - jres = json.loads(resp.body) - assert not jres["success"] - assert "Unknown feature name 'peter'" in jres["error"]["message"] - - # feature unavailable - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - "feature": "ml_score_xyz", - }, - headers={u"authorization": data["token"]}, - status=409 - ) - jres = json.loads(resp.body) - assert not jres["success"] - assert "Feature 'ml_score_xyz' unavailable" in jres["error"]["message"] - - -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') -@pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_error_feature_trace(app, create_with_upload): - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! 
- create_context = {'ignore_auth': False, - 'user': user['name'], 'api_version': 3} - # create a dataset - dataset, res = make_dataset( - create_context, owner_org, - create_with_upload=create_with_upload, - resource_path=data_path / "calibration_beads_47.rtdc", - activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) - - # missing trace parameter - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - "feature": "trace", - "event": 2, - }, - headers={u"authorization": data["token"]}, - status=409 - ) - jres = json.loads(resp.body) - assert not jres["success"] - assert "Please specify 'trace' parameter" in jres["error"]["message"] - - # missing event parameter - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - "feature": "trace", - "trace": "fl1_raw" - }, - headers={u"authorization": data["token"]}, - status=409 - ) - jres = json.loads(resp.body) - assert not jres["success"] - assert "lease specify 'event' for non-scalar" in jres["error"]["message"] + assert "Invalid query parameter 'peter'" in jres["error"]["message"] -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') +@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_basin(app, create_with_upload, tmp_path): - user = factories.User() +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_api_dcserv_basin(enqueue_job_mock, app, create_with_upload, + monkeypatch, ckan_config, tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' @@ -289,45 +141,47 @@ def test_api_dcserv_basin(app, create_with_upload, tmp_path): 'api_version': 3} # create a dataset path_orig = data_path / "calibration_beads_47.rtdc" - path_test = tmp_path / "calibration_beads_47_test.rtdc" + path_test = pathlib.Path(tmpdir) / "calibration_beads_47_test.rtdc" shutil.copy2(path_orig, path_test) with dclab.RTDCWriter(path_test) as hw: hw.store_basin(basin_name="example basin", - basin_type="file", - basin_format="hdf5", - basin_locs=[path_orig], + basin_type="remote", + basin_format="http", + basin_locs=["http://example.org/peter/pan.rtdc"], basin_descr="an example test basin", + verify=False, # does not exist ) - dataset, res = make_dataset( + ds_dict, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=path_test, - activate=True) - - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", + activate=True, + private=False, ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "basins", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) + jres = json.loads(resp.body) assert jres["success"] - basin 
= jres["result"][0] - assert basin["name"] == "example basin" - assert basin["type"] == "file" - assert basin["format"] == "hdf5" - assert basin["description"] == "an example test basin" + # Fetch the http resource basin + for bn_dict in jres["result"]: + if bn_dict["name"] == "resource": + break + + with fmt_http.RTDC_HTTP(bn_dict["urls"][0]) as ds: + basin = ds.basins[0].as_dict() + assert basin["basin_name"] == "example basin" + assert basin["basin_type"] == "remote" + assert basin["basin_format"] == "http" + assert basin["basin_descr"] == "an example test basin" @pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dcor_schemas dc_serve') @@ -345,11 +199,7 @@ def test_api_dcserv_basin_v2(enqueue_job_mock, app, create_with_upload, 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', True) - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) + user = factories.UserWithToken() user_obj = ckan.model.User.by_name(user["name"]) monkeypatch.setattr(ckan.common, 'current_user', @@ -359,13 +209,13 @@ def test_api_dcserv_basin_v2(enqueue_job_mock, app, create_with_upload, 'user': user['name'], 'api_version': 3} - dataset, res = make_dataset( - copy.deepcopy(create_context), owner_org, + _, res_dict = make_dataset( + copy.deepcopy(create_context), create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) - s3_url = res["s3_url"] + s3_url = res_dict["s3_url"] # create a dataset path_orig = data_path / "calibration_beads_47.rtdc" @@ -382,7 +232,7 @@ def test_api_dcserv_basin_v2(enqueue_job_mock, app, create_with_upload, basin_format="s3", basin_locs=[s3_url], basin_descr="an example test basin", - verify=False, # we don't have s3fs installed + verify=True, ) del hw.h5file["events/deform"] @@ -390,44 +240,22 @@ def test_api_dcserv_basin_v2(enqueue_job_mock, app, create_with_upload, # sanity check assert "deform" not in h5["events"] - dataset, res = make_dataset( - copy.deepcopy(create_context), owner_org, + ds_dict, res_dict = make_dataset( + copy.deepcopy(create_context), create_with_upload=create_with_upload, resource_path=path_test, activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) - - # Version 1 API does serve all features - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature_list", - "version": "1", - }, - headers={u"authorization": data["token"]}, - status=200 - ) - jres = json.loads(resp.body) - assert jres["success"] - assert len(jres["result"]) == 37 - # Version 2 API does not serve any features resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "feature_list", "version": "2", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] assert len(jres["result"]) == 0 @@ -435,27 +263,27 @@ def test_api_dcserv_basin_v2(enqueue_job_mock, app, create_with_upload, # Version 2 API does not serve any features resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "feature", "feature": "area_um", "version": "2", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=409 # ValidationError - ) + ) jres = 
json.loads(resp.body) assert not jres["success"] # Version two API serves basins resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "basins", "version": "2", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] @@ -472,48 +300,20 @@ def test_api_dcserv_basin_v2(enqueue_job_mock, app, create_with_upload, @pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_feature(app, create_with_upload): - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! - create_context = {'ignore_auth': False, - 'user': user['name'], 'api_version': 3} - # create a dataset - dataset, res = make_dataset( - create_context, owner_org, - create_with_upload=create_with_upload, - resource_path=data_path / "calibration_beads_47.rtdc", - activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) - - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - "feature": "deform", - }, - headers={u"authorization": data["token"]}, - status=200 - ) - jres = json.loads(resp.body) - assert jres["success"] - with dclab.new_dataset(data_path / "calibration_beads_47.rtdc") as ds: - assert np.allclose(ds["deform"], jres["result"]) - +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_api_dcserv_feature_list(enqueue_job_mock, app, ckan_config, + create_with_upload, monkeypatch, tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') -@pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_feature_list(app, create_with_upload): - user = factories.User() + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' @@ -522,78 +322,41 @@ def test_api_dcserv_feature_list(app, create_with_upload): create_context = {'ignore_auth': False, 'user': user['name'], 'api_version': 3} # create a dataset - dataset, res = make_dataset( + ds_dict, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "feature_list", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) - jres = json.loads(resp.body) - assert jres["success"] - assert "deform" in jres["result"] - - -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') -@pytest.mark.usefixtures('clean_db', 
'with_request_context') -def test_api_dcserv_feature_trace(app, create_with_upload): - user = factories.User() - owner_org = factories.Organization(users=[{ - 'name': user['id'], - 'capacity': 'admin' - }]) - # Note: `call_action` bypasses authorization! - create_context = {'ignore_auth': False, - 'user': user['name'], 'api_version': 3} - # create a dataset - dataset, res = make_dataset( - create_context, owner_org, - create_with_upload=create_with_upload, - resource_path=data_path / "calibration_beads_47.rtdc", - activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", ) - - resp = app.get( - "/api/3/action/dcserv", - params={"id": res["id"], - "query": "feature", - "feature": "trace", - "trace": "fl1_raw", - "event": 1, - }, - headers={u"authorization": data["token"]}, - status=200 - ) jres = json.loads(resp.body) assert jres["success"] - with dclab.new_dataset(data_path / "calibration_beads_47.rtdc") as ds: - assert np.allclose(ds["trace"]["fl1_raw"][1], jres["result"]) + assert len(jres["result"]) == 0, "deprecated" @pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_logs(app, create_with_upload): - user = factories.User() +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_api_dcserv_logs(enqueue_job_mock, app, ckan_config, + create_with_upload, monkeypatch, tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' @@ -602,27 +365,20 @@ def test_api_dcserv_logs(app, create_with_upload): create_context = {'ignore_auth': False, 'user': user['name'], 'api_version': 3} # create a dataset - dataset, res = make_dataset( + ds_dict, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "logs", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] assert jres["result"]["hans"][0] == "peter" @@ -643,7 +399,7 @@ def test_api_dcserv_metadata(enqueue_job_mock, app, create_with_upload, 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', True) - user = factories.User() + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' @@ -653,36 +409,41 @@ def test_api_dcserv_metadata(enqueue_job_mock, app, create_with_upload, 'user': user['name'], 'api_version': 3} # create a dataset - dataset, res = make_dataset( + ds_dict, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / 
"calibration_beads_47.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "metadata", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] assert jres["result"]["setup"]["channel width"] == 20 -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') +@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_size(app, create_with_upload): - user = factories.User() +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_api_dcserv_size(enqueue_job_mock, app, create_with_upload, + monkeypatch, ckan_config, tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' @@ -691,27 +452,20 @@ def test_api_dcserv_size(app, create_with_upload): create_context = {'ignore_auth': False, 'user': user['name'], 'api_version': 3} # create a dataset - dataset, res = make_dataset( + ds_dict, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "size", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] with dclab.new_dataset(data_path / "calibration_beads_47.rtdc") as ds: @@ -720,8 +474,20 @@ def test_api_dcserv_size(app, create_with_upload): @pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_tables(app, create_with_upload): - user = factories.User() +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_api_dcserv_tables(enqueue_job_mock, app, create_with_upload, + monkeypatch, ckan_config, tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' @@ -730,27 +496,20 @@ def test_api_dcserv_tables(app, create_with_upload): create_context = {'ignore_auth': False, 'user': user['name'], 'api_version': 3} # create a dataset - dataset, res = make_dataset( + ds_dict, res_dict = make_dataset( create_context, owner_org, 
create_with_upload=create_with_upload, resource_path=data_path / "cytoshot_blood.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "tables", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] assert "src_cytoshot_monitor" in jres["result"] @@ -758,10 +517,22 @@ def test_api_dcserv_tables(app, create_with_upload): assert "brightness" in names -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') +@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_trace_list(app, create_with_upload): - user = factories.User() +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_api_dcserv_trace_list(enqueue_job_mock, app, create_with_upload, + monkeypatch, ckan_config, tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 'capacity': 'admin' @@ -770,27 +541,20 @@ def test_api_dcserv_trace_list(app, create_with_upload): create_context = {'ignore_auth': False, 'user': user['name'], 'api_version': 3} # create a dataset - dataset, res = make_dataset( + _, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "trace_list", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] with dclab.new_dataset(data_path / "calibration_beads_47.rtdc") as ds: @@ -798,10 +562,22 @@ def test_api_dcserv_trace_list(app, create_with_upload): assert key in jres["result"] -@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas dc_serve') +@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dcor_schemas dc_serve') @pytest.mark.usefixtures('clean_db', 'with_request_context') -def test_api_dcserv_valid(app, create_with_upload): - user = factories.User() +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_api_dcserv_valid(enqueue_job_mock, app, create_with_upload, + monkeypatch, ckan_config, tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.UserWithToken() owner_org = factories.Organization(users=[{ 'name': user['id'], 
'capacity': 'admin' @@ -810,27 +586,20 @@ def test_api_dcserv_valid(app, create_with_upload): create_context = {'ignore_auth': False, 'user': user['name'], 'api_version': 3} # create a dataset - dataset, res = make_dataset( + _, res_dict = make_dataset( create_context, owner_org, create_with_upload=create_with_upload, resource_path=data_path / "calibration_beads_47.rtdc", activate=True) - # taken from ckanext/example_iapitoken/tests/test_plugin.py - data = helpers.call_action( - u"api_token_create", - context={u"model": model, u"user": user[u"name"]}, - user=user[u"name"], - name=u"token-name", - ) resp = app.get( "/api/3/action/dcserv", - params={"id": res["id"], + params={"id": res_dict["id"], "query": "valid", }, - headers={u"authorization": data["token"]}, + headers={u"authorization": user["token"]}, status=200 - ) + ) jres = json.loads(resp.body) assert jres["success"] assert jres["result"] diff --git a/setup.py b/setup.py index 3ab3165..6e1b9ec 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ namespace_packages=['ckanext'], install_requires=[ # the "ckan" dependency is implied - "dclab[http]>=0.56.0", + "dclab[http]>=0.57.7", "dcor_shared>=0.8.0", ], include_package_data=True,
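The tests above exercise the version-2 dcserv API through the CKAN test app. For orientation, here is a minimal client-side sketch of the same flow against a live DCOR instance; the server URL, API token, and resource id below are placeholders (not part of this patch), while the endpoint, query parameters, and the "name"/"urls" keys of the returned basin dictionaries follow the tests above.

import requests

SERVER = "https://dcor.example.org"  # placeholder server URL
TOKEN = "<your-API-token>"  # placeholder API token
RID = "00000000-0000-0000-0000-000000000000"  # placeholder resource id

# Version 2 serves no feature data directly ('feature_list' is empty);
# clients are expected to read feature data through the basins that
# the server advertises.
resp = requests.get(
    f"{SERVER}/api/3/action/dcserv",
    params={"id": RID, "query": "basins", "version": "2"},
    headers={"authorization": TOKEN},
    timeout=30,
)
resp.raise_for_status()
basins = resp.json()["result"]  # list of basin dictionaries
for bn in basins:
    print(bn["name"], bn["urls"])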