Skip to content

Commit

Permalink
[AIRFLOW-2608] Implements/Standardize custom exceptions for experimen…
Browse files Browse the repository at this point in the history
…tal APIs

Implements/Standardize custom exceptions for
experimental APIs

Implements/Standardize custom exceptions for
experimental APIs

Closes #3496 from verdan/AIRFLOW-2608-api-
exceptions-handling
  • Loading branch information
verdan authored and Fokko Driesprong committed Jun 19, 2018
1 parent 8c4131b commit 5676ec7
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 88 deletions.
12 changes: 3 additions & 9 deletions airflow/api/common/experimental/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,10 @@
# specific language governing permissions and limitations
# under the License.

from airflow import models, settings
from airflow.exceptions import AirflowException
from sqlalchemy import or_


class DagFileExists(AirflowException):
status = 400


class DagNotFound(AirflowException):
status = 404
from airflow import models, settings
from airflow.exceptions import DagNotFound, DagFileExists


def delete_dag(dag_id):
Expand All @@ -45,6 +38,7 @@ def delete_dag(dag_id):

count = 0

# noinspection PyUnresolvedReferences,PyProtectedMember
for m in models.Base._decl_class_registry.values():
if hasattr(m, "dag_id"):
cond = or_(m.dag_id == dag_id, m.dag_id.like(dag_id + ".%"))
Expand Down
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/get_dag_run_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import DagNotFound, DagRunNotFound
from airflow.models import DagBag


Expand All @@ -29,7 +29,7 @@ def get_dag_run_state(dag_id, execution_date):
# Check DAG exists.
if dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(dag_id)
raise AirflowException(error_message)
raise DagNotFound(error_message)

# Get DAG object and check Task Exists
dag = dagbag.get_dag(dag_id)
Expand All @@ -39,6 +39,6 @@ def get_dag_run_state(dag_id, execution_date):
if not dagrun:
error_message = ('Dag Run for date {} not found in dag {}'
.format(execution_date, dag_id))
raise AirflowException(error_message)
raise DagRunNotFound(error_message)

return {'state': dagrun.get_state()}
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/get_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import DagNotFound, TaskNotFound
from airflow.models import DagBag


Expand All @@ -28,13 +28,13 @@ def get_task(dag_id, task_id):
# Check DAG exists.
if dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(dag_id)
raise AirflowException(error_message)
raise DagNotFound(error_message)

# Get DAG object and check Task Exists
dag = dagbag.get_dag(dag_id)
if not dag.has_task(task_id):
error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
raise AirflowException(error_message)
raise TaskNotFound(error_message)

# Return the task.
return dag.get_task(task_id)
11 changes: 6 additions & 5 deletions airflow/api/common/experimental/get_task_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import (DagNotFound, TaskNotFound,
DagRunNotFound, TaskInstanceNotFound)
from airflow.models import DagBag


Expand All @@ -29,26 +30,26 @@ def get_task_instance(dag_id, task_id, execution_date):
# Check DAG exists.
if dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(dag_id)
raise AirflowException(error_message)
raise DagNotFound(error_message)

# Get DAG object and check Task Exists
dag = dagbag.get_dag(dag_id)
if not dag.has_task(task_id):
error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
raise AirflowException(error_message)
raise TaskNotFound(error_message)

# Get DagRun object and check that it exists
dagrun = dag.get_dagrun(execution_date=execution_date)
if not dagrun:
error_message = ('Dag Run for date {} not found in dag {}'
.format(execution_date, dag_id))
raise AirflowException(error_message)
raise DagRunNotFound(error_message)

# Get task instance object and check that it exists
task_instance = dagrun.get_task_instance(task_id)
if not task_instance:
error_message = ('Task {} instance for date {} not found'
.format(task_id, execution_date))
raise AirflowException(error_message)
raise TaskInstanceNotFound(error_message)

return task_instance
18 changes: 5 additions & 13 deletions airflow/api/common/experimental/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,16 @@
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowBadRequest, PoolNotFound
from airflow.models import Pool
from airflow.utils.db import provide_session


class PoolBadRequest(AirflowException):
status = 400


class PoolNotFound(AirflowException):
status = 404


@provide_session
def get_pool(name, session=None):
"""Get pool by a given name."""
if not (name and name.strip()):
raise PoolBadRequest("Pool name shouldn't be empty")
raise AirflowBadRequest("Pool name shouldn't be empty")

pool = session.query(Pool).filter_by(pool=name).first()
if pool is None:
Expand All @@ -53,12 +45,12 @@ def get_pools(session=None):
def create_pool(name, slots, description, session=None):
"""Create a pool with a given parameters."""
if not (name and name.strip()):
raise PoolBadRequest("Pool name shouldn't be empty")
raise AirflowBadRequest("Pool name shouldn't be empty")

