Skip to content

Commit

Permalink
Rename "base asset job" to "implicit asset job" in RepositoryDefiniti…
Browse files Browse the repository at this point in the history
…on methods (#11344)

### Summary & Motivation

The language of "implicit asset jobs" is more accurate and clear than "base asset jobs" so we are changing these method names.

### How I Tested These Changes

BK
  • Loading branch information
schrockn committed Jan 18, 2023
1 parent 822676a commit 78ad6b6
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def test_partitioned_self_dep():


def _materialize_assets(context: WorkspaceRequestContext, repo: RepositoryDefinition):
selector = infer_job_or_pipeline_selector(context, repo.get_base_asset_job_names()[0])
selector = infer_job_or_pipeline_selector(context, repo.get_implicit_asset_job_names()[0])
return execute_dagster_graphql(
context,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
Expand All @@ -104,7 +104,7 @@ def _materialize_assets(context: WorkspaceRequestContext, repo: RepositoryDefini


def _fetch_logical_versions(context: WorkspaceRequestContext, repo: RepositoryDefinition):
selector = infer_job_or_pipeline_selector(context, repo.get_base_asset_job_names()[0])
selector = infer_job_or_pipeline_selector(context, repo.get_implicit_asset_job_names()[0])
return execute_dagster_graphql(
context,
GET_ASSET_LOGICAL_VERSIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def get_implicit_global_asset_job_def(self) -> JobDefinition:
def get_implicit_job_def_for_assets(
self, asset_keys: Iterable[AssetKey]
) -> Optional[JobDefinition]:
return self.get_repository_def().get_base_job_for_assets(asset_keys)
return self.get_repository_def().get_implicit_job_def_for_assets(asset_keys)

@cached_method
def get_repository_def(self) -> RepositoryDefinition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1368,12 +1368,14 @@ def get_implicit_global_asset_job_def(self) -> JobDefinition:

return self.get_job(ASSET_BASE_JOB_PREFIX)

def get_base_asset_job_names(self) -> Sequence[str]:
def get_implicit_asset_job_names(self) -> Sequence[str]:
return [
job_name for job_name in self.job_names if job_name.startswith(ASSET_BASE_JOB_PREFIX)
]

def get_base_job_for_assets(self, asset_keys: Iterable[AssetKey]) -> Optional[JobDefinition]:
def get_implicit_job_def_for_assets(
self, asset_keys: Iterable[AssetKey]
) -> Optional[JobDefinition]:
"""
Returns the asset base job that contains all the given assets, or None if there is no such
job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ def _run_requests_with_base_asset_jobs(
else:
asset_keys = outer_asset_selection.resolve(asset_graph)

base_job = context.repository_def.get_base_job_for_assets(asset_keys)
base_job = context.repository_def.get_implicit_job_def_for_assets(asset_keys)
result.append(
run_request.with_replaced_attrs(
job_name=base_job.name, asset_selection=list(asset_keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,7 @@ def external_sensor_data_from_def(
base_asset_job_name: ExternalTargetData(
pipeline_name=base_asset_job_name, mode=DEFAULT_MODE_NAME, solid_selection=None
)
for base_asset_job_name in repository_def.get_base_asset_job_names()
for base_asset_job_name in repository_def.get_implicit_asset_job_names()
}
else:
target_dict = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ def test_backfill_from_partitioned_job(instance, workspace_context, external_rep
def test_backfill_with_asset_selection(instance, workspace_context, external_repo):
partition_name_list = [partition.name for partition in static_partitions.get_partitions()]
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
asset_job_name = the_repo.get_base_job_for_assets(asset_selection).name
asset_job_name = the_repo.get_implicit_job_def_for_assets(asset_selection).name
partition_set_name = f"{asset_job_name}_partition_set"
external_partition_set = external_repo.get_external_partition_set(partition_set_name)
instance.add_backfill(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def repo():
run_requests, cursor = self.cursor_from.do_scenario(instance)
for run_request in run_requests:
instance.create_run_for_pipeline(
repo.get_base_job_for_assets(run_request.asset_selection),
repo.get_implicit_job_def_for_assets(run_request.asset_selection),
asset_selection=set(run_request.asset_selection),
tags=run_request.tags,
)
Expand Down Expand Up @@ -125,7 +125,7 @@ def test_time_fn():
)

for run_request in run_requests:
base_job = repo.get_base_job_for_assets(run_request.asset_selection)
base_job = repo.get_implicit_job_def_for_assets(run_request.asset_selection)
assert base_job is not None

return run_requests, cursor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1473,9 +1473,11 @@ def asset3():
def repo():
return [asset1, asset2, asset3]

assert sorted(repo.get_base_asset_job_names()) == ["__ASSET_JOB_0", "__ASSET_JOB_1"]
assert repo.get_base_job_for_assets([asset1.key, asset2.key]).asset_layer.asset_keys == {
assert sorted(repo.get_implicit_asset_job_names()) == ["__ASSET_JOB_0", "__ASSET_JOB_1"]
assert repo.get_implicit_job_def_for_assets(
[asset1.key, asset2.key]
).asset_layer.asset_keys == {
asset1.key,
asset2.key,
}
assert repo.get_base_job_for_assets([asset2.key, asset3.key]) is None
assert repo.get_implicit_job_def_for_assets([asset2.key, asset3.key]) is None

0 comments on commit 78ad6b6

Please sign in to comment.