Skip to content

Commit

Permalink
"dagster dev" command that spins up dagit and the daemon as subproces…
Browse files Browse the repository at this point in the history
…ses and requires no DAGSTER_HOME env var to be set (#11584)

Summary:
It is not uncommon for users to hit a roadblock when trying to get the
daemon running for the first time - they need to set the DAGSTER_HOME
env var, make sure it's the same between two different services, keep
them running in two separate windows, etc.

This proposes a single "dagster dev" command that:
- spins up both services for you as subprocesses, and spins them down
when the process is interrupted
- uses a subset of the workspace args available to both dagit and
dagster-daemon currently
- requires no env vars: uses your current working directory as a home
folder if none is set

As part of this I reduced the log level on the gRPC servers since they
were a bit overwhelming out of the box (a nice future improvement would
be to share gRPC servers between the two processes). We could also
discuss what kind of log output we should give for people using this
command for the first time to explain what's going on.

Feedback on the direction is welcome; if it seems promising, I will clean it
up and add tests.

### Summary & Motivation

### How I Tested These Changes
  • Loading branch information
gibsondan committed Jan 17, 2023
1 parent 46479d9 commit 3de3060
Show file tree
Hide file tree
Showing 17 changed files with 466 additions and 61 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dagster import DefaultSensorStatus, RunRequest, repository, sensor
from dagster._legacy import pipeline, solid


@solid()
def foo_solid(_):
    # Intentional no-op: the single argument is ignored and nothing is computed.
    return None


@pipeline
def foo_pipeline():
    # The pipeline body is a single invocation of foo_solid.
    foo_solid()


@pipeline
def other_foo_pipeline():
    # Same body as foo_pipeline (one foo_solid invocation) under a different name.
    foo_solid()


@sensor(job_name="foo_pipeline", default_status=DefaultSensorStatus.RUNNING)
def always_on_sensor(_context):
    # Every evaluation returns an identical request: fixed run key "only_one",
    # empty run config, and no tags. The context argument is unused.
    request = RunRequest(run_key="only_one", run_config={}, tags={})
    return request


@repository
def example_repo():
    # Contents of this repository: foo_pipeline and the sensor whose
    # job_name refers to it.
    definitions = [foo_pipeline, always_on_sensor]
    return definitions


@repository
def other_example_repo():
    # Second repository in the same file; it contains only other_foo_pipeline.
    definitions = [other_foo_pipeline]
    return definitions
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
import signal
import subprocess
import tempfile
import time

import requests
import yaml
from dagster import DagsterEventType, DagsterInstance, EventRecordsFilter
from dagster._core.test_utils import environ, new_cwd
from dagster._utils import find_free_port


# E2E test that spins up "dagster dev", accesses dagit,
# and waits for a sensor-launched run to be queued
def _wait_for(check, timeout_message, timeout=30, interval=1):
    """Poll ``check`` until it returns a truthy value, then return that value.

    Args:
        check: Zero-argument callable that is invoked once per polling round.
        timeout_message: Message of the exception raised on timeout.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polling rounds.

    Raises:
        Exception: If ``check`` has not returned a truthy value within ``timeout`` seconds.
    """
    # Monotonic clock so the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        result = check()
        if result:
            return result
        if time.monotonic() > deadline:
            raise Exception(timeout_message)
        time.sleep(interval)


def _dagit_is_serving(dagit_port):
    """Return True once the /dagit_info endpoint answers with a non-empty JSON body."""
    try:
        # The per-request timeout keeps one stuck connection from stalling the whole test.
        response = requests.get(f"http://localhost:{dagit_port}/dagit_info", timeout=5)
        return bool(response.json())
    except (requests.exceptions.RequestException, ValueError):
        # Connection failures and non-JSON bodies both mean "not ready yet".
        print("Waiting for Dagit to be ready..")
        return False


def _find_instance_dir(base_dir):
    """Return the single "tmp*" subfolder of ``base_dir`` containing "history", or None."""
    candidates = [
        name
        for name in os.listdir(base_dir)
        if name.startswith("tmp") and os.path.exists(os.path.join(base_dir, name, "history"))
    ]
    if not candidates:
        return None
    # More than one matching folder would make the instance location ambiguous.
    assert len(candidates) == 1
    return os.path.join(base_dir, candidates[0])


def _has_queued_run(instance):
    """Return True when the instance has a run and a PIPELINE_ENQUEUED event record."""
    if not instance.get_runs():
        return False
    enqueued_records = instance.get_event_records(
        event_records_filter=EventRecordsFilter(event_type=DagsterEventType.PIPELINE_ENQUEUED)
    )
    return bool(enqueued_records)


def test_dagster_dev_command_no_dagster_home():
    """Run "dagster dev" with an empty DAGSTER_HOME and wait for a run to be enqueued.

    The working directory is a fresh temp dir holding a dagster.yaml that selects
    QueuedRunCoordinator. The test waits for Dagit to respond, then for the instance
    folder to appear under the temp dir, then for an enqueued run in that instance.
    """
    with tempfile.TemporaryDirectory() as tempdir:
        with environ({"DAGSTER_HOME": ""}), new_cwd(tempdir):
            dagster_yaml = {
                "run_coordinator": {
                    "module": "dagster.core.run_coordinator",
                    "class": "QueuedRunCoordinator",
                },
            }
            with open(os.path.join(tempdir, "dagster.yaml"), "w") as config_file:
                yaml.dump(dagster_yaml, config_file)

            dagit_port = find_free_port()

            dev_process = subprocess.Popen(
                [
                    "dagster",
                    "dev",
                    "-f",
                    os.path.join(os.path.dirname(__file__), "repo.py"),
                    "--dagit-port",
                    str(dagit_port),
                ],
                cwd=tempdir,
            )

            # Everything after process creation runs under try/finally so the
            # subprocess is interrupted even when the very first wait times out.
            try:
                _wait_for(
                    lambda: _dagit_is_serving(dagit_port),
                    "Timed out waiting for Dagit to serve requests",
                )

                instance_dir = _wait_for(
                    lambda: _find_instance_dir(tempdir),
                    "Timed out waiting for instance files to exist",
                )

                with DagsterInstance.from_config(instance_dir) as instance:
                    # Verify the run was queued (so the dagster.yaml was applied)
                    _wait_for(
                        lambda: _has_queued_run(instance),
                        "Timed out waiting for queued run to exist",
                    )
            finally:
                dev_process.send_signal(signal.SIGINT)
                dev_process.communicate()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
load_from:
- python_file: repo.py
1 change: 1 addition & 0 deletions integration_tests/test_suites/daemon-test-suite/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ deps =
objgraph
-e ../../../python_modules/dagster[mypy,test]
-e ../../../python_modules/dagster-graphql
-e ../../../python_modules/dagit
-e ../../../python_modules/dagster-test
-e ../../../python_modules/libraries/dagster-aws
-e ../../../python_modules/libraries/dagster-pandas
Expand Down
36 changes: 29 additions & 7 deletions python_modules/dagit/dagit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
workspace_target_argument,
)
from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING
from dagster._core.instance import InstanceRef
from dagster._core.telemetry import START_DAGIT_WEBSERVER, log_action
from dagster._core.telemetry_upload import uploading_logging_thread
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._serdes import deserialize_as
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME, find_free_port, is_port_in_use
from dagster._utils.log import configure_loggers

