/
upload_dbt_artifacts.sql
77 lines (70 loc) · 4.33 KB
/
upload_dbt_artifacts.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
{% macro upload_dbt_artifacts(results) %}
{% set edr_cli_run = elementary.get_config_var('edr_cli_run') %}
{% if execute and not edr_cli_run and results %}
{% set database_name, schema_name = elementary.get_model_database_and_schema('elementary', 'dbt_run_results') %}
{%- set dbt_run_results_relation = adapter.get_relation(database=database_name,
schema=schema_name,
identifier='dbt_run_results') -%}
{%- if dbt_run_results_relation -%}
{% do elementary.upload_artifacts_to_table(dbt_run_results_relation, results, elementary.get_flatten_run_result_callback(),
should_commit=True) %}
{%- endif -%}
{% endif %}
{{ return ('') }}
{% endmacro %}
{% macro get_dbt_run_results_empty_table_query() %}
{% set dbt_run_results_empty_table_query = elementary.empty_table([('model_execution_id', 'long_string'),
('unique_id', 'long_string'),
('invocation_id', 'string'),
('generated_at', 'string'),
('name', 'long_string'),
('status', 'string'),
('resource_type', 'string'),
('execution_time', 'float'),
('execute_started_at', 'string'),
('execute_completed_at', 'string'),
('compile_started_at', 'string'),
('compile_completed_at', 'string'),
('rows_affected', 'bigint'),
('full_refresh', 'boolean')]) %}
{{ return(dbt_run_results_empty_table_query) }}
{% endmacro %}
{%- macro get_flatten_run_result_callback() -%}
{{- return(adapter.dispatch('flatten_run_result', 'elementary')) -}}
{%- endmacro -%}
{%- macro flatten_run_result(node_dict) -%}
{{- return(adapter.dispatch('flatten_run_result', 'elementary')(node_dict)) -}}
{%- endmacro -%}
{% macro default__flatten_run_result(run_result) %}
{% set run_result_dict = run_result.to_dict() %}
{% set node = elementary.safe_get_with_default(run_result_dict, 'node', {}) %}
{% set flatten_run_result_dict = {
'model_execution_id': elementary.get_node_execution_id(node),
'invocation_id': invocation_id,
'unique_id': node.get('unique_id'),
'name': node.get('name'),
'generated_at': run_started_at.strftime('%Y-%m-%d %H:%M:%S'),
'rows_affected': run_result_dict.get('adapter_response', {}).get('rows_affected'),
'execution_time': run_result_dict.get('execution_time'),
'status': run_result_dict.get('status'),
'resource_type': node.get('resource_type'),
'execute_started_at': none,
'execute_completed_at': none,
'compile_started_at': none,
'compile_completed_at': none,
'full_refresh': flags.FULL_REFRESH
}%}
{% set timings = elementary.safe_get_with_default(run_result_dict, 'timing', []) %}
{% if timings %}
{% for timing in timings %}
{% if timing is mapping %}
{% if timing.get('name') == 'execute' %}
{% do flatten_run_result_dict.update({'execute_started_at': timing.get('started_at'), 'execute_completed_at': timing.get('completed_at')}) %}
{% elif timing.get('name') == 'compile' %}
{% do flatten_run_result_dict.update({'compile_started_at': timing.get('started_at'), 'compile_completed_at': timing.get('completed_at')}) %}
{% endif %}
{% endif %}
{% endfor %}
{% endif %}
{{ return(flatten_run_result_dict) }}
{% endmacro %}