Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manager: allow multiple queue tags #541

Merged
merged 4 commits into from Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions devtools/conda-envs/adapters.yaml
Expand Up @@ -30,6 +30,7 @@ dependencies:
- plotly >=4.0.0
- pyarrow >=0.13.0
- tqdm
- pint <=0.9
mattwelborn marked this conversation as resolved.
Show resolved Hide resolved

# Test depends
- codecov
Expand Down
3 changes: 2 additions & 1 deletion devtools/conda-envs/base.yaml
Expand Up @@ -30,6 +30,7 @@ dependencies:
- plotly >=4.0.0
- pyarrow >=0.13.0
- tqdm
- pint <=0.9

# Test depends
- codecov
Expand All @@ -39,4 +40,4 @@ dependencies:

# QCArchive includes
- qcengine >=0.11.0
- qcelemental >=0.11.0
- qcelemental >=0.9.0
1 change: 1 addition & 0 deletions devtools/conda-envs/dev_head.yaml
Expand Up @@ -30,6 +30,7 @@ dependencies:
- plotly >=4.0.0
- pyarrow >=0.13.0
- tqdm
- pint <=0.9

# Test depends
- codecov
Expand Down
1 change: 1 addition & 0 deletions devtools/conda-envs/generate_envs.py
Expand Up @@ -41,6 +41,7 @@
- plotly >=4.0.0
- pyarrow >=0.13.0
- tqdm
- pint <=0.9

# Test depends
- codecov
Expand Down
1 change: 1 addition & 0 deletions devtools/conda-envs/openff.yaml
Expand Up @@ -31,6 +31,7 @@ dependencies:
- plotly >=4.0.0
- pyarrow >=0.13.0
- tqdm
- pint <=0.9

# Test depends
- codecov
Expand Down
7 changes: 4 additions & 3 deletions qcfractal/cli/qcfractal_manager.py
Expand Up @@ -9,7 +9,7 @@
import signal
from enum import Enum
from math import ceil
from typing import List, Optional
from typing import List, Optional, Union

