Skip to content

Commit

Permalink
avoid creating the temp table twice in dynamic insert overwrite (#845)
Browse files Browse the repository at this point in the history
* avoid creating the temp table twice in dynamic insert overwrite

* add tests for copy_partitions & time ingestion with on_schema_change

---------

Co-authored-by: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com>
  • Loading branch information
Kayrnt and colin-rogers-dbt committed Aug 2, 2023
1 parent 6e167f4 commit 6a2a90c
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 27 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230727-213526.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Avoid creating the temp table twice in dynamic insert overwrite for SQL models with on_schema_change != 'ignore' or for Python models
time: 2023-07-27T21:35:26.531164+02:00
custom:
Author: Kayrnt
Issue: "810"
Expand Up @@ -19,7 +19,9 @@
{% macro bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}

{% for partition in partitions %}
{% if partition_by.granularity == 'hour' %}
{% if partition_by.data_type == 'int64' %}
{% set partition = partition | as_text %}
{% elif partition_by.granularity == 'hour' %}
{% set partition = partition.strftime("%Y%m%d%H") %}
{% elif partition_by.granularity == 'day' %}
{% set partition = partition.strftime("%Y%m%d") %}
Expand Down Expand Up @@ -97,11 +99,13 @@
{% macro bq_dynamic_copy_partitions_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
) %}
{# We run temp table creation in a separated script to move to partitions copy #}
{%- call statement('create_tmp_relation_for_copy', language='sql') -%}
{{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql')
}}
{%- endcall %}
{%- if tmp_relation_exists is false -%}
{# We run temp table creation in a separated script to move to partitions copy if it doesn't already exist #}
{%- call statement('create_tmp_relation_for_copy', language='sql') -%}
{{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql')
}}
{%- endcall %}
{%- endif -%}
{%- set partitions_sql -%}
select distinct {{ partition_by.render_wrapped() }}
from {{ tmp_relation }}
Expand Down
Expand Up @@ -58,11 +58,12 @@ class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange):
{% endif %}
"""

_MODELS__INCREMENTAL_TIME_INGESTION_PARTITIONING = """
_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TIME_INGESTION_PARTITIONING = """
{{
config(
materialized="incremental",
on_schema_change='sync_all_columns',
incremental_strategy='insert_overwrite',
partition_by={
"field": "date_hour",
Expand All @@ -78,19 +79,43 @@ class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange):
{% if not is_incremental() %}
select 1 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 2 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 3 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 4 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour
select 1 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
1 as field_1,
2 as field_2 union all
select 2 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
1 as field_1,
2 as field_2 union all
select 3 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
1 as field_1,
2 as field_2 union all
select 4 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
1 as field_1,
2 as field_2
{% else %}
-- we want to overwrite the 4 records in the 2020-01-01 01:00:00 partition
-- with the 2 records below, but add two more in the 2020-01-01 02:00:00 partition
select 10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour union all
select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
select 10 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2 union all
select 20 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2 union all
select 30 as id,
cast('2020-01-01 02:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2 union all
select 40 as id,
cast('2020-01-01 02:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2
{% endif %}
Expand All @@ -99,10 +124,11 @@ class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange):
select * from data
"""

_MODELS__INCREMENTAL_TIME_INGESTION_PARTITIONING_TARGET = """
_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TIME_INGESTION_PARTITIONING_TARGET = """
{{
config(
materialized="incremental",
on_schema_change='sync_all_columns',
partition_by={
"field": "date_hour",
"data_type": "datetime",
Expand All @@ -114,13 +140,71 @@ class TestIncrementalOnSchemaChange(BaseIncrementalOnSchemaChange):
{% if not is_incremental() %}
select 10 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 30 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
select 10 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2
union all
select 30 as id,
cast('2020-01-01 02:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2
{% else %}
select 20 as id, cast('2020-01-01 01:00:00' as datetime) as date_hour union all
select 40 as id, cast('2020-01-01 02:00:00' as datetime) as date_hour
select 20 as id,
cast('2020-01-01 01:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2
union all
select 40 as id,
cast('2020-01-01 02:00:00' as datetime) as date_hour,
3 as field_3,
2 as field_2
{% endif %}
"""

_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_DYNAMIC_INSERT_OVERWRITE_COPY_PARTITIONS = """
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='sync_all_columns',
partition_by={
"field": "id",
"data_type": "int64",
"range": {
"start": 1,
"end": 7,
"interval": 1
},
"copy_partitions": true
},
incremental_strategy='insert_overwrite'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% set string_type = 'string' %}
{% if is_incremental() %}
SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field3 as {{string_type}}) as field3, -- to validate new fields
cast(field4 as {{string_type}}) AS field4 -- to validate new fields
FROM source_data WHERE id > _dbt_max_partition
{% else %}
select id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2
from source_data where id <= 3
{% endif %}
"""
Expand All @@ -132,9 +216,10 @@ def models(self):
return {
"model_a.sql": _MODELS__A,
"incremental_sync_all_columns_dynamic_insert_overwrite.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_DYNAMIC_INSERT_OVERWRITE,
"incremental_sync_all_columns_dynamic_insert_overwrite_copy_partitions.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_DYNAMIC_INSERT_OVERWRITE_COPY_PARTITIONS,
"incremental_sync_all_columns_target.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TARGET,
"incremental_time_ingestion_partitioning.sql": _MODELS__INCREMENTAL_TIME_INGESTION_PARTITIONING,
"incremental_time_ingestion_partitioning_target.sql": _MODELS__INCREMENTAL_TIME_INGESTION_PARTITIONING_TARGET,
"incremental_sync_all_columns_time_ingestion_partitioning.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TIME_INGESTION_PARTITIONING,
"incremental_sync_all_columns_time_ingestion_partitioning_target.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TIME_INGESTION_PARTITIONING_TARGET,
}

def test_run_incremental_sync_all_columns_dynamic_insert_overwrite(self, project):
Expand All @@ -143,9 +228,22 @@ def test_run_incremental_sync_all_columns_dynamic_insert_overwrite(self, project
compare_target = "incremental_sync_all_columns_target"
self.run_twice_and_assert(select, compare_source, compare_target, project)

# TODO: this test was added here, but it doesn't actually use 'on_schema_change'
def test_run_incremental_time_ingestion_partitioning(self, project):
select = "model_a incremental_time_ingestion_partitioning incremental_time_ingestion_partitioning_target"
compare_source = "incremental_time_ingestion_partitioning"
compare_target = "incremental_time_ingestion_partitioning_target"
def test_run_incremental_sync_all_columns_dynamic_insert_overwrite_copy_partitions(
self, project
):
select = (
"model_a incremental_sync_all_columns_dynamic_insert_overwrite_copy_partitions "
"incremental_sync_all_columns_target"
)
compare_source = "incremental_sync_all_columns_dynamic_insert_overwrite_copy_partitions"
compare_target = "incremental_sync_all_columns_target"
self.run_twice_and_assert(select, compare_source, compare_target, project)

def test_run_incremental_sync_all_columns_time_ingestion_partitioning(self, project):
select = (
"model_a incremental_sync_all_columns_time_ingestion_partitioning "
"incremental_sync_all_columns_time_ingestion_partitioning_target"
)
compare_source = "incremental_sync_all_columns_time_ingestion_partitioning"
compare_target = "incremental_sync_all_columns_time_ingestion_partitioning_target"
self.run_twice_and_assert(select, compare_source, compare_target, project)

0 comments on commit 6a2a90c

Please sign in to comment.