Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-2608] Implements/Standardize custom exceptions for experimental APIs #3496

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 3 additions & 9 deletions airflow/api/common/experimental/delete_dag.py
Expand Up @@ -17,17 +17,10 @@
# specific language governing permissions and limitations
# under the License.

from airflow import models, settings
from airflow.exceptions import AirflowException
from sqlalchemy import or_


class DagFileExists(AirflowException):
status = 400


class DagNotFound(AirflowException):
status = 404
from airflow import models, settings
from airflow.exceptions import DagNotFound, DagFileExists


def delete_dag(dag_id):
Expand All @@ -45,6 +38,7 @@ def delete_dag(dag_id):

count = 0

# noinspection PyUnresolvedReferences,PyProtectedMember
for m in models.Base._decl_class_registry.values():
if hasattr(m, "dag_id"):
cond = or_(m.dag_id == dag_id, m.dag_id.like(dag_id + ".%"))
Expand Down
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/get_dag_run_state.py
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import DagNotFound, DagRunNotFound
from airflow.models import DagBag


Expand All @@ -29,7 +29,7 @@ def get_dag_run_state(dag_id, execution_date):
# Check DAG exists.
if dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(dag_id)
raise AirflowException(error_message)
raise DagNotFound(error_message)

# Get DAG object and check Task Exists
dag = dagbag.get_dag(dag_id)
Expand All @@ -39,6 +39,6 @@ def get_dag_run_state(dag_id, execution_date):
if not dagrun:
error_message = ('Dag Run for date {} not found in dag {}'
.format(execution_date, dag_id))
raise AirflowException(error_message)
raise DagRunNotFound(error_message)

return {'state': dagrun.get_state()}
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/get_task.py
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import DagNotFound, TaskNotFound
from airflow.models import DagBag


Expand All @@ -28,13 +28,13 @@ def get_task(dag_id, task_id):
# Check DAG exists.
if dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(dag_id)
raise AirflowException(error_message)
raise DagNotFound(error_message)

# Get DAG object and check Task Exists
dag = dagbag.get_dag(dag_id)
if not dag.has_task(task_id):
error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
raise AirflowException(error_message)
raise TaskNotFound(error_message)

# Return the task.
return dag.get_task(task_id)
11 changes: 6 additions & 5 deletions airflow/api/common/experimental/get_task_instance.py
Expand Up @@ -17,7 +17,8 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import (DagNotFound, TaskNotFound,
DagRunNotFound, TaskInstanceNotFound)
from airflow.models import DagBag


Expand All @@ -29,26 +30,26 @@ def get_task_instance(dag_id, task_id, execution_date):
# Check DAG exists.
if dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(dag_id)
raise AirflowException(error_message)
raise DagNotFound(error_message)

# Get DAG object and check Task Exists
dag = dagbag.get_dag(dag_id)
if not dag.has_task(task_id):
error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
raise AirflowException(error_message)
raise TaskNotFound(error_message)

# Get DagRun object and check that it exists
dagrun = dag.get_dagrun(execution_date=execution_date)
if not dagrun:
error_message = ('Dag Run for date {} not found in dag {}'
.format(execution_date, dag_id))
raise AirflowException(error_message)
raise DagRunNotFound(error_message)

# Get task instance object and check that it exists
task_instance = dagrun.get_task_instance(task_id)
if not task_instance:
error_message = ('Task {} instance for date {} not found'
.format(task_id, execution_date))
raise AirflowException(error_message)
raise TaskInstanceNotFound(error_message)

return task_instance
18 changes: 5 additions & 13 deletions airflow/api/common/experimental/pool.py
Expand Up @@ -17,24 +17,16 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowBadRequest, PoolNotFound
from airflow.models import Pool
from airflow.utils.db import provide_session


class PoolBadRequest(AirflowException):
status = 400


class PoolNotFound(AirflowException):
status = 404


@provide_session
def get_pool(name, session=None):
"""Get pool by a given name."""
if not (name and name.strip()):
raise PoolBadRequest("Pool name shouldn't be empty")
raise AirflowBadRequest("Pool name shouldn't be empty")

pool = session.query(Pool).filter_by(pool=name).first()
if pool is None:
Expand All @@ -53,12 +45,12 @@ def get_pools(session=None):
def create_pool(name, slots, description, session=None):
"""Create a pool with a given parameters."""
if not (name and name.strip()):
raise PoolBadRequest("Pool name shouldn't be empty")
raise AirflowBadRequest("Pool name shouldn't be empty")

try:
slots = int(slots)
except ValueError:
raise PoolBadRequest("Bad value for `slots`: %s" % slots)
raise AirflowBadRequest("Bad value for `slots`: %s" % slots)

