Fix IntegrityError in DagFileProcessor.manage_slas #19553

Merged Nov 13, 2021 (2 commits)

ephraimbuddy
Contributor

DagFileProcessor.manage_slas does not check whether an SlaMiss already exists in
the DB before inserting SLA misses.

If an SLA for a task is missed and recorded, the same task comes up again on the
next SLA check when there has been no more recent run of the task. We then try to
insert the record into the SlaMiss table again, which results in an IntegrityError.

This PR fixes that by skipping the insert if the record already exists.
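
The idea can be sketched outside Airflow as follows. This is a minimal illustration using the standard-library sqlite3 module, not the code from this PR: the table only mirrors the three key columns named in the sla_miss_pkey constraint, and make_db and record_sla_misses are hypothetical helper names chosen for the example.

```python
import sqlite3


def make_db():
    """Create an in-memory table with the same composite key as sla_miss_pkey."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE sla_miss (
            task_id TEXT NOT NULL,
            dag_id TEXT NOT NULL,
            execution_date TEXT NOT NULL,
            PRIMARY KEY (task_id, dag_id, execution_date)
        )
        """
    )
    return conn


def record_sla_misses(conn, dag_id, candidates):
    """Insert only the (task_id, execution_date) pairs not yet recorded.

    Returns the number of rows actually inserted.
    """
    # Look up what is already stored for this DAG before inserting.
    existing = set(
        conn.execute(
            "SELECT task_id, execution_date FROM sla_miss WHERE dag_id = ?",
            (dag_id,),
        ).fetchall()
    )
    # Drop repeats within the batch itself, then drop rows already in the table.
    unique_candidates = list(dict.fromkeys(candidates))
    new_rows = [
        (task_id, dag_id, execution_date)
        for task_id, execution_date in unique_candidates
        if (task_id, execution_date) not in existing
    ]
    conn.executemany(
        "INSERT INTO sla_miss (task_id, dag_id, execution_date) VALUES (?, ?, ?)",
        new_rows,
    )
    conn.commit()
    return len(new_rows)


if __name__ == "__main__":
    conn = make_db()
    missed = [("sleep_20", "2021-11-12 11:56:00+00")]
    print(record_sla_misses(conn, "example_sla_dag", missed))  # first check inserts the row
    print(record_sla_misses(conn, "example_sla_dag", missed))  # second check inserts nothing
```

Without the lookup, the second call would attempt the same insert and the database would reject it with an integrity error, which is the failure shown in the traceback in this PR. A check-then-insert like this is still subject to races between concurrent writers; the sketch only covers the repeated-check case described here.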

[2021-11-12 11:56:11,159] {processor.py:567} ERROR - Error executing SlaCallbackRequest callback for file: /files/dags/example_sla_dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
    cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "sla_miss_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow/airflow/dag_processing/processor.py", line 560, in execute_callbacks
    self.manage_slas(dagbag.dags.get(request.dag_id))
  File "/opt/airflow/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/dag_processing/processor.py", line 434, in manage_slas
    session.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1046, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2540, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2682, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2642, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
    cursor.executemany(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "sla_miss_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.


@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Nov 12, 2021
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Nov 12, 2021
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

ephraimbuddy and others added 2 commits November 12, 2021 23:36
Co-Authored-By: Tzu-ping Chung <uranusjr@gmail.com>
@ephraimbuddy ephraimbuddy merged commit 9519bf6 into apache:main Nov 13, 2021
@ephraimbuddy ephraimbuddy deleted the fix-integrity-error-sla branch November 13, 2021 06:51
@jedcunningham jedcunningham added the type:bug-fix Changelog: Bug Fixes label Dec 6, 2021
jedcunningham pushed a commit that referenced this pull request Dec 7, 2021
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Co-authored-by: Kaxil Naik <kaxilnaik@apache.org>
(cherry picked from commit 9519bf6)