Skip to content

Commit

Permalink
Fix the examples/app_dag App (#14359)
Browse files Browse the repository at this point in the history
* Fix app dag example
* Add test
* Update doc
* Update tests/tests_app_examples/test_app_dag.py

Co-authored-by: Sherin Thomas <sherin@grid.ai>
  • Loading branch information
kaushikb11 and Sherin Thomas committed Nov 22, 2022
1 parent cfb27bd commit 2b61c92
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 8 deletions.
5 changes: 2 additions & 3 deletions docs/source-app/examples/dag/dag_from_scratch.rst
Expand Up @@ -39,10 +39,9 @@ First, let's define the component we need:
:lines: 55-79

And its run method executes the steps described above.
Additionally, ``work.stop`` is used to reduce cost when running in the cloud.

.. literalinclude:: ../../../examples/app_dag/app.py
:lines: 81-108
:lines: 80-103

----

Expand All @@ -51,4 +50,4 @@ Step 2: Define the scheduling
*****************************

.. literalinclude:: ../../../examples/app_dag/app.py
:lines: 109-137
:lines: 106-135
6 changes: 2 additions & 4 deletions examples/app_dag/app.py
Expand Up @@ -56,7 +56,7 @@ class DAG(L.LightningFlow):

"""This component is a DAG."""

def __init__(self, models_paths):
def __init__(self, models_paths: list):
super().__init__()
# Step 1: Create a work to get the data.
self.data_collector = GetDataWork()
Expand All @@ -80,12 +80,10 @@ def __init__(self, models_paths):
def run(self):
# Step 1 and 2: Download and process the data.
self.data_collector.run()
self.data_collector.stop() # Stop the data_collector to reduce cost
self.processing.run(
df_data=self.data_collector.df_data,
df_target=self.data_collector.df_target,
)
self.processing.stop() # Stop the processing to reduce cost

# Step 3: Launch n models training in parallel.
for model, work in self.dict.items():
Expand Down Expand Up @@ -128,7 +126,7 @@ def run(self):
app = L.LightningApp(
ScheduledDAG(
DAG,
models=[
models_paths=[
"svm.SVR",
"linear_model.LinearRegression",
"tree.DecisionTreeRegressor",
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -221,6 +221,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Resolved a bug where the `install` command was not installing the latest version of an app/component by default ([#14181](https://github.com/Lightning-AI/lightning/pull/14181))


- Fixed the `examples/app_dag` example ([#14359](https://github.com/Lightning-AI/lightning/pull/14359))


## [0.5.5] - 2022-08-9

### Deprecated
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/testing/testing.py
Expand Up @@ -219,7 +219,7 @@ def _run_cli(args) -> Generator:
def run_app_in_cloud(
app_folder: str, app_name: str = "app.py", extra_args: List[str] = [], debug: bool = True
) -> Generator:
"""This utility is used to automate testing e2e application with lightning_app.ai."""
"""This utility is used to automate testing e2e application with lightning.ai."""
# 1. Validate the provide app_folder is correct.
if not os.path.exists(os.path.join(app_folder, "app.py")):
raise Exception("The app folder should contain an app.py file.")
Expand Down
21 changes: 21 additions & 0 deletions tests/tests_app_examples/test_app_dag.py
@@ -0,0 +1,21 @@
import os
from time import sleep

import pytest
from tests_app import _PROJECT_ROOT

from lightning_app.testing.testing import run_app_in_cloud


@pytest.mark.cloud
def test_app_dag_example_cloud() -> None:
with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "examples/app_dag")) as (_, _, fetch_logs, _):

launch_log, finish_log = False, False
while not (launch_log and finish_log):
for log in fetch_logs(["flow"]):
if "Launching a new DAG" in log:
launch_log = True
elif "Finished training and evaluating" in log:
finish_log = True
sleep(1)

0 comments on commit 2b61c92

Please sign in to comment.