Implements/Standardize custom exceptions for experimental APIs
verdan committed Jun 18, 2018
1 parent 0f4d681 commit c467b07
Showing 86 changed files with 2,193 additions and 323 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -22,6 +22,7 @@ language: python
 jdk:
   - oraclejdk8
 services:
+  - cassandra
   - mysql
   - postgresql
   - rabbitmq
13 changes: 13 additions & 0 deletions UPDATING.md
@@ -77,6 +77,19 @@ Header row will be added only if this parameter is set True and also in that cas

With Airflow 1.9 or lower, there were two connection strings for the Google Cloud operators, both `google_cloud_storage_default` and `google_cloud_default`. This can be confusing and therefore the `google_cloud_storage_default` connection id has been replaced with `google_cloud_default` to make the connection id consistent across Airflow.

### Logging Configuration
With Airflow 1.9 or lower, `FILENAME_TEMPLATE`, `PROCESSOR_FILENAME_TEMPLATE`, `LOG_ID_TEMPLATE`, and `END_OF_LOG_MARK` were configured in `airflow_local_settings.py`. They have been moved into the configuration file, so if you use a custom configuration file you need to add the following defaults:
```
[core]
fab_logging_level = WARN
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
[elasticsearch]
elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
elasticsearch_end_of_log_mark = end_of_log
```
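
To confirm that a custom configuration file picks these defaults up, you can read the values back from a Python shell. A minimal sketch, assuming a standard Airflow install and that your `airflow.cfg` contains the block above:

```python
from airflow import configuration as conf

# Both lookups should return the defaults added above.
print(conf.get('core', 'log_filename_template'))
print(conf.get('elasticsearch', 'elasticsearch_end_of_log_mark'))
```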

## Airflow 1.9

### SSH Hook updates, along with new SSH Operator & SFTP Operator
12 changes: 3 additions & 9 deletions airflow/api/common/experimental/delete_dag.py
@@ -17,17 +17,10 @@
# specific language governing permissions and limitations
# under the License.

-from airflow import models, settings
-from airflow.exceptions import AirflowException
 from sqlalchemy import or_
 
-
-class DagFileExists(AirflowException):
-    status = 400
-
-
-class DagNotFound(AirflowException):
-    status = 404
+from airflow import models, settings
+from airflow.exceptions import DagNotFound, DagFileExists
 
 
 def delete_dag(dag_id):
@@ -45,6 +38,7 @@ def delete_dag(dag_id):

     count = 0
 
+    # noinspection PyUnresolvedReferences,PyProtectedMember
     for m in models.Base._decl_class_registry.values():
         if hasattr(m, "dag_id"):
             cond = or_(m.dag_id == dag_id, m.dag_id.like(dag_id + ".%"))
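
The commit's point is that these classes now come from a single module, `airflow/exceptions.py`, whose diff is not shown in this excerpt. A plausible sketch of the centralized definitions, assuming they keep the HTTP `status` attribute pattern of the inline classes removed above (only the class names are confirmed by the imports in these diffs; the bodies are assumptions):

```python
class AirflowException(Exception):
    # Assumed default; API handlers can fall back to it.
    status = 500


class AirflowBadRequest(AirflowException):
    status = 400


class DagNotFound(AirflowException):
    status = 404


class DagRunNotFound(AirflowException):
    status = 404


class DagFileExists(AirflowException):
    status = 400
```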
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/get_dag_run_state.py
@@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

-from airflow.exceptions import AirflowException
+from airflow.exceptions import DagNotFound, DagRunNotFound
from airflow.models import DagBag


@@ -29,7 +29,7 @@ def get_dag_run_state(dag_id, execution_date):
     # Check DAG exists.
     if dag_id not in dagbag.dags:
         error_message = "Dag id {} not found".format(dag_id)
-        raise AirflowException(error_message)
+        raise DagNotFound(error_message)
 
     # Get DAG object and check Task Exists
     dag = dagbag.get_dag(dag_id)
@@ -39,6 +39,6 @@ def get_dag_run_state(dag_id, execution_date):
     if not dagrun:
         error_message = ('Dag Run for date {} not found in dag {}'
                          .format(execution_date, dag_id))
-        raise AirflowException(error_message)
+        raise DagRunNotFound(error_message)
 
     return {'state': dagrun.get_state()}
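
A short usage sketch (the DAG id and date are hypothetical placeholders; assumes a run exists for that execution date):

```python
from datetime import datetime

from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
from airflow.exceptions import DagNotFound, DagRunNotFound

try:
    # 'example_dag' and the date are placeholder values.
    print(get_dag_run_state('example_dag', datetime(2018, 6, 18)))
except (DagNotFound, DagRunNotFound) as err:
    print(err)
```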
55 changes: 55 additions & 0 deletions airflow/api/common/experimental/get_dag_runs.py
@@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from flask import url_for

from airflow.exceptions import AirflowException
from airflow.models import DagBag, DagRun


def get_dag_runs(dag_id, state=None):
    """
    Returns a list of Dag Runs for a specific DAG ID.
    :param dag_id: String identifier of a DAG
    :param state: queued|running|success...
    :return: List of DAG runs of a DAG with requested state,
        or all runs if the state is not specified
    """
    dagbag = DagBag()

    # Check DAG exists.
    if dag_id not in dagbag.dags:
        error_message = "Dag id {} not found".format(dag_id)
        raise AirflowException(error_message)

    dag_runs = list()
    state = state.lower() if state else None
    for run in DagRun.find(dag_id=dag_id, state=state):
        dag_runs.append({
            'id': run.id,
            'run_id': run.run_id,
            'state': run.state,
            'dag_id': run.dag_id,
            'execution_date': run.execution_date.isoformat(),
            'start_date': ((run.start_date or '') and
                           run.start_date.isoformat()),
            'dag_run_url': url_for('Airflow.graph', dag_id=run.dag_id,
                                   execution_date=run.execution_date)
        })

    return dag_runs
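
Because each entry builds `dag_run_url` with Flask's `url_for`, this helper has to run inside the webserver's application context. A hedged usage sketch with placeholder names:

```python
from airflow.api.common.experimental.get_dag_runs import get_dag_runs

# 'example_dag' is a placeholder id; the state filter is optional.
for run in get_dag_runs('example_dag', state='success'):
    print(run['run_id'], run['execution_date'])
```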
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/get_task.py
@@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

-from airflow.exceptions import AirflowException
+from airflow.exceptions import DagNotFound, TaskNotFound
from airflow.models import DagBag


@@ -28,13 +28,13 @@ def get_task(dag_id, task_id):
     # Check DAG exists.
     if dag_id not in dagbag.dags:
         error_message = "Dag id {} not found".format(dag_id)
-        raise AirflowException(error_message)
+        raise DagNotFound(error_message)
 
     # Get DAG object and check Task Exists
     dag = dagbag.get_dag(dag_id)
     if not dag.has_task(task_id):
         error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
-        raise AirflowException(error_message)
+        raise TaskNotFound(error_message)
 
     # Return the task.
     return dag.get_task(task_id)
11 changes: 6 additions & 5 deletions airflow/api/common/experimental/get_task_instance.py
@@ -17,7 +17,8 @@
# specific language governing permissions and limitations
# under the License.

-from airflow.exceptions import AirflowException
+from airflow.exceptions import (DagNotFound, TaskNotFound,
+                                DagRunNotFound, TaskInstanceNotFound)
from airflow.models import DagBag


@@ -29,26 +30,26 @@ def get_task_instance(dag_id, task_id, execution_date):
     # Check DAG exists.
     if dag_id not in dagbag.dags:
         error_message = "Dag id {} not found".format(dag_id)
-        raise AirflowException(error_message)
+        raise DagNotFound(error_message)
 
     # Get DAG object and check Task Exists
     dag = dagbag.get_dag(dag_id)
     if not dag.has_task(task_id):
         error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
-        raise AirflowException(error_message)
+        raise TaskNotFound(error_message)
 
     # Get DagRun object and check that it exists
     dagrun = dag.get_dagrun(execution_date=execution_date)
     if not dagrun:
         error_message = ('Dag Run for date {} not found in dag {}'
                          .format(execution_date, dag_id))
-        raise AirflowException(error_message)
+        raise DagRunNotFound(error_message)
 
     # Get task instance object and check that it exists
     task_instance = dagrun.get_task_instance(task_id)
     if not task_instance:
         error_message = ('Task {} instance for date {} not found'
                          .format(task_id, execution_date))
-        raise AirflowException(error_message)
+        raise TaskInstanceNotFound(error_message)
 
     return task_instance
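
The payoff of the standardized classes is that an API layer can translate any of them into an HTTP response generically instead of special-casing each helper. A sketch of such a handler (not part of the diffs shown here; the 500 fallback is an assumption):

```python
from flask import jsonify

from airflow.exceptions import AirflowException


def handle_api_error(err):
    # Each standardized exception carries a `status` attribute;
    # anything without one maps to a 500.
    response = jsonify(error='{}'.format(err))
    response.status_code = getattr(err, 'status', 500)
    return response
```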
18 changes: 5 additions & 13 deletions airflow/api/common/experimental/pool.py
@@ -17,24 +17,16 @@
# specific language governing permissions and limitations
# under the License.

-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowBadRequest, PoolNotFound
 from airflow.models import Pool
 from airflow.utils.db import provide_session
 
 
-class PoolBadRequest(AirflowException):
-    status = 400
-
-
-class PoolNotFound(AirflowException):
-    status = 404
-
-
 @provide_session
 def get_pool(name, session=None):
     """Get pool by a given name."""
     if not (name and name.strip()):
-        raise PoolBadRequest("Pool name shouldn't be empty")
+        raise AirflowBadRequest("Pool name shouldn't be empty")
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
@@ -53,12 +45,12 @@ def get_pools(session=None):
 def create_pool(name, slots, description, session=None):
     """Create a pool with a given parameters."""
     if not (name and name.strip()):
-        raise PoolBadRequest("Pool name shouldn't be empty")
+        raise AirflowBadRequest("Pool name shouldn't be empty")
 
     try:
         slots = int(slots)
     except ValueError:
-        raise PoolBadRequest("Bad value for `slots`: %s" % slots)
+        raise AirflowBadRequest("Bad value for `slots`: %s" % slots)
 
     session.expire_on_commit = False
     pool = session.query(Pool).filter_by(pool=name).first()
@@ -78,7 +70,7 @@ def create_pool(name, slots, description, session=None):
 def delete_pool(name, session=None):
     """Delete pool by a given name."""
     if not (name and name.strip()):
-        raise PoolBadRequest("Pool name shouldn't be empty")
+        raise AirflowBadRequest("Pool name shouldn't be empty")
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
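
With the pool-specific classes gone, callers distinguish bad input from missing pools via the shared exceptions. A usage sketch with example values:

```python
from airflow.api.common.experimental.pool import create_pool, get_pool
from airflow.exceptions import AirflowBadRequest, PoolNotFound

try:
    # 'backfill_pool' and the slot count are example values;
    # @provide_session supplies the database session.
    create_pool(name='backfill_pool', slots='32', description='demo')
    print(get_pool('backfill_pool'))
except AirflowBadRequest as err:  # empty name or non-integer slots
    print(err)
except PoolNotFound as err:  # pool missing from the database
    print(err)
```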
64 changes: 49 additions & 15 deletions airflow/api/common/experimental/trigger_dag.py
@@ -19,20 +19,25 @@

import json

-from airflow.exceptions import AirflowException
+from airflow.exceptions import DagRunAlreadyExists, DagNotFound
from airflow.models import DagRun, DagBag
from airflow.utils import timezone
from airflow.utils.state import State


-def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None,
-                replace_microseconds=True):
-    dagbag = DagBag()
-
-    if dag_id not in dagbag.dags:
-        raise AirflowException("Dag id {} not found".format(dag_id))
+def _trigger_dag(
+        dag_id,
+        dag_bag,
+        dag_run,
+        run_id,
+        conf,
+        execution_date,
+        replace_microseconds,
+):
+    if dag_id not in dag_bag.dags:
+        raise DagNotFound("Dag id {} not found".format(dag_id))
 
-    dag = dagbag.get_dag(dag_id)
+    dag = dag_bag.get_dag(dag_id)
 
     if not execution_date:
         execution_date = timezone.utcnow()
@@ -45,9 +50,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None,
     if not run_id:
         run_id = "manual__{0}".format(execution_date.isoformat())
 
-    dr = DagRun.find(dag_id=dag_id, run_id=run_id)
+    dr = dag_run.find(dag_id=dag_id, run_id=run_id)
     if dr:
-        raise AirflowException("Run id {} already exists for dag id {}".format(
+        raise DagRunAlreadyExists("Run id {} already exists for dag id {}".format(
             run_id,
             dag_id
         ))
@@ -56,12 +61,41 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None,
     if conf:
         run_conf = json.loads(conf)
 
-    trigger = dag.create_dagrun(
-        run_id=run_id,
-        execution_date=execution_date,
-        state=State.RUNNING,
-        conf=run_conf,
-        external_trigger=True
-    )
+    triggers = list()
+    dags_to_trigger = list()
+    dags_to_trigger.append(dag)
+    while dags_to_trigger:
+        dag = dags_to_trigger.pop()
+        trigger = dag.create_dagrun(
+            run_id=run_id,
+            execution_date=execution_date,
+            state=State.RUNNING,
+            conf=run_conf,
+            external_trigger=True,
+        )
+        triggers.append(trigger)
+        if dag.subdags:
+            dags_to_trigger.extend(dag.subdags)
+    return triggers
+
+
+def trigger_dag(
+        dag_id,
+        run_id=None,
+        conf=None,
+        execution_date=None,
+        replace_microseconds=True,
+):
+    dagbag = DagBag()
+    dag_run = DagRun()
+    triggers = _trigger_dag(
+        dag_id=dag_id,
+        dag_run=dag_run,
+        dag_bag=dagbag,
+        run_id=run_id,
+        conf=conf,
+        execution_date=execution_date,
+        replace_microseconds=replace_microseconds,
+    )
 
-    return trigger
+    return triggers[0] if triggers else None
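
Threading `dag_bag` and `dag_run` through `_trigger_dag` as parameters makes the new subdag-triggering loop testable without a real metadata database. A mock-based sketch under that assumption (not from this commit):

```python
from unittest import mock

from airflow.api.common.experimental.trigger_dag import _trigger_dag
from airflow.exceptions import DagNotFound

# An empty DagBag stand-in: any id should raise DagNotFound.
dag_bag = mock.MagicMock()
dag_bag.dags = {}

try:
    _trigger_dag(
        dag_id='missing_dag',  # hypothetical id
        dag_bag=dag_bag,
        dag_run=mock.MagicMock(),
        run_id=None,
        conf=None,
        execution_date=None,
        replace_microseconds=True,
    )
except DagNotFound as err:
    print(err)
```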
13 changes: 6 additions & 7 deletions airflow/config_templates/airflow_local_settings.py
@@ -30,19 +30,16 @@

 # Flask appbuilder's info level log is very verbose,
 # so it's set to 'WARN' by default.
-FAB_LOG_LEVEL = 'WARN'
+FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()
 
 LOG_FORMAT = conf.get('core', 'LOG_FORMAT')
 
 BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
 
 PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
 
-FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
-
-PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
-
-LOG_ID_TEMPLATE = '{dag_id}-{task_id}-{execution_date}-{try_number}'
+FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
@@ -53,7 +50,9 @@

 ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')
 
-END_OF_LOG_MARK = 'end_of_log'
+LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')
+
+END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')

DEFAULT_LOGGING_CONFIG = {
'version': 1,
