Skip to content

Commit

Permalink
Merge identical get_location and get_repository_location methods (#7952)
Browse files Browse the repository at this point in the history
Summary:
Somehow things evolved so that we had two completely identical methods on the workspace class (except one was throwing a more specific exception). Merge them.

Test Plan: BK
  • Loading branch information
gibsondan committed May 18, 2022
1 parent 448ca76 commit 15887bc
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ def resolve_asset_nodes(self, graphene_info):
else:
repo_handle = self._represented_pipeline.repository_handle
origin = repo_handle.repository_location_origin
location = graphene_info.context.get_location(origin.location_name)
location = graphene_info.context.get_repository_location(origin.location_name)
ext_repo = location.get_repository(repo_handle.repository_name)
nodes = [
node
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ def __init__(self, *args, **kwargs):
super(DagsterUserCodeProcessError, self).__init__(*args, **kwargs)


class DagsterRepositoryLocationNotFoundError(DagsterError):
pass


class DagsterRepositoryLocationLoadError(DagsterError):
def __init__(self, *args, **kwargs):
from dagster.utils.error import SerializableErrorInfo
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
from typing import cast

import dagster.seven as seven
from dagster import Bool, Field
Expand Down Expand Up @@ -104,7 +105,7 @@ def launch_run(self, context: LaunchRunContext) -> None:
)

external_pipeline_origin = check.not_none(run.external_pipeline_origin)
repository_location = context.workspace.get_location(
repository_location = context.workspace.get_repository_location(
external_pipeline_origin.external_repository_origin.repository_location_origin.location_name
)

Expand All @@ -115,7 +116,7 @@ def launch_run(self, context: LaunchRunContext) -> None:
)

DefaultRunLauncher.launch_run_from_grpc_client(
self._instance, run, repository_location.client
self._instance, run, cast(GrpcServerRepositoryLocation, repository_location).client
)

self._run_ids.add(run.run_id)
Expand Down
19 changes: 6 additions & 13 deletions python_modules/dagster/dagster/core/workspace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Union, cast

import dagster._check as check
from dagster.core.errors import DagsterInvariantViolationError, DagsterRepositoryLocationLoadError
from dagster.core.errors import (
DagsterRepositoryLocationLoadError,
DagsterRepositoryLocationNotFoundError,
)
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.host_representation import (
ExternalExecutionPlan,
Expand Down Expand Up @@ -100,10 +103,10 @@ def has_permission(self, permission: str) -> bool:
def show_instance_config(self) -> bool:
return True

def get_location(self, location_name: str):
def get_repository_location(self, location_name: str) -> RepositoryLocation:
location_entry = self.get_location_entry(location_name)
if not location_entry:
raise DagsterInvariantViolationError(
raise DagsterRepositoryLocationNotFoundError(
f"Location {location_name} does not exist in workspace"
)

Expand Down Expand Up @@ -133,16 +136,6 @@ def repository_location_errors(self) -> List[SerializableErrorInfo]:
entry.load_error for entry in self.get_workspace_snapshot().values() if entry.load_error
]

def get_repository_location(self, name: str) -> RepositoryLocation:
location_entry = self.get_location_entry(name)

if not location_entry:
raise Exception(f"Location {name} not in workspace")
if location_entry.load_error:
raise Exception(f"Error loading location {name}: {location_entry.load_error}")

return cast(RepositoryLocation, location_entry.repository_location)

def has_repository_location_error(self, name: str) -> bool:
return self.get_repository_location_error(name) != None

Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/core/workspace/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class IWorkspace(ABC):
"""

@abstractmethod
def get_location(self, location_name: str):
def get_repository_location(self, location_name: str) -> RepositoryLocation:
"""Return the RepositoryLocation for the given location name, or raise an error if there is an error loading it."""

@abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/daemon/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def execute_backfill_iteration(instance, workspace, logger, debug_crash_flags=No
)

try:
repo_location = workspace.get_location(origin.location_name)
repo_location = workspace.get_repository_location(origin.location_name)
repo_name = backfill_job.partition_set_origin.external_repository_origin.repository_name
partition_set_name = backfill_job.partition_set_origin.partition_set_name
if not repo_location.has_repository(repo_name):
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/daemon/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def _evaluate_sensor(

sensor_origin = external_sensor.get_external_origin()
repository_handle = external_sensor.handle.repository_handle
repo_location = workspace.get_location(
repo_location = workspace.get_repository_location(
sensor_origin.external_repository_origin.repository_location_origin.location_name
)

Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/daemon/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_workspace_snapshot(self) -> Dict[str, WorkspaceLocationEntry]:
def _load_workspace(self) -> Dict[str, WorkspaceLocationEntry]:
pass

def get_location(self, location_name: str) -> RepositoryLocation:
def get_repository_location(self, location_name: str) -> RepositoryLocation:
if self._location_entries == None:
self._location_entries = self._load_workspace()

Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def _schedule_runs_at_time(
solid_selection=external_schedule.solid_selection,
)

repo_location = workspace.get_location(
repo_location = workspace.get_repository_location(
schedule_origin.external_repository_origin.repository_location_origin.location_name
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,7 @@ def test_grpc_server_down(instance):
with _grpc_server_external_repo(port) as external_repo:
external_schedule = external_repo.get_external_schedule("simple_schedule")
instance.start_schedule(external_schedule)
workspace.get_location(location_origin.location_name)
workspace.get_repository_location(location_origin.location_name)

# Server is no longer running, ticks fail but indicate it will resume once it is reachable
for _trial in range(3):
Expand Down

0 comments on commit 15887bc

Please sign in to comment.