diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py
index 146a143b0e..89376c786f 100644
--- a/sqlmesh/core/plan/builder.py
+++ b/sqlmesh/core/plan/builder.py
@@ -187,25 +187,37 @@ def build(self) -> Plan:
 
         self._apply_effective_from()
 
-        dag, ignored = self._build_filtered_dag()
+        dag = self._build_dag()
         directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag)
-        models_to_backfill = self._build_models_to_backfill(dag)
 
         self._categorize_snapshots(dag, directly_modified, indirectly_modified)
         self._adjust_new_snapshot_intervals()
 
+        deployability_index = (
+            DeployabilityIndex.create(self._context_diff.snapshots.values())
+            if self._is_dev
+            else DeployabilityIndex.all_deployable()
+        )
+
+        filtered_dag, ignored = self._build_filtered_dag(dag, deployability_index)
+
+        # Exclude ignored snapshots from the modified sets.
+        directly_modified = {s_id for s_id in directly_modified if s_id not in ignored}
+        for s_id in list(indirectly_modified):
+            if s_id in ignored:
+                indirectly_modified.pop(s_id, None)
+            else:
+                indirectly_modified[s_id] = {
+                    other_id for other_id in indirectly_modified[s_id] if other_id not in ignored
+                }
+
         filtered_snapshots = {
             s.snapshot_id: s
             for s in self._context_diff.snapshots.values()
             if s.snapshot_id not in ignored
         }
 
-        deployability_index = (
-            DeployabilityIndex.create(filtered_snapshots)
-            if self._is_dev
-            else DeployabilityIndex.all_deployable()
-        )
-
+        models_to_backfill = self._build_models_to_backfill(filtered_dag)
         restatements = self._build_restatements(
             dag, earliest_interval_start(filtered_snapshots.values())
         )
@@ -236,25 +248,49 @@ def build(self) -> Plan:
         self._latest_plan = plan
         return plan
 
-    def _build_filtered_dag(self) -> t.Tuple[DAG[SnapshotId], t.Set[SnapshotId]]:
+    def _build_dag(self) -> DAG[SnapshotId]:
+        """Builds the full DAG of all snapshots in the context diff, keyed by snapshot ID."""
+        dag: DAG[SnapshotId] = DAG()
+        for s_id, context_snapshot in self._context_diff.snapshots.items():
+            dag.add(s_id, context_snapshot.parents)
+        return dag
+
+    def _build_filtered_dag(
+        self, full_dag: DAG[SnapshotId], deployability_index: DeployabilityIndex
+    ) -> t.Tuple[DAG[SnapshotId], t.Set[SnapshotId]]:
+        """Filters the full DAG down to the snapshots that this plan should not ignore.
+
+        Snapshots missing from the context diff are skipped entirely. A non-deployable
+        snapshot is always kept. A deployable snapshot is kept only if it passes the
+        `is_valid_start` check for the plan's start and none of its parents has been
+        ignored so far.
+
+        Args:
+            full_dag: The DAG to iterate over when deciding which snapshots to keep.
+            deployability_index: Determines whether a given snapshot ID is deployable.
+
+        Returns:
+            A tuple of the filtered DAG and the set of ignored snapshot IDs.
+        """
         ignored_snapshot_ids: t.Set[SnapshotId] = set()
-        full_dag: DAG[SnapshotId] = DAG()
         filtered_dag: DAG[SnapshotId] = DAG()
         cache: t.Optional[t.Dict[str, datetime]] = {}
-        for s_id, context_snapshot in self._context_diff.snapshots.items():
-            full_dag.add(s_id, context_snapshot.parents)
         for s_id in full_dag:
             snapshot = self._context_diff.snapshots.get(s_id)
             # If the snapshot doesn't exist then it must be an external model
             if not snapshot:
                 continue
-            if snapshot.is_valid_start(
+
+            is_deployable = deployability_index.is_deployable(s_id)
+            is_valid_start = snapshot.is_valid_start(
                 self._start, start_date(snapshot, self._context_diff.snapshots.values(), cache)
-            ) and set(snapshot.parents).isdisjoint(ignored_snapshot_ids):
-                filtered_dag.add(snapshot.snapshot_id, snapshot.parents)
+            )
+            if not is_deployable or (
+                is_valid_start and set(snapshot.parents).isdisjoint(ignored_snapshot_ids)
+            ):
+                filtered_dag.add(s_id, snapshot.parents)
             else:
-                ignored_snapshot_ids.add(snapshot.snapshot_id)
-        return (filtered_dag, ignored_snapshot_ids)
+                ignored_snapshot_ids.add(s_id)
+        return filtered_dag, ignored_snapshot_ids
 
     def _build_restatements(
         self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py
index fc5c293c97..e92f9595d4 100644
--- a/tests/core/test_plan.py
+++ b/tests/core/test_plan.py
@@ -1415,6 +1415,13 @@ def test_dev_plan_depends_past(make_snapshot, mocker: MockerFixture):
         '"a_child"',
         '"b"',
     ]
