Allow retrieving Variables, Connections, XCom without breaking HA Locks#14016

Closed
kaxil wants to merge 1 commit into apache:master from astronomer:allow-getting-vars

kaxil commented Feb 2, 2021

closes #13811

Previously, because we use the create_session context manager (with or
without @provide_session), we always committed the session whenever a
session was not explicitly passed to the function. For example:

dag_run = task_instance.get_dagrun()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/contextlib.py", line 119, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 32, in create_session
    session.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 472, in _prepare_impl
    self.session.dispatch.before_commit(self.session)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
    fn(*args, **kw)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py", line 217, in _validate_commit
    raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")

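The same happens when using Variable.get() because of get_variable in airflow/secrets/metastore.py:

https://github.com/apache/airflow/blob/594069ee061e9839b2b12aa43aa3a23e05beed86/airflow/secrets/metastore.py#L55-L70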

    @provide_session
    def get_variable(self, key: str, session=None):
        """
        Get Airflow Variable from Metadata DB

        :param key: Variable Key
        :type key: str
        :return: Variable Value
        """
        from airflow.models.variable import Variable

        var_value = session.query(Variable).filter(Variable.key == key).first()
        session.expunge_all()
        if var_value:
            return var_value.val
        return None
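
To illustrate why even a read-only lookup like this trips the guard, here is a minimal sketch. FakeSession, the settings namespace, and the simplified provide_session and create_session below are hypothetical stand-ins written for this example, not Airflow's actual implementation. They only assume, as the traceback suggests, that the wrapper opens its own session when none is passed and commits it on exit.

```python
import contextlib
import functools
from types import SimpleNamespace


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session (not Airflow code)."""

    def __init__(self, commit_allowed=True):
        self.commit_allowed = commit_allowed
        self.commits = 0

    def commit(self):
        # Mimics the before_commit guard seen in the traceback.
        if not self.commit_allowed:
            raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
        self.commits += 1

    def rollback(self):
        pass

    def close(self):
        pass


# Hypothetical session factory holder, swapped out in the demo below.
settings = SimpleNamespace(Session=FakeSession)


@contextlib.contextmanager
def create_session():
    session = settings.Session()
    try:
        yield session
        session.commit()  # runs on exit even if the body only read data
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def provide_session(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            return func(*args, **kwargs)  # caller owns the transaction
        with create_session() as session:
            return func(*args, session=session, **kwargs)

    return wrapper


@provide_session
def get_variable(key, session=None):
    return None  # read-only lookup; nothing is written


# Simulate code that runs while commits are guarded.
guarded = FakeSession(commit_allowed=False)
settings.Session = lambda: guarded

get_variable("foo", session=guarded)  # explicit session: no commit attempted
try:
    get_variable("foo")  # no session passed: commit on exit hits the guard
except RuntimeError as err:
    print(err)
```

In this sketch, passing the session explicitly avoids the commit, while the call without a session raises the same RuntimeError as in the traceback.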

This commit makes sure that we only commit the session if an object has been added to, modified in, or deleted from the session,
by checking session._is_clean():

https://github.com/sqlalchemy/sqlalchemy/blob/25ee5a05df0daeb7dc7ba432172d6abc76ffab56/lib/sqlalchemy/orm/session.py#L3236-L3241

    def _is_clean(self):
        return (
            not self.identity_map.check_modified()
            and not self._deleted
            and not self._new
        )
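
The idea can be sketched roughly as follows. This is not the PR's actual diff: TrackingSession is a hypothetical stand-in that imitates only the bookkeeping _is_clean() relies on, and this create_session takes a session_factory argument purely for the example.

```python
import contextlib


class TrackingSession:
    """Hypothetical stand-in that tracks pending ORM changes."""

    def __init__(self):
        self._new = set()
        self._dirty = set()
        self._deleted = set()
        self.commits = 0

    def add(self, obj):
        self._new.add(obj)

    def _is_clean(self):
        # Same shape as the SQLAlchemy check quoted above.
        return not self._dirty and not self._deleted and not self._new

    def commit(self):
        self.commits += 1
        self._new.clear()
        self._dirty.clear()
        self._deleted.clear()

    def rollback(self):
        pass

    def close(self):
        pass


@contextlib.contextmanager
def create_session(session_factory):
    session = session_factory()
    try:
        yield session
        # Only commit when the session tracks pending changes.
        if not session._is_clean():
            session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


shared = TrackingSession()

with create_session(lambda: shared) as session:
    pass  # read-only work: commit is skipped

with create_session(lambda: shared) as session:
    session.add("new-row")  # pending object: commit happens

print(shared.commits)
```

Under these assumptions a read-only block leaves the commit counter untouched, and only the block that adds an object triggers a commit.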

kaxil requested a review from ashb February 2, 2021 02:25
kaxil added this to the Airflow 2.0.1 milestone Feb 2, 2021
Member Author

The two file changes are not both strictly needed; either one is sufficient.

In one case we allow committing an empty session, and in the other we don't commit a session if it is empty.

Member

This can't be here: even if the session is clean, allowing a COMMIT will release the lock.

Member Author

Yeah, I tried session.execute("update dag set is_active=False where dag_id='example_branch_operator'"), after which session._is_clean() is still True, so this logic does not work. Going to close the PR.
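
Both objections can be modelled with a toy session. ToySession and its lock_rows helper are hypothetical and written only for this illustration. The sketch assumes, as the comments above describe, that raw SQL sent through execute() bypasses the ORM bookkeeping behind _is_clean(), and that a COMMIT ends the transaction and so releases any row locks it holds.

```python
class ToySession:
    """Hypothetical model of a session, its open transaction, and row locks."""

    def __init__(self):
        # ORM-level bookkeeping, the only state _is_clean() looks at.
        self._new = set()
        self._dirty = set()
        self._deleted = set()
        # Transaction-level state that the ORM bookkeeping does not see.
        self.pending_sql = []
        self.row_locks = set()
        self.committed_sql = []

    def lock_rows(self, *row_ids):
        # Stands in for SELECT ... FOR UPDATE on the given rows.
        self.row_locks.update(row_ids)

    def execute(self, sql):
        # Raw SQL goes straight into the transaction, not the ORM state.
        self.pending_sql.append(sql)

    def _is_clean(self):
        return not self._dirty and not self._deleted and not self._new

    def commit(self):
        # COMMIT persists pending statements and ends the transaction,
        # which releases every lock it held.
        self.committed_sql.extend(self.pending_sql)
        self.pending_sql.clear()
        self.row_locks.clear()


session = ToySession()
session.lock_rows("dag_run:1")
session.execute(
    "update dag set is_active=False where dag_id='example_branch_operator'"
)

print(session._is_clean())  # the ORM state is untouched by raw SQL
session.commit()
print(session.row_locks)  # the commit dropped the lock regardless
```

In this model the session still reports itself as clean with a pending UPDATE, and the commit releases the lock whether or not anything was written, which is why a cleanliness check cannot make the commit safe.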

Member

Not sure about this either

kaxil force-pushed the allow-getting-vars branch from 1b02611 to 6482502 February 2, 2021 13:24
kaxil closed this Feb 2, 2021
kaxil deleted the allow-getting-vars branch February 2, 2021 13:27

Development

Successfully merging this pull request may close these issues.

get_dagrun() function starts giving error in 2.0 when using inside cluster policy

3 participants
