Enable multiprocessing groups within project config #10774

Closed
Changes from 8 commits
62 commits
8d08a68
Permit multiprocessing groups in YAML
richardpaulhudson May 9, 2022
12e8600
Basic multiprocessing functionality
richardpaulhudson May 9, 2022
e3b4ee7
Mypy corrections
richardpaulhudson May 9, 2022
a2bd489
Secondary functionality and documentation
richardpaulhudson May 10, 2022
8c8b81a
Fixed formatting issues
richardpaulhudson May 10, 2022
a481698
Corrections
richardpaulhudson May 10, 2022
ae82568
Changes after review
richardpaulhudson May 20, 2022
4daffdd
Changes based on review
richardpaulhudson May 23, 2022
2eb13f2
Readability improvement
richardpaulhudson May 23, 2022
9e665f9
Changes after internal discussions
richardpaulhudson Jun 20, 2022
5de1009
Add max_parallel_processes documentation
richardpaulhudson Jun 20, 2022
1cdb92d
Correction
richardpaulhudson Jun 20, 2022
e6a11b5
Extend scope of test
richardpaulhudson Jun 20, 2022
9d7a79e
First draft of new implementation (incomplete, doesn't run yet)
richardpaulhudson Jul 14, 2022
3c64e82
Correction
richardpaulhudson Jul 14, 2022
ca403f8
Seems to work, not yet tested in a structured way
richardpaulhudson Jul 18, 2022
e9ee680
Saved (intermediate version, doesn't compile yet)
richardpaulhudson Jul 18, 2022
4c2fc56
Refactoring into separate module
richardpaulhudson Jul 19, 2022
83d0738
Remove unnecessary changes
richardpaulhudson Jul 19, 2022
2c1f58e
Formal state machine
richardpaulhudson Jul 19, 2022
012578f
Improvements / corrections
richardpaulhudson Jul 19, 2022
44d51f4
Add failure test
richardpaulhudson Jul 19, 2022
d87fcab
Corrections / improvements
richardpaulhudson Jul 19, 2022
bc79e5a
Fix for Windows
richardpaulhudson Jul 19, 2022
9d005a7
Correct test
richardpaulhudson Jul 20, 2022
5d150f2
Add comment
richardpaulhudson Jul 20, 2022
f8301b4
Correction
richardpaulhudson Jul 20, 2022
d2bde9a
Reverse unnecessary change
richardpaulhudson Jul 20, 2022
91a173f
Add note to projects.md
richardpaulhudson Jul 20, 2022
4614a89
Improve error message
richardpaulhudson Jul 20, 2022
4902dd6
Final touches
richardpaulhudson Jul 20, 2022
e2c2ba4
Fix Mypy
richardpaulhudson Jul 20, 2022
48803e1
Improve document output
richardpaulhudson Jul 20, 2022
fcf7b6b
Use multiprocessing context
richardpaulhudson Jul 21, 2022
6b0ebcd
Improve error handling with hung processes
richardpaulhudson Jul 21, 2022
1650912
Add failure messages
richardpaulhudson Jul 21, 2022
fd8dbfd
Initial changes based on review comments
richardpaulhudson Jul 22, 2022
5fad119
Update spacy/tests/test_parallel.py
richardpaulhudson Jul 22, 2022
c7a8956
Update spacy/cli/_util.py
richardpaulhudson Jul 22, 2022
c6a4a7b
Merge 'origin/master' into feature/projects-multiprocessing
richardpaulhudson Jul 22, 2022
ac81dc9
Revert accidentally checked-in line
richardpaulhudson Jul 22, 2022
6ac15ad
Correct comment
richardpaulhudson Jul 23, 2022
4eb61a7
More updates based on review comments
richardpaulhudson Jul 25, 2022
10513a0
Format with black
richardpaulhudson Jul 25, 2022
567d006
Log to temporary directory
richardpaulhudson Jul 25, 2022
3d16625
Increase timeout to support GPU tests
richardpaulhudson Jul 25, 2022
5393df4
More changes based on review comments
richardpaulhudson Jul 27, 2022
8faf070
Specify new wasabi version
richardpaulhudson Jul 27, 2022
40416b1
Restore previous wasabi peg
richardpaulhudson Jul 27, 2022
1bf82db
Widened errors caught from os.kill()
richardpaulhudson Aug 24, 2022
78ee9c3
Revert to diagnose error
richardpaulhudson Aug 24, 2022
5aa95ce
Merge branch 'master' into feature/projects-multiprocessing
richardpaulhudson Oct 4, 2022
70fa1ce
Copied changes from spaCy/tmp/project-multiprocess
richardpaulhudson Oct 4, 2022
afba051
Improve error logging
richardpaulhudson Oct 4, 2022
b48f2e1
Correction
richardpaulhudson Oct 4, 2022
522b0ed
Handle PermissionError in Windows CI
richardpaulhudson Oct 4, 2022
c8b7912
Correction
richardpaulhudson Oct 4, 2022
786473d
Switch to use TemporaryDirectory
richardpaulhudson Oct 4, 2022
b8a299f
Merge branch 'explosion:master' into feature/projects-multiprocessing
richardpaulhudson Oct 4, 2022
2cc2cc1
Use mkdtemp()
richardpaulhudson Oct 4, 2022
cfaa902
Merge branch 'master' into feature/projects-multiprocessing
richardpaulhudson Jan 23, 2023
b3bcfe5
Empty commit to trigger CI
richardpaulhudson Jan 24, 2023
62 changes: 49 additions & 13 deletions spacy/cli/_util.py
@@ -1,5 +1,5 @@
from typing import Dict, Any, Union, List, Optional, Tuple, Iterable
from typing import TYPE_CHECKING, overload
from typing import TYPE_CHECKING, overload, cast
import sys
import shutil
from pathlib import Path
@@ -215,30 +215,54 @@ def validate_project_version(config: Dict[str, Any]) -> None:
msg.fail(err, exits=1)


def verify_workflow_step(workflow_name: str, commands: List[str], step: str) -> None:
if step not in commands:
msg.fail(
f"Unknown command specified in workflow '{workflow_name}': {step}",
f"Workflows can only refer to commands defined in the 'commands' "
f"section of the {PROJECT_FILE}.",
exits=1,
)


def validate_project_commands(config: Dict[str, Any]) -> None:
"""Check that project commands and workflows are valid, don't contain
duplicates, don't clash and only refer to commands that exist.

config (Dict[str, Any]): The loaded config.
"""
command_names = [cmd["name"] for cmd in config.get("commands", [])]

commands = [cmd["name"] for cmd in config.get("commands", [])]
workflows = config.get("workflows", {})
duplicates = set([cmd for cmd in command_names if command_names.count(cmd) > 1])
duplicates = set([cmd for cmd in commands if commands.count(cmd) > 1])
if duplicates:
err = f"Duplicate commands defined in {PROJECT_FILE}: {', '.join(duplicates)}"
msg.fail(err, exits=1)
for workflow_name, workflow_steps in workflows.items():
if workflow_name in command_names:
for workflow_name, workflow_items in workflows.items():
if workflow_name in commands:
err = f"Can't use workflow name '{workflow_name}': name already exists as a command"
msg.fail(err, exits=1)
for step in workflow_steps:
if step not in command_names:
msg.fail(
f"Unknown command specified in workflow '{workflow_name}': {step}",
f"Workflows can only refer to commands defined in the 'commands' "
f"section of the {PROJECT_FILE}.",
exits=1,
)
for workflow_item in workflow_items:
if isinstance(workflow_item, str):
verify_workflow_step(workflow_name, commands, workflow_item)
else:
assert isinstance(workflow_item, list)
assert isinstance(workflow_item[0], str)
steps = cast(List[str], workflow_item)
if len(steps) < 2:
msg.fail(
f"Invalid multiprocessing group within '{workflow_name}'.",
f"A multiprocessing group must reference at least two commands.",
exits=1,
)
if len(steps) != len(set(steps)):
msg.fail(
f"A multiprocessing group within '{workflow_name}' contains a command more than once.",
f"This is not permitted because it is then not possible to determine when to rerun.",
exits=1,
)
for step in steps:
verify_workflow_step(workflow_name, commands, step)


def get_hash(data, exclude: Iterable[str] = tuple()) -> str:
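For orientation, here is a minimal sketch of the config shape the validation above accepts, written as the dict that project.yml parses into; the command and workflow names are hypothetical. A workflow item is either a single command name or a nested list, and the nested list forms a multiprocessing group.

```python
# Hypothetical project config illustrating the workflow shapes handled above.
config = {
    "commands": [
        {"name": "preprocess"},
        {"name": "train"},
        {"name": "evaluate"},
        {"name": "package"},
    ],
    "workflows": {
        # A plain string is a single step; the nested list is a multiprocessing
        # group whose commands may be run in parallel.
        "all": ["preprocess", ["train", "evaluate"], "package"],
    },
}

# Groups the checks above would reject:
#   ["train"]          -> a group must reference at least two commands
#   ["train", "train"] -> a group may not list the same command twice
```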
@@ -556,3 +580,15 @@ def setup_gpu(use_gpu: int, silent=None) -> None:
local_msg.info("Using CPU")
if has_cupy and gpu_is_available():
local_msg.info("To switch to GPU 0, use the option: --gpu-id 0")


def get_workflow_steps(workflow_items: List[Union[str, List[str]]]) -> List[str]:
steps: List[str] = []
for workflow_item in workflow_items:
if isinstance(workflow_item, str):
steps.append(workflow_item)
else:
assert isinstance(workflow_item, list)
assert isinstance(workflow_item[0], str)
steps.extend(workflow_item)
return steps
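The new get_workflow_steps helper flattens a mixed workflow back into a linear list of step names for callers (such as the DVC integration below) that don't care about the parallel grouping. A small usage sketch with hypothetical step names, assuming the helper is importable from spacy.cli._util on this branch:

```python
from spacy.cli._util import get_workflow_steps  # assumes this branch

# A multiprocessing group is flattened in place, preserving workflow order.
workflow_items = ["preprocess", ["train", "evaluate"], "package"]
assert get_workflow_steps(workflow_items) == [
    "preprocess",
    "train",
    "evaluate",
    "package",
]
```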
16 changes: 13 additions & 3 deletions spacy/cli/project/document.py
@@ -14,8 +14,8 @@
Commands are only re-run if their inputs have changed."""
INTRO_WORKFLOWS = f"""The following workflows are defined by the project. They
can be executed using [`spacy project run [name]`]({DOCS_URL}/api/cli#project-run)
and will run the specified commands in order. Commands are only re-run if their
inputs have changed."""
and will run the specified commands in order. Commands grouped within square brackets
are run in parallel. Commands are only re-run if their inputs have changed."""
INTRO_ASSETS = f"""The following assets are defined by the project. They can
be fetched by running [`spacy project assets`]({DOCS_URL}/api/cli#project-assets)
in the project directory."""
@@ -69,7 +69,17 @@ def project_document(
md.add(md.table(data, ["Command", "Description"]))
# Workflows
wfs = config.get("workflows", {}).items()
data = [(md.code(n), " &rarr; ".join(md.code(w) for w in stp)) for n, stp in wfs]
data = []
for n, steps in wfs:
rendered_steps = []
for step in steps:
if isinstance(step, str):
rendered_steps.append(md.code(step))
else:
rendered_steps.append(
"[" + ", ".join(md.code(p_step) for p_step in step) + "]"
)
data.append([md.code(n), " &rarr; ".join(rendered_steps)])
if data:
md.add(md.title(3, "Workflows", "⏭"))
md.add(INTRO_WORKFLOWS)
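To make the rendering concrete, here is a standalone approximation of the loop above, using plain backticks in place of wasabi's md.code(); the workflow name and steps are hypothetical.

```python
# Standalone approximation of the workflow-rendering loop; names hypothetical.
workflow_name = "all"
steps = ["preprocess", ["train", "evaluate"], "package"]

rendered_steps = []
for step in steps:
    if isinstance(step, str):
        rendered_steps.append(f"`{step}`")
    else:
        # Parallel groups are rendered inside square brackets.
        rendered_steps.append("[" + ", ".join(f"`{s}`" for s in step) + "]")

print(f"`{workflow_name}`", " &rarr; ".join(rendered_steps))
# `all` `preprocess` &rarr; [`train`, `evaluate`] &rarr; `package`
```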
13 changes: 10 additions & 3 deletions spacy/cli/project/dvc.py
@@ -1,12 +1,12 @@
"""This module contains helpers and subcommands for integrating spaCy projects
with Data Version Controk (DVC). https://dvc.org"""
with Data Version Control (DVC). https://dvc.org"""
from typing import Dict, Any, List, Optional, Iterable
import subprocess
from pathlib import Path
from wasabi import msg

from .._util import PROJECT_FILE, load_project_config, get_hash, project_cli
from .._util import Arg, Opt, NAME, COMMAND
from .._util import Arg, Opt, NAME, COMMAND, get_workflow_steps
from ...util import working_dir, split_command, join_command, run_command
from ...util import SimpleFrozenList

@@ -105,13 +105,15 @@ def update_dvc_config(
dvc_config_path.unlink()
dvc_commands = []
config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
for name in workflows[workflow]:
processed_step = False
for name in get_workflow_steps(workflows[workflow]):
command = config_commands[name]
deps = command.get("deps", [])
outputs = command.get("outputs", [])
outputs_no_cache = command.get("outputs_no_cache", [])
if not deps and not outputs and not outputs_no_cache:
continue
processed_step = True
# Default to the working dir as the project path since dvc.yaml is auto-generated
# and we don't want arbitrary paths in there
project_cmd = ["python", "-m", NAME, "project", "run", name]
@@ -123,6 +125,11 @@
dvc_cmd.append("--always-changed")
full_cmd = [*dvc_cmd, *deps_cmd, *outputs_cmd, *outputs_nc_cmd, *project_cmd]
dvc_commands.append(join_command(full_cmd))
if not processed_step:
msg.fail(
f"A DVC workflow must have at least one dependency or output",
exits=1,
)
with working_dir(path):
dvc_flags = {"--verbose": verbose, "--quiet": silent}
run_dvc_commands(dvc_commands, flags=dvc_flags)
4 changes: 3 additions & 1 deletion spacy/cli/project/pull.py
@@ -1,3 +1,4 @@
import multiprocessing
from pathlib import Path
from wasabi import msg
from .remote_storage import RemoteStorage
@@ -37,6 +38,7 @@ def project_pull(project_dir: Path, remote: str, *, verbose: bool = False):
# We use a while loop here because we don't know how the commands
# will be ordered. A command might need dependencies from one that's later
# in the list.
mult_group_mutex = multiprocessing.Lock()
while commands:
for i, cmd in enumerate(list(commands)):
logger.debug(f"CMD: {cmd['name']}.")
@@ -52,7 +54,7 @@

out_locs = [project_dir / out for out in cmd.get("outputs", [])]
if all(loc.exists() for loc in out_locs):
update_lockfile(project_dir, cmd)
update_lockfile(project_dir, cmd, mult_group_mutex=mult_group_mutex)
# We remove the command from the list here, and break, so that
# we iterate over the loop again.
commands.pop(i)
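The new mult_group_mutex keyword suggests that update_lockfile serializes its writes so that commands from the same multiprocessing group finishing at the same time cannot clobber the shared lockfile. Below is a minimal sketch of that guarding pattern; the helper, file name, and entry format are illustrative, not spaCy's actual implementation.

```python
import json
import multiprocessing
from pathlib import Path

def write_lockfile_entry(project_dir: Path, command_name: str, *, mult_group_mutex) -> None:
    """Illustrative stand-in for update_lockfile(): the shared lock wraps the
    whole read-modify-write cycle so concurrent writers cannot interleave."""
    lockfile = project_dir / "project.lock"  # hypothetical file name and format
    with mult_group_mutex:
        entries = json.loads(lockfile.read_text()) if lockfile.exists() else {}
        entries[command_name] = {"completed": True}
        lockfile.write_text(json.dumps(entries, indent=2))

mutex = multiprocessing.Lock()
write_lockfile_entry(Path("."), "train", mult_group_mutex=mutex)
```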