Skip to content

Commit

Permalink
Asset Subselection (#7835)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed May 18, 2022
1 parent fb025dc commit 422fa71
Show file tree
Hide file tree
Showing 61 changed files with 10,123 additions and 7,494 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ export const LaunchAssetExecutionButton: React.FC<{
],
},
runConfigData: {},
stepKeys: assets.map((o) => o.opNames).flat(),
selector: {
repositoryLocationName: repoAddress.location,
repositoryName: repoAddress.name,
pipelineName: jobName,
assetSelection: assets.map((asset) => ({
path: asset.assetKey.path,
})),
},
},
})}
Expand Down
2 changes: 2 additions & 0 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions js_modules/dagit/packages/core/src/runs/RunFragments.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ export const RunFragments = {
parentRunId
pipelineName
solidSelection
assetSelection {
... on AssetKey {
path
}
}
pipelineSnapshotId
executionPlan {
artifactsPersisted
Expand Down
5 changes: 5 additions & 0 deletions js_modules/dagit/packages/core/src/runs/RunTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ export const RUN_TABLE_RUN_FRAGMENT = gql`
repositoryLocationName
}
solidSelection
assetSelection {
... on AssetKey {
path
}
}
status
tags {
key
Expand Down
6 changes: 5 additions & 1 deletion js_modules/dagit/packages/core/src/runs/RunUtils.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ export function getReexecutionVariables(input: {
repositoryName,
pipelineName: run.pipelineName,
solidSelection: run.solidSelection,
assetSelection: run.assetSelection
? run.assetSelection.map((asset_key) => ({
path: asset_key.path,
}))
: null,
},
};

Expand All @@ -180,7 +185,6 @@ export function getReexecutionVariables(input: {
value: style.selection.query,
});
}

return {executionParams};
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions js_modules/dagit/packages/core/src/runs/types/RunFragment.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions js_modules/dagit/packages/core/src/runs/types/RunRootQuery.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions js_modules/dagit/packages/core/src/types/globalTypes.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ def do_launch(graphene_info, execution_params, is_reexecuted=False):
)
check.str_param(execution_metadata.root_run_id, "root_run_id")
check.str_param(execution_metadata.parent_run_id, "parent_run_id")

external_pipeline = get_external_pipeline_or_raise(graphene_info, execution_params.selector)

pipeline_run = create_valid_pipeline_run(graphene_info, external_pipeline, execution_params)

return graphene_info.context.instance.submit_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def create_valid_pipeline_run(graphene_info, external_pipeline, execution_params
run_id=execution_params.execution_metadata.run_id
if execution_params.execution_metadata.run_id
else make_new_run_id(),
asset_selection=frozenset(execution_params.selector.asset_selection)
if execution_params.selector.asset_selection
else None,
solid_selection=execution_params.selector.solid_selection,
solids_to_execute=frozenset(execution_params.selector.solid_selection)
if execution_params.selector.solid_selection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def get_external_pipeline_or_raise(graphene_info, selector):

full_pipeline = get_full_external_pipeline_or_raise(graphene_info, selector)

if selector.solid_selection is None:
if selector.solid_selection is None and selector.asset_selection is None:
return full_pipeline

return get_subset_external_pipeline(graphene_info.context, selector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from graphql.execution.base import ResolveInfo

import dagster._check as check
from dagster.core.definitions.events import AssetKey
from dagster.core.host_representation import GraphSelector, PipelineSelector
from dagster.core.workspace.context import BaseWorkspaceRequestContext
from dagster.utils.error import serializable_error_info_from_exc_info
Expand Down Expand Up @@ -65,11 +66,13 @@ def __init__(self, error):


def pipeline_selector_from_graphql(data):
asset_selection = data.get("assetSelection", [])
return PipelineSelector(
location_name=data["repositoryLocationName"],
repository_name=data["repositoryName"],
pipeline_name=data.get("pipelineName") or data.get("jobName"),
solid_selection=data.get("solidSelection"),
asset_selection=[AssetKey.from_graphql_input(asset_key) for asset_key in asset_selection],
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class GrapheneJobOrPipelineSelector(graphene.InputObjectType):
repositoryName = graphene.NonNull(graphene.String)
repositoryLocationName = graphene.NonNull(graphene.String)
solidSelection = graphene.List(graphene.NonNull(graphene.String))
assetSelection = graphene.List(graphene.NonNull(GrapheneAssetKeyInput))

class Meta:
description = """This type represents the fields necessary to identify a job or pipeline"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class GrapheneRun(graphene.ObjectType):
pipelineName = graphene.NonNull(graphene.String)
jobName = graphene.NonNull(graphene.String)
solidSelection = graphene.List(graphene.NonNull(graphene.String))
assetSelection = graphene.List(graphene.NonNull(GrapheneAssetKey))
resolvedOpSelection = graphene.List(graphene.NonNull(graphene.String))
stats = graphene.NonNull(GrapheneRunStatsSnapshotOrError)
stepStats = non_null_list(GrapheneRunStepStats)
Expand Down Expand Up @@ -287,6 +288,9 @@ def resolve_jobName(self, _graphene_info):
def resolve_solidSelection(self, _graphene_info):
return self._pipeline_run.solid_selection

def resolve_assetSelection(self, _graphene_info):
return self._pipeline_run.asset_selection

def resolve_resolvedOpSelection(self, _graphene_info):
return self._pipeline_run.solids_to_execute

Expand Down

0 comments on commit 422fa71

Please sign in to comment.