feature/singular-tests-addition #116

Closed
wants to merge 5 commits into from
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,15 @@
# dbt_fivetran_log v1.6.0

## Bug Fixes
- It was observed that the `fivetran_platform__audit_table` model could introduce duplicates for database-like connectors. To address this, the join logic in the end model was adjusted to no longer be determined by date ranges and instead be defined via model-generated indexes. This ensures true uniqueness in the end model.

## Testing Updates
- Added a `tests/` folder, which includes various custom validation tests to ensure the output of the end models matches our expectations.
- These tests are disabled by default, as they do not need to run after every execution of the models. If you would like to enable them, set the `fivetran_validation_tests_enabled` variable to `true`.
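If you do enable them, the variable lives under `vars` in your `dbt_project.yml`, mirroring how this package's integration tests set it:

```yaml
# dbt_project.yml
vars:
  fivetran_validation_tests_enabled: true  # defaults to false
```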

## Documentation Updates
- Updated the incremental strategy documentation in the README to be more accurate.

# dbt_fivetran_log v1.5.0
[PR #114](https://github.com/fivetran/dbt_fivetran_log/pull/114) includes the following updates:

20 changes: 14 additions & 6 deletions README.md
@@ -53,20 +53,21 @@ dispatch:
```

### Database Incremental Strategies
Some of the end models in this package are materialized incrementally. We have chosen `insert_overwrite` as the default strategy for **BigQuery** and **Databricks** databases, as it is only available for these dbt adapters. For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy.
Many of the end models in this package are materialized incrementally, so we have configured our models to work with the different strategies available to each supported warehouse.

`insert_overwrite` is our preferred incremental strategy because it will be able to properly handle updates to records that exist outside the immediate incremental window. That is, because it leverages partitions, `insert_overwrite` will appropriately update existing rows that have been changed upstream instead of inserting duplicates of them--all without requiring a full table scan.
For **BigQuery** and **Databricks** destinations, we have chosen `insert_overwrite` as the default strategy, which benefits from the partitioning capability.

`delete+insert` is our second-choice as it resembles `insert_overwrite` but lacks partitions. This strategy works most of the time and appropriately handles incremental loads that do not contain changes to past records. However, if a past record has been updated and is outside of the incremental window, `delete+insert` will insert a duplicate record. 😱
> Because of this, we highly recommend that **Snowflake**, **Redshift**, and **Postgres** users periodically run a `--full-refresh` to ensure a high level of data quality and remove any possible duplicates.
For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy.

> Regardless of strategy, we recommend that users periodically run a `--full-refresh` to ensure a high level of data quality.
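As an illustrative sketch of how such adapter-aware defaults can be expressed in a dbt model config (this is not the package's exact source; field and key names here are hypothetical, and the real models may configure partitioning and unique keys differently):

```sql
{{
    config(
        materialized='incremental',
        unique_key='unique_table_sync_key',
        incremental_strategy='insert_overwrite'
            if target.type in ('bigquery', 'databricks')
            else 'delete+insert',
        partition_by={'field': 'sync_start_day', 'data_type': 'date'}
            if target.type == 'bigquery' else none
    )
}}
```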

## Step 2: Installing the Package
Include the following Fivetran Platform package version range in your `packages.yml`
> Check [dbt Hub](https://hub.getdbt.com/) for the latest installation instructions, or [read the dbt docs](https://docs.getdbt.com/docs/package-management) for more information on installing packages.
```yaml
packages:
- package: fivetran/fivetran_log
version: [">=1.5.0", "<1.6.0"]
version: [">=1.6.0", "<1.7.0"]
```

> Note that although the source connector is now "Fivetran Platform", the package retains the old name of "fivetran_log".
@@ -124,8 +125,15 @@ dispatch:
- macro_namespace: dbt_utils
search_order: ['spark_utils', 'dbt_utils']
```
## (Optional) Step 6: Run Model Validation Tests
All end models in this package include a suite of singular validation tests located within the `tests/` folder of this project. These validation tests are designed to ensure the end models match their expected outcomes. Because they do not need to be executed after every `dbt run`, they are disabled by default. If you would like to run them as part of your own validation efforts, set the `fivetran_validation_tests_enabled` variable to `true`.

Additionally, all of these singular validation tests use the `validations` tag. To run **only** these validation tests (and not the generic schema tests already included in the base package), run the following command:
```zsh
dbt test --select tag:validations --vars '{fivetran_validation_tests_enabled: true}'
```
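For context, a dbt singular test is simply a SQL file whose query selects *failing* rows: the test passes when the query returns nothing. The row-count comparison pattern these tests use can be sketched in Python as follows (function and key names here are illustrative, not part of the package):

```python
# A dbt singular test fails when its query returns one or more rows.
# This hypothetical helper mirrors the row-count comparison tests above:
# it returns the keys whose end-model count disagrees with the staging count.
def rows_failing_row_count_check(end_model_counts, staging_counts):
    """Return (connector_id, table_name) keys with mismatched row counts."""
    return [
        key for key, n in end_model_counts.items()
        if key in staging_counts and staging_counts[key] != n
    ]

end_model = {("c1", "orders"): 3, ("c1", "users"): 2}
staging = {("c1", "orders"): 3, ("c1", "users"): 5}
failures = rows_failing_row_count_check(end_model, staging)
# An empty `failures` list corresponds to a passing singular test.
```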

## (Optional) Step 6: Orchestrate your models with Fivetran Transformations for dbt Core™
## (Optional) Step 7: Orchestrate your models with Fivetran Transformations for dbt Core™
<details><summary>Expand for details</summary>
<br>

2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,6 +1,6 @@
config-version: 2
name: 'fivetran_log'
version: '1.5.0'
version: '1.6.0'
require-dbt-version: [">=1.3.0", "<2.0.0"]

models:
3 changes: 2 additions & 1 deletion integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
version: '1.5.0'
version: '1.6.0'

config-version: 2
profile: 'integration_tests'
@@ -9,6 +9,7 @@ dispatch:
search_order: ['spark_utils', 'dbt_utils']

vars:
fivetran_validation_tests_enabled: true
fivetran_log:
fivetran_platform_schema: fivetran_platform_integration_tests
fivetran_platform_account_identifier: "account"
13 changes: 7 additions & 6 deletions models/fivetran_platform__audit_table.sql
@@ -70,7 +70,9 @@ sync_timestamps as (
-- this will be the base for every record in the final CTE
limit_to_table_starts as (

select *
select
*,
row_number() over (partition by connector_id, table_name order by write_to_table_start) as row_match
from sync_timestamps
where event_subtype = 'write_to_table_start'
),
@@ -83,7 +85,8 @@ records_modified_log as (
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count
cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count,
row_number() over (partition by connector_id, table_name order by created_at) as row_match
from sync_log
where event_subtype = 'records_modified'

@@ -109,10 +112,8 @@ sum_records_modified as (
left join records_modified_log on
limit_to_table_starts.connector_id = records_modified_log.connector_id
and limit_to_table_starts.table_name = records_modified_log.table_name

-- confine it to one sync
and records_modified_log.created_at > limit_to_table_starts.sync_start
and records_modified_log.created_at < coalesce(limit_to_table_starts.sync_end, limit_to_table_starts.next_sync_start)
-- Each `write_to_table_start` log event should be matched with a corresponding `records_modified` event. Therefore, the generated index will be how we match these up and ensure a complete and accurate join.
and limit_to_table_starts.row_match = records_modified_log.row_match

-- explicit group by needed for SQL Server
group by
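The index-based matching in the join above can also be sketched outside SQL. Here is a hypothetical Python rendering of the same idea (field names are illustrative): each event stream is numbered per `(connector_id, table_name)` in timestamp order, and rows sharing the same generated index are paired, just as `row_number()` pairs each `write_to_table_start` event with its corresponding `records_modified` event.

```python
from itertools import groupby
from operator import itemgetter

def number_events(events):
    """Assign a 1-based row_match per (connector_id, table_name), ordered by ts."""
    keyfn = itemgetter("connector_id", "table_name")
    numbered = []
    for _, group in groupby(sorted(events, key=lambda e: (keyfn(e), e["ts"])), key=keyfn):
        for i, event in enumerate(group, start=1):
            numbered.append({**event, "row_match": i})
    return numbered

def match(starts, modified):
    """Pair events that share connector, table, and generated index."""
    starts = number_events(starts)
    modified = number_events(modified)
    index = {(m["connector_id"], m["table_name"], m["row_match"]): m for m in modified}
    return [
        (s, index.get((s["connector_id"], s["table_name"], s["row_match"])))
        for s in starts
    ]

starts = [
    {"connector_id": "c1", "table_name": "orders", "ts": 1},
    {"connector_id": "c1", "table_name": "orders", "ts": 10},
]
modified = [
    {"connector_id": "c1", "table_name": "orders", "ts": 2, "row_count": 5},
    {"connector_id": "c1", "table_name": "orders", "ts": 11, "row_count": 7},
]
pairs = match(starts, modified)
```

Unlike the previous date-range join, this pairing cannot produce duplicates: each start event matches at most one modified event.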
35 changes: 35 additions & 0 deletions tests/row_count__audit_table.sql
@@ -0,0 +1,35 @@

{{ config(
tags="validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with end_model as (
select
connector_id,
table_name,
count(*) as row_count
from {{ ref('fivetran_platform__audit_table') }}
group by connector_id, table_name
),

staging_model as (
select
connector_id,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
count(*) as row_count
from {{ ref('stg_fivetran_platform__log') }}
where event_subtype in ('write_to_table_start')
group by connector_id, {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }}
)

select
end_model.connector_id,
end_model.table_name,
end_model.row_count as end_model_row_count,
staging_model.row_count as staging_model_row_count
from end_model
left join staging_model
on end_model.connector_id = staging_model.connector_id
and end_model.table_name = staging_model.table_name
where staging_model.row_count != end_model.row_count
50 changes: 50 additions & 0 deletions tests/row_count__audit_user_activity.sql
@@ -0,0 +1,50 @@

{{ config(
tags="validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with end_model as (
select
connector_id,
email,
date_day,
count(*) as row_count
from {{ ref('fivetran_platform__audit_user_activity') }}
group by connector_id, email, date_day
),

staging_model as (

select
*,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['actor']) }} as email
from {{ ref('stg_fivetran_platform__log') }}
where lower(message_data) like '%actor%'
),

staging_cleanup as (

select
connector_id,
email,
{{ dbt.date_trunc('day', 'created_at') }} as date_day,
count(*) as row_count
from staging_model
where email is not null
and lower(email) != 'fivetran'
group by connector_id, email, {{ dbt.date_trunc('day', 'created_at') }}
)

select
end_model.connector_id,
end_model.email,
end_model.date_day,
end_model.row_count as end_model_row_count,
staging_cleanup.row_count as staging_model_row_count
from end_model
left join staging_cleanup
on end_model.connector_id = staging_cleanup.connector_id
and end_model.email = staging_cleanup.email
and end_model.date_day = staging_cleanup.date_day
where staging_cleanup.row_count != end_model.row_count
69 changes: 69 additions & 0 deletions tests/row_count__connector_daily_events.sql
@@ -0,0 +1,69 @@

{{ config(
tags="validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with end_model as (
select
date_day,
connector_id,
destination_id,
count(*) as row_count
from {{ ref('fivetran_platform__connector_daily_events') }}
group by date_day, connector_id, destination_id
),

staging_model as (

select *
from {{ ref('fivetran_platform__connector_status') }}
),

spine as (

{% if execute %}
{% set first_date_query %}
select min( signed_up ) as min_date from {{ var('connector') }}
{% endset %}
{% set first_date = run_query(first_date_query).columns[0][0]|string %}

{% else %} {% set first_date = "2016-01-01" %}
{% endif %}

select
cast(date_day as date) as date_day
from (
{{ fivetran_utils.fivetran_date_spine(
datepart = "day",
start_date = "cast('" ~ first_date[0:10] ~ "' as date)",
end_date = dbt.dateadd("week", 1, dbt.date_trunc('day', dbt.current_timestamp_backcompat() if target.type != 'sqlserver' else dbt.current_timestamp()))
)
}}
) as date_spine
),

staging_cleanup as (
select
spine.date_day,
staging_model.connector_id,
staging_model.destination_id,
count(*) as row_count
from spine
left join staging_model
on spine.date_day >= cast({{ dbt.date_trunc('day', 'cast(staging_model.set_up_at as date)') }} as date)
group by spine.date_day, staging_model.connector_id, staging_model.destination_id
)

select
end_model.date_day,
end_model.connector_id,
end_model.destination_id,
end_model.row_count as end_model_row_count,
staging_cleanup.row_count as staging_model_row_count
from end_model
left join staging_cleanup
on end_model.connector_id = staging_cleanup.connector_id
and end_model.destination_id = staging_cleanup.destination_id
and end_model.date_day = staging_cleanup.date_day
where staging_cleanup.row_count != end_model.row_count
43 changes: 43 additions & 0 deletions tests/row_count__connector_status.sql
@@ -0,0 +1,43 @@

{{ config(
tags="validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with end_model as (
select
connector_id,
connector_name,
connector_type,
destination_id,
set_up_at,
count(*) as row_count
from {{ ref('fivetran_platform__connector_status') }}
group by connector_id, connector_name, connector_type, destination_id, set_up_at
),

staging_model as (
select
connector_id,
connector_name,
connector_type,
destination_id,
set_up_at,
count(*) as row_count
from {{ ref('stg_fivetran_platform__connector') }}
group by connector_id, connector_name, connector_type, destination_id, set_up_at
)

select
end_model.connector_id,
end_model.connector_name,
end_model.connector_type,
end_model.destination_id,
end_model.set_up_at,
end_model.row_count as end_model_row_count,
staging_model.row_count as staging_model_row_count
from end_model
left join staging_model
on end_model.connector_id = staging_model.connector_id
and end_model.destination_id = staging_model.destination_id
where staging_model.row_count != end_model.row_count
44 changes: 44 additions & 0 deletions tests/row_count__mar_table_history.sql
@@ -0,0 +1,44 @@

{{ config(
tags="validations",
enabled=var('fivetran_validation_tests_enabled', false)
) }}

with end_model as (
select
connector_name,
schema_name,
table_name,
destination_id,
measured_month,
total_monthly_active_rows as mar_count
from {{ ref('fivetran_platform__mar_table_history') }}
),

staging_model as (
select
connector_name,
schema_name,
table_name,
destination_id,
{{ dbt.date_trunc('month', 'measured_date') }} as measured_month,
sum(incremental_rows) as mar_count
from {{ ref('stg_fivetran_platform__incremental_mar') }}
group by connector_name, schema_name, table_name, destination_id, {{ dbt.date_trunc('month', 'measured_date') }}
)

select
end_model.connector_name,
end_model.schema_name,
end_model.destination_id,
end_model.measured_month,
end_model.mar_count as end_model_mar_count,
staging_model.mar_count as staging_model_mar_count
from end_model
left join staging_model
on end_model.connector_name = staging_model.connector_name
and end_model.schema_name = staging_model.schema_name
and end_model.table_name = staging_model.table_name
and end_model.destination_id = staging_model.destination_id
and end_model.measured_month = staging_model.measured_month
where staging_model.mar_count != end_model.mar_count