BREAKING: drop support for DCOR API version 1
paulmueller committed Feb 28, 2024 · 1 parent 1ffb62f · commit 74a84d8
Showing 9 changed files with 314 additions and 749 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
@@ -1,5 +1,7 @@
0.15.0
- BREAKING: drop support for DCOR API version "1"
- ref: migrate to dcor_shared 0.8.0
- ref: upload condensed datasets directly to S3 (no copy on block storage)
0.14.1
- maintenance release
0.14.0
85 changes: 42 additions & 43 deletions ckanext/dc_serve/jobs.py
@@ -1,8 +1,12 @@
import pathlib
import tempfile
import warnings

import ckan.plugins.toolkit as toolkit
from dclab import RTDCWriter
from dclab.cli import condense
from dclab.cli import condense_dataset
from dcor_shared import (
DC_MIME_TYPES, s3, sha256sum, get_ckan_config_option, get_resource_path,
DC_MIME_TYPES, get_dc_instance, s3cc, get_ckan_config_option,
wait_for_resource)
import h5py

@@ -15,13 +19,19 @@ def admin_context():

def generate_condensed_resource_job(resource, override=False):
"""Generates a condensed version of the dataset"""
path = get_resource_path(resource["id"])
if resource["mimetype"] in DC_MIME_TYPES:
wait_for_resource(resource["id"])
cond = path.with_name(path.name + "_condensed.rtdc")
if not cond.exists() or override:
rid = resource["id"]
wait_for_resource(rid)
mtype = resource.get('mimetype', '')
if (mtype in DC_MIME_TYPES
# Check whether the file already exists on S3
and (override
or not s3cc.artifact_exists(resource_id=rid,
artifact="condensed"))):
# Create the condensed file in a temporary location
with tempfile.TemporaryDirectory() as ttd_name:
path_cond = pathlib.Path(ttd_name) / "condensed.rtdc"
with CKANResourceFileLock(
resource_id=resource["id"],
resource_id=rid,
locker_id="DCOR_generate_condensed") as fl:
# The CKANResourceFileLock creates a lock file if not present
# and then sets `is_locked` to True if the lock was acquired.
@@ -33,20 +43,26 @@ def generate_condensed_resource_job(resource, override=False):
# then several processes would end up condensing the same
# resource.
if fl.is_locked:
# Condense the dataset
condense(path_out=cond,
path_in=path,
ancillaries=True,
check_suffix=False)
# Determine the features that are not in the condensed
# dataset.
with h5py.File(path) as hsrc, h5py.File(cond) as hdst:
feats_src = set(hsrc["events"].keys())
feats_dst = set(hdst["events"].keys())
feats_upstream = sorted(feats_src - feats_dst)
with get_dc_instance(rid) as ds, \
h5py.File(path_cond, "w") as h5_cond:
# Condense the dataset (do not store any warning
# messages during instantiation, because we are
# scared of leaking credentials).
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
condense_dataset(ds=ds,
h5_cond=h5_cond,
ancillaries=True,
warnings_list=w)

# Determine the features that are not in the condensed
# dataset.
feats_src = set(ds.h5file["events"].keys())
feats_dst = set(h5_cond["events"].keys())
feats_upstream = sorted(feats_src - feats_dst)

# Write DCOR basins
with RTDCWriter(cond) as hw:
with RTDCWriter(path_cond) as hw:
# DCOR
site_url = get_ckan_config_option("ckan.site_url")
rid = resource["id"]
@@ -87,28 +103,11 @@ def generate_condensed_resource_job(resource, override=False):
basin_descr="Public resource access via HTTP",
basin_feats=feats_upstream,
verify=False)

# Upload the condensed file to S3
s3cc.upload_artifact(resource_id=rid,
path_artifact=path_cond,
artifact="condensed",
override=True)
return True
return False


def migrate_condensed_to_s3_job(resource):
"""Migrate a condensed resource to the S3 object store"""
path = get_resource_path(resource["id"])
path_cond = path.with_name(path.name + "_condensed.rtdc")
ds_dict = toolkit.get_action('package_show')(
admin_context(),
{'id': resource["package_id"]})
# Perform the upload
bucket_name = get_ckan_config_option(
"dcor_object_store.bucket_name").format(
organization_id=ds_dict["organization"]["id"])
rid = resource["id"]
sha256 = sha256sum(path_cond)
s3.upload_file(
bucket_name=bucket_name,
object_name=f"condensed/{rid[:3]}/{rid[3:6]}/{rid[6:]}",
path=path_cond,
sha256=sha256,
private=ds_dict["private"],
override=False)
# TODO: delete the local resource after successful upload?
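
Taken together, the jobs.py changes replace the old workflow (condense next to the resource on block storage, then migrate the result to S3 in a second job) with a single job that condenses into a temporary directory and uploads the result straight to S3. A minimal sketch of that flow, using only the calls visible in this diff (dcor_shared's get_dc_instance/s3cc and dclab's condense_dataset); the CKANResourceFileLock and DCOR-basin steps are omitted and the function name is illustrative:

import pathlib
import tempfile
import warnings

import h5py
from dclab.cli import condense_dataset
from dcor_shared import get_dc_instance, s3cc


def condense_to_s3_sketch(rid, override=False):
    """Condense resource `rid` and upload the result directly to S3."""
    # Skip resources that already have a condensed artifact on S3.
    if not override and s3cc.artifact_exists(resource_id=rid,
                                             artifact="condensed"):
        return False
    with tempfile.TemporaryDirectory() as ttd_name:
        path_cond = pathlib.Path(ttd_name) / "condensed.rtdc"
        with get_dc_instance(rid) as ds, \
                h5py.File(path_cond, "w") as h5_cond:
            # Capture warnings emitted during condensation instead of
            # letting them end up in logs or in the output file.
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter("always")
                condense_dataset(ds=ds, h5_cond=h5_cond,
                                 ancillaries=True, warnings_list=w)
        # Upload before the TemporaryDirectory context removes the file.
        s3cc.upload_artifact(resource_id=rid,
                             path_artifact=path_cond,
                             artifact="condensed",
                             override=True)
    return True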
27 changes: 5 additions & 22 deletions ckanext/dc_serve/plugin.py
@@ -7,11 +7,10 @@

from .cli import get_commands
from . import helpers as dcor_helpers
from .jobs import generate_condensed_resource_job, migrate_condensed_to_s3_job
from .jobs import generate_condensed_resource_job
from .route_funcs import dccondense, dcresource
from .serve import dcserv


from dcor_shared import DC_MIME_TYPES, s3


@@ -56,37 +55,21 @@ def update_config(self, config):
# IResourceController
def after_resource_create(self, context, resource):
"""Generate condensed dataset"""
if resource.get('mimetype') in DC_MIME_TYPES:
if resource.get('mimetype') in DC_MIME_TYPES and s3.is_available():
# Generate the condensed dataset
pkg_job_id = f"{resource['package_id']}_{resource['position']}_"
jid_condense = pkg_job_id + "condense"
jid_symlink = pkg_job_id + "symlink"
if not Job.exists(jid_condense, connection=ckan_redis_connect()):
toolkit.enqueue_job(generate_condensed_resource_job,
[resource],
title="Create condensed dataset",
title="Create condensed dataset and "
"upload it to S3",
queue="dcor-long",
rq_kwargs={"timeout": 3600,
"job_id": jid_condense,
"depends_on": [jid_symlink]})

# Upload the condensed dataset to S3
if s3.is_available():
jid_condensed_s3 = pkg_job_id + "condenseds3"
toolkit.enqueue_job(
migrate_condensed_to_s3_job,
[resource],
title="Migrate condensed resource to S3 object store",
queue="dcor-normal",
rq_kwargs={"timeout": 1000,
"job_id": jid_condensed_s3,
"depends_on": [
# symlink is general requirement
jid_symlink,
jid_condense,
]}
)

# IActions
def get_actions(self):
# Registers the custom API method
@@ -100,5 +83,5 @@ def get_helpers(self):
hlps = {
'dc_serve_resource_has_condensed':
dcor_helpers.resource_has_condensed,
}
}
return hlps
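
With the upload folded into the condense job, the separate "Migrate condensed resource to S3 object store" job and its extra dependency entry disappear from after_resource_create. A compact sketch of the remaining hook logic, assuming the module-level imports of plugin.py (ckan_redis_connect and generate_condensed_resource_job are provided by this extension; the standalone function name is illustrative):

import ckan.plugins.toolkit as toolkit
from dcor_shared import DC_MIME_TYPES, s3
from rq.job import Job


def enqueue_condense_sketch(resource):
    """Enqueue the combined condense-and-upload job for a DC resource."""
    if resource.get('mimetype') in DC_MIME_TYPES and s3.is_available():
        pkg_job_id = f"{resource['package_id']}_{resource['position']}_"
        jid_condense = pkg_job_id + "condense"
        jid_symlink = pkg_job_id + "symlink"
        # Only one job is enqueued now; it condenses and uploads to S3.
        if not Job.exists(jid_condense, connection=ckan_redis_connect()):
            toolkit.enqueue_job(
                generate_condensed_resource_job,
                [resource],
                title="Create condensed dataset and upload it to S3",
                queue="dcor-long",
                rq_kwargs={"timeout": 3600,
                           "job_id": jid_condense,
                           # run only after the symlink job has finished
                           "depends_on": [jid_symlink]})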
1 change: 1 addition & 0 deletions ckanext/dc_serve/res_file_lock.py
@@ -13,6 +13,7 @@ class CKANResourceFileLock:
the `is_locked` property to make sure that you have actually
acquired a lock.
"""

def __init__(self, resource_id, locker_id="CKAN_resource_lock"):
"""
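
The jobs.py hunk above shows the intended usage pattern for this class: acquiring the lock is not guaranteed, so callers must check is_locked before doing any work. A minimal usage sketch (the resource id and the work function are placeholders):

from ckanext.dc_serve.res_file_lock import CKANResourceFileLock

with CKANResourceFileLock(resource_id="00000000-0000-0000-0000-000000000000",
                          locker_id="DCOR_generate_condensed") as fl:
    if fl.is_locked:
        # Only the process holding the lock performs the expensive work;
        # all other processes fall through and do nothing.
        run_condense_job()  # placeholder for the actual job body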
