Skip to content

Commit

Permalink
Add grpc server timeout config to instance settings (#7387)
Browse files Browse the repository at this point in the history
Summary:
Adds an instance setting, `code_servers.local_startup_timeout`, that controls how long to wait for a locally launched gRPC code server to start. The default is 60 seconds; set a larger value if your server needs more time to start up.
  • Loading branch information
gibsondan committed Apr 12, 2022
1 parent 72e5b85 commit 104e780
Show file tree
Hide file tree
Showing 35 changed files with 327 additions and 265 deletions.
72 changes: 38 additions & 34 deletions integration_tests/test_suites/daemon-test-suite/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def example_repo():


@contextmanager
def get_example_repository_location():
def get_example_repository_location(instance):
load_target = workspace_load_target()
origin = load_target.create_origins()[0]

with origin.create_single_location() as location:
with origin.create_single_location(instance) as location:
yield location


Expand All @@ -63,8 +63,8 @@ def workspace_load_target():


@contextmanager
def get_example_repo():
with get_example_repository_location() as location:
def get_example_repo(instance):
with get_example_repository_location(instance) as location:
yield location.get_repository("example_repo")


Expand All @@ -83,44 +83,48 @@ def test_no_memory_leaks():
},
},
}
) as instance, get_example_repo() as repo:
) as instance:
with get_example_repo(instance) as repo:

external_schedule = repo.get_external_schedule("always_run_schedule")
external_sensor = repo.get_external_sensor("always_on_sensor")
external_schedule = repo.get_external_schedule("always_run_schedule")
external_sensor = repo.get_external_sensor("always_on_sensor")

instance.start_schedule(external_schedule)
instance.start_sensor(external_sensor)
instance.start_schedule(external_schedule)
instance.start_sensor(external_sensor)

with daemon_controller_from_instance(
instance, workspace_load_target=workspace_load_target(), wait_for_processes_on_exit=True
) as controller:
start_time = time.time()

growth = objgraph.growth(
limit=10,
filter=lambda obj: inspect.getmodule(obj)
and "dagster" in inspect.getmodule(obj).__name__,
)
while True:
time.sleep(30)

controller.check_daemon_threads()
controller.check_daemon_heartbeats()
with daemon_controller_from_instance(
instance,
workspace_load_target=workspace_load_target(),
wait_for_processes_on_exit=True,
) as controller:
start_time = time.time()

growth = objgraph.growth(
limit=10,
filter=lambda obj: inspect.getmodule(obj)
and "dagster" in inspect.getmodule(obj).__name__,
)
if not growth:
print( # pylint: disable=print-call
f"Memory stopped growing after {int(time.time() - start_time)} seconds"
)
break
while True:
time.sleep(30)

if (time.time() - start_time) > 300:
raise Exception(
"Memory still growing after 5 minutes. Most recent growth: " + str(growth)
)
controller.check_daemon_threads()
controller.check_daemon_heartbeats()

print("Growth: " + str(growth)) # pylint: disable=print-call
growth = objgraph.growth(
limit=10,
filter=lambda obj: inspect.getmodule(obj)
and "dagster" in inspect.getmodule(obj).__name__,
)
if not growth:
print( # pylint: disable=print-call
f"Memory stopped growing after {int(time.time() - start_time)} seconds"
)
break

if (time.time() - start_time) > 300:
raise Exception(
"Memory still growing after 5 minutes. Most recent growth: "
+ str(growth)
)

print("Growth: " + str(growth)) # pylint: disable=print-call
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import sys

from dagster.core.host_representation import ManagedGrpcPythonEnvRepositoryLocationOrigin
from dagster.core.test_utils import instance_for_test
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin
from dagster.utils import file_relative_path


def test_dagster_out_of_process_location():
with ManagedGrpcPythonEnvRepositoryLocationOrigin(
location_name="test_location",
loadable_target_origin=LoadableTargetOrigin(
executable_path=sys.executable,
python_file=file_relative_path(__file__, "setup.py"),
attribute="test_repo",
),
).create_single_location() as env:
assert env.get_repository("test_repo")
with instance_for_test() as instance:
with ManagedGrpcPythonEnvRepositoryLocationOrigin(
location_name="test_location",
loadable_target_origin=LoadableTargetOrigin(
executable_path=sys.executable,
python_file=file_relative_path(__file__, "setup.py"),
attribute="test_repo",
),
).create_single_location(instance) as env:
assert env.get_repository("test_repo")
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@

