Skip to content

Commit

Permalink
Resolve Lightning App with remote storage (#17426)
Browse files Browse the repository at this point in the history
* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

---------

Co-authored-by: thomas <thomas@thomass-MacBook-Pro.local>
(cherry picked from commit 3688b64)
  • Loading branch information
tchaton authored and lantiga committed Apr 24, 2023
1 parent 24ddce0 commit cf64e87
Show file tree
Hide file tree
Showing 18 changed files with 95 additions and 106 deletions.
24 changes: 4 additions & 20 deletions .azure/app-cloud-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,57 +67,42 @@ jobs:
'App: v0_app':
name: "v0_app"
dir: "public"
queue_type: "redis"
'App: boring_app':
name: "boring_app"
dir: "public"
queue_type: "redis"
'App: boring_app / HTTP':
name: "boring_app"
dir: "public"
queue_type: "http"
'App: template_streamlit_ui':
name: "template_streamlit_ui"
dir: "public"
queue_type: "redis"
# TODO: RESOLVE ME ASAP
# 'App: template_streamlit_ui':
# name: "template_streamlit_ui"
# dir: "public"
'App: template_react_ui':
name: "template_react_ui"
dir: "public"
queue_type: "redis"
# 'App: template_jupyterlab': # TODO: clarify where these files lives
# name: "template_jupyterlab"
'App: installation_commands_app':
name: "installation_commands_app"
dir: "public"
queue_type: "redis"
'App: drive':
name: "drive"
dir: "public"
queue_type: "redis"
'App: payload':
name: "payload"
dir: "public"
queue_type: "redis"
'App: commands_and_api':
name: "commands_and_api"
dir: "public"
queue_type: "redis"
#'App: quick_start': # todo: consider adding back when fixed
# name: "quick_start"
# dir: "public"
# queue_type: "redis"
'App: idle_timeout':
name: "idle_timeout"
dir: "local"
queue_type: "redis"
'App: collect_failures':
name: "collect_failures"
dir: "local"
queue_type: "redis"
'App: custom_work_dependencies':
name: "custom_work_dependencies"
dir: "local"
queue_type: "redis"
timeoutInMinutes: "15"
cancelTimeoutInMinutes: "1"
# values: https://docs.microsoft.com/en-us/azure/devops/pipelines/process/phases?view=azure-devops&tabs=yaml#workspace
Expand All @@ -135,7 +120,6 @@ jobs:
HAR_LOCATION: './artifacts/hars'
SLOW_MO: '50'
LIGHTNING_DEBUG: '1'
LIGHTNING_CLOUD_QUEUE_TYPE: $(queue_type)
steps:

- script: echo '##vso[task.setvariable variable=local_id]$(System.PullRequest.PullRequestNumber)'
Expand Down
1 change: 1 addition & 0 deletions examples/app/commands_and_api/.lightningignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
venv/
17 changes: 4 additions & 13 deletions src/lightning/app/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@


def get_lightning_cloud_url() -> str:
# detect local development
if os.getenv("VSCODE_PROXY_URI", "").startswith("http://localhost:9800"):
return "http://localhost:9800"
# DO NOT CHANGE!
return os.getenv("LIGHTNING_CLOUD_URL", "https://lightning.ai")

Expand Down Expand Up @@ -115,17 +118,5 @@ def enable_interruptible_works() -> bool:
return bool(int(os.getenv("LIGHTNING_INTERRUPTIBLE_WORKS", "0")))


# Get Cluster Driver
_CLUSTER_DRIVERS = [None, "k8s", "direct"]


def get_cluster_driver() -> Optional[str]:
value = os.getenv("LIGHTNING_CLUSTER_DRIVER", None)
if value is None:
if enable_interruptible_works():
value = "direct"
else:
value = None
if value not in _CLUSTER_DRIVERS:
raise ValueError(f"Found {value} cluster driver. The value needs to be in {_CLUSTER_DRIVERS}.")
return value
return "direct"
24 changes: 19 additions & 5 deletions src/lightning/app/core/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,19 +388,30 @@ def get(self, timeout: Optional[float] = None) -> Any:
if timeout is None:
while True:
try:
return self._get()
try:
return self._get()
except requests.exceptions.HTTPError:
pass
except queue.Empty:
time.sleep(HTTP_QUEUE_REFRESH_INTERVAL)

# make one request and return the result
if timeout == 0:
return self._get()
try:
return self._get()
except requests.exceptions.HTTPError:
return None

# timeout is some value - loop until the timeout is reached
start_time = time.time()
while (time.time() - start_time) < timeout:
try:
return self._get()
try:
return self._get()
except requests.exceptions.HTTPError:
if timeout > self.default_timeout:
return None
raise queue.Empty
except queue.Empty:
# Note: In theory, there isn't a need for a sleep as the queue shouldn't
# block the flow if the queue is empty.
Expand Down Expand Up @@ -441,8 +452,11 @@ def length(self) -> int:
if not self.app_id:
raise ValueError(f"App ID couldn't be extracted from the queue name: {self.name}")

val = self.client.get(f"/v1/{self.app_id}/{self._name_suffix}/length")
return int(val.text)
try:
val = self.client.get(f"/v1/{self.app_id}/{self._name_suffix}/length")
return int(val.text)
except requests.exceptions.HTTPError:
return 0

@staticmethod
def _split_app_id_and_queue_name(queue_name: str) -> Tuple[str, str]:
Expand Down
7 changes: 0 additions & 7 deletions src/lightning/app/runners/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
ENABLE_PULLING_STATE_ENDPOINT,
ENABLE_PUSHING_STATE_ENDPOINT,
get_cloud_queue_type,
get_cluster_driver,
get_lightning_cloud_url,
LIGHTNING_CLOUD_PRINT_SPECS,
SYS_CUSTOMIZATIONS_SYNC_ROOT,
Expand Down Expand Up @@ -874,12 +873,6 @@ def _get_env_vars(
if not ENABLE_PUSHING_STATE_ENDPOINT:
v1_env_vars.append(V1EnvVar(name="ENABLE_PUSHING_STATE_ENDPOINT", value="0"))

if get_cloud_queue_type():
v1_env_vars.append(V1EnvVar(name="LIGHTNING_CLOUD_QUEUE_TYPE", value=get_cloud_queue_type()))

if get_cluster_driver():
v1_env_vars.append(V1EnvVar(name="LIGHTNING_CLUSTER_DRIVER", value=get_cluster_driver()))

if enable_interruptible_works():
v1_env_vars.append(
V1EnvVar(
Expand Down
16 changes: 6 additions & 10 deletions src/lightning/app/storage/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,16 +427,12 @@ def _filesystem() -> AbstractFileSystem:
endpoint_url = os.getenv("LIGHTNING_BUCKET_ENDPOINT_URL", "")
bucket_name = os.getenv("LIGHTNING_BUCKET_NAME", "")
if endpoint_url != "" and bucket_name != "":
key = os.getenv("LIGHTNING_AWS_ACCESS_KEY_ID", "")
secret = os.getenv("LIGHTNING_AWS_SECRET_ACCESS_KEY", "")
# TODO: Remove when updated on the platform side.
if key == "" or secret == "":
key = os.getenv("AWS_ACCESS_KEY_ID", "")
secret = os.getenv("AWS_SECRET_ACCESS_KEY", "")
if key == "" or secret == "":
raise RuntimeError("missing S3 bucket credentials")

fs = S3FileSystem(key=key, secret=secret, use_ssl=False, client_kwargs={"endpoint_url": endpoint_url})
# FIXME: Temporary fix until we remove the injection from the platform
if "AWS_ACCESS_KEY_ID" in os.environ:
del os.environ["AWS_ACCESS_KEY_ID"]
del os.environ["AWS_SECRET_ACCESS_KEY"]

fs = S3FileSystem()

app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", "")
if app_id == "":
Expand Down
23 changes: 22 additions & 1 deletion src/lightning/app/testing/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ def run_app_in_cloud(
[constants.LIGHTNING_CLOUD_PROJECT_ID],
)

admin_page.reload()

view_page = context.new_page()
i = 1
while True:
Expand All @@ -385,10 +387,10 @@ def run_app_in_cloud(

# wait until the app is running and openapi.json is ready
if app.status.phase == V1LightningappInstanceState.RUNNING:
view_page.goto(f"{app.status.url}/view")
status_code = requests.get(f"{app.status.url}/openapi.json").status_code
if status_code == 200:
print("App is running, continuing with testing...")
view_page.goto(f"{app.status.url}/view")
break
msg = f"Received status code {status_code} at {app.status.url!r}"
elif app.status.phase not in (V1LightningappInstanceState.PENDING, V1LightningappInstanceState.NOT_STARTED):
Expand Down Expand Up @@ -478,6 +480,19 @@ def _delete_lightning_app(client, project_id, app_id, app_name):
print(f"Failed to delete {app_name}. Exception {ex}")


def _delete_cloud_space(client, project_id, cloud_space_id, app_name):
"""Used to delete the parent cloudspace."""
print(f"Deleting {app_name} id: {cloud_space_id}")
try:
res = client.cloud_space_service_delete_cloud_space(
project_id=project_id,
id=cloud_space_id,
)
assert res == {}
except ApiException as ex:
print(f"Failed to delete {app_name}. Exception {ex}")


def delete_cloud_lightning_apps():
"""Cleanup cloud apps that start with the name test-{PR_NUMBER}-{TEST_APP_NAME}.
Expand All @@ -502,10 +517,16 @@ def delete_cloud_lightning_apps():
if pr_number and app_name and not lit_app.name.startswith(f"test-{pr_number}-{app_name}-"):
continue
_delete_lightning_app(client, project_id=project_id, app_id=lit_app.id, app_name=lit_app.name)
_delete_cloud_space(
client, project_id=project_id, cloud_space_id=lit_app.spec.cloud_space_id, app_name=lit_app.name
)

print("deleting apps that were created more than 1 hour ago.")

for lit_app in list_apps.lightningapps:

if lit_app.created_at < datetime.datetime.now(lit_app.created_at.tzinfo) - datetime.timedelta(hours=1):
_delete_lightning_app(client, project_id=project_id, app_id=lit_app.id, app_name=lit_app.name)
_delete_cloud_space(
client, project_id=project_id, cloud_space_id=lit_app.spec.cloud_space_id, app_name=lit_app.name
)
17 changes: 10 additions & 7 deletions src/lightning/app/utilities/app_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@

@dataclass
class _LogEventLabels:
app: str
container: str
filename: str
job: str
namespace: str
node_name: str
pod: str
app: Optional[str] = None
container: Optional[str] = None
filename: Optional[str] = None
job: Optional[str] = None
namespace: Optional[str] = None
node_name: Optional[str] = None
pod: Optional[str] = None
clusterID: Optional[str] = None
component: Optional[str] = None
projectID: Optional[str] = None
stream: Optional[str] = None


Expand Down
7 changes: 6 additions & 1 deletion src/lightning/app/utilities/packaging/cloud_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ def __post_init__(self) -> None:
if "gpu" not in self.name:
raise ValueError("CloudCompute `interruptible=True` is supported only with GPU.")

# FIXME: Clean the mess on the platform side
if self.name == "default" or self.name == "cpu":
self.name = "cpu-small"
self._internal_id = "default"

# TODO: Remove from the platform first.
self.preemptible = self.interruptible

Expand Down Expand Up @@ -147,7 +152,7 @@ def id(self) -> Optional[str]:
return self._internal_id

def is_default(self) -> bool:
return self.name == "default"
return self.name in ("default", "cpu-small")

def _generate_id(self):
return "default" if self.name == "default" else uuid4().hex[:7]
Expand Down
1 change: 1 addition & 0 deletions tests/integrations_app/public/test_v0_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def check_content(button_name, text_content):
has_logs = False
while not has_logs:
for log in fetch_logs(["flow"]):
print(log)
if "'a': 'a', 'b': 'b'" in log:
has_logs = True
sleep(1)
Expand Down
18 changes: 1 addition & 17 deletions tests/tests_app/core/test_constants.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
import os
from unittest import mock

import pytest

from lightning.app.core.constants import get_cluster_driver, get_lightning_cloud_url
from lightning.app.core.constants import get_lightning_cloud_url


@mock.patch.dict(os.environ, {"LIGHTNING_CLOUD_URL": "https://beta.lightning.ai"})
def test_defaults():
assert get_lightning_cloud_url() == "https://beta.lightning.ai"


def test_cluster_drive(monkeypatch):
assert get_cluster_driver() is None

monkeypatch.setenv("LIGHTNING_INTERRUPTIBLE_WORKS", "1")
assert get_cluster_driver() == "direct"

monkeypatch.setenv("LIGHTNING_CLUSTER_DRIVER", "k8s")
assert get_cluster_driver() == "k8s"

with pytest.raises(ValueError, match="The value needs to be in"):
monkeypatch.setenv("LIGHTNING_CLUSTER_DRIVER", "something_else")
assert get_cluster_driver() == "k8s"
5 changes: 2 additions & 3 deletions tests/tests_app/core/test_lightning_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,8 +987,8 @@ def run(self):
def test_state_size_constant_growth():
app = LightningApp(SizeFlow())
MultiProcessRuntime(app, start_server=False).dispatch()
assert app.root._state_sizes[0] <= 7952
assert app.root._state_sizes[20] <= 26500
assert app.root._state_sizes[0] <= 7965
assert app.root._state_sizes[20] <= 26550


class FlowUpdated(LightningFlow):
Expand Down Expand Up @@ -1108,7 +1108,6 @@ def __init__(self, flow):


def test_cloud_compute_binding():

cloud_compute.ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER = True

assert cloud_compute._CLOUD_COMPUTE_STORE == {}
Expand Down
8 changes: 4 additions & 4 deletions tests/tests_app/core/test_lightning_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand All @@ -358,7 +358,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand Down Expand Up @@ -399,7 +399,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand All @@ -424,7 +424,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand Down

0 comments on commit cf64e87

Please sign in to comment.