Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/support for multiple buckets anomaly tests #85

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ scripts/
edr.log
integration_tests/data

models/test_model/
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% macro get_anomaly_scores_query(test_metrics_table_relation, full_monitored_table_name, sensitivity, backfill_days, monitors, column_name = none, columns_only = false, dimensions = none) %}
{% macro get_anomaly_scores_query(test_metrics_table_relation, full_monitored_table_name, sensitivity, backfill_days, monitors, period, column_name = none, columns_only = false, dimensions = none) %}

{%- set global_min_bucket_end = elementary.get_global_min_bucket_end_as_datetime() %}
{%- set metrics_min_time = "'"~ (global_min_bucket_end - modules.datetime.timedelta(backfill_days)).strftime("%Y-%m-%d 00:00:00") ~"'" %}
{%- set global_min_bucket_end = elementary.get_global_min_bucket_end_as_datetime(period) %}
{%- set metrics_min_time = "'"~ get_metric_min_time(global_min_bucket_end, backfill_days, period).strftime("%Y-%m-%d %H:00:00") ~"'" %}
{%- set backfill_period = "'-" ~ backfill_days ~ "'" %}
{%- set test_execution_id = elementary.get_test_execution_id() %}
{%- set test_unique_id = elementary.get_test_unique_id() %}
Expand Down Expand Up @@ -64,10 +64,10 @@
where row_number = 1

),
{# Continue here#}
period_buckets as (

daily_buckets as (

{{ elementary.daily_buckets_cte() }}
{{ elementary.period_buckets_cte(period) }}

),

Expand All @@ -86,14 +86,14 @@
bucket_end,
bucket_duration_hours,
updated_at,
edr_daily_bucket,
avg(metric_value) over (partition by metric_name, full_table_name, column_name order by edr_daily_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_avg,
stddev(metric_value) over (partition by metric_name, full_table_name, column_name order by edr_daily_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_stddev,
count(metric_value) over (partition by metric_name, full_table_name, column_name order by edr_daily_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_set_size,
last_value(bucket_end) over (partition by metric_name, full_table_name, column_name order by edr_daily_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) training_end,
first_value(bucket_end) over (partition by metric_name, full_table_name, column_name order by edr_daily_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_start
from daily_buckets left join
grouped_metrics on (edr_daily_bucket = bucket_end)
edr_period_bucket,
avg(metric_value) over (partition by metric_name, full_table_name, column_name order by edr_period_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_avg,
stddev(metric_value) over (partition by metric_name, full_table_name, column_name order by edr_period_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_stddev,
count(metric_value) over (partition by metric_name, full_table_name, column_name order by edr_period_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_set_size,
last_value(bucket_end) over (partition by metric_name, full_table_name, column_name order by edr_period_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) training_end,
first_value(bucket_end) over (partition by metric_name, full_table_name, column_name order by edr_period_bucket asc rows between {{ elementary.get_config_var('days_back') }} preceding and current row) as training_start
from period_buckets left join
grouped_metrics on (edr_period_bucket = bucket_end)
{{ dbt_utils.group_by(13) }}

),
Expand Down Expand Up @@ -146,6 +146,6 @@
select * from anomaly_scores

{% endset %}

{{ return(anomaly_scores_query) }}
{% endmacro %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{% macro get_global_min_bucket_start() %}
{%- set global_min_bucket_start = "'"~ (elementary.get_run_started_at() - modules.datetime.timedelta(elementary.get_config_var('days_back'))).strftime("%Y-%m-%d 00:00:00") ~"'" %}
{% macro get_global_min_bucket_start(period) %}
{%- set run_started_at = "'"~elementary.get_run_started_at()~"'" -%}
{%- set global_min_bucket_start = timeadd(period, - elementary.get_config_var('days_back'), date_trunc(period, run_started_at)) %}
{{ return(global_min_bucket_start) }}
{% endmacro %}

Expand All @@ -9,36 +10,63 @@
{% endmacro %}

{# bucket_end represents the end of the bucket, so we need to add extra day to the timedelta #}
{% macro get_global_min_bucket_end_as_datetime() %}
{%- set global_min_bucket_end = elementary.get_run_started_at() - modules.datetime.timedelta(elementary.get_config_var('days_back') + 1) %}
{% macro get_global_min_bucket_end_as_datetime(period) %}
{# TODO find a more elegant solution #}
{% if period == 'hour' %}
{%- set global_min_bucket_end = elementary.get_run_started_at() - modules.datetime.timedelta(hours=elementary.get_config_var('days_back') + 1) %}
{% elif period == 'day' %}
{%- set global_min_bucket_end = elementary.get_run_started_at() - modules.datetime.timedelta(days=elementary.get_config_var('days_back') + 1) %}
{% endif %}
{{ return(global_min_bucket_end) }}
{% endmacro %}

{% macro get_max_bucket_end() %}
{%- set max_bucket_end = "'"~ elementary.get_run_started_at().strftime("%Y-%m-%d 00:00:00")~"'" %}
{% macro get_max_bucket_start(period) %}
{% if period == 'hour' %}
{%- set max_bucket_end = "'"~ (elementary.get_run_started_at() - modules.datetime.timedelta(hours=1)).strftime("%Y-%m-%d %H:00:00")~"'" %}
{% elif period == 'day' %}
{%- set max_bucket_end = "'"~ (elementary.get_run_started_at() - modules.datetime.timedelta(days=1)).strftime("%Y-%m-%d 00:00:00")~"'" %}
{% endif %}
{{ return(max_bucket_end) }}
{% endmacro %}

{% macro get_backfill_bucket_start(backfill_days) %}
{%- set backfill_bucket_start = "'"~ (elementary.get_run_started_at() - modules.datetime.timedelta(backfill_days)).strftime("%Y-%m-%d 00:00:00") ~"'" %}
{{ return(backfill_bucket_start) }}
{% macro get_max_bucket_end(period) %}
{% if period == 'hour' %}
{%- set max_bucket_end = "'"~ elementary.get_run_started_at().strftime("%Y-%m-%d %H:00:00")~"'" %}
{% elif period == 'day' %}
{%- set max_bucket_end = "'"~ elementary.get_run_started_at().strftime("%Y-%m-%d 00:00:00")~"'" %}
{% endif %}
{{ return(max_bucket_end) }}
{% endmacro %}

{% macro get_min_bucket_end(period) %}
{% if period == 'hour' %}
{%- set min_bucket_end = "'"~(elementary.get_run_started_at() - modules.datetime.timedelta(hours=elementary.get_config_var('days_back'))).strftime("%Y-%m-%d %H:00:00")~"'" %}
{% elif period == 'day' %}
{%- set min_bucket_end = "'"~(elementary.get_run_started_at() - modules.datetime.timedelta(days=elementary.get_config_var('days_back'))).strftime("%Y-%m-%d 00:00:00")~"'" %}
{% endif %}
{{ return(min_bucket_end) }}
{% endmacro %}

{% macro get_min_bucket_start(full_table_name, backfill_days, monitors=none, column_name=none) %}
{% macro get_backfill_bucket_start(backfill_days, period) %}
{%- set run_started_at = "'"~elementary.get_run_started_at()~"'" -%}
{%- set backfill_bucket_start = timeadd(period, - backfill_days, date_trunc(period, run_started_at)) %}
{{ return(backfill_bucket_start) }}
{% endmacro %}

{%- set global_min_bucket_start = elementary.get_global_min_bucket_start() %}
{%- set backfill_bucket_start = elementary.get_backfill_bucket_start(backfill_days) %}

{% macro get_min_bucket_start(full_table_name, backfill_days, period, monitors=none, column_name=none) %}

{%- set global_min_bucket_start = elementary.get_global_min_bucket_start(period) %}
{%- set backfill_bucket_start = elementary.get_backfill_bucket_start(backfill_days, period) %}
{%- if monitors %}
{%- set monitors_tuple = elementary.strings_list_to_tuple(monitors) %}
{%- endif %}

{%- set min_bucket_start_query %}
with min_times as (
select min(last_bucket_end) as last_run,
{{ elementary.cast_as_timestamp(global_min_bucket_start) }} as global_min_start,
{{ elementary.cast_as_timestamp(backfill_bucket_start) }} as backfill_start
{{ global_min_bucket_start }} as global_min_start,
{{ backfill_bucket_start }} as backfill_start
from {{ ref('monitors_runs') }}
where upper(full_table_name) = upper('{{ full_table_name }}')
{%- if monitors %}
Expand All @@ -56,7 +84,6 @@
end as min_start
from min_times
{%- endset %}

{%- set min_bucket_start_query_result = elementary.result_value(min_bucket_start_query) %}

{%- if min_bucket_start_query_result %}
Expand All @@ -65,4 +92,21 @@
{{ return(global_min_bucket_start) }}
{%- endif %}

{% endmacro %}

{% macro get_metric_min_time(global_min_bucket_end, backfill_days, period) %}
{# TODO find a more elegant solution #}
{% if period == 'hour' %}
{%- set truncated_global_min_bucket_end = modules.datetime.datetime(year=global_min_bucket_end.year,
month=global_min_bucket_end.month,
day=global_min_bucket_end.day,
hour=global_min_bucket_end.hour,) %}
{%- set metrics_min_time = truncated_global_min_bucket_end - modules.datetime.timedelta(hours=backfill_days + 1) %}
{% elif period == 'day' %}
{%- set truncated_global_min_bucket_end = modules.datetime.datetime(year=global_min_bucket_end.year,
month=global_min_bucket_end.month,
day=global_min_bucket_end.day,) %}
{%- set metrics_min_time = truncated_global_min_bucket_end - modules.datetime.timedelta(days=backfill_days + 1) %}
{% endif %}
{{ return(metrics_min_time) }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
{% macro column_monitoring_query(monitored_table_relation, timestamp_column, is_timestamp, min_bucket_start, column_obj, column_monitors) %}

{%- set max_bucket_end = "'"~ elementary.get_run_started_at().strftime("%Y-%m-%d 00:00:00")~"'" %}
{% macro column_monitoring_query(monitored_table_relation, timestamp_column, is_timestamp, min_bucket_start, column_obj, column_monitors, period) %}
{%- set run_started_at = "'"~elementary.get_run_started_at()~"'" %}
{%- set max_bucket_end = date_trunc(period, run_started_at) %}
{%- set full_table_name_str = "'"~ elementary.relation_to_full_name(monitored_table_relation) ~"'" -%}

with filtered_monitored_table as (

select {{ column_obj.quoted }}
{% if is_timestamp -%}
, {{ elementary.time_trunc('day', timestamp_column) }} as edr_bucket
, {{ elementary.time_trunc(period, timestamp_column) }} as edr_bucket
{%- else %}
, {{ elementary.null_timestamp() }} as edr_bucket
{%- endif %}
from {{ monitored_table_relation }}
where
{% if is_timestamp -%}
{{ elementary.cast_as_timestamp(timestamp_column) }} >= {{ elementary.cast_as_timestamp(min_bucket_start) }}
and {{ elementary.cast_as_timestamp(timestamp_column) }} <= {{ elementary.cast_as_timestamp(max_bucket_end) }}
{{ elementary.cast_as_timestamp(timestamp_column) }} >= {{ min_bucket_start }}
and {{ elementary.cast_as_timestamp(timestamp_column) }} <= {{ max_bucket_end }}
{%- else %}
true
{%- endif %}
Expand Down Expand Up @@ -74,8 +73,7 @@
{{ elementary.null_string() }} as source_value,
{%- if is_timestamp %}
edr_bucket as bucket_start,
{{ elementary.timeadd('day',1,'edr_bucket') }} as bucket_end,
24 as bucket_duration_hours,
{{ elementary.timeadd(period,1,'edr_bucket') }} as bucket_end,
{%- else %}
{{ elementary.null_timestamp() }} as bucket_start,
{{ elementary.cast_as_timestamp(max_bucket_end) }} as bucket_end,
Expand All @@ -101,8 +99,9 @@
source_value,
bucket_start,
bucket_end,
bucket_duration_hours,
{{ elementary.timediff('hours', 'bucket_start', 'bucket_end') }} as bucket_duration_hours,
{{- elementary.current_timestamp_in_utc() -}} as updated_at,

dimension,
dimension_value
from metrics_final
Expand Down