From 7a33172ae51221a7a9c5a0a91ed1b3cb4388447d Mon Sep 17 00:00:00 2001 From: Gemma Down <52132406+glsdown@users.noreply.github.com> Date: Mon, 18 Sep 2023 14:39:50 +0100 Subject: [PATCH] Improve insert method used (#381) * Add named columns * Remove unused `rows_affected` column in `test_executions` #195 * Reorganise macro folder * Add documentation for macros * Consolidate logic in upload results * Simplify how nodes are extracted * Generalise identifying objects to load further * Have all results included in the loop * Split out get_dataset_content * Move the get table content into its own macro * Bug fixing * Move relation definition to insert macro to avoid quoting issues * Revert "Remove unused `rows_affected` column in `test_executions` #195" This reverts commit 9182fdcb281362d04831371636757846cef2c577. * Remove ---debug left in databricks tox * Apply suggestions from code review Co-authored-by: Jared Rimmer <100997264+jared-rimmer@users.noreply.github.com> --------- Co-authored-by: Jared Rimmer <100997264+jared-rimmer@users.noreply.github.com> --- macros/_macros.yml | 246 ++++++++++++++++++ .../column_identifier.sql | 0 .../generate_surrogate_key.sql | 0 .../get_relation.sql | 14 + .../parse_json.sql | 0 .../type_helpers.sql | 8 +- macros/insert_into_metadata_table.sql | 38 --- .../{ => migration}/migrate_from_v0_to_v1.sql | 0 .../upload_exposures.sql | 6 +- .../upload_invocations.sql | 0 .../upload_model_executions.sql | 8 +- .../upload_models.sql | 0 .../upload_seed_executions.sql | 8 +- .../upload_seeds.sql | 6 +- .../upload_snapshot_executions.sql | 8 +- .../upload_snapshots.sql | 7 +- .../upload_sources.sql | 0 .../upload_test_executions.sql | 8 +- .../upload_tests.sql | 0 macros/upload_results.sql | 169 ------------ .../upload_results/get_column_name_lists.sql | 230 ++++++++++++++++ macros/upload_results/get_dataset_content.sql | 22 ++ .../get_table_content_values.sql | 32 +++ .../insert_into_metadata_table.sql | 49 ++++ 
macros/upload_results/upload_results.sql | 50 ++++ tox.ini | 2 +- 26 files changed, 656 insertions(+), 255 deletions(-) create mode 100644 macros/_macros.yml rename macros/{ => database_specific_helpers}/column_identifier.sql (100%) rename macros/{ => database_specific_helpers}/generate_surrogate_key.sql (100%) create mode 100644 macros/database_specific_helpers/get_relation.sql rename macros/{ => database_specific_helpers}/parse_json.sql (100%) rename macros/{ => database_specific_helpers}/type_helpers.sql (95%) delete mode 100644 macros/insert_into_metadata_table.sql rename macros/{ => migration}/migrate_from_v0_to_v1.sql (100%) rename macros/{ => upload_individual_datasets}/upload_exposures.sql (96%) rename macros/{ => upload_individual_datasets}/upload_invocations.sql (100%) rename macros/{ => upload_individual_datasets}/upload_model_executions.sql (97%) rename macros/{ => upload_individual_datasets}/upload_models.sql (100%) rename macros/{ => upload_individual_datasets}/upload_seed_executions.sql (97%) rename macros/{ => upload_individual_datasets}/upload_seeds.sql (95%) rename macros/{ => upload_individual_datasets}/upload_snapshot_executions.sql (97%) rename macros/{ => upload_individual_datasets}/upload_snapshots.sql (95%) rename macros/{ => upload_individual_datasets}/upload_sources.sql (100%) rename macros/{ => upload_individual_datasets}/upload_test_executions.sql (96%) rename macros/{ => upload_individual_datasets}/upload_tests.sql (100%) delete mode 100644 macros/upload_results.sql create mode 100644 macros/upload_results/get_column_name_lists.sql create mode 100644 macros/upload_results/get_dataset_content.sql create mode 100644 macros/upload_results/get_table_content_values.sql create mode 100644 macros/upload_results/insert_into_metadata_table.sql create mode 100644 macros/upload_results/upload_results.sql diff --git a/macros/_macros.yml b/macros/_macros.yml new file mode 100644 index 00000000..7b798447 --- /dev/null +++ b/macros/_macros.yml @@ -0,0 
+1,246 @@ +version: 2 + +macros: + ## DATABASE SPECIFIC HELPERS ## + - name: column_identifier + description: | + Dependent on the adapter type, return the identifier for a column using a numerical index. + arguments: + - name: column_index + type: integer + description: | + The index of the column to return the identifier for + + - name: generate_surrogate_key + description: | + Since folks commonly install dbt_artifacts alongside a myriad of other packages, + we copy the dbt_utils implementation of the surrogate_key macro so we don't have + any dependencies to make conflicts worse! + + This version is: + URL: https://github.com/dbt-labs/dbt-utils/blob/main/macros/sql/generate_surrogate_key.sql + Commit SHA: eaa0e41b033bdf252eff0ae014ec11888f37ebff + Date: 2023-04-28 + arguments: + - name: field_list + type: list + description: | + A list of fields to concatenate together to form the surrogate key + + - name: get_relation + description: | + Identify a relation in the graph from a relation name + arguments: + - name: get_relation_name + type: string + description: | + The name of the relation to return from the graph + + - name: parse_json + description: | + Dependent on the adapter type, return a column which parses the JSON field. + arguments: + - name: field + type: string + description: | + The name of the field to parse + + - name: type_array + description: | + Dependent on the adapter type, returns the native type for storing an array. + + - name: type_boolean + description: | + Dependent on the adapter type, returns the native boolean type. + + - name: type_json + description: | + Dependent on the adapter type, returns the native type for storing JSON. + + ## MIGRATION ## + - name: migrate_from_v0_to_v1 + description: | + A macro to assist with migrating from v0 to v1 of dbt_artifacts. See + https://github.com/brooklyn-data/dbt_artifacts/blob/main/README.md#migrating-from-100-to-100 + for details on the usage. 
+ arguments: + - name: old_database + type: string + description: | + The database of the <1.0.0 output (fct_/dim_) models - does not have to be different to `new_database` + - name: old_schema + type: string + description: | + The schema of the <1.0.0 output (fct_/dim_) models - does not have to be different to `new_schema` + - name: new_database + type: string + description: | + The target database that the v1 artifact sources are in - does not have to be different to `old_database` + - name: new_schema + type: string + description: | + The target schema that the v1 artifact sources are in - does not have to be different to `old_schema` + + ## UPLOAD INDIVIDUAL DATASETS ## + - name: upload_exposures + description: | + The macro to support upload of the data to the exposures table. + arguments: + - name: exposures + type: list + description: | + A list of exposure objects extracted from the dbt graph + + - name: upload_invocations + description: | + The macro to support upload of the data to the invocations table. + + - name: upload_model_executions + description: | + The macro to support upload of the data to the model_executions table. + arguments: + - name: models + type: list + description: | + A list of model execution results objects extracted from the dbt result object + + - name: upload_models + description: | + The macro to support upload of the data to the models table. + arguments: + - name: models + type: list + description: | + A list of model objects extracted from the dbt graph + + - name: upload_seed_executions + description: | + The macro to support upload of the data to the seed_executions table. + arguments: + - name: seeds + type: list + description: | + A list of seed execution results objects extracted from the dbt result object + + - name: upload_seeds + description: | + The macro to support upload of the data to the seeds table. 
+ arguments: + - name: seeds + type: list + description: | + A list of seeds objects extracted from the dbt graph + + - name: upload_snapshot_executions + description: | + The macro to support upload of the data to the snapshot_executions table. + arguments: + - name: snapshots + type: list + description: | + A list of snapshot execution results objects extracted from the dbt result object + + - name: upload_snapshots + description: | + The macro to support upload of the data to the snapshots table. + arguments: + - name: snapshots + type: list + description: | + A list of snapshots objects extracted from the dbt graph + + - name: upload_sources + description: | + The macro to support upload of the data to the sources table. + arguments: + - name: sources + type: list + description: | + A list of sources objects extracted from the dbt graph + + - name: upload_test_executions + description: | + The macro to support upload of the data to the test_executions table. + arguments: + - name: tests + type: list + description: | + A list of test execution results objects extracted from the dbt result object + + - name: upload_tests + description: | + The macro to support upload of the data to the tests table. + arguments: + - name: tests + type: list + description: | + A list of test objects extracted from the dbt graph + + ## UPLOAD RESULTS ## + - name: get_column_name_list + description: | + A macro to return the list of column names for a particular dataset. Returns a comment if the dataset is not + valid. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the column names for e.g. `models` + + - name: get_dataset_content + description: | + A macro to extract the data to be uploaded from either the results or the graph object. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the data for e.g. 
`models` + + - name: get_table_content_values + description: | + A macro to create the insert statement values required to be uploaded to the table. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the column names for e.g. `models` + - name: objects_to_upload + type: list + description: | + The objects to be used to generate the insert statement values - extracted from `get_dataset_content` + + - name: insert_into_metadata_table + description: | + Dependent on the adapter type, the wrapper to insert the data into a table from a list of values. Used in the + `upload_results` macro, alongside the `get_column_name_list` macro to generate the column names and the + `get_table_content_values` macro to generate the data to be inserted. + arguments: + - name: dataset + type: string + description: | + The name of the dataset that the data is to be inserted into e.g. `models` - + used with `get_relation` to resolve the target relation (database, schema + and table) inside the macro itself, which avoids quoting issues across + adapters. Adapter-specific implementations are dispatched for Spark, + Snowflake and BigQuery, with a no-op default for any other adapter type. + The resolved relation is combined with the `fields` and `content` + arguments described below to build the insert statement, which is then + executed via `run_query`. Nothing is inserted when `content` is an empty + string. + - name: fields + type: string + description: | + The list of fields for the relation that the data is to be inserted into + - name: content + type: string + description: | + The data content to insert into the relation + + - name: upload_results + description: | + The main macro called to upload the metadata into each of the source tables. + arguments: + - name: results + type: list + description: | + The results object from dbt. 
diff --git a/macros/column_identifier.sql b/macros/database_specific_helpers/column_identifier.sql similarity index 100% rename from macros/column_identifier.sql rename to macros/database_specific_helpers/column_identifier.sql diff --git a/macros/generate_surrogate_key.sql b/macros/database_specific_helpers/generate_surrogate_key.sql similarity index 100% rename from macros/generate_surrogate_key.sql rename to macros/database_specific_helpers/generate_surrogate_key.sql diff --git a/macros/database_specific_helpers/get_relation.sql b/macros/database_specific_helpers/get_relation.sql new file mode 100644 index 00000000..ed7d02e5 --- /dev/null +++ b/macros/database_specific_helpers/get_relation.sql @@ -0,0 +1,14 @@ +{% macro get_relation(relation_name) %} + {% if execute %} + {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', relation_name) | first %} + {% set relation = api.Relation.create( + database = model_get_relation_node.database, + schema = model_get_relation_node.schema, + identifier = model_get_relation_node.alias + ) + %} + {% do return(relation) %} + {% else %} + {% do return(api.Relation.create()) %} + {% endif %} +{% endmacro %} diff --git a/macros/parse_json.sql b/macros/database_specific_helpers/parse_json.sql similarity index 100% rename from macros/parse_json.sql rename to macros/database_specific_helpers/parse_json.sql diff --git a/macros/type_helpers.sql b/macros/database_specific_helpers/type_helpers.sql similarity index 95% rename from macros/type_helpers.sql rename to macros/database_specific_helpers/type_helpers.sql index 19c3a718..4064ad46 100644 --- a/macros/type_helpers.sql +++ b/macros/database_specific_helpers/type_helpers.sql @@ -19,11 +19,11 @@ {% endmacro %} {% macro snowflake__type_json() %} - OBJECT + object {% endmacro %} {% macro bigquery__type_json() %} - JSON + json {% endmacro %} {#- ARRAY -#} @@ -37,9 +37,9 @@ {% endmacro %} {% macro snowflake__type_array() %} - ARRAY + array {% endmacro %} {% 
macro bigquery__type_array() %} - ARRAY + array {% endmacro %} diff --git a/macros/insert_into_metadata_table.sql b/macros/insert_into_metadata_table.sql deleted file mode 100644 index 235d2326..00000000 --- a/macros/insert_into_metadata_table.sql +++ /dev/null @@ -1,38 +0,0 @@ -{% macro insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - {% if content != "" %} - {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(database_name, schema_name, table_name, content)) }} - {% endif %} -{%- endmacro %} - -{% macro spark__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - {% set insert_into_table_query %} - insert into {% if database_name %}{{ database_name }}.{% endif %}{{ schema_name }}.{{ table_name }} - {{ content }} - {% endset %} - - {% do run_query(insert_into_table_query) %} -{%- endmacro %} - -{% macro snowflake__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - {% set insert_into_table_query %} - insert into {{database_name}}.{{ schema_name }}.{{ table_name }} - {{ content }} - {% endset %} - - {% do run_query(insert_into_table_query) %} -{%- endmacro %} - -{% macro bigquery__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} - - {% set insert_into_table_query %} - insert into `{{database_name}}.{{ schema_name }}.{{ table_name }}` - VALUES - {{ content }} - {% endset %} - - {% do run_query(insert_into_table_query) %} - -{%- endmacro %} - -{% macro default__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} -{%- endmacro %} diff --git a/macros/migrate_from_v0_to_v1.sql b/macros/migration/migrate_from_v0_to_v1.sql similarity index 100% rename from macros/migrate_from_v0_to_v1.sql rename to macros/migration/migrate_from_v0_to_v1.sql diff --git a/macros/upload_exposures.sql b/macros/upload_individual_datasets/upload_exposures.sql similarity index 96% rename from macros/upload_exposures.sql rename 
to macros/upload_individual_datasets/upload_exposures.sql index 8ef51ce2..488f9ffd 100644 --- a/macros/upload_exposures.sql +++ b/macros/upload_individual_datasets/upload_exposures.sql @@ -1,8 +1,4 @@ -{% macro upload_exposures(graph) -%} - {% set exposures = [] %} - {% for node in graph.exposures.values() %} - {% do exposures.append(node) %} - {% endfor %} +{% macro upload_exposures(exposures) -%} {{ return(adapter.dispatch('get_exposures_dml_sql', 'dbt_artifacts')(exposures)) }} {%- endmacro %} diff --git a/macros/upload_invocations.sql b/macros/upload_individual_datasets/upload_invocations.sql similarity index 100% rename from macros/upload_invocations.sql rename to macros/upload_individual_datasets/upload_invocations.sql diff --git a/macros/upload_model_executions.sql b/macros/upload_individual_datasets/upload_model_executions.sql similarity index 97% rename from macros/upload_model_executions.sql rename to macros/upload_individual_datasets/upload_model_executions.sql index a8ebbe89..3ff87cc5 100644 --- a/macros/upload_model_executions.sql +++ b/macros/upload_individual_datasets/upload_model_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_model_executions(results) -%} - {% set models = [] %} - {% for result in results %} - {% if result.node.resource_type == "model" %} - {% do models.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_model_executions(models) -%} {{ return(adapter.dispatch('get_model_executions_dml_sql', 'dbt_artifacts')(models)) }} {%- endmacro %} diff --git a/macros/upload_models.sql b/macros/upload_individual_datasets/upload_models.sql similarity index 100% rename from macros/upload_models.sql rename to macros/upload_individual_datasets/upload_models.sql diff --git a/macros/upload_seed_executions.sql b/macros/upload_individual_datasets/upload_seed_executions.sql similarity index 97% rename from macros/upload_seed_executions.sql rename to macros/upload_individual_datasets/upload_seed_executions.sql index 4be58b09..69c4540c 100644 
--- a/macros/upload_seed_executions.sql +++ b/macros/upload_individual_datasets/upload_seed_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_seed_executions(results) -%} - {% set seeds = [] %} - {% for result in results %} - {% if result.node.resource_type == "seed" %} - {% do seeds.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_seed_executions(seeds) -%} {{ return(adapter.dispatch('get_seed_executions_dml_sql', 'dbt_artifacts')(seeds)) }} {%- endmacro %} diff --git a/macros/upload_seeds.sql b/macros/upload_individual_datasets/upload_seeds.sql similarity index 95% rename from macros/upload_seeds.sql rename to macros/upload_individual_datasets/upload_seeds.sql index ea288e21..2f12507e 100644 --- a/macros/upload_seeds.sql +++ b/macros/upload_individual_datasets/upload_seeds.sql @@ -1,8 +1,4 @@ -{% macro upload_seeds(graph) -%} - {% set seeds = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "seed") %} - {% do seeds.append(node) %} - {% endfor %} +{% macro upload_seeds(seeds) -%} {{ return(adapter.dispatch('get_seeds_dml_sql', 'dbt_artifacts')(seeds)) }} {%- endmacro %} diff --git a/macros/upload_snapshot_executions.sql b/macros/upload_individual_datasets/upload_snapshot_executions.sql similarity index 97% rename from macros/upload_snapshot_executions.sql rename to macros/upload_individual_datasets/upload_snapshot_executions.sql index 3e8c03d0..a9980b18 100644 --- a/macros/upload_snapshot_executions.sql +++ b/macros/upload_individual_datasets/upload_snapshot_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_snapshot_executions(results) -%} - {% set snapshots = [] %} - {% for result in results %} - {% if result.node.resource_type == "snapshot" %} - {% do snapshots.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_snapshot_executions(snapshots) -%} {{ return(adapter.dispatch('get_snapshot_executions_dml_sql', 'dbt_artifacts')(snapshots)) }} {%- endmacro %} diff --git a/macros/upload_snapshots.sql 
b/macros/upload_individual_datasets/upload_snapshots.sql similarity index 95% rename from macros/upload_snapshots.sql rename to macros/upload_individual_datasets/upload_snapshots.sql index 3c7a7cbf..82789e3a 100644 --- a/macros/upload_snapshots.sql +++ b/macros/upload_individual_datasets/upload_snapshots.sql @@ -1,8 +1,5 @@ -{% macro upload_snapshots(graph) -%} - {% set snapshots = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "snapshot") %} - {% do snapshots.append(node) %} - {% endfor %} +{% macro upload_snapshots(snapshots) -%} + {{ return(adapter.dispatch('get_snapshots_dml_sql', 'dbt_artifacts')(snapshots)) }} {%- endmacro %} diff --git a/macros/upload_sources.sql b/macros/upload_individual_datasets/upload_sources.sql similarity index 100% rename from macros/upload_sources.sql rename to macros/upload_individual_datasets/upload_sources.sql diff --git a/macros/upload_test_executions.sql b/macros/upload_individual_datasets/upload_test_executions.sql similarity index 96% rename from macros/upload_test_executions.sql rename to macros/upload_individual_datasets/upload_test_executions.sql index a8747099..60986658 100644 --- a/macros/upload_test_executions.sql +++ b/macros/upload_individual_datasets/upload_test_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_test_executions(results) -%} - {% set tests = [] %} - {% for result in results %} - {% if result.node.resource_type == "test" %} - {% do tests.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_test_executions(tests) -%} {{ return(adapter.dispatch('get_test_executions_dml_sql', 'dbt_artifacts')(tests)) }} {%- endmacro %} diff --git a/macros/upload_tests.sql b/macros/upload_individual_datasets/upload_tests.sql similarity index 100% rename from macros/upload_tests.sql rename to macros/upload_individual_datasets/upload_tests.sql diff --git a/macros/upload_results.sql b/macros/upload_results.sql deleted file mode 100644 index dbfb8051..00000000 --- 
a/macros/upload_results.sql +++ /dev/null @@ -1,169 +0,0 @@ -{# dbt doesn't like us ref'ing in an operation so we fetch the info from the graph #} -{% macro get_relation(get_relation_name) %} - {% if execute %} - {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', get_relation_name) | first %} - {% set relation = api.Relation.create( - database = model_get_relation_node.database, - schema = model_get_relation_node.schema, - identifier = model_get_relation_node.alias - ) - %} - {% do return(relation) %} - {% else %} - {% do return(api.Relation.create()) %} - {% endif %} -{% endmacro %} - -{% macro upload_results(results) -%} - - {% if execute %} - - {% if results != [] %} - {% do log("Uploading model executions", true) %} - {% set model_executions = dbt_artifacts.get_relation('model_executions') %} - {% set content_model_executions = dbt_artifacts.upload_model_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=model_executions.database, - schema_name=model_executions.schema, - table_name=model_executions.identifier, - content=content_model_executions - ) - }} - - {% do log("Uploading seed executions", true) %} - {% set seed_executions = dbt_artifacts.get_relation('seed_executions') %} - {% set content_seed_executions = dbt_artifacts.upload_seed_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=seed_executions.database, - schema_name=seed_executions.schema, - table_name=seed_executions.identifier, - content=content_seed_executions - ) - }} - - {% do log("Uploading snapshot executions", true) %} - {% set snapshot_executions = dbt_artifacts.get_relation('snapshot_executions') %} - {% set content_snapshot_executions = dbt_artifacts.upload_snapshot_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=snapshot_executions.database, - schema_name=snapshot_executions.schema, - table_name=snapshot_executions.identifier, - 
content=content_snapshot_executions - ) - }} - - {% do log("Uploading test executions", true) %} - {% set test_executions = dbt_artifacts.get_relation('test_executions') %} - {% set content_test_executions = dbt_artifacts.upload_test_executions(results) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=test_executions.database, - schema_name=test_executions.schema, - table_name=test_executions.identifier, - content=content_test_executions - ) - }} - - {% endif %} - - {% do log("Uploading exposures", true) %} - {% set exposures = dbt_artifacts.get_relation('exposures') %} - {% set content_exposures = dbt_artifacts.upload_exposures(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=exposures.database, - schema_name=exposures.schema, - table_name=exposures.identifier, - content=content_exposures - ) - }} - - {% do log("Uploading tests", true) %} - {% set tests = dbt_artifacts.get_relation('tests') %} - {% set tests_set = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "test") %} - {% do tests_set.append(node) %} - {% endfor %} - {# upload tests in chunks of 5000 tests (300 for BigQuery), or less #} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} - {% for i in range(0, tests_set | length, upload_limit) -%} - {% set content_tests = dbt_artifacts.upload_tests(tests_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=tests.database, - schema_name=tests.schema, - table_name=tests.identifier, - content=content_tests - ) - }} - {%- endfor %} - - {% do log("Uploading seeds", true) %} - {% set seeds = dbt_artifacts.get_relation('seeds') %} - {% set content_seeds = dbt_artifacts.upload_seeds(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=seeds.database, - schema_name=seeds.schema, - table_name=seeds.identifier, - content=content_seeds - ) - }} - - {% do log("Uploading models", true) %} - {% set models = 
dbt_artifacts.get_relation('models') %} - {% set models_set = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} - {% do models_set.append(node) %} - {% endfor %} - {% set upload_limit = 50 if target.type == 'bigquery' else 100 %} - {% for i in range(0, models_set | length, upload_limit) -%} - {% set content_models = dbt_artifacts.upload_models(models_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=models.database, - schema_name=models.schema, - table_name=models.identifier, - content=content_models - ) - }} - {%- endfor %} - - {% do log("Uploading sources", true) %} - {% set sources = dbt_artifacts.get_relation('sources') %} - {% set sources_set = [] %} - {% for node in graph.sources.values() %} - {% do sources_set.append(node) %} - {% endfor %} - {# upload sources in chunks of 5000 sources (300 for BigQuery), or less #} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} - {% for i in range(0, sources_set | length, upload_limit) -%} - {% set content_sources = dbt_artifacts.upload_sources(sources_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=sources.database, - schema_name=sources.schema, - table_name=sources.identifier, - content=content_sources - ) - }} - {%- endfor %} - - {% do log("Uploading snapshots", true) %} - {% set snapshots = dbt_artifacts.get_relation('snapshots') %} - {% set content_snapshots = dbt_artifacts.upload_snapshots(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=snapshots.database, - schema_name=snapshots.schema, - table_name=snapshots.identifier, - content=content_snapshots - ) - }} - - {% do log("Uploading invocations", true) %} - {% set invocations = dbt_artifacts.get_relation('invocations') %} - {% set content_invocations = dbt_artifacts.upload_invocations() %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=invocations.database, - 
schema_name=invocations.schema, - table_name=invocations.identifier, - content=content_invocations - ) - }} - - {% endif %} -{%- endmacro %} diff --git a/macros/upload_results/get_column_name_lists.sql b/macros/upload_results/get_column_name_lists.sql new file mode 100644 index 00000000..ea708467 --- /dev/null +++ b/macros/upload_results/get_column_name_lists.sql @@ -0,0 +1,230 @@ + +{# + These are the column lists used as part of the upload macros - the order here should be the same + as the order in each individual `upload_dataset` macro. +#} + +{% macro get_column_name_list(dataset) -%} + + {% if dataset == 'exposures' %} + + ( + command_invocation_id, + node_id, + run_started_at, + name, + type, + owner, + maturity, + path, + description, + url, + package_name, + depends_on_nodes, + tags, + all_results + ) + + {% elif dataset == 'invocations' %} + + ( + command_invocation_id, + dbt_version, + project_name, + run_started_at, + dbt_command, + full_refresh_flag, + target_profile_name, + target_name, + target_schema, + target_threads, + dbt_cloud_project_id, + dbt_cloud_job_id, + dbt_cloud_run_id, + dbt_cloud_run_reason_category, + dbt_cloud_run_reason, + env_vars, + dbt_vars, + invocation_args, + dbt_custom_envs + ) + + {% elif dataset == 'model_executions' %} + + ( + node_id, + command_invocation_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + {% if target.type == 'bigquery' %} + bytes_processed, + {% endif %} + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {% elif dataset == 'models' %} + + ( + node_id, + command_invocation_id, + run_started_at, + database, + schema, + name, + depends_on_nodes, + package_name, + path, + checksum, + materialization, + tags, + meta, + alias, + all_results + ) + + + {% elif dataset == 'seed_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, 
+ status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {% elif dataset == 'seeds' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + name, + package_name, + path, + checksum, + meta, + alias, + all_results + ) + + {% elif dataset == 'snapshot_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {% elif dataset == 'snapshots' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + name, + depends_on_nodes, + package_name, + path, + checksum, + strategy, + meta, + alias, + all_results + ) + + {% elif dataset == 'sources' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + source_name, + loader, + name, + identifier, + loaded_at_field, + freshness, + all_results + ) + + {% elif dataset == 'test_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + failures, + message, + adapter_response + ) + + {% elif dataset == 'tests' %} + + ( + command_invocation_id, + node_id, + run_started_at, + name, + depends_on_nodes, + package_name, + test_path, + tags, + all_results + ) + + {% else %} + + /* No column list available */ + + {% endif %} + +{%- endmacro %} diff --git a/macros/upload_results/get_dataset_content.sql b/macros/upload_results/get_dataset_content.sql new file mode 100644 index 00000000..b34e39cd --- /dev/null +++ b/macros/upload_results/get_dataset_content.sql @@ -0,0 +1,22 @@ +{% macro get_dataset_content(dataset) %} + + {% if dataset in ['model_executions', 
'seed_executions', 'test_executions', 'snapshot_executions'] %} + {# Executions make use of the results object #} + {% set objects = results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list %} + {% elif dataset in ['seeds', 'snapshots', 'tests', 'models'] %} + {# Use the nodes in the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.nodes.values() | selectattr("resource_type", "equalto", dataset[:-1]) | list %} + {% elif dataset in ['exposures', 'sources'] %} + {# Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.get(dataset).values() | list %} + {% elif dataset == 'invocations' %} + {# + Invocations doesn't need anything input, but we include this so that it will still be picked up + as part of the loop below - the length must be >0 to allow for an upload, hence the empty string + #} + {% set objects = [''] %} + {% endif %} + + {{ return(objects) }} + +{% endmacro %} diff --git a/macros/upload_results/get_table_content_values.sql b/macros/upload_results/get_table_content_values.sql new file mode 100644 index 00000000..277899c5 --- /dev/null +++ b/macros/upload_results/get_table_content_values.sql @@ -0,0 +1,32 @@ +{% macro get_table_content_values(dataset, objects_to_upload) %} + + {# Convert the results to data to be imported #} + + {% if dataset == 'model_executions' %} + {% set content = dbt_artifacts.upload_model_executions(objects_to_upload) %} + {% elif dataset == 'seed_executions' %} + {% set content = dbt_artifacts.upload_seed_executions(objects_to_upload) %} + {% elif dataset == 'test_executions' %} + {% set content = dbt_artifacts.upload_test_executions(objects_to_upload) %} + {% elif dataset == 'snapshot_executions' %} + {% set content = dbt_artifacts.upload_snapshot_executions(objects_to_upload) %} + {% elif dataset == 'exposures' %} + {% set content = 
dbt_artifacts.upload_exposures(objects_to_upload) %} + {% elif dataset == 'models' %} + {% set content = dbt_artifacts.upload_models(objects_to_upload) %} + {% elif dataset == 'seeds' %} + {% set content = dbt_artifacts.upload_seeds(objects_to_upload) %} + {% elif dataset == 'snapshots' %} + {% set content = dbt_artifacts.upload_snapshots(objects_to_upload) %} + {% elif dataset == 'sources' %} + {% set content = dbt_artifacts.upload_sources(objects_to_upload) %} + {% elif dataset == 'tests' %} + {% set content = dbt_artifacts.upload_tests(objects_to_upload) %} + {# Invocations only requires data from variables available in the macro #} + {% elif dataset == 'invocations' %} + {% set content = dbt_artifacts.upload_invocations() %} + {% endif %} + + {{ return(content) }} + +{% endmacro %} diff --git a/macros/upload_results/insert_into_metadata_table.sql b/macros/upload_results/insert_into_metadata_table.sql new file mode 100644 index 00000000..2ec4d5d5 --- /dev/null +++ b/macros/upload_results/insert_into_metadata_table.sql @@ -0,0 +1,49 @@ +{% macro insert_into_metadata_table(dataset, fields, content) -%} + + {% if content != "" %} + + {# Get the relation that the results will be uploaded to #} + {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} + {# Insert the data into the table #} + {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(dataset_relation, fields, content)) }} + + {% endif %} + +{%- endmacro %} + +{% macro spark__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + +{% macro snowflake__insert_into_metadata_table(relation, fields, content) -%} + + {% set insert_into_table_query %} + insert into {{ relation }} {{ fields }} + {{ content }} + {% endset %} + + {% do run_query(insert_into_table_query) %} + +{%- endmacro %} + +{% macro 
bigquery__insert_into_metadata_table(relation, fields, content) -%}
+
+    {% set insert_into_table_query %}
+    insert into {{ relation }} {{ fields }}
+    values
+    {{ content }}
+    {% endset %}
+
+    {% do run_query(insert_into_table_query) %}
+
+{%- endmacro %}
+
+{% macro default__insert_into_metadata_table(relation, fields, content) -%}
+{%- endmacro %}
diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql
new file mode 100644
index 00000000..fcadc199
--- /dev/null
+++ b/macros/upload_results/upload_results.sql
@@ -0,0 +1,50 @@
+{# dbt doesn't like us ref'ing in an operation so we fetch the info from the graph #}
+
+{% macro upload_results(results) -%}
+
+    {% if execute %}
+
+        {% set datasets_to_load = ['exposures', 'seeds', 'snapshots', 'invocations', 'sources', 'tests', 'models'] %}
+        {% if results != [] %}
+            {# When executing, and results are available, then upload the results #}
+            {% set datasets_to_load = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] + datasets_to_load %}
+        {% endif %}
+
+        {# Upload each data set in turn #}
+        {% for dataset in datasets_to_load %}
+
+            {% do log("Uploading " ~ dataset.replace("_", " "), true) %}
+
+            {# Get the results that need to be uploaded #}
+            {% set objects = dbt_artifacts.get_dataset_content(dataset) %}
+
+            {# Upload in chunks to reduce query size. FIX: was `dataset == 'model'`, which never matches any dataset name, so the smaller chunk for models was never used #}
+            {% if dataset == 'models' %}
+                {% set upload_limit = 50 if target.type == 'bigquery' else 100 %}
+            {% else %}
+                {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %}
+            {% endif %}
+
+            {# Loop through each chunk in turn #}
+            {% for i in range(0, objects | length, upload_limit) -%}
+
+                {# Get just the objects to load on this loop #}
+                {% set content = dbt_artifacts.get_table_content_values(dataset, objects[i: i + upload_limit]) %}
+
+                {# Insert the content into the metadata table #}
+                {{ dbt_artifacts.insert_into_metadata_table(
+                        dataset=dataset,
+                        fields=dbt_artifacts.get_column_name_list(dataset),
+                        content=content
+                    )
+                }}
+
+            {# Loop the next 'chunk' #}
+            {% endfor %}
+
+        {# Loop the next 'dataset' #}
+        {% endfor %}
+
+    {% endif %}
+
+{%- endmacro %}
diff --git a/tox.ini b/tox.ini
index 2070a7eb..fbff0176 100644
--- a/tox.ini
+++ b/tox.ini
@@ -166,7 +166,7 @@ deps = dbt-databricks~=1.6.0
 commands =
     dbt clean
     dbt deps
-    dbt --debug build --target databricks
+    dbt build --target databricks
 
 [testenv:integration_databricks_1_3_0]
 changedir = integration_test_project