Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glue Job Driver logging #25142

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 83 additions & 14 deletions airflow/providers/amazon/aws/hooks/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@
import warnings
from typing import Dict, List, Optional

import boto3

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

DEFAULT_LOG_SUFFIX = 'output'
FAILURE_LOG_SUFFIX = 'error'
# A filter value of ' ' translates to "match all".
# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
DEFAULT_LOG_FILTER = ' '
FAILURE_LOG_FILTER = '?ERROR ?Exception'


class GlueJobHook(AwsBaseHook):
"""
Expand Down Expand Up @@ -136,32 +145,92 @@ def get_job_state(self, job_name: str, run_id: str) -> str:
job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
return job_run['JobRun']['JobRunState']

def job_completion(self, job_name: str, run_id: str) -> Dict[str, str]:
def print_job_logs(
    self,
    job_name: str,
    run_id: str,
    job_failed: bool = False,
    next_token: Optional[str] = None,
) -> Optional[str]:
    """
    Print the latest batch of Glue driver logs to the Airflow task log.

    :param job_name: unique Glue job name per AWS account
    :param run_id: the job-run ID whose CloudWatch log stream is read
    :param job_failed: if True, read the ``error`` log group suffix and apply
        the failure filter pattern instead of the defaults
    :param next_token: CloudWatch pagination token returned by the previous call
    :return: the ``nextToken`` to pass on the next call, or the previous token
        when no new events were returned
    """
    # Build the client from the hook's session so it uses the Airflow AWS
    # connection credentials and region, not boto3's ambient defaults.
    log_client = self.get_session().client('logs')
    response = {}

    filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
    log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
    log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
    log_group_name = f'{log_group_prefix}/{log_group_suffix}'

    try:
        # Only pass nextToken when we actually have one: the API rejects
        # a None token, so build the kwargs conditionally instead of
        # duplicating the whole call.
        optional_token = {'nextToken': next_token} if next_token else {}
        response = log_client.filter_log_events(
            logGroupName=log_group_name,
            logStreamNames=[run_id],
            filterPattern=filter_pattern,
            **optional_token,
        )
        if len(response['events']):
            messages = '\t'.join([event['message'] for event in response['events']])
            self.log.info('Glue Job Run Logs:\n\t%s', messages)

    except log_client.exceptions.ResourceNotFoundException:
        # The log group/stream may not exist yet early in the job run;
        # treat that as "no new logs" rather than an error.
        self.log.warning(
            'No new Glue driver logs found. This might be because there are no new logs, '
            'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
            f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
        )

    # If no new log events are available, the response carries no nextToken.
    # In that case, check the same token again on the next pass.
    return response.get('nextToken') or next_token

def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
    """
    Waits until Glue job with job_name completes or
    fails and returns the final state if finished.
    Raises AirflowException when the job failed.

    :param job_name: unique job name per AWS account
    :param run_id: The job-run ID of the predecessor job run
    :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
    :return: Dict of JobRunState and JobRunId
    """
    failed_states = ['FAILED', 'TIMEOUT']
    finished_states = ['SUCCEEDED', 'STOPPED']
    # CloudWatch pagination token carried across polling iterations.
    next_log_token = None
    job_failed = False

    while True:
        try:
            job_run_state = self.get_job_state(job_name, run_id)
            if job_run_state in finished_states:
                self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
                return {'JobRunState': job_run_state, 'JobRunId': run_id}
            if job_run_state in failed_states:
                job_failed = True
                job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
                self.log.info(job_error_message)
                raise AirflowException(job_error_message)
            else:
                self.log.info(
                    'Polling for AWS Glue Job %s current run state with status %s',
                    job_name,
                    job_run_state,
                )
                time.sleep(self.JOB_POLL_INTERVAL)
        finally:
            # Runs even on the iteration that returns or raises, so the
            # final batch of driver output is still printed.
            if verbose:
                next_log_token = self.print_job_logs(
                    job_name=job_name,
                    run_id=run_id,
                    job_failed=job_failed,
                    next_token=next_log_token,
                )

def get_or_create_glue_job(self) -> str:
"""
Expand Down
5 changes: 4 additions & 1 deletion airflow/providers/amazon/aws/operators/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class GlueJobOperator(BaseOperator):
:param create_job_kwargs: Extra arguments for Glue Job Creation
:param run_job_kwargs: Extra arguments for Glue Job Run
:param wait_for_completion: Whether or not wait for job run completion. (default: True)
:param verbose: If True, Glue Job Run logs show in the Airflow Task Logs. (default: False)
"""

template_fields: Sequence[str] = ('script_args',)
Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(
create_job_kwargs: Optional[dict] = None,
run_job_kwargs: Optional[dict] = None,
wait_for_completion: bool = True,
verbose: bool = False,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -97,6 +99,7 @@ def __init__(
self.create_job_kwargs = create_job_kwargs
self.run_job_kwargs = run_job_kwargs or {}
self.wait_for_completion = wait_for_completion
self.verbose = verbose

def execute(self, context: 'Context'):
"""
Expand Down Expand Up @@ -135,7 +138,7 @@ def execute(self, context: 'Context'):
)
glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
if self.wait_for_completion:
glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'], self.verbose)
self.log.info(
"AWS Glue Job: %s status: %s. Run Id: %s",
self.job_name,
Expand Down
50 changes: 37 additions & 13 deletions airflow/providers/amazon/aws/sensors/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import warnings
from typing import TYPE_CHECKING, Sequence
from typing import TYPE_CHECKING, List, Optional, Sequence

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
Expand All @@ -37,30 +37,54 @@ class GlueJobSensor(BaseSensorOperator):

:param job_name: The AWS Glue Job unique name
:param run_id: The AWS Glue current running job identifier
:param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
"""

template_fields: Sequence[str] = ('job_name', 'run_id')

def __init__(self, *, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
def __init__(
    self,
    *,
    job_name: str,
    run_id: str,
    verbose: bool = False,
    aws_conn_id: str = 'aws_default',
    **kwargs,
):
    super().__init__(**kwargs)
    # Identity of the Glue job run this sensor watches.
    self.job_name = job_name
    self.run_id = run_id
    # When True, poke() also streams Glue driver logs into the task log.
    self.verbose = verbose
    self.aws_conn_id = aws_conn_id
    # Terminal run states: success ends the sensor, errored raises.
    self.success_states: List[str] = ['SUCCEEDED']
    self.errored_states: List[str] = ['FAILED', 'STOPPED', 'TIMEOUT']
    # CloudWatch pagination token carried across successive pokes so each
    # poke only prints log events newer than the previous batch.
    self.next_log_token: Optional[str] = None

def poke(self, context: 'Context'):
    """
    Check the Glue job run state once.

    Returns True when the run succeeded, False while it is still in
    progress, and raises AirflowException when it reached an errored
    state. When ``verbose`` is set, the latest driver logs are printed
    on every poke, including the final one.
    """
    hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
    self.log.info('Poking for job run status :for Glue Job %s and ID %s', self.job_name, self.run_id)
    job_state = hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
    job_failed = False

    try:
        if job_state in self.success_states:
            self.log.info('Exiting Job %s Run State: %s', self.run_id, job_state)
            return True
        elif job_state in self.errored_states:
            job_failed = True
            # Bug fix: the original assigned a tuple
            # ('Exiting Job %s Run State: %s', self.run_id, job_state)
            # instead of a formatted string, producing an unreadable
            # exception message. Use an f-string so the message is rendered.
            job_error_message = f'Exiting Job {self.run_id} Run State: {job_state}'
            self.log.info(job_error_message)
            raise AirflowException(job_error_message)
        else:
            return False
    finally:
        # Print logs even on the poke that returns or raises, so the last
        # batch of driver output is not lost.
        if self.verbose:
            self.next_log_token = hook.print_job_logs(
                job_name=self.job_name,
                run_id=self.run_id,
                job_failed=job_failed,
                next_token=self.next_log_token,
            )


class AwsGlueJobSensor(GlueJobSensor):
Expand Down
54 changes: 50 additions & 4 deletions tests/providers/amazon/aws/operators/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from parameterized import parameterized

from airflow import configuration
from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
Expand All @@ -29,7 +29,7 @@
class TestGlueJobOperator(unittest.TestCase):
@mock.patch('airflow.providers.amazon.aws.hooks.glue.GlueJobHook')
def setUp(self, glue_hook_mock):
configuration.load_test_config()
conf.load_test_config()

self.glue_hook_mock = glue_hook_mock

Expand All @@ -39,12 +39,19 @@ def setUp(self, glue_hook_mock):
"/glue-examples/glue-scripts/sample_aws_glue_job.py",
]
)
@mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'get_job_state')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_without_failure(
self, script_location, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state
self,
script_location,
mock_load_file,
mock_get_conn,
mock_initialize_job,
mock_get_job_state,
mock_print_job_logs,
):
glue = GlueJobOperator(
task_id='test_glue_operator',
Expand All @@ -57,16 +64,52 @@ def test_execute_without_failure(
)
mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
mock_get_job_state.return_value = 'SUCCEEDED'

glue.execute({})

mock_initialize_job.assert_called_once_with({}, {})
mock_print_job_logs.assert_not_called()
assert glue.job_name == 'my_test_job'

# NOTE: mock.patch.object decorators are applied bottom-up, so the mock
# arguments below are listed in reverse decorator order.
@mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'get_job_state')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_with_verbose_logging(
    self, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state, mock_print_job_logs
):
    """With verbose=True, execute() forwards driver-log printing to the hook
    exactly once for a job that succeeds on the first state poll."""
    job_name = 'test_job_name'
    job_run_id = '11111'
    glue = GlueJobOperator(
        task_id='test_glue_operator',
        job_name=job_name,
        script_location='s3_uri',
        s3_bucket='bucket_name',
        iam_role_name='role_arn',
        verbose=True,
    )
    # First poll already reports SUCCEEDED, so job_completion exits after
    # a single iteration and prints logs once from its finally block.
    mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': job_run_id}
    mock_get_job_state.return_value = 'SUCCEEDED'

    glue.execute({})

    mock_initialize_job.assert_called_once_with({}, {})
    # job_failed=False and next_token=None because the run succeeded on the
    # first pass and no prior pagination token existed.
    mock_print_job_logs.assert_called_once_with(
        job_name=job_name,
        run_id=job_run_id,
        job_failed=False,
        next_token=None,
    )
    assert glue.job_name == job_name

# NOTE: mock.patch.object decorators are applied bottom-up, so the mock
# arguments below are listed in reverse decorator order.
@mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'job_completion')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_without_waiting_for_completion(
    self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion, mock_print_job_logs
):
    """With wait_for_completion=False, execute() starts the run and returns
    its id without polling for completion or printing driver logs."""
    glue = GlueJobOperator(
        task_id='test_glue_operator',
        job_name='my_test_job',
        script_location='s3://glue-examples/glue-scripts/sample_aws_glue_job.py',
        aws_conn_id='aws_default',
        region_name='us-west-2',
        s3_bucket='some_bucket',
        iam_role_name='my_test_role',
        wait_for_completion=False,
    )
    mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}

    job_run_id = glue.execute({})

    mock_initialize_job.assert_called_once_with({}, {})
    # Neither completion polling nor log printing should happen when the
    # operator does not wait for the run to finish.
    mock_job_completion.assert_not_called()
    mock_print_job_logs.assert_not_called()
    assert glue.job_name == 'my_test_job'
    assert job_run_id == '11111'
Loading