Skip to content

Commit

Permalink
partitioned assets toy (#6960)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Mar 7, 2022
1 parent 73ec3a2 commit 9e1fa57
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# pylint: disable=redefined-outer-name
from dagster import AssetGroup, DailyPartitionsDefinition, asset

daily_partitions_def = DailyPartitionsDefinition(start_date="2020-01-01")


@asset(partitions_def=daily_partitions_def)
def upstream_daily_partitioned_asset():
pass


@asset(partitions_def=daily_partitions_def)
def downstream_daily_partitioned_asset(upstream_daily_partitioned_asset):
assert upstream_daily_partitioned_asset is None


partitioned_asset_group = AssetGroup.from_current_module()
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from dagster_test.graph_job_op_toys.longitudinal import longitudinal_job
from dagster_test.graph_job_op_toys.many_events import many_events, many_events_subset_job
from dagster_test.graph_job_op_toys.notebooks import hello_world_notebook_pipeline
from dagster_test.graph_job_op_toys.partitioned_assets import partitioned_asset_group
from dagster_test.graph_job_op_toys.retries import retry_job
from dagster_test.graph_job_op_toys.sleepy import sleepy_job
from dagster_test.graph_job_op_toys.software_defined_assets import software_defined_assets
Expand Down Expand Up @@ -86,3 +87,8 @@ def long_asset_keys_repository():
@repository
def big_honkin_assets_repository():
return [big_honkin_asset_group]


@repository
def partitioned_asset_repository():
return [partitioned_asset_group]
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dagster_test.graph_job_op_toys.log_spew import log_spew
from dagster_test.graph_job_op_toys.longitudinal import IntentionalRandomFailure, longitudinal
from dagster_test.graph_job_op_toys.many_events import many_events
from dagster_test.graph_job_op_toys.partitioned_assets import partitioned_asset_group
from dagster_test.graph_job_op_toys.pyspark_assets.pyspark_assets_job import (
dir_resources,
pyspark_assets,
Expand Down Expand Up @@ -299,3 +300,11 @@ def test_retry_job(executor_def):

def test_software_defined_assets_job():
assert software_defined_assets.build_job("all_assets").execute_in_process().success


def test_partitioned_assets():
assert (
partitioned_asset_group.build_job("all_assets")
.execute_in_process(partition_key="2020-02-01")
.success
)

0 comments on commit 9e1fa57

Please sign in to comment.