Expand Down Expand Up @@ -120,6 +122,21 @@ def create_dagit_cli():
["critical", "error", "warning", "info", "debug", "trace"], case_sensitive=False
),
)
@click.option(
"--code-server-log-level",
help="Set the log level for any code servers spun up by dagit.",
show_default=True,
default="info",
type=click.Choice(
["critical", "error", "warning", "info", "debug", "trace"], case_sensitive=False
),
)
@click.option(
"--instance-ref",
type=click.STRING,
required=False,
hidden=True,
)
@click.version_option(version=__version__, prog_name="dagit")
def dagit(
host,
Expand All @@ -130,12 +147,21 @@ def dagit(
read_only,
suppress_warnings,
log_level,
code_server_log_level,
instance_ref,
**kwargs,
):
if suppress_warnings:
os.environ["PYTHONWARNINGS"] = "ignore"

with get_instance_for_service("dagit") as instance:
configure_loggers()
logger = logging.getLogger("dagit")

with get_instance_for_service(
"dagit",
instance_ref=deserialize_as(instance_ref, InstanceRef) if instance_ref else None,
logger_fn=logger.info,
) as instance:
# Allow the instance components to change behavior in the context of a long running server process
instance.optimize_for_dagit(db_statement_timeout, db_pool_recycle)

Expand All @@ -144,13 +170,10 @@ def dagit(
version=__version__,
read_only=read_only,
kwargs=kwargs,
code_server_log_level=code_server_log_level,
) as workspace_process_context:
host_dagit_ui_with_workspace_process_context(
workspace_process_context,
host,
port,
path_prefix,
log_level,
workspace_process_context, host, port, path_prefix, log_level
)


Expand All @@ -168,7 +191,6 @@ def host_dagit_ui_with_workspace_process_context(
check.opt_int_param(port, "port")
check.str_param(path_prefix, "path_prefix")

configure_loggers()
logger = logging.getLogger("dagit")

app = create_app_from_workspace_process_context(workspace_process_context, path_prefix)
Expand Down
2 changes: 2 additions & 0 deletions python_modules/dagster/dagster/_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .api import api_cli
from .asset import asset_cli
from .debug import debug_cli
from .dev import dev_command
from .instance import instance_cli
from .job import job_cli
from .project import project_cli
Expand All @@ -23,6 +24,7 @@ def create_dagster_cli():
"asset": asset_cli,
"debug": debug_cli,
"project": project_cli,
"dev": dev_command,
}

@click.group(
Expand Down

0 comments on commit 3de3060

Please sign in to comment.