Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(main): fix ImportError on examples dags #37571

Merged
merged 6 commits into from Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
217 changes: 110 additions & 107 deletions airflow/example_dags/example_branch_operator.py
Expand Up @@ -28,140 +28,143 @@

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
BranchExternalPythonOperator,
BranchPythonOperator,
BranchPythonVirtualenvOperator,
ExternalPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
)
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

PATH_TO_PYTHON_BINARY = sys.executable

with DAG(
dag_id="example_branch_operator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
tags=["example", "example2"],
orientation="TB",
) as dag:
run_this_first = EmptyOperator(
task_id="run_this_first",
from airflow.operators.python import is_venv_installed

if is_venv_installed():
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
BranchExternalPythonOperator,
BranchPythonOperator,
BranchPythonVirtualenvOperator,
ExternalPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
)
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

PATH_TO_PYTHON_BINARY = sys.executable

with DAG(
dag_id="example_branch_operator",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule="@daily",
tags=["example", "example2"],
orientation="TB",
) as dag:
run_this_first = EmptyOperator(
task_id="run_this_first",
)

options = ["a", "b", "c", "d"]

# Example branching on standard Python tasks
options = ["a", "b", "c", "d"]

# [START howto_operator_branch_python]
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
)
# [END howto_operator_branch_python]
run_this_first >> branching

join = EmptyOperator(
task_id="join",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# Example branching on standard Python tasks

for option in options:
t = PythonOperator(
task_id=f"branch_{option}",
python_callable=lambda: print("Hello World"),
# [START howto_operator_branch_python]
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
)
# [END howto_operator_branch_python]
run_this_first >> branching

empty_follow = EmptyOperator(
task_id="follow_" + option,
join = EmptyOperator(
task_id="join",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

# Label is optional here, but it can help identify more complex branches
branching >> Label(option) >> t >> empty_follow >> join
for option in options:
t = PythonOperator(
task_id=f"branch_{option}",
python_callable=lambda: print("Hello World"),
)

# Example the same with external Python calls
empty_follow = EmptyOperator(
task_id="follow_" + option,
)

# [START howto_operator_branch_ext_py]
def branch_with_external_python(choices):
import random
# Label is optional here, but it can help identify more complex branches
branching >> Label(option) >> t >> empty_follow >> join

return f"ext_py_{random.choice(choices)}"
# Example the same with external Python calls

branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
python=PATH_TO_PYTHON_BINARY,
python_callable=branch_with_external_python,
op_args=[options],
)
# [END howto_operator_branch_ext_py]
join >> branching_ext_py

join_ext_py = EmptyOperator(
task_id="join_ext_python",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
# [START howto_operator_branch_ext_py]
def branch_with_external_python(choices):
import random

def hello_world_with_external_python():
print("Hello World from external Python")
return f"ext_py_{random.choice(choices)}"

for option in options:
t = ExternalPythonOperator(
task_id=f"ext_py_{option}",
branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
python=PATH_TO_PYTHON_BINARY,
python_callable=hello_world_with_external_python,
python_callable=branch_with_external_python,
op_args=[options],
)
# [END howto_operator_branch_ext_py]
join >> branching_ext_py

# Label is optional here, but it can help identify more complex branches
branching_ext_py >> Label(option) >> t >> join_ext_py

# Example the same with Python virtual environments
join_ext_py = EmptyOperator(
task_id="join_ext_python",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

# [START howto_operator_branch_virtualenv]
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())
def hello_world_with_external_python():
print("Hello World from external Python")

def branch_with_venv(choices):
import random
for option in options:
t = ExternalPythonOperator(
task_id=f"ext_py_{option}",
python=PATH_TO_PYTHON_BINARY,
python_callable=hello_world_with_external_python,
)

import numpy as np
# Label is optional here, but it can help identify more complex branches
branching_ext_py >> Label(option) >> t >> join_ext_py

print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
# Example the same with Python virtual environments

branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
requirements=["numpy~=1.24.4"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=branch_with_venv,
op_args=[options],
)
# [END howto_operator_branch_virtualenv]
join_ext_py >> branching_venv
# [START howto_operator_branch_virtualenv]
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())

join_venv = EmptyOperator(
task_id="join_venv",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
def branch_with_venv(choices):
import random

def hello_world_with_venv():
import numpy as np
import numpy as np

print(f"Hello World with some numpy stuff: {np.arange(6)}")
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"

for option in options:
t = PythonVirtualenvOperator(
task_id=f"venv_{option}",
branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
requirements=["numpy~=1.24.4"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=hello_world_with_venv,
python_callable=branch_with_venv,
op_args=[options],
)
# [END howto_operator_branch_virtualenv]
join_ext_py >> branching_venv

join_venv = EmptyOperator(
task_id="join_venv",
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

# Label is optional here, but it can help identify more complex branches
branching_venv >> Label(option) >> t >> join_venv
def hello_world_with_venv():
import numpy as np

print(f"Hello World with some numpy stuff: {np.arange(6)}")

for option in options:
t = PythonVirtualenvOperator(
task_id=f"venv_{option}",
requirements=["numpy~=1.24.4"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=hello_world_with_venv,
)

# Label is optional here, but it can help identify more complex branches
branching_venv >> Label(option) >> t >> join_venv