-
Notifications
You must be signed in to change notification settings - Fork 113
/
pit.sql
208 lines (160 loc) · 7.71 KB
/
pit.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/*
* Copyright (c) Business Thinking Ltd. 2019-2023
* This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault
*/
{%- macro pit(src_pk, src_extra_columns, as_of_dates_table, satellites, stage_tables_ldts, src_ldts, source_model) -%}

    {#-
        Public entry point for building a point-in-time (PIT) table.

        Warns when src_extra_columns is supplied, validates the required
        parameters, emits the generated-by header and one "-- depends_on"
        SQL comment per entry of stage_tables_ldts, then hands every argument
        unchanged to the adapter-specific implementation resolved through
        adapter.dispatch('pit', 'automate_dv').
    -#}

    {#- src_extra_columns is accepted for interface parity only; the warning is raised at execution time. -#}
    {%- if automate_dv.is_something(src_extra_columns) -%}
        {%- if execute -%}
            {%- do exceptions.warn("WARNING: src_extra_columns not yet available for PITs or Bridges. This parameter will be ignored.") -%}
        {%- endif -%}
    {%- endif -%}

    {#- Every parameter except src_extra_columns is mandatory. -#}
    {{- automate_dv.check_required_parameters(
            src_pk=src_pk,
            as_of_dates_table=as_of_dates_table,
            satellites=satellites,
            stage_tables_ldts=stage_tables_ldts,
            src_ldts=src_ldts,
            source_model=source_model
        ) -}}

    {{- automate_dv.prepend_generated_by() }}

    {#- Calling ref() on each stage model; the expression below is kept at column 0 so the rendered SQL comment starts its own line. -#}
    {%- for stage_model in stage_tables_ldts %}
{{ "-- depends_on: " ~ ref(stage_model) -}}
    {%- endfor -%}

    {#- Delegate to the implementation selected for the active adapter. -#}
    {{ adapter.dispatch('pit', 'automate_dv')(
            src_pk=src_pk,
            src_extra_columns=src_extra_columns,
            as_of_dates_table=as_of_dates_table,
            satellites=satellites,
            stage_tables_ldts=stage_tables_ldts,
            src_ldts=src_ldts,
            source_model=source_model
        ) -}}

{%- endmacro -%}
{%- macro default__pit(src_pk, src_extra_columns, as_of_dates_table, satellites, stage_tables_ldts, src_ldts, source_model) -%}

{#-
    Default implementation of the PIT query.

    Parameters (as used in this macro):
      src_pk             primary key column(s); prefixed with the alias 'a' for
                         the SELECT / GROUP BY lists and used in the satellite joins.
      src_extra_columns  not referenced by this implementation.
      as_of_dates_table  either a mapping {source_name: table_name}, resolved with
                         source(), or any other non-none value, resolved with ref().
      satellites         mapping keyed by satellite model name. Each entry has a
                         'pk' and an 'ldts' mapping; the first key of each names the
                         output column suffix, its value is the satellite column.
      stage_tables_ldts  forwarded to automate_dv.as_of_date_window (incremental only).
      src_ldts           forwarded to automate_dv.as_of_date_window (incremental only).
      source_model       model providing the keys; cross joined with the as-of dates.

    Project vars read: 'hash' (default 'MD5') and 'enable_ghost_records'
    (default false).

    Output: for every key / AS_OF_DATE pair, the MAX satellite key and MAX
    satellite load date with load date <= AS_OF_DATE, falling back to a ghost
    value when no satellite row matches. On incremental runs the new rows are
    unioned with overlap_pks and backfill; the final SELECT is DISTINCT.

    NOTE(review): overlap_pks, new_rows_pks, backfill_as_of and new_rows_as_of
    are not defined in this macro; presumably they are CTEs emitted by
    automate_dv.as_of_date_window - confirm against that macro.
-#}

{#- Acquiring the source relation for the AS_OF table -#}
{%- if as_of_dates_table is mapping -%}
    {%- set source_name = as_of_dates_table | first -%}
    {%- set source_table_name = as_of_dates_table[source_name] -%}
    {%- set as_of_table_relation = source(source_name, source_table_name) -%}
{%- elif as_of_dates_table is not none -%}
    {%- set as_of_table_relation = ref(as_of_dates_table) -%}
{%- endif -%}

{#- Setting ghost values to replace NULLS -#}
{%- set ghost_pk = '0000000000000000' -%}
{%- set ghost_date = '1900-01-01 00:00:00.000' -%}
{%- set hash = var('hash', 'MD5') -%}
{%- set enable_ghost_record = var('enable_ghost_records', false) -%}

{#- Evaluate the incremental check once so every branch below agrees on it. -#}
{%- set is_incremental_run = automate_dv.is_any_incremental() -%}

{#- Name of the CTE providing the as-of dates that the new rows are built from. -#}
{%- if is_incremental_run -%}
    {%- set new_as_of_dates_cte = 'new_rows_as_of' -%}
{%- else -%}
    {%- set new_as_of_dates_cte = 'as_of_dates' -%}
{%- endif %}

WITH as_of_dates AS (
    SELECT * FROM {{ as_of_table_relation }}
),

{%- if is_incremental_run %}

{{ automate_dv.as_of_date_window(src_pk, src_ldts, stage_tables_ldts, ref(source_model)) }},

backfill_rows_as_of_dates AS (
    SELECT
        {{ automate_dv.prefix([src_pk], 'a') }},
        b.AS_OF_DATE
    FROM new_rows_pks AS a
    INNER JOIN backfill_as_of AS b
        ON (1=1)
),

backfill AS (
    SELECT
        {{ automate_dv.prefix([src_pk], 'a') }},
        a.AS_OF_DATE,
    {%- for sat_name in satellites -%}
        {%- set sat_alias = sat_name | lower ~ '_src' -%}
        {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list)[0] -%}
        {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list)[0] -%}
        {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%}
        {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] -%}
        {#- NOTE(review): sat_ldts is rendered as a column name below; confirm that sat_ldts.dtype resolves to a meaningful value for date_ghost. -#}

        {% if enable_ghost_record %}
        COALESCE(MAX({{ sat_alias }}.{{ sat_pk }}),
                 {{ automate_dv.binary_ghost(none, hash) }})
        AS {{ sat_name }}_{{ sat_pk_name }},
        COALESCE(MAX({{ sat_alias }}.{{ sat_ldts }}),
                 {{ automate_dv.date_ghost(date_type = sat_ldts.dtype, alias=none) }})
        AS {{ sat_name }}_{{ sat_ldts_name }}
        {%- else %}
        COALESCE(MAX({{ sat_alias }}.{{ sat_pk }}),
                 {{ automate_dv.cast_binary(ghost_pk, quote=true) }})
        AS {{ sat_name }}_{{ sat_pk_name }},
        COALESCE(MAX({{ sat_alias }}.{{ sat_ldts }}),
                 {{ automate_dv.cast_date(ghost_date, as_string=true, datetime=true) }})
        AS {{ sat_name }}_{{ sat_ldts_name }}
        {%- endif -%}

        {%- if not loop.last -%},{%- endif -%}
    {%- endfor %}
    FROM backfill_rows_as_of_dates AS a
    {%- for sat_name in satellites -%}
        {%- set sat_alias = sat_name | lower ~ '_src' -%}
        {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list)[0] -%}
        {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list)[0] -%}
        {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%}
        {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] %}
    LEFT OUTER JOIN {{ ref(sat_name) }} AS {{ sat_alias }}
        ON a.{{ src_pk }} = {{ sat_alias }}.{{ sat_pk }}
        AND {{ sat_alias }}.{{ sat_ldts }} <= a.AS_OF_DATE
    {% endfor %}
    GROUP BY
        {{ automate_dv.prefix([src_pk], 'a') }}, a.AS_OF_DATE
),
{%- endif %}

new_rows_as_of_dates AS (
    SELECT
        {{ automate_dv.prefix([src_pk], 'a') }},
        b.AS_OF_DATE
    FROM {{ ref(source_model) }} AS a
    INNER JOIN {{ new_as_of_dates_cte }} AS b
        ON (1=1)
),

new_rows AS (
    SELECT
        {{ automate_dv.prefix([src_pk], 'a') }},
        a.AS_OF_DATE,
    {%- for sat_name in satellites -%}
        {%- set sat_alias = sat_name | lower ~ '_src' -%}
        {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list)[0] -%}
        {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list)[0] -%}
        {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%}
        {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] -%}

        {% if enable_ghost_record %}
        COALESCE(MAX({{ sat_alias }}.{{ sat_pk }}),
                 {{ automate_dv.binary_ghost(none, hash) }})
        AS {{ sat_name }}_{{ sat_pk_name }},
        COALESCE(MAX({{ sat_alias }}.{{ sat_ldts }}),
                 {{ automate_dv.date_ghost(date_type = sat_ldts.dtype, alias=none) }})
        AS {{ sat_name }}_{{ sat_ldts_name }}
        {%- else %}
        COALESCE(MAX({{ sat_alias }}.{{ sat_pk }}),
                 {{ automate_dv.cast_binary(ghost_pk, quote=true) }})
        AS {{ sat_name }}_{{ sat_pk_name }},
        COALESCE(MAX({{ sat_alias }}.{{ sat_ldts }}),
                 {{ automate_dv.cast_date(ghost_date, as_string=true, datetime=true) }})
        AS {{ sat_name }}_{{ sat_ldts_name }}
        {%- endif -%}

        {%- if not loop.last -%},{%- endif -%}
    {%- endfor %}
    FROM new_rows_as_of_dates AS a
    {%- for sat_name in satellites -%}
        {%- set sat_alias = sat_name | lower ~ '_src' -%}
        {%- set sat_pk_name = (satellites[sat_name]['pk'].keys() | list)[0] -%}
        {%- set sat_ldts_name = (satellites[sat_name]['ldts'].keys() | list)[0] -%}
        {%- set sat_pk = satellites[sat_name]['pk'][sat_pk_name] -%}
        {%- set sat_ldts = satellites[sat_name]['ldts'][sat_ldts_name] %}
    LEFT OUTER JOIN {{ ref(sat_name) }} AS {{ sat_alias }}
        ON a.{{ src_pk }} = {{ sat_alias }}.{{ sat_pk }}
        AND {{ sat_alias }}.{{ sat_ldts }} <= a.AS_OF_DATE
    {% endfor %}
    GROUP BY
        {{ automate_dv.prefix([src_pk], 'a') }},
        a.AS_OF_DATE
),

pit AS (
    SELECT * FROM new_rows
{%- if is_incremental_run %}
    UNION ALL
    SELECT * FROM overlap_pks
    UNION ALL
    SELECT * FROM backfill
{% endif %}
)

SELECT DISTINCT * FROM pit

{%- endmacro -%}