Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,25 +187,37 @@ def build(self) -> Plan:

self._apply_effective_from()

dag, ignored = self._build_filtered_dag()
dag = self._build_dag()
directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag)
models_to_backfill = self._build_models_to_backfill(dag)

self._categorize_snapshots(dag, directly_modified, indirectly_modified)
self._adjust_new_snapshot_intervals()

deployability_index = (
DeployabilityIndex.create(self._context_diff.snapshots.values())
if self._is_dev
else DeployabilityIndex.all_deployable()
)

filtered_dag, ignored = self._build_filtered_dag(dag, deployability_index)

# Exclude ignored snapshots from the modified sets.
directly_modified = {s_id for s_id in directly_modified if s_id not in ignored}
for s_id in list(indirectly_modified):
if s_id in ignored:
indirectly_modified.pop(s_id, None)
else:
indirectly_modified[s_id] = {
s_id for s_id in indirectly_modified[s_id] if s_id not in ignored
}

filtered_snapshots = {
s.snapshot_id: s
for s in self._context_diff.snapshots.values()
if s.snapshot_id not in ignored
}

deployability_index = (
DeployabilityIndex.create(filtered_snapshots)
if self._is_dev
else DeployabilityIndex.all_deployable()
)

models_to_backfill = self._build_models_to_backfill(filtered_dag)
restatements = self._build_restatements(
dag, earliest_interval_start(filtered_snapshots.values())
)
Expand Down Expand Up @@ -236,25 +248,35 @@ def build(self) -> Plan:
self._latest_plan = plan
return plan

def _build_filtered_dag(self) -> t.Tuple[DAG[SnapshotId], t.Set[SnapshotId]]:
def _build_dag(self) -> DAG[SnapshotId]:
    """Assemble a DAG containing every snapshot in the context diff.

    Each snapshot ID from ``self._context_diff.snapshots`` is added as a node
    together with that snapshot's ``parents``.

    Returns:
        The unfiltered DAG of snapshot IDs.
    """
    graph: DAG[SnapshotId] = DAG()
    for snapshot_id, snapshot in self._context_diff.snapshots.items():
        parent_ids = snapshot.parents
        graph.add(snapshot_id, parent_ids)
    return graph

def _build_filtered_dag(
    self, full_dag: DAG[SnapshotId], deployability_index: DeployabilityIndex
) -> t.Tuple[DAG[SnapshotId], t.Set[SnapshotId]]:
    """Split the full snapshot DAG into a kept DAG and a set of ignored snapshot IDs.

    A snapshot is kept when it is not deployable according to
    ``deployability_index``, or when its start is valid for the plan's start
    and none of its parents has been ignored. Every other snapshot present in
    the context diff is recorded as ignored.

    Args:
        full_dag: The DAG of all snapshot IDs, supplied by the caller. It is
            only iterated here and must not be rebuilt or rebound.
        deployability_index: Consulted via ``is_deployable`` for each snapshot ID.

    Returns:
        A ``(filtered_dag, ignored_snapshot_ids)`` tuple.
    """
    ignored_snapshot_ids: t.Set[SnapshotId] = set()
    filtered_dag: DAG[SnapshotId] = DAG()
    # Shared across all start_date() calls made in the loop below.
    cache: t.Optional[t.Dict[str, datetime]] = {}

    # NOTE(review): the parent check below only works if parents are visited
    # before their children - presumably DAG iteration is topological; confirm.
    for s_id in full_dag:
        snapshot = self._context_diff.snapshots.get(s_id)
        # If the snapshot doesn't exist then it must be an external model
        if not snapshot:
            continue

        is_deployable = deployability_index.is_deployable(s_id)
        is_valid_start = snapshot.is_valid_start(
            self._start, start_date(snapshot, self._context_diff.snapshots.values(), cache)
        )
        # Non-deployable snapshots are always kept, regardless of their start
        # or of whether any parent was ignored.
        if not is_deployable or (
            is_valid_start and set(snapshot.parents).isdisjoint(ignored_snapshot_ids)
        ):
            filtered_dag.add(s_id, snapshot.parents)
        else:
            ignored_snapshot_ids.add(s_id)
    return filtered_dag, ignored_snapshot_ids

def _build_restatements(
self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
Expand Down
104 changes: 104 additions & 0 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,13 @@ def test_dev_plan_depends_past(make_snapshot, mocker: MockerFixture):
'"a_child"',
'"b"',
]
assert dev_plan_start_aligned.directly_modified == {
snapshot.snapshot_id,
snapshot_child.snapshot_id,
unrelated_snapshot.snapshot_id,
}
assert dev_plan_start_aligned.indirectly_modified == {}

