Enable multiprocessing groups within project config #10774

Closed
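The change in brief: the `workflows` section of `project.yml` may now mix plain command names with lists of command names. A list is a "multiprocessing group" whose members run in parallel, and the workflow only continues once every member has finished. A minimal sketch of the parsed structure, with hypothetical command names:

```python
# Parsed equivalent of a hypothetical project.yml using the new syntax.
# Command names ("preprocess", "train", ...) are illustrative only.
config = {
    "workflows": {
        "all": [
            "preprocess",            # runs first, on its own
            ["train", "make-docs"],  # multiprocessing group: members run in parallel
            "package",               # runs only after the whole group has joined
        ]
    }
}
```

The schema, validation, execution, documentation, and DVC changes below all follow from allowing this nesting.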
62 commits
8d08a68
Permit multiprocessing groups in YAML
richardpaulhudson May 9, 2022
12e8600
Basic multiprocessing functionality
richardpaulhudson May 9, 2022
e3b4ee7
Mypy corrections
richardpaulhudson May 9, 2022
a2bd489
Secondary functionality and documentation
richardpaulhudson May 10, 2022
8c8b81a
Fixed formatting issues
richardpaulhudson May 10, 2022
a481698
Corrections
richardpaulhudson May 10, 2022
ae82568
Changes after review
richardpaulhudson May 20, 2022
4daffdd
Changes based on review
richardpaulhudson May 23, 2022
2eb13f2
Readability improvement
richardpaulhudson May 23, 2022
9e665f9
Changes after internal discussions
richardpaulhudson Jun 20, 2022
5de1009
Add max_parallel_processes documentation
richardpaulhudson Jun 20, 2022
1cdb92d
Correction
richardpaulhudson Jun 20, 2022
e6a11b5
Extend scope of test
richardpaulhudson Jun 20, 2022
9d7a79e
First draft of new implementation (incomplete, doesn't run yet)
richardpaulhudson Jul 14, 2022
3c64e82
Correction
richardpaulhudson Jul 14, 2022
ca403f8
Seems to work, not yet tested in a structured way
richardpaulhudson Jul 18, 2022
e9ee680
Saved (intermediate version, doesn't compile yet)
richardpaulhudson Jul 18, 2022
4c2fc56
Refactoring into separate module
richardpaulhudson Jul 19, 2022
83d0738
Remove unnecessary changes
richardpaulhudson Jul 19, 2022
2c1f58e
Formal state machine
richardpaulhudson Jul 19, 2022
012578f
Improvements / corrections
richardpaulhudson Jul 19, 2022
44d51f4
Add failure test
richardpaulhudson Jul 19, 2022
d87fcab
Corrections / improvements
richardpaulhudson Jul 19, 2022
bc79e5a
Fix for Windows
richardpaulhudson Jul 19, 2022
9d005a7
Correct test
richardpaulhudson Jul 20, 2022
5d150f2
Add comment
richardpaulhudson Jul 20, 2022
f8301b4
Correction
richardpaulhudson Jul 20, 2022
d2bde9a
Reverse unnecessary change
richardpaulhudson Jul 20, 2022
91a173f
Add note to projects.md
richardpaulhudson Jul 20, 2022
4614a89
Improve error message
richardpaulhudson Jul 20, 2022
4902dd6
Final touches
richardpaulhudson Jul 20, 2022
e2c2ba4
Fix Mypy
richardpaulhudson Jul 20, 2022
48803e1
Improve document output
richardpaulhudson Jul 20, 2022
fcf7b6b
Use multiprocessing context
richardpaulhudson Jul 21, 2022
6b0ebcd
Improve error handling with hung processes
richardpaulhudson Jul 21, 2022
1650912
Add failure messages
richardpaulhudson Jul 21, 2022
fd8dbfd
Initial changes based on review comments
richardpaulhudson Jul 22, 2022
5fad119
Update spacy/tests/test_parallel.py
richardpaulhudson Jul 22, 2022
c7a8956
Update spacy/cli/_util.py
richardpaulhudson Jul 22, 2022
c6a4a7b
Merge 'origin/master' into feature/projects-multiprocessing
richardpaulhudson Jul 22, 2022
ac81dc9
Revert accidentally checked-in line
richardpaulhudson Jul 22, 2022
6ac15ad
Correct comment
richardpaulhudson Jul 23, 2022
4eb61a7
More updates based on review comments
richardpaulhudson Jul 25, 2022
10513a0
Format with black
richardpaulhudson Jul 25, 2022
567d006
Log to temporary directory
richardpaulhudson Jul 25, 2022
3d16625
Increase timeout to support GPU tests
richardpaulhudson Jul 25, 2022
5393df4
More changes based on review comments
richardpaulhudson Jul 27, 2022
8faf070
Specify new wasabi version
richardpaulhudson Jul 27, 2022
40416b1
Restore previous wasabi peg
richardpaulhudson Jul 27, 2022
1bf82db
Widened errors caught from os.kill()
richardpaulhudson Aug 24, 2022
78ee9c3
Revert to diagnose error
richardpaulhudson Aug 24, 2022
5aa95ce
Merge branch 'master' into feature/projects-multiprocessing
richardpaulhudson Oct 4, 2022
70fa1ce
Copied changes from spaCy/tmp/project-multiprocess
richardpaulhudson Oct 4, 2022
afba051
Improve error logging
richardpaulhudson Oct 4, 2022
b48f2e1
Correction
richardpaulhudson Oct 4, 2022
522b0ed
Handle PermissionError in Windows CI
richardpaulhudson Oct 4, 2022
c8b7912
Correction
richardpaulhudson Oct 4, 2022
786473d
Switch to use TemporaryDirectory
richardpaulhudson Oct 4, 2022
b8a299f
Merge branch 'explosion:master' into feature/projects-multiprocessing
richardpaulhudson Oct 4, 2022
2cc2cc1
Use mkdtemp()
richardpaulhudson Oct 4, 2022
cfaa902
Merge branch 'master' into feature/projects-multiprocessing
richardpaulhudson Jan 23, 2023
b3bcfe5
Empty commit to trigger CI
richardpaulhudson Jan 24, 2023
41 changes: 31 additions & 10 deletions spacy/cli/_util.py
@@ -1,5 +1,5 @@
from typing import Dict, Any, Union, List, Optional, Tuple, Iterable
from typing import TYPE_CHECKING, overload
from typing import TYPE_CHECKING, overload, cast
import sys
import shutil
from pathlib import Path
@@ -221,24 +221,45 @@ def validate_project_commands(config: Dict[str, Any]) -> None:

config (Dict[str, Any]): The loaded config.
"""

def verify_workflow_step(workflow_name: str, step: str) -> None:
if step not in command_names:
msg.fail(
f"Unknown command specified in workflow '{workflow_name}': {step}",
f"Workflows can only refer to commands defined in the 'commands' "
f"section of the {PROJECT_FILE}.",
exits=1,
)

command_names = [cmd["name"] for cmd in config.get("commands", [])]
workflows = config.get("workflows", {})
duplicates = set([cmd for cmd in command_names if command_names.count(cmd) > 1])
if duplicates:
err = f"Duplicate commands defined in {PROJECT_FILE}: {', '.join(duplicates)}"
msg.fail(err, exits=1)
for workflow_name, workflow_steps in workflows.items():
for workflow_name, workflow_step_or_lists in workflows.items():
if workflow_name in command_names:
err = f"Can't use workflow name '{workflow_name}': name already exists as a command"
msg.fail(err, exits=1)
for step in workflow_steps:
if step not in command_names:
msg.fail(
f"Unknown command specified in workflow '{workflow_name}': {step}",
f"Workflows can only refer to commands defined in the 'commands' "
f"section of the {PROJECT_FILE}.",
exits=1,
)
for step_or_list in workflow_step_or_lists:
if isinstance(step_or_list, str):
verify_workflow_step(workflow_name, step_or_list)
else:
workflow_list = cast(List[str], step_or_list)
if len(workflow_list) < 2:
msg.fail(
f"Invalid multiprocessing group within '{workflow_name}'.",
f"A multiprocessing group must reference at least two commands.",
exits=1,
)
if len(workflow_list) != len(set(workflow_list)):
msg.fail(
f"A multiprocessing group within '{workflow_name}' contains a command more than once.",
f"This is not permitted because it is then not possible to determine when to rerun.",
exits=1,
)
for step in workflow_list:
verify_workflow_step(workflow_name, step)


def get_hash(data, exclude: Iterable[str] = tuple()) -> str:
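Read as a whole, the new validation enforces three rules for a group: every step must be a known command, a group must contain at least two commands, and no command may appear twice (otherwise there is no unambiguous lockfile entry to decide reruns). A condensed, standalone sketch of those checks, raising exceptions in place of `msg.fail()` and using hypothetical command names:

```python
from typing import Any, Dict, List

def validate_workflows(config: Dict[str, Any]) -> None:
    """Condensed version of the group checks in validate_project_commands()."""
    command_names = [cmd["name"] for cmd in config.get("commands", [])]
    for name, steps in config.get("workflows", {}).items():
        for step_or_list in steps:
            if isinstance(step_or_list, str):
                if step_or_list not in command_names:
                    raise ValueError(f"Unknown command in '{name}': {step_or_list}")
            else:
                group: List[str] = step_or_list
                if len(group) < 2:
                    raise ValueError(f"Group in '{name}' needs at least two commands")
                if len(group) != len(set(group)):
                    raise ValueError(f"Group in '{name}' repeats a command")
                for step in group:
                    if step not in command_names:
                        raise ValueError(f"Unknown command in '{name}': {step}")

cfg = {
    "commands": [{"name": "train"}, {"name": "evaluate"}],
    "workflows": {"all": [["train", "evaluate"]]},
}
validate_workflows(cfg)  # passes; a group of one, a repeat, or an unknown name would raise
```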
16 changes: 13 additions & 3 deletions spacy/cli/project/document.py
@@ -14,8 +14,8 @@
Commands are only re-run if their inputs have changed."""
INTRO_WORKFLOWS = f"""The following workflows are defined by the project. They
can be executed using [`spacy project run [name]`]({DOCS_URL}/api/cli#project-run)
and will run the specified commands in order. Commands are only re-run if their
inputs have changed."""
and will run the specified commands in order. Commands grouped within square brackets
are run in parallel. Commands are only re-run if their inputs have changed."""
INTRO_ASSETS = f"""The following assets are defined by the project. They can
be fetched by running [`spacy project assets`]({DOCS_URL}/api/cli#project-assets)
in the project directory."""
@@ -69,7 +69,17 @@ def project_document(
md.add(md.table(data, ["Command", "Description"]))
# Workflows
wfs = config.get("workflows", {}).items()
data = [(md.code(n), " &rarr; ".join(md.code(w) for w in stp)) for n, stp in wfs]
data = []
for n, steps in wfs:
rendered_steps = []
for step in steps:
if isinstance(step, str):
rendered_steps.append(md.code(step))
else:
rendered_steps.append(
"[" + ", ".join(md.code(p_step) for p_step in step) + "]"
)
data.append([md.code(n), " &rarr; ".join(rendered_steps)])
if data:
md.add(md.title(3, "Workflows", "⏭"))
md.add(INTRO_WORKFLOWS)
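The documentation generator now renders a group as a bracketed, comma-separated run of commands inside the usual arrow-joined chain. A small sketch reproducing that rendering, with plain backticks standing in for wasabi's `md.code()`:

```python
def render_steps(steps) -> str:
    # Mirrors the new loop in project_document(): single commands are rendered
    # as code, groups as a bracketed, comma-separated list of code spans.
    code = lambda s: f"`{s}`"
    rendered = []
    for step in steps:
        if isinstance(step, str):
            rendered.append(code(step))
        else:
            rendered.append("[" + ", ".join(code(s) for s in step) + "]")
    return " &rarr; ".join(rendered)

print(render_steps(["preprocess", ["train", "make-docs"], "package"]))
# `preprocess` &rarr; [`train`, `make-docs`] &rarr; `package`
```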
10 changes: 8 additions & 2 deletions spacy/cli/project/dvc.py
@@ -1,5 +1,5 @@
"""This module contains helpers and subcommands for integrating spaCy projects
with Data Version Controk (DVC). https://dvc.org"""
with Data Version Control (DVC). https://dvc.org"""
from typing import Dict, Any, List, Optional, Iterable
import subprocess
from pathlib import Path
@@ -105,7 +105,13 @@ def update_dvc_config(
dvc_config_path.unlink()
dvc_commands = []
config_commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
for name in workflows[workflow]:
names = []
for cmdOrMultiprocessingGroup in workflows[workflow]:
if isinstance(cmdOrMultiprocessingGroup, str):
names.append(cmdOrMultiprocessingGroup)
else:
names.extend(cmdOrMultiprocessingGroup)
for name in names:
command = config_commands[name]
deps = command.get("deps", [])
outputs = command.get("outputs", [])
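DVC has no notion of parallel groups, so `update_dvc_config()` simply flattens each group back into the linear order of its members before emitting stages (`print_run_help()` in `run.py` later does the same). The flattening pattern in isolation, with hypothetical names:

```python
steps = ["preprocess", ["train", "make-docs"], "package"]
names = []
for cmd_or_group in steps:
    if isinstance(cmd_or_group, str):
        names.append(cmd_or_group)
    else:
        names.extend(cmd_or_group)  # a group contributes its members in order
print(names)  # ['preprocess', 'train', 'make-docs', 'package']
```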
4 changes: 3 additions & 1 deletion spacy/cli/project/pull.py
@@ -1,3 +1,4 @@
import multiprocessing
from pathlib import Path
from wasabi import msg
from .remote_storage import RemoteStorage
@@ -37,6 +38,7 @@ def project_pull(project_dir: Path, remote: str, *, verbose: bool = False):
# We use a while loop here because we don't know how the commands
# will be ordered. A command might need dependencies from one that's later
# in the list.
mult_group_mutex = multiprocessing.Lock()
while commands:
for i, cmd in enumerate(list(commands)):
logger.debug(f"CMD: {cmd['name']}.")
@@ -52,7 +54,7 @@

out_locs = [project_dir / out for out in cmd.get("outputs", [])]
if all(loc.exists() for loc in out_locs):
update_lockfile(project_dir, cmd)
update_lockfile(project_dir, cmd, mult_group_mutex=mult_group_mutex)
# We remove the command from the list here, and break, so that
# we iterate over the loop again.
commands.pop(i)
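`project_pull()` now owns a single `multiprocessing.Lock` and threads it through to `update_lockfile()`, so that when commands later run in parallel their read-modify-write cycles on `project.lock` cannot interleave. A minimal sketch of that guarded update, assuming `srsly` as in the spaCy codebase; the file name and payload are illustrative:

```python
from pathlib import Path
import multiprocessing
import srsly  # same YAML helper the spaCy codebase uses

def guarded_update(lock_path: Path, name: str, entry: dict, mutex) -> None:
    # The whole read-modify-write of the lockfile is one critical section,
    # so concurrent writers can never clobber each other's entries.
    with mutex:
        data = srsly.read_yaml(lock_path) if lock_path.exists() else {}
        data[name] = entry
        srsly.write_yaml(lock_path, data)

mutex = multiprocessing.Lock()
guarded_update(Path("project.lock"), "train", {"script": ["python train.py"]}, mutex)
```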
148 changes: 98 additions & 50 deletions spacy/cli/project/run.py
@@ -1,5 +1,7 @@
from typing import Optional, List, Dict, Sequence, Any, Iterable
from typing import Optional, List, Dict, Sequence, Any, Iterable, cast
from pathlib import Path
from multiprocessing import Process, Lock
from multiprocessing.synchronize import Lock as Lock_t
from wasabi import msg
from wasabi.util import locale_escape
import sys
Expand Down Expand Up @@ -50,6 +52,7 @@ def project_run(
force: bool = False,
dry: bool = False,
capture: bool = False,
mult_group_mutex: Optional[Lock_t] = None,
) -> None:
"""Run a named script defined in the project.yml. If the script is part
of the default pipeline (defined in the "run" section), DVC is used to
Expand All @@ -67,21 +70,44 @@ def project_run(
when you want to turn over execution to the command, and capture=True
when you want to run the command more like a function.
"""
if mult_group_mutex is None:
mult_group_mutex = Lock()
config = load_project_config(project_dir, overrides=overrides)
commands = {cmd["name"]: cmd for cmd in config.get("commands", [])}
workflows = config.get("workflows", {})
validate_subcommand(list(commands.keys()), list(workflows.keys()), subcommand)
if subcommand in workflows:
msg.info(f"Running workflow '{subcommand}'")
for cmd in workflows[subcommand]:
project_run(
project_dir,
cmd,
overrides=overrides,
force=force,
dry=dry,
capture=capture,
)
for cmdOrMultiprocessingGroup in workflows[subcommand]:
if isinstance(cmdOrMultiprocessingGroup, str):
project_run(
project_dir,
cmdOrMultiprocessingGroup,
overrides=overrides,
force=force,
dry=dry,
capture=capture,
mult_group_mutex=mult_group_mutex,
)
else:
processes = [
Process(
target=project_run,
args=(project_dir, cmd),
kwargs={
"overrides": overrides,
"force": force,
"dry": dry,
"capture": capture,
"mult_group_mutex": mult_group_mutex,
},
)
for cmd in cmdOrMultiprocessingGroup
]
for process in processes:
process.start()
for process in processes:
process.join()
else:
cmd = commands[subcommand]
for dep in cmd.get("deps", []):
@@ -93,13 +119,18 @@
check_spacy_commit = check_bool_env_var(ENV_VARS.PROJECT_USE_GIT_VERSION)
with working_dir(project_dir) as current_dir:
msg.divider(subcommand)
rerun = check_rerun(current_dir, cmd, check_spacy_commit=check_spacy_commit)
rerun = check_rerun(
current_dir,
cmd,
check_spacy_commit=check_spacy_commit,
mult_group_mutex=mult_group_mutex,
)
if not rerun and not force:
msg.info(f"Skipping '{cmd['name']}': nothing changed")
else:
run_commands(cmd["script"], dry=dry, capture=capture)
if not dry:
update_lockfile(current_dir, cmd)
update_lockfile(current_dir, cmd, mult_group_mutex=mult_group_mutex)


def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
Expand All @@ -123,7 +154,12 @@ def print_run_help(project_dir: Path, subcommand: Optional[str] = None) -> None:
if help_text:
print(f"\n{help_text}\n")
elif subcommand in workflows:
steps = workflows[subcommand]
steps = []
for cmdOrMultiprocessingGroup in workflows[subcommand]:
if isinstance(cmdOrMultiprocessingGroup, str):
steps.append(cmdOrMultiprocessingGroup)
else:
steps.extend(cmdOrMultiprocessingGroup)
print(f"\nWorkflow consisting of {len(steps)} commands:")
steps_data = [
(f"{i + 1}. {step}", commands[step].get("help", ""))
@@ -157,7 +193,7 @@ def run_commands(

commands (List[str]): The string commands.
silent (bool): Don't print the commands.
dry (bool): Perform a dry run and don't execut anything.
dry (bool): Perform a dry run and don't execute anything.
capture (bool): Whether to capture the output and errors of individual commands.
If False, the stdout and stderr will not be redirected, and if there's an error,
sys.exit will be called with the return code. You should use capture=False
@@ -212,6 +248,7 @@ def check_rerun(
*,
check_spacy_version: bool = True,
check_spacy_commit: bool = False,
mult_group_mutex: Lock_t,
) -> bool:
"""Check if a command should be rerun because its settings or inputs/outputs
changed.
@@ -224,51 +261,62 @@
# Always rerun if no-skip is set
if command.get("no_skip", False):
return True
lock_path = project_dir / PROJECT_LOCK
if not lock_path.exists(): # We don't have a lockfile, run command
return True
data = srsly.read_yaml(lock_path)
if command["name"] not in data: # We don't have info about this command
return True
entry = data[command["name"]]
# Always run commands with no outputs (otherwise they'd always be skipped)
if not entry.get("outs", []):
return True
# Always rerun if spaCy version or commit hash changed
spacy_v = entry.get("spacy_version")
commit = entry.get("spacy_git_version")
if check_spacy_version and not is_minor_version_match(spacy_v, about.__version__):
info = f"({spacy_v} in {PROJECT_LOCK}, {about.__version__} current)"
msg.info(f"Re-running '{command['name']}': spaCy minor version changed {info}")
return True
if check_spacy_commit and commit != GIT_VERSION:
info = f"({commit} in {PROJECT_LOCK}, {GIT_VERSION} current)"
msg.info(f"Re-running '{command['name']}': spaCy commit changed {info}")
return True
# If the entry in the lockfile matches the lockfile entry that would be
# generated from the current command, we don't rerun because it means that
# all inputs/outputs, hashes and scripts are the same and nothing changed
lock_entry = get_lock_entry(project_dir, command)
exclude = ["spacy_version", "spacy_git_version"]
return get_hash(lock_entry, exclude=exclude) != get_hash(entry, exclude=exclude)
with mult_group_mutex:
lock_path = project_dir / PROJECT_LOCK
if not lock_path.exists(): # We don't have a lockfile, run command
return True
data = srsly.read_yaml(lock_path)
if command["name"] not in data: # We don't have info about this command
return True
entry = data[command["name"]]
# Always run commands with no outputs (otherwise they'd always be skipped)
if not entry.get("outs", []):
return True
# Always rerun if spaCy version or commit hash changed
spacy_v = entry.get("spacy_version")
commit = entry.get("spacy_git_version")
if check_spacy_version and not is_minor_version_match(
spacy_v, about.__version__
):
info = f"({spacy_v} in {PROJECT_LOCK}, {about.__version__} current)"
msg.info(
f"Re-running '{command['name']}': spaCy minor version changed {info}"
)
return True
if check_spacy_commit and commit != GIT_VERSION:
info = f"({commit} in {PROJECT_LOCK}, {GIT_VERSION} current)"
msg.info(f"Re-running '{command['name']}': spaCy commit changed {info}")
return True
# If the entry in the lockfile matches the lockfile entry that would be
# generated from the current command, we don't rerun because it means that
# all inputs/outputs, hashes and scripts are the same and nothing changed
lock_entry = get_lock_entry(project_dir, command)
exclude = ["spacy_version", "spacy_git_version"]
return get_hash(lock_entry, exclude=exclude) != get_hash(entry, exclude=exclude)


def update_lockfile(project_dir: Path, command: Dict[str, Any]) -> None:
def update_lockfile(
project_dir: Path,
command: Dict[str, Any],
mult_group_mutex: Lock_t,
) -> None:
"""Update the lockfile after running a command. Will create a lockfile if
it doesn't yet exist and will add an entry for the current command, its
script and dependencies/outputs.

project_dir (Path): The current project directory.
command (Dict[str, Any]): The command, as defined in the project.yml.
mult_group_mutex: the mutex preventing concurrent writes
"""
lock_path = project_dir / PROJECT_LOCK
if not lock_path.exists():
srsly.write_yaml(lock_path, {})
data = {}
else:
data = srsly.read_yaml(lock_path)
data[command["name"]] = get_lock_entry(project_dir, command)
srsly.write_yaml(lock_path, data)
with mult_group_mutex:
lock_path = project_dir / PROJECT_LOCK
if not lock_path.exists():
srsly.write_yaml(lock_path, {})
data = {}
else:
data = srsly.read_yaml(lock_path)
data[command["name"]] = get_lock_entry(project_dir, command)
srsly.write_yaml(lock_path, data)


def get_lock_entry(project_dir: Path, command: Dict[str, Any]) -> Dict[str, Any]:
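The core of the feature is the new branch in `project_run()`: a string step recurses into `project_run()` as before, while a group becomes one `multiprocessing.Process` per member; all are started, then all are joined before the workflow moves on, and every child shares the lock that `check_rerun()` and `update_lockfile()` use to serialize lockfile access. A self-contained sketch of that start/join pattern, with a trivial stand-in worker in place of the real `project_run()`:

```python
import multiprocessing
from multiprocessing import Process

def run_step(name: str, mutex) -> None:
    # Stand-in for project_run(); the real code re-enters project_run() with
    # the same project_dir/overrides and passes the shared mutex along so the
    # children serialize their reads and writes of project.lock.
    with mutex:
        print(f"finished {name}")

if __name__ == "__main__":
    mutex = multiprocessing.Lock()
    workflow = ["preprocess", ["train", "make-docs"], "package"]
    for step_or_group in workflow:
        if isinstance(step_or_group, str):
            run_step(step_or_group, mutex)
        else:
            procs = [
                Process(target=run_step, args=(cmd, mutex))
                for cmd in step_or_group
            ]
            for p in procs:  # start every member of the group...
                p.start()
            for p in procs:  # ...then wait for all of them before moving on
                p.join()
```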
4 changes: 2 additions & 2 deletions spacy/schemas.py
@@ -458,8 +458,8 @@ class ProjectConfigSchema(BaseModel):
vars: Dict[StrictStr, Any] = Field({}, title="Optional variables to substitute in commands")
env: Dict[StrictStr, Any] = Field({}, title="Optional variable names to substitute in commands, mapped to environment variable names")
assets: List[Union[ProjectConfigAssetURL, ProjectConfigAssetGit]] = Field([], title="Data assets")
workflows: Dict[StrictStr, List[StrictStr]] = Field({}, title="Named workflows, mapped to list of project commands to run in order")
commands: List[ProjectConfigCommand] = Field([], title="Project command shortucts")
workflows: Dict[StrictStr, List[Union[StrictStr, List[StrictStr]]]] = Field({}, title="Named workflows, mapped to list of project commands to run in order")
commands: List[ProjectConfigCommand] = Field([], title="Project command shortcuts")
title: Optional[str] = Field(None, title="Project title")
spacy_version: Optional[StrictStr] = Field(None, title="spaCy version range that the project is compatible with")
# fmt: on
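The schema change is what makes the nested lists legal in `project.yml` in the first place: each workflow entry is now a union of a string and a list of strings, which also means groups cannot themselves nest. A minimal sketch of the same union type in isolation, assuming the pydantic v1-style API that spaCy used at the time:

```python
from typing import Dict, List, Union
from pydantic import BaseModel, Field, StrictStr, ValidationError

class Workflows(BaseModel):
    # Same shape as the new field in ProjectConfigSchema.
    workflows: Dict[StrictStr, List[Union[StrictStr, List[StrictStr]]]] = Field({})

Workflows(workflows={"all": ["preprocess", ["train", "make-docs"]]})  # valid
try:
    Workflows(workflows={"all": [["train", ["nested"]]]})  # groups can't nest
except ValidationError as err:
    print(err)
```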