Skip to content

Commit

Permalink
Use DebugExecutor in TestXComArgRuntime
Browse files Browse the repository at this point in the history
  • Loading branch information
turbaszek committed May 8, 2020
1 parent 93a5376 commit 955b45f
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions tests/models/test_xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
# under the License.
from datetime import datetime, timedelta

import pytest

from airflow import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from tests.test_utils.config import conf_vars

DEFAULT_ARGS = {
"owner": "test",
Expand All @@ -33,7 +36,8 @@


def assert_is_value(num: int):
assert num == VALUE
if num != VALUE:
raise Exception("The test has failed")


def build_python_op():
Expand Down Expand Up @@ -112,7 +116,9 @@ def test_xcom_key_getitem(self):
assert actual_new_key.key == "another_key_2"


@pytest.mark.system("core")
class TestXComArgRuntime:
@conf_vars({("core", "executor"): "DebugExecutor"})
def test_xcom_pass_to_op(self):
with DAG(dag_id="test_xcom_pass_to_op", default_args=DEFAULT_ARGS) as dag:
operator = PythonOperator(
Expand All @@ -127,8 +133,9 @@ def test_xcom_pass_to_op(self):
task_id="assert_is_value_1",
)
operator >> operator2
dag.run(local=True)
dag.run()

@conf_vars({("core", "executor"): "DebugExecutor"})
def test_xcom_push_and_pass(self):
def push_xcom_value(key, value, **context):
ti = context["task_instance"]
Expand All @@ -147,4 +154,4 @@ def push_xcom_value(key, value, **context):
op_args=[xarg],
)
op1 >> op2
dag.run(local=True)
dag.run()

0 comments on commit 955b45f

Please sign in to comment.