Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subdir parameter to dags reserialize command #26170

Merged
merged 3 commits into from
Sep 6, 2022

Conversation

mik-laj
Copy link
Member

@mik-laj mik-laj commented Sep 5, 2022

Hi.
Today I looked at this command and noticed a few problems:

  • The DagBag.collect_dags method is called two times. The first time through DagBag.__init__, and the second time explicitly in a command code. This is not needed and causes performance degradation.
  • safe_mode has been overridden to false in the command code for no reason, which means more files are processed than needed.
  • Parameter subdir is not supported, which is inconsistent with the rest of the commands that create the DagBag instance. Whenever a DagBag is created by any other commands, it is possible to set the dag_folder using the subdir parameter including airflow dag list, airflow scheduler, and others.

I had a problem with choosing a PR title that would look good in a changelog, because we have 3 very related problems here, but I think adding a new CLI parameter is the most user-facing, The rest of the problems would not be (probably) noticed by anyone without looking at the code.

CC: @collinmcnulty, @potiuk, @uranusjr, @sfc-gh-mkmak

Best regards,
Kamil Breguła


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@@ -503,6 +503,5 @@ def dag_reserialize(args, session: Session = NEW_SESSION):
session.query(SerializedDagModel).delete(synchronize_session=False)

if not args.clear_only:
dagbag = DagBag()
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this line need to be kept?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, and I described in the PR description why it is not needed.

Copy link
Member

@uranusjr uranusjr Sep 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the arguments here are different from the call in __init__.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since DagBag().file_last_changed will be empty, I think it won't make a difference if only_if_updated is set to True or False

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only_if_update = False means the dags will be read again if the file has changed. In our case, we don't need to load the same files twice, so this parameter doesn't apply here.

@kaxil
Copy link
Member

kaxil commented Sep 6, 2022

Test failure @mik-laj :

___________________ TestCliTasks.test_run_get_serialized_dag ___________________
  
  self = <tests.cli.commands.test_task_command.TestCliTasks object at 0x7f2bcf86d890>
  mock_local_job = <MagicMock name='LocalTaskJob' id='139826142040912'>
  mock_get_dag_by_deserialization = <MagicMock name='get_dag_by_deserialization' id='139826142042192'>
  
      @mock.patch("airflow.cli.commands.task_command.get_dag_by_deserialization")
      @mock.patch("airflow.cli.commands.task_command.LocalTaskJob")
      def test_run_get_serialized_dag(self, mock_local_job, mock_get_dag_by_deserialization):
          """
          Test using serialized dag for local task_run
          """
          task_id = self.dag.task_ids[0]
          args = [
              'tasks',
              'run',
              '--ignore-all-dependencies',
              '--local',
              self.dag_id,
              task_id,
              self.run_id,
          ]
  >       mock_get_dag_by_deserialization.return_value = SerializedDagModel.get(self.dag_id).dag
  E       AttributeError: 'NoneType' object has no attribute 'dag'
  
  tests/cli/commands/test_task_command.py:152: AttributeError

@eladkal eladkal added this to the Airflow 2.4.0 milestone Sep 6, 2022
@potiuk potiuk merged commit 4be5616 into apache:main Sep 6, 2022
@jedcunningham jedcunningham added the type:improvement Changelog: Improvements label Sep 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:CLI type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants