Skip to content

Commit

Permalink
Rebase + rework
Browse files Browse the repository at this point in the history
  • Loading branch information
jtcohen6 committed Mar 26, 2020
1 parent 775c576 commit abac49b
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 57 deletions.
2 changes: 2 additions & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ analysis-paths: ["analysis"]
data-paths: ["data"]
macro-paths: ["macros"]

require-dbt-version: ">=0.16.0"

models:
snowplow:
base:
Expand Down
14 changes: 14 additions & 0 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,17 @@ models:
'snowplow:context:useragent': FALSE
'snowplow:pass_through_columns': ['test_add_col']

seeds:
sp_event_update:

seeds:
snowplow_integration_tests:
snowplow:
sp_event_update:
column_types:
collector_tstamp: timestamp
derived_tstamp: timestamp
sp_event:
column_types:
collector_tstamp: timestamp
derived_tstamp: timestamp
48 changes: 48 additions & 0 deletions macros/bigquery/get_start_date.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@


{%- macro get_most_recent_record(relation) -%}

    {#--
        Look up the greatest value of the model's partition expression that is
        already stored in `relation`, by running a `select max(...)` query.

        Args:
            relation: relation to query. Only its `name` and `schema` are used;
                a fresh relation is rebuilt from those two attributes.

        Returns:
            The `start_date` value of the first result row, or '' when
            `execute` is false (no query is run in that case).
    #}

    {#-- do not run the query in parsing mode #}
    {%- if not execute -%}
        {{ return('') }}
    {%- endif -%}

    {#-- rebuild the relation from name + schema only (fix for tmp suffix) #}
    {%- set relation = api.Relation.create(identifier=relation.name, schema=relation.schema) -%}

    {#-- NOTE(review): assumes `config.partition_by` resolves to an object that
         exposes `render()` in this context; confirm against the dbt version in use #}
    {%- set partition_col = config.partition_by.render() -%}

    {%- call statement('_', fetch_result=True) -%}

        {#-- interpolate the rendered partition expression; the bare word
             `partition_col` would be read by the warehouse as a column name #}
        select max({{ partition_col }}) as start_date from {{ relation }}

    {%- endcall -%}

    {#-- NOTE(review): presumably `start_date` is null when the relation has no
         rows; callers that quote the result should confirm how that renders #}
    {%- set data = load_result('_')['table'].rows -%}
    {{ return(data[0]['start_date']) }}

{%- endmacro -%}



{%- macro get_start_date(relation=none) -%}

    {#--
        Build the SQL expression used as the lower bound when filtering events
        for an incremental run.

        Args:
            relation: relation to inspect when the strategy is not
                'insert_overwrite'. Optional so that zero-argument calls keep
                working; callers in this package pass `this`.

        Returns:
            A SQL fragment:
              * 'insert_overwrite' with `config.partitions` set:
                    least(<partitions joined by ','>)
              * 'insert_overwrite' with a 'date' partition data type:
                    _dbt_max_partition
              * 'insert_overwrite' otherwise:
                    date(_dbt_max_partition)
              * any other strategy: the value returned by
                get_most_recent_record(relation), wrapped in single quotes
    #}

    {#-- NOTE(review): falls back to `this` when no relation is passed; assumes
         `this` is in scope wherever the macro is called - confirm #}
    {%- set relation = relation if relation is not none else this -%}

    {%- set start_date -%}

        {%- if config.incremental_strategy == 'insert_overwrite' -%}

            {%- if config.partitions -%} least({{ config.partitions|join(',') }})
            {%- elif config.partition_by.data_type == 'date' -%} _dbt_max_partition
            {%- else -%} date(_dbt_max_partition)
            {%- endif -%}

        {%- else -%}

            '{{ get_most_recent_record(relation) }}'

        {%- endif -%}

    {%- endset -%}

    {%- do return(start_date) -%}

{%- endmacro -%}
26 changes: 0 additions & 26 deletions macros/most_recent_record.sql

This file was deleted.

11 changes: 0 additions & 11 deletions macros/similar_to.sql

This file was deleted.

16 changes: 10 additions & 6 deletions models/identification/bigquery/snowplow_id_map.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
{{
config(
materialized='incremental',
partition_by='DATE(max_tstamp)',
partition_by={
'field': 'max_tstamp',
'data_type': 'timestamp'
},
unique_key="domain_userid",
enabled=is_adapter('bigquery')
)
}}

{% set start_date = get_most_recent_record(this, "max_tstamp", "2001-01-01") %}

with all_events as (

select *
from {{ ref('snowplow_base_events') }}
where DATE(collector_tstamp) >= date_sub('{{ start_date }}', interval 1 day)

{% if is_incremental() %}
{% set start_date = get_start_date(this) %}
where DATE(collector_tstamp) >= {{start_date}}
{% endif %}

),

Expand All @@ -27,7 +32,6 @@ new_sessions as (
domain_sessionid

from all_events
where DATE(collector_tstamp) >= '{{ start_date }}'

),

Expand Down Expand Up @@ -59,7 +63,7 @@ prep as (
rows between unbounded preceding and unbounded following
) as user_id,

max(timestamp(collector_tstamp)) over (
max(collector_tstamp) over (
partition by domain_userid
) as max_tstamp

Expand Down
20 changes: 12 additions & 8 deletions models/page_views/bigquery/snowplow_page_views.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
{{
config(
materialized='incremental',
partition_by='DATE(page_view_start)',
partition_by={
'field': 'page_view_start',
'data_type': 'timestamp'
},
unique_key="page_view_id",
enabled=is_adapter('bigquery')
)
}}

{% set timezone = var('snowplow:timezone', 'UTC') %}
{% set start_date = get_most_recent_record(this, "page_view_start", "2001-01-01") %}

/*
General approach: find sessions that happened since the last time
Expand All @@ -25,9 +27,14 @@ with all_events as (
select *
from {{ ref('snowplow_base_events') }}

-- load up events from the start date, and the day before it, to ensure
-- that we capture pageviews that span midnight
where DATE(collector_tstamp) >= date_sub('{{ start_date }}', interval 1 day)
{% if is_incremental() %}
{% set start_date = get_start_date(this) %}
where DATE(collector_tstamp) >=
date_sub(
{{start_date}},
interval {{var('snowplow:page_view_lookback_days')}} day
)
{% endif %}

),

Expand All @@ -38,9 +45,6 @@ new_sessions as (

from all_events

-- only consider events for sessions that occurred on or after the start_date
where DATE(collector_tstamp) >= '{{ start_date }}'

),

relevant_events as (
Expand Down
5 changes: 4 additions & 1 deletion models/sessions/bigquery/snowplow_sessions.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{{
config(
materialized='table',
partition_by='DATE(session_start)',
partition_by={
'field': 'session_start',
'data_type': 'timestamp'
},
enabled=is_adapter('bigquery')
)
}}
Expand Down
14 changes: 9 additions & 5 deletions models/sessions/bigquery/snowplow_sessions_tmp.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
{{
config(
materialized='incremental',
partition_by='DATE(session_start)',
partition_by={
'field': 'session_start',
'data_type': 'timestamp'
},
unique_key="session_id",
enabled=is_adapter('bigquery')
)
}}

{% set start_date = get_most_recent_record(this, "session_start", "2001-01-01") %}

with all_page_views as (

select * from {{ ref('snowplow_page_views') }}
where DATE(page_view_start) >= date_sub('{{ start_date }}', interval 1 day)

{% if is_incremental() %}
{% set start_date = get_start_date(this) %}
where DATE(collector_tstamp) >= {{start_date}}
{% endif %}

),

Expand All @@ -22,7 +27,6 @@ new_page_views as (
session_id

from all_page_views
where DATE(page_view_start) >= '{{ start_date }}'

),

Expand Down

0 comments on commit abac49b

Please sign in to comment.