diff --git a/requirements.conda.txt b/requirements.conda.txt index c371fe235..91f49d501 100644 --- a/requirements.conda.txt +++ b/requirements.conda.txt @@ -1,6 +1,6 @@ drmaa hdf5plugin -ispyb>=11.1.0 +ispyb>=11.1.2 junit-xml>=1.9 marshmallow-sqlalchemy minio>=7.1.0 diff --git a/src/dlstbx/crud.py b/src/dlstbx/crud.py index d350e85c6..84340d81f 100644 --- a/src/dlstbx/crud.py +++ b/src/dlstbx/crud.py @@ -368,6 +368,22 @@ def get_ssx_events_for_dcid( return query.all() +def get_app_id_for_scaling_id( + session: sqlalchemy.orm.session.Session, + scaling_id: int, +) -> int: + query = ( + session.query(models.AutoProc.autoProcProgramId) + .join( + models.AutoProcScaling, + models.AutoProcScaling.autoProcId == models.AutoProc.autoProcId, + ) + .filter(models.AutoProcScaling.autoProcScalingId == scaling_id) + ) + result = query.first() + return result.autoProcProgramId + + def insert_xray_centring( xrc: schemas.XrayCentring, session: sqlalchemy.orm.session.Session, diff --git a/src/dlstbx/services/ispybsvc.py b/src/dlstbx/services/ispybsvc.py index 7f23a4a97..9a03e99b3 100644 --- a/src/dlstbx/services/ispybsvc.py +++ b/src/dlstbx/services/ispybsvc.py @@ -4,6 +4,7 @@ import os.path import pathlib import time +from datetime import datetime from typing import List import ispyb.sqlalchemy @@ -323,11 +324,21 @@ def do_store_dimple_failure(self, parameters, **kwargs): ) return False - def do_register_processing(self, parameters, **kwargs): + def do_register_processing( + self, parameters, session: sqlalchemy.orm.Session, **kwargs + ): program = parameters("program") cmdline = parameters("cmdline") - environment = parameters("environment") or "" + environment = parameters("environment") or {} upstream_source = parameters("upstream_source") or "" + scaling_id = parameters("scaling_id") or environment.get("scaling_id") + if isinstance(scaling_id, list): + scaling_id = scaling_id[0] + parent_appid = ( + crud.get_app_id_for_scaling_id(session, int(scaling_id)) + if scaling_id + else None + ) processingpipelineid = self.get_pipeline_id(program, upstream_source) if isinstance(environment, dict): environment = ", ".join( @@ -339,21 +350,20 @@ def do_register_processing(self, parameters, **kwargs): self.log.error("Invalid processing id '%s'", rpid) return False try: - result = self.ispyb.mx_processing.upsert_program_ex( - job_id=rpid, - name=program, - command=cmdline, - environment=environment, - pipeline_id=processingpipelineid, + new_app = ispyb.sqlalchemy.AutoProcProgram( + processingJobId=rpid, + processingPrograms=program, + processingCommandLine=cmdline, + processingEnvironment=environment, + processingPipelineId=processingpipelineid, + parentAutoProcProgramId=parent_appid, + recordTimeStamp=datetime.now(), ) + session.add(new_app) + session.commit() + result = new_app.autoProcProgramId self.log.info( - "Registered new program '%s' for processing id '%s' with command line '%s' and environment '%s' and pipeline id '%s' with result '%s'.", - program, - rpid, - cmdline, - environment, - processingpipelineid, - result, + f"Registered new program '{program}' for processing id '{rpid}' with command line '{cmdline}' and environment '{environment}', pipeline id '{processingpipelineid}' and parent program id '{parent_appid}' with result '{result}'.", ) return {"success": True, "return_value": result} except ispyb.ISPyBException as e: