Skip to content

Commit

Permalink
Merge branch 'main' into experiment_enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
LuckierDodge committed Apr 26, 2024
2 parents 74b5321 + 151598e commit 343d4bc
Show file tree
Hide file tree
Showing 28 changed files with 211 additions and 153 deletions.
7 changes: 5 additions & 2 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ services:
wei_engine:
image: ${IMAGE}
container_name: wei_engine
volumes_from:
- wei_server
volumes:
- diaspora_config:/home/app/.diaspora
- ./tests/workcells:/workcell_defs
- ~/.wei:/home/app/.wei
- ./:/home/app/wei # for development only
environment:
- PYTHONUNBUFFERED=1 # Fix weird bug with empty logging
- USER_ID=${USER_ID:-1000}
Expand Down
5 changes: 3 additions & 2 deletions docs/source/pages/_autosummary/wei.core.storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ wei.core.storage

get_experiment_directory
get_experiment_log_file
get_experiment_workflows_directory
get_experiments_directory
get_workcell_directory
get_workcell_run_log_path
get_workcell_log_path
get_workflow_result_directory
get_workflow_run_directory
get_workflow_run_log_path
get_workflow_runs_directory
initialize_storage
search_for_experiment_directory


Expand Down
2 changes: 2 additions & 0 deletions docs/source/pages/_autosummary/wei.routers.event_routes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ wei.routers.event\_routes

.. autosummary::

get_all_events
get_event
log_event


Expand Down
29 changes: 0 additions & 29 deletions docs/source/pages/_autosummary/wei.types.resource_types.rst

This file was deleted.