+    assert dev_plan_start_aligned.directly_modified == {
+        snapshot.snapshot_id,
+        snapshot_child.snapshot_id,
+        unrelated_snapshot.snapshot_id,
+    }
+    assert dev_plan_start_aligned.indirectly_modified == {}
+
     dev_plan_start_ahead_of_model = PlanBuilder(
         context_diff, start="2023-01-02", end="2023-01-10", is_dev=True
     ).build()
@@ -1425,6 +1432,104 @@ def test_dev_plan_depends_past(make_snapshot, mocker: MockerFixture):
         snapshot.snapshot_id,
         snapshot_child.snapshot_id,
     ]
+    assert dev_plan_start_ahead_of_model.directly_modified == {unrelated_snapshot.snapshot_id}
+    assert dev_plan_start_ahead_of_model.indirectly_modified == {}
+
+
+def test_dev_plan_depends_past_non_deployable(make_snapshot, mocker: MockerFixture):
+    """Checks that a dev plan ignores nothing when all changes are non-deployable."""
+    snapshot = make_snapshot(
+        SqlModel(
+            name="a",
+            # self reference query so it depends_on_past
+            query=parse_one("select 1, ds FROM a"),
+            start="2023-01-01",
+            kind=IncrementalByTimeRangeKind(time_column="ds"),
+        ),
+    )
+    snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
+
+    updated_snapshot = make_snapshot(
+        SqlModel(
+            **{
+                **snapshot.model.dict(),
+                "query": parse_one("select 1, ds, 2 FROM a"),
+            }
+        ),
+    )
+    updated_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
+
+    snapshot_child = make_snapshot(
+        SqlModel(
+            name="a_child",
+            query=parse_one("select 1, ds FROM a"),
+            start="2023-01-01",
+            kind=IncrementalByTimeRangeKind(time_column="ds"),
+        ),
+        nodes={'"a"': updated_snapshot.model},
+    )
+    snapshot_child.categorize_as(SnapshotChangeCategory.BREAKING)
+    unrelated_snapshot = make_snapshot(
+        SqlModel(
+            name="b",
+            query=parse_one("select 1, ds"),
+            start="2023-01-01",
+            kind=IncrementalByTimeRangeKind(time_column="ds"),
+        ),
+    )
+    unrelated_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
+
+    assert updated_snapshot.depends_on_past
+    assert not snapshot_child.depends_on_past
+    assert not unrelated_snapshot.depends_on_past
+    assert snapshot_child.model.depends_on == {'"a"'}
+    assert snapshot_child.parents == (updated_snapshot.snapshot_id,)
+    assert unrelated_snapshot.model.depends_on == set()
+
+    context_diff = ContextDiff(
+        environment="test_environment",
+        is_new_environment=True,
+        is_unfinalized_environment=False,
+        create_from="prod",
+        added={snapshot_child.snapshot_id, unrelated_snapshot.snapshot_id},
+        removed_snapshots={},
+        modified_snapshots={snapshot.name: (updated_snapshot, snapshot)},
+        snapshots={
+            updated_snapshot.snapshot_id: updated_snapshot,
+            snapshot_child.snapshot_id: snapshot_child,
+            unrelated_snapshot.snapshot_id: unrelated_snapshot,
+        },
+        new_snapshots={
+            updated_snapshot.snapshot_id: snapshot,
+            snapshot_child.snapshot_id: snapshot_child,
+            unrelated_snapshot.snapshot_id: unrelated_snapshot,
+        },
+        previous_plan_id=None,
+        previously_promoted_snapshot_ids=set(),
+        previous_finalized_snapshots=None,
+    )
+
+    dev_plan_start_aligned = PlanBuilder(
+        context_diff, start="2023-01-01", end="2023-01-10", is_dev=True
+    ).build()
+    assert len(dev_plan_start_aligned.new_snapshots) == 3
+    assert sorted([x.name for x in dev_plan_start_aligned.new_snapshots]) == [
+        '"a"',
+        '"a_child"',
+        '"b"',
+    ]
+
+    # There should be no ignored snapshots because all changes are non-deployable.
+    dev_plan_start_ahead_of_model = PlanBuilder(
+        context_diff, start="2023-01-02", end="2023-01-10", is_dev=True
+    ).build()
+    assert len(dev_plan_start_ahead_of_model.new_snapshots) == 3
+    assert sorted([x.name for x in dev_plan_start_ahead_of_model.new_snapshots]) == [
+        '"a"',
+        '"a_child"',
+        '"b"',
+    ]
+    assert not dev_plan_start_ahead_of_model.ignored
 
 
 def test_restatement_intervals_after_updating_start(sushi_context: Context):