Skip to content

Commit

Permalink
[0.15.0] remove attempt count from step launching APIs (#8068)
Browse files Browse the repository at this point in the history
### Summary & Motivation

This information is not transferred with `KnownExecutionState` along with any other required information about execution state.

### How I Tested These Changes

BK
  • Loading branch information
alangenfeld committed Jun 3, 2022
1 parent 6e684e1 commit cca4561
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class StepRunRef(
("retry_mode", RetryMode),
("step_key", str),
("recon_pipeline", ReconstructablePipeline),
("prior_attempts_count", int),
("known_state", Optional["KnownExecutionState"]),
],
)
Expand All @@ -39,7 +38,6 @@ def __new__(
retry_mode: RetryMode,
step_key: str,
recon_pipeline: ReconstructablePipeline,
prior_attempts_count: int,
known_state: Optional["KnownExecutionState"],
):
from dagster.core.execution.plan.state import KnownExecutionState
Expand All @@ -52,7 +50,6 @@ def __new__(
check.inst_param(retry_mode, "retry_mode", RetryMode),
check.str_param(step_key, "step_key"),
check.inst_param(recon_pipeline, "recon_pipeline", ReconstructablePipeline),
check.int_param(prior_attempts_count, "prior_attempts_count"),
check.opt_inst_param(known_state, "known_state", KnownExecutionState),
)

Expand All @@ -63,11 +60,10 @@ class StepLauncher(ABC):
"""

@abstractmethod
def launch_step(self, step_context, prior_attempts_count):
def launch_step(self, step_context):
"""
Args:
step_context (StepExecutionContext): The context that we're executing the step in.
prior_attempts_count (int): The number of times this step has been attempted in the same run.
Returns:
Iterator[DagsterEvent]: The events for the step.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ def dagster_event_sequence_for_step(
try:
if step_context.step_launcher and not force_local_execution:
# info all on step_context - should deprecate second arg
step_events = step_context.step_launcher.launch_step(
step_context, step_context.previous_attempt_count
)
step_events = step_context.step_launcher.launch_step(step_context)
else:
step_events = core_dagster_event_sequence_for_step(step_context)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ def __init__(self, scratch_dir: str):
def launch_step(
self,
step_context: StepExecutionContext,
prior_attempts_count: int,
) -> Iterator[DagsterEvent]:
step_run_ref = step_context_to_step_run_ref(step_context, prior_attempts_count)
step_run_ref = step_context_to_step_run_ref(step_context)
run_id = step_context.pipeline_run.run_id

step_run_dir = os.path.join(self.scratch_dir, run_id, step_run_ref.step_key)
Expand Down Expand Up @@ -108,14 +107,11 @@ def _module_in_package_dir(file_path: str, package_dir: str) -> str:

def step_context_to_step_run_ref(
step_context: StepExecutionContext,
prior_attempts_count: int,
package_dir: Optional[str] = None,
) -> StepRunRef:
"""
Args:
step_context (StepExecutionContext): The step context.
prior_attempts_count (int): The number of times this time has been tried before in the same
pipeline run.
package_dir (Optional[str]): If set, the reconstruction file code pointer will be converted
to be relative a module pointer relative to the package root. This enables executing
steps in remote setups where the package containing the pipeline resides at a different
Expand All @@ -127,7 +123,6 @@ def step_context_to_step_run_ref(
"""

check.inst_param(step_context, "step_context", StepExecutionContext)
check.int_param(prior_attempts_count, "prior_attempts_count")

retry_mode = step_context.retry_mode

Expand Down Expand Up @@ -161,7 +156,6 @@ def step_context_to_step_run_ref(
step_key=step_context.step.key,
retry_mode=retry_mode,
recon_pipeline=recon_pipeline, # type: ignore
prior_attempts_count=prior_attempts_count,
known_state=step_context.get_known_state(),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ def make_run_config(scratch_dir, mode):


class RequestRetryLocalExternalStepLauncher(LocalExternalStepLauncher):
def launch_step(self, step_context, prior_attempts_count):
if prior_attempts_count == 0:
def launch_step(self, step_context):
if step_context.previous_attempt_count == 0:
raise RetryRequested()
else:
return super(RequestRetryLocalExternalStepLauncher, self).launch_step(
step_context, prior_attempts_count
step_context, step_context.previous_attempt_count
)


Expand Down Expand Up @@ -323,7 +323,7 @@ def test_step_context_to_step_run_ref():
with DagsterInstance.ephemeral() as instance:
step_context = initialize_step_context("", instance)
step = step_context.step
step_run_ref = step_context_to_step_run_ref(step_context, 0)
step_run_ref = step_context_to_step_run_ref(step_context)
assert step_run_ref.run_config == step_context.pipeline_run.run_config
assert step_run_ref.run_id == step_context.pipeline_run.run_id

Expand All @@ -344,7 +344,7 @@ def test_local_external_step_launcher():
step_context = initialize_step_context(tmpdir, instance)

step_launcher = LocalExternalStepLauncher(tmpdir)
events = list(step_launcher.launch_step(step_context, 0))
events = list(step_launcher.launch_step(step_context))
event_types = [event.event_type for event in events]
assert DagsterEventType.STEP_START in event_types
assert DagsterEventType.STEP_SUCCESS in event_types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,8 @@ def _upload_file_to_s3(local_path, s3_filename):

_upload_file_to_s3(step_run_ref_local_path, PICKLED_STEP_RUN_REF_FILE_NAME)

def launch_step(self, step_context, prior_attempts_count):
step_run_ref = step_context_to_step_run_ref(
step_context, prior_attempts_count, self.local_job_package_path
)
def launch_step(self, step_context):
step_run_ref = step_context_to_step_run_ref(step_context, self.local_job_package_path)

run_id = step_context.pipeline_run.run_id
log = step_context.log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ def __init__(
max_wait_time_sec=max_completion_wait_time_seconds,
)

def launch_step(self, step_context, prior_attempts_count):
def launch_step(self, step_context):
step_run_ref = step_context_to_step_run_ref(
step_context, prior_attempts_count, self.local_dagster_job_package_path
step_context, self.local_dagster_job_package_path
)
run_id = step_context.pipeline_run.run_id
log = step_context.log
Expand Down

0 comments on commit cca4561

Please sign in to comment.