Skip to content

Commit

Permalink
Add a static method on DefaultRunLauncher that launches a run without…
Browse files Browse the repository at this point in the history
… a workspace (#7777)

Summary:
In a hypothetical situation where one wanted to use the DefaultRunLauncher but did not have access to a workspace, this method would give one a way to do so.

Test Plan:
Existing DefaultRunLauncher test cases
  • Loading branch information
gibsondan committed May 6, 2022
1 parent afe23af commit 9366a7e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,50 +55,31 @@ def from_config_value(inst_data, config_value):
inst_data=inst_data, wait_for_processes=config_value.get("wait_for_processes", False)
)

def launch_run(self, context: LaunchRunContext) -> None:
run = context.pipeline_run

check.inst_param(run, "run", PipelineRun)

if not context.workspace:
raise DagsterInvariantViolationError(
"DefaultRunLauncher requires a workspace to be included in its LaunchRunContext"
)

external_pipeline_origin = check.not_none(run.external_pipeline_origin)
repository_location = context.workspace.get_location(
external_pipeline_origin.external_repository_origin.repository_location_origin.location_name
)

check.inst(
repository_location,
GrpcServerRepositoryLocation,
"DefaultRunLauncher: Can't launch runs for pipeline not loaded from a GRPC server",
)

self._instance.add_run_tags(
@staticmethod
def launch_run_from_grpc_client(instance, run, grpc_client):
instance.add_run_tags(
run.run_id,
{
GRPC_INFO_TAG: seven.json.dumps(
merge_dicts(
{"host": repository_location.host},
{"host": grpc_client.host},
(
{"port": repository_location.port}
if repository_location.port
else {"socket": repository_location.socket}
{"port": grpc_client.port}
if grpc_client.port
else {"socket": grpc_client.socket}
),
({"use_ssl": True} if repository_location.use_ssl else {}),
({"use_ssl": True} if grpc_client.use_ssl else {}),
)
)
},
)

res = deserialize_as(
repository_location.client.start_run(
grpc_client.start_run(
ExecuteExternalPipelineArgs(
pipeline_origin=external_pipeline_origin,
pipeline_origin=run.external_pipeline_origin,
pipeline_run_id=run.run_id,
instance_ref=self._instance.get_ref(),
instance_ref=instance.get_ref(),
)
),
StartRunResult,
Expand All @@ -110,6 +91,31 @@ def launch_run(self, context: LaunchRunContext) -> None:
)
)

def launch_run(self, context: LaunchRunContext) -> None:
run = context.pipeline_run

check.inst_param(run, "run", PipelineRun)

if not context.workspace:
raise DagsterInvariantViolationError(
"DefaultRunLauncher requires a workspace to be included in its LaunchRunContext"
)

external_pipeline_origin = check.not_none(run.external_pipeline_origin)
repository_location = context.workspace.get_location(
external_pipeline_origin.external_repository_origin.repository_location_origin.location_name
)

check.inst(
repository_location,
GrpcServerRepositoryLocation,
"DefaultRunLauncher: Can't launch runs for pipeline not loaded from a GRPC server",
)

DefaultRunLauncher.launch_run_from_grpc_client(
self._instance, run, repository_location.client
)

self._run_ids.add(run.run_id)

if self._wait_for_processes:
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ def __init__(self, port=None, socket=None, host="localhost", use_ssl=False):
else:
self._server_address = "unix:" + os.path.abspath(socket)

@property
def use_ssl(self) -> bool:
return self._use_ssl

@contextmanager
def _channel(self):
options = [
Expand Down

0 comments on commit 9366a7e

Please sign in to comment.