fix: openlineage replace dagTree with downstream_task_ids #41587

Merged: 1 commit merged into apache:main on Aug 21, 2024

Conversation

kacpermuda
Contributor

@kacpermuda kacpermuda commented Aug 19, 2024

related: #41505

Even after the fix in #41494 was implemented, we were still able to trigger a scheduler OOM error when building the dag_tree for the example DAG below:

dag crashing the dagTree
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("dag", schedule=None):
    start = EmptyOperator(task_id="start")

    a = [
        start
        >> EmptyOperator(task_id=f"a_1_{i}")
        >> EmptyOperator(task_id=f"a_2_{i}")
        >> EmptyOperator(task_id=f"a_3_{i}")
        for i in range(200)
    ]

    middle = EmptyOperator(task_id="middle")

    b = [
        middle
        >> EmptyOperator(task_id=f"b_1_{i}")
        >> EmptyOperator(task_id=f"b_2_{i}")
        >> EmptyOperator(task_id=f"b_3_{i}")
        for i in range(200)
    ]

    middle2 = EmptyOperator(task_id="middle2")

    c = [
        middle2
        >> EmptyOperator(task_id=f"c_1_{i}")
        >> EmptyOperator(task_id=f"c_2_{i}")
        >> EmptyOperator(task_id=f"c_3_{i}")
        for i in range(200)
    ]

    end = EmptyOperator(task_id="end")

    start >> a >> middle >> b >> middle2 >> c >> end
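
To give a rough sense of why a nested tree blows up for this DAG, here is a minimal sketch that does not use Airflow at all. It models the edges as a plain `{task_id: [downstream ids]}` dict (the edge list is an assumption based on reading the `>>` chains above, and `build_downstream` / `nested_tree_size` are hypothetical helper names) and compares the number of flat entries with the number of nodes a fully nested tree would contain.

```python
from functools import lru_cache


def build_downstream(n):
    """Model the example DAG as {task_id: [downstream task ids]}.

    Assumption: each stage head (start/middle/middle2) points at both the
    first and the last task of every 3-task chain, as the >> chains suggest.
    """
    downstream = {"start": [], "middle": [], "middle2": [], "end": []}
    stages = [("a", "start", "middle"), ("b", "middle", "middle2"), ("c", "middle2", "end")]
    for prefix, head, tail in stages:
        for i in range(n):
            t1, t2, t3 = (f"{prefix}_{k}_{i}" for k in (1, 2, 3))
            downstream[head] += [t1, t3]
            downstream[t1] = [t2]
            downstream[t2] = [t3]
            downstream[t3] = [tail]
    return downstream


def nested_tree_size(downstream, root):
    """Count the nodes of the fully nested tree without materializing it."""

    @lru_cache(maxsize=None)
    def size(task_id):
        # A subtree is duplicated under every parent, so sizes add up per path.
        return 1 + sum(size(child) for child in downstream[task_id])

    return size(root)


graph = build_downstream(200)
flat_entries = len(graph)
tree_nodes = nested_tree_size(graph, "start")
print(f"flat entries: {flat_entries}, nested tree nodes: {tree_nodes}")
```

Under these assumptions the flat mapping has one entry per task, while the nested tree repeats every downstream subtree once per path leading to it, so its size grows roughly with the cube of the fan-out.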

To prevent OpenLineage from breaking the scheduler with additional computation, we should rely only on information that Airflow itself already uses: downstream task ids.

This PR removes the dagTree information from OL AirflowJobFacet and adds downstream task ids to the list of tasks included in the same facet. This dramatically reduces the size of the facet and allows consumers that rely on this information to still re-create it.
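
As an illustration of the flat shape, here is a minimal sketch of collecting downstream ids per task. `FakeTask` is a hypothetical stand-in for an Airflow operator (only `task_id` and `downstream_task_ids` are modeled), `flat_tasks_info` is a made-up helper, and the `downstream_task_ids` output key is an assumption, not necessarily the key the facet uses.

```python
from dataclasses import dataclass, field


@dataclass
class FakeTask:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    task_id: str
    downstream_task_ids: set = field(default_factory=set)


def flat_tasks_info(tasks):
    """Build one small entry per task instead of a nested tree."""
    return {
        task.task_id: {"downstream_task_ids": sorted(task.downstream_task_ids)}
        for task in tasks
    }


example_tasks = [
    FakeTask("task2", {"task1"}),
    FakeTask("task1", {"task"}),
    FakeTask("task"),
]
tasks_info = flat_tasks_info(example_tasks)
print(tasks_info)
```

Each entry's size depends only on that task's direct downstream edges, so the total size scales with the number of tasks plus edges rather than with the number of paths through the DAG.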

example snippet to re-create
tasks = {
    'task': {'downstream_tasks': []},
    'task1': {'downstream_tasks': ["task"]},
    'task2': {'downstream_tasks': ["task1"]},
}

def add_to_tree(task_id, tree):
    # Nest task_id under the current subtree, then recurse into its downstream tasks
    if task_id not in tree:
        tree[task_id] = {}

    for downstream_task in tasks[task_id].get('downstream_tasks', []):
        add_to_tree(downstream_task, tree[task_id])

def is_root_task(task_id):
    # A task is considered a root task if no other task lists it as a downstream task
    for task_info in tasks.values():
        if task_id in task_info.get('downstream_tasks', []):
            return False
    return True

dag_tree = {}
for task_id in tasks:
    if is_root_task(task_id):
        add_to_tree(task_id, dag_tree)

print(dag_tree)
# {'task2': {'task1': {'task': {}}}}
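
Consumers that only need ordering or parent lookups may not need the nested tree at all. The following is a sketch under that assumption: it inverts the same `downstream_tasks` mapping into an upstream map and derives a valid execution order with the standard library's `graphlib` (Python 3.9+). `upstream_map` is a hypothetical helper name.

```python
from graphlib import TopologicalSorter

tasks = {
    "task": {"downstream_tasks": []},
    "task1": {"downstream_tasks": ["task"]},
    "task2": {"downstream_tasks": ["task1"]},
}


def upstream_map(tasks):
    """Invert {task: downstream ids} into {task: set of upstream ids}."""
    upstream = {task_id: set() for task_id in tasks}
    for task_id, info in tasks.items():
        for downstream_id in info.get("downstream_tasks", []):
            upstream.setdefault(downstream_id, set()).add(task_id)
    return upstream


upstream = upstream_map(tasks)
# TopologicalSorter expects {node: predecessors}, which is the upstream map.
execution_order = list(TopologicalSorter(upstream).static_order())
print(upstream)
print(execution_order)
# ['task2', 'task1', 'task']
```

Both structures stay linear in the number of tasks and edges, which avoids rebuilding the path explosion on the consumer side.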


@kacpermuda kacpermuda force-pushed the fix-ol-dag-tree branch 2 times, most recently from edb9b9b to 671be17 Compare August 19, 2024 13:02
@kacpermuda kacpermuda marked this pull request as ready for review August 19, 2024 13:09
@kacpermuda kacpermuda force-pushed the fix-ol-dag-tree branch 2 times, most recently from dd1a85a to 68108da Compare August 19, 2024 14:40
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
@mobuchowski mobuchowski merged commit 86e12a9 into apache:main Aug 21, 2024
53 checks passed
@kacpermuda kacpermuda deleted the fix-ol-dag-tree branch August 21, 2024 08:03
@tatiana
Contributor

tatiana commented Aug 23, 2024

apache-airflow-providers-openlineage 1.11.0 contains the first change (#41577), but this follow-up change will be released only in 1.12.

@tatiana
Contributor

tatiana commented Aug 23, 2024

@mobuchowski to update once we have the next release

@kacpermuda
Contributor Author

@tatiana 1.11.0 has not been released yet; it was excluded from this release wave precisely because of this fix (my message). We should expect 1.11.0rc2 soon, and both fixes will be included in 1.11.0.

3 participants