Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Remove SingleProcessRuntime #15933

Merged
merged 22 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/source-app/api_reference/runners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,4 @@ ______________
:template: classtemplate.rst

~cloud.CloudRuntime
~singleprocess.SingleProcessRuntime
~multiprocess.MultiProcessRuntime
1 change: 0 additions & 1 deletion docs/source-app/api_references.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ _______
:template: classtemplate_no_index.rst

~cloud.CloudRuntime
~singleprocess.SingleProcessRuntime
~multiprocess.MultiProcessRuntime

----
Expand Down
3 changes: 0 additions & 3 deletions docs/source-app/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ We provide ``application_testing`` as a helper funtion to get your application u
os.path.join(_PROJECT_ROOT, "examples/app_v0/app.py"),
"--blocking",
"False",
"--multiprocess",
"--open-ui",
"False",
]
Expand All @@ -129,9 +128,7 @@ First in the list for ``command_line`` is the location of your script. It is an

Next there are a couple of options you can leverage:


* ``blocking`` - Blocking is an app status that says "Do not run until I click run in the UI". For our integration test, since we are not using the UI, we are setting this to "False".
* ``multiprocess/singleprocess`` - This is the runtime your app is expected to run under.
* ``open-ui`` - We set this to false since this is the routine that opens a browser for your local execution.

Once you have your commandline ready, you will then be able to kick off the test and gather results:
Expand Down
8 changes: 4 additions & 4 deletions examples/app_template_streamlit_ui/app.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging

from lightning_app import LightningApp, LightningFlow
from lightning_app.frontend import StreamlitFrontend
from lightning_app.utilities.state import AppState
from lightning.app import LightningApp, LightningFlow
from lightning.app.frontend import StreamlitFrontend
from lightning.app.utilities.state import AppState

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,4 +45,4 @@ def configure_layout(self):
return [{"name": "StreamLitUI", "content": self.streamlit_ui}]


app = LightningApp(HelloWorld(), log_level="debug")
app = LightningApp(HelloWorld())
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ module = [
"lightning_app.runners.cloud",
"lightning_app.runners.multiprocess",
"lightning_app.runners.runtime",
"lightning_app.runners.singleprocess",
"lightning_app.source_code.copytree",
"lightning_app.source_code.hashing",
"lightning_app.source_code.local",
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Removed

-
- Removed the `SingleProcessRuntime` ([#15933](https://github.com/Lightning-AI/lightning/pull/15933))


### Fixed
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/components/auto_scaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from lightning_app.utilities.packaging.cloud_compute import CloudCompute

logger = Logger(__name__)
lock = asyncio.Lock()


def _raise_granular_exception(exception: Exception) -> None:
Expand Down Expand Up @@ -209,6 +208,7 @@ async def process_request(self, data: BaseModel):
def run(self):

logger.info(f"servers: {self.servers}")
lock = asyncio.Lock()

self._iter = cycle(self.servers)
self._last_batch_sent = time.time()
Expand Down
4 changes: 1 addition & 3 deletions src/lightning_app/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
FRONTEND_DIR,
STATE_ACCUMULATE_WAIT,
)
from lightning_app.core.queues import BaseQueue, SingleProcessQueue
from lightning_app.core.queues import BaseQueue
from lightning_app.core.work import LightningWork
from lightning_app.frontend import Frontend
from lightning_app.storage import Drive, Path, Payload
Expand Down Expand Up @@ -549,8 +549,6 @@ def _collect_work_finish_status(self) -> dict:
def _should_snapshot(self) -> bool:
if len(self.works) == 0:
return True
elif isinstance(self.delta_queue, SingleProcessQueue):
return True
elif self._has_updated:
work_finished_status = self._collect_work_finish_status()
if work_finished_status:
Expand Down
20 changes: 1 addition & 19 deletions src/lightning_app/core/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@


class QueuingSystem(Enum):
SINGLEPROCESS = "singleprocess"
MULTIPROCESS = "multiprocess"
REDIS = "redis"
HTTP = "http"
Expand All @@ -59,10 +58,8 @@ def get_queue(self, queue_name: str) -> "BaseQueue":
return MultiProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
elif self == QueuingSystem.REDIS:
return RedisQueue(queue_name, default_timeout=REDIS_QUEUES_READ_DEFAULT_TIMEOUT)
elif self == QueuingSystem.HTTP:
return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
else:
return SingleProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)

def get_api_response_queue(self, queue_id: Optional[str] = None) -> "BaseQueue":
queue_name = f"{queue_id}_{API_RESPONSE_QUEUE_CONSTANT}" if queue_id else API_RESPONSE_QUEUE_CONSTANT
Expand Down Expand Up @@ -179,21 +176,6 @@ def is_running(self) -> bool:
return True


class SingleProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
self.default_timeout = default_timeout
self.queue = queue.Queue()

def put(self, item):
self.queue.put(item)

def get(self, timeout: int = None):
if timeout == 0:
timeout = self.default_timeout
return self.queue.get(timeout=timeout, block=(timeout is None))


class MultiProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
Expand Down
2 changes: 0 additions & 2 deletions src/lightning_app/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from lightning_app.runners.cloud import CloudRuntime
from lightning_app.runners.multiprocess import MultiProcessRuntime
from lightning_app.runners.runtime import dispatch, Runtime
from lightning_app.runners.singleprocess import SingleProcessRuntime
from lightning_app.utilities.app_commands import run_app_commands
from lightning_app.utilities.load_app import load_app_from_file

Expand All @@ -11,6 +10,5 @@
"run_app_commands",
"Runtime",
"MultiProcessRuntime",
"SingleProcessRuntime",
"CloudRuntime",
]
7 changes: 2 additions & 5 deletions src/lightning_app/runners/runtime_type.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
from enum import Enum
from typing import Type, TYPE_CHECKING

from lightning_app.runners import CloudRuntime, MultiProcessRuntime, SingleProcessRuntime
from lightning_app.runners import CloudRuntime, MultiProcessRuntime

if TYPE_CHECKING:
from lightning_app.runners.runtime import Runtime


class RuntimeType(Enum):
SINGLEPROCESS = "singleprocess"
MULTIPROCESS = "multiprocess"
CLOUD = "cloud"

def get_runtime(self) -> Type["Runtime"]:
if self == RuntimeType.SINGLEPROCESS:
return SingleProcessRuntime
elif self == RuntimeType.MULTIPROCESS:
if self == RuntimeType.MULTIPROCESS:
return MultiProcessRuntime
elif self == RuntimeType.CLOUD:
return CloudRuntime
Expand Down
62 changes: 0 additions & 62 deletions src/lightning_app/runners/singleprocess.py

This file was deleted.

7 changes: 0 additions & 7 deletions src/lightning_app/utilities/app_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ def set_served_session_id(self, k, v):
self.store[k].session_id = v


class DistributedMode(enum.Enum):
SINGLEPROCESS = enum.auto()
MULTIPROCESS = enum.auto()
CONTAINER = enum.auto()
GRID = enum.auto()


class _LightningAppRef:
_app_instance: Optional["LightningApp"] = None

Expand Down
29 changes: 20 additions & 9 deletions src/lightning_app/utilities/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
from copy import deepcopy
from time import sleep
from typing import Any, Dict, List, Optional, Tuple, Union

from deepdiff import DeepDiff
Expand Down Expand Up @@ -149,16 +150,26 @@ def _request_state(self) -> None:
return
app_url = f"{self._url}/api/v1/state"
headers = headers_for(self._plugin.get_context()) if self._plugin else {}
try:
response = self._session.get(app_url, headers=headers, timeout=1)
except ConnectionError as e:
raise AttributeError("Failed to connect and fetch the app state. Is the app running?") from e

self._authorized = response.status_code
if self._authorized != 200:
return
logger.debug(f"GET STATE {response} {response.json()}")
self._store_state(response.json())
response_json = {}

# Sometimes the state URL can return an empty JSON when things are being set-up,
# so we wait for it to be ready here.
while response_json == {}:
sleep(0.5)
try:
response = self._session.get(app_url, headers=headers, timeout=1)
except ConnectionError as e:
raise AttributeError("Failed to connect and fetch the app state. Is the app running?") from e

self._authorized = response.status_code
if self._authorized != 200:
return

response_json = response.json()

logger.debug(f"GET STATE {response} {response_json}")
self._store_state(response_json)

def __getattr__(self, name: str) -> Union[Any, "AppState"]:
if name in self._APP_PRIVATE_KEYS:
Expand Down
20 changes: 7 additions & 13 deletions tests/tests_app/core/test_lightning_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
UIRefresher,
)
from lightning_app.core.constants import APP_SERVER_PORT
from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime
from lightning_app.runners import MultiProcessRuntime
from lightning_app.storage.drive import Drive
from lightning_app.testing.helpers import _MockQueue
from lightning_app.utilities.component import _set_frontend_context, _set_work_context
Expand Down Expand Up @@ -71,12 +71,10 @@ def run(self):
self.work_a.run()


# TODO: Resolve singleprocess - idea: explore frame calls recursively.
@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime])
def test_app_state_api(runtime_cls):
def test_app_state_api():
"""This test validates the AppState can properly broadcast changes from work within its own process."""
app = LightningApp(_A(), log_level="debug")
runtime_cls(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=True).dispatch()
assert app.root.work_a.var_a == -1
_set_work_context()
assert app.root.work_a.drive.list(".") == ["test_app_state_api.txt"]
Expand Down Expand Up @@ -105,13 +103,10 @@ def run(self):
self._exit()


# TODO: Find why this test is flaky.
@pytest.mark.skip(reason="flaky test.")
@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime])
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved
def test_app_state_api_with_flows(runtime_cls, tmpdir):
def test_app_state_api_with_flows(tmpdir):
"""This test validates the AppState can properly broadcast changes from flows."""
app = LightningApp(A2(), log_level="debug")
runtime_cls(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=True).dispatch()
assert app.root.var_a == -1


Expand Down Expand Up @@ -181,13 +176,12 @@ def maybe_apply_changes(self):

# FIXME: This test doesn't assert anything
@pytest.mark.skip(reason="TODO: Resolve flaky test.")
@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime, MultiProcessRuntime])
def test_app_stage_from_frontend(runtime_cls):
def test_app_stage_from_frontend():
"""This test validates that delta from the `api_delta_queue` manipulating the ['app_state']['stage'] would
start and stop the app."""
app = AppStageTestingApp(FlowA(), log_level="debug")
app.stage = AppStage.BLOCKING
runtime_cls(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=True).dispatch()


def test_update_publish_state_and_maybe_refresh_ui():
Expand Down