Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Remove SingleProcessRuntime #15933

Merged
merged 22 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/source-app/api_references.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,4 @@ _______
:template: classtemplate_no_index.rst

~cloud.CloudRuntime
~singleprocess.SingleProcessRuntime
~multiprocess.MultiProcessRuntime
3 changes: 0 additions & 3 deletions docs/source-app/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ We provide ``application_testing`` as a helper funtion to get your application u
os.path.join(_PROJECT_ROOT, "examples/app_v0/app.py"),
"--blocking",
"False",
"--multiprocess",
"--open-ui",
"False",
]
Expand All @@ -129,9 +128,7 @@ First in the list for ``command_line`` is the location of your script. It is an

Next there are a couple of options you can leverage:


* ``blocking`` - Blocking is an app status that says "Do not run until I click run in the UI". For our integration test, since we are not using the UI, we are setting this to "False".
* ``multiprocess/singleprocess`` - This is the runtime your app is expected to run under.
* ``open-ui`` - We set this to false since this is the routine that opens a browser for your local execution.

Once you have your commandline ready, you will then be able to kick off the test and gather results:
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ module = [
"lightning_app.runners.cloud",
"lightning_app.runners.multiprocess",
"lightning_app.runners.runtime",
"lightning_app.runners.singleprocess",
"lightning_app.source_code.copytree",
"lightning_app.source_code.hashing",
"lightning_app.source_code.local",
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Removed

-
- Removed the `SingleProcessRuntime` ([#15933](https://github.com/Lightning-AI/lightning/pull/15933))


### Fixed
Expand Down
4 changes: 1 addition & 3 deletions src/lightning_app/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
FRONTEND_DIR,
STATE_ACCUMULATE_WAIT,
)
from lightning_app.core.queues import BaseQueue, SingleProcessQueue
from lightning_app.core.queues import BaseQueue
from lightning_app.core.work import LightningWork
from lightning_app.frontend import Frontend
from lightning_app.storage import Drive, Path, Payload
Expand Down Expand Up @@ -549,8 +549,6 @@ def _collect_work_finish_status(self) -> dict:
def _should_snapshot(self) -> bool:
if len(self.works) == 0:
return True
elif isinstance(self.delta_queue, SingleProcessQueue):
return True
elif self._has_updated:
work_finished_status = self._collect_work_finish_status()
if work_finished_status:
Expand Down
20 changes: 1 addition & 19 deletions src/lightning_app/core/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@


class QueuingSystem(Enum):
SINGLEPROCESS = "singleprocess"
MULTIPROCESS = "multiprocess"
REDIS = "redis"
HTTP = "http"
Expand All @@ -59,10 +58,8 @@ def get_queue(self, queue_name: str) -> "BaseQueue":
return MultiProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
elif self == QueuingSystem.REDIS:
return RedisQueue(queue_name, default_timeout=REDIS_QUEUES_READ_DEFAULT_TIMEOUT)
elif self == QueuingSystem.HTTP:
return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
else:
return SingleProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)

def get_api_response_queue(self, queue_id: Optional[str] = None) -> "BaseQueue":
queue_name = f"{queue_id}_{API_RESPONSE_QUEUE_CONSTANT}" if queue_id else API_RESPONSE_QUEUE_CONSTANT
Expand Down Expand Up @@ -179,21 +176,6 @@ def is_running(self) -> bool:
return True


class SingleProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
self.default_timeout = default_timeout
self.queue = queue.Queue()

def put(self, item):
self.queue.put(item)

def get(self, timeout: int = None):
if timeout == 0:
timeout = self.default_timeout
return self.queue.get(timeout=timeout, block=(timeout is None))


class MultiProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
Expand Down
2 changes: 0 additions & 2 deletions src/lightning_app/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from lightning_app.runners.cloud import CloudRuntime
from lightning_app.runners.multiprocess import MultiProcessRuntime
from lightning_app.runners.runtime import dispatch, Runtime
from lightning_app.runners.singleprocess import SingleProcessRuntime
from lightning_app.utilities.app_commands import run_app_commands
from lightning_app.utilities.load_app import load_app_from_file

Expand All @@ -11,6 +10,5 @@
"run_app_commands",
"Runtime",
"MultiProcessRuntime",
"SingleProcessRuntime",
"CloudRuntime",
]
7 changes: 2 additions & 5 deletions src/lightning_app/runners/runtime_type.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
from enum import Enum
from typing import Type, TYPE_CHECKING

from lightning_app.runners import CloudRuntime, MultiProcessRuntime, SingleProcessRuntime
from lightning_app.runners import CloudRuntime, MultiProcessRuntime

if TYPE_CHECKING:
from lightning_app.runners.runtime import Runtime


class RuntimeType(Enum):
SINGLEPROCESS = "singleprocess"
MULTIPROCESS = "multiprocess"
CLOUD = "cloud"

def get_runtime(self) -> Type["Runtime"]:
if self == RuntimeType.SINGLEPROCESS:
return SingleProcessRuntime
elif self == RuntimeType.MULTIPROCESS:
if self == RuntimeType.MULTIPROCESS:
return MultiProcessRuntime
elif self == RuntimeType.CLOUD:
return CloudRuntime
Expand Down
62 changes: 0 additions & 62 deletions src/lightning_app/runners/singleprocess.py

This file was deleted.

7 changes: 0 additions & 7 deletions src/lightning_app/utilities/app_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ def set_served_session_id(self, k, v):
self.store[k].session_id = v


class DistributedMode(enum.Enum):
SINGLEPROCESS = enum.auto()
MULTIPROCESS = enum.auto()
CONTAINER = enum.auto()
GRID = enum.auto()


class _LightningAppRef:
_app_instance: Optional["LightningApp"] = None

