Skip to content

Commit

Permalink
Use better adapter macros. Full pg support
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed Mar 27, 2020
1 parent abac49b commit 0e69304
Show file tree
Hide file tree
Showing 21 changed files with 161 additions and 160 deletions.
34 changes: 25 additions & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ jobs:
build:
docker:
- image: circleci/python:3.6.2-stretch
- image: circleci/postgres:9.6.5-alpine-ram

steps:
- checkout
Expand All @@ -28,19 +29,20 @@ jobs:
cp integration_tests/ci/sample.profiles.yml ~/.dbt/profiles.yml
- run:
name: "Run Tests - BigQuery"
name: "Run Tests - Postgres"
environment:
GCLOUD_SERVICE_KEY_PATH: "/home/circleci/gcloud-service-key.json"

CI_DBT_USER: root
CI_DBT_PASS: ''
CI_DBT_PORT: 5432
CI_DBT_DBNAME: circle_test
command: |
. venv/bin/activate
echo `pwd`
cd integration_tests
dbt deps --target bigquery
dbt seed --target bigquery --full-refresh
dbt run --target bigquery --full-refresh --vars 'update: false'
dbt run --target bigquery --vars 'update: true'
dbt test --target bigquery
dbt deps --target postgres
dbt seed --target postgres --full-refresh
dbt run --target postgres --full-refresh --vars 'update: false'
dbt run --target postgres --vars 'update: true'
dbt test --target postgres
- run:
name: "Run Tests - Redshift"
Expand All @@ -66,6 +68,20 @@ jobs:
dbt run --target snowflake --vars 'update: true'
dbt test --target snowflake
- run:
name: "Run Tests - BigQuery"
environment:
GCLOUD_SERVICE_KEY_PATH: "/home/circleci/gcloud-service-key.json"

command: |
. venv/bin/activate
echo `pwd`
cd integration_tests
dbt deps --target bigquery
dbt seed --target bigquery --full-refresh
dbt run --target bigquery --full-refresh --vars 'update: false'
dbt run --target bigquery --vars 'update: true'
dbt test --target bigquery
- save_cache:
key: deps1-{{ .Branch }}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ test-bigquery:
dbt run --target bigquery --vars 'update: true'
dbt test --target bigquery

test-all: test-redshift test-snowflake test-bigquery
test-all: test-postgres test-redshift test-snowflake test-bigquery
echo "Completed successfully"
3 changes: 0 additions & 3 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ models:
'snowplow:context:performance_timing': FALSE
'snowplow:context:useragent': FALSE
'snowplow:pass_through_columns': ['test_add_col']

seeds:
sp_event_update:

seeds:
snowplow_integration_tests:
Expand Down
68 changes: 68 additions & 0 deletions macros/adapters/get_start_ts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@

{%- macro get_max_sql(relation, field = 'collector_tstamp') -%}

select

coalesce(
max({{field}}),
'0001-01-01' -- a long, long time ago
) as start_ts

from {{ relation }}

{%- endmacro -%}


{%- macro get_most_recent_record(relation, field = 'collector_tstamp') -%}

{%- set result = run_query(get_max_sql(relation, field)) -%}

{% if execute %}
{% set start_ts = result.columns['start_ts'].values()[0] %}
{% else %}
{% set start_ts = '' %}
{% endif %}

{{ return(start_ts) }}

{%- endmacro -%}


{%- macro get_start_ts(relation, field = 'collector_tstamp') -%}
{{ adapter_macro('get_start_ts', relation, field) }}
{%- endmacro -%}


{%- macro default__get_start_ts(relation, field = 'collector_tstamp') -%}
({{get_max_sql(relation, field)}})
{%- endmacro -%}


{%- macro bigquery__get_start_ts(relation, field = 'collector_tstamp') -%}

{%- set partition_by = config.get('partition_by', none) -%}
{%- set partitions = config.get('partitions', none) -%}

{%- set start_ts -%}
{%- if config.incremental_strategy == 'insert_overwrite' -%}

{%- if partitions -%} least({{partitions|join(',')}})
{%- elif partition_by.data_type == 'date' -%} _dbt_max_partition
{%- else -%} date(_dbt_max_partition)
{%- endif -%}

{%- else -%}

{%- set rendered -%}
{%- if partition_by.data_type == 'date' -%} {{partition_by.field}}
{%- else -%} date({{partition_by.field}}) {%- endif -%}
{%- endset -%}
{%- set record = get_most_recent_record(relation, rendered) -%}
'{{record}}'

{%- endif -%}
{%- endset -%}

{%- do return(start_ts) -%}

{%- endmacro -%}
File renamed without changes.
File renamed without changes.
21 changes: 21 additions & 0 deletions macros/adapters/udf_convert_timezone.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{%- macro create_udf_convert_timezone() -%}
{{ adapter_macro('create_udf_convert_timezone') }}
{%- endmacro -%}

{% macro default__create_udf_convert_timezone() %}
select 1 as fun -- noop
{% endmacro %}

{% macro postgres__create_udf_convert_timezone() %}
create or replace function convert_timezone(
in_tzname text,
out_tzname text,
in_t timestamptz
) returns timestamptz
as $$
declare
begin
return in_t at time zone out_tzname at time zone in_tzname;
end;
$$ language plpgsql;
{% endmacro %}
48 changes: 0 additions & 48 deletions macros/bigquery/get_start_date.sql

