Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Accelerate Multi Node Startup Time #15650

Merged
merged 21 commits into from Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -74,6 +74,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Fixed bi-directional queues sending delta with Drive Component name changes ([#15642](https://github.com/Lightning-AI/lightning/pull/15642))


- Fixed CloudRuntime works collection with structures and accelerated multi node startup time ([#15650](https://github.com/Lightning-AI/lightning/pull/15650))


## [1.8.0] - 2022-11-01

Expand Down
33 changes: 14 additions & 19 deletions src/lightning_app/components/multi_node/base.py
Expand Up @@ -52,31 +52,26 @@ def run(
work_kwargs: Keywords arguments to be provided to the work on instantiation.
"""
super().__init__()
self.ws = structures.List()
self._work_cls = work_cls
self.ws = structures.List(
*[
work_cls(
*work_args,
cloud_compute=cloud_compute,
**work_kwargs,
parallel=True,
)
for _ in range(num_nodes)
]
)
self.num_nodes = num_nodes
self._cloud_compute = cloud_compute
self._work_args = work_args
self._work_kwargs = work_kwargs
self.has_started = False

def run(self) -> None:
if not self.has_started:

# 1. Create & start the works
if not self.ws:
for node_rank in range(self.num_nodes):
self.ws.append(
self._work_cls(
*self._work_args,
cloud_compute=self._cloud_compute,
**self._work_kwargs,
parallel=True,
)
)

# Starting node `node_rank`` ...
self.ws[-1].start()
# 1. Start all the works.
for w in self.ws:
w.start()
tchaton marked this conversation as resolved.
Show resolved Hide resolved

# 2. Wait for all machines to be started !
if not all(w.status.stage == WorkStageStatus.STARTED for w in self.ws):
Expand Down
129 changes: 64 additions & 65 deletions src/lightning_app/runners/cloud.py
Expand Up @@ -142,78 +142,77 @@ def dispatch(
v1_env_vars.append(V1EnvVar(name="ENABLE_PUSHING_STATE_ENDPOINT", value="0"))

works: List[V1Work] = []
for flow in self.app.flows:
for work in flow.works(recurse=False):
if not work._start_with_flow:
continue

work_requirements = "\n".join(work.cloud_build_config.requirements)
build_spec = V1BuildSpec(
commands=work.cloud_build_config.build_commands(),
python_dependencies=V1PythonDependencyInfo(
package_manager=V1PackageManager.PIP, packages=work_requirements
),
image=work.cloud_build_config.image,
)
user_compute_config = V1UserRequestedComputeConfig(
name=work.cloud_compute.name,
count=1,
disk_size=work.cloud_compute.disk_size,
preemptible=work.cloud_compute.preemptible,
shm_size=work.cloud_compute.shm_size,
)
for work in self.app.works:
if not work._start_with_flow:
continue

drive_specs: List[V1LightningworkDrives] = []
for drive_attr_name, drive in [
(k, getattr(work, k)) for k in work._state if isinstance(getattr(work, k), Drive)
]:
if drive.protocol == "lit://":
drive_type = V1DriveType.NO_MOUNT_S3
source_type = V1SourceType.S3
else:
raise RuntimeError(
f"unknown drive protocol `{drive.protocol}`. Please verify this "
f"drive type has been configured for use in the cloud dispatcher."
)
work_requirements = "\n".join(work.cloud_build_config.requirements)
build_spec = V1BuildSpec(
commands=work.cloud_build_config.build_commands(),
python_dependencies=V1PythonDependencyInfo(
package_manager=V1PackageManager.PIP, packages=work_requirements
),
image=work.cloud_build_config.image,
)
user_compute_config = V1UserRequestedComputeConfig(
name=work.cloud_compute.name,
count=1,
disk_size=work.cloud_compute.disk_size,
preemptible=work.cloud_compute.preemptible,
shm_size=work.cloud_compute.shm_size,
)

drive_specs.append(
V1LightningworkDrives(
drive=V1Drive(
metadata=V1Metadata(
name=f"{work.name}.{drive_attr_name}",
),
spec=V1DriveSpec(
drive_type=drive_type,
source_type=source_type,
source=f"{drive.protocol}{drive.id}",
),
status=V1DriveStatus(),
drive_specs: List[V1LightningworkDrives] = []
for drive_attr_name, drive in [
(k, getattr(work, k)) for k in work._state if isinstance(getattr(work, k), Drive)
]:
if drive.protocol == "lit://":
drive_type = V1DriveType.NO_MOUNT_S3
source_type = V1SourceType.S3
else:
raise RuntimeError(
f"unknown drive protocol `{drive.protocol}`. Please verify this "
f"drive type has been configured for use in the cloud dispatcher."
)

drive_specs.append(
V1LightningworkDrives(
drive=V1Drive(
metadata=V1Metadata(
name=f"{work.name}.{drive_attr_name}",
),
spec=V1DriveSpec(
drive_type=drive_type,
source_type=source_type,
source=f"{drive.protocol}{drive.id}",
),
mount_location=str(drive.root_folder),
status=V1DriveStatus(),
),
)
mount_location=str(drive.root_folder),
),
)

# TODO: Move this to the CloudCompute class and update backend
if work.cloud_compute.mounts is not None:
mounts = work.cloud_compute.mounts
if isinstance(mounts, Mount):
mounts = [mounts]
for mount in mounts:
drive_specs.append(
_create_mount_drive_spec(
work_name=work.name,
mount=mount,
)
# TODO: Move this to the CloudCompute class and update backend
if work.cloud_compute.mounts is not None:
mounts = work.cloud_compute.mounts
if isinstance(mounts, Mount):
mounts = [mounts]
for mount in mounts:
drive_specs.append(
_create_mount_drive_spec(
work_name=work.name,
mount=mount,
)
)

random_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
work_spec = V1LightningworkSpec(
build_spec=build_spec,
drives=drive_specs,
user_requested_compute_config=user_compute_config,
network_config=[V1NetworkConfig(name=random_name, port=work.port)],
)
works.append(V1Work(name=work.name, spec=work_spec))
random_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
work_spec = V1LightningworkSpec(
build_spec=build_spec,
drives=drive_specs,
user_requested_compute_config=user_compute_config,
network_config=[V1NetworkConfig(name=random_name, port=work.port)],
)
works.append(V1Work(name=work.name, spec=work_spec))

# We need to collect a spec for each flow that contains a frontend so that the backend knows
# for which flows it needs to start servers by invoking the cli (see the serve_frontend() method below)
Expand Down
20 changes: 5 additions & 15 deletions tests/tests_app/runners/test_cloud.py
Expand Up @@ -402,7 +402,6 @@ def test_call_with_work_app(self, lightningapps, start_with_flow, monkeypatch, t
monkeypatch.setattr(cloud, "LocalSourceCodeDir", mock.MagicMock())
monkeypatch.setattr(cloud, "_prepare_lightning_wheels_and_requirements", mock.MagicMock())
app = mock.MagicMock()
flow = mock.MagicMock()

work = MyWork(start_with_flow=start_with_flow)
monkeypatch.setattr(work, "_name", "test-work")
Expand All @@ -412,8 +411,7 @@ def test_call_with_work_app(self, lightningapps, start_with_flow, monkeypatch, t
monkeypatch.setattr(work._cloud_compute, "disk_size", 0)
monkeypatch.setattr(work, "_port", 8080)

flow.works = lambda recurse: [work]
app.flows = [flow]
app.works = [work]
cloud_runtime = cloud.CloudRuntime(app=app, entrypoint_file=(source_code_root_dir / "entrypoint.py"))
monkeypatch.setattr(
"lightning_app.runners.cloud._get_project",
Expand Down Expand Up @@ -575,7 +573,6 @@ def test_call_with_work_app_and_attached_drives(self, lightningapps, monkeypatch
monkeypatch.setattr(cloud, "LocalSourceCodeDir", mock.MagicMock())
monkeypatch.setattr(cloud, "_prepare_lightning_wheels_and_requirements", mock.MagicMock())
app = mock.MagicMock()
flow = mock.MagicMock()

mocked_drive = MagicMock(spec=Drive)
setattr(mocked_drive, "id", "foobar")
Expand All @@ -598,8 +595,7 @@ def test_call_with_work_app_and_attached_drives(self, lightningapps, monkeypatch
monkeypatch.setattr(work._cloud_compute, "disk_size", 0)
monkeypatch.setattr(work, "_port", 8080)

flow.works = lambda recurse: [work]
app.flows = [flow]
app.works = [work]
cloud_runtime = cloud.CloudRuntime(app=app, entrypoint_file=(source_code_root_dir / "entrypoint.py"))
monkeypatch.setattr(
"lightning_app.runners.cloud._get_project",
Expand Down Expand Up @@ -712,7 +708,6 @@ def test_call_with_work_app_and_app_comment_command_execution_set(self, lightnin
monkeypatch.setattr(cloud, "LocalSourceCodeDir", mock.MagicMock())
monkeypatch.setattr(cloud, "_prepare_lightning_wheels_and_requirements", mock.MagicMock())
app = mock.MagicMock()
flow = mock.MagicMock()

work = MyWork()
monkeypatch.setattr(work, "_state", {"_port"})
Expand All @@ -723,8 +718,7 @@ def test_call_with_work_app_and_app_comment_command_execution_set(self, lightnin
monkeypatch.setattr(work._cloud_compute, "disk_size", 0)
monkeypatch.setattr(work, "_port", 8080)

flow.works = lambda recurse: [work]
app.flows = [flow]
app.works = [work]
cloud_runtime = cloud.CloudRuntime(app=app, entrypoint_file=(source_code_root_dir / "entrypoint.py"))
monkeypatch.setattr(
"lightning_app.runners.cloud._get_project",
Expand Down Expand Up @@ -829,7 +823,6 @@ def test_call_with_work_app_and_multiple_attached_drives(self, lightningapps, mo
monkeypatch.setattr(cloud, "LocalSourceCodeDir", mock.MagicMock())
monkeypatch.setattr(cloud, "_prepare_lightning_wheels_and_requirements", mock.MagicMock())
app = mock.MagicMock()
flow = mock.MagicMock()

mocked_lit_drive = MagicMock(spec=Drive)
setattr(mocked_lit_drive, "id", "foobar")
Expand All @@ -853,8 +846,7 @@ def test_call_with_work_app_and_multiple_attached_drives(self, lightningapps, mo
monkeypatch.setattr(work._cloud_compute, "disk_size", 0)
monkeypatch.setattr(work, "_port", 8080)

flow.works = lambda recurse: [work]
app.flows = [flow]
app.works = [work]
cloud_runtime = cloud.CloudRuntime(app=app, entrypoint_file=(source_code_root_dir / "entrypoint.py"))
monkeypatch.setattr(
"lightning_app.runners.cloud._get_project",
Expand Down Expand Up @@ -1034,7 +1026,6 @@ def test_call_with_work_app_and_attached_mount_and_drive(self, lightningapps, mo
monkeypatch.setattr(cloud, "LocalSourceCodeDir", mock.MagicMock())
monkeypatch.setattr(cloud, "_prepare_lightning_wheels_and_requirements", mock.MagicMock())
app = mock.MagicMock()
flow = mock.MagicMock()

mocked_drive = MagicMock(spec=Drive)
setattr(mocked_drive, "id", "foobar")
Expand Down Expand Up @@ -1063,8 +1054,7 @@ def test_call_with_work_app_and_attached_mount_and_drive(self, lightningapps, mo
monkeypatch.setattr(work._cloud_compute, "mounts", mocked_mount)
monkeypatch.setattr(work, "_port", 8080)

flow.works = lambda recurse: [work]
app.flows = [flow]
app.works = [work]
cloud_runtime = cloud.CloudRuntime(app=app, entrypoint_file=(source_code_root_dir / "entrypoint.py"))
monkeypatch.setattr(
"lightning_app.runners.cloud._get_project",
Expand Down