1 change: 0 additions & 1 deletion docs/source/pages/_autosummary/wei.types.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ wei.types
wei.types.experiment_types
wei.types.interface_types
wei.types.module_types
wei.types.resource_types
wei.types.step_types
wei.types.workcell_types
wei.types.workflow_types
Expand Down
2 changes: 1 addition & 1 deletion docs/source/pages/concepts/workcell.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ A **Workcell** is a collection of :doc:`module` (instruments, robots, and other
Abstractly, a Workcell is defined by three different things: its **Modules**, its **Locations**, and its **Configuration**. All of this is specified in a ``.yaml`` file, which is loaded by the WEI instance that runs the Workcell.

Modules
=======
========

Modules are the individual components that make up a Workcell. These can be anything from a robot arm to a camera to a PCR machine. Each Module has a number of properties, including a name, a model, an interface, and a configuration.

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "ad_sdl.wei"
version = "0.5.6"
version = "0.5.8"
description = "The Rapid Prototyping Laboratory's Workflow Execution Interface."
authors = [
{name = "Rafael Vescovi", email = "ravescovi@anl.gov"},
Expand Down
2 changes: 2 additions & 0 deletions wei-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ fi


# Best-effort attempt to align permissions
mkdir -p /home/app/.wei/experiments /home/app/.wei/temp /home/app/.wei/.diaspora
chown $USER_ID:$GROUP_ID /home/app || true
chown $USER_ID:$GROUP_ID /home/app/.wei || true
chown $USER_ID:$GROUP_ID /home/app/.wei/experiments || true
chown $USER_ID:$GROUP_ID /home/app/.wei/temp || true
chown $USER_ID:$GROUP_ID /home/app/.diaspora || true

if [ "$USER_ID" -eq 0 ] && [ "$GROUP_ID" -eq 0 ]; then
Expand Down
12 changes: 10 additions & 2 deletions wei/core/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def send_event(event: Event) -> Any:


class EventHandler:
"""Registers Events during the Experiment execution both in a logfile and on Kafka"""
"""Registers Events during the Experiment execution in a logfile, in Redis, and (if `use_diaspora` is true) on Kafka"""

kafka_producer = None
kafka_topic = None
Expand Down Expand Up @@ -80,10 +80,18 @@ def log_event(cls, event: Event) -> None:
if event.workcell_id is None:
event.workcell_id = state_manager.get_workcell_id()
if event.experiment_id is not None:
# Log all events related to an experiment to the experiment's log file
Logger.get_experiment_logger(event.experiment_id).info(
event.model_dump_json()
)
Logger.get_workcell_logger(event.workcell_id).info(event.model_dump_json())
else:
# Log all non-experiment events to the workcell's log file
Logger.get_workcell_logger(event.workcell_id).info(event.model_dump_json())
if event.event_type == "WORKFLOW":
# Log all workflow events to the workflow run's log file, in addition to the experiment log file
Logger.get_workflow_run_logger(event.run_id).info(event.model_dump_json())

state_manager.set_event(event)

if Config.use_diaspora:
cls.log_event_diaspora(event=event)
Expand Down
44 changes: 23 additions & 21 deletions wei/core/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,35 @@
def register_new_experiment(experiment_design: ExperimentDesign) -> Experiment:
"""Creates a new experiment, optionally associating it with a campaign"""
new_experiment = Experiment.model_validate(experiment_design, from_attributes=True)
print(new_experiment.model_dump_json())
get_experiment_directory(
new_experiment.experiment_id, new_experiment.experiment_name
).mkdir(parents=True, exist_ok=True)
# Create the experiment (sub)director(ies) if they don't exist
new_experiment.experiment_directory = get_experiment_directory(
new_experiment.experiment_id, new_experiment.experiment_name, create=True
)
get_workflow_runs_directory(
new_experiment.experiment_id, new_experiment.experiment_name
).mkdir(parents=True, exist_ok=True)
with state_manager.lab_state_lock():
if new_experiment.campaign_id is not None:
try:
state_manager.get_campaign(new_experiment.campaign_id)
except KeyError as e:
raise ValueError(
f"Campaign {new_experiment.campaign_id} not found, please create it first (this only needs to be done once)."
) from e
# If a campaign is specified, check if it exists, and register the experiment
if new_experiment.campaign_id is not None:
try:
state_manager.get_campaign(new_experiment.campaign_id)
except KeyError as e:
raise ValueError(
f"Campaign {new_experiment.campaign_id} not found, please create it first (this only needs to be done once)."
) from e

def append_experiment_id_to_campaign(
campaign: Campaign, experiment_id: str
) -> Campaign:
campaign.experiment_ids.append(experiment_id)
return campaign
def append_experiment_id_to_campaign(
campaign: Campaign, experiment_id: str
) -> Campaign:
campaign.experiment_ids.append(experiment_id)
return campaign

with state_manager.campaign_lock(new_experiment.campaign_id):
state_manager.update_campaign(
new_experiment.campaign_id,
append_experiment_id_to_campaign,
new_experiment.experiment_id,
)
state_manager.set_experiment(new_experiment)
state_manager.set_experiment(new_experiment)
return new_experiment


Expand All @@ -54,6 +55,7 @@ def get_experiment(experiment_id: str) -> Experiment:
try:
experiment = state_manager.get_experiment(experiment_id)
except KeyError:
# Experiment not cached, so search the disk
experiment = Experiment(
experiment_id=experiment_id,
experiment_name=search_for_experiment_directory(experiment_id).split(
Expand All @@ -76,13 +78,13 @@ def parse_experiments_from_disk():
experiment_id = regex_match[2]
try:
experiment = state_manager.get_experiment(experiment_id)
# Experiment already in state_manager, so skip it
continue
except KeyError:
experiment_name = regex_match[1]
# TODO: Try to extract campaign_id and data_point_definition
experiment = Experiment(
experiment_id=experiment_id,
experiment_name=experiment_name,
experiment_directory=experiment_dir,
)
with state_manager.lab_state_lock():
state_manager.set_experiment(experiment)
state_manager.set_experiment(experiment)
7 changes: 3 additions & 4 deletions wei/core/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
get_workflow_run_directory,
)
from wei.types.base_types import PathLike
from wei.types.workflow_types import WorkflowRun


class Logger:
Expand Down Expand Up @@ -96,7 +95,7 @@ def get_experiment_logger(
)

@staticmethod
def get_workflow_run_logger(wf_run: WorkflowRun) -> logging.Logger:
def get_workflow_run_logger(wf_run_id: str) -> logging.Logger:
"""Finds the existing logger with the given name or creates a new one if it doesn't exist
Parameters
Expand All @@ -110,8 +109,8 @@ def get_workflow_run_logger(wf_run: WorkflowRun) -> logging.Logger:
"""

return Logger.get_logger(
f"{wf_run.run_id}",
get_workflow_run_directory(wf_run.run_id),
f"{wf_run_id}",
get_workflow_run_directory(wf_run_id),
log_level=Config.log_level,
)

Expand Down
4 changes: 2 additions & 2 deletions wei/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def update_module(module_name: str, module: Module) -> None:
if module.state in [ModuleStatus.INIT, ModuleStatus.UNKNOWN]:
module.about = get_module_about(module, require_schema_compliance=False)
module.state = state
with state_manager.state_lock():
with state_manager.wc_state_lock():
state_manager.set_module(module_name, module)
if module.reserved:
reserving_wf = state_manager.get_workflow_run(module.reserved)
Expand All @@ -65,7 +65,7 @@ def update_module(module_name: str, module: Module) -> None:
# *but that workflow isn't actually using the module,
# *so release the reservation, and allow the current workflow to proceed
print(f"Clearing reservation on module {module_name}")
with state_manager.state_lock():
with state_manager.wc_state_lock():
clear_module_reservation(module)
except Exception:
traceback.print_exc()
Expand Down
10 changes: 7 additions & 3 deletions wei/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def run_iteration(self) -> None:
If a workflow is able to run, it is started in a separate process.
Workflows are processed in the order they are received, so older workflows have priority.
"""
with state_manager.state_lock():
with state_manager.wc_state_lock():
# * Update all queued workflows
for run_id, wf_run in state_manager.get_all_workflow_runs().items():
if wf_run.status == WorkflowStatus.NEW:
Expand All @@ -38,15 +38,19 @@ def run_iteration(self) -> None:
)
send_event(WorkflowQueuedEvent.from_wf_run(wf_run=wf_run))
state_manager.set_workflow_run(wf_run)
elif wf_run.status in [WorkflowStatus.QUEUED, WorkflowStatus.WAITING]:
elif wf_run.status in [
WorkflowStatus.QUEUED,
WorkflowStatus.IN_PROGRESS,
]:
step = wf_run.steps[wf_run.step_index]
if check_step(wf_run.experiment_id, run_id, step):
module = find_step_module(
state_manager.get_workcell(), step.module
)
reserve_module(module, wf_run.run_id)
reserve_source_and_target(wf_run)
send_event(WorkflowStartEvent.from_wf_run(wf_run=wf_run))
if wf_run.status == WorkflowStatus.QUEUED:
send_event(WorkflowStartEvent.from_wf_run(wf_run=wf_run))
wf_run.status = WorkflowStatus.RUNNING
print(
f"Starting step {wf_run.name}.{step.name} for run: {run_id}"
Expand Down

0 comments on commit 343d4bc

Please sign in to comment.