diff --git a/python_modules/dagster/dagster/core/definitions/repository_definition.py b/python_modules/dagster/dagster/core/definitions/repository_definition.py index c1a6bc471db9..9263c4340f1d 100644 --- a/python_modules/dagster/dagster/core/definitions/repository_definition.py +++ b/python_modules/dagster/dagster/core/definitions/repository_definition.py @@ -504,11 +504,13 @@ def __init__( schedules, self._validate_schedule, ) - schedule_partition_sets = [ - schedule.get_partition_set() - for schedule in self._schedules.get_all_definitions() - if isinstance(schedule, PartitionScheduleDefinition) - ] + schedule_partition_sets = filter( + None, + [ + _get_partition_set_from_schedule(schedule) + for schedule in self._schedules.get_all_definitions() + ], + ) self._source_assets_by_key = source_assets_by_key def load_partition_sets_from_pipelines() -> List[PartitionSetDefinition]: @@ -688,8 +690,8 @@ def from_list( f"Duplicate definition found for {definition.name}" ) schedules[definition.name] = definition - if isinstance(definition, PartitionScheduleDefinition): - partition_set_def = definition.get_partition_set() + partition_set_def = _get_partition_set_from_schedule(definition) + if partition_set_def: if ( partition_set_def.name in partition_sets and partition_set_def != partition_sets[partition_set_def.name] @@ -1311,3 +1313,27 @@ def _process_and_validate_target( def _get_error_msg_for_target_conflict(targeter, target_type, target_name, dupe_target_type): return f"{targeter} targets {target_type} '{target_name}', but a different {dupe_target_type} with the same name was provided. Disambiguate between these by providing a separate name to one of them." + + +def _get_partition_set_from_schedule( + schedule: ScheduleDefinition, +) -> Optional[PartitionSetDefinition]: + """With the legacy APIs, partition sets can live on schedules. With the non-legacy APIs, + they live on jobs. Pulling partition sets from schedules causes problems with unresolved asset + jobs, because two different instances of the same logical partition set end up getting created + - one on the schedule and one on the the resolved job. + + To avoid this problem, we avoid pulling partition sets off of schedules that target unresolved + asset jobs. This works, because the partition set still gets pulled directly off the asset job + elsewhere. + + When we remove the legacy APIs, we should be able to stop pulling partition sets off of + schedules entirely and remove this entire code path. + """ + if ( + isinstance(schedule, PartitionScheduleDefinition) + and not schedule.targets_unresolved_asset_job + ): + return schedule.get_partition_set() + else: + return None diff --git a/python_modules/dagster/dagster/core/definitions/schedule_definition.py b/python_modules/dagster/dagster/core/definitions/schedule_definition.py index 68ad5fb21ee8..01275760e68b 100644 --- a/python_modules/dagster/dagster/core/definitions/schedule_definition.py +++ b/python_modules/dagster/dagster/core/definitions/schedule_definition.py @@ -521,6 +521,12 @@ def evaluate_tick(self, context: "ScheduleEvaluationContext") -> ScheduleExecuti def has_loadable_target(self): return isinstance(self._target, DirectTarget) + @property + def targets_unresolved_asset_job(self) -> bool: + return self.has_loadable_target() and isinstance( + self.load_target(), UnresolvedAssetJobDefinition + ) + def load_target( self, ) -> Union[GraphDefinition, PipelineDefinition, UnresolvedAssetJobDefinition]: diff --git a/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_repository_definition.py b/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_repository_definition.py index d306758f7f7b..82da5af60a48 100644 --- a/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_repository_definition.py +++ b/python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_repository_definition.py @@ -10,6 +10,7 @@ AssetsDefinition, DagsterInvalidDefinitionError, DagsterInvariantViolationError, + DailyPartitionsDefinition, IOManager, JobDefinition, PipelineDefinition, @@ -1434,3 +1435,22 @@ def the_repo(): ] assert the_repo.get_job("the_job").executor_def == in_process_executor + + +def test_scheduled_partitioned_asset_job(): + partitions_def = DailyPartitionsDefinition(start_date="2022-06-06") + + @asset(partitions_def=partitions_def) + def asset1(): + ... + + @repository + def repo(): + return [ + asset1, + build_schedule_from_partitioned_job( + define_asset_job("fdsjk", partitions_def=partitions_def) + ), + ] + + repo.load_all_definitions()