Skip to content

Commit

Permalink
Merge pull request #27 from Datavault-UK/feat/fix-178
Browse files Browse the repository at this point in the history
Fix for hourly loads in satellites (#178)
  • Loading branch information
DVAlexHiggs committed Jun 1, 2023
2 parents 6209bf2 + 64a29ef commit bbe9118
Show file tree
Hide file tree
Showing 88 changed files with 3,911 additions and 1,488 deletions.
6 changes: 3 additions & 3 deletions .pipeline/azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ extends:
- groupName: 'sats'
displayName: 'Satellites'
disabledForPlatform: []
subGroups: [ 'main', 'cycles', 'incremental', 'pm_core', 'pm_range', 'pm_datepart', 'rank' ]
subGroups: [ 'main', 'comp_pk', 'cycles', 'incremental', 'pm_core', 'pm_range', 'pm_datepart', 'pm_small_datepart', 'rank' ]
- groupName: 'eff_sats'
displayName: 'Effectivity Satellites'
disabledForPlatform: ['databricks', 'postgres']
subGroups: [ 'main', 'auto', 'disabled', 'mat' ]
subGroups: [ 'main', 'comp_pk', 'auto', 'disabled', 'mat' ]
- groupName: 'ma_sats'
displayName: 'Multi-Active Satellites'
disabledForPlatform: ['databricks', 'postgres']
subGroups: [ '1cdk', '1cdk_cycles', '2cdk', '2cdk_cycles', 'incremental', 'pm', 'rm', 'rm_dup' ]
subGroups: [ 'comp_pk', '1cdk', '1cdk_cycles', '2cdk', '2cdk_cycles', 'incremental', 'pm', 'rm', 'rm_dup' ]
- groupName: 'xts'
displayName: 'Extended Tracking Satellites'
disabledForPlatform: ['databricks', 'postgres']
Expand Down
2 changes: 1 addition & 1 deletion .run/Harness Tests.run.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Harness Tests" type="tests" factoryName="py.test">
<module name="automate_dv_dev" />
<module name="automate-dv-dev" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
Expand Down
2 changes: 1 addition & 1 deletion .run/Macro Tests.run.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Macro Tests" type="tests" factoryName="py.test">
<module name="automate_dv_dev" />
<module name="automate-dv-dev" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
Expand Down
4 changes: 2 additions & 2 deletions automate_dv_dev/macros/internal/helpers/dateadd.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
{% macro sqlserver__dateadd(datepart, interval, from_date_or_timestamp) %}

dateadd(
millisecond,
86399999,
{{ datepart }},
{{ interval }},
CAST({{ from_date_or_timestamp }} AS DATETIME2)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
* This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault
*/

AutomateDV (f.k.a automate_dv)

{%- macro extract_column_names(columns_dict=none) -%}

{%- set extracted_column_names = [] -%}
Expand Down
47 changes: 47 additions & 0 deletions automate_dv_dev/macros/internal/helpers/timestamp_add.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Business Thinking Ltd. 2019-2023
* This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault
*/

{#-
    Public entry point for timestamp_add.

    Looks up the adapter-specific implementation of `timestamp_add` in the
    `automate_dv` namespace via adapter.dispatch, calls it with the three
    arguments passed through by keyword, and returns its result unchanged.

    Args:
        datepart: forwarded as-is to the dispatched implementation.
        interval: forwarded as-is to the dispatched implementation.
        from_date_or_timestamp: forwarded as-is to the dispatched implementation.
-#}
{% macro timestamp_add(datepart, interval, from_date_or_timestamp) %}
    {%- set dispatched_macro = adapter.dispatch('timestamp_add', 'automate_dv') -%}
    {%- set rendered_sql = dispatched_macro(datepart=datepart,
                                            interval=interval,
                                            from_date_or_timestamp=from_date_or_timestamp) -%}
    {%- do return(rendered_sql) -%}
{%- endmacro -%}

{#-
    Default implementation of timestamp_add.

    Renders a call to automate_dv.dateadd that offsets `from_date_or_timestamp`
    by a fixed amount chosen from `datepart`:

        day / week / month / quarter / year -> + 86,399,999 ms (one day minus 1 ms)
        hour                                -> +  3,599,999 ms (one hour minus 1 ms)
        minute                              -> +     59,999 ms (one minute minus 1 ms)
        second                              -> +        999 ms (one second minus 1 ms)
        millisecond                         -> +        999 microseconds
        microsecond                         -> +          1 microsecond

    Args:
        datepart: name of the period (string) used to select the offset above.
        interval: accepted for signature parity with the dispatcher; this
                  implementation does not read it.
        from_date_or_timestamp: SQL expression to which the offset is added.

    Any other `datepart` value matches no branch, so the macro renders nothing.
-#}
{%- macro default__timestamp_add(datepart, interval, from_date_or_timestamp) -%}

    {%- if datepart is in ['day', 'week', 'month', 'quarter', 'year'] -%}
        {{ automate_dv.dateadd('millisecond', 86399999, from_date_or_timestamp) }}
    {%- elif datepart == 'microsecond' -%}
        {{ automate_dv.dateadd('microsecond', 1, from_date_or_timestamp) }}
    {%- elif datepart == 'millisecond' -%}
        {{ automate_dv.dateadd('microsecond', 999, from_date_or_timestamp) }}
    {%- elif datepart == 'second' -%}
        {{ automate_dv.dateadd('millisecond', 999, from_date_or_timestamp) }}
    {%- elif datepart == 'minute' -%}
        {#- 60,000 ms in a minute, minus 1 ms; the previous value 5999 covered only ~6 seconds -#}
        {{ automate_dv.dateadd('millisecond', 59999, from_date_or_timestamp) }}
    {%- elif datepart == 'hour' -%}
        {{ automate_dv.dateadd('millisecond', 3599999, from_date_or_timestamp) }}
    {%- endif -%}

{%- endmacro -%}

{#-
    BigQuery implementation of timestamp_add.

    Renders SQL that offsets `from_date_or_timestamp` by a fixed amount chosen
    from `datepart`. The day-or-larger branch delegates to automate_dv.dateadd;
    every other branch casts the input to TIMESTAMP and applies TIMESTAMP_ADD:

        day / week / month / quarter / year -> + 86,399,999 ms (one day minus 1 ms)
        hour                                -> +  3,599,999 ms (one hour minus 1 ms)
        minute                              -> +     59,999 ms (one minute minus 1 ms)
        second                              -> +        999 ms (one second minus 1 ms)
        millisecond                         -> +        999 microseconds
        microsecond                         -> +          1 microsecond

    Args:
        datepart: name of the period (string) used to select the offset above.
        interval: accepted for signature parity with the dispatcher; this
                  implementation does not read it.
        from_date_or_timestamp: SQL expression to which the offset is added.

    Any other `datepart` value matches no branch, so the macro renders nothing.
-#}
{% macro bigquery__timestamp_add(datepart, interval, from_date_or_timestamp) %}

    {%- if datepart is in ['day', 'week', 'month', 'quarter', 'year'] -%}
        {{ automate_dv.dateadd('millisecond', 86399999, from_date_or_timestamp) }}
    {%- elif datepart == 'microsecond' -%}
        TIMESTAMP_ADD(CAST( {{from_date_or_timestamp}} AS TIMESTAMP), INTERVAL 1 microsecond)
    {%- elif datepart == 'millisecond' -%}
        TIMESTAMP_ADD(CAST( {{from_date_or_timestamp}} AS TIMESTAMP), INTERVAL 999 microsecond)
    {%- elif datepart == 'second' -%}
        TIMESTAMP_ADD(CAST( {{from_date_or_timestamp}} AS TIMESTAMP), INTERVAL 999 millisecond)
    {%- elif datepart == 'minute' -%}
        {#- 60,000 ms in a minute, minus 1 ms; the previous value 5999 covered only ~6 seconds -#}
        TIMESTAMP_ADD(CAST( {{from_date_or_timestamp}} AS TIMESTAMP), INTERVAL 59999 millisecond)
    {%- elif datepart == 'hour' -%}
        TIMESTAMP_ADD(CAST( {{from_date_or_timestamp}} AS TIMESTAMP), INTERVAL 3599999 millisecond)
    {%- endif -%}

{% endmacro %}


2 changes: 2 additions & 0 deletions automate_dv_dev/macros/materialisations/drop_temporary.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
DROP VIEW {{ tmp_relation }};
{% elif target.type == 'sqlserver' %}
DROP TABLE {{ tmp_relation }};
{% elif target.type == 'postgres' %}
DROP TABLE {{ tmp_relation }};
{% endif %}
{%- endcall %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@

{%- macro check_num_periods(start_date, stop_date, period) -%}

{% set num_periods = adapter.dispatch('check_num_periods', 'automate_dv')(
start_date=start_date,
stop_date=stop_date,
period=period) %}
{% set num_periods = adapter.dispatch('check_num_periods',
'automate_dv')(start_date=start_date,
stop_date=stop_date,
period=period) %}

{%- if num_periods > 100000 -%}
{%- set error_message -%}
'Max iterations is 100,000. Consider using a different datepart value (e.g. day)
or loading data for a shorter time period.
vault_insert_by materialisations are not intended for this purpose,
please see https://automate_dv.readthedocs.io/en/latest/materialisations/'
please see https://automate-dv.readthedocs.io/en/latest/materialisations/'
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
Expand All @@ -31,7 +31,7 @@
SELECT {{ datediff('start_timestamp', 'stop_timestamp', period) }} AS NUM_PERIODS
FROM
(SELECT CAST('{{ start_date }}' AS {{ dbt.type_timestamp() }}) AS start_timestamp,
CAST(NULLIF('{{ stop_date | lower }}', 'none') AS {{ dbt.type_timestamp() }}) AS stop_timestamp)
CAST(NULLIF('{{ stop_date | lower }}', 'none') AS {{ dbt.type_timestamp() }}) AS stop_timestamp) AS SUBQUERY_ALIAS
{% endset %}
{% set num_periods_dict = automate_dv.get_query_results_as_dict(num_periods_check_sql) %}
{% set num_periods = num_periods_dict['NUM_PERIODS'][0] | int %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,26 @@

{% set macro = adapter.dispatch('get_period_boundaries',
'automate_dv')(target_relation=target_relation,
timestamp_field=timestamp_field,
start_date=start_date,
stop_date=stop_date,
period=period) %}
timestamp_field=timestamp_field,
start_date=start_date,
stop_date=stop_date,
period=period) %}

{% do return(macro) %}
{%- endmacro %}



{% macro default__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}
{%- set from_date_or_timestamp = "NULLIF('{}','none')::TIMESTAMP".format(stop_date | lower) -%}

{%- set datepart = period -%}
{% set period_boundary_sql -%}
WITH period_data AS (
SELECT
SELECT
COALESCE(MAX({{ timestamp_field }}), '{{ start_date }}')::TIMESTAMP AS start_timestamp,
COALESCE({{ automate_dv.dateadd('millisecond', 86399999, from_date_or_timestamp) }},
{{ current_timestamp() }} ) AS stop_timestamp
COALESCE({{ automate_dv.timestamp_add(datepart, interval, from_date_or_timestamp) }},
{{ current_timestamp() }} )::TIMESTAMP AS stop_timestamp
FROM {{ target_relation }}
)
)
SELECT
start_timestamp,
stop_timestamp,
Expand All @@ -47,18 +46,22 @@
{%- endmacro %}




{% macro bigquery__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}

{%- set from_date_or_timestamp = "NULLIF('{}','none')".format(stop_date | lower) -%}
{%- set datepart = period -%}

{% set period_boundary_sql -%}
with data as (
select
COALESCE(CAST(MAX({{ timestamp_field }}) AS DATETIME), CAST('{{ start_date }}' AS DATETIME)) as START_TIMESTAMP,
COALESCE({{ automate_dv.dateadd('millisecond', 86399999, from_date_or_timestamp) }},
CAST({{ current_timestamp() }} AS DATETIME)) as STOP_TIMESTAMP
COALESCE(
CAST(MAX({{ timestamp_field }}) AS TIMESTAMP),
CAST('{{ start_date }}' AS TIMESTAMP))
as START_TIMESTAMP,
COALESCE(
CAST({{ automate_dv.timestamp_add(datepart, interval, from_date_or_timestamp) }} AS TIMESTAMP),
CAST({{ current_timestamp() }} AS TIMESTAMP))
as STOP_TIMESTAMP
from {{ target_relation }}
)
select
Expand All @@ -79,23 +82,30 @@
{%- endmacro %}




{% macro sqlserver__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}

{%- if period is in ['microsecond', 'millisecond', 'second'] -%}
{%- set error_message -%}
'This datepart ({{ period }}) is too small and cannot be used for this purpose in MS SQL Server, consider using a different datepart value (e.g. day).
Vault_insert_by materialisations are not intended for this purpose,
please see https://automate_dv.readthedocs.io/en/latest/materialisations/'
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
{%- endif -%}
{# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #}
{% set start_date = start_date[0:27] %}
{% set stop_date = stop_date[0:27] %}
{%- set datepart = period -%}
{%- set from_date_or_timestamp = "CAST(NULLIF('{}','none') AS DATETIME2)".format(stop_date | lower) %}

{% set period_boundary_sql -%}
WITH period_data AS (
SELECT
SELECT
CAST(COALESCE(MAX({{ timestamp_field }}), CAST('{{ start_date }}' AS DATETIME2)) AS DATETIME2) AS start_timestamp,
CAST(COALESCE({{ automate_dv.dateadd('millisecond', 86399999, from_date_or_timestamp) }},
CAST(COALESCE({{ automate_dv.timestamp_add(datepart, interval, from_date_or_timestamp) }},
{{ current_timestamp() }} ) AS DATETIME2) AS stop_timestamp
FROM {{ target_relation }}
)
)
SELECT
start_timestamp,
stop_timestamp,
Expand All @@ -116,13 +126,14 @@
{% macro databricks__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}

{%- set from_date_or_timestamp = "NULLIF('{}','none')::TIMESTAMP".format(stop_date | lower) -%}

{%- set datepart = period -%}
{% set period_boundary_sql -%}

WITH period_data AS (
SELECT
COALESCE(MAX({{ timestamp_field }}), CAST('{{ start_date }}' AS TIMESTAMP)) AS start_timestamp,
COALESCE({{ automate_dv.dateadd('millisecond', 86399999, from_date_or_timestamp) }},
COALESCE(
{{ automate_dv.timestamp_add(datepart, interval, from_date_or_timestamp) }},
{{ current_timestamp() }}) AS stop_timestamp
FROM {{ target_relation }}
)
Expand All @@ -144,15 +155,14 @@
{%- endmacro %}



{% macro postgres__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}

{% set period_boundary_sql -%}
WITH period_data AS (
SELECT
COALESCE(MAX({{ timestamp_field }}), '{{ start_date }}')::TIMESTAMP AS start_timestamp,
COALESCE({{ automate_dv.dateadd('millisecond', 86399999, "NULLIF('" ~ stop_date | lower ~ "','none')::TIMESTAMP") }},
{{ current_timestamp() }} ) AS stop_timestamp
COALESCE({{ automate_dv.timestamp_add('millisecond', 86399999, "NULLIF('" ~ stop_date | lower ~ "','none')::TIMESTAMP") }},
{{ current_timestamp() }} )::TIMESTAMP AS stop_timestamp
FROM {{ target_relation }}
)
SELECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,26 @@

{% set macro = adapter.dispatch('get_period_filter_sql',
'automate_dv')(target_cols_csv=target_cols_csv,
base_sql=base_sql,
timestamp_field=timestamp_field,
period=period,
start_timestamp=start_timestamp,
stop_timestamp=stop_timestamp,
offset=offset) %}
base_sql=base_sql,
timestamp_field=timestamp_field,
period=period,
start_timestamp=start_timestamp,
stop_timestamp=stop_timestamp,
offset=offset) %}
{% do return(macro) %}
{%- endmacro %}




{% macro default__get_period_filter_sql(target_cols_csv, base_sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set filtered_sql = {'sql': base_sql} -%}

{%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(filtered_sql.sql,
timestamp_field,
start_timestamp,
stop_timestamp,
offset, period)}) -%}
{%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(core_sql=filtered_sql.sql,
timestamp_field=timestamp_field,
start_timestamp=start_timestamp,
stop_timestamp=stop_timestamp,
offset=offset, period=period)}) -%}
select {{ target_cols_csv }} from ({{ filtered_sql.sql }})
{%- endmacro %}

Expand All @@ -38,11 +37,11 @@

{%- set filtered_sql = {'sql': base_sql} -%}

{%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(filtered_sql.sql,
timestamp_field,
start_timestamp,
stop_timestamp,
offset, period)}) -%}
{%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(core_sql=filtered_sql.sql,
timestamp_field=timestamp_field,
start_timestamp=start_timestamp,
stop_timestamp=stop_timestamp,
offset=offset, period=period)}) -%}
{# MSSQL does not allow CTEs in a subquery #}
{{ filtered_sql.sql }}
{%- endmacro %}
Expand All @@ -53,10 +52,10 @@

{%- set filtered_sql = {'sql': base_sql} -%}

{%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(filtered_sql.sql,
timestamp_field,
start_timestamp,
stop_timestamp,
offset, period)}) -%}
{%- do filtered_sql.update({'sql': automate_dv.replace_placeholder_with_period_filter(core_sql=filtered_sql.sql,
timestamp_field=timestamp_field,
start_timestamp=start_timestamp,
stop_timestamp=stop_timestamp,
offset=offset, period=period)}) -%}
select {{ target_cols_csv }} from ({{ filtered_sql.sql }})
{%- endmacro %}
Loading

0 comments on commit bbe9118

Please sign in to comment.