Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise and migrate to SA2-compatible syntax for TaskReschedule #33720

Merged
merged 1 commit into from Oct 16, 2023

Conversation

Taragolis
Copy link
Contributor

@Taragolis Taragolis commented Aug 25, 2023

related: #28723


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added the area:core-operators Operators, Sensors and hooks within Core Airflow label Aug 25, 2023
@Taragolis
Copy link
Contributor Author

Interesting errors which happen on, it might refers either and error in code during change behaviour or the fact that something wrong with test on Postgres. Locally I've also have this error, wit flakey behaviour: 5-10 sequential fail, 2-3 success, and again failure

FAILED tests/sensors/test_base.py::TestBaseSensor::test_ok_with_reschedule - AssertionError: assert datetime.datetime(2023, 8, 27, 16, 39, 45, 511220, tzinfo=Timezone('UTC')) == datetime.datetime(2023, 8, 27, 16, 39, 55, 511220, tzinfo=Timezone('UTC'))
 +  where datetime.datetime(2023, 8, 27, 16, 39, 45, 511220, tzinfo=Timezone('UTC')) = <airflow.models.taskreschedule.TaskReschedule object at 0x7fee0a9bdf70>.start_date
FAILED tests/sensors/test_base.py::TestBaseSensor::test_ok_with_custom_reschedule_exception - AssertionError: assert datetime.datetime(2023, 8, 27, 16, 39, 46, 338455, tzinfo=Timezone('UTC')) == datetime.datetime(2023, 8, 27, 16, 40, 46, 338455, tzinfo=Timezone('UTC'))
 +  where datetime.datetime(2023, 8, 27, 16, 39, 46, 338455, tzinfo=Timezone('UTC')) = <airflow.models.taskreschedule.TaskReschedule object at 0x7fee0a900e80>.start_date
FAILED tests/sensors/test_base.py::TestBaseSensor::test_reschedule_and_retry_timeout - Failed: DID NOT RAISE <class 'airflow.exceptions.AirflowSensorTimeout'>
FAILED tests/sensors/test_base.py::TestBaseSensor::test_reschedule_and_retry_timeout_and_silent_fail - Failed: DID NOT RAISE <class 'airflow.exceptions.AirflowSensorTimeout'>


