Add oss_task_handler into alibaba-provider and enable remote logging to OSS (#21785)
EricGao888 committed Mar 11, 2022
1 parent 03d0c70 commit 7bd8b2d
Showing 13 changed files with 580 additions and 7 deletions.
11 changes: 11 additions & 0 deletions airflow/config_templates/airflow_local_settings.py
@@ -248,6 +248,17 @@
}

DEFAULT_LOGGING_CONFIG['handlers'].update(STACKDRIVER_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith('oss://'):
OSS_REMOTE_HANDLERS = {
'task': {
'class': 'airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'oss_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
}
DEFAULT_LOGGING_CONFIG['handlers'].update(OSS_REMOTE_HANDLERS)
elif ELASTICSEARCH_HOST:
ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get('elasticsearch', 'LOG_ID_TEMPLATE')
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get('elasticsearch', 'END_OF_LOG_MARK')
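For context, this handler is switched on through Airflow's standard [logging] options. A minimal sketch, assuming a hypothetical bucket and the default connection id, using the environment-variable form of the config:

import os

# Remote logging must be enabled for the handler selection above to run.
os.environ["AIRFLOW__LOGGING__REMOTE_LOGGING"] = "True"
# The oss:// prefix is what routes DEFAULT_LOGGING_CONFIG to OSS_REMOTE_HANDLERS.
os.environ["AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER"] = "oss://my-bucket/airflow/logs"
# Read by OSSTaskHandler when it creates its OSSHook.
os.environ["AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID"] = "oss_default"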
96 changes: 94 additions & 2 deletions airflow/providers/alibaba/cloud/hooks/oss.py
@@ -89,10 +89,13 @@ class OSSHook(BaseHook):
conn_type = 'oss'
hook_name = 'OSS'

def __init__(self, region, oss_conn_id='oss_default', *args, **kwargs) -> None:
def __init__(self, region: Optional[str] = None, oss_conn_id='oss_default', *args, **kwargs) -> None:
self.oss_conn_id = oss_conn_id
self.oss_conn = self.get_connection(oss_conn_id)
self.region = region
if region is None:
self.region = self.get_default_region()
else:
self.region = region
super().__init__(*args, **kwargs)

def get_conn(self) -> "Connection":
@@ -117,6 +120,7 @@ def parse_oss_url(ossurl: str) -> tuple:

return bucket_name, key
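The task handler below uses parse_oss_url to split an oss:// URL into a bucket name and key. A quick sketch of the expected behavior, with a hypothetical URL:

from airflow.providers.alibaba.cloud.hooks.oss import OSSHook

bucket_name, key = OSSHook.parse_oss_url("oss://my-bucket/airflow/logs")
# expected: bucket_name == "my-bucket", key == "airflow/logs"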

@provide_bucket_name
@unify_bucket_name_and_key
def object_exists(self, key: str, bucket_name: Optional[str] = None) -> bool:
"""
@@ -143,8 +147,10 @@ def get_bucket(self, bucket_name: Optional[str] = None) -> oss2.api.Bucket:
:rtype: oss2.api.Bucket
"""
auth = self.get_credential()
assert self.region is not None
return oss2.Bucket(auth, 'http://oss-' + self.region + '.aliyuncs.com', bucket_name)

@provide_bucket_name
@unify_bucket_name_and_key
def load_string(self, key: str, content: str, bucket_name: Optional[str] = None) -> None:
"""
@@ -159,6 +165,7 @@ def load_string(self, key: str, content: str, bucket_name: Optional[str] = None)
except Exception as e:
raise AirflowException(f"Errors: {e}")

@provide_bucket_name
@unify_bucket_name_and_key
def upload_local_file(
self,
@@ -178,6 +185,7 @@ def upload_local_file(
except Exception as e:
raise AirflowException(f"Errors when upload file: {e}")

@provide_bucket_name
@unify_bucket_name_and_key
def download_file(
self,
@@ -201,6 +209,7 @@ def download_file(
return None
return local_file

@provide_bucket_name
@unify_bucket_name_and_key
def delete_object(
self,
@@ -219,6 +228,7 @@
self.log.error(e)
raise AirflowException(f"Errors when deleting: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def delete_objects(
self,
@@ -269,6 +279,73 @@ def create_bucket(
self.log.error(e)
raise AirflowException(f"Errors when create bucket: {bucket_name}")

@provide_bucket_name
@unify_bucket_name_and_key
def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int) -> None:
"""
Append a string to an existing remote file
:param bucket_name: the name of the bucket
:param content: content to be appended
:param key: oss bucket key
:param pos: position of the existing file where the content will be appended
"""
self.log.info("Write oss bucket. key: %s, pos: %s", key, pos)
try:
self.get_bucket(bucket_name).append_object(key, pos, content)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when append string for object: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def read_key(self, bucket_name: Optional[str], key: str) -> str:
"""
Read oss remote object content with the specified key
:param bucket_name: the name of the bucket
:param key: oss bucket key
"""
self.log.info("Read oss key: %s", key)
try:
return self.get_bucket(bucket_name).get_object(key).read().decode("utf-8")
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when read bucket object: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObjectResult:
"""
Get meta info of the specified remote object
:param bucket_name: the name of the bucket
:param key: oss bucket key
"""
self.log.info("Head Object oss key: %s", key)
try:
return self.get_bucket(bucket_name).head_object(key)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when head bucket object: {key}")

@provide_bucket_name
@unify_bucket_name_and_key
def key_exist(self, bucket_name: Optional[str], key: str) -> bool:
"""
Find out whether the specified key exists in the oss remote storage
:param bucket_name: the name of the bucket
:param key: oss bucket key
"""
self.log.info('Looking up oss bucket %s for bucket key %s ...', bucket_name, key)
try:
return self.get_bucket(bucket_name).object_exists(key)
except Exception as e:
self.log.error(e)
raise AirflowException(f"Errors when check bucket object existence: {key}")

def get_credential(self) -> oss2.auth.Auth:
extra_config = self.oss_conn.extra_dejson
auth_type = extra_config.get('auth_type', None)
@@ -285,3 +362,18 @@ def get_credential(self) -> oss2.auth.Auth:
return oss2.Auth(oss_access_key_id, oss_access_key_secret)
else:
raise Exception("Unsupported auth_type: " + auth_type)

def get_default_region(self) -> Optional[str]:
extra_config = self.oss_conn.extra_dejson
auth_type = extra_config.get('auth_type', None)
if not auth_type:
raise Exception("No auth_type specified in extra_config. ")

if auth_type == 'AK':
default_region = extra_config.get('region', None)
if not default_region:
raise Exception("No region is specified for connection: " + self.oss_conn_id)
else:
raise Exception("Unsupported auth_type: " + auth_type)

return default_region
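A hedged usage sketch of the new hook methods; the bucket and key are hypothetical, and the connection extra is assumed to carry auth_type "AK" plus a region, so the region argument can be omitted:

from airflow.providers.alibaba.cloud.hooks.oss import OSSHook

hook = OSSHook(oss_conn_id="oss_default")  # region falls back to get_default_region()
key = "airflow/logs/demo.log"

if not hook.key_exist("my-bucket", key):
    hook.append_string("my-bucket", "first line\n", key, 0)

# head_key returns the object's metadata; content_length is the next append position
pos = hook.head_key("my-bucket", key).content_length
hook.append_string("my-bucket", "second line\n", key, pos)

print(hook.read_key("my-bucket", key))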
16 changes: 16 additions & 0 deletions airflow/providers/alibaba/cloud/log/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
186 changes: 186 additions & 0 deletions airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -0,0 +1,186 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import sys

if sys.version_info >= (3, 8):
from functools import cached_property
else:
from cached_property import cached_property

from airflow.configuration import conf
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class OSSTaskHandler(FileTaskHandler, LoggingMixin):
"""
OSSTaskHandler is a Python logging handler that handles and reads
task instance logs. It extends the Airflow FileTaskHandler and
uploads to and reads from OSS remote storage.
"""

def __init__(self, base_log_folder, oss_log_folder, filename_template):
self.log.info("Using oss_task_handler for remote logging...")
super().__init__(base_log_folder, filename_template)
(self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
self.log_relative_path = ''
self._hook = None
self.closed = False
self.upload_on_close = True

@cached_property
def hook(self):
remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
self.log.info("remote_conn_id: %s", remote_conn_id)
try:
return OSSHook(oss_conn_id=remote_conn_id)
except Exception as e:
self.log.error(e, exc_info=True)
self.log.error(
'Could not create an OSSHook with connection id "%s". '
'Please make sure that airflow[oss] is installed and '
'the OSS connection exists.',
remote_conn_id,
)

def set_context(self, ti):
super().set_context(ti)
# Local and remote locations are needed to open and
# upload the local log file to OSS remote storage.
self.log_relative_path = self._render_filename(ti, ti.try_number)
self.upload_on_close = not ti.raw

# Clear the file first so that duplicate data is not uploaded
# when re-using the same path (e.g. with rescheduled sensors)
if self.upload_on_close:
with open(self.handler.baseFilename, 'w'):
pass

def close(self):
"""Close and upload local log file to remote storage OSS."""
# When the application exits, the system shuts down all handlers by
# calling their close method. Here we check if the logger is already
# closed to prevent uploading the log to remote storage multiple
# times when `logging.shutdown` is called.
if self.closed:
return

super().close()

if not self.upload_on_close:
return

local_loc = os.path.join(self.local_base, self.log_relative_path)
remote_loc = self.log_relative_path
if os.path.exists(local_loc):
# read the local log file so its current contents can be uploaded
with open(local_loc) as logfile:
log = logfile.read()
self.oss_write(log, remote_loc)

# Mark closed so we don't double write if close is called twice
self.closed = True

def _read(self, ti, try_number, metadata=None):
"""
Read logs of the given task instance and try_number from OSS remote storage.
If that fails, read the log from the task instance host machine.
:param ti: task instance object
:param try_number: task instance try_number to read logs from
:param metadata: log metadata,
can be used for streaming log reading and auto-tailing.
"""
# Explicitly getting the log relative path is necessary as the given
# task instance might be different from the task instance passed in
# to the set_context method.
log_relative_path = self._render_filename(ti, try_number)
remote_loc = log_relative_path

if self.oss_log_exists(remote_loc):
# If OSS remote file exists, we do not fetch logs from task instance
# local machine even if there are errors reading remote logs, as
# returned remote_log will contain error messages.
remote_log = self.oss_read(remote_loc, return_error=True)
log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
return log, {'end_of_log': True}
else:
return super()._read(ti, try_number)

def oss_log_exists(self, remote_log_location):
"""
Check if remote_log_location exists in remote storage
:param remote_log_location: log's location in remote storage
:return: True if location exists else False
"""
oss_remote_log_location = self.base_folder + '/' + remote_log_location
try:
return self.hook.key_exist(self.bucket_name, oss_remote_log_location)
except Exception:
pass
return False

def oss_read(self, remote_log_location, return_error=False):
"""
Returns the log found at the remote_log_location, or None if no
logs are found or there is an error.
:param remote_log_location: the log's location in remote storage
:param return_error: if True, returns a string error message if an
error occurs. Otherwise returns None when an error occurs.
"""
try:
oss_remote_log_location = self.base_folder + '/' + remote_log_location
self.log.info("read remote log: %s", oss_remote_log_location)
return self.hook.read_key(self.bucket_name, oss_remote_log_location)
except Exception:
msg = f'Could not read logs from {oss_remote_log_location}'
self.log.exception(msg)
# return error if needed
if return_error:
return msg

def oss_write(self, log, remote_log_location, append=True):
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
"""
oss_remote_log_location = self.base_folder + '/' + remote_log_location
pos = 0
# oss_log_exists expects a path relative to base_folder; passing the
# already-prefixed location would double-prepend the base folder.
if append and self.oss_log_exists(remote_log_location):
head = self.hook.head_key(self.bucket_name, oss_remote_log_location)
pos = head.content_length
self.log.info("log write pos is: %s", str(pos))
try:
self.log.info("writing remote log: %s", oss_remote_log_location)
self.hook.append_string(self.bucket_name, log, oss_remote_log_location, pos)
except Exception:
self.log.exception(
'Could not write logs to %s, log write pos is: %s, Append is %s',
oss_remote_log_location,
str(pos),
str(append),
)
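oss_write builds on OSS appendable objects: every append must pass the object's current length as the position, which is why the handler heads the key first. A sketch of that protocol against the raw oss2 SDK, with placeholder endpoint, bucket, and credentials:

import oss2

auth = oss2.Auth("<ACCESS_KEY_ID>", "<ACCESS_KEY_SECRET>")
bucket = oss2.Bucket(auth, "http://oss-cn-hangzhou.aliyuncs.com", "my-bucket")

result = bucket.append_object("airflow/logs/demo.log", 0, "first chunk\n")
# next_position is the new object length, i.e. the position for the next append
bucket.append_object("airflow/logs/demo.log", result.next_position, "second chunk\n")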
3 changes: 3 additions & 0 deletions airflow/providers/alibaba/provider.yaml
@@ -57,3 +57,6 @@ hook-class-names: # deprecated - to be removed after providers add dependency o
connection-types:
- hook-class-name: airflow.providers.alibaba.cloud.hooks.oss.OSSHook
connection-type: oss

logging:
- airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler
3 changes: 2 additions & 1 deletion airflow/utils/db.py
@@ -436,7 +436,8 @@ def create_default_connections(session: Session = NEW_SESSION):
extra='''{
"auth_type": "AK",
"access_key_id": "<ACCESS_KEY_ID>",
"access_key_secret": "<ACCESS_KEY_SECRET>"}
"access_key_secret": "<ACCESS_KEY_SECRET>",
"region": "<YOUR_OSS_REGION>"}
''',
),
session,
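For reference, an equivalent OSS connection can be built programmatically with the same extra fields; a hedged sketch with placeholder credentials and a hypothetical conn_id:

import json
from airflow.models.connection import Connection

oss_conn = Connection(
    conn_id="oss_logging",
    conn_type="oss",
    extra=json.dumps(
        {
            "auth_type": "AK",
            "access_key_id": "<ACCESS_KEY_ID>",
            "access_key_secret": "<ACCESS_KEY_SECRET>",
            # region is what OSSHook.get_default_region() resolves when none is passed
            "region": "<YOUR_OSS_REGION>",
        }
    ),
)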