Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/providers/amazon/aws/hooks/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,10 @@ def check_for_key(self, key: str, bucket_name: Optional[str] = None) -> bool:
self.get_conn().head_object(Bucket=bucket_name, Key=key)
return True
except ClientError as e:
self.log.error(e.response["Error"]["Message"])
return False
if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
return False
else:
raise e

@provide_bucket_name
@unify_bucket_name_and_key
Expand Down
34 changes: 22 additions & 12 deletions airflow/providers/amazon/aws/log/s3_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,26 @@ def _read(self, ti, try_number, metadata=None):
log_relative_path = self._render_filename(ti, try_number)
remote_loc = os.path.join(self.remote_base, log_relative_path)

if self.s3_log_exists(remote_loc):
log_exists = False
log = ""

try:
log_exists = self.s3_log_exists(remote_loc)
except Exception as error: # pylint: disable=broad-except
self.log.exception(error)
log = '*** Failed to verify remote log exists {}.\n{}\n'.format(remote_loc, str(error))

if log_exists:
# If S3 remote file exists, we do not fetch logs from task instance
# local machine even if there are errors reading remote logs, as
# returned remote_log will contain error messages.
remote_log = self.s3_read(remote_loc, return_error=True)
log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
return log, {'end_of_log': True}
else:
return super()._read(ti, try_number)
log += '*** Falling back to local log\n'
local_log, metadata = super()._read(ti, try_number)
return log + local_log, metadata

def s3_log_exists(self, remote_log_location: str) -> bool:
"""
Expand All @@ -127,11 +138,7 @@ def s3_log_exists(self, remote_log_location: str) -> bool:
:type remote_log_location: str
:return: True if location exists else False
"""
try:
return self.hook.get_key(remote_log_location) is not None
except Exception: # pylint: disable=broad-except
pass
return False
return self.hook.check_for_key(remote_log_location)

def s3_read(self, remote_log_location: str, return_error: bool = False) -> str:
"""
Expand All @@ -147,8 +154,8 @@ def s3_read(self, remote_log_location: str, return_error: bool = False) -> str:
"""
try:
return self.hook.read_key(remote_log_location)
except Exception: # pylint: disable=broad-except
msg = f'Could not read logs from {remote_log_location}'
except Exception as error: # pylint: disable=broad-except
msg = f'Could not read logs from {remote_log_location} with error: {error}'
self.log.exception(msg)
# return error if needed
if return_error:
Expand All @@ -168,9 +175,12 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True):
the new log is appended to any existing logs.
:type append: bool
"""
if append and self.s3_log_exists(remote_log_location):
old_log = self.s3_read(remote_log_location)
log = '\n'.join([old_log, log]) if old_log else log
try:
if append and self.s3_log_exists(remote_log_location):
old_log = self.s3_read(remote_log_location)
log = '\n'.join([old_log, log]) if old_log else log
except Exception as error: # pylint: disable=broad-except
self.log.exception('Could not verify previous log to append: %s', str(error))

try:
self.hook.load_string(
Expand Down
20 changes: 18 additions & 2 deletions tests/providers/amazon/aws/log/test_s3_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def test_log_exists_raises(self):
def test_log_exists_no_hook(self):
with mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook") as mock_hook:
mock_hook.side_effect = Exception('Failed to connect')
self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
with self.assertRaises(Exception):
self.s3_task_handler.s3_log_exists(self.remote_log_location)

def test_set_context_raw(self):
self.ti.raw = True
Expand Down Expand Up @@ -143,12 +144,27 @@ def test_read_when_s3_log_missing(self):
self.assertIn('*** Log file does not exist:', log[0][0][-1])
self.assertEqual({'end_of_log': True}, metadata[0])

def test_s3_read_when_log_missing(self):
handler = self.s3_task_handler
url = 's3://bucket/foo'
with mock.patch.object(handler.log, 'error') as mock_error:
result = handler.s3_read(url, return_error=True)
msg = (
f'Could not read logs from {url} with error: An error occurred (404) when calling the '
f'HeadObject operation: Not Found'
)
self.assertEqual(result, msg)
mock_error.assert_called_once_with(msg, exc_info=True)

def test_read_raises_return_error(self):
handler = self.s3_task_handler
url = 's3://nonexistentbucket/foo'
with mock.patch.object(handler.log, 'error') as mock_error:
result = handler.s3_read(url, return_error=True)
msg = 'Could not read logs from %s' % url
msg = (
f'Could not read logs from {url} with error: An error occurred (NoSuchBucket) when '
f'calling the HeadObject operation: The specified bucket does not exist'
)
self.assertEqual(result, msg)
mock_error.assert_called_once_with(msg, exc_info=True)

Expand Down