-
Notifications
You must be signed in to change notification settings - Fork 12
/
metrics.py
155 lines (134 loc) · 7.29 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import traceback
from typing import Generic, List, Sequence
from dbt_semantic_interfaces.errors import ParsingException
from dbt_semantic_interfaces.implementations.metric import PydanticMetricTimeWindow
from dbt_semantic_interfaces.protocols import (
Metric,
SemanticManifest,
SemanticManifestT,
)
from dbt_semantic_interfaces.references import MetricModelReference
from dbt_semantic_interfaces.type_enums import MetricType
from dbt_semantic_interfaces.validations.unique_valid_name import UniqueAndValidNameRule
from dbt_semantic_interfaces.validations.validator_helpers import (
FileContext,
MetricContext,
SemanticManifestValidationRule,
ValidationError,
ValidationIssue,
validate_safely,
)
class CumulativeMetricRule(SemanticManifestValidationRule[SemanticManifestT], Generic[SemanticManifestT]):
"""Checks that cumulative sum metrics are configured properly."""
@staticmethod
@validate_safely(whats_being_done="checking that the params of metric are valid if it is a cumulative sum metric")
def _validate_cumulative_sum_metric_params(metric: Metric) -> List[ValidationIssue]:
issues: List[ValidationIssue] = []
if metric.type == MetricType.CUMULATIVE:
if metric.type_params.window and metric.type_params.grain_to_date:
issues.append(
ValidationError(
context=MetricContext(
file_context=FileContext.from_metadata(metadata=metric.metadata),
metric=MetricModelReference(metric_name=metric.name),
),
message="Both window and grain_to_date set for cumulative metric. Please set one or the other",
)
)
if metric.type_params.window:
try:
window_str = f"{metric.type_params.window.count} {metric.type_params.window.granularity.value}"
# TODO: Should not call an implementation class.
PydanticMetricTimeWindow.parse(window_str)
except ParsingException as e:
issues.append(
ValidationError(
context=MetricContext(
file_context=FileContext.from_metadata(metadata=metric.metadata),
metric=MetricModelReference(metric_name=metric.name),
),
message="".join(traceback.format_exception_only(etype=type(e), value=e)),
extra_detail="".join(traceback.format_tb(e.__traceback__)),
)
)
return issues
@staticmethod
@validate_safely(whats_being_done="running model validation ensuring cumulative sum metrics are valid")
def validate_manifest(semantic_manifest: SemanticManifestT) -> Sequence[ValidationIssue]: # noqa: D
issues: List[ValidationIssue] = []
for metric in semantic_manifest.metrics or []:
issues += CumulativeMetricRule._validate_cumulative_sum_metric_params(metric=metric)
return issues
class DerivedMetricRule(SemanticManifestValidationRule[SemanticManifestT], Generic[SemanticManifestT]):
"""Checks that derived metrics are configured properly."""
@staticmethod
@validate_safely(whats_being_done="checking that the alias set are not unique and distinct")
def _validate_alias_collision(metric: Metric) -> List[ValidationIssue]:
issues: List[ValidationIssue] = []
if metric.type == MetricType.DERIVED:
metric_context = MetricContext(
file_context=FileContext.from_metadata(metadata=metric.metadata),
metric=MetricModelReference(metric_name=metric.name),
)
used_names = {input_metric.name for input_metric in metric.input_metrics}
for input_metric in metric.input_metrics:
if input_metric.alias:
issues += UniqueAndValidNameRule.check_valid_name(input_metric.alias, metric_context)
if input_metric.alias in used_names:
issues.append(
ValidationError(
context=metric_context,
message=f"Alias '{input_metric.alias}' for input metric: '{input_metric.name}' is "
"already being used. Please choose another alias.",
)
)
used_names.add(input_metric.alias)
return issues
@staticmethod
@validate_safely(whats_being_done="checking that the input metrics exist")
def _validate_input_metrics_exist(semantic_manifest: SemanticManifest) -> List[ValidationIssue]:
issues: List[ValidationIssue] = []
all_metrics = {m.name for m in semantic_manifest.metrics}
for metric in semantic_manifest.metrics:
if metric.type == MetricType.DERIVED:
for input_metric in metric.input_metrics:
if input_metric.name not in all_metrics:
issues.append(
ValidationError(
context=MetricContext(
file_context=FileContext.from_metadata(metadata=metric.metadata),
metric=MetricModelReference(metric_name=metric.name),
),
message=f"For metric: {metric.name}, input metric: '{input_metric.name}' does not "
"exist as a configured metric in the model.",
)
)
return issues
@staticmethod
@validate_safely(whats_being_done="checking that input metric time offset params are valid")
def _validate_time_offset_params(metric: Metric) -> List[ValidationIssue]:
issues: List[ValidationIssue] = []
for input_metric in metric.input_metrics or []:
if input_metric.offset_window and input_metric.offset_to_grain:
issues.append(
ValidationError(
context=MetricContext(
file_context=FileContext.from_metadata(metadata=metric.metadata),
metric=MetricModelReference(metric_name=metric.name),
),
message=f"Both offset_window and offset_to_grain set for derived metric '{metric.name}' on "
f"input metric '{input_metric.name}'. Please set one or the other.",
)
)
return issues
@staticmethod
@validate_safely(
whats_being_done="running model validation ensuring derived metrics properties are configured properly"
)
def validate_manifest(semantic_manifest: SemanticManifestT) -> Sequence[ValidationIssue]: # noqa: D
issues: List[ValidationIssue] = []
issues += DerivedMetricRule._validate_input_metrics_exist(semantic_manifest=semantic_manifest)
for metric in semantic_manifest.metrics or []:
issues += DerivedMetricRule._validate_alias_collision(metric=metric)
issues += DerivedMetricRule._validate_time_offset_params(metric=metric)
return issues