Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ def run(args, dag=None):
# GCS
elif remote_base.startswith('gs:/'):
logging_utils.GCSLog().write(log, remote_log_location)
# HDFS
elif remote_base.startswith('hdfs:/'):
logging_utils.HDFSLog().write(log, remote_log_location)
# Other
elif remote_base and remote_base != 'None':
logging.error(
Expand Down
4 changes: 2 additions & 2 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ dags_folder = {AIRFLOW_HOME}/dags
base_log_folder = {AIRFLOW_HOME}/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# must supply a remote location URL (starting with either 's3://', 'gs://', or
# 'hdfs://') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder =
remote_log_conn_id =
Expand Down
1 change: 1 addition & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def run_command(command):

return output


_templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
with open(os.path.join(_templates_dir, 'default_airflow.cfg')) as f:
DEFAULT_CONFIG = f.read()
Expand Down
87 changes: 87 additions & 0 deletions airflow/utils/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from builtins import object

import logging
import os

from airflow import configuration
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -211,3 +212,89 @@ def parse_gcs_url(self, gsurl):
bucket = parsed_url.netloc
blob = parsed_url.path.strip('/')
return (bucket, blob)


class HDFSLog(object):
    """
    Utility class for reading and writing logs in HDFS via WebHDFS.
    Requires airflow[webhdfs] and setting the REMOTE_BASE_LOG_FOLDER and
    REMOTE_LOG_CONN_ID configuration options in airflow.cfg.
    """
    def __init__(self):
        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
        try:
            from airflow.hooks.webhdfs_hook import WebHDFSHook
            self.hook = WebHDFSHook(remote_conn_id)
        except Exception:
            # Fail soft: a broken logging backend must never break the task
            # itself. read()/write() check self.hook before using it.
            self.hook = None
            logging.error(
                'Could not create a WebHDFSHook with connection id "{}". '
                'Please make sure that airflow[webhdfs] is installed and '
                'the WebHDFS connection exists.'.format(remote_conn_id))

    def escape_filename(self, hdfs_path):
        """
        Returns hdfs_path with ':' characters in its final path component
        replaced by '-'. Airflow log file names embed ISO-8601 execution
        dates (e.g. '2017-01-01T00:00:00'), and the colons are unsafe in
        HDFS file names. Only the basename is rewritten, so a leading
        'hdfs://' scheme in the directory part is left intact.

        :param hdfs_path: path (or hdfs:// URL) of the log file
        :type hdfs_path: string
        """
        directory = os.path.dirname(hdfs_path)
        filename = os.path.basename(hdfs_path)
        return os.path.join(directory, filename.replace(':', '-'))

    def remove_scheme(self, hdfs_path):
        """
        Strips a leading 'hdfs://' scheme from hdfs_path, if present.

        NOTE(review): this only yields an absolute path for URLs of the
        form 'hdfs:///path' (empty authority); an URL with a host such as
        'hdfs://namenode/path' would become 'namenode/path' — confirm that
        REMOTE_BASE_LOG_FOLDER is expected to use the triple-slash form.
        """
        return hdfs_path[7:] if hdfs_path.startswith('hdfs://') else hdfs_path

    def read(self, remote_log_location, return_error=False, client=None):
        """
        Returns the log found at the remote_log_location. Returns '' if no
        logs are found or there is an error.

        :param remote_log_location: the log's location in remote storage
        :type remote_log_location: string (path)
        :param return_error: if True, returns a string error message if an
            error occurs. Otherwise returns '' when an error occurs.
        :type return_error: bool
        :param client: if passed, reuse an already established WebHDFS
            client instead of opening a new connection
        :type client: hdfs.client.Client
        """
        remote_log_location = self.escape_filename(remote_log_location)
        if self.hook:
            try:
                if client is None:
                    client = self.hook.get_conn()
                with client.read(self.remove_scheme(remote_log_location)) as f:
                    return f.read()
            except Exception:
                # fall through to the error path below
                pass

        err = 'Could not read logs from {}'.format(remote_log_location)
        logging.error(err)
        return err if return_error else ''

    def write(self, log, remote_log_location, append=True):
        """
        Writes the log to the remote_log_location. Fails silently if no hook
        was created.

        :param log: the log to write to the remote_log_location
        :type log: string
        :param remote_log_location: the log's location in remote storage
        :type remote_log_location: string (path)
        :param append: if False, any existing log file is overwritten. If True,
            the new log is appended to any existing logs.
        :type append: bool
        """
        remote_log_location = self.escape_filename(remote_log_location)
        if self.hook:
            try:
                client = self.hook.get_conn()
                if append:
                    # NOTE: appending is a read-modify-write cycle (the file
                    # is rewritten with overwrite=True), not an atomic HDFS
                    # append; concurrent writers could lose data.
                    old_log = self.read(remote_log_location, client=client)
                    log = old_log + '\n' + log
                with client.write(
                        self.remove_scheme(remote_log_location),
                        overwrite=True) as f:
                    f.write(log)
                return
            except Exception:
                pass

        # raise/return error if we get here
        logging.error('Could not write logs to {}'.format(remote_log_location))
12 changes: 10 additions & 2 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,12 +753,20 @@ def log(self):
# S3
if remote_log_path.startswith('s3:/'):
remote_log += log_utils.S3Log().read(
remote_log_path, return_error=surface_log_retrieval_errors)
remote_log_path,
return_error=surface_log_retrieval_errors)
remote_log_loaded = True
# GCS
elif remote_log_path.startswith('gs:/'):
remote_log += log_utils.GCSLog().read(
remote_log_path, return_error=surface_log_retrieval_errors)
remote_log_path,
return_error=surface_log_retrieval_errors)
remote_log_loaded = True
# HDFS
elif remote_log_path.startswith('hdfs:/'):
remote_log += log_utils.HDFSLog().read(
remote_log_path,
return_error=surface_log_retrieval_errors)
remote_log_loaded = True
# unsupported
else:
Expand Down
33 changes: 0 additions & 33 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,13 @@
from __future__ import print_function
from __future__ import unicode_literals

import logging
import unittest

import airflow.utils.logging
from airflow import configuration
from airflow.exceptions import AirflowException
from airflow.utils.operator_resources import Resources


class LogUtilsTest(unittest.TestCase):

    def test_gcs_url_parse(self):
        """
        Test GCS url parsing
        """
        logging.info(
            'About to create a GCSLog object without a connection. This will '
            'log an error but testing will proceed.')
        gcs_log = airflow.utils.logging.GCSLog()

        # well-formed URL with a blob path
        self.assertEqual(
            ('bucket', 'path/to/blob'),
            gcs_log.parse_gcs_url('gs://bucket/path/to/blob'))

        # a trailing slash is stripped from the blob name
        self.assertEqual(
            ('bucket', 'path/to/blob'),
            gcs_log.parse_gcs_url('gs://bucket/path/to/blob/'))

        # a bucket-only URL yields an empty blob
        self.assertEqual(
            ('bucket', ''),
            gcs_log.parse_gcs_url('gs://bucket/'))

        # a malformed URI (single slash after the scheme) raises
        self.assertRaises(
            AirflowException,
            gcs_log.parse_gcs_url,
            'gs:/bucket/path/to/blob')

class OperatorResourcesTest(unittest.TestCase):

def setUp(self):
Expand Down
91 changes: 91 additions & 0 deletions tests/utils/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import logging
import unittest

from airflow.utils.logging import GCSLog, HDFSLog
from airflow.exceptions import AirflowException


class GCSLogTest(unittest.TestCase):

    def test_gcs_url_parse(self):
        """
        Test GCS url parsing
        """
        logging.info(
            'About to create a GCSLog object without a connection. This will '
            'log an error, but testing will proceed.')
        gcs_log = GCSLog()

        # (url, expected (bucket, blob)) pairs for valid URLs,
        # covering blob paths, trailing slashes and bucket-only URLs
        valid_cases = [
            ('gs://bucket/path/to/blob', ('bucket', 'path/to/blob')),
            ('gs://bucket/path/to/blob/', ('bucket', 'path/to/blob')),
            ('gs://bucket/', ('bucket', '')),
        ]
        for url, expected in valid_cases:
            self.assertEqual(expected, gcs_log.parse_gcs_url(url))

        # a malformed URI (single slash after the scheme) raises
        self.assertRaises(
            AirflowException,
            gcs_log.parse_gcs_url,
            'gs:/bucket/path/to/blob')


class HDFSLogTest(unittest.TestCase):

    def test_hdfs_escape_filename(self):
        """
        Test that ':' characters in the log file name are escaped
        """
        logging.info(
            'About to create an HDFSLog object without a connection. This '
            'will log an error, but testing will proceed.')
        log_util = HDFSLog()

        escaped = log_util.escape_filename(
            '/logs/airflow/my_dag/task_1/2017-01-01T00:00:00')
        self.assertEqual(
            '/logs/airflow/my_dag/task_1/2017-01-01T00-00-00', escaped)

    def test_hdfs_remove_scheme(self):
        """
        Test that a leading 'hdfs://' scheme is stripped from log paths
        """
        logging.info(
            'About to create an HDFSLog object without a connection. This '
            'will log an error, but testing will proceed.')
        log_util = HDFSLog()

        # scheme present: it is removed
        self.assertEqual(
            '/logs/airflow/my_dag/task_1/2017-01-01T00:00:00',
            log_util.remove_scheme(
                'hdfs:///logs/airflow/my_dag/task_1/2017-01-01T00:00:00'))

        # scheme absent: the path is returned unchanged
        self.assertEqual(
            '/logs/airflow/my_dag/task_1/2017-01-01T00:00:00',
            log_util.remove_scheme(
                '/logs/airflow/my_dag/task_1/2017-01-01T00:00:00'))