
Fix backfill_job_runner to work with custom executors #32101

Closed
wants to merge 1 commit into from

Conversation

adh-wonolo
Contributor

The backfill job runner pulls in the class name of your executor but not the full dotted path, so if you aren't using one of the default core executors you get an error like:

  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 32, in import_string
    module_path, class_name = dotted_path.rsplit(".", 1)
ValueError: not enough values to unpack (expected 2, got 1)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 106, in load_executor
    executor_cls, import_source = cls.import_executor_cls(executor_name)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 148, in import_executor_cls
    return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 129, in _import_and_validate
    executor = import_string(path)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 34, in import_string
    raise ImportError(f"{dotted_path} doesn't look like a module path")
ImportError: CustomExecutor doesn't look like a module path

This can be fixed either by passing the actual class to executor_class, by passing f"{self.job.executor.__class__.__module__}.{self.job.executor_class}" to ExecutorLoader.import_executor_cls, or by setting self.job.executor_class to f"{self.job.executor.__class__.__module__}.{self.job.executor.__class__.__name__}".

I'm not sure which of these three is the best solution; from my quick read through the code, this value isn't really used anywhere else besides in this specific file.
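The failure mode and the proposed fix can be sketched in a few lines of plain Python (CustomExecutor here is a hypothetical stand-in, not Airflow code; the rsplit line mirrors what airflow.utils.module_loading.import_string does):

```python
# A bare class name breaks import_string-style loading because there is
# no dot to split on; prefixing the module path fixes it.

class CustomExecutor:
    pass

executor = CustomExecutor()

# What the backfill job runner effectively passes today: just the class name.
bare_name = executor.__class__.__name__  # "CustomExecutor"

# import_string does dotted_path.rsplit(".", 1) and expects two parts.
try:
    module_path, class_name = bare_name.rsplit(".", 1)
except ValueError as exc:
    print(exc)  # not enough values to unpack (expected 2, got 1)

# Building the full dotted path gives rsplit something to split on.
full_path = f"{executor.__class__.__module__}.{executor.__class__.__name__}"
module_path, class_name = full_path.rsplit(".", 1)
```

Any of the three proposed fixes boils down to making sure the string handed to the loader looks like full_path rather than bare_name.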

I ran the core tests and they all passed.



@boring-cyborg bot added the area:Scheduler label on Jun 23, 2023

@potiuk potiuk requested a review from o-nikolas June 23, 2023 22:52
@o-nikolas
Contributor

Hmm, this one is odd, because import_executor_cls (see below) is written to be able to import both default executors and executors from plugins. So if something is broken, the real fix belongs there, not in the backfill job.

def import_executor_cls(cls, executor_name: str) -> tuple[type[BaseExecutor], ConnectorSource]:
    """
    Imports the executor class.

    Supports the same formats as ExecutorLoader.load_executor.

    :return: executor class via executor_name and executor import source
    """

    def _import_and_validate(path: str) -> type[BaseExecutor]:
        executor = import_string(path)
        cls.validate_database_executor_compatibility(executor)
        return executor

    if executor_name in cls.executors:
        return _import_and_validate(cls.executors[executor_name]), ConnectorSource.CORE
    if executor_name.count(".") == 1:
        log.debug(
            "The executor name looks like the plugin path (executor_name=%s). Trying to import a "
            "executor from a plugin",
            executor_name,
        )
        with suppress(ImportError, AttributeError):
            # Load plugins here for executors as at that time the plugins might not have been
            # initialized yet
            from airflow import plugins_manager

            plugins_manager.integrate_executor_plugins()
            return _import_and_validate(f"airflow.executors.{executor_name}"), ConnectorSource.PLUGIN
    return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH
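The dispatch above can be reduced to a minimal sketch (the alias table below is illustrative, not Airflow's real cls.executors mapping): a bare custom class name matches neither the core-alias branch nor the one-dot plugin branch, so it falls through to the custom-path branch and later fails inside import_string.

```python
# Illustrative reduction of the import_executor_cls dispatch: only the
# string-routing logic, with a made-up one-entry alias table.
CORE_ALIASES = {
    "LocalExecutor": "airflow.executors.local_executor.LocalExecutor",
}

def resolve(executor_name: str) -> str:
    if executor_name in CORE_ALIASES:
        # Known core executor alias -> full dotted path from the table.
        return CORE_ALIASES[executor_name]
    if executor_name.count(".") == 1:
        # Exactly one dot -> treated as a plugin path under airflow.executors.
        return f"airflow.executors.{executor_name}"
    # Everything else is assumed to already be a full dotted path;
    # a bare name like "NomadExecutor" lands here and import_string chokes.
    return executor_name

print(resolve("LocalExecutor"))   # airflow.executors.local_executor.LocalExecutor
print(resolve("NomadExecutor"))   # NomadExecutor  (not a valid module path)
```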

@adh-wonolo can you include a full traceback (the one in the description has some strange formatting, making it hard to read)? And also an example of the executor path you're using? You can change the values if any of them are private, but try to keep the formatting as similar as possible.

@o-nikolas
Contributor

Also, this backfill code has changed since I last touched it for AIP-51.
I'm confused about why we're checking executor compatibility for the executor class on self.job (self.job.executor) instead of the executor that's passed into this method (executor) and used on the line below (which invalidates the above check for local anyway?):

executor_class, _ = ExecutorLoader.import_executor_cls(
    self.job.executor_class,
)
if executor_class.is_local:
    cfg_path = tmp_configuration_copy()

executor.queue_task_instance(

@potiuk It looks like you made that change in #30255, do you have any context on that?

@adh-wonolo
Contributor Author

@o-nikolas the executor loads and functions normally for everything other than backfills, as you can see in the logs too; it's just that when executing

executor_class, _ = ExecutorLoader.import_executor_cls(
    self.job.executor_class,
)
if executor_class.is_local:
    cfg_path = tmp_configuration_copy()

executor.queue_task_instance(

all that's passed is the name of the executor (NomadExecutor in my case) rather than the full path, which the importer code can only resolve for core executors. That's why this could also be fixed by instead passing the path f"{self.job.executor.__class__.__module__}.{self.job.executor_class}" to that function, or by setting that as the value of self.job.executor_class more directly.

I'm storing the plugin in ./plugins/executor/nomad_executor.py

Here's the stacktrace:

$ airflow dags backfill clean_airflow_metadb --start-date 20230501 --end-date 20230612 --reset-dagruns
/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py:119 RemovedInAirflow3Warning: --ignore-first-depends-on-past is deprecated as the value is always set to True
[2023-06-27T09:54:21.489-0400] {dagbag.py:541} INFO - Filling up the DagBag from /Users/adh/repos/internal-tools-airflow/dags
You are about to delete these 2 tasks:
<TaskInstance: clean_airflow_metadb.clean_xcoms backfill__2023-05-01T00:00:00+00:00 [queued]>
<TaskInstance: clean_airflow_metadb.clean_xcoms backfill__2023-06-01T00:00:00+00:00 [scheduled]>

Are you sure? [y/n]
y
[2023-06-27T09:54:29.538-0400] {executor_loader.py:114} INFO - Loaded executor: plugins.executor.nomad_executor.NomadExecutor
/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/plugins_manager.py:258 RemovedInAirflow3Warning: This decorator is deprecated.

In previous versions, all subclasses of BaseOperator must use apply_default decorator for the `default_args` feature to work properly.

In current version, it is optional. The decorator is applied automatically using the metaclass.

2023-06-27 09:54:29,667 - [bugsnag] WARNING - No API key configured, couldn't notify
[2023-06-27T09:54:29.667-0400] {client.py:170} WARNING - No API key configured, couldn't notify
Traceback (most recent call last):
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 32, in import_string
    module_path, class_name = dotted_path.rsplit(".", 1)
ValueError: not enough values to unpack (expected 2, got 1)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/adh/.pyenv/versions/airflow310/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 52, in command
    return func(*args, **kwargs)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 139, in dag_backfill
    _run_dag_backfill(dags, args)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 92, in _run_dag_backfill
    dag.run(
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/models/dag.py", line 2490, in run
    run_job(job=job, execute_callable=job_runner._execute)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/jobs/job.py", line 284, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/jobs/job.py", line 313, in execute_job
    ret = execute_callable()
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 914, in _execute
    self._execute_dagruns(
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 801, in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 643, in _process_backfill_task_instances
    _per_task_process(key, ti, session)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 539, in _per_task_process
    executor_class, _ = ExecutorLoader.import_executor_cls(
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 148, in import_executor_cls
    return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 129, in _import_and_validate
    executor = import_string(path)
  File "/Users/adh/.pyenv/versions/3.10.7/envs/airflow310/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 34, in import_string
    raise ImportError(f"{dotted_path} doesn't look like a module path")
ImportError: NomadExecutor doesn't look like a module path

Happy to answer any other questions!

@potiuk
Member

potiuk commented Jun 27, 2023

I looked at it and it looks like a remnant from the past that sneaked in when I was refactoring BaseJob. It seems that the "executor_class" stored in BaseJob was actually different from the "executor_class" that has been used in a number of places to check executor compatibility, and using it from the job was simply a mistake.

But I also found that "job.executor_class" is something of a dead relic. It was not used anywhere else (only in one place in tests, where it was not really needed any more), so I took the liberty of removing it altogether.

I also applied the same fix as you did here @adh-wonolo, but with a small twist: there is no need to get the class just to read the is_local property. The way Python works, if you have an instance of a class, you can access a class attribute/property directly on the instance, and the class one is used if there is no instance attribute defined.
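That attribute-lookup point can be illustrated in isolation (BaseExecutor and LocalStyleExecutor here are hypothetical stand-ins, not Airflow's classes):

```python
# A class-level attribute is reachable from an instance, so something like
# executor.is_local works without first importing the executor class.

class BaseExecutor:
    is_local: bool = False  # class attribute, default for all executors

class LocalStyleExecutor(BaseExecutor):
    is_local = True  # overridden at the class level

executor = LocalStyleExecutor()
print(executor.is_local)        # True: instance lookup falls back to the class
print(type(executor).is_local)  # True: the same attribute, read via the class
```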

So I will close this one in favour of mine: #32219

@potiuk
Member

potiuk commented Jun 27, 2023

I also made you a co-author of that change @adh-wonolo. Thanks for letting us know and providing the fix proposal!

@potiuk
Member

potiuk commented Jun 28, 2023

And merged / marked for 2.6.3. Once again, thanks @adh-wonolo for raising it and providing the proposed fix (and thanks @o-nikolas for bringing it to my attention :).

@adh-wonolo
Contributor Author

Thanks so much @potiuk! Looking forward to collaborating again in the future :)

@adh-wonolo adh-wonolo deleted the fix/backfill-custom-executor branch June 28, 2023 13:10