Skip to content

Commit

Permalink
Bring back dagit socket reuse (#11688)
Browse files Browse the repository at this point in the history
Summary:
With one additional line in is_port_in_use that makes our socket check
more closely mimic the socket check that uvicorn does:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

Test Plan: Launch dagit, open browser
Close dagit process
Re-open dagit process
Repeat
Now reliably stays on 3000

### Summary & Motivation

### How I Tested These Changes
  • Loading branch information
gibsondan committed Jan 12, 2023
1 parent d3e7807 commit a8f1400
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 7 deletions.
17 changes: 12 additions & 5 deletions python_modules/dagit/dagit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dagster._core.telemetry import START_DAGIT_WEBSERVER, log_action
from dagster._core.telemetry_upload import uploading_logging_thread
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME, find_free_port, is_port_in_use
from dagster._utils.log import configure_loggers

from .app import create_app_from_workspace_process_context
Expand Down Expand Up @@ -66,8 +66,8 @@ def create_dagit_cli():
"--port",
"-p",
type=click.INT,
help="Port to run server on.",
default=DEFAULT_DAGIT_PORT,
help=f"Port to run server on - defaults to {DEFAULT_DAGIT_PORT}",
default=None,
show_default=True,
)
@click.option(
Expand Down Expand Up @@ -157,22 +157,29 @@ def dagit(
def host_dagit_ui_with_workspace_process_context(
workspace_process_context: WorkspaceProcessContext,
host: Optional[str],
port: int,
port: Optional[int],
path_prefix: str,
log_level: str,
):
check.inst_param(
workspace_process_context, "workspace_process_context", WorkspaceProcessContext
)
host = check.opt_str_param(host, "host", "127.0.0.1")
check.int_param(port, "port")
check.opt_int_param(port, "port")
check.str_param(path_prefix, "path_prefix")

configure_loggers()
logger = logging.getLogger("dagit")

app = create_app_from_workspace_process_context(workspace_process_context, path_prefix)

if not port:
if is_port_in_use(host, DEFAULT_DAGIT_PORT):
port = find_free_port()
logger.warning(f"Port {DEFAULT_DAGIT_PORT} is in use - using port {port} instead")
else:
port = DEFAULT_DAGIT_PORT

logger.info(
"Serving dagit on http://{host}:{port}{path_prefix} in process {pid}".format(
host=host, port=port, path_prefix=path_prefix, pid=os.getpid()
Expand Down
55 changes: 53 additions & 2 deletions python_modules/dagit/dagit_tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
from click.testing import CliRunner
from dagit.app import create_app_from_workspace_process_context
from dagit.cli import dagit, host_dagit_ui_with_workspace_process_context
from dagit.cli import DEFAULT_DAGIT_PORT, dagit, host_dagit_ui_with_workspace_process_context
from dagster import _seven
from dagster._core.instance import DagsterInstance
from dagster._core.telemetry import START_DAGIT_WEBSERVER, UPDATE_REPO_STATS, hash_name
Expand Down Expand Up @@ -127,7 +127,7 @@ def test_graphql_view_at_path_prefix():


def test_successful_host_dagit_ui_from_workspace():
with mock.patch("uvicorn.run"), tempfile.TemporaryDirectory() as temp_dir:
with mock.patch("uvicorn.run") as server_call, tempfile.TemporaryDirectory() as temp_dir:
instance = DagsterInstance.local_temp(temp_dir)

with load_workspace_process_context_from_yaml_paths(
Expand All @@ -141,6 +141,57 @@ def test_successful_host_dagit_ui_from_workspace():
log_level="warning",
)

assert server_call.called_with(mock.ANY, host="127.0.0.1", port=2343, log_level="warning")


@pytest.fixture
def mock_is_port_in_use():
with mock.patch("dagit.cli.is_port_in_use") as mock_is_port_in_use:
yield mock_is_port_in_use


@pytest.fixture
def mock_find_free_port():
with mock.patch("dagit.cli.find_free_port") as mock_find_free_port:
mock_find_free_port.return_value = 1234
yield mock_find_free_port


def test_host_dagit_ui_choose_port(mock_is_port_in_use, mock_find_free_port):
with mock.patch("uvicorn.run") as server_call, tempfile.TemporaryDirectory() as temp_dir:
instance = DagsterInstance.local_temp(temp_dir)

mock_is_port_in_use.return_value = False

with load_workspace_process_context_from_yaml_paths(
instance, [file_relative_path(__file__, "./workspace.yaml")]
) as workspace_process_context:
host_dagit_ui_with_workspace_process_context(
workspace_process_context=workspace_process_context,
host=None,
port=None,
path_prefix="",
log_level="warning",
)

assert server_call.called_with(
mock.ANY, host="127.0.0.1", port=DEFAULT_DAGIT_PORT, log_level="warning"
)

mock_is_port_in_use.return_value = True
with load_workspace_process_context_from_yaml_paths(
instance, [file_relative_path(__file__, "./workspace.yaml")]
) as workspace_process_context:
host_dagit_ui_with_workspace_process_context(
workspace_process_context=workspace_process_context,
host=None,
port=None,
path_prefix="",
log_level="warning",
)

assert server_call.called_with(mock.ANY, host="127.0.0.1", port=1234, log_level="warning")


def test_successful_host_dagit_ui_from_multiple_workspace_files():
with mock.patch("uvicorn.run"), tempfile.TemporaryDirectory() as temp_dir:
Expand Down
14 changes: 14 additions & 0 deletions python_modules/dagster/dagster/_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,20 @@ def find_free_port() -> int:
return s.getsockname()[1]


def is_port_in_use(host, port) -> bool:
# Similar to the socket options that uvicorn uses to bind ports:
# https://github.com/encode/uvicorn/blob/62f19c1c39929c84968712c371c9b7b96a041dec/uvicorn/config.py#L565-L566
sock = socket.socket(family=socket.AF_INET)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((host, port))
return False
except socket.error as e:
return e.errno == errno.EADDRINUSE
finally:
sock.close()


@contextlib.contextmanager
def alter_sys_path(to_add: Sequence[str], to_remove: Sequence[str]) -> Iterator[None]:
to_restore = [path for path in sys.path]
Expand Down

0 comments on commit a8f1400

Please sign in to comment.