Support more sources and targets (#14)
* added target options to write to Snowflake target

* added support for ingestion job to write to Snowflake target #10

* added support for merge/insert to load to other targets #11

* overrode the get strategy to make config options case-insensitive

* improved error message for unsupported types #7

* added support for dict-type options

* added add_missing_columns option to snowflake transformation options

* added target_type to the models according to the new syntax

* changed column_transformations data type

* made datalake the default target_type

* corrected typo in options, made target_connection required

* v0.2.2

---------

Co-authored-by: Tanya Shemet <tanyshak@gmail.com>
navado and tanyshak committed May 22, 2023
1 parent ce69d21 commit 9516cde
Showing 14 changed files with 163 additions and 81 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/upsolver/__version__.py
@@ -1 +1 @@
version = "0.2.1"
version = "0.2.2"
76 changes: 58 additions & 18 deletions dbt/adapters/upsolver/impl.py
@@ -7,8 +7,7 @@
from dbt.adapters.upsolver.options.copy_options import Copy_options
from dbt.adapters.upsolver.options.connection_options import Connection_options
from dbt.adapters.upsolver.options.transformation_options import Transformation_options
from dbt.adapters.upsolver.options.table_options import Table_options
from dbt.adapters.upsolver.options.materialized_view_options import Materialized_view_options
from dbt.adapters.upsolver.options.target_options import Target_options
import agate
import datetime
import re
@@ -50,7 +49,7 @@ def get_connection_from_sql(self, sql):
.translate(str.maketrans({'\"':'', '\'':''}))
return connection_identifier
except Exception:
raise dbt.exceptions.ParsingError(f"Error while parsing connection name from sql:\n{sql}")
raise dbt.exceptions.ParsingError(f"Error while parsing connection name from sql: {sql}")

@available
def get_columns_names_with_types(self, list_dict):
@@ -73,6 +72,31 @@ def separate_options(self, config_options, source):
source_options = self.enrich_options(config_options, source, 'source_options')
return job_options, source_options

def render_option_from_dict(self, option_value):
res = []
try:
for key, value in option_value.items():
item = [f'{key}=']
if isinstance(value, list):
item.append('(')
item.append(' ,'.join(value))
item.append(')')
else:
item.append(value)
res.append(''.join(item))
return f"({' ,'.join(res)})"
except Exception:
raise dbt.exceptions.ParsingError(f"Error while parsing value: {value}")

def render_option_from_list(self, option_value):
try:
if not isinstance(option_value, str):
return tuple(i for i in option_value)
else:
return f"('{option_value}')"
except Exception:
raise dbt.exceptions.ParsingError(f"Error while parsing value: {option_value}")

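For context, a standalone sketch of what the dict renderer above emits, run on hypothetical option values; the joining and parenthesization mirror the diff exactly:

# Standalone sketch (not part of the diff) of the dict option rendering above.
def render_option_from_dict(option_value):
    res = []
    for key, value in option_value.items():
        if isinstance(value, list):
            res.append(f"{key}=({' ,'.join(value)})")
        else:
            res.append(f"{key}={value}")
    return f"({' ,'.join(res)})"

print(render_option_from_dict({"email": "MD5(email)"}))   # -> (email=MD5(email))
print(render_option_from_dict({"keys": ["id", "ts"]}))    # -> (keys=(id ,ts))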
@available
def enrich_options(self, config_options, source, options_type):
options = self.get_options(source, options_type)
@@ -81,10 +105,9 @@ def enrich_options(self, config_options, source, options_type):
find_value = options.get(option.lower(), None)
if find_value:
if options[option.lower()]['type'] == 'list':
if not isinstance(value, str):
value = tuple(i for i in value)
else:
value = f"('{value}')"
value = self.render_option_from_list(value)
elif options[option.lower()]['type'] == 'dict':
value = self.render_option_from_dict(value)
enriched_options[option] = find_value
enriched_options[option]['value'] = value
else:
@@ -96,18 +119,35 @@ def filter_options(self, options, parameter):
editable = {key:val for key, val in options.items() if val[parameter]}
return editable

def get_options(self, source, options_type):
if options_type == 'connection_options':
options = Connection_options[source.lower()]
elif options_type == 'transformation_options':
options = Transformation_options[source.lower()]
elif options_type == 'table_options':
options = Table_options
elif options_type == 'materialized_view_options':
options = Materialized_view_options
@available
def get(self, config, key, default=None):
config = {k.lower(): v for k, v in config.items()}
value = config.get(key, default)
return value

@available
def require(self, config, key):
config = {k.lower(): v for k, v in config.items()}
value = config.get(key, None)
if value:
return value
else:
options = Copy_options[source.lower()][options_type]
return options
raise dbt.exceptions.ParsingError(f"Required option is missing: {key}")


def get_options(self, source, options_type):
try:
if options_type == 'connection_options':
options = Connection_options[source.lower()]
elif options_type == 'transformation_options':
options = Transformation_options[source.lower()]
elif options_type == 'target_options':
options = Target_options[source.lower()]
else:
options = Copy_options[source.lower()][options_type]
return options
except Exception:
raise dbt.exceptions.ParsingError(f"Undefined option value: {source}")

def list_relations_without_caching(
self,
6 changes: 0 additions & 6 deletions dbt/adapters/upsolver/options/materialized_view_options.py

This file was deleted.

13 changes: 0 additions & 13 deletions dbt/adapters/upsolver/options/table_options.py

This file was deleted.

28 changes: 28 additions & 0 deletions dbt/adapters/upsolver/options/target_options.py
@@ -0,0 +1,28 @@
Target_options = {
"datalake": {
"globally_unique_keys": {"type": "boolean", "editable": False, "optional": True},
"storage_connection": {"type": "identifier", "editable": False, "optional": True},
"storage_location": {"type": "text", "editable": False, "optional": True},
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
"compression": {"type": "value", "editable": True, "optional": True},
"compaction_processes": {"type": "integer", "editable": True, "optional": True},
"disable_compaction": {"type": "boolean", "editable": True, "optional": True},
"retention_date_partition": {"type": "text", "editable": True, "optional": True},
"table_data_retention": {"type": "text", "editable": True, "optional": True},
"column_data_retention": {"type": "text", "editable": True, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
},
"materialized_view": {
"storage_connection": {"type": "identifier", "editable": False, "optional": True},
"storage_location": {"type": "text", "editable": False, "optional": True},
"max_time_travel_duration": {"type": "integer", "editable": True, "optional": True},
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
},
"snowflake": {
"column_transformations": {"type": "dict", "editable": False, "optional": True},
"deduplicate_with": {"type": "dict", "editable": False, "optional": True},
"exclude_columns": {"type": "list", "editable": False, "optional": True},
"create_table_if_missing": {"type": "boolean", "editable": False, "optional": True},
"write_interval": {"type": "integer", "editable": False, "optional": True},
}
}
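Given this schema, the options a model passes for a Snowflake target might look like the following; the keys come from the schema above, while the values are purely illustrative:

# Hypothetical options dict for a Snowflake target, matching the schema above.
snowflake_target_options = {
    "create_table_if_missing": True,
    "write_interval": 1,
    "exclude_columns": ["raw_payload"],
    "column_transformations": {"email": "MD5(email)"},
}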
11 changes: 6 additions & 5 deletions dbt/adapters/upsolver/options/transformation_options.py
@@ -13,7 +13,7 @@
"output_offset": {"type": "integer", "editable": False, "optional": True},
"location": {"type": "text", "editable": False, "optional": True}
},
'elasticsearch': {
"elasticsearch": {
"run_interval": {"type": "ineger", "editable": False, "optional": True},
"start_from": {"type": "value", "editable": False, "optional": True},
"end_at": {"type": "value", "editable": True, "optional": True},
@@ -25,7 +25,8 @@
"index_partition_size": {"type": "value", "editable": True, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
},
'snowflake': {
"snowflake": {
"add_missing_columns": {"type": "boolean", "editable": False, "optional": True},
"run_interval": {"type": "ineger", "editable": False, "optional": True},
"start_from": {"type": "value", "editable": False, "optional": True},
"end_at": {"type": "value", "editable": True, "optional": True},
@@ -35,7 +36,7 @@
"run_parallelism": {"type": "integer", "editable": True, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
},
'upsolver_data_lake': {
"datalake": {
"add_missing_columns": {"type": "boolean", "editable": False, "optional": True},
"run_interval": {"type": "ineger", "editable": False, "optional": True},
"start_from": {"type": "value", "editable": False, "optional": True},
@@ -46,15 +47,15 @@
"run_parallelism": {"type": "integer", "editable": True, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
},
'redshift': {
"redshift": {
"run_interval": {"type": "ineger", "editable": False, "optional": True},
"start_from": {"type": "value", "editable": False, "optional": True},
"end_at": {"type": "value", "editable": True, "optional": True},
"compute_cluster": {"type": "identifier", "editable": True, "optional": True},
"allow_cartesian_products": {"type": "boolean", "editable": False, "optional": True},
"aggregation_parallelism": {"type": "integer", "editable": True, "optional": True},
"run_parallelism": {"type": "integer", "editable": True, "optional": True},
"skip_faild_files": {"type": "boolean", "editable": False, "optional": True},
"skip_failed_files": {"type": "boolean", "editable": False, "optional": True},
"fail_on_write_error": {"type": "boolean", "editable": False, "optional": True},
"comment": {"type": "text", "editable": True, "optional": True}
}
6 changes: 3 additions & 3 deletions dbt/include/upsolver/macros/materializations/connection.sql
@@ -1,8 +1,8 @@
{% materialization connection, adapter='upsolver' %}
{%- set identifier = model['alias'] -%}

{% set connection_type = config.require('connection_type') %}
{% set connection_options = config.require('connection_options') %}
{%- set model_config = model['config'] -%}
{% set connection_type = adapter.require(model_config, 'connection_type') %}
{% set connection_options = adapter.require(model_config, 'connection_options') %}
{% set enriched_options = adapter.enrich_options(connection_options, connection_type, 'connection_options') %}
{% set enriched_editable_options = adapter.filter_options(enriched_options, 'editable') %}

@@ -1,16 +1,24 @@
{% macro get_create_copy_job_sql(job_identifier, sql, table, sync, options, source) -%}
{% macro get_create_copy_job_sql(job_identifier, sql, into_relation, sync, options, source, target_type) -%}

{% set connection_identifier = adapter.get_connection_from_sql(sql) %}
{% set job_options, source_options = adapter.separate_options(options, source) %}
{%- if target_type != 'datalake' -%}
{% set target_options = adapter.enrich_options(options, target_type, 'target_options') %}
{% set target_type = target_type %}
{%- else -%}
{% set target_options = {} %}
{% set target_type = '' %}
{%- endif -%}

CREATE
{% if sync %}
SYNC
{% endif %}
JOB {{job_identifier}}
{{ render_options(job_options, 'create') }}
{{ render_options(target_options, 'create') }}
AS COPY FROM {{source}} {{connection_identifier}}
{{ render_options(source_options, 'create') }}
INTO {{table}}
INTO {{target_type}} {{into_relation}}

{%- endmacro %}
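For illustration, with a Kafka source and a Snowflake target this macro renders roughly the following statement (identifiers are hypothetical; the job/source/target option clauses are omitted):

CREATE SYNC JOB my_model_job
AS COPY FROM kafka my_kafka_connection
INTO snowflake my_sf_conn.analytics.my_model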
@@ -1,14 +1,17 @@
{% macro get_create_insert_job_sql(job_identifier, table, sync, options, map_columns_by_name) -%}
{% macro get_create_insert_job_sql(job_identifier, into_relation, sync, options, map_columns_by_name, target_type) -%}

{% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %}
{% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %}
{%- if target_type == 'datalake' -%}
{% set target_type = '' %}
{%- endif -%}

CREATE
{% if sync %}
SYNC
{% endif %}
JOB {{job_identifier}}
{{ render_options(enriched_options, 'create') }}
AS INSERT INTO {{table}}
AS INSERT INTO {{target_type}} {{into_relation}}
{% if map_columns_by_name %}
MAP_COLUMNS_BY_NAME
{% endif %}
@@ -1,14 +1,18 @@
{% macro get_create_merge_job_sql(job_identifier, table, sync, options, primary_key, delete_condition) -%}
{% macro get_create_merge_job_sql(job_identifier, into_relation, sync, options, primary_key, delete_condition, target_type) -%}

{% set enriched_options = adapter.enrich_options(options, 'upsolver_data_lake', 'transformation_options') %}
{% set enriched_options = adapter.enrich_options(options, target_type, 'transformation_options') %}

{%- if target_type == 'datalake' -%}
{% set target_type = '' %}
{%- endif -%}

CREATE
{% if sync %}
SYNC
{% endif %}
JOB {{ job_identifier }}
{{ render_options(enriched_options, 'create') }}
AS MERGE INTO {{ table }} AS target
AS MERGE INTO {{ target_type }} {{ into_relation }} AS target
USING (
{{ sql }}
)
@@ -1,15 +1,17 @@
{% materialization incremental, adapter='upsolver' %}

{%- set identifier = model['alias'] -%}
{% set incremental_strategy = config.get('incremental_strategy', False) %}
{% set partition_by = config.get('partition_by', []) %}
{% set sync = config.get('sync', False) %}
{% set options = config.get('options', {}) %}
{% set source = config.get('source', none) %}
{% set delete_condition = config.get('delete_condition', False) %}
{% set partition_by = config.get('partition_by', []) %}
{% set primary_key = config.get('primary_key', []) %}
{% set map_columns_by_name = config.get('map_columns_by_name', False) %}
{%- set model_config = model['config'] -%}
{% set incremental_strategy = adapter.get(model_config, 'incremental_strategy', False) %}
{% set sync = adapter.get(model_config, 'sync', False) %}
{% set options = adapter.get(model_config, 'options', {}) %}
{% set source = adapter.get(model_config, 'source') %}
{% set target_type = adapter.get(model_config, 'target_type', 'datalake').lower() %}
{% set target_schema = adapter.get(model_config, 'target_schema', schema) %}
{% set delete_condition = adapter.get(model_config, 'delete_condition', False) %}
{% set partition_by = adapter.get(model_config, 'partition_by', []) %}
{% set primary_key = adapter.get(model_config, 'primary_key', []) %}
{% set map_columns_by_name = adapter.get(model_config, 'map_columns_by_name', False) %}
{% set job_identifier = identifier + '_job' %}

{%- set old_relation = adapter.get_relation(identifier=job_identifier,
@@ -20,17 +22,25 @@
database=database,
type='incremental') -%}

{%- set table_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database,
type='table') -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ log("model[config]: " ~ model['config'] ) }}

{%- call statement('create_table_if_not_exists') -%}
{{ get_create_table_if_not_exists_sql(table_relation, partition_by, primary_key, options) }}
{%- endcall -%}

{% if target_type == 'datalake' %}
{%- set table_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database,
type='table') -%}
{%- set into_relation = table_relation -%}
{%- call statement('create_table_if_not_exists') -%}
{{ get_create_table_if_not_exists_sql(table_relation, partition_by, primary_key, options) }}
{%- endcall -%}
{%- else -%}
{% set target_connection = adapter.require(model_config, 'target_connection') %}
{%- set into_relation = target_connection + '.' + target_schema + '.' + identifier -%}
{%- endif %}

{% if old_relation %}
{% call statement('main') -%}
@@ -39,16 +49,18 @@
{% else %}
{% call statement('main') -%}
{% if incremental_strategy == 'merge' %}
{{ get_create_merge_job_sql(job_identifier, table_relation, sync,
options, primary_key, delete_condition) }}
{{ get_create_merge_job_sql(job_identifier, into_relation, sync,
options, primary_key, delete_condition,
target_type) }}
{% elif incremental_strategy == 'insert' %}
{{ get_create_insert_job_sql(job_identifier,
table_relation, sync, options,
map_columns_by_name) }}
into_relation, sync, options,
map_columns_by_name, target_type) }}

{% else %}
{{ get_create_copy_job_sql(job_identifier, sql,
table_relation, sync, options, source) }}
into_relation, sync, options, source,
target_type) }}

{% endif %}
{%- endcall %}
@@ -60,5 +72,9 @@
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ return({'relations': [target_relation, table_relation]}) }}
{% if target_type == 'datalake' %}
{{ return({'relations': [target_relation, table_relation]}) }}
{% else %}
{{ return({'relations': [target_relation]}) }}
{%- endif %}
{% endmaterialization %}
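Putting it together, the config this materialization reads for a Snowflake-target incremental model might look like the dict below (hypothetical values; the adapter.get/adapter.require calls above resolve the keys case-insensitively):

# Hypothetical model['config'] for an incremental model writing to Snowflake.
model_config = {
    "materialized": "incremental",
    "incremental_strategy": "merge",     # 'merge', 'insert', or a copy job by default
    "target_type": "snowflake",          # defaults to 'datalake' when omitted
    "target_connection": "my_sf_conn",   # required for non-datalake targets
    "target_schema": "analytics",
    "sync": True,
    "primary_key": ["id"],
    "options": {"write_interval": 1},
}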