
BugFix: Fix writing & deleting Dag Code for Serialized DAGs #8151

Merged
merged 4 commits into apache:v1-10-test on Apr 5, 2020

Conversation

kaxil
Member

@kaxil kaxil commented Apr 5, 2020

With store_dag_code enabled, the scheduler throws the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 158, in _run_file_processor
    pickle_dags)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1582, in process_file
    dag.sync_to_db()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1519, in sync_to_db
    DagCode.bulk_sync_to_db([dag.fileloc for dag in orm_dag])
TypeError: 'DagModel' object is not iterable

This is a blocker for 1.10.10; the bug was found in 1.10.10rc2 by Kostya Esmukov.
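For context, both the failing call and the corrected one appear verbatim in the tracebacks in this thread; the sketch below shows only the changed line in DAG.sync_to_db (airflow/models/dag.py), with the rest of the method elided:

```python
# Before: orm_dag is a single DagModel, so iterating over it raises
# TypeError: 'DagModel' object is not iterable
DagCode.bulk_sync_to_db([dag.fileloc for dag in orm_dag])

# After: pass the one fileloc directly and reuse the open session
DagCode.bulk_sync_to_db([orm_dag.fileloc], session=session)
```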


Make sure to mark the boxes below before creating PR: [x]


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@kaxil kaxil requested a review from ashb April 5, 2020 01:58
@kaxil kaxil added this to the Airflow 1.10.10 milestone Apr 5, 2020
@codecov-io

codecov-io commented Apr 5, 2020

Codecov Report

Merging #8151 into v1-10-test will decrease coverage by 0.51%.
The diff coverage is 100.00%.


@@              Coverage Diff               @@
##           v1-10-test    #8151      +/-   ##
==============================================
- Coverage       81.81%   81.29%   -0.52%     
==============================================
  Files             541      541              
  Lines           37178    37178              
==============================================
- Hits            30417    30225     -192     
- Misses           6761     6953     +192     
| Impacted Files | Coverage Δ |
|---|---|
| airflow/models/dagcode.py | 92.13% <ø> (ø) |
| airflow/models/dag.py | 92.44% <100.00%> (+0.13%) ⬆️ |
| airflow/operators/postgres_operator.py | 0.00% <0.00%> (-100.00%) ⬇️ |
| airflow/contrib/kubernetes/volume_mount.py | 33.33% <0.00%> (-66.67%) ⬇️ |
| airflow/contrib/kubernetes/volume.py | 50.00% <0.00%> (-50.00%) ⬇️ |
| airflow/contrib/kubernetes/pod_launcher.py | 45.11% <0.00%> (-47.37%) ⬇️ |
| airflow/contrib/kubernetes/pod_generator.py | 42.50% <0.00%> (-45.00%) ⬇️ |
| ...rflow/contrib/operators/kubernetes_pod_operator.py | 56.84% <0.00%> (-40.01%) ⬇️ |
| ...flow/contrib/operators/postgres_to_gcs_operator.py | 51.51% <0.00%> (-36.37%) ⬇️ |
| airflow/contrib/kubernetes/refresh_config.py | 50.98% <0.00%> (-23.53%) ⬇️ |
| ... and 8 more | |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f3c4924...a87f1b3.

@kaxil
Member Author

kaxil commented Apr 5, 2020

Tests have passed; the failing check is for generating requirements.txt and is unrelated.

@KostyaEsmukov
Contributor

@kaxil Thank you for the patch! I've applied it on my installation, and the dag_code table is now filled with rows.

However, new errors appeared in the scheduler:

Process DagFileProcessor35-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 158, in _run_file_processor
    pickle_dags)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1582, in process_file
    dag.sync_to_db()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1519, in sync_to_db
    DagCode.bulk_sync_to_db([orm_dag.fileloc], session=session)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagcode.py", line 131, in bulk_sync_to_db
    orm_dag_code.last_updated = timezone.utcnow()
UnboundLocalError: local variable 'orm_dag_code' referenced before assignment

and

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 624, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 856, in start
    self._refresh_dag_dir()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 928, in _refresh_dag_dir
    DagCode.remove_deleted_code(self._file_paths)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagcode.py", line 151, in remove_deleted_code
    cls.fileloc.notin_(alive_dag_filelocs))).delete())
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3789, in delete
    delete_op.exec_()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1696, in exec_
    self._do_pre_synchronize()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1778, in _do_pre_synchronize
    from_=err,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python: "Cannot evaluate clauselist with operator <function comma_op at 0x7f14acf61440>".
 Specify 'fetch' or False for the synchronize_session parameter.

Do these look like related bugs?
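A hedged sketch of the two failure modes above and their usual remedies follows; this is not necessarily the exact change merged in this PR. `filelocs` / `alive_dag_filelocs` stand in for lists of DAG file paths, and `session` for an open SQLAlchemy session:

```python
# Hedged sketch of the two failure patterns above -- not necessarily the exact
# code merged in this PR.
from airflow.models.dagcode import DagCode
from airflow.utils import timezone


def sync_dag_code(filelocs, session):
    # 1. UnboundLocalError: if a fileloc has no existing row, nothing ever
    #    assigns orm_dag_code, so `orm_dag_code.last_updated = ...` blows up.
    #    Creating the row when it is missing keeps the name always bound.
    existing = {row.fileloc: row for row in
                session.query(DagCode).filter(DagCode.fileloc.in_(filelocs))}
    for fileloc in filelocs:
        orm_dag_code = existing.get(fileloc)
        if orm_dag_code is None:
            orm_dag_code = DagCode(fileloc)  # constructor shown in the later traceback
            session.add(orm_dag_code)
        orm_dag_code.last_updated = timezone.utcnow()


def remove_deleted_code(alive_dag_filelocs, session):
    # 2. InvalidRequestError: SQLAlchemy cannot evaluate the NOT IN criteria
    #    against in-memory objects, so the bulk delete needs
    #    synchronize_session='fetch' (or False), as the error message suggests.
    session.query(DagCode).filter(
        DagCode.fileloc.notin_(alive_dag_filelocs)
    ).delete(synchronize_session='fetch')
```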

@potiuk
Member

potiuk commented Apr 5, 2020

> Tests have passed; the failing check is for generating requirements.txt and is unrelated.

Yep. A rebase should help with that.

@kaxil kaxil changed the title from "BugFix: Fix writing Dag Code for Serialized DAGs" to "BugFix: Fix writing & deleting Dag Code for Serialized DAGs" Apr 5, 2020
@kaxil
Member Author

kaxil commented Apr 5, 2020

@KostyaEsmukov Can you please check again with the patch in this PR? 🤞 All the problems should be solved.

I have tested locally: DagCode is successfully saved in the DB and is also overwritten when a file has changed.
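(A quick, hedged way to reproduce that check from a Python shell; `example_fileloc` below is a placeholder path, not one from this PR:)

```python
# Hedged verification sketch: confirm a dag_code row exists for a DAG file and
# see when it was last written. example_fileloc is a placeholder path.
from airflow import settings
from airflow.models.dagcode import DagCode

session = settings.Session()
example_fileloc = "/usr/local/airflow/dags/my_dag.py"  # placeholder
row = session.query(DagCode).filter(DagCode.fileloc == example_fileloc).one_or_none()
if row is not None:
    print(row.last_updated, len(row.source_code))
session.close()
```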

@kaxil kaxil merged commit 892d4db into apache:v1-10-test Apr 5, 2020
@kaxil kaxil deleted the v1-10-test branch April 5, 2020 12:59
@KostyaEsmukov
Contributor

@kaxil Thanks for the fix! The scheduler errors are gone, but on a stateless webserver the code is still not rendered because, apparently, a DagCode instance always tries to read the code from a local file during initialization:

>>> DagCode(dag_orm.fileloc)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 4, in __init__
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
    exc_value, with_traceback=exc_tb,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagcode.py", line 59, in __init__
    self.source_code = DagCode._read_code(self.fileloc)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagcode.py", line 63, in _read_code
    with open_maybe_zipped(fileloc, 'r') as source:
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/file.py", line 102, in open_maybe_zipped
    return io.open(fileloc, mode=mode)
FileNotFoundError: [Errno 2] No such file or directory: '<...my_dag...>'

Notice the _read_code line.
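(A hedged sketch of reading the stored source back from the metadata DB without constructing a DagCode instance, since __init__ re-reads the local file; the column names come from the tracebacks above, everything else is illustrative:)

```python
# Hedged sketch: fetch the stored source from the dag_code table instead of
# instantiating DagCode, whose __init__ re-reads the file from local disk.
from airflow import settings
from airflow.models.dagcode import DagCode


def code_from_db(fileloc):
    """Return the stored source for fileloc, or None if no row exists."""
    session = settings.Session()
    try:
        row = session.query(DagCode.source_code).filter(
            DagCode.fileloc == fileloc).one_or_none()
        return row.source_code if row is not None else None
    finally:
        session.close()
```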

@kaxil
Member Author

kaxil commented Apr 5, 2020

> @kaxil Thanks for the fix! The scheduler errors are gone, but on a stateless webserver the code is still not rendered because, apparently, a DagCode instance always tries to read the code from a local file during initialization: […]
>
> Notice the _read_code line.

Are you on Airflow Slack (https://apache-airflow-slack.herokuapp.com/)? If not, would you mind signing up so that we can check this together? :)
