From 9516cde1c1c0d78960a3571e1ca028563cd13bc3 Mon Sep 17 00:00:00 2001 From: Robert Navado <837482+navado@users.noreply.github.com> Date: Mon, 22 May 2023 21:04:41 +0300 Subject: [PATCH] Support more sources and targets (#14) * added target options to write to Snowflake target * addes support for ingestion job to write to Snowflake target #10 * added support for merge/insert to load to other targets #11 * override get strategi to make config options insensitive * improved error message for unsupported types #7 * added type dict options support * added add_missing_columns option to snowflake transformation options * added target_type to the models according to the new syntax * changed column_transformations data type * made default Datalake for target_type * corrected typo in options, made made target_connection required * v0.2.2 --------- Co-authored-by: Tanya Shemet --- dbt/adapters/upsolver/__version__.py | 2 +- dbt/adapters/upsolver/impl.py | 76 ++++++++++++++----- .../options/materialized_view_options.py | 6 -- .../upsolver/options/table_options.py | 13 ---- .../upsolver/options/target_options.py | 28 +++++++ .../options/transformation_options.py | 11 +-- .../macros/materializations/connection.sql | 6 +- .../incremental/create_copy_job.sql | 12 ++- .../incremental/create_insert_job.sql | 9 ++- .../incremental/create_merge_job.sql | 10 ++- .../incremental/incremental.sql | 60 +++++++++------ .../materializations/materializedview.sql | 7 +- .../materializations/utils/ater_job.sql | 2 +- .../materializations/utils/create_table.sql | 2 +- 14 files changed, 163 insertions(+), 81 deletions(-) delete mode 100644 dbt/adapters/upsolver/options/materialized_view_options.py delete mode 100644 dbt/adapters/upsolver/options/table_options.py create mode 100644 dbt/adapters/upsolver/options/target_options.py diff --git a/dbt/adapters/upsolver/__version__.py b/dbt/adapters/upsolver/__version__.py index 8d16a47..cdaed6e 100644 --- a/dbt/adapters/upsolver/__version__.py +++ b/dbt/adapters/upsolver/__version__.py @@ -1 +1 @@ -version = "0.2.1" +version = "0.2.2" diff --git a/dbt/adapters/upsolver/impl.py b/dbt/adapters/upsolver/impl.py index 6c2e511..f49ccd9 100644 --- a/dbt/adapters/upsolver/impl.py +++ b/dbt/adapters/upsolver/impl.py @@ -7,8 +7,7 @@ from dbt.adapters.upsolver.options.copy_options import Copy_options from dbt.adapters.upsolver.options.connection_options import Connection_options from dbt.adapters.upsolver.options.transformation_options import Transformation_options -from dbt.adapters.upsolver.options.table_options import Table_options -from dbt.adapters.upsolver.options.materialized_view_options import Materialized_view_options +from dbt.adapters.upsolver.options.target_options import Target_options import agate import datetime import re @@ -50,7 +49,7 @@ def get_connection_from_sql(self, sql): .translate(str.maketrans({'\"':'', '\'':''})) return connection_identifier except Exception: - raise dbt.exceptions.ParsingError(f"Error while parsing connection name from sql:\n{sql}") + raise dbt.exceptions.ParsingError(f"Error while parsing connection name from sql: {sql}") @available def get_columns_names_with_types(self, list_dict): @@ -73,6 +72,31 @@ def separate_options(self, config_options, source): source_options = self.enrich_options(config_options, source, 'source_options') return job_options, source_options + def render_option_from_dict(self, option_value): + res = [] + try: + for key, value in option_value.items(): + item = [f'{key}='] + if isinstance(value, list): + item.append('(') + item.append(' ,'.join(value)) + item.append(')') + else: + item.append(value) + res.append(''.join(item)) + return f"({' ,'.join(res)})" + except Exception: + raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}") + + def render_option_from_list(self, option_value): + try: + if not isinstance(option_value, str): + return tuple(i for i in option_value) + else: + return f"('{option_value}')" + except Exception: + raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}") + @available def enrich_options(self, config_options, source, options_type): options = self.get_options(source, options_type) @@ -81,10 +105,9 @@ def enrich_options(self, config_options, source, options_type): find_value = options.get(option.lower(), None) if find_value: if options[option.lower()]['type'] == 'list': - if not isinstance(value, str): - value = tuple(i for i in value) - else: - value = f"('{value}')" + value = self.render_option_from_list(value) + elif options[option.lower()]['type'] == 'dict': + value = self.render_option_from_dict(value) enriched_options[option] = find_value enriched_options[option]['value'] = value else: @@ -96,18 +119,35 @@ def filter_options(self, options, parametr): editable = {key:val for key, val in options.items() if val[parametr] == True} return editable - def get_options(self, source, options_type): - if options_type == 'connection_options': - options = Connection_options[source.lower()] - elif options_type == 'transformation_options': - options = Transformation_options[source.lower()] - elif options_type == 'table_options': - options = Table_options - elif options_type == 'materialized_view_options': - options = Materialized_view_options + @available + def get(self, config, key, default=None): + config = {k.lower(): v for k, v in config.items()} + value = config.get(key, default) + return value + + @available + def require(self, config, key): + config = {k.lower(): v for k, v in config.items()} + value = config.get(key, None) + if value: + return value else: - options = Copy_options[source.lower()][options_type] - return options + raise dbt.exceptions.ParsingError(f"Required option is missing: {key}") + + + def get_options(self, source, options_type): + try: + if options_type == 'connection_options': + options = Connection_options[source.lower()] + elif options_type == 'transformation_options': + options = Transformation_options[source.lower()] + elif options_type == 'target_options': + options = Target_options[source.lower()] + else: + options = Copy_options[source.lower()][options_type] + return options + except Exception: + raise dbt.exceptions.ParsingError(f"Undefined option value: {source}") def list_relations_without_caching( self, diff --git a/dbt/adapters/upsolver/options/materialized_view_options.py b/dbt/adapters/upsolver/options/materialized_view_options.py deleted file mode 100644 index 456d24c..0000000 --- a/dbt/adapters/upsolver/options/materialized_view_options.py +++ /dev/null @@ -1,6 +0,0 @@ -Materialized_view_options = { - "storage_connection": {"type": "identifier", "editable": False, "optional": True}, - "storage_location": {"type": "text", "editable": False, "optional": True}, - "max_time_travel_duration": {"type": "integer", "editable": True, "optional": True}, - "compute_cluster": {"type": "identifier", "editable": True, "optional": True}, -} diff --git a/dbt/adapters/upsolver/options/table_options.py b/dbt/adapters/upsolver/options/table_options.py deleted file mode 100644 index 6af85b3..0000000 --- a/dbt/adapters/upsolver/options/table_options.py +++ /dev/null @@ -1,13 +0,0 @@ -Table_options = { - "globally_unique_keys": {"type": "boolean", "editable": False, "optional": True}, - "storage_connection": {"type": "identifier", "editable": False, "optional": True}, - "storage_location": {"type": "text", "editable": False, "optional": True}, - "compute_cluster": {"type": "identifier", "editable": True, "optional": True}, - "compression": {"type": "value", "editable": True, "optional": True}, - "compaction_processes": {"type": "integer", "editable": True, "optional": True}, - "disable_compaction": {"type": "boolean", "editable": True, "optional": True}, - "retention_date_partition": {"type": "text", "editable": True, "optional": True}, - "table_data_retention": {"type": "text", "editable": True, "optional": True}, - "column_data_retention": {"type": "text", "editable": True, "optional": True}, - "comment": {"type": "text", "editable": True, "optional": True} -} diff --git a/dbt/adapters/upsolver/options/target_options.py b/dbt/adapters/upsolver/options/target_options.py new file mode 100644 index 0000000..8cb5700 --- /dev/null +++ b/dbt/adapters/upsolver/options/target_options.py @@ -0,0 +1,28 @@ +Target_options = { + "datalake": { + "globally_unique_keys": {"type": "boolean", "editable": False, "optional": True}, + "storage_connection": {"type": "identifier", "editable": False, "optional": True}, + "storage_location": {"type": "text", "editable": False, "optional": True}, + "compute_cluster": {"type": "identifier", "editable": True, "optional": True}, + "compression": {"type": "value", "editable": True, "optional": True}, + "compaction_processes": {"type": "integer", "editable": True, "optional": True}, + "disable_compaction": {"type": "boolean", "editable": True, "optional": True}, + "retention_date_partition": {"type": "text", "editable": True, "optional": True}, + "table_data_retention": {"type": "text", "editable": True, "optional": True}, + "column_data_retention": {"type": "text", "editable": True, "optional": True}, + "comment": {"type": "text", "editable": True, "optional": True} + }, + "materialized_view": { + "storage_connection": {"type": "identifier", "editable": False, "optional": True}, + "storage_location": {"type": "text", "editable": False, "optional": True}, + "max_time_travel_duration": {"type": "integer", "editable": True, "optional": True}, + "compute_cluster": {"type": "identifier", "editable": True, "optional": True}, + }, + "snowflake": { + "column_transformations": {"type": "dict", "editable": False, "optional": True}, + "deduplicate_with": {"type": "dict", "editable": False, "optional": True}, + "exclude_columns": {"type": "list", "editable": False, "optional": True}, + "create_table_if_missing": {"type": "boolean", "editable": False, "optional": True}, + "write_interval": {"type": "integer", "editable": False, "optional": True}, + } +} diff --git a/dbt/adapters/upsolver/options/transformation_options.py b/dbt/adapters/upsolver/options/transformation_options.py index 3a1d49d..744711f 100644 --- a/dbt/adapters/upsolver/options/transformation_options.py +++ b/dbt/adapters/upsolver/options/transformation_options.py @@ -13,7 +13,7 @@ "output_offset": {"type": "integer", "editable": False, "optional": True}, "location": {"type": "text", "editable": False, "optional": True} }, - 'elasticsearch': { + "elasticsearch": { "run_interval": {"type": "ineger", "editable": False, "optional": True}, "start_from": {"type": "value", "editable": False, "optional": True}, "end_at": {"type": "value", "editable": True, "optional": True}, @@ -25,7 +25,8 @@ "index_partition_size": {"type": "value", "editable": True, "optional": True}, "comment": {"type": "text", "editable": True, "optional": True} }, - 'snowflake': { + "snowflake": { + "add_missing_columns": {"type": "boolean", "editable": False, "optional": True}, "run_interval": {"type": "ineger", "editable": False, "optional": True}, "start_from": {"type": "value", "editable": False, "optional": True}, "end_at": {"type": "value", "editable": True, "optional": True}, @@ -35,7 +36,7 @@ "run_parallelism": {"type": "integer", "editable": True, "optional": True}, "comment": {"type": "text", "editable": True, "optional": True} }, - 'upsolver_data_lake': { + "datalake": { "add_missing_columns": {"type": "boolean", "editable": False, "optional": True}, "run_interval": {"type": "ineger", "editable": False, "optional": True}, "start_from": {"type": "value", "editable": False, "optional": True}, @@ -46,7 +47,7 @@ "run_parallelism": {"type": "integer", "editable": True, "optional": True}, "comment": {"type": "text", "editable": True, "optional": True} }, - 'redshift': { + "redshift": { "run_interval": {"type": "ineger", "editable": False, "optional": True}, "start_from": {"type": "value", "editable": False, "optional": True}, "end_at": {"type": "value", "editable": True, "optional": True}, @@ -54,7 +55,7 @@ "allow_cartesian_products": {"type": "boolean", "editable": False, "optional": True}, "aggregation_parallelism": {"type": "integer", "editable": True, "optional": True}, "run_parallelism": {"type": "integer", "editable": True, "optional": True}, - "skip_faild_files": {"type": "boolean", "editable": False, "optional": True}, + "skip_failed_files": {"type": "boolean", "editable": False, "optional": True}, "fail_on_write_error": {"type": "boolean", "editable": False, "optional": True}, "comment": {"type": "text", "editable": True, "optional": True} } diff --git a/dbt/include/upsolver/macros/materializations/connection.sql b/dbt/include/upsolver/macros/materializations/connection.sql index 954e4f1..b87faee 100644 --- a/dbt/include/upsolver/macros/materializations/connection.sql +++ b/dbt/include/upsolver/macros/materializations/connection.sql @@ -1,8 +1,8 @@ {% materialization connection, adapter='upsolver' %} {%- set identifier = model['alias'] -%} - - {% set connection_type = config.require('connection_type') %} - {% set connection_options = config.require('connection_options') %} + {%- set model_config = model['config'] -%} + {% set connection_type = adapter.require(model_config, 'connection_type') %} + {% set connection_options = adapter.require(model_config, 'connection_options') %} {% set enriched_options = adapter.enrich_options(connection_options, connection_type, 'connection_options') %} {% set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') %} diff --git a/dbt/include/upsolver/macros/materializations/incremental/create_copy_job.sql b/dbt/include/upsolver/macros/materializations/incremental/create_copy_job.sql index 3d45e4f..c913ca1 100644 --- a/dbt/include/upsolver/macros/materializations/incremental/create_copy_job.sql +++ b/dbt/include/upsolver/macros/materializations/incremental/create_copy_job.sql @@ -1,7 +1,14 @@ -{% macro get_create_copy_job_sql(job_identifier, sql, table, sync, options, source) -%} +{% macro get_create_copy_job_sql(job_identifier, sql, into_relation, sync, options, source, target_type) -%} {% set connection_identifier = adapter.get_connection_from_sql(sql) %} {% set job_options, source_options = adapter.separate_options(options, source) %} + {%- if target_type != 'datalake' -%} + {% set target_options = adapter.enrich_options(options, target_type, 'target_options') %} + {% set target_type = target_type %} + {%- else -%} + {% set target_options = {} %} + {% set target_type = '' %} + {%- endif -%} CREATE {% if sync %} @@ -9,8 +16,9 @@ {% endif %} JOB {{job_identifier}} {{ render_options(job_options, 'create') }} + {{ render_options(target_options, 'create') }} AS COPY FROM {{source}} {{connection_identifier}} {{ render_options(source_options, 'create') }} - INTO {{table}} + INTO {{target_type}} {{into_relation}} {%- endmacro %} diff --git a/dbt/include/upsolver/macros/materializations/incremental/create_insert_job.sql b/dbt/include/upsolver/macros/materializations/incremental/create_insert_job.sql index 8d52b07..dab9e26 100644 --- a/dbt/include/upsolver/macros/materializations/incremental/create_insert_job.sql +++ b/dbt/include/upsolver/macros/materializations/incremental/create_insert_job.sql @@ -1,6 +1,9 @@ -{% macro get_create_insert_job_sql(job_identifier, table, sync, options, map_columns_by_name) -%} +{% macro get_create_insert_job_sql(job_identifier, into_relation, sync, options, map_columns_by_name, target_type) -%} - {% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %} + {% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %} + {%- if target_type == 'datalake' -%} + {% set target_type = '' %} + {%- endif -%} CREATE {% if sync %} @@ -8,7 +11,7 @@ {% endif %} JOB {{job_identifier}} {{ render_options(enriched_options, 'create') }} - AS INSERT INTO {{table}} + AS INSERT INTO {{target_type}} {{into_relation}} {% if map_columns_by_name %} MAP_COLUMNS_BY_NAME {% endif %} diff --git a/dbt/include/upsolver/macros/materializations/incremental/create_merge_job.sql b/dbt/include/upsolver/macros/materializations/incremental/create_merge_job.sql index 40462d5..a382a1a 100644 --- a/dbt/include/upsolver/macros/materializations/incremental/create_merge_job.sql +++ b/dbt/include/upsolver/macros/materializations/incremental/create_merge_job.sql @@ -1,6 +1,10 @@ -{% macro get_create_merge_job_sql(job_identifier, table, sync, options, primary_key, delete_condition) -%} +{% macro get_create_merge_job_sql(job_identifier, into_relation, sync, options, primary_key, delete_condition, target_type) -%} - {% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %} + {% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %} + + {%- if target_type == 'datalake' -%} + {% set target_type = '' %} + {%- endif -%} CREATE {% if sync %} @@ -8,7 +12,7 @@ {% endif %} JOB {{ job_identifier }} {{ render_options(enriched_options, 'create') }} - AS MERGE INTO {{ table }} AS target + AS MERGE INTO {{ target_type }} {{ into_relation }} AS target USING ( {{ sql }} ) diff --git a/dbt/include/upsolver/macros/materializations/incremental/incremental.sql b/dbt/include/upsolver/macros/materializations/incremental/incremental.sql index 97f54f1..963f17a 100644 --- a/dbt/include/upsolver/macros/materializations/incremental/incremental.sql +++ b/dbt/include/upsolver/macros/materializations/incremental/incremental.sql @@ -1,15 +1,17 @@ {% materialization incremental, adapter='upsolver' %} {%- set identifier = model['alias'] -%} - {% set incremental_strategy = config.get('incremental_strategy', False) %} - {% set partition_by = config.get('partition_by', []) %} - {% set sync = config.get('sync', False) %} - {% set options = config.get('options', {}) %} - {% set source = config.get('source', none) %} - {% set delete_condition = config.get('delete_condition', False) %} - {% set partition_by = config.get('partition_by', []) %} - {% set primary_key = config.get('primary_key', []) %} - {% set map_columns_by_name = config.get('map_columns_by_name', False) %} + {%- set model_config = model['config'] -%} + {% set incremental_strategy = adapter.get(model_config, 'incremental_strategy', False) %} + {% set sync = adapter.get(model_config, 'sync', False) %} + {% set options = adapter.get(model_config, 'options', {}) %} + {% set source = adapter.get(model_config, 'source') %} + {% set target_type = adapter.get(model_config, 'target_type', 'datalake').lower() %} + {% set target_schema = adapter.get(model_config, 'target_schema', schema) %} + {% set delete_condition = adapter.get(model_config, 'delete_condition', False) %} + {% set partition_by = adapter.get(model_config, 'partition_by', []) %} + {% set primary_key = adapter.get(model_config, 'primary_key', []) %} + {% set map_columns_by_name = adapter.get(model_config, 'map_columns_by_name', False) %} {% set job_identifier = identifier + '_job' %} {%- set old_relation = adapter.get_relation(identifier=job_identifier, @@ -20,17 +22,25 @@ database=database, type='incremental') -%} - {%- set table_relation = api.Relation.create(identifier=identifier, - schema=schema, - database=database, - type='table') -%} {{ run_hooks(pre_hooks, inside_transaction=False) }} {{ run_hooks(pre_hooks, inside_transaction=True) }} + {{ log("model[config]: " ~ model['config'] ) }} - {%- call statement('create_table_if_not_exists') -%} - {{ get_create_table_if_not_exists_sql(table_relation, partition_by, primary_key, options) }} - {%- endcall -%} + + {% if target_type == 'datalake' %} + {%- set table_relation = api.Relation.create(identifier=identifier, + schema=schema, + database=database, + type='table') -%} + {%- set into_relation = table_relation -%} + {%- call statement('create_table_if_not_exists') -%} + {{ get_create_table_if_not_exists_sql(table_relation, partition_by, primary_key, options) }} + {%- endcall -%} + {%- else -%} + {% set target_connection = adapter.require(model_config, 'target_connection') %} + {%- set into_relation = target_connection + '.' + target_schema + '.' + identifier -%} + {%- endif %} {% if old_relation %} {% call statement('main') -%} @@ -39,16 +49,18 @@ {% else %} {% call statement('main') -%} {% if incremental_strategy == 'merge' %} - {{ get_create_merge_job_sql(job_identifier, table_relation, sync, - options, primary_key, delete_condition) }} + {{ get_create_merge_job_sql(job_identifier, into_relation, sync, + options, primary_key, delete_condition, + target_type) }} {% elif incremental_strategy == 'insert' %} {{ get_create_insert_job_sql(job_identifier, - table_relation, sync, options, - map_columns_by_name) }} + into_relation, sync, options, + map_columns_by_name, target_type) }} {% else %} {{ get_create_copy_job_sql(job_identifier, sql, - table_relation, sync, options, source) }} + into_relation, sync, options, source, + target_type) }} {% endif %} {%- endcall %} @@ -60,5 +72,9 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} {{ run_hooks(post_hooks, inside_transaction=True) }} - {{ return({'relations': [target_relation, table_relation]}) }} + {% if target_type == 'datalake' %} + {{ return({'relations': [target_relation, table_relation]}) }} + {% else %} + {{ return({'relations': [target_relation]}) }} + {%- endif %} {% endmaterialization %} diff --git a/dbt/include/upsolver/macros/materializations/materializedview.sql b/dbt/include/upsolver/macros/materializations/materializedview.sql index 783a2a1..d0e5f25 100644 --- a/dbt/include/upsolver/macros/materializations/materializedview.sql +++ b/dbt/include/upsolver/macros/materializations/materializedview.sql @@ -1,8 +1,9 @@ {% materialization materializedview, adapter='upsolver' %} {%- set identifier = model['alias'] -%} - {% set sync = config.get('sync', False) %} - {% set options = config.get('options', {}) %} - {% set enriched_options = adapter.enrich_options(options, None, 'materialized_view_options') %} + {%- set model_config = model['config'] -%} + {% set sync = adapter.get(model_config, 'sync', False) %} + {% set options = adapter.get(model_config, 'options', {}) %} + {% set enriched_options = adapter.enrich_options(options, materialized_view, 'target_options') %} {% set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') %} {%- set old_relation = adapter.get_relation(identifier=identifier, diff --git a/dbt/include/upsolver/macros/materializations/utils/ater_job.sql b/dbt/include/upsolver/macros/materializations/utils/ater_job.sql index dca5c6e..df36d91 100644 --- a/dbt/include/upsolver/macros/materializations/utils/ater_job.sql +++ b/dbt/include/upsolver/macros/materializations/utils/ater_job.sql @@ -1,7 +1,7 @@ {% macro get_alter_job_sql(job_identifier, options, incremental_strategy, source) -%} {% if incremental_strategy %} - {% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %} + {% set enriched_options = adapter.enrich_options(options, 'datalake', 'transformation_options') %} {% else %} {% set enriched_options, _ = adapter.separate_options(options, source) %} {% endif %} diff --git a/dbt/include/upsolver/macros/materializations/utils/create_table.sql b/dbt/include/upsolver/macros/materializations/utils/create_table.sql index 60549f1..116fa12 100644 --- a/dbt/include/upsolver/macros/materializations/utils/create_table.sql +++ b/dbt/include/upsolver/macros/materializations/utils/create_table.sql @@ -7,7 +7,7 @@ {%- set columns_with_types = adapter.get_columns_names_with_types(partition_by + primary_key) -%} {%- set columns_partitioned_by = adapter.get_columns_names(partition_by) -%} {%- set columns_primary_key = adapter.get_columns_names(primary_key) -%} - {% set enriched_options = adapter.enrich_options(options, None, 'table_options') %} + {% set enriched_options = adapter.enrich_options(options, 'datalake', 'target_options') %} {% set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') %} {% if old_relation %}