From c454623032f121817e7aae868dfac9d36684b562 Mon Sep 17 00:00:00 2001
From: ferruzzi
Date: Mon, 11 Jul 2022 20:42:56 -0700
Subject: [PATCH] Glue Job Driver logging

Adds an optional `verbose` boolean to Glue job operators and sensors,
defaulting to False. If set to True, Glue job logs are passed through
to the Airflow task logs.
---
 airflow/providers/amazon/aws/hooks/glue.py    | 97 ++++++++++++++++---
 .../providers/amazon/aws/operators/glue.py    |  5 +-
 airflow/providers/amazon/aws/sensors/glue.py  | 50 +++++++---
 .../amazon/aws/operators/test_glue.py         | 54 ++++++++++-
 .../providers/amazon/aws/sensors/test_glue.py | 97 +++++++++++++++++--
 5 files changed, 265 insertions(+), 38 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index 2fc342675db44..9201b9f70b99a 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -20,9 +20,18 @@
 import warnings
 from typing import Dict, List, Optional
 
+import boto3
+
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
+DEFAULT_LOG_SUFFIX = 'output'
+FAILURE_LOG_SUFFIX = 'error'
+# A filter value of ' ' translates to "match all".
+# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
+DEFAULT_LOG_FILTER = ' '
+FAILURE_LOG_FILTER = '?ERROR ?Exception'
+
 
 class GlueJobHook(AwsBaseHook):
     """
@@ -136,32 +145,92 @@ def get_job_state(self, job_name: str, run_id: str) -> str:
         job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
         return job_run['JobRun']['JobRunState']
 
-    def job_completion(self, job_name: str, run_id: str) -> Dict[str, str]:
+    def print_job_logs(
+        self,
+        job_name: str,
+        run_id: str,
+        job_failed: bool = False,
+        next_token: Optional[str] = None,
+    ) -> Optional[str]:
+        """Prints the batch of logs to the Airflow task log and returns nextToken."""
+        log_client = boto3.client('logs')  # note: uses the default boto3 session, not the hook's aws_conn_id
+        response = {}
+
+        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
+        log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
+        log_group_name = f'{log_group_prefix}/{log_group_suffix}'
+
+        try:
+            if next_token:
+                response = log_client.filter_log_events(
+                    logGroupName=log_group_name,
+                    logStreamNames=[run_id],
+                    filterPattern=filter_pattern,
+                    nextToken=next_token,
+                )
+            else:
+                response = log_client.filter_log_events(
+                    logGroupName=log_group_name,
+                    logStreamNames=[run_id],
+                    filterPattern=filter_pattern,
+                )
+            if len(response['events']):
+                messages = '\t'.join([event['message'] for event in response['events']])
+                self.log.info('Glue Job Run Logs:\n\t%s', messages)
+
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning(
+                'No new Glue driver logs found. This might be because there are no new logs, '
+                'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
+                f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
+            )
+
+        # If there are no new log events, the response will not include a nextToken;
+        # in that case, keep the previous token and check it again on the next pass.
+        return response.get('nextToken') or next_token
+
+    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or fails and return final state if finished.
Raises AirflowException when the job failed
 
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
+        :param verbose: If True, Glue Job Run logs are shown in the Airflow task logs. (default: False)
         :return: Dict of JobRunState and JobRunId
         """
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        next_log_token = None
+        job_failed = False
 
         while True:
-            job_run_state = self.get_job_state(job_name, run_id)
-            if job_run_state in finished_states:
-                self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
-                return {'JobRunState': job_run_state, 'JobRunId': run_id}
-            if job_run_state in failed_states:
-                job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
-                self.log.info(job_error_message)
-                raise AirflowException(job_error_message)
-            else:
-                self.log.info(
-                    "Polling for AWS Glue Job %s current run state with status %s", job_name, job_run_state
-                )
-                time.sleep(self.JOB_POLL_INTERVAL)
+            try:
+                job_run_state = self.get_job_state(job_name, run_id)
+                if job_run_state in finished_states:
+                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
+                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
+                if job_run_state in failed_states:
+                    job_failed = True
+                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
+                    self.log.info(job_error_message)
+                    raise AirflowException(job_error_message)
+                else:
+                    self.log.info(
+                        'Polling for AWS Glue Job %s current run state with status %s',
+                        job_name,
+                        job_run_state,
+                    )
+                    time.sleep(self.JOB_POLL_INTERVAL)
+            finally:
+                if verbose:
+                    next_log_token = self.print_job_logs(
+                        job_name=job_name,
+                        run_id=run_id,
+                        job_failed=job_failed,
+                        next_token=next_log_token,
+                    )
 
     def get_or_create_glue_job(self) -> str:
         """
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index f5cc8c104b860..5ce55681aeb55 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -51,6 +51,7 @@ class GlueJobOperator(BaseOperator):
     :param create_job_kwargs: Extra arguments for Glue Job Creation
     :param run_job_kwargs: Extra arguments for Glue Job Run
     :param wait_for_completion: Whether or not wait for job run completion. (default: True)
+    :param verbose: If True, Glue Job Run logs are shown in the Airflow task logs. (default: False)
     """
 
     template_fields: Sequence[str] = ('script_args',)
@@ -78,6 +79,7 @@ def __init__(
         create_job_kwargs: Optional[dict] = None,
         run_job_kwargs: Optional[dict] = None,
         wait_for_completion: bool = True,
+        verbose: bool = False,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -97,6 +99,7 @@ def __init__(
         self.create_job_kwargs = create_job_kwargs
         self.run_job_kwargs = run_job_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.verbose = verbose
 
     def execute(self, context: 'Context'):
         """
@@ -135,7 +138,7 @@ def execute(self, context: 'Context'):
         )
         glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
         if self.wait_for_completion:
-            glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
+            glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'], self.verbose)
         self.log.info(
             "AWS Glue Job: %s status: %s. 
Run Id: %s", self.job_name, diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py index 525e7b8ee6234..a7989f64ecb52 100644 --- a/airflow/providers/amazon/aws/sensors/glue.py +++ b/airflow/providers/amazon/aws/sensors/glue.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. import warnings -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, List, Optional, Sequence from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.glue import GlueJobHook @@ -37,30 +37,54 @@ class GlueJobSensor(BaseSensorOperator): :param job_name: The AWS Glue Job unique name :param run_id: The AWS Glue current running job identifier + :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False) """ template_fields: Sequence[str] = ('job_name', 'run_id') - def __init__(self, *, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs): + def __init__( + self, + *, + job_name: str, + run_id: str, + verbose: bool = False, + aws_conn_id: str = 'aws_default', + **kwargs, + ): super().__init__(**kwargs) self.job_name = job_name self.run_id = run_id + self.verbose = verbose self.aws_conn_id = aws_conn_id - self.success_states = ['SUCCEEDED'] - self.errored_states = ['FAILED', 'STOPPED', 'TIMEOUT'] + self.success_states: List[str] = ['SUCCEEDED'] + self.errored_states: List[str] = ['FAILED', 'STOPPED', 'TIMEOUT'] + self.next_log_token: Optional[str] = None def poke(self, context: 'Context'): hook = GlueJobHook(aws_conn_id=self.aws_conn_id) - self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id) + self.log.info('Poking for job run status :for Glue Job %s and ID %s', self.job_name, self.run_id) job_state = hook.get_job_state(job_name=self.job_name, run_id=self.run_id) - if job_state in self.success_states: - self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state) - return True - elif job_state in self.errored_states: - job_error_message = f"Exiting Job {self.run_id} Run State: {job_state}" - raise AirflowException(job_error_message) - else: - return False + job_failed = False + + try: + if job_state in self.success_states: + self.log.info('Exiting Job %s Run State: %s', self.run_id, job_state) + return True + elif job_state in self.errored_states: + job_failed = True + job_error_message = 'Exiting Job %s Run State: %s', self.run_id, job_state + self.log.info(job_error_message) + raise AirflowException(job_error_message) + else: + return False + finally: + if self.verbose: + self.next_log_token = hook.print_job_logs( + job_name=self.job_name, + run_id=self.run_id, + job_failed=job_failed, + next_token=self.next_log_token, + ) class AwsGlueJobSensor(GlueJobSensor): diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py index be60057392a99..0dc99a796e2a8 100644 --- a/tests/providers/amazon/aws/operators/test_glue.py +++ b/tests/providers/amazon/aws/operators/test_glue.py @@ -20,7 +20,7 @@ from parameterized import parameterized -from airflow import configuration +from airflow.configuration import conf from airflow.providers.amazon.aws.hooks.glue import GlueJobHook from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.operators.glue import GlueJobOperator @@ -29,7 +29,7 @@ class TestGlueJobOperator(unittest.TestCase): 
@mock.patch('airflow.providers.amazon.aws.hooks.glue.GlueJobHook') def setUp(self, glue_hook_mock): - configuration.load_test_config() + conf.load_test_config() self.glue_hook_mock = glue_hook_mock @@ -39,12 +39,19 @@ def setUp(self, glue_hook_mock): "/glue-examples/glue-scripts/sample_aws_glue_job.py", ] ) + @mock.patch.object(GlueJobHook, 'print_job_logs') @mock.patch.object(GlueJobHook, 'get_job_state') @mock.patch.object(GlueJobHook, 'initialize_job') @mock.patch.object(GlueJobHook, "get_conn") @mock.patch.object(S3Hook, "load_file") def test_execute_without_failure( - self, script_location, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state + self, + script_location, + mock_load_file, + mock_get_conn, + mock_initialize_job, + mock_get_job_state, + mock_print_job_logs, ): glue = GlueJobOperator( task_id='test_glue_operator', @@ -57,16 +64,52 @@ def test_execute_without_failure( ) mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'} mock_get_job_state.return_value = 'SUCCEEDED' + glue.execute({}) + mock_initialize_job.assert_called_once_with({}, {}) + mock_print_job_logs.assert_not_called() assert glue.job_name == 'my_test_job' + @mock.patch.object(GlueJobHook, 'print_job_logs') + @mock.patch.object(GlueJobHook, 'get_job_state') + @mock.patch.object(GlueJobHook, 'initialize_job') + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(S3Hook, "load_file") + def test_execute_with_verbose_logging( + self, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state, mock_print_job_logs + ): + job_name = 'test_job_name' + job_run_id = '11111' + glue = GlueJobOperator( + task_id='test_glue_operator', + job_name=job_name, + script_location='s3_uri', + s3_bucket='bucket_name', + iam_role_name='role_arn', + verbose=True, + ) + mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': job_run_id} + mock_get_job_state.return_value = 'SUCCEEDED' + + glue.execute({}) + + mock_initialize_job.assert_called_once_with({}, {}) + mock_print_job_logs.assert_called_once_with( + job_name=job_name, + run_id=job_run_id, + job_failed=False, + next_token=None, + ) + assert glue.job_name == job_name + + @mock.patch.object(GlueJobHook, 'print_job_logs') @mock.patch.object(GlueJobHook, 'job_completion') @mock.patch.object(GlueJobHook, 'initialize_job') @mock.patch.object(GlueJobHook, "get_conn") @mock.patch.object(S3Hook, "load_file") def test_execute_without_waiting_for_completion( - self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion + self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion, mock_print_job_logs ): glue = GlueJobOperator( task_id='test_glue_operator', @@ -79,8 +122,11 @@ def test_execute_without_waiting_for_completion( wait_for_completion=False, ) mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'} + job_run_id = glue.execute({}) + mock_initialize_job.assert_called_once_with({}, {}) mock_job_completion.assert_not_called() + mock_print_job_logs.assert_not_called() assert glue.job_name == 'my_test_job' assert job_run_id == '11111' diff --git a/tests/providers/amazon/aws/sensors/test_glue.py b/tests/providers/amazon/aws/sensors/test_glue.py index 672c612353d80..8fb96ac946e65 100644 --- a/tests/providers/amazon/aws/sensors/test_glue.py +++ b/tests/providers/amazon/aws/sensors/test_glue.py @@ -17,19 +17,24 @@ import unittest from unittest import mock +from unittest.mock import ANY -from airflow import configuration +import pytest + +from 
airflow import AirflowException +from airflow.configuration import conf from airflow.providers.amazon.aws.hooks.glue import GlueJobHook from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor class TestGlueJobSensor(unittest.TestCase): def setUp(self): - configuration.load_test_config() + conf.load_test_config() + @mock.patch.object(GlueJobHook, 'print_job_logs') @mock.patch.object(GlueJobHook, 'get_conn') @mock.patch.object(GlueJobHook, 'get_job_state') - def test_poke(self, mock_get_job_state, mock_conn): + def test_poke(self, mock_get_job_state, mock_conn, mock_print_job_logs): mock_conn.return_value.get_job_run() mock_get_job_state.return_value = 'SUCCEEDED' op = GlueJobSensor( @@ -38,13 +43,40 @@ def test_poke(self, mock_get_job_state, mock_conn): run_id='5152fgsfsjhsh61661', poke_interval=1, timeout=5, - aws_conn_id='aws_default', ) + + assert op.poke({}) + mock_print_job_logs.assert_not_called() + + @mock.patch.object(GlueJobHook, 'print_job_logs') + @mock.patch.object(GlueJobHook, 'get_conn') + @mock.patch.object(GlueJobHook, 'get_job_state') + def test_poke_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs): + mock_conn.return_value.get_job_run() + mock_get_job_state.return_value = 'SUCCEEDED' + job_name = 'job_name' + job_run_id = 'job_run_id' + op = GlueJobSensor( + task_id='test_glue_job_sensor', + job_name=job_name, + run_id=job_run_id, + poke_interval=1, + timeout=5, + verbose=True, + ) + assert op.poke({}) + mock_print_job_logs.assert_called_once_with( + job_name=job_name, + run_id=job_run_id, + job_failed=False, + next_token=ANY, + ) + @mock.patch.object(GlueJobHook, 'print_job_logs') @mock.patch.object(GlueJobHook, 'get_conn') @mock.patch.object(GlueJobHook, 'get_job_state') - def test_poke_false(self, mock_get_job_state, mock_conn): + def test_poke_false(self, mock_get_job_state, mock_conn, mock_print_job_logs): mock_conn.return_value.get_job_run() mock_get_job_state.return_value = 'RUNNING' op = GlueJobSensor( @@ -53,9 +85,62 @@ def test_poke_false(self, mock_get_job_state, mock_conn): run_id='5152fgsfsjhsh61661', poke_interval=1, timeout=5, - aws_conn_id='aws_default', ) + assert not op.poke({}) + mock_print_job_logs.assert_not_called() + + @mock.patch.object(GlueJobHook, 'print_job_logs') + @mock.patch.object(GlueJobHook, 'get_conn') + @mock.patch.object(GlueJobHook, 'get_job_state') + def test_poke_false_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs): + mock_conn.return_value.get_job_run() + mock_get_job_state.return_value = 'RUNNING' + job_name = 'job_name' + job_run_id = 'job_run_id' + op = GlueJobSensor( + task_id='test_glue_job_sensor', + job_name=job_name, + run_id=job_run_id, + poke_interval=1, + timeout=5, + verbose=True, + ) + + assert not op.poke({}) + mock_print_job_logs.assert_called_once_with( + job_name=job_name, + run_id=job_run_id, + job_failed=False, + next_token=ANY, + ) + + @mock.patch.object(GlueJobHook, 'print_job_logs') + @mock.patch.object(GlueJobHook, 'get_conn') + @mock.patch.object(GlueJobHook, 'get_job_state') + def test_poke_failed_job_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs): + mock_conn.return_value.get_job_run() + mock_get_job_state.return_value = 'FAILED' + job_name = 'job_name' + job_run_id = 'job_run_id' + op = GlueJobSensor( + task_id='test_glue_job_sensor', + job_name=job_name, + run_id=job_run_id, + poke_interval=1, + timeout=5, + verbose=True, + ) + + with pytest.raises(AirflowException): + assert not op.poke({}) + 
mock_print_job_logs.assert_called_once_with(
+            job_name=job_name,
+            run_id=job_run_id,
+            job_failed=True,
+            next_token=ANY,
+        )
 
 
 if __name__ == '__main__':
     unittest.main()
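
Usage sketch (reviewer aid, not part of the patch): a minimal DAG wiring the new `verbose` flag through both the operator and the sensor. The DAG id, S3 paths, IAM role, and job names below are placeholders; the run-id hand-off relies on `execute()` returning the `JobRunId` when `wait_for_completion=False`, as exercised in the operator tests above.

```python
# Hypothetical example DAG; all names and paths below are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

with DAG(
    dag_id='example_glue_verbose_logging',
    start_date=datetime(2022, 7, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Submit the job and return immediately; with wait_for_completion=False,
    # execute() returns the JobRunId, which lands in XCom.
    submit_job = GlueJobOperator(
        task_id='submit_glue_job',
        job_name='my_glue_job',
        script_location='s3://my-bucket/glue-scripts/sample.py',
        s3_bucket='my-bucket',
        iam_role_name='my-glue-role',
        wait_for_completion=False,
        verbose=True,  # stream Glue driver logs into this task's log
    )

    # Poll the run to completion, forwarding new log batches on each poke.
    GlueJobSensor(
        task_id='wait_for_glue_job',
        job_name='my_glue_job',
        run_id=submit_job.output,  # JobRunId pulled from XCom at runtime
        verbose=True,
    )
```

Passing `submit_job.output` both templates the `run_id` and creates the task dependency, so no explicit `>>` is needed.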
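For reviewers unfamiliar with the CloudWatch layout the hook relies on: Glue writes driver output to `<LogGroupName>/output` and errors to `<LogGroupName>/error`, with the `JobRunId` as the stream name, and the `'?ERROR ?Exception'` pattern ORs the two terms. A standalone boto3 sketch of the failure-path query (job name, run id, and the example group name are placeholders):

```python
# Hypothetical snippet replicating print_job_logs' failure-path query.
import boto3

glue = boto3.client('glue')
logs = boto3.client('logs')

job_run = glue.get_job_run(JobName='my_glue_job', RunId='jr_0123456789abcdef')
log_group = job_run['JobRun']['LogGroupName']  # e.g. '/aws-glue/jobs'

response = logs.filter_log_events(
    logGroupName=f'{log_group}/error',       # FAILURE_LOG_SUFFIX
    logStreamNames=['jr_0123456789abcdef'],  # stream name == JobRunId
    filterPattern='?ERROR ?Exception',       # matches ERROR or Exception
)
for event in response['events']:
    print(event['message'])
```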