session.expire_on_commit = False
pool = session.query(Pool).filter_by(pool=name).first()
Expand All @@ -78,7 +70,7 @@ def create_pool(name, slots, description, session=None):
def delete_pool(name, session=None):
"""Delete pool by a given name."""
if not (name and name.strip()):
raise PoolBadRequest("Pool name shouldn't be empty")
raise AirflowBadRequest("Pool name shouldn't be empty")

pool = session.query(Pool).filter_by(pool=name).first()
if pool is None:
Expand Down
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/trigger_dag.py
Expand Up @@ -19,7 +19,7 @@

import json

from airflow.exceptions import AirflowException
from airflow.exceptions import DagRunAlreadyExists, DagNotFound
from airflow.models import DagRun, DagBag
from airflow.utils import timezone
from airflow.utils.state import State
Expand All @@ -35,7 +35,7 @@ def _trigger_dag(
replace_microseconds,
):
if dag_id not in dag_bag.dags:
raise AirflowException("Dag id {} not found".format(dag_id))
raise DagNotFound("Dag id {} not found".format(dag_id))

dag = dag_bag.get_dag(dag_id)

Expand All @@ -52,7 +52,7 @@ def _trigger_dag(

dr = dag_run.find(dag_id=dag_id, run_id=run_id)
if dr:
raise AirflowException("Run id {} already exists for dag id {}".format(
raise DagRunAlreadyExists("Run id {} already exists for dag id {}".format(
run_id,
dag_id
))
Expand Down
51 changes: 50 additions & 1 deletion airflow/exceptions.py
Expand Up @@ -22,7 +22,21 @@


class AirflowException(Exception):
pass
"""
Base class for all Airflow's errors.
Each custom exception should be derived from this class
"""
status_code = 500


class AirflowBadRequest(AirflowException):
"""Raise when the application or server cannot handle the request"""
status_code = 400


class AirflowNotFoundException(AirflowException):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a docstring here

"""Raise when the requested object/resource is not available in the system"""
status_code = 404


class AirflowConfigException(AirflowException):
Expand All @@ -47,3 +61,38 @@ class AirflowSkipException(AirflowException):

class AirflowDagCycleException(AirflowException):
pass


class DagNotFound(AirflowNotFoundException):
"""Raise when a DAG is not available in the system"""
pass


class DagRunNotFound(AirflowNotFoundException):
"""Raise when a DAG Run is not available in the system"""
pass


class DagRunAlreadyExists(AirflowBadRequest):
"""Raise when creating a DAG run for DAG which already has DAG run entry"""
pass


class DagFileExists(AirflowBadRequest):
"""Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder"""
pass


class TaskNotFound(AirflowNotFoundException):
"""Raise when a Task is not available in the system"""
pass


class TaskInstanceNotFound(AirflowNotFoundException):
"""Raise when a Task Instance is not available in the system"""
pass


class PoolNotFound(AirflowNotFoundException):
"""Raise when a Pool is not available in the system"""
pass
46 changes: 23 additions & 23 deletions airflow/www/api/experimental/endpoints.py
Expand Up @@ -79,7 +79,7 @@ def trigger_dag(dag_id):
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = 404
response.status_code = err.status_code
return response

if getattr(g, 'user', None):
Expand All @@ -98,10 +98,10 @@ def delete_dag(dag_id):
"""
try:
count = delete.delete_dag(dag_id)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
return jsonify(message="Removed {} record(s)".format(count), count=count)

Expand All @@ -121,7 +121,7 @@ def task_info(dag_id, task_id):
except AirflowException as err:
_log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = 404
response.status_code = err.status_code
return response

# JSONify and return.
Expand Down Expand Up @@ -162,7 +162,7 @@ def task_instance_info(dag_id, execution_date, task_id):
except AirflowException as err:
_log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = 404
response.status_code = err.status_code
return response

# JSONify and return.
Expand Down Expand Up @@ -198,10 +198,10 @@ def get_pool(name):
"""Get pool by a given name."""
try:
pool = pool_api.get_pool(name=name)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify(pool.to_json())
Expand All @@ -213,10 +213,10 @@ def get_pools():
"""Get all pools."""
try:
pools = pool_api.get_pools()
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify([p.to_json() for p in pools])
Expand All @@ -230,10 +230,10 @@ def create_pool():
params = request.get_json(force=True)
try:
pool = pool_api.create_pool(**params)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify(pool.to_json())
Expand All @@ -246,10 +246,10 @@ def delete_pool(name):
"""Delete pool."""
try:
pool = pool_api.delete_pool(name=name)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify(pool.to_json())