Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure that DAG processor job row has filed value in job_type column #31182

Merged
merged 4 commits into from
May 10, 2023

Conversation

AmFlint
Copy link
Contributor

@AmFlint AmFlint commented May 10, 2023

Description

We're running custom probes on the dag processor component.

Since the DagProcessor component is represented as a job_runner, we're running SQL queries to check the latest_heartbeat for the associated job object.

To speed up queries in big deployments, we're using a the index on job_type:

SELECT latest_heartbeat FROM job
  WHERE
    job_type = 'DagProcessorJob'
  AND
    hostname = '%s';

(hostname being the IP address of the k8s pod).

The problem is that there's a bug currently, and the job_type is always NULL for dag processor job_runners.

I added the missing line of code that should fix this issue.

@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label May 10, 2023
@boring-cyborg
Copy link

boring-cyborg bot commented May 10, 2023

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@ephraimbuddy ephraimbuddy added this to the Airflow 2.6.1 milestone May 10, 2023
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label May 10, 2023
@ashb ashb changed the title fix dag processor job_runner, add missing job_type Make sure that DAG processor job row has filed value in job_type column May 10, 2023
@ephraimbuddy
Copy link
Contributor

@ephraimbuddy ephraimbuddy merged commit 420a9b1 into apache:main May 10, 2023
46 checks passed
@boring-cyborg
Copy link

boring-cyborg bot commented May 10, 2023

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

ephraimbuddy pushed a commit that referenced this pull request May 10, 2023
…lumn (#31182)

* fix dag processor job_runner, add missing job_type

* remove uneeded spaces

* move job_type check to base_runner

* fix test mock job runner pass job argument to init

(cherry picked from commit 420a9b1)
@AmFlint AmFlint deleted the fix-dagprocessor-jobtype branch May 10, 2023 17:41
@potiuk
Copy link
Member

potiuk commented May 11, 2023

FYI: This is what I was talking about -> result if we add the typing now.

It's ok how it is now, I won't complai, and I will solve it later.

But just FYI why it has been separated - previously Scheduler and BackfillJobRunner had Job only while the others could have either Job or JobPydantic. By bring it as a common field in Base Runner, we have no easy way to distinguish those two cases.

airflow/jobs/backfill_job_runner.py:258: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
            executor = self.job.executor
                       ^
airflow/jobs/backfill_job_runner.py:540: error: Argument 1 to
"import_executor_cls" of "ExecutorLoader" has incompatible type
"Union[Any, str, None]"; expected "str"  [arg-type]
                                self.job.executor_class,
                                ^
airflow/jobs/backfill_job_runner.py:898: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
            executor = self.job.executor
                       ^
airflow/jobs/backfill_job_runner.py:964: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
            queued_tis = self.job.executor.queued_tasks
                         ^
airflow/jobs/backfill_job_runner.py:966: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
            running_tis = self.job.executor.running
                          ^
airflow/jobs/backfill_job_runner.py:1005: error: Argument 4 to
"reduce_in_chunks" has incompatible type "Optional[int]"; expected "int" 
[arg-type]
    ...elpers.reduce_in_chunks(query, tis_to_reset, [], self.job.max_tis_per_...
                                                        ^
airflow/jobs/scheduler_job_runner.py:264: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
            self.job.executor.debug_dump()
            ^
airflow/jobs/scheduler_job_runner.py:280: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "is_alive"  [union-attr]
                return self.job.is_alive(grace_multiplier=grace_multiplier...
                       ^
airflow/jobs/scheduler_job_runner.py:650: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                self.job.executor.queue_command(
                ^
airflow/jobs/scheduler_job_runner.py:677: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                max_tis = self.job.executor.slots_available
                          ^
airflow/jobs/scheduler_job_runner.py:679: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
    ...       max_tis = min(self.job.max_tis_per_query, self.job.executor.slo...
                                                        ^
airflow/jobs/scheduler_job_runner.py:690: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
            event_buffer = self.job.executor.get_event_buffer()
                           ^
airflow/jobs/scheduler_job_runner.py:772: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                    or self.job.executor.has_task(ti)  # This scheduler ha...
                       ^
airflow/jobs/scheduler_job_runner.py:802: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                        self.job.executor.send_callback(request)
                        ^
airflow/jobs/scheduler_job_runner.py:837: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                self.job.executor.job_id = self.job.id
                ^
airflow/jobs/scheduler_job_runner.py:840: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                    self.job.executor.callback_sink = PipeCallbackSink(
                    ^
airflow/jobs/scheduler_job_runner.py:847: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                    self.job.executor.callback_sink = DatabaseCallbackSink...
                    ^
airflow/jobs/scheduler_job_runner.py:849: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                self.job.executor.start()
                ^
airflow/jobs/scheduler_job_runner.py:879: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                    self.job.executor.end()
                    ^
airflow/jobs/scheduler_job_runner.py:994: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                        self.job.executor.heartbeat()
                        ^
airflow/jobs/scheduler_job_runner.py:1091: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                if self.job.executor.slots_available <= 0:
                   ^
airflow/jobs/scheduler_job_runner.py:1477: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                self.job.executor.send_callback(callback)
                ^
airflow/jobs/scheduler_job_runner.py:1504: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
            self.job.executor.send_callback(request)
            ^
airflow/jobs/scheduler_job_runner.py:1529: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                tis_for_warning_message = self.job.executor.cleanup_stuck_...
                                          ^
airflow/jobs/scheduler_job_runner.py:1612: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                        to_reset = self.job.executor.try_adopt_task_instan...
                                   ^
airflow/jobs/scheduler_job_runner.py:1710: error: Item "JobPydantic" of
"Union[Job, JobPydantic]" has no attribute "executor"  [union-attr]
                self.job.executor.send_callback(request)
                ^

@potiuk
Copy link
Member

potiuk commented May 12, 2023

Fix in #31240 (cc: @AmFlint @ephraimbuddy )

potiuk added a commit to potiuk/airflow that referenced this pull request May 14, 2023
By avoiding setting the job in the BaseJobRunner, the typing for Runners
and Job and JobPydantic is now more complete and accurate.

Scheduler and Backfill Runners limit their code to Job and can use all
the things that ORM Job allows them to do

Other runners are limited to union of Job and JobPydantic version so
that they can be run on the client side of the internal API without
having all the Job features.

This is a follow up after apache#31182 that fixed missing job_type for
DagProcessor Job and nicely extracted job to BaseRunner but broke
MyPy/Typing guards implemented in the runners that should aid the AIP-44
implementation.
potiuk added a commit that referenced this pull request May 15, 2023
By avoiding setting the job in the BaseJobRunner, the typing for Runners
and Job and JobPydantic is now more complete and accurate.

Scheduler and Backfill Runners limit their code to Job and can use all
the things that ORM Job allows them to do

Other runners are limited to union of Job and JobPydantic version so
that they can be run on the client side of the internal API without
having all the Job features.

This is a follow up after #31182 that fixed missing job_type for
DagProcessor Job and nicely extracted job to BaseRunner but broke
MyPy/Typing guards implemented in the runners that should aid the AIP-44
implementation.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler Scheduler or dag parsing Issues type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants