Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
tchaton committed Jun 18, 2024
1 parent 1e5caf4 commit 2d4f662
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 138 deletions.
110 changes: 109 additions & 1 deletion src/lightning/app/cli/lightning_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,19 @@
from lightning.app.cli.lightning_cli_delete import delete
from lightning.app.cli.lightning_cli_launch import launch
from lightning.app.cli.lightning_cli_list import get_list
from lightning.app.core.constants import ENABLE_APP_COMMENT_COMMAND_EXECUTION, get_lightning_cloud_url
from lightning.app.core.constants import (
APP_SERVER_HOST,
APP_SERVER_PORT,
ENABLE_APP_COMMENT_COMMAND_EXECUTION,
get_lightning_cloud_url,
)
from lightning.app.launcher.launcher import (
run_lightning_flow,
run_lightning_work,
serve_frontend,
start_application_server,
start_flow_and_servers,
)
from lightning.app.runners.cloud import CloudRuntime
from lightning.app.runners.runtime import dispatch
from lightning.app.runners.runtime_type import RuntimeType
Expand Down Expand Up @@ -393,3 +405,99 @@ def _prepare_file(file: str) -> str:
return file

raise FileNotFoundError(f"The provided file {file} hasn't been found.")


@run.command("server")
@click.argument("file", type=click.Path(exists=True))
@click.option("--queue-id", help="ID for identifying queue", default="", type=str)
@click.option("--host", help="Application running host", default=APP_SERVER_HOST, type=str)
@click.option("--port", help="Application running port", default=APP_SERVER_PORT, type=int)
def run_server(file: str, queue_id: str, host: str, port: int):
"""It takes the application file as input, build the application object and then use that to run the application
server.
This is used by the cloud runners to start the status server for the application
"""
logger.debug(f"Run Server: {file} {queue_id} {host} {port}")
start_application_server(file, host, port, queue_id=queue_id)


@run.command("flow")
@click.argument("file", type=click.Path(exists=True))
@click.option("--queue-id", help="ID for identifying queue", default="", type=str)
@click.option("--base-url", help="Base url at which the app server is hosted", default="")
def run_flow(file: str, queue_id: str, base_url: str):
"""It takes the application file as input, build the application object, proxy all the work components and then run
the application flow defined in the root component.
It does exactly what a singleprocess dispatcher would do but with proxied work components.
"""
logger.debug(f"Run Flow: {file} {queue_id} {base_url}")
run_lightning_flow(file, queue_id=queue_id, base_url=base_url)


@run.command("work")
@click.argument("file", type=click.Path(exists=True))
@click.option("--work-name", type=str)
@click.option("--queue-id", help="ID for identifying queue", default="", type=str)
def run_work(file: str, work_name: str, queue_id: str):
"""Unlike other entrypoints, this command will take the file path or module details for a work component and run
that by fetching the states from the queues."""
logger.debug(f"Run Work: {file} {work_name} {queue_id}")
run_lightning_work(
file=file,
work_name=work_name,
queue_id=queue_id,
)


@run.command("frontend")
@click.argument("file", type=click.Path(exists=True))
@click.option("--flow-name")
@click.option("--host")
@click.option("--port", type=int)
def run_frontend(file: str, flow_name: str, host: str, port: int):
"""Serve the frontend specified by the given flow."""
logger.debug(f"Run Frontend: {file} {flow_name} {host}")
serve_frontend(file=file, flow_name=flow_name, host=host, port=port)


@run.command("flow-and-servers")
@click.argument("file", type=click.Path(exists=True))
@click.option("--queue-id", help="ID for identifying queue", default="", type=str)
@click.option("--base-url", help="Base url at which the app server is hosted", default="")
@click.option("--host", help="Application running host", default=APP_SERVER_HOST, type=str)
@click.option("--port", help="Application running port", default=APP_SERVER_PORT, type=int)
@click.option(
"--flow-port",
help="Pair of flow name and frontend port",
type=(str, int),
multiple=True,
)
def run_flow_and_servers(
file: str,
base_url: str,
queue_id: str,
host: str,
port: int,
flow_port: Tuple[Tuple[str, int]],
):
"""It takes the application file as input, build the application object and then use that to run the application
flow defined in the root component, the application server and all the flow frontends.
This is used by the cloud runners to start the flow, the status server and all frontends for the application
"""
logger.debug(f"Run Flow: {file} {queue_id} {base_url}")
logger.debug(f"Run Server: {file} {queue_id} {host} {port}.")
logger.debug(f"Run Frontend's: {flow_port}")
start_flow_and_servers(
entrypoint_file=file,
base_url=base_url,
queue_id=queue_id,
host=host,
port=port,
flow_names_and_ports=flow_port,
)
137 changes: 77 additions & 60 deletions src/lightning/app/launcher/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,38 @@
from multiprocessing import Process
from typing import Callable, Dict, List, Optional, Tuple, TypedDict

ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER = bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")))

