Skip to content

Commit

Permalink
Clean sql (#18)
Browse files Browse the repository at this point in the history
* deleted extra spaces in compiled models

* added target_table_alias

* fixed exclude_columns SQL #16

* v1.5.23

---------

Co-authored-by: Tanya Shemet <tanyshak@gmail.com>
  • Loading branch information
navado and tanyshak committed May 29, 2023
1 parent 4145bda commit 81a471d
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 100 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/upsolver/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.5.22"
version = "1.5.23"
10 changes: 5 additions & 5 deletions dbt/adapters/upsolver/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def separate_options(self, config_options, source):
return job_options, source_options

def render_option_from_dict(self, option_value):
res = []
try:
res = []
for key, value in option_value.items():
item = [f'{key}=']
if isinstance(value, list):
Expand All @@ -86,16 +86,16 @@ def render_option_from_dict(self, option_value):
res.append(''.join(item))
return f"({' ,'.join(res)})"
except Exception:
raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}")
raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}. Expected type: dictionary")

def render_option_from_list(self, option_value):
try:
if not isinstance(option_value, str):
if isinstance(option_value, list) and len(option_value) > 1:
return tuple(i for i in option_value)
else:
return f"('{option_value}')"
return f"('{''.join(option_value)}')"
except Exception:
raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}")
raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}. Expected type: list of strings")

@available
def enrich_options(self, config_options, source, options_type):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
{% macro get_create_copy_job_sql(job_identifier, sql, into_relation, sync, options, source, target_type) -%}

{% set connection_identifier = adapter.get_connection_from_sql(sql) %}
{% set job_options, source_options = adapter.separate_options(options, source) %}
{%- set connection_identifier = adapter.get_connection_from_sql(sql) -%}
{%- set job_options, source_options = adapter.separate_options(options, source) -%}
{%- if target_type != 'datalake' -%}
{% set target_options = adapter.enrich_options(options, target_type, 'target_options') %}
{% set target_type = target_type %}
{%- set target_options = adapter.enrich_options(options, target_type, 'target_options') -%}
{%- set target_type = target_type -%}
{%- else -%}
{% set target_options = {} %}
{% set target_type = '' %}
{%- set target_options = {} -%}
{%- set target_type = '' -%}
{%- endif -%}

