-
Notifications
You must be signed in to change notification settings - Fork 113
/
sat.sql
85 lines (71 loc) · 3.35 KB
/
sat.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/*
* Copyright (c) Business Thinking Ltd. 2019-2023
* This software includes code developed by the dbtvault Team at Business Thinking Ltd. Trading as Datavault
*/
{%- macro postgres__sat(src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%}

{#-
    postgres__sat: renders the SELECT statement that yields the rows to load into a satellite.

    Arguments
      src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source
          Column specifications. All of them are expanded into the selected column list.
          src_pk is additionally used as the join / partition key, src_hashdiff for the
          change comparison and src_ldts to order rows already in the target.
      source_model
          Name handed to ref() to locate the staged data.

    Shape of the rendered SQL
      source_data        Rows of the source model, filtered through
                         dbtvault.multikey(src_pk, condition='IS NOT NULL'). For the
                         vault_insert_by_period / vault_insert_by_rank materialisations a
                         placeholder token is appended to the WHERE clause.
                         NOTE(review): the placeholder is presumably substituted by the
                         materialisation itself -- confirm there.
      latest_records     Only when dbtvault.is_any_incremental() is true: for every key that
                         occurs in source_data, the row(s) of the target relation ranked
                         first by src_ldts descending. RANK() is used, so rows tied on
                         src_ldts are all kept.
      ghost              Only when the project var 'enable_ghost_records' is truthy
                         (default false): output of dbtvault.create_ghost_record.
      records_to_insert  Distinct staged rows. On incremental runs a staged row is kept only
                         when no latest record has the same key and the same hashdiff.
                         When ghost records are enabled the ghost row is UNIONed in; on
                         incremental runs only if the target has no row with its hashdiff.
#}

{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) -%}
{%- set window_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%}
{%- set enable_ghost_record = var('enable_ghost_records', false) -%}

{#- Columns read from the stage. The rank materialisation also needs its configured
    rank column; Jinja 'if' blocks do not open a new scope, so the override below is
    visible to the rest of the macro. #}
{%- set stage_cols = source_cols -%}
{%- if model.config.materialized == 'vault_insert_by_rank' -%}
    {%- set stage_cols = source_cols + [config.get('rank_column')] -%}
{%- endif -%}

{#- Staged rows, optionally narrowed by the period / rank placeholder. -#}
WITH source_data AS (
    SELECT {{ dbtvault.prefix(stage_cols, 'a', alias_target='source') }}
    FROM {{ ref(source_model) }} AS a
    WHERE {{ dbtvault.multikey(src_pk, prefix='a', condition='IS NOT NULL') }}
    {%- if model.config.materialized == 'vault_insert_by_period' %}
    AND __PERIOD_FILTER__
    {%- elif model.config.materialized == 'vault_insert_by_rank' %}
    AND __RANK_FILTER__
    {%- endif %}
),

{%- if dbtvault.is_any_incremental() %}
{#- Most recent target row(s) per key, limited to keys present in this load. #}
latest_records AS (
    SELECT {{ dbtvault.prefix(window_cols, 'a', alias_target='target') }}
    FROM (
        SELECT {{ dbtvault.prefix(window_cols, 'current_records', alias_target='target') }},
            RANK() OVER (
                PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }}
                ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC
            ) AS rank
        FROM {{ this }} AS current_records
        INNER JOIN (
            SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }}
            FROM source_data
        ) AS source_records
            ON {{ dbtvault.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }}
    ) AS a
    WHERE a.rank = 1
),
{%- endif %}

{%- if enable_ghost_record %}
ghost AS (
    {{- dbtvault.create_ghost_record(src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) }}
),
{%- endif %}

{#- Final row set: optional ghost row plus new or changed staged rows. #}
records_to_insert AS (
    {%- if enable_ghost_record %}
    SELECT
        {{ dbtvault.alias_all(source_cols, 'g') }}
    FROM ghost AS g
    {%- if dbtvault.is_any_incremental() %}
    WHERE NOT EXISTS (
        SELECT 1
        FROM {{ this }} AS h
        WHERE {{ dbtvault.prefix([src_hashdiff], 'h', alias_target='target') }} = {{ dbtvault.prefix([src_hashdiff], 'g') }}
    )
    {%- endif %}
    UNION
    {%- endif %}
    SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }}
    FROM source_data AS stage
    {%- if dbtvault.is_any_incremental() %}
    {#- Anti-join: keep a staged row only when the LEFT JOIN found no latest record
        with the same key and hashdiff. #}
    LEFT JOIN latest_records
        ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }}
        AND {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} = {{ dbtvault.prefix([src_hashdiff], 'stage') }}
    WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL
    {%- endif %}
)

SELECT * FROM records_to_insert
{%- endmacro -%}