From 3603a58e6f653a558b6abc30bf1d184665ccf65a Mon Sep 17 00:00:00 2001 From: Joe Markiewicz <74217849+fivetran-joemarkiewicz@users.noreply.github.com> Date: Wed, 6 Mar 2024 15:38:50 -0600 Subject: [PATCH 1/5] feature/singular-tests-addition --- CHANGELOG.md | 9 +++ README.md | 9 ++- dbt_project.yml | 2 +- integration_tests/dbt_project.yml | 3 +- models/fivetran_platform__audit_table.sql | 13 ++-- tests/row_count__audit_table.sql | 37 ++++++++++ tests/row_count__audit_user_activity.sql | 49 +++++++++++++ tests/row_count__connector_daily_events.sql | 69 +++++++++++++++++++ tests/row_count__connector_status.sql | 43 ++++++++++++ tests/row_count__mar_table_history.sql | 44 ++++++++++++ tests/row_count__schema_changelog.sql | 38 ++++++++++ ...w_count__usage_mar_destination_history.sql | 42 +++++++++++ 12 files changed, 349 insertions(+), 9 deletions(-) create mode 100644 tests/row_count__audit_table.sql create mode 100644 tests/row_count__audit_user_activity.sql create mode 100644 tests/row_count__connector_daily_events.sql create mode 100644 tests/row_count__connector_status.sql create mode 100644 tests/row_count__mar_table_history.sql create mode 100644 tests/row_count__schema_changelog.sql create mode 100644 tests/row_count__usage_mar_destination_history.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 227ed385..e96222e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# dbt_fivetran_log v1.6.0 + +## Bug Fixes +- It was observed that the `fivetran_platform__audit_table` model could potentially introduce duplicates for database like connectors. To address this issue, the join logic in the end model was adjusted to no longer be determined by date ranges and instead be defined via model generated indexes. This ensures true uniqueness in the end model. + +## Testing Updates +- Addition of the `tests/` folder which includes various custom validation tests to ensure the the output of the end models are matching our expectations. + - These tests are disabled by default as they are not necessary to run after every execution of the models. If you would like to enabled these tests you may activate them by defining the `fivetran_validation_tests_enabled` variable as `true`. + # dbt_fivetran_log v1.5.0 [PR #114](https://github.com/fivetran/dbt_fivetran_log/pull/114) includes the following updates: diff --git a/README.md b/README.md index f24cc78c..c90c83c7 100644 --- a/README.md +++ b/README.md @@ -124,8 +124,15 @@ dispatch: - macro_namespace: dbt_utils search_order: ['spark_utils', 'dbt_utils'] ``` +## (Optional) Step 6: Run Model Validation Tests +All end models in this package include a suite of singular validation tests located within the `tests/` folder of this project. These validation tests are designed to ensure the end models are matching the expected outcomes. By default these singular tests are not necessary to be executed after every `dbt run`; therefore, these tests are disabled by default. If you would like to run these tests as part of your own validation efforts you may define the `fivetran_validation_tests_enabled` variable as `true`. -## (Optional) Step 6: Orchestrate your models with Fivetran Transformations for dbt Core™ +Further, all these singular validation tests utilize the `validations` tag. Therefore, if you would like to run these validation tests one-off you may run the following command to run **only** these validation tests and not the other generic schema tests already included in the base package. +```zsh +dbt test --select tag:validations --vars '{fivetran_validation_tests_enabled: true}' +``` + +## (Optional) Step 7: Orchestrate your models with Fivetran Transformations for dbt Core™
Expand for details
diff --git a/dbt_project.yml b/dbt_project.yml index 839bac9d..c7cb501b 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,6 +1,6 @@ config-version: 2 name: 'fivetran_log' -version: '1.5.0' +version: '1.6.0' require-dbt-version: [">=1.3.0", "<2.0.0"] models: diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index ac77104a..b836e543 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -1,5 +1,5 @@ name: 'fivetran_log_integration_tests' -version: '1.5.0' +version: '1.6.0' config-version: 2 profile: 'integration_tests' @@ -9,6 +9,7 @@ dispatch: search_order: ['spark_utils', 'dbt_utils'] vars: + fivetran_validation_tests_enabled: true fivetran_log: fivetran_platform_schema: fivetran_platform_integration_tests fivetran_platform_account_identifier: "account" diff --git a/models/fivetran_platform__audit_table.sql b/models/fivetran_platform__audit_table.sql index 3734a4b1..d24e4e55 100644 --- a/models/fivetran_platform__audit_table.sql +++ b/models/fivetran_platform__audit_table.sql @@ -70,7 +70,9 @@ sync_timestamps as ( -- this will be the base for every record in the final CTE limit_to_table_starts as ( - select * + select + *, + row_number() over (partition by connector_id, table_name order by write_to_table_start) as index from sync_timestamps where event_subtype = 'write_to_table_start' ), @@ -83,7 +85,8 @@ records_modified_log as ( {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name, {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name, {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type, - cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count + cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count, + row_number() over (partition by connector_id, table_name order by created_at) as index from sync_log where event_subtype = 'records_modified' @@ -109,10 +112,8 @@ sum_records_modified as ( left join records_modified_log on limit_to_table_starts.connector_id = records_modified_log.connector_id and limit_to_table_starts.table_name = records_modified_log.table_name - - -- confine it to one sync - and records_modified_log.created_at > limit_to_table_starts.sync_start - and records_modified_log.created_at < coalesce(limit_to_table_starts.sync_end, limit_to_table_starts.next_sync_start) + -- Each `write_to_table_start` log event should be matched with a corresponding `records_modified` event. Therefore, the generated index will be how we match these up and ensure a complete and accurate join. + and limit_to_table_starts.index = records_modified_log.index -- explicit group by needed for SQL Server group by diff --git a/tests/row_count__audit_table.sql b/tests/row_count__audit_table.sql new file mode 100644 index 00000000..2e7ecee7 --- /dev/null +++ b/tests/row_count__audit_table.sql @@ -0,0 +1,37 @@ + +{{ config( + tags="validations", + enabled=var('fivetran_validation_tests_enabled', false) +) }} + +-- Description: This test ensures the fivetran_platform__audit_table matches the same row count of the staging model with the appropriate filter. +-- This should be run both on a --full-refresh and a normal dbt run in order to ensure the incremental logic is behaving as expected. +with end_model as ( + select + connector_id, + table_name, + count(*) as row_count + from {{ ref('fivetran_platform__audit_table') }} + group by 1, 2 +), + +staging_model as ( + select + connector_id, + {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name, + count(*) as row_count + from {{ ref('stg_fivetran_platform__log') }} + where event_subtype in ('write_to_table_start') + group by 1, 2 +) + +select + end_model.connector_id, + end_model.table_name, + end_model.row_count as end_model_row_count, + staging_model.row_count as staging_model_row_count +from end_model +left join staging_model + on end_model.connector_id = staging_model.connector_id + and end_model.table_name = staging_model.table_name +where staging_model.row_count != end_model.row_count \ No newline at end of file diff --git a/tests/row_count__audit_user_activity.sql b/tests/row_count__audit_user_activity.sql new file mode 100644 index 00000000..91eb1c0e --- /dev/null +++ b/tests/row_count__audit_user_activity.sql @@ -0,0 +1,49 @@ +{{ config( + tags="validations", + enabled=var('fivetran_validation_tests_enabled', false) +) }} + +with end_model as ( + select + connector_id, + email, + date_day, + count(*) as row_count + from {{ ref('fivetran_platform__audit_user_activity') }} + group by 1, 2, 3 +), + +staging_model as ( + + select + *, + {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['actor']) }} as email + from {{ ref('stg_fivetran_platform__log') }} + where lower(message_data) like '%actor%' +), + +staging_cleanup as ( + + select + connector_id, + email, + {{ dbt.date_trunc('day', 'created_at') }} as date_day, + count(*) as row_count + from staging_model + where email is not null + and lower(email) != 'fivetran' + group by 1,2,3 +) + +select + end_model.connector_id, + end_model.email, + end_model.date_day, + end_model.row_count as end_model_row_count, + staging_cleanup.row_count as staging_model_row_count +from end_model +left join staging_cleanup + on end_model.connector_id = staging_cleanup.connector_id + and end_model.email = staging_cleanup.email + and end_model.date_day = staging_cleanup.date_day +where staging_cleanup.row_count != end_model.row_count \ No newline at end of file diff --git a/tests/row_count__connector_daily_events.sql b/tests/row_count__connector_daily_events.sql new file mode 100644 index 00000000..5c58011f --- /dev/null +++ b/tests/row_count__connector_daily_events.sql @@ -0,0 +1,69 @@ + +{{ config( + tags="validations", + enabled=var('fivetran_validation_tests_enabled', false) +) }} + +with end_model as ( + select + date_day, + connector_id, + destination_id, + count(*) as row_count + from {{ ref('fivetran_platform__connector_daily_events') }} + group by 1, 2, 3 +), + +staging_model as ( + + select * + from {{ ref('fivetran_platform__connector_status') }} +), + +spine as ( + + {% if execute %} + {% set first_date_query %} + select min( signed_up ) as min_date from {{ var('connector') }} + {% endset %} + {% set first_date = run_query(first_date_query).columns[0][0]|string %} + + {% else %} {% set first_date = "2016-01-01" %} + {% endif %} + + select + cast(date_day as date) as date_day + from ( + {{ fivetran_utils.fivetran_date_spine( + datepart = "day", + start_date = "cast('" ~ first_date[0:10] ~ "' as date)", + end_date = dbt.dateadd("week", 1, dbt.date_trunc('day', dbt.current_timestamp_backcompat() if target.type != 'sqlserver' else dbt.current_timestamp())) + ) + }} + ) as date_spine +), + +staging_cleanup as ( + select + spine.date_day, + staging_model.connector_id, + staging_model.destination_id, + count(*) as row_count + from spine + left join staging_model + on spine.date_day >= cast({{ dbt.date_trunc('day', 'cast(staging_model.set_up_at as date)') }} as date) + group by 1, 2, 3 +) + +select + end_model.date_day, + end_model.connector_id, + end_model.destination_id, + end_model.row_count as end_model_row_count, + staging_cleanup.row_count as staging_model_row_count +from end_model +left join staging_cleanup + on end_model.connector_id = staging_cleanup.connector_id + and end_model.destination_id = staging_cleanup.destination_id + and end_model.date_day = staging_cleanup.date_day +where staging_cleanup.row_count != end_model.row_count \ No newline at end of file diff --git a/tests/row_count__connector_status.sql b/tests/row_count__connector_status.sql new file mode 100644 index 00000000..45e06523 --- /dev/null +++ b/tests/row_count__connector_status.sql @@ -0,0 +1,43 @@ + +{{ config( + tags="validations", + enabled=var('fivetran_validation_tests_enabled', false) +) }} + +with end_model as ( + select + connector_id, + connector_name, + connector_type, + destination_id, + set_up_at, + count(*) as row_count + from {{ ref('fivetran_platform__connector_status') }} + group by 1,2,3,4,5 +), + +staging_model as ( + select + connector_id, + connector_name, + connector_type, + destination_id, + set_up_at, + count(*) as row_count + from {{ ref('stg_fivetran_platform__connector') }} + group by 1,2,3,4,5 +) + +select + end_model.connector_id, + end_model.connector_name, + end_model.connector_type, + end_model.destination_id, + end_model.set_up_at, + end_model.row_count as end_model_row_count, + staging_model.row_count as staging_model_row_count +from end_model +left join staging_model + on end_model.connector_id = staging_model.connector_id + and end_model.destination_id = staging_model.destination_id +where staging_model.row_count != end_model.row_count \ No newline at end of file diff --git a/tests/row_count__mar_table_history.sql b/tests/row_count__mar_table_history.sql new file mode 100644 index 00000000..3c707d03 --- /dev/null +++ b/tests/row_count__mar_table_history.sql @@ -0,0 +1,44 @@ + +{{ config( + tags="validations", + enabled=var('fivetran_validation_tests_enabled', false) +) }} + +with end_model as ( + select + connector_name, + schema_name, + table_name, + destination_id, + measured_month, + total_monthly_active_rows as mar_count + from {{ ref('fivetran_platform__mar_table_history') }} +), + +staging_model as ( + select + connector_name, + schema_name, + table_name, + destination_id, + {{ dbt.date_trunc('month', 'measured_date') }} as measured_month, + sum(incremental_rows) as mar_count + from {{ ref('stg_fivetran_platform__incremental_mar') }} + group by 1, 2, 3, 4, 5 +) + +select + end_model.connector_name, + end_model.schema_name, + end_model.destination_id, + end_model.measured_month, + end_model.mar_count as end_model_mar_count, + staging_model.mar_count as staging_model_mar_count +from end_model +left join staging_model + on end_model.connector_name = staging_model.connector_name + and end_model.schema_name = staging_model.schema_name + and end_model.table_name = staging_model.table_name + and end_model.destination_id = staging_model.destination_id + and end_model.measured_month = staging_model.measured_month +where staging_model.mar_count != end_model.mar_count \ No newline at end of file diff --git a/tests/row_count__schema_changelog.sql b/tests/row_count__schema_changelog.sql new file mode 100644 index 00000000..75fa692c --- /dev/null +++ b/tests/row_count__schema_changelog.sql @@ -0,0 +1,38 @@ + +{{ config( + tags="validations", + enabled=var('fivetran_validation_tests_enabled', false) +) }} + +with end_model as ( + select + connector_id, + table_name, + count(*) as row_count + from {{ ref('fivetran_platform__schema_changelog') }} + group by 1, 2 +), + +staging_model as ( + select + connector_id, + case + when event_subtype = 'alter_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} + when event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['name']) }} + end as table_name, + count(*) as row_count + from {{ ref('stg_fivetran_platform__log') }} + where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') + group by 1, 2 +) + +select + end_model.connector_id, + end_model.table_name, + end_model.row_count as end_model_row_count, + staging_model.row_count as staging_model_row_count +from end_model +left join staging_model + on end_model.connector_id = staging_model.connector_id + and end_model.table_name = staging_model.table_name +where staging_model.row_count != end_model.row_count \ No newline at end of file diff --git a/tests/row_count__usage_mar_destination_history.sql b/tests/row_count__usage_mar_destination_history.sql new file mode 100644 index 00000000..8111af90 --- /dev/null +++ b/tests/row_count__usage_mar_destination_history.sql @@ -0,0 +1,42 @@ + +{{ config( + tags="validations", + enabled=var('fivetran_validation_tests_enabled', false) +) }} + +with end_model as ( + select + measured_month, + destination_id, + count(*) as row_count + from {{ ref('fivetran_platform__usage_mar_destination_history') }} + group by 1, 2 +), + +staging_model as ( + select + cast({{ dbt.date_trunc('month', 'measured_date') }} as date) as measured_month, + destination_id + from {{ ref('stg_fivetran_platform__incremental_mar') }} + group by 1, 2 +), + +staging_cleanup as ( + select + measured_month, + destination_id, + count(*) as row_count + from staging_model + group by 1, 2 +) + +select + end_model.measured_month, + end_model.destination_id, + end_model.row_count as end_model_row_count, + staging_cleanup.row_count as staging_model_row_count +from end_model +left join staging_cleanup + on end_model.destination_id = staging_cleanup.destination_id + and end_model.measured_month = staging_cleanup.measured_month +where staging_cleanup.row_count != end_model.row_count \ No newline at end of file From 60d042d809e48e646e8b51b348906abde29b8721 Mon Sep 17 00:00:00 2001 From: Joe Markiewicz <74217849+fivetran-joemarkiewicz@users.noreply.github.com> Date: Wed, 6 Mar 2024 15:50:20 -0600 Subject: [PATCH 2/5] description removal --- CHANGELOG.md | 3 +++ README.md | 11 ++++++----- tests/row_count__audit_table.sql | 2 -- tests/row_count__audit_user_activity.sql | 1 + 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e96222e7..2a4796d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ - Addition of the `tests/` folder which includes various custom validation tests to ensure the the output of the end models are matching our expectations. - These tests are disabled by default as they are not necessary to run after every execution of the models. If you would like to enabled these tests you may activate them by defining the `fivetran_validation_tests_enabled` variable as `true`. +## Documentation Updates +- Updated the incremental strategy documentation in the README to be more accurate. + # dbt_fivetran_log v1.5.0 [PR #114](https://github.com/fivetran/dbt_fivetran_log/pull/114) includes the following updates: diff --git a/README.md b/README.md index c90c83c7..1347ce96 100644 --- a/README.md +++ b/README.md @@ -53,12 +53,13 @@ dispatch: ``` ### Database Incremental Strategies -Some of the end models in this package are materialized incrementally. We have chosen `insert_overwrite` as the default strategy for **BigQuery** and **Databricks** databases, as it is only available for these dbt adapters. For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy. +Many of the end models in this package are materialized incrementally, so we have configured our models to work with the different strategies available to each supported warehouse. -`insert_overwrite` is our preferred incremental strategy because it will be able to properly handle updates to records that exist outside the immediate incremental window. That is, because it leverages partitions, `insert_overwrite` will appropriately update existing rows that have been changed upstream instead of inserting duplicates of them--all without requiring a full table scan. +For **BigQuery** and **Databricks** destinations, we have chosen `insert_overwrite` as the default strategy, which benefits from the partitioning capability. -`delete+insert` is our second-choice as it resembles `insert_overwrite` but lacks partitions. This strategy works most of the time and appropriately handles incremental loads that do not contain changes to past records. However, if a past record has been updated and is outside of the incremental window, `delete+insert` will insert a duplicate record. 😱 -> Because of this, we highly recommend that **Snowflake**, **Redshift**, and **Postgres** users periodically run a `--full-refresh` to ensure a high level of data quality and remove any possible duplicates. +For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy. + +> Regardless of strategy, we recommend that users periodically run a `--full-refresh` to ensure a high level of data quality. ## Step 2: Installing the Package Include the following Fivetran Platform package version range in your `packages.yml` @@ -66,7 +67,7 @@ Include the following Fivetran Platform package version range in your `packages. ```yaml packages: - package: fivetran/fivetran_log - version: [">=1.5.0", "<1.6.0"] + version: [">=1.6.0", "<1.7.0"] ``` > Note that altough the source connector is now "Fivetran Platform", the package retains the old name of "fivetran_log". diff --git a/tests/row_count__audit_table.sql b/tests/row_count__audit_table.sql index 2e7ecee7..96c9be13 100644 --- a/tests/row_count__audit_table.sql +++ b/tests/row_count__audit_table.sql @@ -4,8 +4,6 @@ enabled=var('fivetran_validation_tests_enabled', false) ) }} --- Description: This test ensures the fivetran_platform__audit_table matches the same row count of the staging model with the appropriate filter. --- This should be run both on a --full-refresh and a normal dbt run in order to ensure the incremental logic is behaving as expected. with end_model as ( select connector_id, diff --git a/tests/row_count__audit_user_activity.sql b/tests/row_count__audit_user_activity.sql index 91eb1c0e..353c4a96 100644 --- a/tests/row_count__audit_user_activity.sql +++ b/tests/row_count__audit_user_activity.sql @@ -1,3 +1,4 @@ + {{ config( tags="validations", enabled=var('fivetran_validation_tests_enabled', false) From 0f2c2ca384a63feae136bd0637fbe6be9908b82c Mon Sep 17 00:00:00 2001 From: Joe Markiewicz <74217849+fivetran-joemarkiewicz@users.noreply.github.com> Date: Wed, 6 Mar 2024 15:51:39 -0600 Subject: [PATCH 3/5] index change to row_match for sql server --- models/fivetran_platform__audit_table.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/models/fivetran_platform__audit_table.sql b/models/fivetran_platform__audit_table.sql index d24e4e55..6adda4f1 100644 --- a/models/fivetran_platform__audit_table.sql +++ b/models/fivetran_platform__audit_table.sql @@ -72,7 +72,7 @@ limit_to_table_starts as ( select *, - row_number() over (partition by connector_id, table_name order by write_to_table_start) as index + row_number() over (partition by connector_id, table_name order by write_to_table_start) as row_match from sync_timestamps where event_subtype = 'write_to_table_start' ), @@ -86,7 +86,7 @@ records_modified_log as ( {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name, {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type, cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count, - row_number() over (partition by connector_id, table_name order by created_at) as index + row_number() over (partition by connector_id, table_name order by created_at) as row_match from sync_log where event_subtype = 'records_modified' @@ -113,7 +113,7 @@ sum_records_modified as ( limit_to_table_starts.connector_id = records_modified_log.connector_id and limit_to_table_starts.table_name = records_modified_log.table_name -- Each `write_to_table_start` log event should be matched with a corresponding `records_modified` event. Therefore, the generated index will be how we match these up and ensure a complete and accurate join. - and limit_to_table_starts.index = records_modified_log.index + and limit_to_table_starts.row_match = records_modified_log.row_match -- explicit group by needed for SQL Server group by From 568b793058f3771403bc1e88e53399be5fd4f86b Mon Sep 17 00:00:00 2001 From: Joe Markiewicz <74217849+fivetran-joemarkiewicz@users.noreply.github.com> Date: Wed, 6 Mar 2024 16:29:45 -0600 Subject: [PATCH 4/5] sql server compatibility for tests --- tests/row_count__audit_table.sql | 4 ++-- tests/row_count__audit_user_activity.sql | 4 ++-- tests/row_count__connector_daily_events.sql | 4 ++-- tests/row_count__connector_status.sql | 4 ++-- tests/row_count__mar_table_history.sql | 2 +- tests/row_count__schema_changelog.sql | 4 ++-- tests/row_count__usage_mar_destination_history.sql | 6 +++--- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/row_count__audit_table.sql b/tests/row_count__audit_table.sql index 96c9be13..a9b23e7d 100644 --- a/tests/row_count__audit_table.sql +++ b/tests/row_count__audit_table.sql @@ -10,7 +10,7 @@ with end_model as ( table_name, count(*) as row_count from {{ ref('fivetran_platform__audit_table') }} - group by 1, 2 + group by connector_id, table_name ), staging_model as ( @@ -20,7 +20,7 @@ staging_model as ( count(*) as row_count from {{ ref('stg_fivetran_platform__log') }} where event_subtype in ('write_to_table_start') - group by 1, 2 + group by connector_id, {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} ) select diff --git a/tests/row_count__audit_user_activity.sql b/tests/row_count__audit_user_activity.sql index 353c4a96..c9889e0b 100644 --- a/tests/row_count__audit_user_activity.sql +++ b/tests/row_count__audit_user_activity.sql @@ -11,7 +11,7 @@ with end_model as ( date_day, count(*) as row_count from {{ ref('fivetran_platform__audit_user_activity') }} - group by 1, 2, 3 + group by connector_id, email, date_day ), staging_model as ( @@ -33,7 +33,7 @@ staging_cleanup as ( from staging_model where email is not null and lower(email) != 'fivetran' - group by 1,2,3 + group by connector_id, email, {{ dbt.date_trunc('day', 'created_at') }} ) select diff --git a/tests/row_count__connector_daily_events.sql b/tests/row_count__connector_daily_events.sql index 5c58011f..df42c0bc 100644 --- a/tests/row_count__connector_daily_events.sql +++ b/tests/row_count__connector_daily_events.sql @@ -11,7 +11,7 @@ with end_model as ( destination_id, count(*) as row_count from {{ ref('fivetran_platform__connector_daily_events') }} - group by 1, 2, 3 + group by date_day, connector_id, destination_id ), staging_model as ( @@ -52,7 +52,7 @@ staging_cleanup as ( from spine left join staging_model on spine.date_day >= cast({{ dbt.date_trunc('day', 'cast(staging_model.set_up_at as date)') }} as date) - group by 1, 2, 3 + group by spine.date_day, staging_model.connector_id, staging_model.destination_id ) select diff --git a/tests/row_count__connector_status.sql b/tests/row_count__connector_status.sql index 45e06523..4188a4a6 100644 --- a/tests/row_count__connector_status.sql +++ b/tests/row_count__connector_status.sql @@ -13,7 +13,7 @@ with end_model as ( set_up_at, count(*) as row_count from {{ ref('fivetran_platform__connector_status') }} - group by 1,2,3,4,5 + group by connector_id, connector_name, connector_type, destination_id, set_up_at ), staging_model as ( @@ -25,7 +25,7 @@ staging_model as ( set_up_at, count(*) as row_count from {{ ref('stg_fivetran_platform__connector') }} - group by 1,2,3,4,5 + group by connector_id, connector_name, connector_type, destination_id, set_up_at ) select diff --git a/tests/row_count__mar_table_history.sql b/tests/row_count__mar_table_history.sql index 3c707d03..16030a83 100644 --- a/tests/row_count__mar_table_history.sql +++ b/tests/row_count__mar_table_history.sql @@ -24,7 +24,7 @@ staging_model as ( {{ dbt.date_trunc('month', 'measured_date') }} as measured_month, sum(incremental_rows) as mar_count from {{ ref('stg_fivetran_platform__incremental_mar') }} - group by 1, 2, 3, 4, 5 + group by connector_name, schema_name, table_name, destination_id, {{ dbt.date_trunc('month', 'measured_date') }} as measured_month ) select diff --git a/tests/row_count__schema_changelog.sql b/tests/row_count__schema_changelog.sql index 75fa692c..69a6f4d2 100644 --- a/tests/row_count__schema_changelog.sql +++ b/tests/row_count__schema_changelog.sql @@ -10,7 +10,7 @@ with end_model as ( table_name, count(*) as row_count from {{ ref('fivetran_platform__schema_changelog') }} - group by 1, 2 + group by connector_id, table_name ), staging_model as ( @@ -23,7 +23,7 @@ staging_model as ( count(*) as row_count from {{ ref('stg_fivetran_platform__log') }} where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') - group by 1, 2 + group by connector_id, table_name ) select diff --git a/tests/row_count__usage_mar_destination_history.sql b/tests/row_count__usage_mar_destination_history.sql index 8111af90..65853b5f 100644 --- a/tests/row_count__usage_mar_destination_history.sql +++ b/tests/row_count__usage_mar_destination_history.sql @@ -10,7 +10,7 @@ with end_model as ( destination_id, count(*) as row_count from {{ ref('fivetran_platform__usage_mar_destination_history') }} - group by 1, 2 + group by measured_month, destination_id ), staging_model as ( @@ -18,7 +18,7 @@ staging_model as ( cast({{ dbt.date_trunc('month', 'measured_date') }} as date) as measured_month, destination_id from {{ ref('stg_fivetran_platform__incremental_mar') }} - group by 1, 2 + group by cast({{ dbt.date_trunc('month', 'measured_date') }} as date) as measured_month, destination_id ), staging_cleanup as ( @@ -27,7 +27,7 @@ staging_cleanup as ( destination_id, count(*) as row_count from staging_model - group by 1, 2 + group by measured_month, destination_id ) select From d57e6842921a46fddbe8b8acab3267353f301c99 Mon Sep 17 00:00:00 2001 From: Joe Markiewicz <74217849+fivetran-joemarkiewicz@users.noreply.github.com> Date: Wed, 6 Mar 2024 16:35:09 -0600 Subject: [PATCH 5/5] sql server additional changes --- tests/row_count__mar_table_history.sql | 2 +- tests/row_count__schema_changelog.sql | 2 +- tests/row_count__usage_mar_destination_history.sql | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/row_count__mar_table_history.sql b/tests/row_count__mar_table_history.sql index 16030a83..d8861cff 100644 --- a/tests/row_count__mar_table_history.sql +++ b/tests/row_count__mar_table_history.sql @@ -24,7 +24,7 @@ staging_model as ( {{ dbt.date_trunc('month', 'measured_date') }} as measured_month, sum(incremental_rows) as mar_count from {{ ref('stg_fivetran_platform__incremental_mar') }} - group by connector_name, schema_name, table_name, destination_id, {{ dbt.date_trunc('month', 'measured_date') }} as measured_month + group by connector_name, schema_name, table_name, destination_id, {{ dbt.date_trunc('month', 'measured_date') }} ) select diff --git a/tests/row_count__schema_changelog.sql b/tests/row_count__schema_changelog.sql index 69a6f4d2..7c4faeeb 100644 --- a/tests/row_count__schema_changelog.sql +++ b/tests/row_count__schema_changelog.sql @@ -23,7 +23,7 @@ staging_model as ( count(*) as row_count from {{ ref('stg_fivetran_platform__log') }} where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') - group by connector_id, table_name + group by connector_id, case when event_subtype = 'alter_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} when event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['name']) }} end ) select diff --git a/tests/row_count__usage_mar_destination_history.sql b/tests/row_count__usage_mar_destination_history.sql index 65853b5f..9700c54c 100644 --- a/tests/row_count__usage_mar_destination_history.sql +++ b/tests/row_count__usage_mar_destination_history.sql @@ -18,7 +18,7 @@ staging_model as ( cast({{ dbt.date_trunc('month', 'measured_date') }} as date) as measured_month, destination_id from {{ ref('stg_fivetran_platform__incremental_mar') }} - group by cast({{ dbt.date_trunc('month', 'measured_date') }} as date) as measured_month, destination_id + group by cast({{ dbt.date_trunc('month', 'measured_date') }} as date), destination_id ), staging_cleanup as (