Skip to content

Ele 826 orchestration integration#900

Merged
NoyaArie merged 16 commits into
masterfrom
ele-826-orchestration-integration
May 31, 2023
Merged

Ele 826 orchestration integration#900
NoyaArie merged 16 commits into
masterfrom
ele-826-orchestration-integration

Conversation

@NoyaArie
Copy link
Copy Markdown
Contributor

Include in the report information about the last invocation for each resource.

@linear
Copy link
Copy Markdown

linear Bot commented May 28, 2023

ELE-826 Orchestration (Airflow) integration - backend

Overall goal:

  • We want to provide details about the Airflow job which triggered a dbt run.
  • We then want to be able to show the job name, as well as a link to the job in Airflow in the lineage screen in the UI (there is a design, ask Adam / Hadar).

Technical details:

  • We already have a couple of generic env vars we know to read in the package and populate under the dbt_invocations table:
    • DBT_JOB_ID (populates the job_id field) - can be used to pass the DAG run from Airflow (no work on our end)
    • DBT_JOB_NAME(populates the job_name field) - in case there is a name that is different from the ID, so for Airflow either it shouldn't be passed, or contain the same value as the ID (worth a discussion)
  • We have an automatic identification of the orchestrator (dbt Cloud, Airflow, etc), according to an env var which always exists in each (for example AIRFLOW_HOME always exists in Airflow).
  • We don't have any variable which contains the full Airflow job URL. There are two options here:
    • Supply the full Job URL, like we pass the job ID and job name (advantage - easiest).
    • Supply the base Airflow URL, so we can construct the job URL ourselves (advantage - more generic, a bit easier for users)
  • Elementor-specific considerations:
    • They don't run dbt directly from Airflow, but they actually trigger a dbt Cloud job. This means a couple of things:
      • Supplying env vars is problematic, but supplying dbt vars is likely possible (using the steps_override option here - which they can do themselves)
      • We'll need therefore to be able to supply all the vars mentioned above as dbt vars, and not just as env vars (I think).
      • We'll also need a way to explicitly override the orchestrator - because in their case it would identify dbt Cloud, where we want it to actually be Airflow.

Concrete tasks:

  • dbt package - we need to add the ability to supply the Airflow URL (in either option mentioned above), as well as pass everything as vars and not just as env vars.
  • cli (would also apply to the SaaS) - we need to ensure our lineage API returns details about the (latest?) airflow job so it will be available to the UI.

@github-actions
Copy link
Copy Markdown
Contributor

👋 @NoyaArie
Thank you for raising your pull request.
Please make sure to add tests and document all user-facing changes.
You can do this by editing the docs files in this pull request.

Copy link
Copy Markdown
Contributor

@IDoneShaveIt IDoneShaveIt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the comments are small 🙂

return DbtInvocationSchema()

def get_invocations_by_ids(
self, macro_args: Optional[dict] = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of expecting macro_args as param, spread it to its content.
I know that in the method above this is how it is done but this is not a good example (should be changed there as well).
Spreading the macro_args params to its content lets us know what are those "macro_args" and their real typing (what is dict containing? what are the keys I should pass to it? what are their values suppose to be?)


def get_invocations_by_ids(
self, macro_args: Optional[dict] = None
) -> [DbtInvocationSchema]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
) -> [DbtInvocationSchema]:
) -> List[DbtInvocationSchema]:

And don't forget to import it from typing 🙂

Comment on lines +48 to +54
resources_latest_invocation_dict = dict()
for result in resources_latest_invocation_results:
resources_latest_invocation_dict[result["unique_id"]] = result[
"invocation_id"
]

return resources_latest_invocation_dict
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
resources_latest_invocation_dict = dict()
for result in resources_latest_invocation_results:
resources_latest_invocation_dict[result["unique_id"]] = result[
"invocation_id"
]
return resources_latest_invocation_dict
resources_latest_invocation_map = {result["unique_id"]: result["invocation_id"] for result in resources_latest_invocation_results}
return resources_latest_invocation_map

@@ -0,0 +1,22 @@
{% macro get_resources_latest_invocation() %}
{% set dbt_run_results = ref('dbt_run_results') %}
{%- if elementary.relation_exists(dbt_run_results) -%}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there is no need to check if this table exists.
If a user ran elementary dbt package he should have this table.
Plus I am not sure we want to do nothing if this table does not exist.

{% set dbt_run_results = ref('dbt_run_results') %}
{%- if elementary.relation_exists(dbt_run_results) -%}
{% set get_resources_latest_invocation_query %}
with row_numbered_run_results as (
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets give it a more meaningful name than row_numbered 🙂
Something like ordered_run_results or something that explains what is the meaning of the row_number

{% endif %}

{% set get_invocations_query %}
select * from {{ invocations_relation }} where invocation_id in {{ elementary.strings_list_to_tuple(ids) }}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets spread * to make it easy to understand which data is returning from this query 🙂

{% set database, schema = elementary.target_database(), target.schema %}
{% set invocations_relation = adapter.get_relation(database, schema, 'dbt_invocations') %}
{% if not invocations_relation %}
{% do elementary.edr_log('failed getting invocations relation') %}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try to run this macro when there is no invocation_relation?
We used edr_log to be able to parse the results from our macro runs using the dbt runner.
I am not sure this will give you the wanted effect 🤔

{% endset %}
{% set result = elementary.run_query(get_invocations_query) %}
{% if not result %}
{% do elementary.edr_log('no invocations were found') %}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about the edr_log

else:
raise NotImplementedError

def get_invocations_by_ids(self, invocations_ids: list[str]) -> DbtInvocationSchema:
Copy link
Copy Markdown
Contributor

@IDoneShaveIt IDoneShaveIt May 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def get_invocations_by_ids(self, invocations_ids: list[str]) -> DbtInvocationSchema:
def get_invocations_by_ids(self, invocations_ids: List[str]) -> List[DbtInvocationSchema]:

Copy link
Copy Markdown
Contributor

@JavierLopezT JavierLopezT May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Using List is deprecated
https://peps.python.org/pep-0585/#implementation

list # typing.List

dict # typing.Dict

...

Importing those from typing is deprecated. Due to PEP 563 and the intention to minimize the runtime impact of typing, this deprecation will not generate DeprecationWarnings. Instead, type checkers may warn about such deprecated usage when the target version of the checked program is signalled to be Python 3.9 or newer. It's recommended to allow for those warnings to be silenced on a project-wide basis.

The deprecated functionality will be removed from the typing module in the first Python version released 5 years after the release of Python 3.9.0.

@@ -1,3 +1,5 @@
from typing import Dict
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from typing import Dict
from typing import Dict, List

filters=serializable_filters,
lineage=serializable_lineage,
invocations=invocations,
resources_latest_invocation=resources_latest_invocation,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need both of the new objects at the front?

Copy link
Copy Markdown
Contributor

@IDoneShaveIt IDoneShaveIt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@NoyaArie NoyaArie merged commit 647bf20 into master May 31, 2023
@NoyaArie NoyaArie deleted the ele-826-orchestration-integration branch May 31, 2023 14:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants