""" Testing background jobs
Due to the asynchronous nature of background jobs, code that uses them needs
to be handled specially when writing tests.
A common approach is to use the mock package to replace the
ckan.plugins.toolkit.enqueue_job function with a mock that executes jobs
synchronously instead of asynchronously
"""
from unittest import mock
import pathlib

import pytest

import ckan.lib.uploader
import ckan.tests.factories as factories

import dclab
import numpy as np
import requests

import ckanext.dcor_schemas.plugin
import dcor_shared
from dcor_shared.testing import make_dataset, synchronous_enqueue_job

data_path = pathlib.Path(__file__).parent / "data"
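
# For reference, `synchronous_enqueue_job` (imported above from
# `dcor_shared.testing`) behaves roughly like the following sketch.
# This is an illustration only, not the actual implementation, which
# lives in dcor_shared and may differ:
#
#     def synchronous_enqueue_job(job_func, args=None, kwargs=None,
#                                 *other_args, **other_kwargs):
#         """Run the job immediately instead of enqueueing it."""
#         args = args or []
#         kwargs = kwargs or {}
#         job_func(*args, **kwargs)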


# We need the dcor_depot extension to make sure that the symbolic-
# linking pipeline is used.
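# The `mock.patch` decorator below replaces `enqueue_job` with
# `synchronous_enqueue_job`, so background jobs run inline during the
# API calls in this test; the mock object is injected as the first
# test argument (`enqueue_job_mock`).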
@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dc_serve dcor_schemas')
@pytest.mark.usefixtures('clean_db', 'with_request_context')
@mock.patch('ckan.plugins.toolkit.enqueue_job',
side_effect=synchronous_enqueue_job)
def test_create_condensed_dataset_job(enqueue_job_mock, create_with_upload,
monkeypatch, ckan_config, tmpdir):
monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir))
monkeypatch.setattr(ckan.lib.uploader,
'get_storage_path',
lambda: str(tmpdir))
monkeypatch.setattr(
ckanext.dcor_schemas.plugin,
'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
True)
user = factories.User()
owner_org = factories.Organization(users=[{
'name': user['id'],
'capacity': 'admin'
}])
# Note: `call_action` bypasses authorization!
    # create a dataset
create_context = {'ignore_auth': False,
'user': user['name'],
'api_version': 3}
dataset = make_dataset(create_context, owner_org, activate=False)
content = (data_path / "calibration_beads_47.rtdc").read_bytes()
result = create_with_upload(
content, 'test.rtdc',
url="upload",
package_id=dataset["id"],
context=create_context,
)
path = dcor_shared.get_resource_path(result["id"])
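    # The condensed artifact is expected to sit next to the original
    # resource, with a "_condensed.rtdc" suffix appended to the name.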
cond = path.with_name(path.name + "_condensed.rtdc")
# existence of original uploaded file
assert path.exists()
# existence of condensed file
assert cond.exists()


# We need the dcor_depot extension to make sure that the symbolic-
# linking pipeline is used.
@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dc_serve dcor_schemas')
@pytest.mark.usefixtures('clean_db', 'with_request_context')
@mock.patch('ckan.plugins.toolkit.enqueue_job',
side_effect=synchronous_enqueue_job)
def test_upload_condensed_dataset_to_s3_job(
enqueue_job_mock, create_with_upload, monkeypatch, ckan_config,
tmpdir):
monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir))
monkeypatch.setattr(ckan.lib.uploader,
'get_storage_path',
lambda: str(tmpdir))
monkeypatch.setattr(
ckanext.dcor_schemas.plugin,
'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
True)
user = factories.User()
owner_org = factories.Organization(users=[{
'name': user['id'],
'capacity': 'admin'
}])
# Note: `call_action` bypasses authorization!
    # create a dataset
create_context = {'ignore_auth': False,
'user': user['name'],
'api_version': 3}
ds_dict, res_dict = make_dataset(
create_context, owner_org,
create_with_upload=create_with_upload,
resource_path=data_path / "calibration_beads_47.rtdc",
activate=True)
bucket_name = dcor_shared.get_ckan_config_option(
"dcor_object_store.bucket_name").format(
organization_id=ds_dict["organization"]["id"])
rid = res_dict["id"]
object_name = f"condensed/{rid[:3]}/{rid[3:6]}/{rid[6:]}"
endpoint = dcor_shared.get_ckan_config_option(
"dcor_object_store.endpoint_url")
cond_url = f"{endpoint}/{bucket_name}/{object_name}"
response = requests.get(cond_url)
assert response.ok, "resource is public"
assert response.status_code == 200
# Verify SHA256sum
path = dcor_shared.get_resource_path(res_dict["id"])
path_cond = path.with_name(path.name + "_condensed.rtdc")
dl_path = tmpdir / "calbeads.rtdc"
with dl_path.open("wb") as fd:
fd.write(response.content)
assert dcor_shared.sha256sum(dl_path) == dcor_shared.sha256sum(path_cond)


# We need the dcor_depot extension to make sure that the symbolic-
# linking pipeline is used.
@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dc_serve dcor_schemas')
@pytest.mark.usefixtures('clean_db', 'with_request_context')
@mock.patch('ckan.plugins.toolkit.enqueue_job',
side_effect=synchronous_enqueue_job)
def test_upload_condensed_dataset_to_s3_job_and_verify_basin(
enqueue_job_mock, create_with_upload, monkeypatch, ckan_config,
tmpdir):
monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir))
monkeypatch.setattr(ckan.lib.uploader,
'get_storage_path',
lambda: str(tmpdir))
monkeypatch.setattr(
ckanext.dcor_schemas.plugin,
'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
True)
user = factories.User()
owner_org = factories.Organization(users=[{
'name': user['id'],
'capacity': 'admin'
}])
# Note: `call_action` bypasses authorization!
    # create a dataset
create_context = {'ignore_auth': False,
'user': user['name'],
'api_version': 3}
ds_dict, res_dict = make_dataset(
create_context, owner_org,
create_with_upload=create_with_upload,
resource_path=data_path / "calibration_beads_47.rtdc",
activate=True)
bucket_name = dcor_shared.get_ckan_config_option(
"dcor_object_store.bucket_name").format(
organization_id=ds_dict["organization"]["id"])
rid = res_dict["id"]
object_name = f"condensed/{rid[:3]}/{rid[3:6]}/{rid[6:]}"
endpoint = dcor_shared.get_ckan_config_option(
"dcor_object_store.endpoint_url")
cond_url = f"{endpoint}/{bucket_name}/{object_name}"
response = requests.get(cond_url)
assert response.ok, "resource is public"
assert response.status_code == 200
# Download the condensed resource
dl_path = tmpdir / "calbeads.rtdc"
with dl_path.open("wb") as fd:
fd.write(response.content)
# Open the condensed resource with dclab and make sure the
# "image" feature is in the basin.
with dclab.new_dataset(pathlib.Path(dl_path)) as ds:
assert len(ds.basins) == 3
assert "image" in ds.features
assert "image" in ds.features_basin
assert "image" not in ds.features_innate
assert np.allclose(np.mean(ds["image"][0]),
47.15595,
rtol=0, atol=1e-4)
# The basin features should only list those that are not in
# the condensed dataset.
assert ds.basins[0].features == [
"contour", "image", "mask", "trace"]