Skip to content

Commit

Permalink
[assets] Update modern-data-stack assets (#8271)
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jun 8, 2022
1 parent ec409c2 commit 6552e43
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .assets import analytics_assets
from .assets import mds_repo
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from scipy import optimize

from dagster import AssetGroup, asset
from dagster import asset, load_assets_from_current_module, repository
from dagster.core.execution.with_resources import with_resources

from .constants import * # pylint: disable=wildcard-import,unused-wildcard-import
from .pandas_io_manager import pandas_io_manager

airbyte_assets = build_airbyte_assets(
connection_id=AIRBYTE_CONNECTION_ID,
destination_tables=["orders", "users"],
asset_key_prefix=["public"],
connection_id=AIRBYTE_CONNECTION_ID, destination_tables=["orders", "users"]
)

dbt_assets = load_assets_from_dbt_project(
Expand Down Expand Up @@ -45,11 +44,17 @@ def predicted_orders(
return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data})


analytics_assets = AssetGroup(
[*airbyte_assets, *dbt_assets, order_forecast_model, predicted_orders],
resource_defs={
"airbyte": airbyte_resource.configured(AIRBYTE_CONFIG),
"dbt": dbt_cli_resource.configured(DBT_CONFIG),
"pandas_io_manager": pandas_io_manager.configured(PANDAS_IO_CONFIG),
},
).build_job("Assets")
# all of the resources needed for interacting with our tools
resource_defs = {
"airbyte": airbyte_resource.configured(AIRBYTE_CONFIG),
"dbt": dbt_cli_resource.configured(DBT_CONFIG),
"pandas_io_manager": pandas_io_manager.configured(PANDAS_IO_CONFIG),
}


@repository
def mds_repo():
return with_resources(
load_assets_from_current_module(),
resource_defs=resource_defs,
)
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,8 @@ def group_names_by_assets(self) -> Mapping[AssetKey, str]:

def build_asset_selection_job(
name: str,
assets: Sequence["AssetsDefinition"],
source_assets: Sequence["SourceAsset"],
assets: Iterable["AssetsDefinition"],
source_assets: Iterable["SourceAsset"],
executor_def: Optional[ExecutorDefinition] = None,
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
description: Optional[str] = None,
Expand Down Expand Up @@ -669,7 +669,7 @@ def _subset_assets_defs(

included_keys: Set[AssetKey] = set()

for asset in assets:
for asset in set(assets):
# intersection
selected_subset = selected_asset_keys & asset.asset_keys
included_keys.update(selected_subset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ def _get_job_def_for_asset_selection(

new_job = build_asset_selection_job(
name=self.name,
assets=list(self.asset_layer.assets_defs_by_key.values()),
source_assets=list(self.asset_layer.source_assets_by_key.values()),
assets=set(self.asset_layer.assets_defs_by_key.values()),
source_assets=self.asset_layer.source_assets_by_key.values(),
executor_def=self.executor_def,
resource_defs=self.resource_defs,
description=self.description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,15 @@ def final(a, d):
),
(["d", "e", "f"], False, None),
(["d", "e", "f"], True, None),
(["*final"], False, None),
(["start+"], False, None),
(
["*final"],
["start+"],
True,
(
DagsterInvalidSubsetError,
r"When building job, the AssetsDefinition 'abc_' contains asset keys "
r"\[AssetKey\(\['a'\]\), AssetKey\(\['b'\]\), AssetKey\(\['c'\]\)\], but attempted to "
r"select only \[AssetKey\(\['a'\]\), AssetKey\(\['b'\]\)\]",
r"select only \[AssetKey\(\['a'\]\)\]",
),
),
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,14 @@ def multi_asset_with_internal_deps(thing): # pylint: disable=unused-argument
}


@multi_asset(outs={"a": Out(is_required=False), "b": Out(is_required=False)}, can_subset=True)
def ab(context, foo):
if "a" in context.selected_output_names:
yield Output(foo + 1, "a")
if "b" in context.selected_output_names:
yield Output(foo + 2, "b")


@asset
def foo():
return 5
Expand All @@ -1134,7 +1142,7 @@ def unconnected():
pass


asset_group = AssetGroup([foo, bar, foo_bar, baz, unconnected])
asset_group = AssetGroup([foo, ab, bar, foo_bar, baz, unconnected])


def test_disconnected_subset():
Expand Down Expand Up @@ -1246,6 +1254,41 @@ def complicated_graph():
job.execute_in_process(instance=instance, asset_selection=[AssetKey("comments_table")])


def test_multi_subset():
with instance_for_test() as instance:
job = asset_group.build_job("foo")
result = job.execute_in_process(
instance=instance,
asset_selection=[AssetKey("foo"), AssetKey("a")],
)
materialization_events = sorted(
[event for event in result.all_events if event.is_step_materialization],
key=lambda event: event.asset_key,
)

assert len(materialization_events) == 2
assert materialization_events[0].asset_key == AssetKey("a")
assert materialization_events[1].asset_key == AssetKey("foo")


def test_multi_all():
with instance_for_test() as instance:
job = asset_group.build_job("foo")
result = job.execute_in_process(
instance=instance,
asset_selection=[AssetKey("foo"), AssetKey("a"), AssetKey("b")],
)
materialization_events = sorted(
[event for event in result.all_events if event.is_step_materialization],
key=lambda event: event.asset_key,
)

assert len(materialization_events) == 3
assert materialization_events[0].asset_key == AssetKey("a")
assert materialization_events[1].asset_key == AssetKey("b")
assert materialization_events[2].asset_key == AssetKey("foo")


def test_subset_with_source_asset():
class MyIOManager(IOManager):
def handle_output(self, context, obj):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ def final(a, d):
),
(["d", "e", "f"], False, None),
(["d", "e", "f"], True, None),
(["*final"], False, None),
(["start+"], False, None),
(
["*final"],
["start+"],
True,
(
DagsterInvalidSubsetError,
r"When building job, the AssetsDefinition 'abc_' contains asset keys "
r"\[AssetKey\(\['a'\]\), AssetKey\(\['b'\]\), AssetKey\(\['c'\]\)\], but attempted to "
r"select only \[AssetKey\(\['a'\]\), AssetKey\(\['b'\]\)\]",
r"select only \[AssetKey\(\['a'\]\)\]",
),
),
],
Expand Down

0 comments on commit 6552e43

Please sign in to comment.