-
Notifications
You must be signed in to change notification settings - Fork 121
/
ma_sat.sql
127 lines (107 loc) · 6.49 KB
/
ma_sat.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
{#-
    ma_sat: public entry point for the multi-active satellite macro.

    Takes exactly the same arguments as default__ma_sat and forwards every one
    of them, by keyword and unchanged, to whichever implementation
    adapter.dispatch resolves for 'ma_sat' when searching the namespaces
    returned by dbtvault.get_dbtvault_namespaces() (default__ma_sat unless an
    adapter-specific override is found).
-#}
{%- macro ma_sat(src_pk, src_cdk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%}

    {#- Resolve the implementation first, then call it; the rendered SQL is whatever it returns. -#}
    {%- set ma_sat_impl = adapter.dispatch('ma_sat', packages=dbtvault.get_dbtvault_namespaces()) -%}

    {{- ma_sat_impl(src_pk=src_pk,
                    src_cdk=src_cdk,
                    src_hashdiff=src_hashdiff,
                    src_payload=src_payload,
                    src_eff=src_eff,
                    src_ldts=src_ldts,
                    src_source=src_source,
                    source_model=source_model) -}}

{%- endmacro %}
{#-
    default__ma_sat: default implementation of the ma_sat (multi-active satellite) macro.

    Renders a SELECT over `source_model` that returns the rows to be loaded into
    the satellite `this`.

    Arguments:
      src_pk        - hash key column. Rows where it is NULL are skipped.
      src_cdk       - child dependent key column(s), either a single column name
                      or a list. Rows where any of them is NULL are skipped.
      src_hashdiff  - hashdiff column. It is compared together with the cdk
                      columns to decide whether a key's set of rows has changed.
      src_payload   - payload column(s) passed through to the output.
      src_eff       - effective-from column. It is passed through, but is not
                      part of the required-parameter check below.
      src_ldts      - load datetime column. Per key, the rows in the target with
                      the latest ldts form the "current" set.
      src_source    - record source column.
      source_model  - model name resolved with ref().

    Behaviour:
      * Non-incremental build: every DISTINCT source row is selected.
      * Incremental build (dbtvault.is_any_incremental()): only two kinds of
        source rows are selected. First, rows for hash keys absent from the
        satellite. Second, rows for hash keys whose set of (hashdiff, cdk) pairs
        differs from the latest set stored in the satellite.
-#}
{%- macro default__ma_sat(src_pk, src_cdk, src_hashdiff, src_payload, src_eff, src_ldts, src_source, source_model) -%}

{{- dbtvault.check_required_parameters(src_pk=src_pk, src_cdk=src_cdk, src_hashdiff=src_hashdiff,
                                       src_payload=src_payload, src_ldts=src_ldts, src_source=src_source,
                                       source_model=source_model) -}}

{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_cdk, src_payload, src_eff, src_ldts, src_source]) -%}
{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%}
{#- Flat list of child dependent key columns. Always iterate this rather than the raw
    src_cdk argument: if src_cdk is a single column name (a string), looping over it
    would walk its characters. -#}
{%- set cdk_cols = dbtvault.expand_column_list(columns=[src_cdk]) -%}

{%- if model.config.materialized == 'vault_insert_by_rank' %}
{%- set source_cols_with_rank = source_cols + [config.get('rank_column')] -%}
{%- endif -%}

{{ dbtvault.prepend_generated_by() }}
{# Source rows with non-NULL keys, each tagged with the number of distinct
   (hashdiff, cdk) pairs its hash key has in this batch. #}
WITH source_data AS (
    {%- if model.config.materialized == 'vault_insert_by_rank' %}
    SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }}
    {%- else %}
    SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }}
    {%- endif %}
        ,COUNT(DISTINCT {{ dbtvault.prefix([src_hashdiff], 'a') }}, {{ dbtvault.prefix(cdk_cols, 'a') }} )
            OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'a') }}) AS source_count
    FROM {{ ref(source_model) }} AS a
    WHERE {{ dbtvault.prefix([src_pk], 'a') }} IS NOT NULL
    {%- for child_key in cdk_cols %}
        AND {{ dbtvault.multikey(child_key, 'a', condition='IS NOT NULL') }}
    {%- endfor %}
    {#- Placeholders substituted by the vault_insert_by_* materialisations. -#}
    {%- if model.config.materialized == 'vault_insert_by_period' %}
        AND __PERIOD_FILTER__
    {%- elif model.config.materialized == 'vault_insert_by_rank' %}
        AND __RANK_FILTER__
    {%- endif %}
),

{% if dbtvault.is_any_incremental() %}

