Skip to content
Permalink
Browse files
Add RedshiftDeleteClusterOperator support (#23563)
Add support for `RedshiftDeleteClusterOperator`. This helps clean up resources using Airflow operators when needed. By default, the operator waits until the cluster is completely removed before returning; to return immediately without waiting, set the `wait_for_completion` param to `False`.

- Add operator class
- Add basic unit test
- Add an example task
- Add relevant documentation
  • Loading branch information
pankajastro committed May 11, 2022
1 parent 6169e0a commit 3ed895bb7b0dee45955b29e492ae408b867b6af8
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 0 deletions.
@@ -23,6 +23,7 @@
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftCreateClusterOperator,
RedshiftDeleteClusterOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
@@ -80,10 +81,18 @@
)
# [END howto_operator_redshift_resume_cluster]

# [START howto_operator_redshift_delete_cluster]
task_delete_cluster = RedshiftDeleteClusterOperator(
task_id="delete_cluster",
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
)
# [END howto_operator_redshift_delete_cluster]

# Chain the example tasks in the listed order; the delete task is placed last
# so the cluster created at the start of the example is removed at the end.
chain(
task_create_cluster,
task_wait_cluster_available,
task_pause_cluster,
task_wait_cluster_paused,
task_resume_cluster,
task_delete_cluster,
)
@@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from airflow.models import BaseOperator
@@ -317,3 +318,66 @@ def execute(self, context: 'Context'):
self.log.warning(
"Unable to pause cluster since cluster is currently in status: %s", cluster_state
)


class RedshiftDeleteClusterOperator(BaseOperator):
    """
    Delete an AWS Redshift cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftDeleteClusterOperator`

    :param cluster_identifier: unique identifier of a cluster
    :param skip_final_cluster_snapshot: whether to skip creating a final snapshot
        of the cluster before it is deleted. The default value is ``True``
    :param final_cluster_snapshot_identifier: name of the final cluster snapshot;
        must be provided when ``skip_final_cluster_snapshot`` is ``False``
    :param wait_for_completion: whether to wait for the cluster deletion to finish.
        The default value is ``True``. The wait has no built-in upper bound: the
        operator keeps polling until the status becomes ``cluster_not_found``
    :param aws_conn_id: aws connection to use
    :param poll_interval: time (in seconds) to wait between two consecutive calls
        to check the cluster state
    :raises ValueError: from ``execute`` when a final snapshot is requested but no
        ``final_cluster_snapshot_identifier`` was given
    """

    template_fields: Sequence[str] = ("cluster_identifier",)
    ui_color = "#eeaa11"
    ui_fgcolor = "#ffffff"

    def __init__(
        self,
        *,
        cluster_identifier: str,
        skip_final_cluster_snapshot: bool = True,
        final_cluster_snapshot_identifier: Optional[str] = None,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        poll_interval: float = 30.0,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.cluster_identifier = cluster_identifier
        self.skip_final_cluster_snapshot = skip_final_cluster_snapshot
        self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
        self.wait_for_completion = wait_for_completion
        self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
        self.poll_interval = poll_interval

    def execute(self, context: 'Context'):
        """Request the cluster deletion and, if configured, block until it is gone."""
        # Fail fast with a clear message instead of sending a request that the
        # Redshift DeleteCluster API rejects: a final snapshot needs a name.
        if not self.skip_final_cluster_snapshot and not self.final_cluster_snapshot_identifier:
            raise ValueError(
                "final_cluster_snapshot_identifier must be provided when "
                "skip_final_cluster_snapshot is False"
            )

        self.delete_cluster()

        if not self.wait_for_completion:
            return

        # Poll until the hook reports that the cluster no longer exists.
        cluster_status: str = self.check_status()
        while cluster_status != "cluster_not_found":
            self.log.info(
                "cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval
            )
            time.sleep(self.poll_interval)
            cluster_status = self.check_status()

    def delete_cluster(self) -> None:
        """Ask the Redshift hook to delete the cluster using the configured snapshot options."""
        self.redshift_hook.delete_cluster(
            cluster_identifier=self.cluster_identifier,
            skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
            final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
        )

    def check_status(self) -> str:
        """Return the current cluster status as reported by the Redshift hook."""
        return self.redshift_hook.cluster_status(self.cluster_identifier)
@@ -89,6 +89,20 @@ To pause an 'available' Amazon Redshift Cluster you can use
:start-after: [START howto_operator_redshift_pause_cluster]
:end-before: [END howto_operator_redshift_pause_cluster]

.. _howto/operator:RedshiftDeleteClusterOperator:

Delete an Amazon Redshift Cluster
"""""""""""""""""""""""""""""""""

To delete an Amazon Redshift Cluster you can use
:class:`RedshiftDeleteClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftDeleteClusterOperator>`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_redshift_delete_cluster]
    :end-before: [END howto_operator_redshift_delete_cluster]

Reference
^^^^^^^^^

@@ -19,6 +19,7 @@

from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftCreateClusterOperator,
RedshiftDeleteClusterOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
@@ -127,3 +128,36 @@ def test_pause_cluster_not_called_when_cluster_is_not_available(self, mock_get_c
)
redshift_operator.execute(None)
mock_get_conn.return_value.pause_cluster.assert_not_called()


class TestDeleteClusterOperator:
    """Unit tests for ``RedshiftDeleteClusterOperator`` with the Redshift hook mocked out."""

    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
    def test_delete_cluster_with_wait_for_completion(self, mock_get_conn, mock_cluster_status):
        """With the default ``wait_for_completion``, delete is issued and status is polled."""
        # Report the cluster as already gone so the polling loop exits without sleeping.
        mock_cluster_status.return_value = 'cluster_not_found'
        redshift_operator = RedshiftDeleteClusterOperator(
            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
        )
        redshift_operator.execute(None)
        mock_get_conn.return_value.delete_cluster.assert_called_once_with(
            ClusterIdentifier='test_cluster',
            SkipFinalClusterSnapshot=True,
            FinalClusterSnapshotIdentifier='',
        )
        mock_cluster_status.assert_called_once_with('test_cluster')

    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
    def test_delete_cluster_without_wait_for_completion(self, mock_get_conn, mock_cluster_status):
        """With ``wait_for_completion=False``, delete is issued and status is never polled."""
        redshift_operator = RedshiftDeleteClusterOperator(
            task_id="task_test",
            cluster_identifier="test_cluster",
            aws_conn_id="aws_conn_test",
            wait_for_completion=False,
        )
        redshift_operator.execute(None)
        mock_get_conn.return_value.delete_cluster.assert_called_once_with(
            ClusterIdentifier='test_cluster',
            SkipFinalClusterSnapshot=True,
            FinalClusterSnapshotIdentifier='',
        )

        # ``cluster_status`` is a method of the hook, not of the boto client returned
        # by ``get_conn``, so the hook method itself must be the mock asserted on.
        mock_cluster_status.assert_not_called()

0 comments on commit 3ed895b

Please sign in to comment.