def _create_sensor_tick(graphql_context):
with create_test_daemon_workspace(
graphql_context.process_context.workspace_load_target
graphql_context.process_context.workspace_load_target,
graphql_context.instance,
) as workspace:
list(
execute_sensor_iteration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ def test_sensor_next_ticks(graphql_context):

def _create_tick(graphql_context):
with create_test_daemon_workspace(
graphql_context.process_context.workspace_load_target
graphql_context.process_context.workspace_load_target,
graphql_context.instance,
) as workspace:
list(
execute_sensor_iteration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
InProcessRepositoryLocation,
RepositoryLocation,
)
from dagster.core.instance import DagsterInstance
from dagster.grpc.client import DagsterGrpcClient

# This is a hard-coded name for the special "in-process" location.
Expand Down Expand Up @@ -242,19 +243,18 @@ def create_location(self) -> NoReturn:
)

@contextmanager
def create_single_location(self) -> Generator["RepositoryLocation", None, None]:
from dagster.core.workspace.context import (
DAGIT_GRPC_SERVER_HEARTBEAT_TTL,
DAGIT_GRPC_SERVER_STARTUP_TIMEOUT,
)
def create_single_location(
self, instance: "DagsterInstance"
) -> Generator["RepositoryLocation", None, None]:
from dagster.core.workspace.context import DAGIT_GRPC_SERVER_HEARTBEAT_TTL

from .grpc_server_registry import ProcessGrpcServerRegistry
from .repository_location import GrpcServerRepositoryLocation

with ProcessGrpcServerRegistry(
reload_interval=0,
heartbeat_ttl=DAGIT_GRPC_SERVER_HEARTBEAT_TTL,
startup_timeout=DAGIT_GRPC_SERVER_STARTUP_TIMEOUT,
startup_timeout=instance.code_server_process_startup_timeout,
) as grpc_server_registry:
endpoint = grpc_server_registry.get_grpc_endpoint(self)
with GrpcServerRepositoryLocation(
Expand Down
17 changes: 15 additions & 2 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@
from dagster.utils.backcompat import experimental_functionality_warning
from dagster.utils.error import serializable_error_info_from_exc_info

from .config import DAGSTER_CONFIG_YAML_FILENAME, is_dagster_home_set
from .config import (
DAGSTER_CONFIG_YAML_FILENAME,
DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT,
is_dagster_home_set,
)
from .ref import InstanceRef

# 'airflow_execution_date' and 'is_airflow_ingest_pipeline' are hardcoded tags used in the
Expand All @@ -72,7 +76,6 @@
AIRFLOW_EXECUTION_DATE_STR = "airflow_execution_date"
IS_AIRFLOW_INGEST_PIPELINE_STR = "is_airflow_ingest_pipeline"


if TYPE_CHECKING:
from dagster.core.debug import DebugRunPayload
from dagster.core.events import DagsterEvent, DagsterEventType
Expand Down Expand Up @@ -595,6 +598,16 @@ def run_monitoring_settings(self) -> Dict:
def run_monitoring_start_timeout_seconds(self) -> int:
return self.run_monitoring_settings.get("start_timeout_seconds", 180)

@property
def code_server_settings(self) -> Dict:
return self.get_settings("code_servers")

@property
def code_server_process_startup_timeout(self) -> int:
return self.code_server_settings.get(
"local_startup_timeout", DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT
)

@property
def run_monitoring_max_resume_run_attempts(self) -> int:
default_max_resume_run_attempts = 3 if self.run_launcher.supports_resume_run else 0
Expand Down
7 changes: 7 additions & 0 deletions python_modules/dagster/dagster/core/instance/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def python_logs_config_schema():
)


DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT = 60


def dagster_instance_config_schema():
return {
"local_artifact_storage": config_field_for_configurable_class(),
Expand Down Expand Up @@ -127,4 +130,8 @@ def dagster_instance_config_schema():
"cancellation_thread_poll_interval_seconds": Field(int, is_required=False),
},
),
"code_servers": Field(
{"local_startup_timeout": Field(int, is_required=False)},
is_required=False,
),
}
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/core/instance/ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def from_dir(base_dir, config_filename=DAGSTER_CONFIG_YAML_FILENAME, overrides=N
defaults["run_launcher"],
)

settings_keys = {"telemetry", "python_logs", "run_monitoring"}
settings_keys = {"telemetry", "python_logs", "run_monitoring", "code_servers"}
settings = {key: config_value.get(key) for key in settings_keys if config_value.get(key)}