dev_plan_start_ahead_of_model = PlanBuilder(
context_diff, start="2023-01-02", end="2023-01-10", is_dev=True
).build()
Expand All @@ -1425,6 +1432,103 @@ def test_dev_plan_depends_past(make_snapshot, mocker: MockerFixture):
snapshot.snapshot_id,
snapshot_child.snapshot_id,
]
assert dev_plan_start_ahead_of_model.directly_modified == {unrelated_snapshot.snapshot_id}
assert dev_plan_start_ahead_of_model.indirectly_modified == {}


def test_dev_plan_depends_past_non_deployable(make_snapshot, mocker: MockerFixture):
    """Check that a dev plan ignores nothing when the changes are non-deployable.

    Model "a" references itself (so it depends on past) and its updated version
    is categorized as forward-only. Plans are built both with a start aligned
    to the model start and with a start one day later; in both cases all three
    snapshots are expected in ``new_snapshots``, and the later-start plan is
    expected to have no ignored snapshots.
    """
    snapshot = make_snapshot(
        SqlModel(
            name="a",
            # self reference query so it depends_on_past
            query=parse_one("select 1, ds FROM a"),
            start="2023-01-01",
            kind=IncrementalByTimeRangeKind(time_column="ds"),
        ),
    )
    snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    # Same model as `snapshot` with a modified query, categorized as forward-only.
    updated_snapshot = make_snapshot(
        SqlModel(
            **{
                **snapshot.model.dict(),
                "query": parse_one("select 1, ds, 2 FROM a"),
            }
        ),
    )
    updated_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)

    snapshot_child = make_snapshot(
        SqlModel(
            name="a_child",
            query=parse_one("select 1, ds FROM a"),
            start="2023-01-01",
            kind=IncrementalByTimeRangeKind(time_column="ds"),
        ),
        nodes={'"a"': updated_snapshot.model},
    )
    snapshot_child.categorize_as(SnapshotChangeCategory.BREAKING)
    unrelated_snapshot = make_snapshot(
        SqlModel(
            name="b",
            query=parse_one("select 1, ds"),
            start="2023-01-01",
            kind=IncrementalByTimeRangeKind(time_column="ds"),
        ),
    )
    unrelated_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

    # Sanity-check the fixture graph before building any plan.
    assert updated_snapshot.depends_on_past
    assert not snapshot_child.depends_on_past
    assert not unrelated_snapshot.depends_on_past
    assert snapshot_child.model.depends_on == {'"a"'}
    assert snapshot_child.parents == (updated_snapshot.snapshot_id,)
    assert unrelated_snapshot.model.depends_on == set()

    context_diff = ContextDiff(
        environment="test_environment",
        is_new_environment=True,
        is_unfinalized_environment=False,
        create_from="prod",
        added={snapshot_child.snapshot_id, unrelated_snapshot.snapshot_id},
        removed_snapshots={},
        modified_snapshots={snapshot.name: (updated_snapshot, snapshot)},
        snapshots={
            updated_snapshot.snapshot_id: updated_snapshot,
            snapshot_child.snapshot_id: snapshot_child,
            unrelated_snapshot.snapshot_id: unrelated_snapshot,
        },
        new_snapshots={
            # NOTE(review): the original `snapshot` is registered under
            # `updated_snapshot`'s ID here - confirm this is intentional.
            updated_snapshot.snapshot_id: snapshot,
            snapshot_child.snapshot_id: snapshot_child,
            unrelated_snapshot.snapshot_id: unrelated_snapshot,
        },
        previous_plan_id=None,
        previously_promoted_snapshot_ids=set(),
        previous_finalized_snapshots=None,
    )

    dev_plan_start_aligned = PlanBuilder(
        context_diff, start="2023-01-01", end="2023-01-10", is_dev=True
    ).build()
    assert len(dev_plan_start_aligned.new_snapshots) == 3
    assert sorted([x.name for x in dev_plan_start_aligned.new_snapshots]) == [
        '"a"',
        '"a_child"',
        '"b"',
    ]

    # There should be no ignored snapshots because all changes are non-deployable.
    dev_plan_start_ahead_of_model = PlanBuilder(
        context_diff, start="2023-01-02", end="2023-01-10", is_dev=True
    ).build()
    assert len(dev_plan_start_ahead_of_model.new_snapshots) == 3
    # Inspect the plan built with the later start, not the aligned one
    # already verified above.
    assert sorted([x.name for x in dev_plan_start_ahead_of_model.new_snapshots]) == [
        '"a"',
        '"a_child"',
        '"b"',
    ]
    assert not dev_plan_start_ahead_of_model.ignored


def test_restatement_intervals_after_updating_start(sushi_context: Context):
Expand Down