if True: # ToDo: Avoid Module level import not at top of file
from lightning.app.core import constants
from lightning.app.core.api import start_server
from lightning.app.core.flow import LightningFlow
from lightning.app.core.queues import MultiProcessQueue, QueuingSystem
from lightning.app.storage.orchestrator import StorageOrchestrator
from tabulate import tabulate

from lightning.app import LightningFlow
from lightning.app.core import constants
from lightning.app.core.api import start_server
from lightning.app.core.constants import (
CHECK_ERROR_QUEUE_INTERVAL,
ENABLE_ORCHESTRATOR,
enable_multiple_works_in_default_container,
)
from lightning.app.core.queues import MultiProcessQueue, QueuingSystem
from lightning.app.storage.orchestrator import StorageOrchestrator
from lightning.app.utilities.cloud import _sigterm_flow_handler
from lightning.app.utilities.component import _set_flow_context, _set_frontend_context
from lightning.app.utilities.enum import AppStage
from lightning.app.utilities.exceptions import ExitAppException
from lightning.app.utilities.load_app import extract_metadata_from_app, load_app_from_file
from lightning.app.utilities.proxies import WorkRunner
from lightning.app.utilities.redis import check_if_redis_running

try:
from lightning.app.utilities.app_commands import run_app_commands
from lightning.app.utilities.cloud import _sigterm_flow_handler
from lightning.app.utilities.component import _set_flow_context, _set_frontend_context
from lightning.app.utilities.enum import AppStage
from lightning.app.utilities.exceptions import ExitAppException
from lightning.app.utilities.load_app import extract_metadata_from_app, load_app_from_file
from lightning.app.utilities.proxies import WorkRunner
from lightning.app.utilities.redis import check_if_redis_running

if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER:

ABLE_TO_RUN_APP_COMMANDS = True
except (ImportError, ModuleNotFoundError):
ABLE_TO_RUN_APP_COMMANDS = False

if enable_multiple_works_in_default_container():
from lightning.app.launcher.lightning_hybrid_backend import CloudHybridBackend as CloudBackend
else:
from lightning.app.launcher.lightning_backend import CloudBackend

if True: # Avoid Module level import not at top of file
from lightning.app.utilities.app_helpers import convert_print_to_logger_info
from lightning.app.utilities.packaging.lightning_utils import enable_debugging
from lightning.app.launcher.utils import LIGHTNING_VERSION, convert_print_to_logger_info, enable_debugging, exit_app

if hasattr(constants, "get_cloud_queue_type"):
CLOUD_QUEUE_TYPE = constants.get_cloud_queue_type() or "redis"
Expand All @@ -48,6 +55,22 @@ class FlowRestAPIQueues(TypedDict):
api_response_queue: MultiProcessQueue


def check_error_queue(self) -> None:
if not getattr(self, "_last_check_error_queue", None):
self._last_check_error_queue = 0.0

if (time.time() - self._last_check_error_queue) > CHECK_ERROR_QUEUE_INTERVAL:
exception: Exception = self.get_state_changed_from_queue(self.error_queue) # type: ignore[assignment,arg-type]
if isinstance(exception, Exception):
self.exception = exception
self.stage = AppStage.FAILED
self._last_check_error_queue = time.time()


def patch_app(app):
app.check_error_queue = partial(check_error_queue, self=app)


@convert_print_to_logger_info
@enable_debugging
def start_application_server(
Expand All @@ -72,6 +95,7 @@ def start_application_server(
})

app = load_app_from_file(entrypoint_file)
patch_app(app)

