Remove MSSQL support from Airflow core #36514

Merged · 4 commits · Jan 3, 2024
74 changes: 0 additions & 74 deletions .github/workflows/ci.yml
@@ -86,17 +86,14 @@ jobs:
postgres-versions: ${{ steps.selective-checks.outputs.postgres-versions }}
default-postgres-version: ${{ steps.selective-checks.outputs.default-postgres-version }}
mysql-versions: ${{ steps.selective-checks.outputs.mysql-versions }}
mssql-versions: ${{ steps.selective-checks.outputs.mssql-versions }}
default-mysql-version: ${{ steps.selective-checks.outputs.default-mysql-version }}
default-helm-version: ${{ steps.selective-checks.outputs.default-helm-version }}
default-kind-version: ${{ steps.selective-checks.outputs.default-kind-version }}
full-tests-needed: ${{ steps.selective-checks.outputs.full-tests-needed }}
parallel-test-types-list-as-string: >-
${{ steps.selective-checks.outputs.parallel-test-types-list-as-string }}
mssql-parallelism: ${{ steps.selective-checks.outputs.mssql-parallelism }}
postgres-exclude: ${{ steps.selective-checks.outputs.postgres-exclude }}
mysql-exclude: ${{ steps.selective-checks.outputs.mysql-exclude }}
mssql-exclude: ${{ steps.selective-checks.outputs.mssql-exclude }}
sqlite-exclude: ${{ steps.selective-checks.outputs.sqlite-exclude }}
skip-provider-tests: ${{ steps.selective-checks.outputs.skip-provider-tests }}
run-tests: ${{ steps.selective-checks.outputs.run-tests }}
@@ -1236,69 +1233,6 @@ jobs:
if: failure()


tests-mssql:
timeout-minutes: 130
name: >
DB:MSSQL${{matrix.mssql-version}}, Py${{matrix.python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
strategy:
matrix:
python-version: "${{fromJson(needs.build-info.outputs.python-versions)}}"
mssql-version: "${{fromJson(needs.build-info.outputs.mssql-versions)}}"
exclude: "${{fromJson(needs.build-info.outputs.mssql-exclude)}}"
fail-fast: false
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
FULL_TESTS_NEEDED: "${{needs.build-info.outputs.full-tests-needed}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
BACKEND: "mssql"
ENABLE_COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
PYTHON_MAJOR_MINOR_VERSION: "${{matrix.python-version}}"
MSSQL_VERSION: "${{matrix.mssql-version}}"
BACKEND_VERSION: "${{matrix.mssql-version}}"
JOB_ID: "mssql-${{matrix.mssql-version}}-${{matrix.python-version}}"
# The below (and corresponding selective checks code) can be removed once
# https://github.com/apache/airflow/issues/31575 is fixed.
# This is a temporary workaround for flaky tests that occur in MSSQL tests for public runners
PARALLELISM: "${{needs.build-info.outputs.mssql-parallelism}}"
if: >
needs.build-info.outputs.run-tests == 'true' &&
(needs.build-info.outputs.is-self-hosted-runner == 'true' &&
needs.build-info.outputs.is-airflow-runner == 'true' ||
needs.build-info.outputs.full-tests-needed == 'true' ||
needs.build-info.outputs.has-migrations == 'true')
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v4
with:
persist-credentials: false
- name: "Prepare breeze & CI image: ${{matrix.python-version}}:${{env.IMAGE_TAG}}"
uses: ./.github/actions/prepare_breeze_and_image
- name: >
Migration Tests:
${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
uses: ./.github/actions/migration_tests
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: MsSQL"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: MsSQL"
uses: ./.github/actions/post_tests_failure
if: failure()

tests-sqlite:
timeout-minutes: 130
name: >
@@ -1507,20 +1441,12 @@ jobs:
BACKEND: "mysql"
BACKEND_VERSION: ${{needs.build-info.outputs.default-mysql-version}}
MYSQL_VERSION: ${{needs.build-info.outputs.default-mysql-version}}
- name: >
Tests: mssql:${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze testing tests || true
env:
BACKEND: "mssql"
BACKEND_VERSION: ${{needs.build-info.outputs.default-mssql-version}}
MSSQL_VERSION: ${{needs.build-info.outputs.default-mssql-version}}
- name: >
Tests: sqlite:${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze testing tests || true
env:
BACKEND: "sqlite"
BACKEND_VERSION: ""
MSSQL_VERSION: ""
- name: >
Post Tests success: Quarantined"
uses: ./.github/actions/post_tests_success
5 changes: 1 addition & 4 deletions README.md
@@ -98,12 +98,9 @@ Apache Airflow is tested with:
| PostgreSQL | 12, 13, 14, 15, 16 | 12, 13, 14, 15, 16 |
| MySQL | 8.0, Innovation | 8.0, Innovation |
| SQLite | 3.15.0+ | 3.15.0+ |
| MSSQL | 2017(\*\*), 2019(\*\*) | 2017(\*\*), 2019(\*\*) |

\* Experimental

\*\* **Discontinued in 2.9.0**, not recommended for the new installation

**Note**: MySQL 5.x versions are unable to or have limitations with
running multiple schedulers -- please see the [Scheduler docs](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/scheduler.html).
MariaDB is not tested/recommended.
@@ -388,7 +385,7 @@ The important dependencies are:

* `SQLAlchemy`: upper-bound to specific MINOR version (SQLAlchemy is known to remove deprecations and
introduce breaking changes especially that support for different Databases varies and changes at
various speed (example: SQLAlchemy 1.4 broke MSSQL integration for Airflow)
various speed)
* `Alembic`: it is important to handle our migrations in predictable and performant way. It is developed
together with SQLAlchemy. Our experience with Alembic is that it very stable in MINOR version
* `Flask`: We are using Flask as the back-bone of our web UI and API. We know major version of Flask
7 changes: 0 additions & 7 deletions airflow/cli/commands/db_command.py
@@ -223,13 +223,6 @@ def shell(args):
env["PGPASSWORD"] = url.password or ""
env["PGDATABASE"] = url.database
execute_interactive(["psql"], env=env)
elif url.get_backend_name() == "mssql":
env = os.environ.copy()
env["MSSQL_CLI_SERVER"] = url.host
env["MSSQL_CLI_DATABASE"] = url.database
env["MSSQL_CLI_USER"] = url.username
env["MSSQL_CLI_PASSWORD"] = url.password
execute_interactive(["mssql-cli"], env=env)
else:
raise AirflowException(f"Unknown driver: {url.drivername}")

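For context on the surviving pattern: `shell` copies the process environment, injects the client's connection variables, and hands the terminal to an interactive client. A minimal sketch of that approach, approximating Airflow's `execute_interactive` with `subprocess.run` and assuming `psql` is on PATH (the DSN is a made-up example):

```python
import os
import subprocess

from sqlalchemy.engine import make_url

# Parse a SQLAlchemy-style connection URI (hypothetical example DSN).
url = make_url("postgresql://airflow:hunter2@localhost:5432/airflow")

# Same approach as db_command.shell(): pass credentials via environment
# variables so they never appear on the process command line.
env = os.environ.copy()
env["PGHOST"] = url.host or ""
env["PGPORT"] = str(url.port or "5432")
env["PGUSER"] = url.username or ""
env["PGPASSWORD"] = url.password or ""
env["PGDATABASE"] = url.database or ""

# Airflow's execute_interactive() keeps the TTY attached; subprocess.run()
# is the closest standard-library stand-in for this sketch.
subprocess.run(["psql"], env=env, check=False)
```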
46 changes: 19 additions & 27 deletions airflow/jobs/backfill_job_runner.py
@@ -266,33 +266,25 @@ def _manage_executor_state(
executor = self.job.executor
# list of tuples (dag_id, task_id, execution_date, map_index) of running tasks in executor
buffered_events = list(executor.get_event_buffer().items())
if session.get_bind().dialect.name == "mssql":
# SQL Server doesn't support multiple column subqueries
# TODO: Remove this once we drop support for SQL Server (#35868)
need_refresh = True
running_dict = {(ti.dag_id, ti.task_id, ti.run_id, ti.map_index): ti for ti in running.values()}
else:
running_tis_ids = [
(key.dag_id, key.task_id, key.run_id, key.map_index)
for key, _ in buffered_events
if key in running
]
# list of TaskInstance of running tasks in executor (refreshed from db in batch)
refreshed_running_tis = session.scalars(
select(TaskInstance).where(
tuple_(
TaskInstance.dag_id,
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.map_index,
).in_(running_tis_ids)
)
).all()
# dict of refreshed TaskInstance by key to easily find them
running_dict = {
(ti.dag_id, ti.task_id, ti.run_id, ti.map_index): ti for ti in refreshed_running_tis
}
need_refresh = False
running_tis_ids = [
(key.dag_id, key.task_id, key.run_id, key.map_index)
for key, _ in buffered_events
if key in running
]
# list of TaskInstance of running tasks in executor (refreshed from db in batch)
refreshed_running_tis = session.scalars(
select(TaskInstance).where(
tuple_(
TaskInstance.dag_id,
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.map_index,
).in_(running_tis_ids)
)
).all()
# dict of refreshed TaskInstance by key to easily find them
running_dict = {(ti.dag_id, ti.task_id, ti.run_id, ti.map_index): ti for ti in refreshed_running_tis}
need_refresh = False

for key, value in buffered_events:
state, info = value
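With the MSSQL branch gone, every backend now takes the batch-refresh path, which filters on a multi-column `tuple_(...).in_(...)`, the composite-key construct SQL Server could not execute. A self-contained sketch of the pattern against in-memory SQLite (the model is an illustrative stand-in, not Airflow's real schema):

```python
from sqlalchemy import Column, Integer, String, create_engine, select, tuple_
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class TI(Base):
    """Illustrative stand-in for TaskInstance's composite primary key."""
    __tablename__ = "ti_demo"
    dag_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    run_id = Column(String, primary_key=True)
    map_index = Column(Integer, primary_key=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        TI(dag_id="a", task_id="t1", run_id="r1", map_index=-1),
        TI(dag_id="b", task_id="t2", run_id="r1", map_index=-1),
    ])
    session.commit()

    keys = [("a", "t1", "r1", -1)]
    # One round trip refreshes every matching row; SQLite supports row
    # values since 3.15, which matches Airflow's minimum SQLite version.
    rows = session.scalars(
        select(TI).where(
            tuple_(TI.dag_id, TI.task_id, TI.run_id, TI.map_index).in_(keys)
        )
    ).all()
```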
4 changes: 1 addition & 3 deletions airflow/jobs/scheduler_job_runner.py
@@ -1833,9 +1833,7 @@ def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
TaskOutletDatasetReference,
isouter=True,
)
# MSSQL doesn't like it when we select a column that we haven't grouped by. All other DBs let us
# group by id and select all columns.
.group_by(DatasetModel if session.get_bind().dialect.name == "mssql" else DatasetModel.id)
.group_by(DatasetModel.id)
.having(
and_(
func.count(DagScheduleDatasetReference.dag_id) == 0,
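The simplified query relies on a behavior PostgreSQL, MySQL, and SQLite share: when you GROUP BY a table's primary key, the remaining columns are functionally dependent on it and may be selected directly, which SQL Server rejected. A hedged illustration with stand-in tables, not Airflow's real models:

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Dataset(Base):  # stand-in for Airflow's DatasetModel
    __tablename__ = "dataset_demo"
    id = Column(Integer, primary_key=True)
    uri = Column(String)

class Ref(Base):  # stand-in for a consuming-DAG reference table
    __tablename__ = "ref_demo"
    id = Column(Integer, primary_key=True)
    dataset_id = Column(Integer)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Dataset(id=1, uri="s3://bucket/key"))
    session.commit()
    # Group by the primary key only; the other Dataset columns are
    # functionally dependent on it, which PG/MySQL/SQLite accept.
    orphaned = session.execute(
        select(Dataset)
        .join(Ref, Ref.dataset_id == Dataset.id, isouter=True)
        .group_by(Dataset.id)
        .having(func.count(Ref.id) == 0)
    ).scalars().all()
```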
2 changes: 0 additions & 2 deletions airflow/models/dagrun.py
@@ -166,7 +166,6 @@ class DagRun(Base, LoggingMixin):
"state",
"dag_id",
postgresql_where=text("state='running'"),
mssql_where=text("state='running'"),
sqlite_where=text("state='running'"),
),
# since mysql lacks filtered/partial indices, this creates a
@@ -176,7 +175,6 @@
"state",
"dag_id",
postgresql_where=text("state='queued'"),
mssql_where=text("state='queued'"),
sqlite_where=text("state='queued'"),
),
)
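`postgresql_where`, `sqlite_where`, and the removed `mssql_where` are SQLAlchemy's per-dialect keyword arguments: each is consumed only when DDL is emitted for that dialect, and MySQL, which lacks partial indexes, simply gets the plain composite index the in-code comment describes. A small sketch with an illustrative table:

```python
from sqlalchemy import Column, Index, Integer, MetaData, String, Table, text

metadata = MetaData()

# Illustrative table mirroring the shape of DagRun's state/dag_id columns.
dag_run = Table(
    "dag_run_demo",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("dag_id", String(250)),
    Column("state", String(50)),
)

# Per-dialect partial index: each <dialect>_where keyword applies only when
# DDL is emitted for that dialect; other backends get the plain index.
Index(
    "idx_demo_running",
    dag_run.c.state,
    dag_run.c.dag_id,
    postgresql_where=text("state='running'"),
    sqlite_where=text("state='running'"),
)
```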
6 changes: 3 additions & 3 deletions airflow/models/dataset.py
@@ -123,7 +123,7 @@ class DagScheduleDatasetReference(Base):

__tablename__ = "dag_schedule_dataset_reference"
__table_args__ = (
PrimaryKeyConstraint(dataset_id, dag_id, name="dsdr_pkey", mssql_clustered=True),
PrimaryKeyConstraint(dataset_id, dag_id, name="dsdr_pkey"),
ForeignKeyConstraint(
(dataset_id,),
["dataset.id"],
@@ -173,7 +173,7 @@ class TaskOutletDatasetReference(Base):
name="todr_dataset_fkey",
ondelete="CASCADE",
),
PrimaryKeyConstraint(dataset_id, dag_id, task_id, name="todr_pkey", mssql_clustered=True),
PrimaryKeyConstraint(dataset_id, dag_id, task_id, name="todr_pkey"),
ForeignKeyConstraint(
columns=(dag_id,),
refcolumns=["dag.dag_id"],
@@ -211,7 +211,7 @@ class DatasetDagRunQueue(Base):

__tablename__ = "dataset_dag_run_queue"
__table_args__ = (
PrimaryKeyConstraint(dataset_id, target_dag_id, name="datasetdagrunqueue_pkey", mssql_clustered=True),
PrimaryKeyConstraint(dataset_id, target_dag_id, name="datasetdagrunqueue_pkey"),
ForeignKeyConstraint(
(dataset_id,),
["dataset.id"],
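`mssql_clustered=True` was likewise a dialect-only keyword: under SQL Server it made the primary key the clustered index, while every other backend ignored it, so dropping it leaves the DDL unchanged on the remaining databases. A small sketch on an illustrative table:

```python
from sqlalchemy import Column, Integer, MetaData, PrimaryKeyConstraint, Table

metadata = MetaData()

# After this PR: a plain composite primary key, identical DDL on all
# remaining backends (Postgres, MySQL, SQLite). Previously the constraint
# also carried mssql_clustered=True, consumed only by the MSSQL dialect.
Table(
    "queue_demo",
    metadata,
    Column("dataset_id", Integer, nullable=False),
    Column("target_dag_id", Integer, nullable=False),
    PrimaryKeyConstraint("dataset_id", "target_dag_id", name="demo_pkey"),
)
```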
1 change: 0 additions & 1 deletion airflow/models/renderedtifields.py
@@ -68,7 +68,6 @@ class RenderedTaskInstanceFields(Base):
"run_id",
"map_index",
name="rendered_task_instance_fields_pkey",
mssql_clustered=True,
),
ForeignKeyConstraint(
[dag_id, task_id, run_id, map_index],
3 changes: 0 additions & 3 deletions airflow/models/serialized_dag.py
@@ -403,9 +403,6 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
select(cls.dag_id, func.json_extract(cls._data, "$.dag.dag_dependencies"))
)
iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query)
elif session.bind.dialect.name == "mssql":
query = session.execute(select(cls.dag_id, func.json_query(cls._data, "$.dag.dag_dependencies")))
iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query)
else:
iterator = session.execute(
select(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies"))
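Each remaining branch spells JSON path extraction in its backend's dialect: SQLite and MySQL use `json_extract` with a `$.a.b` path, PostgreSQL uses `json_extract_path` with varargs, and the deleted branch was MSSQL's `JSON_QUERY` spelling. A runnable sketch of the SQLite/MySQL form, assuming a SQLite build with the JSON1 functions and using a stand-in table:

```python
import json

from sqlalchemy import Column, Integer, Text, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class SerializedDag(Base):  # minimal stand-in for Airflow's serialized_dag
    __tablename__ = "sdag_demo"
    id = Column(Integer, primary_key=True)
    data = Column(Text)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(SerializedDag(id=1, data=json.dumps(
        {"dag": {"dag_dependencies": [{"source": "other_dag"}]}}
    )))
    session.commit()
    # SQLite/MySQL spelling: json_extract with a '$.path' expression.
    # PostgreSQL instead uses func.json_extract_path(col, "dag", ...).
    row = session.execute(
        select(SerializedDag.id,
               func.json_extract(SerializedDag.data, "$.dag.dag_dependencies"))
    ).one()
    deps = json.loads(row[1]) if row[1] else []
```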
8 changes: 2 additions & 6 deletions airflow/models/taskinstance.py
@@ -1260,9 +1260,7 @@ class TaskInstance(Base, LoggingMixin):
Index("ti_pool", pool, state, priority_weight),
Index("ti_job_id", job_id),
Index("ti_trigger_id", trigger_id),
PrimaryKeyConstraint(
"dag_id", "task_id", "run_id", "map_index", name="task_instance_pkey", mssql_clustered=True
),
PrimaryKeyConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_pkey"),
ForeignKeyConstraint(
[trigger_id],
["trigger.id"],
@@ -3546,9 +3544,7 @@ class TaskInstanceNote(Base):
task_instance = relationship("TaskInstance", back_populates="task_instance_note")

__table_args__ = (
PrimaryKeyConstraint(
"task_id", "dag_id", "run_id", "map_index", name="task_instance_note_pkey", mssql_clustered=True
),
PrimaryKeyConstraint("task_id", "dag_id", "run_id", "map_index", name="task_instance_note_pkey"),
ForeignKeyConstraint(
(dag_id, task_id, run_id, map_index),
[
14 changes: 1 addition & 13 deletions airflow/models/taskreschedule.py
@@ -21,7 +21,7 @@
import warnings
from typing import TYPE_CHECKING

from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc, desc, event, select, text
from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc, desc, select, text
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship

@@ -194,15 +194,3 @@ def find_for_task_instance(
return session.scalars(
TaskReschedule.stmt_for_task_instance(ti=task_instance, try_number=try_number, descending=False)
).all()


@event.listens_for(TaskReschedule.__table__, "before_create")
def add_ondelete_for_mssql(table, conn, **kw):
if conn.dialect.name != "mssql":
return

for constraint in table.constraints:
if constraint.name != "task_reschedule_dr_fkey":
continue
constraint.ondelete = "NO ACTION"
return
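The deleted listener is an instance of a general SQLAlchemy technique: a `before_create` DDL event that rewrites a constraint just before emission, gated on the dialect. A hedged sketch of that pattern on illustrative tables, mirroring the removed hook's `constraint.ondelete` rewrite:

```python
from sqlalchemy import (Column, ForeignKeyConstraint, Integer, MetaData,
                        Table, event)

metadata = MetaData()

parent = Table("parent_demo", metadata, Column("id", Integer, primary_key=True))
child = Table(
    "child_demo",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("parent_id", Integer),
    ForeignKeyConstraint(["parent_id"], ["parent_demo.id"],
                         name="child_parent_fkey", ondelete="CASCADE"),
)

# The removed hook used this "before_create" event to rewrite a constraint
# just before DDL emission, and only for one dialect.
@event.listens_for(child, "before_create")
def tweak_fk_for_dialect(table, conn, **kw):
    if conn.dialect.name != "mssql":  # the removed code targeted MSSQL
        return
    for constraint in table.constraints:
        if constraint.name == "child_parent_fkey":
            constraint.ondelete = "NO ACTION"
```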
4 changes: 1 addition & 3 deletions airflow/models/xcom.py
@@ -97,9 +97,7 @@ class BaseXCom(Base, LoggingMixin):
# separately, and enforce uniqueness with DagRun.id instead.
Index("idx_xcom_key", key),
Index("idx_xcom_task_instance", dag_id, task_id, run_id, map_index),
PrimaryKeyConstraint(
"dag_run_id", "task_id", "map_index", "key", name="xcom_pkey", mssql_clustered=True
),
PrimaryKeyConstraint("dag_run_id", "task_id", "map_index", "key", name="xcom_pkey"),
ForeignKeyConstraint(
[dag_id, task_id, run_id, map_index],
[
@@ -1290,6 +1290,8 @@ def permission_exists_in_one_or_more_roles(
.exists()
)
# Special case for MSSQL/Oracle (works on PG and MySQL > 8)
# Note: We need to keep MSSQL compatibility as long as this provider package
# might still be updated by Airflow prior 2.9.0 users with MSSQL
if self.appbuilder.get_session.bind.dialect.name in ("mssql", "oracle"):
return self.appbuilder.get_session.query(literal(True)).filter(q).scalar()
return self.appbuilder.get_session.query(q).scalar()
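The kept workaround distinguishes two ways of evaluating an EXISTS: most backends can select it directly, while MSSQL and Oracle require it inside a WHERE clause. A self-contained sketch on SQLite showing both forms (table and data are illustrative):

```python
from sqlalchemy import Column, Integer, String, create_engine, literal, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Perm(Base):  # illustrative permission table
    __tablename__ = "perm_demo"
    id = Column(Integer, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Perm(id=1, name="can_read"))
    session.commit()
    q = select(Perm).where(Perm.name == "can_read").exists()

    # PG / MySQL 8+ / SQLite: the EXISTS can be selected directly...
    direct = session.query(q).scalar()

    # ...while MSSQL/Oracle need it moved into a WHERE clause:
    wrapped = session.query(literal(True)).filter(q).scalar()
```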
29 changes: 1 addition & 28 deletions airflow/settings.py
@@ -28,7 +28,6 @@

import pendulum
import pluggy
import sqlalchemy
from sqlalchemy import create_engine, exc, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool
@@ -253,27 +252,6 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
expire_on_commit=False,
)
)
if engine.dialect.name == "mssql":
session = Session()
try:
result = session.execute(
sqlalchemy.text(
"SELECT is_read_committed_snapshot_on FROM sys.databases WHERE name=:database_name"
),
params={"database_name": engine.url.database},
)
data = result.fetchone()[0]
if data != 1:
log.critical("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
log.critical("The database %s has it disabled.", engine.url.database)
log.critical("This will cause random deadlocks, Refusing to start.")
log.critical(
"See https://airflow.apache.org/docs/apache-airflow/stable/howto/"
"set-up-database.html#setting-up-a-mssql-database"
)
raise Exception("MSSQL database MUST have READ_COMMITTED_SNAPSHOT enabled.")
finally:
session.close()


DEFAULT_ENGINE_ARGS = {
@@ -351,12 +329,7 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):
# More information here:
# https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html"

# Similarly MSSQL default isolation level should be set to READ COMMITTED.
# We also make sure that READ_COMMITTED_SNAPSHOT option is on, in order to avoid deadlocks when
# Select queries are running. This is by default enforced during init/upgrade. More information:
# https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql

if SQL_ALCHEMY_CONN.startswith(("mysql", "mssql")):
if SQL_ALCHEMY_CONN.startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"

# Allow the user to specify an encoding for their DB otherwise default
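After this change only MySQL still gets an explicit isolation level, and the READ_COMMITTED_SNAPSHOT startup check disappears entirely; the isolation level is just an engine argument. A minimal sketch of the surviving logic (the DSN is hypothetical, and `create_engine` is left commented out since it needs a live server):

```python
from sqlalchemy import create_engine

SQL_ALCHEMY_CONN = "mysql+mysqldb://user:pass@localhost/airflow"  # hypothetical DSN

engine_args = {"pool_pre_ping": True}
# Mirrors prepare_engine_args(): only MySQL still needs READ COMMITTED
# forced now that the MSSQL branch is gone.
if SQL_ALCHEMY_CONN.startswith("mysql"):
    engine_args["isolation_level"] = "READ COMMITTED"

# engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)  # needs a live DB
```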
1 change: 0 additions & 1 deletion airflow/utils/cli_action_loggers.py
@@ -128,7 +128,6 @@ def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_
'"log" does not exist', # postgres
"no such table", # sqlite
"log' doesn't exist", # mysql
"Invalid object name 'log'", # mssql
]
error_is_ok = e.args and any(x in e.args[0] for x in expected)
if not error_is_ok:
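The shortened list still works the same way: the handler tolerates a missing `log` table by matching each backend's own wording of that error and re-raising anything else. A minimal standalone sketch of the pattern (the error text is simulated):

```python
# Each backend reports an absent table with different wording, so the
# handler matches known substrings and re-raises anything unexpected.
expected = [
    '"log" does not exist',   # postgres
    "no such table",          # sqlite
    "log' doesn't exist",     # mysql
]

def is_missing_table_error(exc: Exception) -> bool:
    return bool(exc.args) and any(x in str(exc.args[0]) for x in expected)

try:
    raise RuntimeError("no such table: log")  # stand-in for a real DB error
except RuntimeError as e:
    if not is_missing_table_error(e):
        raise
```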