{# For every hash key in this batch, take the satellite rows at that key's latest ldts.
   RANK (not ROW_NUMBER) keeps every row tied on that ldts, i.e. the whole latest set.
   Each row is tagged with that set's count of distinct (hashdiff, cdk) pairs. #}
latest_records AS (
    SELECT *, COUNT(DISTINCT {{ dbtvault.prefix([src_hashdiff], 'latest_selection') }}, {{ dbtvault.prefix(cdk_cols, 'latest_selection') }} )
            OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'latest_selection') }}) AS target_count
    FROM (
        SELECT {{ dbtvault.prefix(cdk_cols, 'target_records', alias_target='target') }}, {{ dbtvault.prefix(rank_cols, 'target_records', alias_target='target') }}
            ,RANK() OVER (PARTITION BY {{ dbtvault.prefix([src_pk], 'target_records') }}
                    ORDER BY {{ dbtvault.prefix([src_ldts], 'target_records') }} DESC) AS rank_value
        FROM {{ this }} AS target_records
        INNER JOIN
            (SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_pks') }}
            FROM source_data AS source_pks) AS source_records
                ON {{ dbtvault.prefix([src_pk], 'target_records') }} = {{ dbtvault.prefix([src_pk], 'source_records') }}
        QUALIFY rank_value = 1
    ) AS latest_selection
),

{# Per hash key: how many distinct (hashdiff, cdk) pairs in the batch exactly
   match a row of the latest satellite set (matched on hash key + hashdiff + every cdk). #}
matching_records AS (
    SELECT {{ dbtvault.prefix([src_pk], 'stage', alias_target='target') }}
        ,COUNT(DISTINCT {{ dbtvault.prefix([src_hashdiff], 'stage') }}, {{ dbtvault.prefix(cdk_cols, 'stage') }}) AS match_count
    FROM source_data AS stage
    INNER JOIN latest_records
        ON {{ dbtvault.prefix([src_pk], 'stage') }} = {{ dbtvault.prefix([src_pk], 'latest_records', alias_target='target') }}
        AND {{ dbtvault.prefix([src_hashdiff], 'stage') }} = {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }}
        {%- for child_key in cdk_cols %}
        AND {{ dbtvault.prefix([child_key], 'stage') }} = {{ dbtvault.prefix([child_key], 'latest_records') }}
        {%- endfor %}
    GROUP BY {{ dbtvault.prefix([src_pk], 'stage') }}
),

{# Hash keys already in the satellite whose set has changed. A key qualifies when
   the batch's pair count differs from the stored set's count, or when fewer pairs
   matched than the stored set holds (no match at all counts as 0). #}
satellite_update AS (
    SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'stage', alias_target='target') }}
    FROM source_data AS stage
    INNER JOIN latest_records
        ON {{ dbtvault.prefix([src_pk], 'latest_records') }} = {{ dbtvault.prefix([src_pk], 'stage') }}
    LEFT OUTER JOIN matching_records
        ON {{ dbtvault.prefix([src_pk], 'matching_records') }} = {{ dbtvault.prefix([src_pk], 'latest_records') }}
    WHERE (stage.source_count != latest_records.target_count
        OR COALESCE(matching_records.match_count, 0) != latest_records.target_count)
    {%- if model.config.materialized == 'vault_insert_by_rank' or model.config.materialized == 'vault_insert_by_period' %}
        {#- In batched loads, ignore staged rows older than what the satellite already holds. -#}
        AND {{ dbtvault.prefix([src_ldts], 'stage') }} >= {{ dbtvault.prefix([src_ldts], 'latest_records') }}
    {%- endif %}
),

{# Hash keys present in the batch but not yet in the satellite. #}
satellite_insert AS (
    SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'stage', alias_target='target') }}
    FROM source_data AS stage
    LEFT OUTER JOIN latest_records
        ON {{ dbtvault.prefix([src_pk], 'stage') }} = {{ dbtvault.prefix([src_pk], 'latest_records') }}
    WHERE {{ dbtvault.prefix([src_pk], 'latest_records') }} IS NULL
),

{%- endif %}

records_to_insert AS (
    SELECT {% if not dbtvault.is_any_incremental() %} DISTINCT {% endif %} {{ dbtvault.alias_all(source_cols, 'stage') }}
    FROM source_data AS stage
    {#- Same predicate as the CTE guard above, so these joins are emitted exactly
        when satellite_update / satellite_insert exist. -#}
    {%- if dbtvault.is_any_incremental() %}
    INNER JOIN satellite_update
        ON {{ dbtvault.prefix([src_pk], 'satellite_update') }} = {{ dbtvault.prefix([src_pk], 'stage') }}
    {#- UNION rather than UNION ALL: neither branch applies DISTINCT on incremental runs,
        so the set union is what removes exact-duplicate staged rows. -#}
    UNION
    SELECT {{ dbtvault.alias_all(source_cols, 'stage') }}
    FROM source_data AS stage
    INNER JOIN satellite_insert
        ON {{ dbtvault.prefix([src_pk], 'satellite_insert') }} = {{ dbtvault.prefix([src_pk], 'stage') }}
    {%- endif %}
)

{# Final result: the rows to load. #}
SELECT * FROM records_to_insert

{%- endmacro -%}