from lightning.app.api.http_methods import _add_tags_to_api, _validate_api
from lightning.app.utilities.app_helpers import is_overridden
Expand Down Expand Up @@ -124,7 +148,8 @@ def run_lightning_work(
copy_request_queues = queues.get_orchestrator_copy_request_queue(work_name=work_name, queue_id=queue_id)
copy_response_queues = queues.get_orchestrator_copy_response_queue(work_name=work_name, queue_id=queue_id)

run_app_commands(file)
if ABLE_TO_RUN_APP_COMMANDS:
run_app_commands(file)

load_app_from_file(file)

Expand Down Expand Up @@ -179,15 +204,17 @@ def run_lightning_flow(entrypoint_file: str, queue_id: str, base_url: str, queue

app.should_publish_changes_to_api = True

storage_orchestrator = StorageOrchestrator(
app,
app.request_queues,
app.response_queues,
app.copy_request_queues,
app.copy_response_queues,
)
storage_orchestrator.setDaemon(True)
storage_orchestrator.start()
# reduces the number of requests to the CP
if ENABLE_ORCHESTRATOR:
storage_orchestrator = StorageOrchestrator(
app,
app.request_queues,
app.response_queues,
app.copy_request_queues,
app.copy_response_queues,
)
storage_orchestrator.setDaemon(True)
storage_orchestrator.start()

# refresh the layout with the populated urls.
app._update_layout()
Expand All @@ -211,14 +238,16 @@ def run_lightning_flow(entrypoint_file: str, queue_id: str, base_url: str, queue
app.stage = AppStage.FAILED
print(traceback.format_exc())

storage_orchestrator.join(0)
if ENABLE_ORCHESTRATOR:
storage_orchestrator.join(0)

app.backend.stop_all_works(app.works)

exit_code = 1 if app.stage == AppStage.FAILED else 0
print(f"Finishing the App with exit_code: {str(exit_code)}...")

if not exit_code:
app.backend.stop_app(app)
exit_app(app)

sys.exit(exit_code)

Expand All @@ -243,32 +272,12 @@ def serve_frontend(file: str, flow_name: str, host: str, port: int):
frontend.start_server(host, port)


def start_server_in_process(target: Callable, args: Tuple = (), kwargs: Dict = {}) -> Process:
def start_server_in_process(target: Callable, args: Tuple = tuple(), kwargs: Dict = {}) -> Process:
p = Process(target=target, args=args, kwargs=kwargs)
p.start()
return p


def format_row(elements, col_widths, padding=1):
elements = [el.ljust(w - padding * 2) for el, w in zip(elements, col_widths)]
pad = " " * padding
elements = [f"{pad}{el}{pad}" for el in elements]
return f'|{"|".join(elements)}|'


def tabulate(data, headers):
data = [[str(el) for el in row] for row in data]
col_widths = [len(el) for el in headers]
for row in data:
col_widths = [max(len(el), curr) for el, curr in zip(row, col_widths)]
col_widths = [w + 2 for w in col_widths]
seps = ["-" * w for w in col_widths]
lines = [format_row(headers, col_widths), format_row(seps, col_widths, padding=0)] + [
format_row(row, col_widths) for row in data
]
return "\n".join(lines)


def manage_server_processes(processes: List[Tuple[str, Process]]) -> None:
if not processes:
return
Expand Down Expand Up @@ -309,6 +318,7 @@ def _sigterm_handler(*_):
tabulate(
[(name, p.exitcode) for name, p in processes if not p.is_alive() and p.exitcode != 0],
headers=["Name", "Exit Code"],
tablefmt="github",
)
)
exitcode = 1
Expand Down Expand Up @@ -385,12 +395,13 @@ def start_flow_and_servers(
"api_response_queue": queue_system.get_api_response_queue(queue_id=queue_id),
}

# In order to avoid running this function 3 seperate times while executing the
# `run_lightning_flow`, `start_application_server`, & `serve_frontend` functions
# in a subprocess we extract this to the top level. If we intend to make changes
# to be able to start these components in seperate containers, the implementation
# will have to move a call to this function within the initialization process.
run_app_commands(entrypoint_file)
if ABLE_TO_RUN_APP_COMMANDS:
# In order to avoid running this function 3 seperate times while executing the
# `run_lightning_flow`, `start_application_server`, & `serve_frontend` functions
# in a subprocess we extract this to the top level. If we intend to make changes
# to be able to start these components in seperate containers, the implementation
# will have to move a call to this function within the initialization process.
run_app_commands(entrypoint_file)

flow_process = start_server_in_process(
run_lightning_flow,
Expand Down Expand Up @@ -434,6 +445,12 @@ def wait_for_queues(queue_system: QueuingSystem) -> None:
logger.warning("Waiting for http queues to start...")
time.sleep(1)
else:
if CLOUD_QUEUE_TYPE != "redis":
raise ValueError(
f"Queue system {queue_system} is not correctly configured. You seem to have requested HTTP queues,"
f"but using an old version of lightning framework ({LIGHTNING_VERSION}) that doesn't support "
f"HTTP queues. Try upgrading lightning framework to the latest version."
)
while not check_if_redis_running():
if (int(time.time()) - queue_check_start_time) % 10 == 0:
logger.warning("Waiting for redis queues to start...")
Expand Down
Loading

0 comments on commit 2d4f662

Please sign in to comment.