From f93e3391ff360c0ab3a17beaf561d14303f2dc34 Mon Sep 17 00:00:00 2001 From: Eugene Wolfson Date: Tue, 8 Oct 2019 00:20:51 -0400 Subject: [PATCH] Run batches of (self-terminating) EMR JobFlows [AIRFLOW-XXX] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit What is this: ------------- This commit introduces an implementation of an `EmrRunJobFlows` operator that allows Airflow to manage the execution of multiple EMR clusters. The clusters are all configured through a Jinja template ahead of time, just like `EmrCreateJobFlowOperator` but are submitted one batch of clusters at a time, allowing for workflows that require sequences of parallel clusters. The operator assumes that all the clusters it manages auto-terminate. As demonstrated in the unit test, with this operator the following batches of EMR clusters could be set up: 1. First, cluster1 runs its steps until they all complete and the cluster terminates. 1. Then, cluster2a and cluster2b run their steps in parallel until they both terminate. 1. Finally, cluster3 runs its steps. Just to be clear, a "batch" of clusters is just a set of clusters that can be run in EMR in parallel. Why use this: ------------- 1) The primary appeal of this operator is for submitting one-off tasks to EMR. The most straight-forward way to do this is to set up a DAG consisting of a single `EmrRunJobFlows` operator which derives its `job_flows` value from the DagRun's conf attribute ( [`airflow dags trigger --conf`][1]). While the actual EMR steps are obfuscated, one small benefit from using this operator over modifying the DAG using the existing EMR operators is that, while the resulting work is dynamic at DagRun-time, the structure of the DAG stays constant between wildly different runs, so we can see the execution history even when a typical DAG would have been modified and lost its history in the Web UI. 
A more complex implementation of this use-case in a closed- source project was actually the motivation for this PR - my gratitude to @ChengzhiZhao and @TarunChillara for their assistance there. [1]: https://airflow.readthedocs.io/en/latest/cli-ref.html#trigger 2) It can also simplify the retry logic wherever `EmrCreateJobFlowOperator` and `EmrJobFlowSensor` operator pairs are used. In that scenario, the `retry_handler` of an `EmrJobFlowSensor` task would need to run an `airflow clear` on the preceding `EmrCreateJobFlowOperator` task if it detects that the cluster failed, while somehow maintaining the total count of attempts. Using the `EmrRunJobFlows`, a cluster can be restarted on failure without a custom `retry_handler` - simply by setting `retries > 0`. Please note that, as described in its docstring, the current implementation of `EmrRunJobFlows` limits the retry logic to just this basic use case. This is because we don't expose any information for a `retry_handler` to reason about at this time. This could be improved by pushing step- and cluster- level status information to an XCom, but that is not implemented here at this time because the operator implementation is complex enough as is. If job flows are to be retried, please make sure that all steps are idempotent. 
--- airflow/contrib/sensors/emr_run_job_flows.py | 184 ++++++++++ docs/operators-and-hooks-ref.rst | 1 + .../contrib/sensors/test_emr_run_job_flows.py | 320 ++++++++++++++++++ 3 files changed, 505 insertions(+) create mode 100644 airflow/contrib/sensors/emr_run_job_flows.py create mode 100644 tests/contrib/sensors/test_emr_run_job_flows.py diff --git a/airflow/contrib/sensors/emr_run_job_flows.py b/airflow/contrib/sensors/emr_run_job_flows.py new file mode 100644 index 0000000000000..857bbff1f0747 --- /dev/null +++ b/airflow/contrib/sensors/emr_run_job_flows.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""EmrRunJobFlows manages cluster queue by implementing an EMR sensor.""" + +from airflow import AirflowException +from airflow.contrib.hooks.emr_hook import EmrHook +from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor +from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor +from airflow.utils.decorators import apply_defaults + + +class EmrRunJobFlows(EmrBaseSensor): + """ + Submits batches of self-terminating EMR Job Flows and waits for their steps + to complete. 
This operator submits a list of EMR clusters in batches, where + each Job Flow is expected to be self-terminating and list all the EMR steps + it is expected to execute. Only basic retry logic is supported. + + Implementation Note: For each cluster, we submit all the steps at cluster + creation time. This partially frees the cluster from the vagaries of the + Airflow scheduler. Since we rely on EMR to terminate its clusters, any + failed step will need to terminate the cluster and the cluster itself should + auto-terminate as per [1]. In other words, each JobFlow must auto-terminate + (likely via the `job_flows` parameter) by setting its Instances' + `"KeepJobFlowAliveWhenNoSteps": False`. Additionally, consider setting each + Step's `"ActionOnFailure": "TERMINATE_CLUSTER"` to allow failing-fast if + your workflow allows for it. + + [1]: https://docs.aws.amazon.com/emr/latest/ManagementGuide/\ + UsingEMR_TerminationProtection.html#emr-termination-protection-steps + + TODO: The utility of the EmrBaseSensor that we extend is somewhat limited. + Currently, it asks for the state of its JobFlow until that JobFlow reaches a + terminal state. If the EMR JobFlow fails, the sensor will mark the task as + failed. If custom EMR sensor logic is pursued, we could set up step-wise + monitoring and timeouts, which would allow for context-specific retries + using XComs, and we may be able to extend the implementation to allow for + cross-cluster logic, such as waiting for all clusters in a batch to finish + even when some fail. + + :param job_flows: a queue of EMR JobFlows. It's a list of dicts, each one + mapping job_flow names to their configurations: + [{job_flow_name: job_flow_overrides}]. Each dict in the list represents + the job flows which should run in parallel, and every cluster in the + preceding dict is expected to have come to a successful terminal state, + prior to submitting the next dict. 
See boto3's job_flow_overrides EMR + details in + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/\ + services/emr.html#EMR.Client.run_job_flow (templated) + :type job_flows: list + """ + + template_fields = ['job_flows'] + template_ext = () + # EMR logo... ~RGB(237,165,83) + ui_color = "#eda553" + + # Overrides for EmrBaseSensor + NON_TERMINAL_STATES = EmrJobFlowSensor.NON_TERMINAL_STATES + FAILED_STATE = EmrJobFlowSensor.FAILED_STATE + + @apply_defaults + def __init__( + self, + job_flows, + emr_conn_id='emr_default', + # require_auto_termination = False, + *args, **kwargs): + super().__init__(*args, **kwargs) + self.job_flows = job_flows + self.emr_conn_id = emr_conn_id + # These two fields will be filled in as clusters are requested and poked + self.current_batch = {} + self.statuses = [] + + def execute(self, context): + """ + See EmrBaseSensor.execute + """ + self.log.info( + "The clusters will be submitted across the following batches: %s", + [set(batch.keys()) for batch in self.job_flows]) + # TODO: Verify all clusters set `"KeepJobFlowAliveWhenNoSteps": False` + # if self.require_auto_termination + return super().execute(context) + + def get_emr_response(self): + """ + override for EmrBaseSensor. Queries state of all clusters in current + batch and submits the next batch if they are all in a terminal state; + however, if any cluster is in a failed state, its state is returned so + that the current execution attempt fails. 
+ """ + emr_conn = EmrHook(emr_conn_id=self.emr_conn_id).get_conn() + + responses = [] + for name, job_flow_id in self.current_batch.items(): + self.log.debug("Poking JobFlow {%s: %s}", name, job_flow_id) + response = emr_conn.describe_cluster(ClusterId=job_flow_id) + responses.append(response) + self._states()[name] = (job_flow_id, self._state_of(response)) + self.log.debug("Poked JobFlow states: %s", self._states()) + + failed = next(filter(lambda r: self._state_of(r) in + EmrRunJobFlows.FAILED_STATE, responses), None) + if failed: + self.log.info("there is at least one failed JobFlow") + return failed + non_terminal = next(filter(lambda r: self._state_of(r) in + EmrRunJobFlows.NON_TERMINAL_STATES, + responses), None) + if non_terminal: + self.log.info("there is still at least one non-terminal JobFlow") + return non_terminal + + # We're done with the current batch. + if self.job_flows: + self.log.info("Submitting next batch of clusters") + self._request_next(self.job_flows.pop(0)) + return self.get_emr_response() + # All batches are in a terminal state + else: + self.log.info("Completed poking all JobFlow batches: %s", + self.statuses) + return responses[0] + + def _request_next(self, cluster_set): + self.current_batch = {} + self.statuses.append({}) + errors = {} + emr_hook = EmrHook(emr_conn_id=self.emr_conn_id) + for name, cluster_config in cluster_set.items(): + response = emr_hook.create_job_flow(cluster_config) + if response["ResponseMetadata"]["HTTPStatusCode"] != 200: + errors[name] = str(response) + else: + job_flow_id = response["JobFlowId"] + self.current_batch[name] = job_flow_id + self._states()[name] = (job_flow_id, "") + self.log.info("Requested JobFlow batch: %s", self.current_batch) + + # TODO consider returning {"statuses": statuses, "errors": errors} + if errors: + self.log.error("errors: %s", errors) + raise AirflowException("JobFlow creation failed: " + str(errors)) + + def _states(self): + return self.statuses[-1] if self.statuses else {} + 
+ @staticmethod + def _state_of(response): + return response.get("Cluster", {}).get("Status", {}).get("State", "") + + @staticmethod + def state_from_response(response): + """ + override for EmrBaseSensor. Not using _state_of(), since + state_from_response expects an exception raised if the cluster State is + not present. + """ + return EmrJobFlowSensor.state_from_response(response) + + @staticmethod + def failure_message_from_response(response): + """ + See EmrJobFlowSensor.failure_message_from_response + """ + return EmrJobFlowSensor.failure_message_from_response(response) diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index 49a46ed38e284..df51f9e7feebb 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -336,6 +336,7 @@ These integrations allow you to perform various operations within the Amazon Web :mod:`airflow.contrib.operators.emr_terminate_job_flow_operator` - :mod:`airflow.contrib.sensors.emr_base_sensor`, :mod:`airflow.contrib.sensors.emr_job_flow_sensor`, + :mod:`airflow.contrib.sensors.emr_run_job_flows`, :mod:`airflow.contrib.sensors.emr_step_sensor` * - `AWS Glue Catalog `__ diff --git a/tests/contrib/sensors/test_emr_run_job_flows.py b/tests/contrib/sensors/test_emr_run_job_flows.py new file mode 100644 index 0000000000000..e5d39379095a3 --- /dev/null +++ b/tests/contrib/sensors/test_emr_run_job_flows.py @@ -0,0 +1,320 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datetime +import unittest +from unittest.mock import MagicMock, patch + +from dateutil.tz import tzlocal + +from airflow import DAG, AirflowException +from airflow.contrib.sensors.emr_run_job_flows import EmrRunJobFlows +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2017, 1, 1) + + +class TestEmrRunJobFlows(unittest.TestCase): + def setUp(self): + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + + # Mock out the emr_client (moto has incorrect response) + self.emr_client = MagicMock() + self.boto3_session = None # This is set in _verify_job_flow_execution + self.emr_run_job_flows = EmrRunJobFlows( + task_id='test_task', + poke_interval=0, + job_flows=self._stubbed_job_flows([ + ["cluster1"], # first batch is just this cluster + ["cluster2a", "cluster2b"], # then these two run in parallel + ["cluster3"]]), # and finally, this third batch + dag=DAG('test_dag_id', default_args=args) + ) + self.states = {} + self.clusters = [] + + def _stubbed_job_flows(self, names_queue): + job_flows = [] + for names_batch in names_queue: + job_flows_batch = {} + for name in names_batch: + job_flows_batch[name] = self._cluster_config(name) + job_flows.append(job_flows_batch) + return job_flows + + def _cluster_config(self, name): + return { + 'Name': name, + 'ReleaseLabel': '5.11.0', + 'Instances': { + 'KeepJobFlowAliveWhenNoSteps': False + }, + 'Steps': [{ + 'Name': 'test_step', + 'ActionOnFailure': 'TERMINATE_CLUSTER', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + '/usr/lib/spark/bin/run-example', + '{{ 
macros.ds_add(ds, -1) }}', + '{{ ds }}' + ] + } + }] + } + + def test_execute_calls_until_all_clusters_reach_a_terminal_state(self): + self.clusters = ["cluster1", "cluster2a", "cluster2b", "cluster3"] + self.states["j-cluster1"] = [] + self.states["j-cluster1"].append("STARTING") + self.states["j-cluster1"].append("BOOTSTRAPPING") + self.states["j-cluster1"].append("RUNNING") + self.states["j-cluster1"].append("RUNNING") + self.states["j-cluster1"].append("TERMINATING") + self.states["j-cluster1"].append("TERMINATED") # (End Of Batch) + self.states["j-cluster2a"] = [] + self.states["j-cluster2b"] = [] + self.states["j-cluster2a"].append("STARTING") # a + self.states["j-cluster2b"].append("STARTING") # b + self.states["j-cluster2a"].append("BOOTSTRAPPING") # a + self.states["j-cluster2b"].append("BOOTSTRAPPING") # b + self.states["j-cluster2a"].append("RUNNING") # a + self.states["j-cluster2b"].append("RUNNING") # b + self.states["j-cluster2a"].append("RUNNING") # a + self.states["j-cluster2b"].append("RUNNING") # b + self.states["j-cluster2a"].append("RUNNING") # a + self.states["j-cluster2b"].append("TERMINATING") # b + self.states["j-cluster2a"].append("RUNNING") # a + self.states["j-cluster2b"].append("TERMINATED") # b: terminal + self.states["j-cluster2a"].append("TERMINATING") # a + self.states["j-cluster2b"].append("TERMINATED") # b: terminal + self.states["j-cluster2a"].append("TERMINATED") # a (End Of Batch) + self.states["j-cluster2b"].append("TERMINATED") # b (End Of Batch) + self.states["j-cluster3"] = [] + self.states["j-cluster3"].append("STARTING") + self.states["j-cluster3"].append("BOOTSTRAPPING") + self.states["j-cluster3"].append("RUNNING") + self.states["j-cluster3"].append("RUNNING") + self.states["j-cluster3"].append("TERMINATING") + self.states["j-cluster3"].append("TERMINATED") # (all done) + + self.emr_client.describe_cluster.side_effect = self._describe + self.emr_client.run_job_flow.side_effect = self._create + + 
self._verify_job_flow_execution() + + def test_execute_stops_when_cluster_in_batch_fails(self): + self.clusters = ["cluster1"] + # First, cluster1 is queried until it terminates + self.states["j-cluster1"] = [] + self.states["j-cluster1"].append("STARTING") + self.states["j-cluster1"].append("BOOTSTRAPPING") + self.states["j-cluster1"].append("RUNNING") + self.states["j-cluster1"].append("RUNNING") + self.states["j-cluster1"].append("TERMINATING") + self.states["j-cluster1"].append("TERMINATED") + # Then, both cluster2a and cluster2b are queried + self.states["j-cluster2a"] = [] + self.states["j-cluster2b"] = [] + self.states["j-cluster2a"].append("STARTING") # a + self.states["j-cluster2b"].append("STARTING") # b + self.states["j-cluster2a"].append("BOOTSTRAPPING") # a + self.states["j-cluster2b"].append("BOOTSTRAPPING") # b + self.states["j-cluster2a"].append("RUNNING") # a + self.states["j-cluster2b"].append("RUNNING") # b + self.states["j-cluster2a"].append("RUNNING") # a + self.states["j-cluster2b"].append("RUNNING") # b + self.states["j-cluster2a"].append("TERMINATING") # a + self.states["j-cluster2b"].append("TERMINATED_WITH_ERRORS") # b + # We expect that no more calls are to be made, even though cluster3 + # hasn't even started and cluster2a isn't yet terminated. + + self.emr_client.describe_cluster.side_effect = self._describe + self.emr_client.run_job_flow.side_effect = self._create + + self._verify_job_flow_execution(failure=True) + + def test_execute_stops_on_cluster_creation_failure(self): + self.clusters = ["cluster1"] + # Note that self.states is empty since there's nothing to poke. 
+ self.emr_client.run_job_flow.side_effect = self._fail_to_create + + self._verify_job_flow_execution(failure=True) + + def _verify_job_flow_execution(self, failure=False): + # Mock out the emr_client creator + emr_session_mock = MagicMock() + emr_session_mock.client.return_value = self.emr_client + self.boto3_session = MagicMock(return_value=emr_session_mock) + with patch('boto3.session.Session', self.boto3_session): + try: + if failure: + with self.assertRaises(AirflowException): + self._execute_and_verify_expectations() + else: + self._execute_and_verify_expectations() + except Exception as e: + raise e + + def _execute_and_verify_expectations(self): + created = len(self.clusters) + poked = sum([len(cs) for cs in self.states.values()]) + self.emr_run_job_flows.execute(None) + self.assertEqual(self.emr_client.run_job_flow.call_count, created) + self.assertEqual(self.emr_client.describe_cluster.call_count, poked) + + # Convenience methods for describing clusters + def _running_cluster(self, name, state="RUNNING"): + return { + 'Cluster': { + 'Applications': [ + {'Name': 'Spark', 'Version': '1.6.1'} + ], + 'AutoTerminate': True, + 'Configurations': [], + 'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'}, + 'Id': name, + 'LogUri': 's3n://some-location/', + 'Name': 'PiCalc', + 'NormalizedInstanceHours': 0, + 'ReleaseLabel': 'emr-4.6.0', + 'ServiceRole': 'EMR_DefaultRole', + 'Status': { + 'State': state, + 'StateChangeReason': {}, + 'Timeline': { + 'CreationDateTime': datetime.datetime( + 2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())} + }, + 'Tags': [ + {'Key': 'app', 'Value': 'analytics'}, + {'Key': 'environment', 'Value': 'development'} + ], + 'TerminationProtected': False, + 'VisibleToAllUsers': True + }, + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e' + } + } + + def _terminated_cluster(self, name): + return { + 'Cluster': { + 'Applications': [ + {'Name': 'Spark', 'Version': '1.6.1'} + 
], + 'AutoTerminate': True, + 'Configurations': [], + 'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'}, + 'Id': name, + 'LogUri': 's3n://some-location/', + 'Name': 'PiCalc', + 'NormalizedInstanceHours': 0, + 'ReleaseLabel': 'emr-4.6.0', + 'ServiceRole': 'EMR_DefaultRole', + 'Status': { + 'State': 'TERMINATED', + 'StateChangeReason': {}, + 'Timeline': { + 'CreationDateTime': datetime.datetime( + 2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())} + }, + 'Tags': [ + {'Key': 'app', 'Value': 'analytics'}, + {'Key': 'environment', 'Value': 'development'} + ], + 'TerminationProtected': False, + 'VisibleToAllUsers': True + }, + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e' + } + } + + def _failed_cluster(self, name): + return { + 'Cluster': { + 'Applications': [ + {'Name': 'Spark', 'Version': '1.6.1'} + ], + 'AutoTerminate': True, + 'Configurations': [], + 'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'}, + 'Id': name, + 'LogUri': 's3n://some-location/', + 'Name': 'PiCalc', + 'NormalizedInstanceHours': 0, + 'ReleaseLabel': 'emr-4.6.0', + 'ServiceRole': 'EMR_DefaultRole', + 'Status': { + 'State': 'TERMINATED_WITH_ERRORS', + 'StateChangeReason': { + 'Code': 'BOOTSTRAP_FAILURE', + 'Message': 'Master instance (i-0663047709b12345c) failed attempting to ' + 'download bootstrap action 1 file from S3' + }, + 'Timeline': { + 'CreationDateTime': datetime.datetime( + 2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())} + }, + 'Tags': [ + {'Key': 'app', 'Value': 'analytics'}, + {'Key': 'environment', 'Value': 'development'} + ], + 'TerminationProtected': False, + 'VisibleToAllUsers': True + }, + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e' + } + } + + def _describe(self, *args, **kwargs): + name = kwargs['ClusterId'] + state = self.states[name].pop(0) + return { + 'TERMINATED': self._terminated_cluster(name), + 'TERMINATED_WITH_ERRORS': 
self._failed_cluster(name), + }.get(state, self._running_cluster(name, state)) + + def _fail_to_create(self, *args, **kwargs): + return { + 'ResponseMetadata': { + 'HTTPStatusCode': 400 + } + } + + def _create(self, *args, **kwargs): + return { + 'ResponseMetadata': { + 'HTTPStatusCode': 200 + }, + 'JobFlowId': 'j-' + kwargs['Name'] + } + + +if __name__ == '__main__': + unittest.main()