-
Notifications
You must be signed in to change notification settings - Fork 113
/
link.sql
105 lines (92 loc) · 3.42 KB
/
link.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
* Copyright (c) Business Thinking Ltd. 2019-2023
* This software includes code developed by the dbtvault Team at Business Thinking Ltd. Trading as Datavault
*/
{%- macro postgres__link(src_pk, src_fk, src_extra_columns, src_ldts, src_source, source_model) -%}

{#-
    Renders the SELECT statement that loads a link table on Postgres.

    Parameters
      src_pk            primary key column(s); used as the de-duplication partition
                        and as the join key against the existing target
      src_fk            foreign key column(s); rows where these are NULL are excluded
      src_extra_columns additional columns carried through to the output
      src_ldts          load timestamp column; earliest row per src_pk is kept
      src_source        record source column; tie-breaker when several sources are unioned
      source_model      one model name, or an iterable of model names, each passed to ref()

    Shape of the generated query
      row_rank_N        one CTE per source model, keeping row_number = 1 per src_pk
      stage_union       (more than one source) UNION ALL of every row_rank_N
      stage_mat_filter  (vault_insert_by_period / vault_insert_by_rank only) applies the
                        __PERIOD_FILTER__ / __RANK_FILTER__ placeholder; presumably the
                        materialisation substitutes the real predicate - confirm there
      row_rank_union    (more than one source) re-ranks the unioned rows per src_pk
      records_to_insert final projection; when dbtvault.is_any_incremental() is true,
                        rows whose src_pk is already present in the target are dropped
-#}

{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%}
{%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%}
{%- set materialisation = model.config.materialized -%}

{#- The rank column has to be carried through the staging CTEs so the rank filter can see it. -#}
{%- if materialisation == 'vault_insert_by_rank' -%}
    {%- set staged_cols = source_cols + [config.get('rank_column')] -%}
{%- else -%}
    {%- set staged_cols = source_cols -%}
{%- endif -%}

{#- Placeholder emitted inside stage_mat_filter; none means that CTE is skipped. -#}
{%- if materialisation == 'vault_insert_by_period' -%}
    {%- set mat_filter = '__PERIOD_FILTER__' -%}
{%- elif materialisation == 'vault_insert_by_rank' -%}
    {%- set mat_filter = '__RANK_FILTER__' -%}
{%- else -%}
    {%- set mat_filter = none -%}
{%- endif -%}

{#- Accept a single model name as well as a list of them. -#}
{%- set source_models = source_model if (source_model is iterable and source_model is not string) else [source_model] -%}
{%- set source_count = source_models | length -%}

{#- ns.last_cte always names the most recently emitted CTE, so each stage reads from the previous one. -#}
{%- set ns = namespace(last_cte='') -%}

{{ dbtvault.prepend_generated_by() }}
{{ 'WITH ' -}}

{%- for src in source_models -%}
    {%- set cte_name = 'row_rank_' ~ loop.index -%}
{{ cte_name }} AS (
    SELECT * FROM (
        SELECT {{ dbtvault.prefix(staged_cols, 'rr') }},
               ROW_NUMBER() OVER(
                   PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }}
                   ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }}
               ) AS row_number
        FROM {{ ref(src) }} AS rr
        {%- if source_count == 1 %}
        WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }}
            AND {{ dbtvault.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }}
        {%- endif %}
    ) AS l
    WHERE row_number = 1
    {%- set ns.last_cte = cte_name %}
),{{ "\n" if not loop.last }}
{% endfor -%}

{#- With several sources the per-source results are stacked before any further filtering. -#}
{% if source_count > 1 %}
stage_union AS (
    {%- for src in source_models %}
    SELECT * FROM row_rank_{{ loop.index }}
    {%- if not loop.last %}
    UNION ALL
    {%- endif %}
    {%- endfor %}
    {%- set ns.last_cte = "stage_union" %}
),
{%- endif -%}

{%- if mat_filter %}
stage_mat_filter AS (
    SELECT *
    FROM {{ ns.last_cte }}
    WHERE {{ mat_filter }}
    {%- set ns.last_cte = "stage_mat_filter" %}
),
{% endif %}

{#- Second de-duplication pass across sources; NULL keys are removed here in the multi-source case. -#}
{%- if source_count > 1 %}
row_rank_union AS (
    SELECT * FROM (
        SELECT ru.*,
               ROW_NUMBER() OVER(
                   PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }}
                   ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC
               ) AS row_rank_number
        FROM {{ ns.last_cte }} AS ru
        WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }}
            AND {{ dbtvault.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }}
    ) AS a
    WHERE row_rank_number = 1
    {%- set ns.last_cte = "row_rank_union" %}
),
{% endif %}
records_to_insert AS (
    SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }}
    FROM {{ ns.last_cte }} AS a
    {%- if dbtvault.is_any_incremental() %}
    LEFT JOIN {{ this }} AS d
        ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }}
    WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }}
    {%- endif %}
)

SELECT * FROM records_to_insert

{%- endmacro -%}