return InstanceRef(
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/core/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,10 @@ def in_process_test_workspace(instance, loadable_target_origin, container_image=


@contextmanager
def create_test_daemon_workspace(workspace_load_target):
def create_test_daemon_workspace(workspace_load_target, instance):
"""Creates a DynamicWorkspace suitable for passing into a DagsterDaemon loop when running tests."""
configure_loggers()
with create_daemon_grpc_server_registry() as grpc_server_registry:
with create_daemon_grpc_server_registry(instance) as grpc_server_registry:
with DaemonWorkspace(grpc_server_registry, workspace_load_target) as workspace:
yield workspace

Expand Down
3 changes: 1 addition & 2 deletions python_modules/dagster/dagster/core/workspace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@


DAGIT_GRPC_SERVER_HEARTBEAT_TTL = 45
DAGIT_GRPC_SERVER_STARTUP_TIMEOUT = 30


class BaseWorkspaceRequestContext(IWorkspace):
Expand Down Expand Up @@ -436,7 +435,7 @@ def __init__(
ProcessGrpcServerRegistry(
reload_interval=0,
heartbeat_ttl=DAGIT_GRPC_SERVER_HEARTBEAT_TTL,
startup_timeout=DAGIT_GRPC_SERVER_STARTUP_TIMEOUT,
startup_timeout=instance.code_server_process_startup_timeout,
)
)

Expand Down
7 changes: 3 additions & 4 deletions python_modules/dagster/dagster/daemon/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

DAEMON_GRPC_SERVER_RELOAD_INTERVAL = 60
DAEMON_GRPC_SERVER_HEARTBEAT_TTL = 120
DAEMON_GRPC_SERVER_STARTUP_TIMEOUT = 30


def _sorted_quoted(strings):
Expand All @@ -58,11 +57,11 @@ def create_daemons_from_instance(instance):
]


def create_daemon_grpc_server_registry():
def create_daemon_grpc_server_registry(instance):
return ProcessGrpcServerRegistry(
reload_interval=DAEMON_GRPC_SERVER_RELOAD_INTERVAL,
heartbeat_ttl=DAEMON_GRPC_SERVER_HEARTBEAT_TTL,
startup_timeout=DAEMON_GRPC_SERVER_STARTUP_TIMEOUT,
startup_timeout=instance.code_server_process_startup_timeout,
)


Expand All @@ -81,7 +80,7 @@ def daemon_controller_from_instance(
grpc_server_registry = None
try:
with ExitStack() as stack:
grpc_server_registry = stack.enter_context(create_daemon_grpc_server_registry())
grpc_server_registry = stack.enter_context(create_daemon_grpc_server_registry(instance))
daemons = [stack.enter_context(daemon) for daemon in gen_daemons(instance)]

# Create this in each daemon to generate a workspace per-daemon
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/grpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ def wait_for_grpc_server(server_process, client, subprocess_args, timeout=60):
except DagsterUserCodeUnreachableError:
last_error = serializable_error_info_from_exc_info(sys.exc_info())

if time.time() - start_time > timeout:
if timeout > 0 and (time.time() - start_time > timeout):
raise Exception(
f"Timed out waiting for gRPC server to start with arguments: \"{' '.join(subprocess_args)}\". Most recent connection error: {str(last_error)}"
)
Expand Down
11 changes: 11 additions & 0 deletions python_modules/dagster/dagster_tests/api_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# pylint: disable=redefined-outer-name

import pytest

from dagster.core.test_utils import instance_for_test


@pytest.fixture()
def instance():
    """Yield the instance produced by ``instance_for_test()``.

    The ``instance_for_test`` context stays open while the requesting test
    runs and is exited once the test is done with the fixture.
    """
    with instance_for_test() as test_instance:
        yield test_instance
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def _check_event_log_contains(event_log, expected_type_and_message):

def test_launch_run_with_unloadable_pipeline_grpc():
with instance_for_test() as instance:
with get_bar_repo_repository_location() as repository_location:
with get_bar_repo_repository_location(instance) as repository_location:
pipeline_handle = PipelineHandle(
"foo", repository_location.get_repository("bar_repo").handle
)
Expand Down Expand Up @@ -85,7 +85,7 @@ def test_launch_run_with_unloadable_pipeline_grpc():

def test_launch_run_grpc():
with instance_for_test() as instance:
with get_bar_repo_repository_location() as repository_location:
with get_bar_repo_repository_location(instance) as repository_location:
pipeline_handle = PipelineHandle(
"foo", repository_location.get_repository("bar_repo").handle
)
Expand Down Expand Up @@ -145,7 +145,7 @@ def test_launch_run_grpc():

def test_launch_unloadable_run_grpc():
with instance_for_test() as instance:
with get_bar_repo_repository_location() as repository_location:
with get_bar_repo_repository_location(instance) as repository_location:
pipeline_handle = PipelineHandle(
"foo", repository_location.get_repository("bar_repo").handle
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from .utils import get_bar_repo_repository_location


def test_external_notebook_grpc():
def test_external_notebook_grpc(instance):
notebook_path = file_relative_path(__file__, "foo.ipynb")
with get_bar_repo_repository_location() as repository_location:
with get_bar_repo_repository_location(instance) as repository_location:
api_client = repository_location.client

content = sync_get_streaming_external_notebook_data_grpc(
Expand Down

0 comments on commit 104e780

Please sign in to comment.