-
Notifications
You must be signed in to change notification settings - Fork 113
/
t_link.sql
54 lines (44 loc) · 2.35 KB
/
t_link.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/*
* Copyright (c) Business Thinking Ltd. 2019-2022
* This software includes code developed by the dbtvault Team at Business Thinking Ltd. Trading as Datavault
*/
{%- macro t_link(src_pk, src_fk, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%}
{{- dbtvault.check_required_parameters(src_pk=src_pk, src_fk=src_fk, src_eff=src_eff,
src_ldts=src_ldts, src_source=src_source,
source_model=source_model) -}}
{{ dbtvault.prepend_generated_by() }}
{{ adapter.dispatch('t_link', 'dbtvault')(src_pk=src_pk, src_fk=src_fk, src_payload=src_payload,
src_extra_columns=src_extra_columns,
src_eff=src_eff, src_ldts=src_ldts, src_source=src_source,
source_model=source_model) -}}
{%- endmacro %}
{%- macro default__t_link(src_pk, src_fk, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%}
{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) -%}
{%- set fk_cols = dbtvault.expand_column_list([src_fk]) %}
WITH stage AS (
SELECT {{ source_cols | join(', ') }}
FROM {{ ref(source_model) }}
{%- if model.config.materialized == 'vault_insert_by_period' %}
WHERE __PERIOD_FILTER__
AND {{ dbtvault.multikey(src_pk, condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(fk_cols, condition='IS NOT NULL') }}
{%- elif model.config.materialized == 'vault_insert_by_rank' %}
WHERE __RANK_FILTER__
AND {{ dbtvault.multikey(src_pk, condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(fk_cols, condition='IS NOT NULL') }}
{%- else %}
WHERE {{ dbtvault.multikey(src_pk, condition='IS NOT NULL') }}
AND {{ dbtvault.multikey(fk_cols, condition='IS NOT NULL') }}
{%- endif %}
),
records_to_insert AS (
SELECT DISTINCT {{ dbtvault.prefix(source_cols, 'stg') }}
FROM stage AS stg
{% if dbtvault.is_any_incremental() -%}
LEFT JOIN {{ this }} AS tgt
ON {{ dbtvault.multikey(src_pk, prefix=['stg','tgt'], condition='=') }}
WHERE {{ dbtvault.multikey(src_pk, prefix='tgt', condition='IS NULL') }}
{%- endif %}
)
SELECT * FROM records_to_insert
{%- endmacro -%}