New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid initiating SQLAlchemy session twice in dag_next_execution #35539
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, we can avoid a DB query with change. LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea to avoid crating a sesion is good, but this is not the right way. You should move the "get_is_paused()" to inside create_sesion() context manager AND pass the created session as parameter to teh "get_is_paused` method.
After you leave create_session
context manager, the session is not available any more.
In fact in this case in most cases the benefit will not be too big because in most cases when create_session() context manager completes, the session is closed()
, but in fact just returns it back to the sqlalchemy pool of sessions to be used (when pooling is enabled - and it is by default: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#sql-alchemy-pool-enabled
So what really happens here:
get_is_paused() uses @provide_session which under the hood in this case will use another create_session
def provide_session(func: Callable[PS, RT]) -> Callable[PS, RT]:
@wraps(func)
def wrapper(*args, **kwargs) -> RT:
if "session" in kwargs or session_args_idx < len(args):
return func(*args, **kwargs)
else:
with create_session() as session: # <- HERE
return func(*args, session=session, **kwargs)
return wrapper
The session will be returned to the pool and when you run with create_session
- same session is reused from the pool.
After the change it works basically in the same way but in reverse order - first it retrieves and closes (returns to the pool) session with with create_session
and then will retrieve the session from the pool with @provide_session
But you can achieve what you want by doing this (without returning the session to the pool temporarily):
with create_session() as session:
last_parsed_dag: DagModel = session.scalars(
select(DagModel).where(DagModel.dag_id == dag.dag_id)
).one()
if last_parsed_dag.get_is_paused(session):
That will actually reuse the session (we will still be in the context manager and we will pass the session as parameter so that provide_session can use it rather than create a new one.
by moving the method
Well, not even that :). When you look into |
@potiuk there are two methods, one in Dag and the other in DagModel, this change uses the second one which returns the value from DagModel instance without querying the DB. |
aaaaaaaa
AAAAAAAHHHH.. Looked at wrong get_is_paused 🤦 |
No-issue
Modifications:
We don't have to initiate SQLAlchemy session twice when we call the method
get_is_paused
dag.get_is_paused()
will query the database which is redundant.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.