-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
snapshot.py
66 lines (59 loc) · 2.84 KB
/
snapshot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from typing import Union, List, Optional, Literal
from dataclasses import dataclass
from dbt_common.dataclass_schema import ValidationError
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.resources.v1.components import CompiledResource, DeferRelation
from dbt.artifacts.resources.v1.config import NodeConfig
@dataclass
class SnapshotConfig(NodeConfig):
materialized: str = "snapshot"
strategy: Optional[str] = None
unique_key: Optional[str] = None
target_schema: Optional[str] = None
target_database: Optional[str] = None
updated_at: Optional[str] = None
# Not using Optional because of serialization issues with a Union of str and List[str]
check_cols: Union[str, List[str], None] = None
@classmethod
def validate(cls, data):
super().validate(data)
# Note: currently you can't just set these keys in schema.yml because this validation
# will fail when parsing the snapshot node.
if not data.get("strategy") or not data.get("unique_key") or not data.get("target_schema"):
raise ValidationError(
"Snapshots must be configured with a 'strategy', 'unique_key', "
"and 'target_schema'."
)
if data.get("strategy") == "check":
if not data.get("check_cols"):
raise ValidationError(
"A snapshot configured with the check strategy must "
"specify a check_cols configuration."
)
if isinstance(data["check_cols"], str) and data["check_cols"] != "all":
raise ValidationError(
f"Invalid value for 'check_cols': {data['check_cols']}. "
"Expected 'all' or a list of strings."
)
elif data.get("strategy") == "timestamp":
if not data.get("updated_at"):
raise ValidationError(
"A snapshot configured with the timestamp strategy "
"must specify an updated_at configuration."
)
if data.get("check_cols"):
raise ValidationError("A 'timestamp' snapshot should not have 'check_cols'")
# If the strategy is not 'check' or 'timestamp' it's a custom strategy,
# formerly supported with GenericSnapshotConfig
if data.get("materialized") and data.get("materialized") != "snapshot":
raise ValidationError("A snapshot must have a materialized value of 'snapshot'")
# Called by "calculate_node_config_dict" in ContextConfigGenerator
def finalize_and_validate(self):
data = self.to_dict(omit_none=True)
self.validate(data)
return self.from_dict(data)
@dataclass
class Snapshot(CompiledResource):
resource_type: Literal[NodeType.Snapshot]
config: SnapshotConfig
defer_relation: Optional[DeferRelation] = None