Skip to content

Commit

Permalink
[App] Add CloudMultiProcessingBackend to run a child App within the …
Browse files Browse the repository at this point in the history
…Flow in the cloud (#15800)

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* Update src/lightning_app/CHANGELOG.md

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

* Update src/lightning_app/utilities/port.py

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>

Co-authored-by: Ethan Harris <ethanwharris@gmail.com>
  • Loading branch information
tchaton and ethanwharris committed Nov 24, 2022
1 parent 0a12731 commit 8ca6dfe
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 10 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Expand Up @@ -138,6 +138,7 @@ module = [
"lightning_app.utilities.packaging.cloud_compute",
"lightning_app.utilities.packaging.docker",
"lightning_app.utilities.packaging.lightning_utils",
"lightning_app.utilities.port",
"lightning_app.utilities.proxies",
"lightning_app.utilities.scheduler",
"lightning_app.utilities.state",
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -13,6 +13,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Show a message when `BuildConfig(requirements=[...])` is passed but a `requirements.txt` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))
- Show a message when `BuildConfig(dockerfile="...")` is passed but a `Dockerfile` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))

- Added a `CloudMultiProcessingBackend` which enables running a child App from within the Flow in the cloud ([#15800](https://github.com/Lightning-AI/lightning/pull/15800))


### Changed

- `lightning add ssh-key` CLI command has been transitioned to `lightning create ssh-key` with the same calling signature ([#15761](https://github.com/Lightning-AI/lightning/pull/15761))
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/cli/lightning_cli.py
Expand Up @@ -54,7 +54,7 @@ def get_app_url(runtime_type: RuntimeType, *args: Any, need_credits: bool = Fals
action = "?action=add_credits" if need_credits else ""
return f"{get_lightning_cloud_url()}/me/apps/{lit_app.id}{action}"
else:
return "http://127.0.0.1:7501/view"
return os.getenv("APP_SERVER_HOST", "http://127.0.0.1:7501/view")


def main() -> None:
Expand Down
6 changes: 5 additions & 1 deletion src/lightning_app/core/app.py
Expand Up @@ -138,6 +138,7 @@ def __init__(
self._schedules: Dict[str, Dict] = {}
self.threads: List[threading.Thread] = []
self.exception = None
self.collect_changes: bool = True

# NOTE: Checkpointing is disabled by default for the time being. We
# will enable it when resuming from full checkpoint is supported. Also,
Expand Down Expand Up @@ -362,11 +363,14 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque
delta.raise_errors = False
return deltas

def maybe_apply_changes(self) -> bool:
def maybe_apply_changes(self) -> None:
"""Get the deltas from both the flow queue and the work queue, merge the two deltas and update the
state."""
self._send_flow_to_work_deltas(self.state)

if not self.collect_changes:
return None

deltas = self._collect_deltas_from_ui_and_work_queues()

if not deltas:
Expand Down
12 changes: 8 additions & 4 deletions src/lightning_app/core/constants.py
Expand Up @@ -3,6 +3,8 @@

import lightning_cloud.env

from lightning_app.utilities.port import _find_lit_app_port


def get_lightning_cloud_url() -> str:
# DO NOT CHANGE!
Expand All @@ -19,7 +21,8 @@ def get_lightning_cloud_url() -> str:
FLOW_DURATION_SAMPLES = 5

APP_SERVER_HOST = os.getenv("LIGHTNING_APP_STATE_URL", "http://127.0.0.1")
APP_SERVER_PORT = 7501
APP_SERVER_IN_CLOUD = "http://lightningapp" in APP_SERVER_HOST
APP_SERVER_PORT = _find_lit_app_port(7501)
APP_STATE_MAX_SIZE_BYTES = 1024 * 1024 # 1 MB

CLOUD_QUEUE_TYPE = os.getenv("LIGHTNING_CLOUD_QUEUE_TYPE", None)
Expand Down Expand Up @@ -52,9 +55,6 @@ def get_lightning_cloud_url() -> str:

# EXPERIMENTAL: ENV VARIABLES TO ENABLE MULTIPLE WORKS IN THE SAME MACHINE
DEFAULT_NUMBER_OF_EXPOSED_PORTS = int(os.getenv("DEFAULT_NUMBER_OF_EXPOSED_PORTS", "50"))
ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER = bool(
int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))
) # Note: This is disabled for the time being.
ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER = bool(
int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER", "0"))
) # This isn't used in the cloud yet.
Expand All @@ -71,3 +71,7 @@ def get_lightning_cloud_url() -> str:
)
ENABLE_STATE_WEBSOCKET = bool(int(os.getenv("ENABLE_STATE_WEBSOCKET", "0")))
ENABLE_UPLOAD_ENDPOINT = bool(int(os.getenv("ENABLE_UPLOAD_ENDPOINT", "1")))


def enable_multiple_works_in_default_container() -> bool:
    """Return the current value of the ``ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER`` flag.

    The environment variable is read on every call (default ``"0"``) and parsed
    as an integer; any non-zero value means the flag is enabled.
    """
    raw_flag = os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")
    return int(raw_flag) != 0
5 changes: 4 additions & 1 deletion src/lightning_app/runners/backends/__init__.py
@@ -1,9 +1,10 @@
from enum import Enum

from lightning_app.core.constants import APP_SERVER_IN_CLOUD
from lightning_app.runners.backends.backend import Backend
from lightning_app.runners.backends.cloud import CloudBackend
from lightning_app.runners.backends.docker import DockerBackend
from lightning_app.runners.backends.mp_process import MultiProcessingBackend
from lightning_app.runners.backends.mp_process import CloudMultiProcessingBackend, MultiProcessingBackend


class BackendType(Enum):
Expand All @@ -13,6 +14,8 @@ class BackendType(Enum):

def get_backend(self, entrypoint_file: str) -> "Backend":
if self == BackendType.MULTIPROCESSING:
if APP_SERVER_IN_CLOUD:
return CloudMultiProcessingBackend(entrypoint_file)
return MultiProcessingBackend(entrypoint_file)
elif self == BackendType.DOCKER:
return DockerBackend(entrypoint_file)
Expand Down
22 changes: 22 additions & 0 deletions src/lightning_app/runners/backends/mp_process.py
Expand Up @@ -6,6 +6,7 @@
from lightning_app.runners.backends.backend import Backend, WorkManager
from lightning_app.utilities.enum import WorkStageStatus
from lightning_app.utilities.network import _check_service_url_is_ready
from lightning_app.utilities.port import disable_port, enable_port
from lightning_app.utilities.proxies import ProxyWorkRun, WorkRunner


Expand Down Expand Up @@ -83,3 +84,24 @@ def resolve_url(self, app, base_url: Optional[str] = None) -> None:
def stop_work(self, app, work: "lightning_app.LightningWork") -> None:
    """Kill the work manager registered under ``work.name`` in ``app.processes``."""
    app.processes[work.name].kill()


class CloudMultiProcessingBackend(MultiProcessingBackend):
    """Multiprocessing backend that opens a cloud port for every work it creates.

    A port is requested through ``enable_port`` before a work is created and is
    closed again through ``disable_port`` when the work is stopped. The ports
    that are currently open are tracked in ``self.ports``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Note: Track the open ports to close them on termination.
        self.ports = []

    def create_work(self, app, work) -> None:
        """Open a port for ``work``, point the work at it and create the work."""
        work._host = "0.0.0.0"
        nc = enable_port()
        self.ports.append(nc.port)
        work._port = nc.port
        work._future_url = f"https://{nc.host}"
        return super().create_work(app, work)

    def stop_work(self, app, work: "lightning_app.LightningWork") -> None:
        """Close the port of ``work`` and stop the work.

        The work is stopped even if closing its port raises; the error still
        propagates afterwards and the port stays in ``self.ports``.
        """
        try:
            disable_port(work._port)
            self.ports = [port for port in self.ports if port != work._port]
        finally:
            # Never leave the work process running because the port request failed.
            super().stop_work(app, work)
6 changes: 3 additions & 3 deletions src/lightning_app/runners/cloud.py
Expand Up @@ -51,7 +51,7 @@
DISABLE_DEPENDENCY_CACHE,
DOT_IGNORE_FILENAME,
ENABLE_APP_COMMENT_COMMAND_EXECUTION,
ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER,
enable_multiple_works_in_default_container,
ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER,
ENABLE_PULLING_STATE_ENDPOINT,
ENABLE_PUSHING_STATE_ENDPOINT,
Expand Down Expand Up @@ -243,7 +243,7 @@ def dispatch(
if self.run_app_comment_commands or ENABLE_APP_COMMENT_COMMAND_EXECUTION:
v1_env_vars.append(V1EnvVar(name="ENABLE_APP_COMMENT_COMMAND_EXECUTION", value="1"))

if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER:
if enable_multiple_works_in_default_container():
v1_env_vars.append(V1EnvVar(name="ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", value="1"))

if ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER:
Expand Down Expand Up @@ -303,7 +303,7 @@ def dispatch(
)

network_configs: Optional[List[V1NetworkConfig]] = None
if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER:
if enable_multiple_works_in_default_container():
network_configs = []
initial_port = 8080 + 1 + len(frontend_specs)
for _ in range(DEFAULT_NUMBER_OF_EXPOSED_PORTS):
Expand Down
13 changes: 13 additions & 0 deletions src/lightning_app/runners/multiprocess.py
Expand Up @@ -5,6 +5,7 @@

from lightning_app.api.http_methods import _add_tags_to_api, _validate_api
from lightning_app.core.api import start_server
from lightning_app.core.constants import APP_SERVER_IN_CLOUD
from lightning_app.runners.backends import Backend
from lightning_app.runners.runtime import Runtime
from lightning_app.storage.orchestrator import StorageOrchestrator
Expand All @@ -13,6 +14,7 @@
from lightning_app.utilities.component import _set_flow_context, _set_frontend_context
from lightning_app.utilities.load_app import extract_metadata_from_app
from lightning_app.utilities.network import find_free_network_port
from lightning_app.utilities.port import disable_port


@dataclass
Expand All @@ -31,6 +33,9 @@ def dispatch(self, *args: Any, on_before_run: Optional[Callable] = None, **kwarg
try:
_set_flow_context()

# Note: In case the runtime is used in the cloud.
self.host = "0.0.0.0" if APP_SERVER_IN_CLOUD else self.host

self.app.backend = self.backend
self.backend._prepare_queues(self.app)
self.backend.resolve_url(self.app, "http://127.0.0.1")
Expand Down Expand Up @@ -109,3 +114,11 @@ def dispatch(self, *args: Any, on_before_run: Optional[Callable] = None, **kwarg
raise
finally:
self.terminate()

def terminate(self):
    """Close the cloud ports opened for this App, then run the parent ``terminate``.

    When ``APP_SERVER_IN_CLOUD`` is set, the runtime's own port and every port
    listed in ``self.backend.ports`` (if the backend has that attribute) are
    disabled. The parent ``terminate`` always runs, even if disabling a port
    raises; the error still propagates afterwards.
    """
    try:
        if APP_SERVER_IN_CLOUD:
            # Close all the ports open for the App within the App.
            ports = [self.port] + getattr(self.backend, "ports", [])
            for port in ports:
                disable_port(port)
    finally:
        # A failed port request must not prevent the regular teardown.
        super().terminate()
143 changes: 143 additions & 0 deletions src/lightning_app/utilities/port.py
@@ -0,0 +1,143 @@
import os
from typing import Optional

from lightning_cloud.openapi import AppinstancesIdBody, Externalv1LightningappInstance, V1NetworkConfig

from lightning_app.utilities.network import LightningClient


def _find_lit_app_port(default_port: int) -> int:
    """Make a request to the cloud controlplane to find a disabled port of the flow, enable it and return it.

    Args:
        default_port: Port returned unchanged when ``LIGHTNING_CLOUD_APP_ID`` or
            ``LIGHTNING_CLOUD_PROJECT_ID`` is unset, or when
            ``ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER`` is not enabled.

    Returns:
        ``default_port``, or the port of the network config that was just enabled.

    Raises:
        RuntimeError: If the app instance is not listed in the project, or if
            no disabled network config is left.
    """
    app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None)
    project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None)
    enable_multiple_works_in_default_container = bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")))

    if not app_id or not project_id or not enable_multiple_works_in_default_container:
        return default_port

    client = LightningClient()
    list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
    lit_app: Optional[Externalv1LightningappInstance] = next(
        (lightningapp for lightningapp in list_apps_resp.lightningapps if lightningapp.id == app_id),
        None,
    )

    if lit_app is None:
        raise RuntimeError(
            "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    # NOTE(review): network_config is presumably None when the spec has no ports; treated as "no port available".
    found_nc = next((nc for nc in (lit_app.spec.network_config or []) if not nc.enable), None)

    # Fail before talking to the controlplane again: without a free port the spec is unchanged.
    if found_nc is None:
        raise RuntimeError(
            "No available port was found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    found_nc.enable = True

    client.lightningapp_instance_service_update_lightningapp_instance(
        project_id=project_id,
        id=lit_app.id,
        body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
    )

    # Note: This is required for the framework to know we need to use the CloudMultiProcessRuntime.
    os.environ["APP_SERVER_HOST"] = f"https://{found_nc.host}"

    return found_nc.port


def enable_port() -> V1NetworkConfig:
    """Make a request to the cloud controlplane to open a port of the flow.

    Returns:
        The network config that was switched from disabled to enabled.

    Raises:
        Exception: If ``LIGHTNING_CLOUD_APP_ID`` or ``LIGHTNING_CLOUD_PROJECT_ID`` is not set.
        RuntimeError: If the app instance is not listed in the project, or if
            no disabled network config is left.
    """
    app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None)
    project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None)

    if not app_id or not project_id:
        raise Exception("The app_id and project_id should be defined.")

    client = LightningClient()
    list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
    lit_app: Optional[Externalv1LightningappInstance] = next(
        (lightningapp for lightningapp in list_apps_resp.lightningapps if lightningapp.id == app_id),
        None,
    )

    if lit_app is None:
        raise RuntimeError(
            "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    # NOTE(review): network_config is presumably None when the spec has no ports; treated as "no port available".
    found_nc = next((nc for nc in (lit_app.spec.network_config or []) if not nc.enable), None)

    # Fail before talking to the controlplane again: without a free port the spec is unchanged.
    if found_nc is None:
        raise RuntimeError(
            "No available port was found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    found_nc.enable = True

    client.lightningapp_instance_service_update_lightningapp_instance(
        project_id=project_id,
        id=lit_app.id,
        body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
    )

    return found_nc


def disable_port(port: int, ignore_disabled: bool = True) -> None:
    """Make a request to the cloud controlplane to close a port of the flow.

    Args:
        port: Port number of the network config to disable.
        ignore_disabled: When ``False``, raise if the port is already disabled.

    Raises:
        Exception: If ``LIGHTNING_CLOUD_APP_ID`` or ``LIGHTNING_CLOUD_PROJECT_ID`` is not set.
        RuntimeError: If the app instance is not listed in the project, or if the
            port is already disabled and ``ignore_disabled`` is ``False``.
        ValueError: If no network config of the app uses ``port``.
    """
    app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None)
    project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None)

    if not app_id or not project_id:
        raise Exception("The app_id and project_id should be defined.")

    client = LightningClient()
    list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
    lit_app: Optional[Externalv1LightningappInstance] = next(
        (lightningapp for lightningapp in list_apps_resp.lightningapps if lightningapp.id == app_id),
        None,
    )

    if lit_app is None:
        raise RuntimeError(
            "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    # NOTE(review): network_config is presumably None when the spec has no ports; treated as an empty list.
    network_configs = lit_app.spec.network_config or []
    found_nc = next((nc for nc in network_configs if nc.port == port), None)

    # Fail before talking to the controlplane again: for an unknown port the spec is unchanged.
    if found_nc is None:
        ports = [nc.port for nc in network_configs]
        raise ValueError(f"The provided port doesn't exists. Available ports are {ports}.")

    if not found_nc.enable and not ignore_disabled:
        raise RuntimeError(f"The port {port} was already disabled.")

    found_nc.enable = False

    client.lightningapp_instance_service_update_lightningapp_instance(
        project_id=project_id,
        id=lit_app.id,
        body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
    )

0 comments on commit 8ca6dfe

Please sign in to comment.