/
upload_run_results.sql
84 lines (79 loc) · 5.25 KB
/
upload_run_results.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
{% macro upload_run_results() %}
    {#
        Appends the flattened results of the current invocation to the
        elementary 'dbt_run_results' relation.

        Nothing is uploaded unless `execute` is truthy and the relation lookup
        returned something. `results` is not defined in this macro; it comes
        from the surrounding rendering context (presumably dbt's on-run-end
        `results` - confirm against the caller).

        Returns: an empty string in every case.
    #}
    {% set run_results_relation = elementary.get_elementary_relation('dbt_run_results') %}

    {# Guard clause: bail out early when there is nothing to upload to. #}
    {% if not (execute and run_results_relation) %}
        {{ return ('') }}
    {% endif %}

    {# Each result is flattened with this macro before being uploaded. #}
    {% set flatten_callback = elementary.flatten_run_result %}
    {# Handed to the uploader as on_query_exceed; see on_run_result_query_exceed. #}
    {% set exceed_callback = elementary.on_run_result_query_exceed %}

    {% do elementary.file_log("Uploading run results.") %}
    {% do elementary.upload_artifacts_to_table(
        run_results_relation,
        results,
        flatten_callback,
        append=True,
        should_commit=True,
        on_query_exceed=exceed_callback
    ) %}
    {% do elementary.file_log("Uploaded run results successfully.") %}

    {{ return ('') }}
{% endmacro %}
{% macro get_dbt_run_results_empty_table_query() %}
    {#
        Returns the result of elementary.empty_table for the run-results
        column list below.

        Each entry is a (column_name, type_name) pair; the type names are the
        ones elementary.empty_table is given here ('string', 'long_string',
        'timestamp', 'float', 'bigint', 'boolean'). Column order is kept as
        declared.
    #}
    {% set run_results_columns = [
        ('model_execution_id', 'long_string'),
        ('unique_id', 'long_string'),
        ('invocation_id', 'string'),
        ('generated_at', 'string'),
        ('created_at', 'timestamp'),
        ('name', 'long_string'),
        ('message', 'long_string'),
        ('status', 'string'),
        ('resource_type', 'string'),
        ('execution_time', 'float'),
        ('execute_started_at', 'string'),
        ('execute_completed_at', 'string'),
        ('compile_started_at', 'string'),
        ('compile_completed_at', 'string'),
        ('rows_affected', 'bigint'),
        ('full_refresh', 'boolean'),
        ('compiled_code', 'long_string'),
        ('failures', 'bigint'),
        ('query_id', 'string'),
        ('thread_id', 'string'),
        ('materialization', 'string'),
        ('adapter_response', 'string')
    ] %}
    {{ return(elementary.empty_table(run_results_columns)) }}
{% endmacro %}
{% macro flatten_run_result(run_result) %}
    {#
        Flattens one run result into a flat dict.

        Args:
            run_result: a single run result; converted to a dict with
                elementary.get_run_result_dict before any field is read.

        Returns:
            A dict with the keys model_execution_id, invocation_id, unique_id,
            name, message, generated_at, rows_affected, execution_time, status,
            resource_type, execute_started_at, execute_completed_at,
            compile_started_at, compile_completed_at, full_refresh,
            compiled_code, failures, query_id, thread_id, materialization and
            adapter_response. The four *_started_at / *_completed_at keys stay
            none unless a matching 'execute' / 'compile' timing entry exists.
    #}
    {% set run_result_dict = elementary.get_run_result_dict(run_result) %}
    {% set node = elementary.safe_get_with_default(run_result_dict, 'node', {}) %}
    {% set config_dict = elementary.safe_get_with_default(node, 'config', {}) %}

    {#
        Resolve adapter_response once, through the same helper used for node,
        config and timing, instead of three raw .get('adapter_response', {})
        lookups. A raw .get only falls back to {} when the key is absent; a
        present-but-null value would make the chained .get(...) calls fail.
        NOTE(review): assumes safe_get_with_default also returns the default
        for a null value, as its other call sites here imply - confirm.
    #}
    {% set adapter_response = elementary.safe_get_with_default(run_result_dict, 'adapter_response', {}) %}

    {% set flatten_run_result_dict = {
        'model_execution_id': elementary.get_node_execution_id(node),
        'invocation_id': invocation_id,
        'unique_id': node.get('unique_id'),
        'name': node.get('name'),
        'message': run_result_dict.get('message'),
        'generated_at': elementary.datetime_now_utc_as_string(),
        'rows_affected': adapter_response.get('rows_affected'),
        'execution_time': run_result_dict.get('execution_time'),
        'status': run_result_dict.get('status'),
        'resource_type': node.get('resource_type'),
        'execute_started_at': none,
        'execute_completed_at': none,
        'compile_started_at': none,
        'compile_completed_at': none,
        'full_refresh': flags.FULL_REFRESH,
        'compiled_code': elementary.get_compiled_code(node, as_column_value=true),
        'failures': run_result_dict.get('failures'),
        'query_id': adapter_response.get('query_id'),
        'thread_id': run_result_dict.get('thread_id'),
        'materialization': config_dict.get('materialized'),
        'adapter_response': adapter_response
    } %}

    {# Fill the timing columns from the 'execute' and 'compile' entries; entries that are not mappings are skipped. #}
    {% set timings = elementary.safe_get_with_default(run_result_dict, 'timing', []) %}
    {% if timings %}
        {% for timing in timings if timing is mapping %}
            {% if timing.get('name') == 'execute' %}
                {% do flatten_run_result_dict.update({'execute_started_at': timing.get('started_at'), 'execute_completed_at': timing.get('completed_at')}) %}
            {% elif timing.get('name') == 'compile' %}
                {% do flatten_run_result_dict.update({'compile_started_at': timing.get('started_at'), 'compile_completed_at': timing.get('completed_at')}) %}
            {% endif %}
        {% endfor %}
    {% endif %}

    {{ return(flatten_run_result_dict) }}
{% endmacro %}
{% macro on_run_result_query_exceed(flattened_node) %}
    {#
        Callback that upload_run_results passes as on_query_exceed.

        Overwrites the 'compiled_code' entry of `flattened_node` in place with
        the message from elementary.get_compiled_code_too_long_err_msg().
        When the uploader invokes it is decided outside this file.

        Args:
            flattened_node: dict to mutate; must support .update().
    #}
    {% set too_long_message = elementary.get_compiled_code_too_long_err_msg() %}
    {% do flattened_node.update({"compiled_code": too_long_message}) %}
{% endmacro %}