@classmethod
@provide_session
def find_last_for_task_instance(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we really need this method at all. The logic is rather simple and does not really provide any encapsulation benefits. (Same for find_for_task_instance above but it’s already there so there’s probably an argument for keeping it.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I change logic a bit methods

New (all marked as private):
stmt_for_task_instance -> Query builder
find_task_reschedule_for_ti -> Return single task_reschedule record, first or last
find_task_reschedules_for_ti -> replacement for find_for_task_instance, not used in production code, only in tests

Deprecated Methods, for backward compatibility
query_for_task_instance
find_for_task_instance

In general only find_task_reschedule_for_ti and stmt_for_task_instance uses, all other are redundant, and in theory could be dropped. However find_task_reschedules_for_ti help me to find my mistake during tests

Copy link
Member

@uranusjr uranusjr Aug 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I mean we don’t really need find_task_reschedule_for_ti and find_task_reschedules_for_ti since those are just one liners that pass most of the arguments (except session) to stmt_for_task_instance anyway. We can just call stmt_for_task_instance and execute the statement as appropriated directly where needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implement by this way due to difference between legacy Query API and new Statement Based (i don't know correct name): you don't have to provide a DB session for build query statement

We could directly use stmt_for_task_instance into

  • airflow/models/taskinstance.py
  • airflow/ti_deps/deps/ready_to_reschedule.py

Unfortunetly we can't do that in airflow/sensors/base.py, because we do not open session here, however it is not a showstopper, if applicable I could create private method in BaseSensorOperator which would open a session, execute query against stmt_for_task_instance and return start_date

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don’t even need a private method, just use with create_session(), that’s what @provide_session does anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I just need to figure out how to deal with tests now, I'm not a fan of mocking DB execution if we could avoid it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just thought, are find_task_reschedule_for_ti / find_task_reschedules_for_ti would be required for AIP-44?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually might be keep only find_task_reschedule_for_ti and use it again, WDYT?

@Taragolis
Copy link
Contributor Author

wit flakey behaviour: 5-10 sequential fail, 2-3 success, and again failure

🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦
I've forgot about assignments: stmt = stmt.order_by() instead of stmt.order_by()

tests/ti_deps/deps/test_ready_to_reschedule_dep.py Outdated Show resolved Hide resolved
Comment on lines 206 to 172
@staticmethod
@provide_session
def query_for_task_instance(
task_instance: TaskInstance,
descending: bool = False,
session: Session = NEW_SESSION,
try_number: int | None = None,
) -> Query:
"""
Return query for task reschedules for a given the task instance (deprecated).

:param session: the database session object
:param task_instance: the task instance to find task reschedules for
:param descending: If True then records are returned in descending order
:param try_number: Look for TaskReschedule of the given try_number. Default is None which
looks for the same try_number of the given task_instance.
"""
warnings.warn(
"`query_for_task_instance` use SQLAlchemy's Legacy Query API.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)

if try_number is None:
try_number = task_instance.try_number

TR = TaskReschedule
qry = session.query(TR).filter(
TR.dag_id == task_instance.dag_id,
TR.task_id == task_instance.task_id,
TR.run_id == task_instance.run_id,
TR.map_index == task_instance.map_index,
TR.try_number == try_number,
)
if descending:
return qry.order_by(desc(TR.id))
else:
return qry.order_by(asc(TR.id))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've decided to keep query_for_task_instance as is, because if some one use it before, than expects that this method return sqlalchemy.orm.Query and there is no direct replacement for this object, which are implements same methods.

If we think that this part never been a part of public interface, we could remove it, this PR also remove usage of this method

@Taragolis Taragolis marked this pull request as ready for review August 29, 2023 20:58
Comment on lines +107 to +117
def task_reschedules_for_ti():
def wrapper(ti):
with create_session() as session:
return session.scalars(TaskReschedule.stmt_for_task_instance(ti=ti, descending=False)).all()

return wrapper
Copy link
Member

@uranusjr uranusjr Aug 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def task_reschedules_for_ti():
def wrapper(ti):
with create_session() as session:
return session.scalars(TaskReschedule.stmt_for_task_instance(ti=ti, descending=False)).all()
return wrapper
def task_reschedules_for_ti(session):
def wrapper(ti):
return session.scalars(TaskReschedule.stmt_for_task_instance(ti=ti, descending=False)).all()
return wrapper

(I think this should work. session here is a function-level fixture.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be better try to avoid to keep session longer than it required

@Taragolis Taragolis force-pushed the optimise-taskreschedule branch 2 times, most recently from e44ee27 to 489f8bb Compare September 4, 2023 22:43
@phanikumv phanikumv added the full tests needed We need to run full set of tests for this PR to merge label Sep 5, 2023
@phanikumv
Copy link
Contributor

We need to run full tests on the changes related to SqlAlchemy 2.0 , hence applied the label

@phanikumv
Copy link
Contributor

@Taragolis can you please rebase so that we can run all the tests on this PR

@potiuk
Copy link
Member

potiuk commented Sep 5, 2023

@Taragolis can you please rebase so that we can run all the tests on this PR

I rebased it.

@Taragolis
Copy link
Contributor Author

@Taragolis Taragolis 3 weeks ago
I've just thought, are find_task_reschedule_for_ti / find_task_reschedules_for_ti would be required for AIP-44?

@Taragolis Taragolis 3 weeks ago
Actually might be keep only find_task_reschedule_for_ti and use it again, WDYT?

cc: @potiuk @phanikumv @uranusjr @mhenc @vincbeck

@phanikumv
Copy link
Contributor

@Taragolis could you please resolve the conflict here

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
@ephraimbuddy ephraimbuddy merged commit 85fd0e1 into apache:main Oct 16, 2023
66 checks passed
@ephraimbuddy ephraimbuddy added the type:improvement Changelog: Improvements label Oct 16, 2023
@Taragolis Taragolis deleted the optimise-taskreschedule branch October 16, 2023 21:31
@eladkal eladkal added this to the Airflow 2.8.0 milestone Oct 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow full tests needed We need to run full set of tests for this PR to merge type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants