
WIP - Clickhouse integration #507

Closed
wants to merge 10 commits
7 changes: 7 additions & 0 deletions macros/utils/cross_db_utils/current_timestamp.sql
@@ -22,6 +22,9 @@
cast(current_timestamp() as timestamp)
{% endmacro %}

{% macro clickhouse__edr_current_timestamp() -%}
now()
{%- endmacro -%}

{% macro edr_current_timestamp_in_utc() -%}
{{ adapter.dispatch('edr_current_timestamp_in_utc','elementary')() }}
@@ -45,4 +48,8 @@

{% macro spark__edr_current_timestamp_in_utc() %}
cast(unix_timestamp() as timestamp)
{% endmacro %}

{% macro clickhouse__edr_current_timestamp_in_utc() %}
toDateTime(toUInt32(toUnixTimestamp(now()) - 7 * 86400))
Contributor:

Can you explain a bit about these numbers? How does this convert it to UTC?

{% endmacro %}
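
For context on the question above: 7 * 86400 is 604,800 seconds, i.e. seven days, so the expression returns the current time minus seven days; subtracting seconds from a Unix timestamp does not by itself change the timezone. A minimal sketch of a UTC variant, assuming ClickHouse's now() accepts an optional timezone argument:

{% macro clickhouse__edr_current_timestamp_in_utc() %}
    {# Sketch only: pass an explicit timezone to now(), assuming the server supports now('UTC') #}
    now('UTC')
{% endmacro %}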
13 changes: 13 additions & 0 deletions macros/utils/cross_db_utils/generate_elementary_profile_args.sql
@@ -102,6 +102,19 @@
{% do return(parameters) %}
{% endmacro %}

{% macro clickhouse__generate_elementary_profile_args(method, elementary_database, elementary_schema) %}
{% do return([
_parameter("type", target.type),
_parameter("host", target.host),
_parameter("port", target.port),
_parameter("user", target.user),
_parameter("password", "<PASSWORD>"),
_parameter("schema", elementary_schema),
_parameter("driver", "<http|native>"),
Contributor:

Is this available through the target (e.g. target.driver)? If so, I believe it would be better.

_parameter("secure", True),
]) %}
{% endmacro %}

{% macro default__generate_elementary_profile_args(method, elementary_database, elementary_schema) %}
Adapter "{{ target.type }}" is not supported on Elementary.
{% endmacro %}
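
On the driver question above, a possible sketch if dbt-clickhouse exposes the driver on the target context (this is an assumption, and the "native" fallback is illustrative):

{# Sketch only: read the driver from the target when available instead of hardcoding a placeholder #}
_parameter("driver", target.driver if target.driver is defined else "native"),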
6 changes: 6 additions & 0 deletions macros/utils/table_operations/replace_table_data.sql
@@ -38,3 +38,9 @@

{% do adapter.drop_relation(intermediate_relation) %}
{% endmacro %}

{# Clickhouse - truncate and insert (non-atomic) #}
{% macro clickhouse__replace_table_data(relation, rows) %}
{% do dbt.truncate_relation(relation) %}
{% do elementary.insert_rows(relation, rows, should_commit=false, chunk_size=elementary.get_config_var('dbt_artifacts_chunk_size')) %}
{% endmacro %}
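
Since this variant is a truncate followed by chunked inserts, the statements ClickHouse receives have roughly the following shape (table, columns, and values are illustrative, not taken from the PR):

-- Illustrative shape of the generated statements; the table is briefly empty between them, hence non-atomic
TRUNCATE TABLE analytics_elementary.dbt_run_results;
INSERT INTO analytics_elementary.dbt_run_results (unique_id, generated_at) VALUES ('model.my_project.my_model', '2023-01-01 00:00:00');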
@@ -27,7 +27,11 @@ time_window_aggregation as (
bucket_duration_hours,
updated_at,
avg(metric_value) over (partition by metric_name, full_table_name, column_name order by bucket_start asc rows between unbounded preceding and current row) as training_avg,
{% if target.type == 'clickhouse' %}
Contributor:

We prefer adapter-specific code to use adapter.dispatch and be contained in the cross_db_utils section of the code. If you could add an edr_stddev macro there, it would be awesome.

stddevSamp(metric_value) over (partition by metric_name, full_table_name, column_name order by bucket_start asc rows between unbounded preceding and current row) as training_stddev,
{% else %}
stddev(metric_value) over (partition by metric_name, full_table_name, column_name order by bucket_start asc rows between unbounded preceding and current row) as training_stddev,
{% endif %}
count(metric_value) over (partition by metric_name, full_table_name, column_name order by bucket_start asc rows between unbounded preceding and current row) as training_set_size,
last_value(bucket_end) over (partition by metric_name, full_table_name, column_name order by bucket_start asc rows between unbounded preceding and current row) training_end,
first_value(bucket_end) over (partition by metric_name, full_table_name, column_name order by bucket_start asc rows between unbounded preceding and current row) as training_start
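
A hedged sketch of the edr_stddev macro requested in the review comment, following the adapter.dispatch pattern used above for edr_current_timestamp_in_utc (the argument name is illustrative):

{% macro edr_stddev(value_expression) %}
    {{ adapter.dispatch('edr_stddev', 'elementary')(value_expression) }}
{% endmacro %}

{% macro default__edr_stddev(value_expression) %}
    stddev({{ value_expression }})
{% endmacro %}

{% macro clickhouse__edr_stddev(value_expression) %}
    stddevSamp({{ value_expression }})
{% endmacro %}

With such a macro, the model could call {{ elementary.edr_stddev('metric_value') }} over (partition by ...) and drop the target.type branch.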