diff --git a/CHANGELOG.md b/CHANGELOG.md
index 227ed385..2a4796d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+# dbt_fivetran_log v1.6.0
+
+## Bug Fixes
+- It was observed that the `fivetran_platform__audit_table` model could introduce duplicates for database-like connectors. To address this issue, the join logic in the end model was adjusted to no longer be determined by date ranges and instead be defined via model-generated row indexes. This ensures true uniqueness in the end model.
+
+## Testing Updates
+- Addition of the `tests/` folder, which includes various custom validation tests to ensure the output of the end models matches our expectations.
+  - These tests are disabled by default, as they are not necessary to run after every execution of the models. If you would like to enable these tests, you may activate them by defining the `fivetran_validation_tests_enabled` variable as `true`.
+
+## Documentation Updates
+- Updated the incremental strategy documentation in the README to be more accurate.
+
 # dbt_fivetran_log v1.5.0

 [PR #114](https://github.com/fivetran/dbt_fivetran_log/pull/114) includes the following updates:
diff --git a/README.md b/README.md
index f24cc78c..1347ce96 100644
--- a/README.md
+++ b/README.md
@@ -53,12 +53,13 @@ dispatch:
 ```
 ### Database Incremental Strategies
-Some of the end models in this package are materialized incrementally. We have chosen `insert_overwrite` as the default strategy for **BigQuery** and **Databricks** databases, as it is only available for these dbt adapters. For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy.
+Many of the end models in this package are materialized incrementally, so we have configured our models to work with the different strategies available to each supported warehouse.

-`insert_overwrite` is our preferred incremental strategy because it will be able to properly handle updates to records that exist outside the immediate incremental window. That is, because it leverages partitions, `insert_overwrite` will appropriately update existing rows that have been changed upstream instead of inserting duplicates of them--all without requiring a full table scan.
+For **BigQuery** and **Databricks** destinations, we have chosen `insert_overwrite` as the default strategy, which benefits from the partitioning capability.

-`delete+insert` is our second-choice as it resembles `insert_overwrite` but lacks partitions. This strategy works most of the time and appropriately handles incremental loads that do not contain changes to past records. However, if a past record has been updated and is outside of the incremental window, `delete+insert` will insert a duplicate record. 😱
-> Because of this, we highly recommend that **Snowflake**, **Redshift**, and **Postgres** users periodically run a `--full-refresh` to ensure a high level of data quality and remove any possible duplicates.
+For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy.
+
+> Regardless of strategy, we recommend that users periodically run a `--full-refresh` to ensure a high level of data quality.

 ## Step 2: Installing the Package
 Include the following Fivetran Platform package version range in your `packages.yml`
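For context on the strategies discussed above: in dbt, a per-warehouse incremental default like this is typically expressed in a model's `config()` block. The sketch below is illustrative only, with hypothetical key and column names (`my_unique_key`, `created_on_day`, `my_upstream_model`), not this package's exact configuration:

```sql
-- A minimal sketch of a warehouse-conditional incremental config.
-- Key and column names here are hypothetical placeholders.
{{
    config(
        materialized='incremental',
        unique_key='my_unique_key',
        incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'databricks') else 'delete+insert',
        partition_by={'field': 'created_on_day', 'data_type': 'date'} if target.type == 'bigquery' else none
    )
}}

select *
from {{ ref('my_upstream_model') }}
{% if is_incremental() %}
-- only scan rows newer than what is already in the target table
where created_on_day >= (select max(created_on_day) from {{ this }})
{% endif %}
```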
@@ -66,7 +67,7 @@ Include the following Fivetran Platform package version range in your `packages.
 ```yaml
 packages:
   - package: fivetran/fivetran_log
-    version: [">=1.5.0", "<1.6.0"]
+    version: [">=1.6.0", "<1.7.0"]
 ```
 > Note that although the source connector is now "Fivetran Platform", the package retains the old name of "fivetran_log".
@@ -124,8 +125,15 @@ dispatch:
   - macro_namespace: dbt_utils
     search_order: ['spark_utils', 'dbt_utils']
 ```
+## (Optional) Step 6: Run Model Validation Tests
+All end models in this package include a suite of singular validation tests located within the `tests/` folder of this project. These validation tests are designed to ensure the end models match the expected outcomes. These singular tests are not necessary to execute after every `dbt run`; therefore, they are disabled by default. If you would like to run them as part of your own validation efforts, define the `fivetran_validation_tests_enabled` variable as `true`.
+
+Further, all of these singular validation tests utilize the `validations` tag. Therefore, if you would like to run these validation tests one-off, you may use the following command to run **only** the validation tests and not the other generic schema tests already included in the base package.
+```zsh
+dbt test --select tag:validations --vars '{fivetran_validation_tests_enabled: true}'
+```

-## (Optional) Step 6: Orchestrate your models with Fivetran Transformations for dbt Core™
+## (Optional) Step 7: Orchestrate your models with Fivetran Transformations for dbt Core™
 Expand for details
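If you prefer to enable the validation tests persistently rather than pass `--vars` on the command line, the variable can also live in your root project's `dbt_project.yml`, mirroring what this PR does for its own integration tests (see the `integration_tests/dbt_project.yml` change below). A minimal sketch:

```yaml
# dbt_project.yml (your root project) -- minimal sketch
vars:
  fivetran_validation_tests_enabled: true
```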
diff --git a/dbt_project.yml b/dbt_project.yml
index 839bac9d..c7cb501b 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -1,6 +1,6 @@
 config-version: 2
 name: 'fivetran_log'
-version: '1.5.0'
+version: '1.6.0'
 require-dbt-version: [">=1.3.0", "<2.0.0"]

 models:
diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml
index ac77104a..b836e543 100644
--- a/integration_tests/dbt_project.yml
+++ b/integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
 name: 'fivetran_log_integration_tests'
-version: '1.5.0'
+version: '1.6.0'
 config-version: 2
 profile: 'integration_tests'
@@ -9,6 +9,7 @@ dispatch:
     search_order: ['spark_utils', 'dbt_utils']

 vars:
+  fivetran_validation_tests_enabled: true
   fivetran_log:
     fivetran_platform_schema: fivetran_platform_integration_tests
     fivetran_platform_account_identifier: "account"
diff --git a/models/fivetran_platform__audit_table.sql b/models/fivetran_platform__audit_table.sql
index 3734a4b1..6adda4f1 100644
--- a/models/fivetran_platform__audit_table.sql
+++ b/models/fivetran_platform__audit_table.sql
@@ -70,7 +70,9 @@ sync_timestamps as (
 -- this will be the base for every record in the final CTE
 limit_to_table_starts as (

-    select *
+    select
+        *,
+        row_number() over (partition by connector_id, table_name order by write_to_table_start) as row_match
     from sync_timestamps
     where event_subtype = 'write_to_table_start'
 ),
@@ -83,7 +85,8 @@ records_modified_log as (
         {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
         {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name,
         {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
-        cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count
+        cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count,
+        row_number() over (partition by connector_id, table_name order by created_at) as row_match
     from sync_log
     where event_subtype = 'records_modified'
@@ -109,10 +112,8 @@ sum_records_modified as (
     left join records_modified_log
         on limit_to_table_starts.connector_id = records_modified_log.connector_id
         and limit_to_table_starts.table_name = records_modified_log.table_name
-
-        -- confine it to one sync
-        and records_modified_log.created_at > limit_to_table_starts.sync_start
-        and records_modified_log.created_at < coalesce(limit_to_table_starts.sync_end, limit_to_table_starts.next_sync_start)
+        -- Each `write_to_table_start` log event should be matched with a corresponding `records_modified` event, so the generated row index is how we pair them up and ensure a complete, accurate join.
+        and limit_to_table_starts.row_match = records_modified_log.row_match

     -- explicit group by needed for SQL Server
     group by
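To see why this change removes the duplicates described in the changelog: under the old date-range join, a `records_modified` event whose timestamp fell inside two overlapping sync windows (as can happen for database-like connectors) matched both `write_to_table_start` rows. Pairing the Nth start with the Nth modification event instead gives each event at most one partner. A minimal, self-contained sketch of the same pairing technique, using a hypothetical `log_events` table:

```sql
-- Hypothetical illustration of index-based event pairing (not the model itself).
with starts as (
    select
        connector_id,
        table_name,
        created_at as sync_start,
        row_number() over (partition by connector_id, table_name order by created_at) as row_match
    from log_events
    where event_subtype = 'write_to_table_start'
),

modified as (
    select
        connector_id,
        table_name,
        row_count,
        row_number() over (partition by connector_id, table_name order by created_at) as row_match
    from log_events
    where event_subtype = 'records_modified'
)

-- each start event now joins to at most one records_modified event,
-- regardless of how sync windows overlap in time
select
    starts.connector_id,
    starts.table_name,
    starts.sync_start,
    modified.row_count
from starts
left join modified
    on starts.connector_id = modified.connector_id
    and starts.table_name = modified.table_name
    and starts.row_match = modified.row_match
```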
diff --git a/tests/row_count__audit_table.sql b/tests/row_count__audit_table.sql
new file mode 100644
index 00000000..a9b23e7d
--- /dev/null
+++ b/tests/row_count__audit_table.sql
@@ -0,0 +1,35 @@
+
+{{ config(
+    tags="validations",
+    enabled=var('fivetran_validation_tests_enabled', false)
+) }}
+
+with end_model as (
+    select
+        connector_id,
+        table_name,
+        count(*) as row_count
+    from {{ ref('fivetran_platform__audit_table') }}
+    group by connector_id, table_name
+),
+
+staging_model as (
+    select
+        connector_id,
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
+        count(*) as row_count
+    from {{ ref('stg_fivetran_platform__log') }}
+    where event_subtype in ('write_to_table_start')
+    group by connector_id, {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }}
+)
+
+select
+    end_model.connector_id,
+    end_model.table_name,
+    end_model.row_count as end_model_row_count,
+    staging_model.row_count as staging_model_row_count
+from end_model
+left join staging_model
+    on end_model.connector_id = staging_model.connector_id
+    and end_model.table_name = staging_model.table_name
+where staging_model.row_count != end_model.row_count
\ No newline at end of file
diff --git a/tests/row_count__audit_user_activity.sql b/tests/row_count__audit_user_activity.sql
new file mode 100644
index 00000000..c9889e0b
--- /dev/null
+++ b/tests/row_count__audit_user_activity.sql
@@ -0,0 +1,50 @@
+
+{{ config(
+    tags="validations",
+    enabled=var('fivetran_validation_tests_enabled', false)
+) }}
+
+with end_model as (
+    select
+        connector_id,
+        email,
+        date_day,
+        count(*) as row_count
+    from {{ ref('fivetran_platform__audit_user_activity') }}
+    group by connector_id, email, date_day
+),
+
+staging_model as (
+
+    select
+        *,
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['actor']) }} as email
+    from {{ ref('stg_fivetran_platform__log') }}
+    where lower(message_data) like '%actor%'
+),
+
+staging_cleanup as (
+
+    select
+        connector_id,
+        email,
+        {{ dbt.date_trunc('day', 'created_at') }} as date_day,
+        count(*) as row_count
+    from staging_model
+    where email is not null
+        and lower(email) != 'fivetran'
+    group by connector_id, email, {{ dbt.date_trunc('day', 'created_at') }}
+)
+
+select
+    end_model.connector_id,
+    end_model.email,
+    end_model.date_day,
+    end_model.row_count as end_model_row_count,
+    staging_cleanup.row_count as staging_model_row_count
+from end_model
+left join staging_cleanup
+    on end_model.connector_id = staging_cleanup.connector_id
+    and end_model.email = staging_cleanup.email
+    and end_model.date_day = staging_cleanup.date_day
+where staging_cleanup.row_count != end_model.row_count
\ No newline at end of file
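Both files above follow dbt's singular test convention: the query selects only failing rows, so the test passes when it returns zero rows. The shared shape of these row-count comparisons, as a minimal sketch with placeholder names (`some_grain`, `some_end_model`, `some_staging_model`):

```sql
-- Placeholder sketch of the shared row-count comparison pattern.
with end_model as (
    select some_grain, count(*) as row_count
    from {{ ref('some_end_model') }}
    group by some_grain
),

staging_model as (
    select some_grain, count(*) as row_count
    from {{ ref('some_staging_model') }}
    group by some_grain
)

-- any row returned here is a grain whose counts disagree, i.e. a test failure
select end_model.some_grain
from end_model
left join staging_model
    on end_model.some_grain = staging_model.some_grain
where staging_model.row_count != end_model.row_count
```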
diff --git a/tests/row_count__connector_daily_events.sql b/tests/row_count__connector_daily_events.sql
new file mode 100644
index 00000000..df42c0bc
--- /dev/null
+++ b/tests/row_count__connector_daily_events.sql
@@ -0,0 +1,69 @@
+
+{{ config(
+    tags="validations",
+    enabled=var('fivetran_validation_tests_enabled', false)
+) }}
+
+with end_model as (
+    select
+        date_day,
+        connector_id,
+        destination_id,
+        count(*) as row_count
+    from {{ ref('fivetran_platform__connector_daily_events') }}
+    group by date_day, connector_id, destination_id
+),
+
+staging_model as (
+
+    select *
+    from {{ ref('fivetran_platform__connector_status') }}
+),
+
+spine as (
+
+    {% if execute %}
+    {% set first_date_query %}
+        select min( signed_up ) as min_date from {{ var('connector') }}
+    {% endset %}
+    {% set first_date = run_query(first_date_query).columns[0][0]|string %}
+
+    {% else %} {% set first_date = "2016-01-01" %}
+    {% endif %}
+
+    select
+        cast(date_day as date) as date_day
+    from (
+        {{ fivetran_utils.fivetran_date_spine(
+            datepart = "day",
+            start_date = "cast('" ~ first_date[0:10] ~ "' as date)",
+            end_date = dbt.dateadd("week", 1, dbt.date_trunc('day', dbt.current_timestamp_backcompat() if target.type != 'sqlserver' else dbt.current_timestamp()))
+        )
+        }}
+    ) as date_spine
+),
+
+staging_cleanup as (
+    select
+        spine.date_day,
+        staging_model.connector_id,
+        staging_model.destination_id,
+        count(*) as row_count
+    from spine
+    left join staging_model
+        on spine.date_day >= cast({{ dbt.date_trunc('day', 'cast(staging_model.set_up_at as date)') }} as date)
+    group by spine.date_day, staging_model.connector_id, staging_model.destination_id
+)
+
+select
+    end_model.date_day,
+    end_model.connector_id,
+    end_model.destination_id,
+    end_model.row_count as end_model_row_count,
+    staging_cleanup.row_count as staging_model_row_count
+from end_model
+left join staging_cleanup
+    on end_model.connector_id = staging_cleanup.connector_id
+    and end_model.destination_id = staging_cleanup.destination_id
+    and end_model.date_day = staging_cleanup.date_day
+where staging_cleanup.row_count != end_model.row_count
\ No newline at end of file
diff --git a/tests/row_count__connector_status.sql b/tests/row_count__connector_status.sql
new file mode 100644
index 00000000..4188a4a6
--- /dev/null
+++ b/tests/row_count__connector_status.sql
@@ -0,0 +1,43 @@
+
+{{ config(
+    tags="validations",
+    enabled=var('fivetran_validation_tests_enabled', false)
+) }}
+
+with end_model as (
+    select
+        connector_id,
+        connector_name,
+        connector_type,
+        destination_id,
+        set_up_at,
+        count(*) as row_count
+    from {{ ref('fivetran_platform__connector_status') }}
+    group by connector_id, connector_name, connector_type, destination_id, set_up_at
+),
+
+staging_model as (
+    select
+        connector_id,
+        connector_name,
+        connector_type,
+        destination_id,
+        set_up_at,
+        count(*) as row_count
+    from {{ ref('stg_fivetran_platform__connector') }}
+    group by connector_id, connector_name, connector_type, destination_id, set_up_at
+)
+
+select
+    end_model.connector_id,
+    end_model.connector_name,
+    end_model.connector_type,
+    end_model.destination_id,
+    end_model.set_up_at,
+    end_model.row_count as end_model_row_count,
+    staging_model.row_count as staging_model_row_count
+from end_model
+left join staging_model
+    on end_model.connector_id = staging_model.connector_id
+    and end_model.destination_id = staging_model.destination_id
+where staging_model.row_count != end_model.row_count
\ No newline at end of file
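The date spine in `row_count__connector_daily_events.sql` uses a dbt Jinja idiom worth calling out: `run_query` executes against the warehouse while the project compiles, so it is wrapped in `{% if execute %}` with a static fallback date for parse time, when no connection is available. A minimal sketch of the idiom, with a hypothetical `some_source_table`:

```sql
-- Hypothetical illustration of the execute-guarded run_query pattern.
{% if execute %}
    {# At execution time, ask the warehouse for the real minimum date. #}
    {% set result = run_query("select min(created_at) from some_source_table") %}
    {% set first_date = result.columns[0][0] | string %}
{% else %}
    {# At parse time there is no connection, so fall back to a constant. #}
    {% set first_date = "2016-01-01" %}
{% endif %}

select *
from some_source_table
where created_at >= cast('{{ first_date[0:10] }}' as date)
```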
diff --git a/tests/row_count__mar_table_history.sql b/tests/row_count__mar_table_history.sql
new file mode 100644
index 00000000..d8861cff
--- /dev/null
+++ b/tests/row_count__mar_table_history.sql
@@ -0,0 +1,44 @@
+
+{{ config(
+    tags="validations",
+    enabled=var('fivetran_validation_tests_enabled', false)
+) }}
+
+with end_model as (
+    select
+        connector_name,
+        schema_name,
+        table_name,
+        destination_id,
+        measured_month,
+        total_monthly_active_rows as mar_count
+    from {{ ref('fivetran_platform__mar_table_history') }}
+),
+
+staging_model as (
+    select
+        connector_name,
+        schema_name,
+        table_name,
+        destination_id,
+        {{ dbt.date_trunc('month', 'measured_date') }} as measured_month,
+        sum(incremental_rows) as mar_count
+    from {{ ref('stg_fivetran_platform__incremental_mar') }}
+    group by connector_name, schema_name, table_name, destination_id, {{ dbt.date_trunc('month', 'measured_date') }}
+)
+
+select
+    end_model.connector_name,
+    end_model.schema_name,
+    end_model.destination_id,
+    end_model.measured_month,
+    end_model.mar_count as end_model_mar_count,
+    staging_model.mar_count as staging_model_mar_count
+from end_model
+left join staging_model
+    on end_model.connector_name = staging_model.connector_name
+    and end_model.schema_name = staging_model.schema_name
+    and end_model.table_name = staging_model.table_name
+    and end_model.destination_id = staging_model.destination_id
+    and end_model.measured_month = staging_model.measured_month
+where staging_model.mar_count != end_model.mar_count
\ No newline at end of file
diff --git a/tests/row_count__schema_changelog.sql b/tests/row_count__schema_changelog.sql
new file mode 100644
index 00000000..7c4faeeb
--- /dev/null
+++ b/tests/row_count__schema_changelog.sql
@@ -0,0 +1,38 @@
+
+{{ config(
+    tags="validations",
+    enabled=var('fivetran_validation_tests_enabled', false)
+) }}
+
+with end_model as (
+    select
+        connector_id,
+        table_name,
+        count(*) as row_count
+    from {{ ref('fivetran_platform__schema_changelog') }}
+    group by connector_id, table_name
+),
+
+staging_model as (
+    select
+        connector_id,
+        case
+            when event_subtype = 'alter_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }}
+            when event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['name']) }}
+        end as table_name,
+        count(*) as row_count
+    from {{ ref('stg_fivetran_platform__log') }}
+    where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config')
+    group by connector_id, case when event_subtype = 'alter_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} when event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['name']) }} end
+)
+
+select
+    end_model.connector_id,
+    end_model.table_name,
+    end_model.row_count as end_model_row_count,
+    staging_model.row_count as staging_model_row_count
+from end_model
+left join staging_model
+    on end_model.connector_id = staging_model.connector_id
+    and end_model.table_name = staging_model.table_name
+where staging_model.row_count != end_model.row_count
\ No newline at end of file
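Because each of these validations is a named singular test node, any one of them can also be run in isolation with dbt's node selection, still passing the enabling variable. For example:

```zsh
dbt test --select row_count__schema_changelog --vars '{fivetran_validation_tests_enabled: true}'
```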
diff --git a/tests/row_count__usage_mar_destination_history.sql b/tests/row_count__usage_mar_destination_history.sql
new file mode 100644
index 00000000..9700c54c
--- /dev/null
+++ b/tests/row_count__usage_mar_destination_history.sql
@@ -0,0 +1,42 @@
+
+{{ config(
+    tags="validations",
+    enabled=var('fivetran_validation_tests_enabled', false)
+) }}
+
+with end_model as (
+    select
+        measured_month,
+        destination_id,
+        count(*) as row_count
+    from {{ ref('fivetran_platform__usage_mar_destination_history') }}
+    group by measured_month, destination_id
+),
+
+staging_model as (
+    select
+        cast({{ dbt.date_trunc('month', 'measured_date') }} as date) as measured_month,
+        destination_id
+    from {{ ref('stg_fivetran_platform__incremental_mar') }}
+    group by cast({{ dbt.date_trunc('month', 'measured_date') }} as date), destination_id
+),
+
+staging_cleanup as (
+    select
+        measured_month,
+        destination_id,
+        count(*) as row_count
+    from staging_model
+    group by measured_month, destination_id
+)
+
+select
+    end_model.measured_month,
+    end_model.destination_id,
+    end_model.row_count as end_model_row_count,
+    staging_cleanup.row_count as staging_model_row_count
+from end_model
+left join staging_cleanup
+    on end_model.destination_id = staging_cleanup.destination_id
+    and end_model.measured_month = staging_cleanup.measured_month
+where staging_cleanup.row_count != end_model.row_count
\ No newline at end of file
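Putting the README's recommendations together, a periodic maintenance pass over this package might look like the following. These are illustrative commands only; adjust selectors to your project:

```zsh
# Rebuild the incremental models from scratch to clear any drift,
# then run only the tagged validation suite against the fresh tables.
dbt run --full-refresh
dbt test --select tag:validations --vars '{fivetran_validation_tests_enabled: true}'
```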