Expand Down
35 changes: 2 additions & 33 deletions tests/tests_app/core/test_lightning_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
UIRefresher,
)
from lightning_app.core.constants import APP_SERVER_PORT
from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime
from lightning_app.runners import MultiProcessRuntime
from lightning_app.storage.drive import Drive
from lightning_app.testing.helpers import _MockQueue
from lightning_app.utilities.component import _set_frontend_context, _set_work_context
Expand Down Expand Up @@ -71,7 +71,6 @@ def run(self):
self.work_a.run()


# TODO: Resolve singleprocess - idea: explore frame calls recursively.
@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime])
def test_app_state_api(runtime_cls):
"""This test validates the AppState can properly broadcast changes from work within its own process."""
Expand All @@ -85,36 +84,6 @@ def test_app_state_api(runtime_cls):
os.remove("test_app_state_api.txt")


class A2(LightningFlow):
def __init__(self):
super().__init__()
self.var_a = 0
self.a = _A()

def update_state(self):
state = AppState()
# this would download and push data to the REST API.
assert state.a.work_a.var_a == 0
assert state.var_a == 0
state.var_a = -1

def run(self):
if self.var_a == 0:
self.update_state()
elif self.var_a == -1:
self._exit()


# TODO: Find why this test is flaky.
@pytest.mark.skip(reason="flaky test.")
@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime])
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved
def test_app_state_api_with_flows(runtime_cls, tmpdir):
"""This test validates the AppState can properly broadcast changes from flows."""
app = LightningApp(A2(), log_level="debug")
runtime_cls(app, start_server=True).dispatch()
assert app.root.var_a == -1


class NestedFlow(LightningFlow):
def run(self):
pass
Expand Down Expand Up @@ -181,7 +150,7 @@ def maybe_apply_changes(self):

# FIXME: This test doesn't assert anything
@pytest.mark.skip(reason="TODO: Resolve flaky test.")
@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime, MultiProcessRuntime])
@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime])
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved
def test_app_stage_from_frontend(runtime_cls):
"""This test validates that delta from the `api_delta_queue` manipulating the ['app_state']['stage'] would
start and stop the app."""
Expand Down
60 changes: 4 additions & 56 deletions tests/tests_app/core/test_lightning_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from re import escape
from time import sleep
from unittest import mock
from unittest.mock import ANY

import pytest
from deepdiff import Delta
Expand All @@ -19,9 +18,9 @@
REDIS_QUEUES_READ_DEFAULT_TIMEOUT,
STATE_UPDATE_TIMEOUT,
)
from lightning_app.core.queues import BaseQueue, MultiProcessQueue, RedisQueue, SingleProcessQueue
from lightning_app.core.queues import BaseQueue, MultiProcessQueue, RedisQueue
from lightning_app.frontend import StreamlitFrontend
from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime
from lightning_app.runners import MultiProcessRuntime
from lightning_app.storage import Path
from lightning_app.storage.path import _storage_root_dir
from lightning_app.testing.helpers import _RunIf
Expand Down Expand Up @@ -89,56 +88,6 @@ def run(self):
self.has_finished = True


class SimpleFlow(LightningFlow):
def __init__(self):
super().__init__()
self.work_a = Work(cache_calls=True)
self.work_b = Work(cache_calls=False)

def run(self):
self.work_a.run()
self.work_b.run()
if self.work_a.has_finished and self.work_b.has_finished:
self._exit()


@pytest.mark.skip
@pytest.mark.parametrize("component_cls", [SimpleFlow])
@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime])
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved
def test_simple_app(component_cls, runtime_cls, tmpdir):
comp = component_cls()
app = LightningApp(comp, log_level="debug")
assert app.root == comp
expected = {
"app_state": ANY,
"vars": {"_layout": ANY, "_paths": {}},
"calls": {},
"flows": {},
"works": {
"work_b": {
"vars": {"has_finished": False, "counter": 0, "_urls": {}, "_paths": {}},
"calls": {},
"changes": {},
},
"work_a": {
"vars": {"has_finished": False, "counter": 0, "_urls": {}, "_paths": {}},
"calls": {},
"changes": {},
},
},
"changes": {},
}
assert app.state == expected
runtime_cls(app, start_server=False).dispatch()

assert comp.work_a.has_finished
assert comp.work_b.has_finished
# possible the `work_a` takes for ever to
# start and `work_b` has already completed multiple iterations.
assert comp.work_a.counter == 1
assert comp.work_b.counter >= 3


class WorkCounter(LightningWork):
def __init__(self):
super().__init__()
Expand Down Expand Up @@ -357,7 +306,7 @@ def _apply_restarting(self):
return True


@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime, MultiProcessRuntime])
@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime])
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved
def test_app_restarting_move_to_blocking(runtime_cls, tmpdir):
"""Validates sending restarting move the app to blocking again."""
app = SimpleApp2(CounterFlow(), log_level="debug")
Expand Down Expand Up @@ -411,7 +360,6 @@ def run(self):
@pytest.mark.parametrize(
"queue_type_cls, default_timeout",
[
(SingleProcessQueue, STATE_UPDATE_TIMEOUT),
(MultiProcessQueue, STATE_UPDATE_TIMEOUT),
pytest.param(
RedisQueue,
Expand Down Expand Up @@ -477,7 +425,7 @@ def test_maybe_apply_changes_from_flow():
"""This test validates the app `_updated` is set to True only if the state was changed in the flow."""

app = LightningApp(SimpleFlow())
app.delta_queue = SingleProcessQueue("a", 0)
app.delta_queue = MultiProcessQueue("a", 0)
assert app._has_updated
app.maybe_apply_changes()
app.root.run()
Expand Down