CREATE
{% if sync %}
SYNC
{% endif %}
CREATE {{''}}
{%- if sync -%}
SYNC {{''}}
{%- endif -%}
JOB {{job_identifier}}
{{ render_options(job_options, 'create') }}
{{ render_options(target_options, 'create') }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
{% macro get_create_insert_job_sql(job_identifier, into_relation, sync, options, map_columns_by_name, target_type) -%}

{% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %}
{%- set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') -%}
{%- if target_type == 'datalake' -%}
{% set target_type = '' %}
{%- set target_type = '' -%}
{%- endif -%}

CREATE
{% if sync %}
SYNC
{% endif %}
JOB {{job_identifier}}
CREATE {{''}}
{%- if sync -%}
SYNC {{''}}
{%- endif -%}
JOB {{job_identifier}}
{{ render_options(enriched_options, 'create') }}
AS INSERT INTO {{target_type}} {{into_relation}}
{% if map_columns_by_name %}
{%- if map_columns_by_name %}
MAP_COLUMNS_BY_NAME
{% endif %}
{%- endif %}
{{sql}}

{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,27 @@
{% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %}

{%- if target_type == 'datalake' -%}
{% set target_type = '' %}
{%- set target_type = '' -%}
{%- endif -%}

CREATE
{% if sync %}
SYNC
{% endif %}
CREATE {{''}}
{%- if sync -%}
SYNC {{''}}
{%- endif -%}
JOB {{ job_identifier }}
{{ render_options(enriched_options, 'create') }}
AS MERGE INTO {{ target_type }} {{ into_relation }} AS target
USING (
{{ sql }}
)
{% if primary_key %}
USING ( {{- sql }} )
{% if primary_key -%}
source ON (
{% for item in primary_key %}
{%- for item in primary_key %}
target.{{ item['field'] }} = source.{{ item['field'] }}
{% endfor %}
{%- endfor -%}
)
{% endif %}
{%- endif -%}
{% if delete_condition %}
WHEN MATCHED AND {{ delete_condition}} THEN DELETE
{% endif %}
WHEN MATCHED AND {{ delete_condition}} THEN DELETE
{%- endif %}
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

{%- set identifier = model['alias'] -%}
{%- set model_config = model['config'] -%}
{% set incremental_strategy = adapter.get(model_config, 'incremental_strategy', False) %}
{% set sync = adapter.get(model_config, 'sync', False) %}
{% set options = adapter.get(model_config, 'options', {}) %}
{% set source = adapter.get(model_config, 'source') %}
{% set target_type = adapter.get(model_config, 'target_type', 'datalake').lower() %}
{% set target_schema = adapter.get(model_config, 'target_schema', schema) %}
{% set delete_condition = adapter.get(model_config, 'delete_condition', False) %}
{% set partition_by = adapter.get(model_config, 'partition_by', []) %}
{% set primary_key = adapter.get(model_config, 'primary_key', []) %}
{% set map_columns_by_name = adapter.get(model_config, 'map_columns_by_name', False) %}
{% set job_identifier = identifier + '_job' %}
{%- set incremental_strategy = adapter.get(model_config, 'incremental_strategy', False) -%}
{%- set sync = adapter.get(model_config, 'sync', False) -%}
{%- set options = adapter.get(model_config, 'options', {}) -%}
{%- set source = adapter.get(model_config, 'source') -%}
{%- set target_type = adapter.get(model_config, 'target_type', 'datalake').lower() -%}
{%- set target_schema = adapter.get(model_config, 'target_schema', schema) -%}
{%- set target_table_alias = adapter.get(model_config, 'target_table_alias', identifier) -%}
{%- set delete_condition = adapter.get(model_config, 'delete_condition', False) -%}
{%- set partition_by = adapter.get(model_config, 'partition_by', []) -%}
{%- set primary_key = adapter.get(model_config, 'primary_key', []) -%}
{%- set map_columns_by_name = adapter.get(model_config, 'map_columns_by_name', False) -%}
{%- set job_identifier = identifier + '_job' %}

{%- set old_relation = adapter.get_relation(identifier=job_identifier,
schema=schema,
Expand All @@ -29,7 +30,7 @@


{% if target_type == 'datalake' %}
{%- set table_relation = api.Relation.create(identifier=identifier,
{%- set table_relation = api.Relation.create(identifier=target_table_alias,
schema=schema,
database=database,
type='table') -%}
Expand All @@ -39,42 +40,42 @@
{%- endcall -%}
{%- else -%}
{% set target_connection = adapter.require(model_config, 'target_connection') %}
{%- set into_relation = target_connection + '.' + target_schema + '.' + identifier -%}
{%- set into_relation = target_connection + '.' + target_schema + '.' + target_table_alias -%}
{%- endif %}

{% if old_relation %}
{% call statement('main') -%}
{%- if old_relation -%}
{%- call statement('main') -%}
{{ get_alter_job_sql(job_identifier, options, incremental_strategy, source) }}
{%- endcall %}
{% else %}
{% call statement('main') -%}
{% if incremental_strategy == 'merge' %}
{%- else -%}
{%- call statement('main') -%}
{%- if incremental_strategy == 'merge' -%}
{{ get_create_merge_job_sql(job_identifier, into_relation, sync,
options, primary_key, delete_condition,
target_type) }}
{% elif incremental_strategy == 'insert' %}
{%- elif incremental_strategy == 'insert' -%}
{{ get_create_insert_job_sql(job_identifier,
into_relation, sync, options,
map_columns_by_name, target_type) }}

{% else %}
{%- else -%}
{{ get_create_copy_job_sql(job_identifier, sql,
into_relation, sync, options, source,
target_type) }}

{% endif %}
{%- endcall %}
{%- endif %}
{%- endif -%}
{%- endcall -%}
{%- endif -%}

{% do persist_docs(target_relation, model) %}
{% do persist_docs(table_relation, model) %}
{%- do persist_docs(target_relation, model) -%}
{%- do persist_docs(table_relation, model) -%}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% if target_type == 'datalake' %}
{%- if target_type == 'datalake' -%}
{{ return({'relations': [target_relation, table_relation]}) }}
{% else %}
{%- else -%}
{{ return({'relations': [target_relation]}) }}
{%- endif %}
{% endmaterialization %}
{%- endif -%}
{%- endmaterialization -%}
12 changes: 6 additions & 6 deletions dbt/include/upsolver/macros/materializations/utils/ater_job.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{% macro get_alter_job_sql(job_identifier, options, incremental_strategy, source) -%}

{% if incremental_strategy %}
{% set enriched_options = adapter.enrich_options(options, 'datalake', 'transformation_options') %}
{% else %}
{% set enriched_options, _ = adapter.separate_options(options, source) %}
{% endif %}
{% set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') %}
{%- if incremental_strategy -%}
{%- set enriched_options = adapter.enrich_options(options, 'datalake', 'transformation_options') -%}
{%- else -%}
{%- set enriched_options, _ = adapter.separate_options(options, source) -%}
{%- endif -%}
{%- set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') -%}

ALTER JOB {{job_identifier}}
{{ render_options(enriched_editable_options, 'alter') }}
Expand Down
24 changes: 12 additions & 12 deletions dbt/include/upsolver/macros/materializations/utils/create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@
{%- set columns_with_types = adapter.get_columns_names_with_types(partition_by + primary_key) -%}
{%- set columns_partitioned_by = adapter.get_columns_names(partition_by) -%}
{%- set columns_primary_key = adapter.get_columns_names(primary_key) -%}
{% set enriched_options = adapter.enrich_options(options, 'datalake', 'target_options') %}
{% set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') %}
{%- set enriched_options = adapter.enrich_options(options, 'datalake', 'target_options') -%}
{%- set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') -%}

{% if old_relation %}
{%- if old_relation -%}
ALTER TABLE {{target_relation}}
{{ render_options(enriched_editable_options, 'alter') }}
{% else %}
{{ render_options(enriched_editable_options, 'alter') }}
{%- else -%}
CREATE TABLE {{ target_relation }}
({{ columns_with_types }})
{% if partition_by %}
PARTITIONED BY
{%- if partition_by %}
PARTITIONED BY
{{ columns_partitioned_by }}
{% endif %}
{% if primary_key %}
PRIMARY KEY
{%- endif -%}
{%- if primary_key %}
PRIMARY KEY
{{ columns_primary_key }}
{% endif %}
{%- endif %}
{{ render_options(enriched_options, 'create') }}
{% endif %}
{%- endif -%}

{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
{% macro render_options(options, statement) -%}
{%- macro render_options(options, statement) -%}

{%- if statement == 'alter' -%}
{% set key = 'SET' %}
{%- set key = 'SET' -%}
{%- endif -%}
{%- for k, v in options.items() -%}
{% set value = v['value'] %}
{% if v['type'] == 'text' %}
{%- for k, v in options.items() %}
{%- set value = v['value'] -%}
{%- if v['type'] == 'text' %}
{{key}} {{k}} = '{{ value }}'
{% elif v['type'] == 'identifier' %}
{%- elif v['type'] == 'identifier' %}
{{key}} {{k}} = "{{ value }}"
{% else %}
{%- else %}
{{key}} {{k}} = {{ value }}
{%- endif -%}
{%- endfor %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@

SELECT
*
FROM {{ source('upsert_records_new', 'orders_raw_data_for_upsert_2') }}
FROM {{ source('upsert_records_new', 'orders_raw_data_for_upsert') }}
WHERE $event_time BETWEEN run_start_time() AND run_end_time()
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ SELECT customer.email AS customer_email,
SUM(nettotal) AS total_sales,
MIN(orderdate) AS first_purchase,
MAX(orderdate) AS last_purchase
FROM {{ ref('orders_raw_data_for_upsert_2') }}
FROM {{ ref('orders_raw_data_for_upsert') }}
WHERE $event_time BETWEEN run_start_time() AND run_end_time()
GROUP BY 1
HAVING COUNT(DISTINCT orderid::string) > 1
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ SELECT customer.email AS customer_email,
SUM(nettotal) AS total_sales,
MIN(orderdate) AS first_purchase,
MAX(orderdate) AS last_purchase
FROM {{ ref('orders_raw_data_for_upsert_2') }}
FROM {{ ref('orders_raw_data_for_upsert') }}
WHERE $event_time BETWEEN run_start_time() AND run_end_time()
GROUP BY 1
HAVING COUNT(DISTINCT orderid::string) > 1
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
)
}}

SELECT * FROM {{ ref('upsolver_s3_samples_2') }}
SELECT * FROM {{ ref('upsolver_s3_samples') }}
6 changes: 3 additions & 3 deletions examples/upsert_records_new/models/sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ sources:
database: default_glue_catalog
schema: database_16e61b
tables:
- name: insert_orders_upsert_2
- name: merge_orders_upsert_2
- name: orders_raw_data_for_upsert_2
- name: insert_orders_upsert
- name: merge_orders_upsert
- name: orders_raw_data_for_upsert

0 comments on commit 81a471d

Please sign in to comment.