Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sqa deprecations for airflow task cmd #39244

Merged

Conversation

dondaum
Copy link
Contributor

@dondaum dondaum commented Apr 24, 2024

related: #28723

fix deprecations for SQLAlchemy 2.0 for Airflow core task command.

SQLAlchemy 2.0 is changing the behavior when an object is being merged into a Session along the backref cascade. Until SQLAlchemy 1.4 and assuming a bidirectional relationship between a TaskInstance and a DagRun, if a DagRun object is already in a Session the TaskInstance object gets put into the Session as well. This behavior is deprecated for removal in SQLAlchemy 2.0.

In order to mentain the actual behavior and to fix the warning, we need to ensure that both objects are either not in the session or are in the session when they are associated with each other. See here for more information.

Reported in core

  • airflow/cli/commands/task_command.py:202

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Copy link
Collaborator

@dirrao dirrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any way test the same?

@dondaum
Copy link
Contributor Author

dondaum commented Apr 25, 2024

is there any way test the same?

Yeah it would be great to have a test for it.

A good test should test the current behavior in SQLAlchemy 1.4 against 2.0.

I guess locally one could install SQLAlchemy 2.0 and run it against. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and run all tests ?

@dondaum dondaum force-pushed the fix/sqa-deprecations-airflow-core-task-cmd branch from 3052622 to cb03e44 Compare April 26, 2024 09:09
@dondaum
Copy link
Contributor Author

dondaum commented Apr 26, 2024

is there any way test the same?

is there any way test the same?

Yeah it would be great to have a test for it.

A good test should test the current behavior in SQLAlchemy 1.4 against 2.0.

I guess locally one could install SQLAlchemy 2.0 and run it against. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and run all tests ?

I tried to make all tests run with SQLAlchemy 2.0 but it is a lot of effort to adjust and fix everything (to make Airflow core compatible with SQLAlchemy 2.0).

Instead I created a small test setup and verified that the fix indeed preserve the current behavior.

SQLAlchemy 1.4

from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship


Base = declarative_base()



class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)

class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )



engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)


with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()


# Simulate current behavior
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)


    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)

    session.commit()


# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)

SQLAlchemy 1.4 output

vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_1_4.py 
<__main__.DagRun object at 0x7f68a696c0d0>
Dag run in session: True
/workspaces/sqa2backref/backpop/sqa_1_4.py:63: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
TaskInstance in session: True
[<__main__.TaskInstance object at 0x7f68a6992c90>

SQLAlchemy 2.0

from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship


Base = declarative_base()



class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)

class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )



engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)


with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()


# Simulate current behavior
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)


    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)

    session.commit()


# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)

SQLAlchemy 2.0 output

vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0.py 
<__main__.DagRun object at 0x7f24bad1f850>
Dag run in session: True
TaskInstance in session: False
[]

SQLAlchemy 2.0 with fix

from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship


Base = declarative_base()



class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)

class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )



engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)


with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()


# Simulate current behavior
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)


    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)

    session.add(ti)  # <-- fix

    session.commit()


# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)

SQLAlchemy 2.0 with fix output

vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0_fix.py
<__main__.DagRun object at 0x7efc93103e10>
Dag run in session: True
TaskInstance in session: False
[<__main__.TaskInstance object at 0x7efc937081d0>

Copy link
Contributor

@Taragolis Taragolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make all tests run with SQLAlchemy 2.0 but it is a lot of effort to adjust and fix everything

No need to spend time for test on actual SA20, we still far away of run Airflow on it. Main point sure that it works on SA14

@Taragolis Taragolis added this to the Airflow 2.9.2 milestone May 4, 2024
@Taragolis Taragolis added the type:bug-fix Changelog: Bug Fixes label May 4, 2024
@Taragolis Taragolis merged commit dbdf743 into apache:main May 4, 2024
39 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:CLI type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants