
Update Dag.test() to run with an executor if desired #40205

Merged · 12 commits into apache:main from vincbeck/dag_test_executor · Jun 14, 2024

Conversation

@vincbeck (Contributor) commented Jun 12, 2024

Dag.test() allows testing a DAG locally (see the documentation). It uses no scheduler and no executor to run the tasks; it simply runs all the task instances of the DAG in the current process.

This PR extends Dag.test() to support testing a DAG with an executor. If an executor other than LocalExecutor is configured, that executor is used to run the DAG. By default, pytest erases all environment variables before executing tests, so I introduced a new flag to disable this behavior when desired: --keep-env-variables.
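
As a point of reference, selecting a non-default executor goes through the standard Airflow configuration mechanism; a minimal sketch using the `AIRFLOW__CORE__ENV`-style environment variable (the exact executor module path shown is illustrative, not copied from the provider):

```python
import os

# Equivalent to setting `executor` under [core] in airflow.cfg.
# The module path below is illustrative; consult the AWS provider docs
# for the real import path of AwsEcsExecutor.
os.environ["AIRFLOW__CORE__EXECUTOR"] = (
    "airflow.providers.amazon.aws.executors.ecs.AwsEcsExecutor"
)
```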

For example, the command below successfully executes the system test tests/system/providers/amazon/aws/example_sns.py with an executor other than LocalExecutor. Here my Airflow environment was configured to use AwsEcsExecutor, but it could have been any other executor.

breeze testing tests tests/system/providers/amazon/aws/example_sns.py --system amazon --keep-env-variables --test-type System --forward-credentials --test-timeout 500
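
For reference, a minimal sketch of what this looks like from Python. The parameter spelling `use_executor` is an assumption based on the "Introduce `use-executor` flag" commit in the list at the bottom of this thread, and the DAG itself is hypothetical:

```python
import datetime

from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(
    dag_id="example_dag",
    schedule=None,
    start_date=datetime.datetime(2024, 6, 1),
) as dag:

    @task
    def hello():
        print("hello")

    hello()

if __name__ == "__main__":
    # Assumption: with use_executor=True, dag.test() hands task instances to
    # the executor configured in airflow.cfg instead of running them in-process.
    dag.test(use_executor=True)
```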



@vincbeck (Contributor, Author) commented:

@potiuk, you might be interested in this one; it relates to the discussion we had on Slack.

@potiuk (Member) commented Jun 13, 2024

Looks very cool. Apart from adding keep-env-variables support to the other commands, I think we also need a small chapter in unit_tests.rst describing this kind of test scenario: why it's needed, when to do it, and how to do it, with examples.

(Review comment on airflow/models/dag.py; outdated, resolved)
@vincbeck (Contributor, Author) commented:

> Looks very cool. Apart from adding keep-env-variables support to the other commands, I think we also need a small chapter in unit_tests.rst describing this kind of test scenario: why it's needed, when to do it, and how to do it, with examples.

That's a good idea; I added a section about it. Let me know your thoughts :)
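
To give a flavor of the kind of example such a chapter might describe, here is a sketch. It assumes dag.test() returns a DagRun (visible in the tracebacks below as `dr: DagRun = dag.test(`), reuses the hypothetical `use_executor` parameter from above, and imports from a made-up module name:

```python
from airflow.utils.state import DagRunState

from my_dags.example_dag import dag  # hypothetical module holding the DAG under test


def test_dag_runs_with_configured_executor():
    # Execute the whole DAG through whichever executor airflow.cfg configures,
    # then assert that the run finished successfully.
    dag_run = dag.test(use_executor=True)
    assert dag_run.state == DagRunState.SUCCESS
```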

@potiuk mentioned this pull request Jun 14, 2024
@vincbeck (Contributor, Author) commented:

I am currently testing with LocalExecutor. It fails with:

[2024-06-14T15:44:21.306+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_sns', 'variable_fetcher', 'manual__2024-06-14T15:43:59.429752+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T15:44:21.308+0000] {local_executor.py:90} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'example_sns', 'variable_fetcher', 'manual__2024-06-14T15:43:59.429752+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/cli/commands/dag_command.py", line 611, in dag_test
    dr: DagRun = dag.test(
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dag.py", line 2984, in test
    triggerer_running = _triggerer_is_healthy()
  File "/opt/airflow/airflow/models/dag.py", line 301, in _triggerer_is_healthy
    job = TriggererJobRunner.most_recent_job()
  File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/base_job_runner.py", line 71, in most_recent_job
    return most_recent_job(cls.job_type, session=session)
  File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 127, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 384, in most_recent_job
    return session.scalar(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1753, in scalar
    ).scalar()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 1276, in scalar
    return self._only_one_row(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 559, in _only_one_row
    row = onerow(hard_close=True)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 1801, in _fetchone_impl
    row = next(self.iterator, _NO_ROW)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 147, in chunks
    fetch = cursor._raw_all_rows()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 393, in _raw_all_rows
    return [make_row(row) for row in rows]
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 393, in <listcomp>
    return [make_row(row) for row in rows]
RuntimeError: number of values in row (1) differ from number of column processors (10)
[2024-06-14T15:44:21.334+0000] {local_executor.py:192} INFO - Failed to read tasks from the task queue because the other end has closed the connection. Terminating worker QueuedLocalWorker-24.

Currently investigating; if someone has an idea, please share :)

@vincbeck (Contributor, Author) commented:

The culprit is ... https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L470. If I comment it out, it still fails in the same way, but one task further along: the first task succeeds, and then the second one fails identically.

[2024-06-14T16:52:57.668+0000] {dag.py:4276} INFO - created dagrun <DagRun example_sns @ 2024-06-14 16:52:35.429844+00:00: manual__2024-06-14T16:52:35.429844+00:00, state:running, queued_at: None. externally triggered: False>
[2024-06-14T16:52:57.668+0000] {executor_loader.py:252} INFO - Loaded executor: LocalExecutor
[2024-06-14T16:52:57.760+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_sns', 'variable_fetcher', 'manual__2024-06-14T16:52:35.429844+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T16:52:57.762+0000] {local_executor.py:90} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'example_sns', 'variable_fetcher', 'manual__2024-06-14T16:52:35.429844+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T16:52:57.861+0000] {dagbag.py:574} INFO - Filling up the DagBag from /opt/***/tests/system/providers/amazon/aws/example_sns.py
[2024-06-14T16:52:57.940+0000] {base_aws.py:164} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name='\x1b[1mNone\x1b[22m'). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2024-06-14T16:52:57.982+0000] {credentials.py:1278} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2024-06-14T16:52:58.179+0000] {__init__.py:116} INFO - SSM contains one parameter for this test, but not the requested value: 'SYSTEM_TESTS_ENV_ID'
[2024-06-14T16:52:58.550+0000] {task_command.py:462} INFO - Running <TaskInstance: example_sns.variable_fetcher manual__2024-06-14T16:52:35.429844+00:00 [scheduled]> on host 013e9415b651
[2024-06-14T16:52:58.581+0000] {dag.py:2979} WARNING - No tasks to run. unrunnable tasks: {<TaskInstance: example_sns.watcher manual__2024-06-14T16:52:35.429844+00:00 [None]>, <TaskInstance: example_sns.variable_fetcher manual__2024-06-14T16:52:35.429844+00:00 [running]>, <TaskInstance: example_sns.publish_message manual__2024-06-14T16:52:35.429844+00:00 [None]>, <TaskInstance: example_sns.create_topic manual__2024-06-14T16:52:35.429844+00:00 [None]>, <TaskInstance: example_sns.delete_topic manual__2024-06-14T16:52:35.429844+00:00 [None]>}
[2024-06-14T16:52:59.618+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_sns', 'create_topic', 'manual__2024-06-14T16:52:35.429844+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T16:52:59.619+0000] {local_executor.py:90} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'example_sns', 'create_topic', 'manual__2024-06-14T16:52:35.429844+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T16:52:59.737+0000] {dagbag.py:574} INFO - Filling up the DagBag from /opt/***/tests/system/providers/amazon/aws/example_sns.py
[2024-06-14T16:52:59.822+0000] {base_aws.py:164} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name='\x1b[1mNone\x1b[22m'). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2024-06-14T16:52:59.865+0000] {credentials.py:1278} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2024-06-14T16:53:00.071+0000] {__init__.py:116} INFO - SSM contains one parameter for this test, but not the requested value: 'SYSTEM_TESTS_ENV_ID'
[2024-06-14T16:53:00.451+0000] {task_command.py:462} INFO - Running <TaskInstance: example_sns.create_topic manual__2024-06-14T16:52:35.429844+00:00 [scheduled]> on host 013e9415b651
[2024-06-14T16:53:00.481+0000] {dag.py:2979} WARNING - No tasks to run. unrunnable tasks: {<TaskInstance: example_sns.watcher manual__2024-06-14T16:52:35.429844+00:00 [None]>, <TaskInstance: example_sns.create_topic manual__2024-06-14T16:52:35.429844+00:00 [running]>, <TaskInstance: example_sns.delete_topic manual__2024-06-14T16:52:35.429844+00:00 [None]>, <TaskInstance: example_sns.publish_message manual__2024-06-14T16:52:35.429844+00:00 [None]>}
[2024-06-14T16:53:01.515+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_sns', 'publish_message', 'manual__2024-06-14T16:52:35.429844+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T16:53:01.516+0000] {local_executor.py:90} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'example_sns', 'publish_message', 'manual__2024-06-14T16:52:35.429844+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T16:53:01.547+0000] {cli_action_loggers.py:176} WARNING - Failed to log action (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq
(Background on this error at: https://sqlalche.me/e/14/4xp6)
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/cli/commands/dag_command.py", line 611, in dag_test
    dr: DagRun = dag.test(
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dag.py", line 2968, in test
    schedulable_tis, _ = dr.update_state(session=session)
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dagrun.py", line 796, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dagrun.py", line 931, in task_instance_scheduling_decisions
    tis = self.get_task_instances(session=session, state=State.task_states)
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dagrun.py", line 625, in get_task_instances
    return DagRun.fetch_task_instances(
  File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 127, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dagrun.py", line 562, in fetch_task_instances
    return session.scalars(tis).all()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1778, in scalars
    return self.execute(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1720, in execute
    result = compile_state_cls.orm_setup_cursor_result(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 349, in orm_setup_cursor_result
    return loading.instances(result, querycontext)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 88, in instances
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 69, in instances
    *[
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 70, in <listcomp>
    query_entity.row_processor(context, cursor)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py", line 2631, in row_processor
    _instance = loading._instance_processor(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 796, in _instance_processor
    prop.create_row_processor(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/interfaces.py", line 658, in create_row_processor
    strat.create_row_processor(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 2533, in create_row_processor
    eager_adapter = self._create_eager_adapter(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 2504, in _create_eager_adapter
    if self.mapper._result_has_identity_key(result, decorator):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/mapper.py", line 2887, in _result_has_identity_key
    rk = result.keys()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 708, in keys
    return self._metadata.keys
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 1225, in keys
    self._we_dont_return_rows()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/cursor.py", line 1202, in _we_dont_return_rows
    util.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically.

@potiuk (Member) commented Jun 14, 2024

> The culprit is ... https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L470.

Still having this problem?

@vincbeck (Contributor, Author) commented Jun 14, 2024

> The culprit is ... https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L470.

> Still having this problem?

Yep, I'll create an issue and work on it separately. I see the issue with Celery as well; I think it affects every executor that runs tasks locally. They somehow shut down the DB connection while executing the task, which then makes dag.test fail.
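
For what it's worth, the symptoms are consistent with a general SQLAlchemy constraint rather than anything Airflow-specific: pooled connections must not be shared across os.fork(), which is exactly what a forking local executor does. A standalone sketch of the standard mitigation (a general SQLAlchemy pattern, not the fix this PR or the tracked issue ends up with; the database URL is a placeholder):

```python
import os

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/db")  # placeholder URL

# Parent opens a connection, so the pool now holds a live socket.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

pid = os.fork()
if pid == 0:
    # Child: the inherited pool still references the parent's socket; if both
    # processes used it, the wire protocol would be garbled (producing errors
    # like the row/column-processor mismatch above). Dispose of the inherited
    # pool so the child opens fresh connections; close=False (SQLAlchemy
    # 1.4.33+) leaves the parent's connections untouched.
    engine.dispose(close=False)
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    os._exit(0)
else:
    os.waitpid(pid, 0)
```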

@vincbeck (Contributor, Author) commented:

Issue created to track the bug with local executors: #40247

@vincbeck (Contributor, Author) commented:

Do you think we can still merge the PR, @potiuk? It works only with non-local executors for now.

@potiuk merged commit 9595357 into apache:main Jun 14, 2024
79 checks passed
@potiuk (Member) commented Jun 14, 2024

potiuk commented Jun 14, 2024

Merged.

@vincbeck deleted the vincbeck/dag_test_executor branch June 14, 2024 19:54
@utkarsharma2 added this to the Airflow 2.10.0 milestone Jul 1, 2024
@utkarsharma2 added the type:improvement (Changelog: Improvements) and type:misc/internal (Changelog: Misc changes that should appear in change log) labels, then removed type:improvement, Jul 1, 2024
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Jul 26, 2024
* Update `Dag.test()` to run with an executor if desired

* Add missing parameter

* Fix typo

* Move `add_logger_if_needed` to local execution

* Add `keep-env-variables` to `breeze testing db-tests`, `breeze testing non-db-tests` and `breeze shell`

* Add documentation

* Fix tests

* Introduce `use-executor` flag

* Update `debug` documentation

* Fix test
Labels: area:dev-tools, type:misc/internal (Changelog: Misc changes that should appear in change log)