/
vault_insert_by_rank_materialization.sql
137 lines (93 loc) · 5.35 KB
/
vault_insert_by_rank_materialization.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
{% materialization vault_insert_by_rank, default -%}
    {#-
        Materialization `vault_insert_by_rank` (default adapter implementation).

        Required model configs (read with config.require):
          * rank_column        - name of the column holding the rank value.
          * rank_source_models - passed with rank_column to
                                 dbtvault.get_min_max_ranks to discover max_rank.

        The model SQL is handed to dbtvault.check_placeholder together with the
        "__RANK_FILTER__" token, and dbtvault.replace_placeholder_with_rank_filter
        is used to produce the SQL for one specific rank.
        NOTE(review): check_placeholder presumably fails the run when the token
        is missing - confirm against the dbtvault macro.

        Three paths:
          1. Target does not exist          -> create the table from rank 1.
          2. Target is a view / full refresh -> rename target to a backup, create
                                                the table from rank 1, drop backup.
          3. Otherwise                      -> for ranks 1..max_rank: build a temp
                                                table, insert it into the target,
                                                commit after each rank.

        Returns {'relations': [target_relation]}.
    -#}
    {%- set full_refresh_mode = flags.FULL_REFRESH -%}
    {%- set target_relation = this -%}
    {%- set existing_relation = load_relation(this) -%}
    {%- set tmp_relation = make_temp_relation(this) -%}
    {%- set rank_column = config.require('rank_column') -%}
    {%- set rank_source_models = config.require('rank_source_models') -%}
    {%- set min_max_ranks = dbtvault.get_min_max_ranks(rank_column, rank_source_models) | as_native -%}
    {#- Relations collected here are dropped after the post-hooks have run. -#}
    {%- set to_drop = [] -%}
    {%- do dbtvault.check_placeholder(sql, "__RANK_FILTER__") -%}
    {{ run_hooks(pre_hooks, inside_transaction=False) }}
    -- `BEGIN` happens here:
    {{ run_hooks(pre_hooks, inside_transaction=True) }}
    {% if existing_relation is none %}
        {#-- First run: build the target table from rank 1 only (executed further down via build_sql). #}
        {% set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, 1) %}
        {% set build_sql = create_table_as(False, target_relation, filtered_sql) %}
        {% do to_drop.append(tmp_relation) %}
    {% elif existing_relation.is_view or full_refresh_mode %}
        {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
        {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
        {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
        {% do adapter.drop_relation(backup_relation) %}
        {% do adapter.rename_relation(target_relation, backup_relation) %}
        {% set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, 1) %}
        {% set build_sql = create_table_as(False, target_relation, filtered_sql) %}
        {% do to_drop.append(tmp_relation) %}
        {% do to_drop.append(backup_relation) %}
    {% else %}
        {#-- Incremental path: load the existing target one rank at a time. #}
        {% set target_columns = adapter.get_columns_in_relation(target_relation) %}
        {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
        {#--
            Assignments made inside a Jinja for-loop are not visible once the loop
            ends, so everything needed afterwards (running row total, last rendered
            SQL) is carried out of the loop through this dict.
        #}
        {%- set loop_vars = {'sum_rows_inserted': 0, 'last_filtered_sql': ''} -%}
        {% for i in range(min_max_ranks.max_rank | int ) -%}
            {#-- range() is zero-based; ranks passed to the filter start at 1. #}
            {%- set iteration_number = i + 1 -%}
            {%- set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%}
            {%- do loop_vars.update({'last_filtered_sql': filtered_sql}) -%}
            {{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }}
            {% set tmp_relation = make_temp_relation(this) %}
            {% call statement() -%}
                {{ dbt.create_table_as(True, tmp_relation, filtered_sql) }}
            {%- endcall %}
            {{ adapter.expand_target_column_types(from_relation=tmp_relation,
                                                  to_relation=target_relation) }}
            {%- set insert_query_name = 'main-' ~ i -%}
            {% call statement(insert_query_name, fetch_result=True) -%}
                insert into {{ target_relation }} ({{ target_cols_csv }})
                (
                    select {{ target_cols_csv }}
                    from {{ tmp_relation.include(schema=True) }}
                );
            {%- endcall %}
            {% set result = load_result(insert_query_name) %}
            {#--
                `| int` keeps the running total numeric even when the adapter
                response carries no row count (a non-numeric value becomes 0).
            #}
            {% if 'response' in result.keys() %} {# added in v0.19.0 #}
                {% set rows_inserted = result['response']['rows_affected'] | int %}
            {% else %} {# older versions #}
                {% set rows_inserted = result['status'].split(" ")[2] | int %}
            {% endif %}
            {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
            {%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %}
            {{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number,
                                                                                      min_max_ranks.max_rank,
                                                                                      rows_inserted,
                                                                                      model.unique_id)) }}
            {% do to_drop.append(tmp_relation) %}
            {#-- Commit after every rank so each rank's insert is persisted before the next one starts. #}
            {% do adapter.commit() %}
        {% endfor %}
        {#--
            Record the overall 'main' result: total rows inserted plus the SQL of
            the last rank processed (empty when no rank was processed).
        #}
        {% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%}
            {{ loop_vars['last_filtered_sql'] }}
        {%- endcall %}
    {% endif %}
    {#-- build_sql is only set on the two "create table from rank 1" paths above. #}
    {% if build_sql is defined %}
        {% call statement("main", fetch_result=True) %}
            {{ build_sql }}
        {% endcall %}
        {% set result = load_result('main') %}
        {% if 'response' in result.keys() %} {# added in v0.19.0 #}
            {% set rows_inserted = result['response']['rows_affected'] %}
        {% else %} {# older versions #}
            {% set rows_inserted = result['status'].split(" ")[2] | int %}
        {% endif %}
        {#-- Overwrite the stored 'main' result with a "BASE LOAD <n>" status message. #}
        {% call noop_statement('main', "BASE LOAD {}".format(rows_inserted)) -%}
            {{ build_sql }}
        {%- endcall %}
        -- `COMMIT` happens here
        {% do adapter.commit() %}
    {% endif %}
    {{ run_hooks(post_hooks, inside_transaction=True) }}
    {#-- Clean up temp/backup relations collected above; entries without a relation type are skipped. #}
    {% for rel in to_drop %}
        {% if rel.type is not none %}
            {% do adapter.drop_relation(rel) %}
        {% endif %}
    {% endfor %}
    {{ run_hooks(post_hooks, inside_transaction=False) }}
    {{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}