import qcengine as qcng
import tornado.log
Expand Down Expand Up @@ -156,13 +156,14 @@ class QueueManagerSettings(AutodocBaseSettings):
description="Name of this scheduler to present to the Fractal Server. Descriptive names help the server "
"identify the manager resource and assists with debugging.",
)
queue_tag: Optional[str] = Field(
queue_tag: Optional[Union[str, List[str]]] = Field(
None,
description="Only pull tasks from the Fractal Server with this tag. If not set (None/null), then pull untagged "
"tasks, which should be the majority of tasks. This option should only be used when you want to "
"pull very specific tasks which you know have been tagged as such on the server. If the server has "
"no tasks with this tag, no tasks will be pulled (and no error is raised because this is intended "
"behavior).",
"behavior). If multiple tags are provided, tasks will be pulled (but not necessarily executed) in order of the "
"tags.",
)
log_file_prefix: Optional[str] = Field(
None,
Expand Down
7 changes: 6 additions & 1 deletion qcfractal/interface/models/rest_models.py
Expand Up @@ -993,7 +993,12 @@ class QueueManagerMeta(ProtoModel):
description="A list of procedures which the QueueManager has access to. Affects which Tasks "
"the Manager can pull.",
)
tag: Optional[str] = Field(None, description="Optional queue tag to pull Tasks from.")
tag: QueryStr = Field(
None,
description="Optional queue tag to pull Tasks from. If None, tasks are pulled from all tags. "
"If a list of tags is provided, tasks are pulled in order of tags. (This does not "
"guarantee tasks will be executed in that order, however.)",
)

# Statistics
total_worker_walltime: Optional[float] = Field(None, description="The total worker walltime in core-hours.")
Expand Down
2 changes: 1 addition & 1 deletion qcfractal/queue/managers.py
Expand Up @@ -86,7 +86,7 @@ def __init__(
queue_client: "BaseAdapter",
logger: Optional[logging.Logger] = None,
max_tasks: int = 200,
queue_tag: str = None,
queue_tag: Optional[Union[str, List[str]]] = None,
manager_name: str = "unlabeled",
update_frequency: Union[int, float] = 2,
verbose: bool = True,
Expand Down
17 changes: 11 additions & 6 deletions qcfractal/storage_sockets/sqlalchemy_socket.py
Expand Up @@ -7,6 +7,7 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker, with_polymorphic
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import case as expression_case
except ImportError:
raise ImportError(
"SQLAlchemy_socket requires sqlalchemy, please install this python " "module or try a different db_socket."
Expand Down Expand Up @@ -1916,13 +1917,17 @@ def queue_get_next(
none_filt = TaskQueueORM.procedure == None # lgtm [py/test-equals-none]
query.append(or_(proc_filt, none_filt))

order_by = []
if tag is not None:
if isinstance(tag, str):
tag = [tag]
task_order = expression_case([(TaskQueueORM.tag == t, num) for num, t in enumerate(tag)])
order_by.append(task_order)

order_by.extend([TaskQueueORM.priority.desc(), TaskQueueORM.created_on])

with self.session_scope() as session:
query = (
session.query(TaskQueueORM)
.filter(*query)
.order_by(TaskQueueORM.priority.desc(), TaskQueueORM.created_on)
.limit(limit)
)
query = session.query(TaskQueueORM).filter(*query).order_by(*order_by).limit(limit)

# print(query.statement.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
found = query.all()
Expand Down
65 changes: 65 additions & 0 deletions qcfractal/tests/test_managers.py
Expand Up @@ -79,6 +79,71 @@ def test_queue_manager_single_tags(compute_adapter_fixture):
assert isinstance(manager["configuration"], dict)


@testing.using_rdkit
def test_queue_manager_multiple_tags(compute_adapter_fixture):
client, server, adapter = compute_adapter_fixture
reset_server_database(server)

config = {"Hello": "World"}
base_molecule = ptl.data.get_molecule("butane.json")

# Add compute
molecules = [base_molecule.copy(update={"geometry": base_molecule.geometry + 0.1 * i}) for i in range(6)]
tags = ["tag2", "tag1", "tag1", "tag2", "tag1", "tag3"]
tasks = [
client.add_compute("rdkit", "UFF", "", "energy", None, [mol], tag=tag).ids[0]
for mol, tag in zip(molecules, tags)
]

manager = queue.QueueManager(client, adapter, queue_tag=["tag1", "tag2"], configuration=config, max_tasks=2)

# Check that tasks are pulled in the correct order
manager.await_results()
ret = client.query_results(tasks)
ref_status = {
tasks[0]: "INCOMPLETE",
tasks[1]: "COMPLETE",
tasks[2]: "COMPLETE",
tasks[3]: "INCOMPLETE",
tasks[4]: "INCOMPLETE",
tasks[5]: "INCOMPLETE",
}
for result in ret:
assert result.status == ref_status[result.id]

manager.await_results()
ret = client.query_results(tasks)
ref_status = {
tasks[0]: "COMPLETE",
tasks[1]: "COMPLETE",
tasks[2]: "COMPLETE",
tasks[3]: "INCOMPLETE",
tasks[4]: "COMPLETE",
tasks[5]: "INCOMPLETE",
}
for result in ret:
assert result.status == ref_status[result.id]

manager.await_results()
ret = client.query_results(tasks)
ref_status = {
tasks[0]: "COMPLETE",
tasks[1]: "COMPLETE",
tasks[2]: "COMPLETE",
tasks[3]: "COMPLETE",
tasks[4]: "COMPLETE",
tasks[5]: "INCOMPLETE",
}
for result in ret:
assert result.status == ref_status[result.id]

# Check that tag list is correctly validated to not include None
# This could be implemented, but would require greater sophistication
# in SQLAlchemySocket.queue_get_next()
with pytest.raises(TypeError):
queue.QueueManager(client, adapter, queue_tag=["tag1", None])


@pytest.mark.slow
@testing.using_rdkit
def test_queue_manager_log_statistics(compute_adapter_fixture, caplog):
Expand Down