Add auto termination to emr job flow #22980

Closed
hectormirete wants to merge 3 commits into apache:main from hectormirete:add-auto-termination-to-emr-jobflow

@hectormirete

This PR adds a new operator that sets the auto-termination policy on an EMR cluster.

With this operator, after creating an EMR job flow/cluster you can ensure it will be terminated if something unexpected happens during your DAG execution and Airflow is unable to request the termination of the cluster itself.
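
For context, here is a minimal sketch of the kind of call such an operator would wrap. It assumes the boto3 EMR client method `put_auto_termination_policy`, which the review discussion on this PR refers to. The helper name `set_idle_timeout` and the `RecordingEmrClient` stub are hypothetical and are not taken from the PR's code; the stub only exists so the sketch runs without AWS access.

```python
from typing import Any, Dict, List


def set_idle_timeout(emr_client: Any, cluster_id: str, idle_timeout_seconds: int) -> Dict[str, Any]:
    """Attach an auto-termination policy to an already-created EMR cluster."""
    if idle_timeout_seconds <= 0:
        raise ValueError("idle_timeout_seconds must be positive")
    # Same request shape as the AutoTerminationPolicy example in the review thread.
    return emr_client.put_auto_termination_policy(
        ClusterId=cluster_id,
        AutoTerminationPolicy={"IdleTimeout": idle_timeout_seconds},
    )


class RecordingEmrClient:
    """Hypothetical stand-in for boto3.client("emr"); records calls instead of calling AWS."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def put_auto_termination_policy(self, **kwargs: Any) -> Dict[str, Any]:
        self.calls.append(kwargs)
        return {"ResponseMetadata": {"HTTPStatusCode": 200}}


# "j-EXAMPLE" is a placeholder cluster id, not a real job flow.
client = RecordingEmrClient()
response = set_idle_timeout(client, "j-EXAMPLE", 3600)
```

In a real DAG the client would come from the Airflow EMR hook or `boto3.client("emr")` rather than the stub.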

@boring-cyborg

boring-cyborg bot commented Apr 13, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally. It's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

Comment on lines 409 to 412
class EmrAutoTerminatePolicyOperator(BaseOperator):
    """
    An operator to put auto terminate policy on a given cluster/jobflow
    Note: auto terminate policy is supported with Amazon EMR versions 5.30.0 and 6.1.0 and later.
Contributor

Should we (and can we) generalize it into an EmrChangePolicyOperator, so that it can change any policy setting rather than focusing on a specific one?

For example, this operator uses put_auto_termination_policy. What if someone else wants put_auto_scaling_policy? Will we create a new dedicated operator?

Author

Good point, totally agree.
I made it this specific because I think this is the only policy that cannot be set when creating the EMR cluster with boto3.

Contributor

@ferruzzi @o-nikolas @vincbeck maybe you have some ideas on how we can generalize it?

Contributor

@vincbeck vincbeck Apr 14, 2022

I would do it this way. I would add 3 parameters:

  • policy_name to indicate which policy you want to update. This value determines which API to call through boto3
  • policy_content, which contains the actual content of the policy, e.g.
{
    'AutoTerminationPolicy': {
        'IdleTimeout': 123
    }
}
  • instance_group_id. Optional parameter, needed if policy_name == "auto_scaling"

The code would look like this (pseudo code, please bear with me):

def __init__(
    self,
    policy_name: str,
    policy_content: dict,
    instance_group_id: Optional[str] = None,
    job_flow_id: Optional[str] = None,
    job_flow_name: Optional[str] = None,
    cluster_states: Optional[List[str]] = None,
    aws_conn_id: str = 'aws_default',
    **kwargs
):
    ...


def execute(self, context: 'Context') -> None:
    ...

    if self.policy_name == "auto_termination":
        response = emr.put_auto_termination_policy(
            ClusterId=job_flow_id,
            **self.policy_content,
        )
    elif self.policy_name == "auto_scaling":
        response = emr.put_auto_scaling_policy(
            ClusterId=job_flow_id,
            InstanceGroupId=self.instance_group_id,
            **self.policy_content,
        )
    elif self.policy_name == "managed_scaling":
        response = emr.put_managed_scaling_policy(
            ClusterId=job_flow_id,
            **self.policy_content,
        )
    else:
        raise ...
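
The if/elif chain in the pseudo code above could also be written as a lookup table. What follows is only a sketch under stated assumptions, not the operator from this PR: it assumes the three boto3 EMR client methods named in the pseudo code and, like the pseudo code, that `policy_content` already carries the top-level key each API expects. The names `POLICY_METHODS`, `put_policy` and `FakeEmrClient` are hypothetical, and the fake client is there only so the example runs without AWS credentials.

```python
from typing import Any, Dict, List, Optional, Tuple

# policy_name -> (EMR client method name, whether InstanceGroupId is required)
POLICY_METHODS: Dict[str, Tuple[str, bool]] = {
    "auto_termination": ("put_auto_termination_policy", False),
    "auto_scaling": ("put_auto_scaling_policy", True),
    "managed_scaling": ("put_managed_scaling_policy", False),
}


def put_policy(
    emr_client: Any,
    policy_name: str,
    cluster_id: str,
    policy_content: Dict[str, Any],
    instance_group_id: Optional[str] = None,
) -> Any:
    """Dispatch to the EMR put_*_policy call selected by policy_name."""
    if policy_name not in POLICY_METHODS:
        raise ValueError(f"Unsupported policy_name: {policy_name!r}")
    method_name, needs_instance_group = POLICY_METHODS[policy_name]

    kwargs: Dict[str, Any] = {"ClusterId": cluster_id, **policy_content}
    if needs_instance_group:
        if instance_group_id is None:
            raise ValueError(f"instance_group_id is required for {policy_name!r}")
        kwargs["InstanceGroupId"] = instance_group_id
    return getattr(emr_client, method_name)(**kwargs)


class FakeEmrClient:
    """Hypothetical stand-in for boto3.client("emr"); records calls only."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def __getattr__(self, name: str) -> Any:
        # Only reached for attributes not found normally, i.e. the put_* methods.
        def record(**kwargs: Any) -> Dict[str, Any]:
            self.calls.append((name, kwargs))
            return {}

        return record


# "j-EXAMPLE" is a placeholder cluster id.
client = FakeEmrClient()
put_policy(
    client,
    "auto_termination",
    "j-EXAMPLE",
    {"AutoTerminationPolicy": {"IdleTimeout": 123}},
)
```

With a table, supporting another policy type means adding one entry rather than another branch, and the unsupported-name error lives in a single place.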

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions bot added the stale label (Stale PRs per the .github/workflows/stale.yml policy file) on May 30, 2022
github-actions bot closed this on Jun 5, 2022
Labels

area:providers, kind:documentation, provider:amazon (AWS/Amazon - related issues), stale (Stale PRs per the .github/workflows/stale.yml policy file)
