[AIRFLOW-1611] Customize logging
Change the logging configuration to use Python's built-in logging module and
make it easily configurable. Some settings are no longer needed since they can
easily be implemented in the config file.

Closes #2631 from Fokko/AIRFLOW-1611-customize-logging-in-airflow

(cherry picked from commit 3c3a65a)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>
Fokko Driesprong authored and bolkedebruin committed Oct 2, 2017
1 parent f2bb77d commit 8b4a503
Showing 11 changed files with 471 additions and 154 deletions.
126 changes: 123 additions & 3 deletions UPDATING.md
@@ -3,7 +3,7 @@
This file documents any backwards-incompatible changes in Airflow and
assists people when migrating to a new version.

## Master
## Airflow 1.9

### SSH Hook updates, along with new SSH Operator & SFTP Operator
SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution used previously (<1.9.0), so this is backward incompatible.
@@ -13,9 +13,129 @@ assists people when migrating to a new version.
- No updates are required if you are using ftpHook; it will continue to work as is.

### Logging update
Airflow's logging has been rewritten to use Python's builtin `logging` module to perform system logging. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The main benefit this brings is easy configuration of the logging through `default_airflow_logging.py` and the ability to use different handlers for logging.

Logs are now stored in the log folder as `{dag_id}/{task_id}/{execution_date}/{try_number}.log`.
The logging structure of Airflow has been rewritten to make configuration easier and the logging system more transparent.

#### A quick recap about logging

A logger is the entry point into the logging system. Each logger is a named bucket to which messages can be written for processing. A logger is configured to have a log level. This log level describes the severity of the messages that the logger will handle. Python defines the following log levels: DEBUG, INFO, WARNING, ERROR or CRITICAL.

Each message that is written to the logger is a Log Record. Each log record also has a log level indicating the severity of that specific message. A log record can also contain useful metadata that describes the event that is being logged. This can include details such as a stack trace or an error code.

When a message is given to the logger, the log level of the message is compared to the log level of the logger. If the log level of the message meets or exceeds the log level of the logger itself, the message will undergo further processing. If it doesn’t, the message will be ignored.

Once a logger has determined that a message needs to be processed, it is passed to a Handler. This configuration is now more flexible and can easily be maintained in a single file.
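
To make the recap concrete, here is a minimal, self-contained sketch using only the standard library (the logger name `example` is arbitrary and not part of Airflow):

```
import logging
import sys

# A named logger: the entry point to which messages are written.
logger = logging.getLogger('example')
logger.setLevel(logging.INFO)

# A handler decides where accepted records go; here, stdout.
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter('[%(asctime)s] %(levelname)s - %(message)s'))
logger.addHandler(handler)

logger.debug('Ignored: DEBUG is below the logger level (INFO)')
logger.error('Processed: ERROR meets or exceeds the logger level')
```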

#### Changes in Airflow Logging

Airflow's logging mechanism has been refactored to use Python's builtin `logging` module to perform logging of the application. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The `BaseHook` and `BaseOperator` already extend this class, so logging is readily available in hooks and operators.
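
As a small sketch of what this means in practice (the `GreetOperator` below is hypothetical, not part of Airflow), any operator can log through `self.log` because `BaseOperator` extends `LoggingMixin`:

```
from airflow.models import BaseOperator


class GreetOperator(BaseOperator):
    """Hypothetical operator used only to illustrate self.log."""

    def execute(self, context):
        # self.log is a standard logging.Logger provided via LoggingMixin,
        # so these records flow through the central logging configuration.
        self.log.info('Hello from task %s', self.task_id)
```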

The main benefit is easier configuration of the logging through a single centralized Python file. Disclaimer: there is still some inline configuration, but this will be removed eventually. The new logging class is defined by setting the dotted classpath in your `~/airflow/airflow.cfg` file:

```
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
```

The file that contains the logging configuration needs to be on the `PYTHONPATH`, for example in `~/airflow/dags` or `~/airflow/plugins`. These directories are loaded by default; of course, you are free to add a directory to the `PYTHONPATH`, which can be handy when you keep the config in another directory or mount a volume in the case of Docker. As an example, you can start from `airflow.config_templates.airflow_local_settings.LOGGING_CONFIG`:

```
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
    }
}
```

If you want to customize the logging (for example, to use log rotation), you can do this by defining one or more of the logging handlers that [Python has to offer](https://docs.python.org/3/library/logging.handlers.html). For more details about Python logging, please refer to the [official logging documentation](https://docs.python.org/3/library/logging.html).
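
For example, log rotation could be wired in roughly as follows. This is a sketch only: the handler name `rotating.task`, the file path, and the rotation sizes are made up for illustration, and the module must be referenced through `logging_config_class` as described above:

```
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Add a rotating file handler from the standard library.
LOGGING_CONFIG['handlers']['rotating.task'] = {
    'class': 'logging.handlers.RotatingFileHandler',
    'formatter': 'airflow.task',
    'filename': '/tmp/airflow-rotating.log',  # illustrative path
    'maxBytes': 10 * 1024 * 1024,             # rotate after ~10 MB
    'backupCount': 5,                         # keep five rotated files
}

# Also route the main airflow logger through the new handler.
LOGGING_CONFIG['loggers']['airflow']['handlers'] = ['console', 'rotating.task']
```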

This change also simplifies logging within the DAG itself:

```
root@ae1bc863e815:/airflow# python
Python 3.6.2 (default, Sep 13 2017, 14:26:54)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.settings import *
>>>
>>> from datetime import datetime
>>> from airflow import DAG
>>> from airflow.operators.dummy_operator import DummyOperator
>>>
>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
>>>
>>> task = DummyOperator(task_id='task_1', dag=dag)
>>>
>>> task.log.error('I want to say something..')
[2017-09-25 20:17:04,927] {<stdin>:1} ERROR - I want to say something..
```

#### Template path of the file_task_handler

The `file_task_handler` logger is more flexible. You can change the default format, `{dag_id}/{task_id}/{execution_date}/{try_number}.log`, by supplying Jinja templating in the `FILENAME_TEMPLATE` configuration variable. See the `file_task_handler` for more information.
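
For reference, the default template in `airflow_local_settings.py` is the following; a customized template would presumably be any Jinja expression over the task instance (`ti`) attributes (the commented variation is hypothetical):

```
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

# Hypothetical variation that groups logs by execution timestamp first:
# FILENAME_TEMPLATE = '{{ ts }}/{{ ti.dag_id }}/{{ ti.task_id }}/{{ try_number }}.log'
```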

#### I'm using S3Log or GCSLogs, what do I do!?

IF you are logging to either S3Log or GCSLogs, you will need a custom logging config. The `REMOTE_BASE_LOG_FOLDER` configuration key in your airflow config has been removed; therefore you will need to take the following steps:
- Copy the logging configuration from [`airflow/config_templates/airflow_local_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py).
- Place it in a directory on the Python import path (`PYTHONPATH`). If you are using Python 2.7, ensure that any `__init__.py` files exist so that it is importable.
- Update the copied config by setting the remote log folder (for example `S3_LOG_FOLDER` or `GCS_LOG_FOLDER`) explicitly, since the `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
- Set `logging_config_class` to the module and dict name. For example, if you place `custom_logging_config.py` at the base of your `PYTHONPATH`, you will need to set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config (see the sketch after this list).
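
As a rough sketch of such a `custom_logging_config.py` (the module name, the bucket URL, and the choice to replace the task handlers are illustrative assumptions, not prescribed by Airflow):

```
import os
from copy import deepcopy

from airflow import configuration as conf
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
S3_LOG_FOLDER = 's3://my-bucket/airflow/logs'  # illustrative bucket
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Define the S3 handler explicitly, since REMOTE_BASE_LOG_FOLDER is gone.
LOGGING_CONFIG['handlers']['s3.task'] = {
    'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    's3_log_folder': S3_LOG_FOLDER,
    'filename_template': FILENAME_TEMPLATE,
}

# Send task logs to S3 instead of the local file.task handler.
LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['s3.task']
LOGGING_CONFIG['loggers']['airflow.task_runner']['handlers'] = ['s3.task']
```

With that file on the `PYTHONPATH` and `logging_config_class = custom_logging_config.LOGGING_CONFIG` set in `airflow.cfg`, `task_log_reader` may also need to point at the handler used for task logs so the webserver can read them back.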

ELSE you don't need to change anything. If there is no custom config, the airflow config loader will still default to the same config.

### New Features

airflow/config_templates/airflow_local_settings.py
@@ -26,17 +26,6 @@

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

# TODO: REMOTE_BASE_LOG_FOLDER should be deprecated and
# directly specify in the handler definitions. This is to
# provide compatibility to older remote log folder settings.
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
S3_LOG_FOLDER = ''
GCS_LOG_FOLDER = ''
if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'):
S3_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'):
GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

DEFAULT_LOGGING_CONFIG = {
@@ -58,21 +47,24 @@
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
'gcs.task': {
'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'gcs_log_folder': GCS_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
}
# When using s3 or gcs, provide a customized LOGGING_CONFIG
# in airflow_local_settings within your PYTHONPATH, see UPDATING.md
# for details
# 's3.task': {
# 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
# 'formatter': 'airflow.task',
# 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
# 's3_log_folder': S3_LOG_FOLDER,
# 'filename_template': FILENAME_TEMPLATE,
# },
# 'gcs.task': {
# 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
# 'formatter': 'airflow.task',
# 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
# 'gcs_log_folder': GCS_LOG_FOLDER,
# 'filename_template': FILENAME_TEMPLATE,
# },
},
'loggers': {
'airflow.task': {
@@ -85,7 +77,7 @@
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task.raw': {
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
18 changes: 7 additions & 11 deletions airflow/config_templates/default_airflow.cfg
@@ -36,23 +36,22 @@ dags_folder = {AIRFLOW_HOME}/dags
base_log_folder = {AIRFLOW_HOME}/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
s3_log_folder =

# Logging level
logging_level = INFO

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
log_format_with_pid = [%%(asctime)s] [%%(process)d] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
log_format_with_thread_name = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(threadName)s %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# The executor class that airflow should use. Choices include
@@ -122,9 +121,6 @@ security =
# values at runtime)
unit_test_mode = False

# User defined logging configuration file path.
logging_config_path =

# Name of handler to read task instance logs.
# Default to use file task handler.
task_log_reader = file.task
5 changes: 2 additions & 3 deletions airflow/jobs.py
@@ -40,6 +40,7 @@
from airflow import configuration as conf
from airflow import executors, models, settings
from airflow.exceptions import AirflowException
from airflow.logging_config import configure_logging
from airflow.models import DAG, DagRun
from airflow.settings import Stats
from airflow.task_runner import get_task_runner
@@ -372,9 +373,7 @@ def helper():
sys.stderr = f

try:
# Re-configure logging to use the new output streams
log_format = settings.LOG_FORMAT_WITH_THREAD_NAME
settings.configure_logging(log_format=log_format)
configure_logging()
# Re-configure the ORM engine as there are issues with multiple processes
settings.configure_orm()

64 changes: 64 additions & 0 deletions airflow/logging_config.py
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import sys
from logging.config import dictConfig

from airflow import configuration as conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string

log = logging.getLogger(__name__)


def configure_logging():
    logging_class_path = ''
    try:
        logging_class_path = conf.get('core', 'logging_config_class')
    except AirflowConfigException:
        log.debug('Could not find key logging_config_class in config')

    if logging_class_path:
        try:
            logging_config = import_string(logging_class_path)

            # Make sure that the variable is in scope
            assert (isinstance(logging_config, dict))

            log.info(
                'Successfully imported user-defined logging config from %s',
                logging_class_path
            )
        except Exception:
            # Import default logging configurations.
            raise ImportError(
                'Unable to load custom logging from {}'.format(logging_class_path)
            )
    else:
        from airflow.config_templates.airflow_local_settings import (
            DEFAULT_LOGGING_CONFIG as logging_config
        )
        log.debug('Unable to load custom logging, using default config instead')

    try:
        # Try to init logging
        dictConfig(logging_config)
    except ValueError as e:
        log.warning('Unable to load the config, contains a configuration error.')
        # When there is an error in the config, escalate the exception
        # otherwise Airflow would silently fall back on the default config
        raise e

    return logging_config
