diff --git a/integration_tests/data/expected/snowplow_sessions_expected.csv b/integration_tests/data/expected/snowplow_sessions_expected.csv index 1afa351..db86958 100644 --- a/integration_tests/data/expected/snowplow_sessions_expected.csv +++ b/integration_tests/data/expected/snowplow_sessions_expected.csv @@ -1,6 +1,6 @@ user_custom_id,inferred_user_id,user_snowplow_domain_id,user_snowplow_crossdomain_id,app_id,first_page_url,marketing_medium,marketing_source,marketing_term,marketing_campaign,marketing_content,referer_url,session_start,session_end,session_id,time_engaged_in_s,session_index NULL,user_id_2,domain_userid_2,network_userid_2,vandelay,vandelay.com/,social,facebook,abc,campaign_1,def,facebook.com/,2018-01-01 7:00:00,2018-01-01 7:00:02,user_2_session_1,30,1 NULL,user_id_3,domain_userid_3,network_userid_3,vandelay,vandelay.com/,NULL,NULL,NULL,NULL,NULL,google.com/,2018-01-01 7:00:00,2018-01-01 7:00:00,user_3_session_1,0,1 -user_id_1,user_id_1,domain_userid_1,network_userid_1,vandelay,vandelay.com/,NULL,NULL,NULL,NULL,NULL,NULL,2018-01-01 7:00:00,2018-01-01 7:00:01,user_1_session_1,30,1 +user_id_1,user_id_1,domain_userid_1,network_userid_1,vandelay,vandelay.com/,NULL,NULL,NULL,NULL,NULL,NULL,2018-01-01 7:00:00,2018-01-01 7:00:05,user_1_session_1,30,1 NULL,domain_userid_4,domain_userid_4,network_userid_4,vandelay,vandelay.com/,NULL,NULL,NULL,NULL,NULL,NULL,2018-01-01 7:00:30,2018-01-01 7:00:30,user_4_session_1,0,1 user_id_3,user_id_3,domain_userid_3,network_userid_3,vandelay,vandelay.com/,medium,source,term,campaign,content,google.com/,2018-01-01 7:00:40,2018-01-01 7:00:40,user_3_session_2,0,2 diff --git a/macros/adapters/bigquery/identification/snowplow_id_map.sql b/macros/adapters/bigquery/identification/snowplow_id_map.sql index 5b2f524..0e79e21 100644 --- a/macros/adapters/bigquery/identification/snowplow_id_map.sql +++ b/macros/adapters/bigquery/identification/snowplow_id_map.sql @@ -9,7 +9,6 @@ config( materialized='incremental', 
partition_by='DATE(max_tstamp)', - sql_where='TRUE', unique_key="domain_userid" ) }} diff --git a/macros/adapters/bigquery/pageviews/snowplow_page_views.sql b/macros/adapters/bigquery/pageviews/snowplow_page_views.sql index 653d381..93386c5 100644 --- a/macros/adapters/bigquery/pageviews/snowplow_page_views.sql +++ b/macros/adapters/bigquery/pageviews/snowplow_page_views.sql @@ -5,7 +5,6 @@ config( materialized='incremental', partition_by='DATE(page_view_start)', - sql_where='TRUE', unique_key="page_view_id" ) }} diff --git a/macros/adapters/bigquery/sessions/snowplow_sessions_tmp.sql b/macros/adapters/bigquery/sessions/snowplow_sessions_tmp.sql index 7ce635d..0f5eb21 100644 --- a/macros/adapters/bigquery/sessions/snowplow_sessions_tmp.sql +++ b/macros/adapters/bigquery/sessions/snowplow_sessions_tmp.sql @@ -5,7 +5,6 @@ config( materialized='incremental', partition_by='DATE(session_start)', - sql_where='TRUE', unique_key="session_id" ) }} diff --git a/macros/adapters/default/identification/snowplow_id_map.sql b/macros/adapters/default/identification/snowplow_id_map.sql index a0422ae..6824d13 100644 --- a/macros/adapters/default/identification/snowplow_id_map.sql +++ b/macros/adapters/default/identification/snowplow_id_map.sql @@ -17,7 +17,6 @@ materialized='incremental', sort='domain_userid', dist='domain_userid', - sql_where='TRUE', unique_key='domain_userid' ) }} @@ -33,7 +32,7 @@ new_events as ( select * from all_events - {% if adapter.already_exists(this.schema, this.name) and not flags.FULL_REFRESH %} + {% if is_incremental() %} where collector_tstamp > ( select coalesce(max(max_tstamp), '0001-01-01') from {{ this }} ) diff --git a/macros/adapters/default/page_views/snowplow_page_views.sql b/macros/adapters/default/page_views/snowplow_page_views.sql index 38b8601..87d2450 100644 --- a/macros/adapters/default/page_views/snowplow_page_views.sql +++ b/macros/adapters/default/page_views/snowplow_page_views.sql @@ -13,7 +13,6 @@ materialized='incremental', 
sort='max_tstamp', dist='user_snowplow_domain_id', - sql_where='TRUE', unique_key='page_view_id' ) }} @@ -28,13 +27,12 @@ with all_events as ( select * from {{ ref('snowplow_web_events') }} - ), -web_events as ( +filtered_events as ( select * from all_events - {% if adapter.already_exists(this.schema, this.name) and not flags.FULL_REFRESH %} + {% if is_incremental() %} where collector_tstamp > ( select coalesce(max(max_tstamp), '0001-01-01') from {{ this }} ) @@ -42,6 +40,22 @@ web_events as ( ), +-- we need to grab all events (not just the new ones) for any session that +-- appears in filtered_events in order to correctly process the session index below +relevant_sessions as ( + + select distinct domain_sessionid + from filtered_events +), + +web_events as ( + + select all_events.* + from all_events + join relevant_sessions using (domain_sessionid) + +), + internal_session_mapping as ( select * from {{ ref('snowplow_web_events_internal_fixed') }} diff --git a/macros/adapters/default/page_views/snowplow_web_events.sql b/macros/adapters/default/page_views/snowplow_web_events.sql index aec851a..8f466db 100644 --- a/macros/adapters/default/page_views/snowplow_web_events.sql +++ b/macros/adapters/default/page_views/snowplow_web_events.sql @@ -17,7 +17,6 @@ materialized='incremental', sort='page_view_id', dist='page_view_id', - sql_where='TRUE', unique_key='page_view_id' ) }} @@ -32,7 +31,7 @@ with all_events as ( events as ( select * from all_events - {% if adapter.already_exists(this.schema, this.name) and not flags.FULL_REFRESH %} + {% if is_incremental() %} where collector_tstamp > ( select coalesce(max(collector_tstamp), '0001-01-01') from {{ this }} ) diff --git a/macros/adapters/default/page_views/snowplow_web_events_scroll_depth.sql b/macros/adapters/default/page_views/snowplow_web_events_scroll_depth.sql index 895e699..b6293e7 100644 --- a/macros/adapters/default/page_views/snowplow_web_events_scroll_depth.sql +++ b/macros/adapters/default/page_views/snowplow_web_events_scroll_depth.sql @@ -13,14 
+13,10 @@ materialized='incremental', sort='page_view_id', dist='page_view_id', - sql_where='TRUE', unique_key='page_view_id' ) }} -{# cache this because we need it below too #} -{% set this_exists = adapter.already_exists(this.schema, this.name) and not flags.FULL_REFRESH%} - with all_events as ( select * from {{ ref('snowplow_base_events') }} @@ -30,7 +26,7 @@ with all_events as ( events as ( select * from all_events - {% if this_exists %} + {% if is_incremental() %} where collector_tstamp > ( select coalesce(max(max_tstamp), '0001-01-01') from {{ this }} ) @@ -101,7 +97,7 @@ relative as ( ), -{% if this_exists %} +{% if is_incremental() %} relevant_existing as ( diff --git a/macros/adapters/default/page_views/snowplow_web_events_time.sql b/macros/adapters/default/page_views/snowplow_web_events_time.sql index 631e6e6..6ef5941 100644 --- a/macros/adapters/default/page_views/snowplow_web_events_time.sql +++ b/macros/adapters/default/page_views/snowplow_web_events_time.sql @@ -13,13 +13,10 @@ materialized='incremental', sort='page_view_id', dist='page_view_id', - sql_where='TRUE', unique_key='page_view_id' ) }} -{# cache this because we need it below too #} -{% set this_exists = adapter.already_exists(this.schema, this.name) and not flags.FULL_REFRESH%} with all_events as ( @@ -30,7 +27,7 @@ with all_events as ( events as ( select * from all_events - {% if this_exists %} + {% if is_incremental() %} where collector_tstamp > ( select coalesce(max(max_tstamp), '0001-01-01') from {{ this }} ) @@ -67,7 +64,7 @@ prep as ( ), -{% if this_exists %} +{% if is_incremental() %} relevant_existing as ( diff --git a/macros/adapters/default/sessions/snowplow_sessions_tmp.sql b/macros/adapters/default/sessions/snowplow_sessions_tmp.sql index c12a442..52eb57d 100644 --- a/macros/adapters/default/sessions/snowplow_sessions_tmp.sql +++ b/macros/adapters/default/sessions/snowplow_sessions_tmp.sql @@ -13,17 +13,39 @@ materialized='incremental', sort='session_start', 
dist='user_snowplow_domain_id', - sql_where='session_start > (select max(session_start) from {{ this }})', unique_key='session_id' ) }} -with web_page_views as ( +with all_web_page_views as ( select * from {{ ref('snowplow_page_views') }} ), +relevant_sessions as ( + + select distinct session_id + from all_web_page_views + + {% if is_incremental() %} + where page_view_start > (select coalesce(max(session_start), '0001-01-01') from {{ this }}) {# coalesce: max() is NULL when the existing table is empty, which would otherwise filter out every page view #} + {% endif %} + +), + +-- only select sessions that had page views in this time frame +-- this strategy helps us grab _all_ of the page views for these +-- sessions, including the ones that occurred before the +-- max(session_start) in the table +web_page_views as ( + + select all_web_page_views.* + from all_web_page_views + join relevant_sessions using (session_id) + +), + prep AS ( select @@ -49,7 +71,6 @@ prep AS ( from web_page_views group by 1 - order by 1 ), diff --git a/models/identification/snowplow_id_map.sql b/models/identification/snowplow_id_map.sql index 5c1dac5..de10f86 100644 --- a/models/identification/snowplow_id_map.sql +++ b/models/identification/snowplow_id_map.sql @@ -4,7 +4,6 @@ materialized='incremental', sort='domain_userid', dist='domain_userid', - sql_where='TRUE', unique_key='domain_userid' ) }}