try:
slots = int(slots)
except ValueError:
raise PoolBadRequest("Bad value for `slots`: %s" % slots)
raise AirflowBadRequest("Bad value for `slots`: %s" % slots)

session.expire_on_commit = False
pool = session.query(Pool).filter_by(pool=name).first()
Expand All @@ -78,7 +70,7 @@ def create_pool(name, slots, description, session=None):
def delete_pool(name, session=None):
"""Delete pool by a given name."""
if not (name and name.strip()):
raise PoolBadRequest("Pool name shouldn't be empty")
raise AirflowBadRequest("Pool name shouldn't be empty")

pool = session.query(Pool).filter_by(pool=name).first()
if pool is None:
Expand Down
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import json

from airflow.exceptions import AirflowException
from airflow.exceptions import DagRunAlreadyExists, DagNotFound
from airflow.models import DagRun, DagBag
from airflow.utils import timezone
from airflow.utils.state import State
Expand All @@ -35,7 +35,7 @@ def _trigger_dag(
replace_microseconds,
):
if dag_id not in dag_bag.dags:
raise AirflowException("Dag id {} not found".format(dag_id))
raise DagNotFound("Dag id {} not found".format(dag_id))

dag = dag_bag.get_dag(dag_id)

Expand All @@ -52,7 +52,7 @@ def _trigger_dag(

dr = dag_run.find(dag_id=dag_id, run_id=run_id)
if dr:
raise AirflowException("Run id {} already exists for dag id {}".format(
raise DagRunAlreadyExists("Run id {} already exists for dag id {}".format(
run_id,
dag_id
))
Expand Down
51 changes: 50 additions & 1 deletion airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@


class AirflowException(Exception):
pass
"""
Base class for all Airflow's errors.
Each custom exception should be derived from this class
"""
status_code = 500


class AirflowBadRequest(AirflowException):
"""Raise when the application or server cannot handle the request"""
status_code = 400


class AirflowNotFoundException(AirflowException):
"""Raise when the requested object/resource is not available in the system"""
status_code = 404


class AirflowConfigException(AirflowException):
Expand All @@ -47,3 +61,38 @@ class AirflowSkipException(AirflowException):

class AirflowDagCycleException(AirflowException):
pass


class DagNotFound(AirflowNotFoundException):
"""Raise when a DAG is not available in the system"""
pass


class DagRunNotFound(AirflowNotFoundException):
"""Raise when a DAG Run is not available in the system"""
pass


class DagRunAlreadyExists(AirflowBadRequest):
"""Raise when creating a DAG run for DAG which already has DAG run entry"""
pass


class DagFileExists(AirflowBadRequest):
"""Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder"""
pass


class TaskNotFound(AirflowNotFoundException):
"""Raise when a Task is not available in the system"""
pass


class TaskInstanceNotFound(AirflowNotFoundException):
"""Raise when a Task Instance is not available in the system"""
pass


class PoolNotFound(AirflowNotFoundException):
"""Raise when a Pool is not available in the system"""
pass
46 changes: 23 additions & 23 deletions airflow/www/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def trigger_dag(dag_id):
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = 404
response.status_code = err.status_code
return response

if getattr(g, 'user', None):
Expand All @@ -98,10 +98,10 @@ def delete_dag(dag_id):
"""
try:
count = delete.delete_dag(dag_id)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
return jsonify(message="Removed {} record(s)".format(count), count=count)

Expand All @@ -121,7 +121,7 @@ def task_info(dag_id, task_id):
except AirflowException as err:
_log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = 404
response.status_code = err.status_code
return response

# JSONify and return.
Expand Down Expand Up @@ -162,7 +162,7 @@ def task_instance_info(dag_id, execution_date, task_id):
except AirflowException as err:
_log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = 404
response.status_code = err.status_code
return response

# JSONify and return.
Expand Down Expand Up @@ -198,10 +198,10 @@ def get_pool(name):
"""Get pool by a given name."""
try:
pool = pool_api.get_pool(name=name)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify(pool.to_json())
Expand All @@ -213,10 +213,10 @@ def get_pools():
"""Get all pools."""
try:
pools = pool_api.get_pools()
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify([p.to_json() for p in pools])
Expand All @@ -230,10 +230,10 @@ def create_pool():
params = request.get_json(force=True)
try:
pool = pool_api.create_pool(**params)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify(pool.to_json())
Expand All @@ -246,10 +246,10 @@ def delete_pool(name):
"""Delete pool."""
try:
pool = pool_api.delete_pool(name=name)
except AirflowException as e:
_log.error(e)
response = jsonify(error="{}".format(e))
response.status_code = getattr(e, 'status', 500)
except AirflowException as err:
_log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
else:
return jsonify(pool.to_json())
Loading

0 comments on commit 5676ec7

Please sign in to comment.