2 changes: 2 additions & 0 deletions src/dlstbx/services/trigger.py
@@ -92,6 +92,7 @@ class MetalIdParameters(pydantic.BaseModel):
backoff_multiplier: float = pydantic.Field(default=2, alias="backoff-multiplier")
automatic: Optional[bool] = False
comment: Optional[str] = None
symlink: str = pydantic.Field(default="")


class ProteinInfo(pydantic.BaseModel):
@@ -858,6 +859,7 @@ def trigger_metal_id(
"data": [mtz_file_below.as_posix(), mtz_file_above.as_posix()],
"scaling_id": [parameters.scaling_id],
"pdb": pdb_files,
"symlink": [parameters.symlink] if parameters.symlink else [],
}

self.log.debug("Metal_id trigger: Starting")
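A note on the new symlink field above: defaulting to an empty string keeps the trigger payload list-valued like the other job parameters, with a missing symlink producing an empty list rather than [""]. A minimal sketch of that behaviour, assuming pydantic v2 (Params is a stand-in for MetalIdParameters, showing only the new field):

import pydantic

class Params(pydantic.BaseModel):
    symlink: str = pydantic.Field(default="")

# Field omitted: the default "" is falsy, so the payload carries an empty list.
p = Params.model_validate({})
assert ([p.symlink] if p.symlink else []) == []

# Field present: the payload carries a one-element list.
p = Params.model_validate({"symlink": "metal_id"})
assert ([p.symlink] if p.symlink else []) == ["metal_id"]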
129 changes: 81 additions & 48 deletions src/dlstbx/wrapper/metal_id.py
@@ -7,59 +7,53 @@
import re
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime
from typing import Any

import dlstbx.util.symlink
from dlstbx import schemas
from dlstbx.wrapper import Wrapper


@dataclass
class PeakData:
density: float
rmsd: float
xyz: tuple[float, float, float]


class MetalIdWrapper(Wrapper):
_logger_name = "dlstbx.wrap.metal_id"

def parse_peak_data(self, peak_data_file):
peak_data = []
def parse_peak_data(self, peak_data_file: pathlib.Path) -> list[PeakData]:
peak_data: list[PeakData] = []
with open(peak_data_file, "r") as file:
for line in file:
match = re.match(
r"Peak \d+: Electron Density = ([\d.]+) e/Å\^3, RMSD = ([\d.]+), XYZ = \(([\d.]+), ([\d.]+), ([\d.]+)\)",
r"Peak \d+: Electron Density = ([\d.]+) e/Å\^3, RMSD = ([\d.]+), XYZ = \((-?[\d.]+), (-?[\d.]+), (-?[\d.]+)\)",
line,
)
if match:
electron_density = float(match.group(1))
density = float(match.group(1))
rmsd = float(match.group(2))
xyz = (
float(match.group(3)),
float(match.group(4)),
float(match.group(5)),
)
peak_data.append(
{"electron_density": electron_density, "rmsd": rmsd, "xyz": xyz}
)
peak_data.append(PeakData(density=density, rmsd=rmsd, xyz=xyz))
return peak_data

def send_results_to_ispyb(
self,
peak_data,
metal_id_command,
dimple_log_file,
results_directory,
start_time,
):
scaling_id = self.params.get("scaling_id", [])
if len(scaling_id) != 1:
self.log.info(f"Scaling ID {scaling_id} provided")
self.log.error(
"Exactly one scaling_id must be provided - cannot insert metal_id results to ISPyB"
)
return False
scaling_id = scaling_id[0]

if not dimple_log_file.is_file():
self.log.error(
f"dimple log file '{dimple_log_file}' not found - cannot insert metal_id results to ISPyB"
)
return False
peak_data: list[PeakData],
metal_id_command: str,
dimple_log_file: pathlib.Path,
results_directory: pathlib.Path,
start_time: datetime,
scaling_id: int,
) -> dict[str, Any]:
self.log.info(
f"Autoproc_prog_id: '{self.recwrap.environment.get('ispyb_autoprocprogram_id')}'"
)
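Two details in the changes above are easy to miss: the peak regex now allows a leading minus sign on each coordinate group, so peaks at negative fractional coordinates are no longer skipped silently, and parsed peaks become PeakData instances instead of ad-hoc dicts. A self-contained sketch of the new parsing (the sample line is a hypothetical found_peaks.dat entry, not taken from the PR):

import re
from dataclasses import dataclass

@dataclass
class PeakData:
    density: float
    rmsd: float
    xyz: tuple[float, float, float]

PEAK_RE = re.compile(
    r"Peak \d+: Electron Density = ([\d.]+) e/Å\^3, RMSD = ([\d.]+), "
    r"XYZ = \((-?[\d.]+), (-?[\d.]+), (-?[\d.]+)\)"
)

# A negative coordinate: the old pattern (without "-?") would not match here.
line = "Peak 1: Electron Density = 1.23 e/Å^3, RMSD = 4.56, XYZ = (-0.10, 0.25, -7.80)"
m = PEAK_RE.match(line)
assert m is not None
peak = PeakData(
    density=float(m.group(1)),
    rmsd=float(m.group(2)),
    xyz=(float(m.group(3)), float(m.group(4)), float(m.group(5))),
)
assert peak.density == 1.23 and peak.xyz == (-0.1, 0.25, -7.8)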
@@ -82,12 +76,12 @@ def send_results_to_ispyb(
blobs = []
for n_peak, peak in enumerate(peak_data, start=1):
self.log.info(
f"Adding blob {n_peak} to ispyb results - Density: {peak['electron_density']}, rmsd: {peak['rmsd']}, xyz: {peak['xyz']}"
f"Adding blob {n_peak} to ispyb results - Density: {peak.density}, rmsd: {peak.rmsd}, xyz: {peak.xyz}"
)
blobs.append(
schemas.Blob(
xyz=peak["xyz"],
height=peak["electron_density"],
xyz=peak.xyz,
height=peak.density,
# nearest_atom=nearest_atom,
# nearest_atom_distance=distance,
map_type="difference", # TODO change this to anomalous_difference once enum exists.
@@ -106,20 +100,18 @@ def send_results_to_ispyb(
)

attachments = []

primary_result_files = self.params.get("primary_result_files", {})
self.log.info("Adding attachments for upload to ispyb")
for f in results_directory.iterdir():
if f.suffix not in [".map", ".log", ".py", ".pha", ".pdb", ".dat"]:
self.log.info(f"Skipping file {f.name}")
continue
elif f.suffix in [".map", ".pdb", ".dat"]:
file_type = "result"
importance_rank = 1
elif f.suffix in [".pha", ".mtz"]:
if f.name in primary_result_files:
file_type = primary_result_files[f.name]["type"]
importance_rank = primary_result_files[f.name]["rank"]
elif f.suffix in [".map", ".pdb", ".dat", ".pha", ".mtz"]:
file_type = "result"
importance_rank = 2
else:
file_type = "log"
importance_rank = 3
continue

attachments.append(
schemas.Attachment(
@@ -132,6 +124,16 @@ def send_results_to_ispyb(
)
self.log.info(f"Added {f.name} as an attachment")

if getattr(self, "final_directory", None):
for att in attachments:
if att.file_name in primary_result_files:
shutil.copy(att.file_path / att.file_name, self.final_directory)
att.file_path = self.final_directory
for blob in blobs:
if blob.filepath and blob.view1:
shutil.copy(blob.filepath / blob.view1, self.final_directory)
blob.filepath = self.final_directory

ispyb_results = {
"mxmrrun": json.loads(mxmrrun.model_dump_json()),
"blobs": [json.loads(blob.model_dump_json()) for blob in blobs],
@@ -152,6 +154,15 @@ def run(self):
# Get parameters from the recipe file
self.params = self.recwrap.recipe_step["job_parameters"]

scaling_id = self.params.get("scaling_id", [])
if len(scaling_id) != 1:
self.log.info(f"Scaling ID {scaling_id} provided")
self.log.error(
"Exactly one scaling_id must be provided - cannot run metal_id"
)
return False
scaling_id = scaling_id[0]

src_mtz_files = self.params.get("data", [])
if not src_mtz_files:
self.log.error("Could not identify on what data to run")
@@ -224,33 +235,55 @@ def run(self):
)

self.log.debug("Reading in peak data")
peak_data = self.parse_peak_data(output_directory / "found_peaks.dat")
peak_file = output_directory / "found_peaks.dat"
if not peak_file.is_file():
self.log.info("Metal_ID: No peaks found")
peak_data = []
else:
peak_data = self.parse_peak_data(peak_file)

for f in output_directory.iterdir():
self.log.info(f"Searching for files to copy. Current file is : {f}")
self.log.debug(f"Searching for files to copy. Current file is : {f}")
if f.is_dir():
continue
if f.name.startswith("."):
continue
if any(f.suffix == skipext for skipext in [".r3d"]):
continue
self.log.info("Copying file")
self.log.debug("Copying file")
shutil.copy(f, results_directory)

if self.params.get("create_symlink"):
symlink = self.params.get("create_symlink")
if isinstance(symlink, list):
symlink = symlink[0]
if symlink:
dlstbx.util.symlink.create_parent_symlink(
os.fspath(output_directory), self.params["create_symlink"]
os.fspath(output_directory), symlink
)
dlstbx.util.symlink.create_parent_symlink(
os.fspath(results_directory), self.params["create_symlink"]
os.fspath(results_directory), symlink
)

self.log.info("Sending results to ISPyB")

dimple_log = working_directory / "metal_id" / "dimple_below" / "dimple.log"
if not dimple_log.is_file():
self.log.error(
f"dimple log file '{dimple_log}' not found - cannot insert metal_id results to ISPyB"
)
return False

if pipeline_final_params := self.params.get("pipeline-final", []):
self.final_directory = pathlib.Path(pipeline_final_params["path"])
self.final_directory.mkdir(parents=True, exist_ok=True)
if self.params.get("create_symlink"):
dlstbx.util.symlink.create_parent_symlink(self.final_directory, symlink)

ispyb_results = self.send_results_to_ispyb(
peak_data, metal_id_command, dimple_log, results_directory, start_time
peak_data,
metal_id_command,
dimple_log,
results_directory,
start_time,
scaling_id,
)

self.log.info(f"Sending {str(ispyb_results)} to ispyb service")
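Beyond moving the scaling_id and dimple.log guards up into run() so the job fails before any heavy processing, and tolerating a missing found_peaks.dat (no peaks found) instead of crashing, run() now accepts create_symlink either as a plain string or wrapped in a list, matching the list-valued "symlink" field the trigger now emits (presumably mapped to create_symlink by the recipe). A small sketch of that normalization (hypothetical helper, with an extra empty-list guard added for illustration):

def normalise_symlink(params: dict) -> str:
    # create_symlink may arrive as "name" or ["name"]; unwrap lists and
    # fall back to "" so a missing value stays falsy.
    symlink = params.get("create_symlink") or ""
    if isinstance(symlink, list):
        symlink = symlink[0] if symlink else ""
    return symlink

assert normalise_symlink({"create_symlink": "metal_id"}) == "metal_id"
assert normalise_symlink({"create_symlink": ["metal_id"]}) == "metal_id"
assert normalise_symlink({}) == ""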