Skip to content

Commit

Permalink
Merge remote-tracking branch 'private/release/0.10.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
DVAlexHiggs committed Aug 14, 2023
2 parents 82c61bf + f5b8e51 commit 838522a
Show file tree
Hide file tree
Showing 17 changed files with 387 additions and 150 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: automate_dv
version: 0.9.7
version: 0.10.0
require-dbt-version: [">=1.0.0", "<2.0.0"]
config-version: 2

Expand Down
89 changes: 89 additions & 0 deletions macros/materialisations/error_messages.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) Business Thinking Ltd. 2019-2023
* This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault
*/


{% macro wrap_warning(warning_message) %}

    {#- Frame a (possibly multi-line) warning between two '==..WARNING..==' border lines.

        Args:
            warning_message: the text to wrap; it is split on newlines and every
                             line is trimmed before being re-joined.

        Returns (via `return`): newline + border + newline + trimmed lines + newline + border. -#}

    {%- set new_message = [] -%}
    {%- set length_list = [] -%}

    {%- for ln in warning_message.split('\n') -%}
        {%- set trimmed_ln = ln | trim -%}
        {%- do new_message.append(trimmed_ln) -%}
        {#- Measure the trimmed text (what is actually emitted), so indentation
            inside the caller's set-block does not inflate the border width -#}
        {%- do length_list.append(trimmed_ln | length) -%}
    {%- endfor -%}

    {%- set max_line_length = length_list | max -%}
    {#- 7 is the length of the word 'WARNING'; the remaining width is split evenly on both sides -#}
    {%- set padding_length = (max_line_length - 7) // 2 -%}
    {%- set padding = modules.itertools.repeat('=', padding_length) | join('') -%}

    {%- set border = padding ~ 'WARNING' ~ padding -%}

    {%- set wrapped_message = '\n' ~ border ~ '\n' ~ new_message | join('\n') ~ '\n' ~ border -%}

    {%- do return(wrapped_message) -%}

{% endmacro %}


{%- macro datepart_too_small_error(period) -%}

    {#- Raise a compiler error because the configured datepart is too small to be used.

        Args:
            period: the offending datepart name, interpolated into the message.

        The error is only raised when `execute` is truthy. -#}

{%- set message -%}
This datepart ({{ period }}) is too small and cannot be used, consider using a different datepart value (e.g. day) or rank column.
'vault_insert_by_x' materialisations are intended for experimental or testing purposes only. They are not intended for use in production.

Please see: https://automate-dv.readthedocs.io/en/latest/materialisations/
{%- endset -%}

{%- if execute -%}
    {{- exceptions.raise_compiler_error(message) -}}
{%- endif -%}

{%- endmacro -%}


{%- macro datepart_not_recommended_warning(period) -%}

    {#- Emit a framed warning that the configured datepart is smaller than recommended.

        Args:
            period: the datepart name, interpolated into the warning text.

        Nothing is emitted unless `execute` is truthy. -#}

{%- if execute -%}

{%- set warning_text -%}
This datepart ({{ period }}) is too small and not recommended, consider using a different datepart value (e.g. day) or rank column.
'vault_insert_by_x' materialisations are intended for experimental or testing purposes only. They are not intended for use in production.

Please see: https://automate-dv.readthedocs.io/en/latest/materialisations/
{%- endset -%}

    {#- Frame the text with the WARNING border before handing it to dbt -#}
    {%- set framed_text = automate_dv.wrap_warning(warning_text) -%}
    {{- exceptions.warn(framed_text) -}}

{%- endif -%}

{%- endmacro -%}


{%- macro max_iterations_error(func_name=none) -%}

    {#- Raise a compiler error because a load would need more than 100,000 iterations.

        Args:
            func_name: optional name of the calling materialisation. Callers may
                       pass it as a keyword argument; when given it is prefixed to
                       the message in square brackets. When omitted the message is
                       unchanged.

        The error is only raised when `execute` is truthy. -#}

{%- set message -%}
{%- if func_name is not none %}[{{ func_name }}] {% endif -%}
Max iterations is 100,000. Consider using a different datepart value (e.g. day), rank column or loading data for a shorter time period.
'vault_insert_by_x' materialisations are intended for experimental or testing purposes only. They are not intended for use in production.

Please see: https://automate-dv.readthedocs.io/en/latest/materialisations/
{%- endset -%}

{%- if execute -%}
    {{- exceptions.raise_compiler_error(message) -}}
{%- endif -%}

{%- endmacro -%}


{%- macro experimental_not_recommended_warning(func_name) -%}

    {#- Emit a framed warning that the named functionality is experimental.

        Args:
            func_name: name of the functionality, interpolated into the warning text.

        Nothing is emitted unless `execute` is truthy. -#}

{%- if execute -%}

{%- set warning_text -%}
This functionality ({{ func_name }}) is intended for experimental or testing purposes only.
Its behavior, reliability, and performance have not been thoroughly vetted for production environments.
Using this functionality in a live production setting may result in unpredictable outcomes, data loss, or system instability.
{%- endset -%}

    {#- Frame the text with the WARNING border before handing it to dbt -#}
    {%- set framed_text = automate_dv.wrap_warning(warning_text) -%}
    {{- exceptions.warn(framed_text) -}}

{%- endif -%}

{%- endmacro -%}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
period=period) %}

{%- if num_periods > 100000 -%}
{{ automate_dv.sqlserver_max_iterations_error() }}
{{ automate_dv.max_iterations_error() }}
{%- endif -%}

{% do return(num_periods) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@

{% macro sqlserver__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}
{%- if period is in ['microsecond', 'millisecond', 'second'] -%}
{{ automate_dv.sqlserver_datepart_too_small_error(period=period) }}
{{ automate_dv.datepart_too_small_error(period=period) }}
{%- endif -%}

{# MSSQL cannot CAST datetime2 strings with more than 7 decimal places #}
Expand Down
5 changes: 3 additions & 2 deletions macros/materialisations/shared_helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
{%- if model_sql.find(placeholder) == -1 -%}
{%- set error_message -%}
Model '{{ model.unique_id }}' does not include the required string '{{ placeholder }}' in its sql
{%- endset -%}
{{- exceptions.raise_compiler_error(error_message) -}}
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
{%- endif -%}

{%- endmacro -%}
34 changes: 14 additions & 20 deletions macros/materialisations/vault_insert_by_period_materialization.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,23 @@

{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set period = config.get('period', default='day') -%}

{#- Raise the errors/warnings in this order so that we do not get both -#}
{%- if period == 'microsecond' -%}
{{ automate_dv.datepart_too_small_error(period=period) }}
{%- elif period is in ['millisecond', 'second', 'minute', 'hour'] -%}
{{ automate_dv.datepart_not_recommended_warning(period=period) }}
{%- endif -%}

{{ automate_dv.experimental_not_recommended_warning(func_name='vault_insert_by_period') }}

{% if target.type == "sqlserver" %}
{%- set target_relation = this.incorporate(type='table') -%}
{% else %}
{%- set target_relation = this -%}
{% endif %}

{%- set existing_relation = load_relation(this) -%}
{%- set tmp_relation = make_temp_relation(target_relation) -%}

Expand All @@ -20,25 +32,6 @@

{%- set start_stop_dates = automate_dv.get_start_stop_dates(timestamp_field, date_source_models) | as_native -%}

{%- set period = config.get('period', default='day') -%}
{%- if period == 'microsecond' -%}
{%- set error_message -%}
'This datepart ({{ period }}) is too small and cannot be used for this purpose, consider using a different datepart value (e.g. day).
Vault_insert_by materialisations are not intended for this purpose,
please see https://automate-dv.readthedocs.io/en/latest/materialisations/'
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
{%- elif period is in ['millisecond', 'second', 'minute', 'hour'] -%}
{%- set warn_message -%}
'WARNING: The use of this datepart ({{ period }}) is not recommended, consider using a different datepart value (e.g. day).
Vault_insert_by materialisations are not intended for this purpose,
please see https://automate-dv.readthedocs.io/en/latest/materialisations/'
{%- endset -%}

{{- exceptions.warn(warn_message) -}}
{%- endif -%}

{%- set to_drop = [] -%}

{%- do automate_dv.check_placeholder(sql) -%}
Expand Down Expand Up @@ -76,9 +69,10 @@
start_timestamp=start_stop_dates.start_date,
stop_timestamp=start_stop_dates.stop_date,
offset=0, period=period) %}
{% if target.type == "postgres" %}
{% if target.type in ['postgres', 'sqlserver'] %}
{{ automate_dv.drop_temporary_special(target_relation) }}
{% endif %}

{% set build_sql = create_table_as(False, target_relation, filtered_sql) %}
{% else %}
{% set period_boundaries = automate_dv.get_period_boundaries(target_relation,
Expand Down
155 changes: 76 additions & 79 deletions macros/materialisations/vault_insert_by_rank_materialization.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
{% else %}
{%- set target_relation = this -%}
{% endif %}

{%- set existing_relation = load_relation(this) -%}
{%- set tmp_relation = make_temp_relation(target_relation) -%}

Expand All @@ -20,6 +21,14 @@

{%- set min_max_ranks = automate_dv.get_min_max_ranks(rank_column, rank_source_models) | as_native -%}

{#- Raise the errors/warnings in this order so that we do not get both -#}
{% if min_max_ranks.max_rank | int > 100000 %}
{{ automate_dv.max_iterations_error(func_name='vault_insert_by_rank') }}
{% endif %}

{{ automate_dv.experimental_not_recommended_warning(func_name='vault_insert_by_rank') }}


{%- set to_drop = [] -%}

{%- do automate_dv.check_placeholder(sql, "__RANK_FILTER__") -%}
Expand All @@ -46,92 +55,80 @@

{% elif full_refresh_mode %}
{% set filtered_sql = automate_dv.replace_placeholder_with_rank_filter(sql, rank_column, 1) %}
{% if target.type == "postgres" %}
{% if target.type in ['postgres', 'sqlserver'] %}
{{ automate_dv.drop_temporary_special(target_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, filtered_sql) %}
{% else %}

{% if min_max_ranks.max_rank | int > 100000 %}
{%- set error_message -%}
'Max iterations is 100,000. Consider using a different rank column
or loading a smaller amount of data.
vault_insert_by materialisations are not intended for this purpose,
please see https://automate-dv.readthedocs.io/en/latest/materialisations/'
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
{% else %}
{% set target_columns = adapter.get_columns_in_relation(target_relation) %}
{%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
{%- set loop_vars = {'sum_rows_inserted': 0} -%}

{% for i in range(min_max_ranks.max_rank | int ) -%}

{%- set iteration_number = i + 1 -%}

{%- set filtered_sql = automate_dv.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%}

{{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }}

{% set tmp_relation = make_temp_relation(target_relation) %}

{# This call statement drops and then creates a temporary table #}
{# but MSSQL will fail to drop any temporary table created by a previous loop iteration #}
{# See MSSQL note and drop code below #}
{% call statement() -%}
{{ create_table_as(True, tmp_relation, filtered_sql) }}
{%- endcall %}

{{ adapter.expand_target_column_types(from_relation=tmp_relation,
to_relation=target_relation) }}

{%- set insert_query_name = 'main-' ~ i -%}
{% call statement(insert_query_name, fetch_result=True) -%}
INSERT INTO {{ target_relation }} ({{ target_cols_csv }})
(
SELECT {{ target_cols_csv }}
FROM {{ tmp_relation.include(schema=True) }}
);
{%- endcall %}

{% set result = load_result(insert_query_name) %}
{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{# Investigate for Databricks #}
{%- if result['response']['rows_affected'] == None %}
{% set rows_inserted = 0 %}
{%- else %}
{% set rows_inserted = result['response']['rows_affected'] %}
{%- endif %}

{% else %} {# older versions #}
{% set rows_inserted = result['status'].split(" ")[2] | int %}
{% endif %}

{%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
{%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %}

{{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number,
min_max_ranks.max_rank,
rows_inserted,
model.unique_id)) }}

{# In databricks and sqlserver a temporary view/table can only be dropped by #}
{# the connection or session that created it so drop it now before the commit below closes this session #} model.unique_id)) }}
{% if target.type in ['databricks', 'sqlserver'] %}
{{ automate_dv.drop_temporary_special(tmp_relation) }}
{% else %}
{% do to_drop.append(tmp_relation) %}
{% endif %}

{% do adapter.commit() %}

{% endfor %}
{% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%}
{{ filtered_sql }}
{% set target_columns = adapter.get_columns_in_relation(target_relation) %}
{%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
{%- set loop_vars = {'sum_rows_inserted': 0} -%}

{% for i in range(min_max_ranks.max_rank | int ) -%}

{%- set iteration_number = i + 1 -%}

{%- set filtered_sql = automate_dv.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%}

{{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }}

{% set tmp_relation = make_temp_relation(target_relation) %}

{# This call statement drops and then creates a temporary table #}
{# but MSSQL will fail to drop any temporary table created by a previous loop iteration #}
{# See MSSQL note and drop code below #}
{% call statement() -%}
{{ create_table_as(True, tmp_relation, filtered_sql) }}
{%- endcall %}
{% endif %}

{{ adapter.expand_target_column_types(from_relation=tmp_relation,
to_relation=target_relation) }}

{%- set insert_query_name = 'main-' ~ i -%}
{% call statement(insert_query_name, fetch_result=True) -%}
INSERT INTO {{ target_relation }} ({{ target_cols_csv }})
(
SELECT {{ target_cols_csv }}
FROM {{ tmp_relation.include(schema=True) }}
);
{%- endcall %}

{% set result = load_result(insert_query_name) %}
{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{# Investigate for Databricks #}
{%- if result['response']['rows_affected'] == None %}
{% set rows_inserted = 0 %}
{%- else %}
{% set rows_inserted = result['response']['rows_affected'] %}
{%- endif %}

{% else %} {# older versions #}
{% set rows_inserted = result['status'].split(" ")[2] | int %}
{% endif %}

{%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
{%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %}

{{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number,
min_max_ranks.max_rank,
rows_inserted,
model.unique_id)) }}

{# In databricks and sqlserver a temporary view/table can only be dropped by #}
{# the connection or session that created it so drop it now before the commit below closes this session #} model.unique_id)) }}
{% if target.type in ['databricks', 'sqlserver'] %}
{{ automate_dv.drop_temporary_special(tmp_relation) }}
{% else %}
{% do to_drop.append(tmp_relation) %}
{% endif %}

{% do adapter.commit() %}

{% endfor %}
{% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%}
{{ filtered_sql }}
{%- endcall %}
{% endif %}

{% if build_sql is defined %}
Expand Down

0 comments on commit 838522a

Please sign in to comment.