-
Notifications
You must be signed in to change notification settings - Fork 113
/
hub.sql
94 lines (80 loc) · 3.01 KB
/
hub.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
* Copyright (c) Business Thinking Ltd. 2019-2023
* This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault
*/
{%- macro bigquery__hub(src_pk, src_nk, src_extra_columns, src_ldts, src_source, source_model) -%}

{#-
    BigQuery flavour of the hub macro. Renders a chain of CTEs that ends in
    `SELECT * FROM records_to_insert`.

    Arguments
      src_pk             key column(s): used for PARTITION BY, the IS NOT NULL
                         filters and the join against the target relation.
      src_nk,
      src_extra_columns  merged with src_pk, src_ldts and src_source into the
                         selected column list via automate_dv.expand_column_list.
      src_ldts           ordering column for the ROW_NUMBER() de-duplication.
      src_source         secondary ordering column when several stages are merged.
      source_model       iterable of model names; each one is passed to ref().
                         NOTE(review): a bare string would be iterated character
                         by character here; presumably the caller always passes
                         a list. Confirm against the dispatching macro.

    CTE chain (ns.last_cte always holds the name of the latest CTE emitted, so
    every step reads from whatever came before it):
      row_rank_N         one per source; keeps the first row per src_pk by src_ldts.
      stage_union        only with more than one source; UNION ALL of every row_rank_N.
      stage_mat_filter   only for the vault_insert_by_period / vault_insert_by_rank
                         materializations; emits the __PERIOD_FILTER__ or
                         __RANK_FILTER__ token verbatim (presumably substituted
                         by the materialization; verify).
      row_rank_union     only with more than one source; keeps one row per src_pk
                         ordered by src_ldts, then src_source.
      records_to_insert  final projection; on incremental runs it is anti-joined
                         against the target so only unseen keys remain.
-#}

{%- set materialization = model.config.materialized -%}
{%- set is_rank_materialization = (materialization == 'vault_insert_by_rank') -%}
{%- set is_period_materialization = (materialization == 'vault_insert_by_period') -%}

{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%}

{#- The rank column has to travel through the per-source CTEs so the rank filter can see it. -#}
{%- set per_source_cols = (source_cols + [config.get('rank_column')]) if is_rank_materialization else source_cols -%}

{#- Placeholder token for the stage_mat_filter CTE; stays none for every other materialization. -#}
{%- set mat_filter_token = none -%}
{%- if is_period_materialization -%}
    {%- set mat_filter_token = '__PERIOD_FILTER__' -%}
{%- elif is_rank_materialization -%}
    {%- set mat_filter_token = '__RANK_FILTER__' -%}
{%- endif -%}

{%- set stage_count = source_model | length -%}
{%- set ns = namespace(last_cte="") -%}

{{ 'WITH ' -}}

{%- for src in source_model -%}
{%- set current_cte = 'row_rank_' ~ loop.index -%}

{{ current_cte }} AS (
    SELECT {{ automate_dv.prefix(per_source_cols, 'rr') }},
           ROW_NUMBER() OVER(
               PARTITION BY {{ automate_dv.prefix([src_pk], 'rr') }}
               ORDER BY {{ automate_dv.prefix([src_ldts], 'rr') }}
           ) AS row_number
    FROM {{ ref(src) }} AS rr
    WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }}
    QUALIFY row_number = 1
    {%- set ns.last_cte = current_cte %}
),
{% endfor -%}

{%- if stage_count > 1 %}
stage_union AS (
    {%- for stage_index in range(1, stage_count + 1) %}
    SELECT * FROM row_rank_{{ stage_index }}
    {%- if not loop.last %}
    UNION ALL
    {%- endif %}
    {%- endfor %}
    {%- set ns.last_cte = "stage_union" %}
),
{%- endif -%}

{%- if mat_filter_token is not none %}
stage_mat_filter AS (
    SELECT *
    FROM {{ ns.last_cte }}
    WHERE {{ mat_filter_token }}
    {%- set ns.last_cte = "stage_mat_filter" %}
),
{%- endif -%}

{%- if stage_count > 1 %}
row_rank_union AS (
    SELECT ru.*,
           ROW_NUMBER() OVER(
               PARTITION BY {{ automate_dv.prefix([src_pk], 'ru') }}
               ORDER BY {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC
           ) AS row_rank_number
    FROM {{ ns.last_cte }} AS ru
    WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }}
    QUALIFY row_rank_number = 1
    {%- set ns.last_cte = "row_rank_union" %}
),
{% endif %}
records_to_insert AS (
    SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }}
    FROM {{ ns.last_cte }} AS a
    {%- if automate_dv.is_any_incremental() %}
    LEFT JOIN {{ this }} AS d
    ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }}
    WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }}
    {%- endif %}
)

SELECT * FROM records_to_insert

{%- endmacro -%}