This file was deleted.

10 changes: 0 additions & 10 deletions macros/url_query.sql

This file was deleted.

11 changes: 5 additions & 6 deletions models/identification/bigquery/snowplow_id_map.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@
'field': 'max_tstamp',
'data_type': 'timestamp'
},
unique_key="domain_userid",
unique_key='domain_userid',
cluster_by='domain_userid',
enabled=is_adapter('bigquery')
)
}}

with all_events as (

select *
from {{ ref('snowplow_base_events') }}
select * from {{ ref('snowplow_base_events') }}

{% if is_incremental() %}
{% set start_date = get_start_date(this) %}
where DATE(collector_tstamp) >= {{start_date}}
{% if is_incremental() %}
where DATE(collector_tstamp) >= {{get_start_ts(this)}}
{% endif %}

),
Expand Down
7 changes: 2 additions & 5 deletions models/identification/default/snowplow_id_map.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@ with all_events as (

new_events as (

select *
from all_events
select * from all_events

{% if is_incremental() %}
where collector_tstamp > (
select coalesce(max(max_tstamp), '0001-01-01') from {{ this }}
)
where collector_tstamp > {{get_start_ts(this, 'max_tstamp')}}
{% endif %}

),
Expand Down
8 changes: 5 additions & 3 deletions models/page_views/bigquery/snowplow_page_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
'field': 'page_view_start',
'data_type': 'timestamp'
},
unique_key="page_view_id",
unique_key='page_view_id',
cluster_by='page_view_id',
enabled=is_adapter('bigquery')
)
}}
Expand All @@ -28,12 +29,13 @@ with all_events as (
from {{ ref('snowplow_base_events') }}

{% if is_incremental() %}
{% set start_date = get_start_date(this) %}

where DATE(collector_tstamp) >=
date_sub(
{{start_date}},
{{get_start_ts(this)}},
interval {{var('snowplow:page_view_lookback_days')}} day
)

{% endif %}

),
Expand Down
20 changes: 14 additions & 6 deletions models/page_views/default/snowplow_page_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
sort='max_tstamp',
dist='user_snowplow_domain_id',
unique_key='page_view_id',
enabled=is_adapter('default')
enabled=is_adapter('default'),
pre_hook = create_udf_convert_timezone()
)
}}

Expand All @@ -20,11 +21,18 @@
with all_events as (

select * from {{ ref('snowplow_web_events') }}

{% if is_incremental() %}
where collector_tstamp > (
DATEADD('day', -1 * {{var('snowplow:page_view_lookback_days')}}, (select coalesce(max(max_tstamp), '0001-01-01') from {{ this }}))
)

where collector_tstamp >
{{dbt_utils.dateadd(
'day',
-1 * var('snowplow:page_view_lookback_days'),
get_start_ts(this, 'max_tstamp')
)}}

{% endif %}

),

filtered_events as (
Expand Down Expand Up @@ -101,8 +109,8 @@ prep as (
count(*) over (partition by domain_sessionid) as max_session_page_view_index,

-- page view: time
CONVERT_TIMEZONE('UTC', '{{ timezone }}', b.min_tstamp) as page_view_start,
CONVERT_TIMEZONE('UTC', '{{ timezone }}', b.max_tstamp) as page_view_end,
convert_timezone('UTC', '{{ timezone }}', b.min_tstamp) as page_view_start,
convert_timezone('UTC', '{{ timezone }}', b.max_tstamp) as page_view_end,

-- page view: time in the user's local timezone
convert_timezone('UTC', coalesce(a.os_timezone, '{{ timezone }}'), b.min_tstamp) as page_view_start_local,
Expand Down
5 changes: 2 additions & 3 deletions models/page_views/default/snowplow_web_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ with all_events as (
events as (

select * from all_events

{% if is_incremental() %}
where collector_tstamp > (
select coalesce(max(collector_tstamp), '0001-01-01') from {{ this }}
)
where collector_tstamp > {{get_start_ts(this, 'collector_tstamp')}}
{% endif %}

),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ with all_events as (
events as (

select * from all_events

{% if is_incremental() %}
where collector_tstamp > (
select coalesce(max(max_tstamp), '0001-01-01') from {{ this }}
)
where collector_tstamp > {{get_start_ts(this, 'max_tstamp')}}
{% endif %}

),
Expand Down
5 changes: 2 additions & 3 deletions models/page_views/default/snowplow_web_events_time.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ with all_events as (
events as (

select * from all_events

{% if is_incremental() %}
where collector_tstamp > (
select coalesce(max(max_tstamp), '0001-01-01') from {{ this }}
)
where collector_tstamp > {{get_start_ts(this, 'max_tstamp')}}
{% endif %}

),
Expand Down
8 changes: 7 additions & 1 deletion models/page_views/optional/snowplow_web_timing_context.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@

{{ config(materialized='table', sort='page_view_id', dist='page_view_id') }}
{{
config(
materialized='table',
sort='page_view_id',
dist='page_view_id'
)
}}


with performance_timing_context as (
Expand Down
Loading

0 comments on commit 0e69304

Please sign in to comment.