Skip to content

Commit

Permalink
fix issue with repos and partitioned scheduled asset jobs (#8779)
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza authored and prha committed Jul 7, 2022
1 parent 2fe13df commit a42146f
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
Expand Up @@ -504,11 +504,13 @@ def __init__(
schedules,
self._validate_schedule,
)
schedule_partition_sets = [
schedule.get_partition_set()
for schedule in self._schedules.get_all_definitions()
if isinstance(schedule, PartitionScheduleDefinition)
]
schedule_partition_sets = filter(
None,
[
_get_partition_set_from_schedule(schedule)
for schedule in self._schedules.get_all_definitions()
],
)
self._source_assets_by_key = source_assets_by_key

def load_partition_sets_from_pipelines() -> List[PartitionSetDefinition]:
Expand Down Expand Up @@ -688,8 +690,8 @@ def from_list(
f"Duplicate definition found for {definition.name}"
)
schedules[definition.name] = definition
if isinstance(definition, PartitionScheduleDefinition):
partition_set_def = definition.get_partition_set()
partition_set_def = _get_partition_set_from_schedule(definition)
if partition_set_def:
if (
partition_set_def.name in partition_sets
and partition_set_def != partition_sets[partition_set_def.name]
Expand Down Expand Up @@ -1311,3 +1313,27 @@ def _process_and_validate_target(

def _get_error_msg_for_target_conflict(targeter, target_type, target_name, dupe_target_type):
return f"{targeter} targets {target_type} '{target_name}', but a different {dupe_target_type} with the same name was provided. Disambiguate between these by providing a separate name to one of them."


def _get_partition_set_from_schedule(
schedule: ScheduleDefinition,
) -> Optional[PartitionSetDefinition]:
"""With the legacy APIs, partition sets can live on schedules. With the non-legacy APIs,
they live on jobs. Pulling partition sets from schedules causes problems with unresolved asset
jobs, because two different instances of the same logical partition set end up getting created
- one on the schedule and one on the the resolved job.
To avoid this problem, we avoid pulling partition sets off of schedules that target unresolved
asset jobs. This works, because the partition set still gets pulled directly off the asset job
elsewhere.
When we remove the legacy APIs, we should be able to stop pulling partition sets off of
schedules entirely and remove this entire code path.
"""
if (
isinstance(schedule, PartitionScheduleDefinition)
and not schedule.targets_unresolved_asset_job
):
return schedule.get_partition_set()
else:
return None
Expand Up @@ -521,6 +521,12 @@ def evaluate_tick(self, context: "ScheduleEvaluationContext") -> ScheduleExecuti
def has_loadable_target(self):
return isinstance(self._target, DirectTarget)

@property
def targets_unresolved_asset_job(self) -> bool:
return self.has_loadable_target() and isinstance(
self.load_target(), UnresolvedAssetJobDefinition
)

def load_target(
self,
) -> Union[GraphDefinition, PipelineDefinition, UnresolvedAssetJobDefinition]:
Expand Down
Expand Up @@ -10,6 +10,7 @@
AssetsDefinition,
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
DailyPartitionsDefinition,
IOManager,
JobDefinition,
PipelineDefinition,
Expand Down Expand Up @@ -1434,3 +1435,22 @@ def the_repo():
]

assert the_repo.get_job("the_job").executor_def == in_process_executor


def test_scheduled_partitioned_asset_job():
partitions_def = DailyPartitionsDefinition(start_date="2022-06-06")

@asset(partitions_def=partitions_def)
def asset1():
...

@repository
def repo():
return [
asset1,
build_schedule_from_partitioned_job(
define_asset_job("fdsjk", partitions_def=partitions_def)
),
]

repo.load_all_definitions()

0 comments on commit a42146f

Please sign in to comment.