fix(ingest/snowflake): fix optimised lineage query, filter temporary … #7894

Merged
@@ -664,6 +664,15 @@ def usage_per_object_per_time_bucket_for_time_window(
basic_usage_counts.bucket_start_time
"""

# Note on temporary tables:
# Snowflake access history may include temporary tables in DIRECT_OBJECTS_ACCESSED and
# OBJECTS_MODIFIED->columns->directSources. These temporary tables are not needed for
# lineage, so the queries below filter them out.
#
# FIVETRAN creates temporary tables in a schema named FIVETRAN_xxx_STAGING.
# Ref - https://support.fivetran.com/hc/en-us/articles/1500003507122-Why-Is-There-an-Empty-Schema-Named-Fivetran-staging-in-the-Destination-
#
# DBT incremental models create temporary tables ending with __dbt_tmp.
# Ref - https://discourse.getdbt.com/t/handling-bigquery-incremental-dbt-tmp-tables/7540
@staticmethod
def table_upstreams_with_column_lineage(
start_time_millis: int,
@@ -703,6 +712,10 @@ def table_upstreams_with_column_lineage(
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
AND upstream_table_domain in {allowed_upstream_table_domains}
AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}'
AND upstream_column_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%'
AND upstream_column_table_name NOT LIKE '%__DBT_TMP'
AND upstream_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%'
AND upstream_table_name NOT LIKE '%__DBT_TMP'
),
column_upstream_jobs AS (
SELECT
@@ -747,7 +760,7 @@ def table_upstreams_with_column_lineage(
'upstream_object_domain', h.upstream_table_domain
)
) AS "UPSTREAM_TABLES",
ARRAY_AGG(
ARRAY_UNIQUE_AGG(
OBJECT_CONSTRUCT(
'column_name', column_upstreams.downstream_column_name,
'upstreams', column_upstreams.upstreams
@@ -759,8 +772,11 @@ def table_upstreams_with_column_lineage(
on h.downstream_table_name = column_upstreams.downstream_table_name
GROUP BY
h.downstream_table_name
ORDER BY
h.downstream_table_name
"""
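The `NOT LIKE` filters added above can be approximated in plain Python for illustration. This is a hypothetical sketch, not code from the PR: the helper name `is_temporary_upstream` and the sample table names are made up, and the regexes are an approximation of the intent of the Snowflake `LIKE` patterns (treating the escaped `\_` as a literal underscore and `%` as "anything").

```python
import re

# Approximations of the SQL patterns used in the query's NOT LIKE filters.
# Assumption: access-history table names are fully qualified and uppercase,
# e.g. DATABASE.SCHEMA.TABLE.
FIVETRAN_STAGING = re.compile(r".*\.FIVETRAN_.*_STAGING\..*$", re.IGNORECASE)
DBT_TMP = re.compile(r".*__DBT_TMP$", re.IGNORECASE)

def is_temporary_upstream(table_name: str) -> bool:
    """Return True if the name looks like a Fivetran staging or dbt temp table."""
    return bool(FIVETRAN_STAGING.match(table_name) or DBT_TMP.match(table_name))
```

Applied to sample names, `DB.FIVETRAN_ABC_STAGING.TMP1` and `DB.SCH.ORDERS__DBT_TMP` would be dropped, while an ordinary `DB.SCH.ORDERS` would pass through.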

# See Note on temporary tables above.
@staticmethod
def table_upstreams_only(
start_time_millis: int,
@@ -794,6 +810,8 @@ def table_upstreams_only(
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
AND upstream_table_domain in {allowed_upstream_table_domains}
AND downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}'
AND upstream_table_name NOT LIKE '%.FIVETRAN\\_%\\_STAGING.%'
AND upstream_table_name NOT LIKE '%__DBT_TMP'
)
SELECT
downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
@@ -808,4 +826,6 @@ def table_upstreams_only(
table_lineage_history
GROUP BY
downstream_table_name
ORDER BY
downstream_table_name
"""
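The shape of the aggregation these queries produce (one row per downstream table, a de-duplicated list of upstream objects via `ARRAY_UNIQUE_AGG`, ordered by downstream name) can be sketched in Python. The sample rows below are illustrative only, not real access-history data:

```python
from collections import defaultdict

# Illustrative (downstream_table_name, upstream_object) pairs; note the
# duplicate first row, which ARRAY_UNIQUE_AGG would collapse.
rows = [
    ("DB.SCH.DOWNSTREAM", {"upstream_object_name": "DB.SCH.UP1", "upstream_object_domain": "Table"}),
    ("DB.SCH.DOWNSTREAM", {"upstream_object_name": "DB.SCH.UP1", "upstream_object_domain": "Table"}),
    ("DB.SCH.DOWNSTREAM", {"upstream_object_name": "DB.SCH.UP2", "upstream_object_domain": "Table"}),
]

grouped: dict = defaultdict(list)
for downstream, upstream in rows:
    if upstream not in grouped[downstream]:  # emulate ARRAY_UNIQUE_AGG's dedup
        grouped[downstream].append(upstream)

result = sorted(grouped.items())  # emulate the added ORDER BY downstream_table_name
```

Here `result` contains a single downstream table with two distinct upstreams, mirroring how the duplicate upstream entry is collapsed in the query output.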