From 580dd93e3bcdba81dd9f7e98e708ce46e7b5aa18 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Wed, 17 Nov 2021 17:19:43 -0600 Subject: [PATCH 01/26] This commit adds new features to the airflow aws redshift module. Similar to the aws ec2 hook, this commit implements a wait_for_state function in the redshift hook module. Additionally we are implementing two new Operators: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator. These operators will let developers leverage Airflow to resume and pause Redshift clusters before/after loading data to optimize for costs. --- .../providers/amazon/aws/hooks/redshift.py | 20 ++++++ .../aws/operators/redshift_pause_cluster.py | 63 +++++++++++++++++++ .../aws/operators/redshift_resume_cluster.py | 63 +++++++++++++++++++ .../amazon/aws/hooks/test_redshift.py | 8 +++ .../operators/test_redshift_pause_cluster.py | 41 ++++++++++++ .../operators/test_redshift_resume_cluster.py | 60 ++++++++++++++++++ 6 files changed, 255 insertions(+) create mode 100644 airflow/providers/amazon/aws/operators/redshift_pause_cluster.py create mode 100644 airflow/providers/amazon/aws/operators/redshift_resume_cluster.py create mode 100644 tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py create mode 100644 tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py index e9fefc022791..5f0b9eb10df6 100644 --- a/airflow/providers/amazon/aws/hooks/redshift.py +++ b/airflow/providers/amazon/aws/hooks/redshift.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. """Interact with AWS Redshift clusters.""" +import time from typing import Dict, List, Optional, Union @@ -138,6 +139,25 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier: ) return response['Snapshot'] if response['Snapshot'] else None + def wait_for_state(self, cluster_identifier: str, target_state: str, check_interval: float) -> None: + """ + Wait Redshift Cluster until its state is equal to the target_state. + + :param cluster_identifier: unique identifier of a cluster + :type cluster_identifier: str + :param target_state: target state of instance + :type target_state: str + :param check_interval: time in seconds that the job should wait in + between each instance state checks until operation is completed + :type check_interval: float + :return: None + :rtype: None + """ + cluster_state = self.cluster_status(cluster_identifier=cluster_identifier) + while cluster_state != target_state: + self.log.info("instance state: %s", cluster_state) + time.sleep(check_interval) + cluster_state = self.cluster_status(cluster_identifier=cluster_identifier) class RedshiftSQLHook(DbApiHook): """ diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py new file mode 100644 index 000000000000..f3305a58f663 --- /dev/null +++ b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook + + +class RedshiftPauseClusterOperator(BaseOperator): + """ + Pause an AWS Redshift Cluster using boto3. + + :param cluster_identifier: id of the AWS Redshift Cluster + :type cluster_identifier: str + :param aws_conn_id: aws connection to use + :type aws_conn_id: str + :param check_interval: time in seconds that the job should wait in + between each instance state checks until operation is completed + :type check_interval: float + """ + + template_fields = ("cluster_identifier",) + ui_color = "#eeaa11" + ui_fgcolor = "#ffffff" + + def __init__( + self, + *, + cluster_identifier: str, + aws_conn_id: str = "aws_default", + check_interval: float = 15, + **kwargs, + ): + super().__init__(**kwargs) + self.cluster_identifier = cluster_identifier + self.aws_conn_id = aws_conn_id + self.check_interval = check_interval + + def execute(self, context): + redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) + self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) + if cluster_state == 'available': + redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier) + redshift_hook.wait_for_state( + cluster_identifier=self.cluster_identifier, + target_state="paused", + check_interval=self.check_interval, + ) diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py new file mode 100644 index 000000000000..74f7ad6808b0 --- /dev/null +++ b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook + + +class RedshiftResumeClusterOperator(BaseOperator): + """ + Resume an AWS Redshift Cluster using boto3. + + :param cluster_identifier: id of the AWS Redshift Cluster + :type cluster_identifier: str + :param aws_conn_id: aws connection to use + :type aws_conn_id: str + :param check_interval: time in seconds that the job should wait in + between each instance state checks until operation is completed + :type check_interval: float + """ + + template_fields = ("cluster_identifier",) + ui_color = "#eeaa11" + ui_fgcolor = "#ffffff" + + def __init__( + self, + *, + cluster_identifier: str, + aws_conn_id: str = "aws_default", + check_interval: float = 15, + **kwargs, + ): + super().__init__(**kwargs) + self.cluster_identifier = cluster_identifier + self.aws_conn_id = aws_conn_id + self.check_interval = check_interval + + def execute(self, context): + redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) + self.log.info("Starting Redshift cluster %s", self.cluster_identifier) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) + if cluster_state == 'paused': + redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier) + redshift_hook.wait_for_state( + cluster_identifier=self.cluster_identifier, + target_state="available", + check_interval=self.check_interval, + ) diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py b/tests/providers/amazon/aws/hooks/test_redshift.py index 35a1b5ecc8f0..9f926c04b1be 100644 --- a/tests/providers/amazon/aws/hooks/test_redshift.py +++ b/tests/providers/amazon/aws/hooks/test_redshift.py @@ -108,6 +108,14 @@ def test_cluster_status_returns_available_cluster(self): status = hook.cluster_status('test_cluster') assert status == 'available' + @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') + @mock_redshift + def test_wait_for_state_waits_for_available_cluster(self): + self._create_clusters() + hook = RedshiftHook(aws_conn_id='aws_default') + hook.wait_for_state(cluster_identifier='test_cluster', target_state='available', check_interval=5) + status = hook.cluster_status('test_cluster') + assert status == 'available' class TestRedshiftSQLHookConn(unittest.TestCase): def setUp(self): diff --git a/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py new file mode 100644 index 000000000000..1aff6af2f500 --- /dev/null +++ b/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py @@ -0,0 +1,41 @@ +import unittest +import boto3 + +from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator + +try: + from moto import mock_redshift +except ImportError: + mock_redshift = None + + +class TestPauseClusterOperator(unittest.TestCase): + @staticmethod + def _create_clusters(): + client = boto3.client('redshift', region_name='us-east-1') + client.create_cluster( + ClusterIdentifier='test_cluster_to_pause', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + client.create_cluster( + ClusterIdentifier='test_cluster_to_resume', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + if not client.describe_clusters()['Clusters']: + raise ValueError('AWS not properly mocked') + + def test_init(self): + redshift_operator = RedshiftPauseClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + check_interval=3, + ) + assert redshift_operator.task_id == "task_test" + assert redshift_operator.cluster_identifier == "test_cluster" + assert redshift_operator.aws_conn_id == "aws_conn_test" + assert redshift_operator.check_interval == 3 diff --git a/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py new file mode 100644 index 000000000000..a0cc99e82a26 --- /dev/null +++ b/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +import boto3 + +from airflow.providers.amazon.aws.operators.redshift_resume_cluster import RedshiftResumeClusterOperator + +try: + from moto import mock_redshift +except ImportError: + mock_redshift = None + + +class TestResumeClusterOperator(unittest.TestCase): + @staticmethod + def _create_clusters(): + client = boto3.client('redshift', region_name='us-east-1') + client.create_cluster( + ClusterIdentifier='test_cluster_to_pause', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + client.create_cluster( + ClusterIdentifier='test_cluster_to_resume', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + if not client.describe_clusters()['Clusters']: + raise ValueError('AWS not properly mocked') + + def test_init(self): + redshift_operator = RedshiftResumeClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + check_interval=3, + ) + assert redshift_operator.task_id == "task_test" + assert redshift_operator.cluster_identifier == "test_cluster" + assert redshift_operator.aws_conn_id == "aws_conn_test" + assert redshift_operator.check_interval == 3 From 7a921c5a2827a99021115e38b3d4f05da2e41659 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Thu, 18 Nov 2021 07:02:41 -0600 Subject: [PATCH 02/26] Add missing requirements for Static checks. Add license to all python files, run isort, and add new operators to the amazon provider.yaml --- .../providers/amazon/aws/hooks/redshift.py | 1 - airflow/providers/amazon/provider.yaml | 2 ++ .../operators/test_redshift_pause_cluster.py | 20 +++++++++++++++++++ .../operators/test_redshift_resume_cluster.py | 1 + 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py index 5f0b9eb10df6..8919b951b255 100644 --- a/airflow/providers/amazon/aws/hooks/redshift.py +++ b/airflow/providers/amazon/aws/hooks/redshift.py @@ -17,7 +17,6 @@ # under the License. """Interact with AWS Redshift clusters.""" import time - from typing import Dict, List, Optional, Union try: diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 80555f142b90..3184b53eadab 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -245,6 +245,8 @@ operators: - integration-name: Amazon Redshift python-modules: - airflow.providers.amazon.aws.operators.redshift + - airflow.providers.amazon.aws.operators.redshift_pause_cluster + - airflow.providers.amazon.aws.operators.redshift_resume_cluster sensors: - integration-name: Amazon Athena diff --git a/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py index 1aff6af2f500..8d4e4dddcd4a 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py @@ -1,4 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + import unittest + import boto3 from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator diff --git a/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py index a0cc99e82a26..b7eb0b76b22c 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py @@ -18,6 +18,7 @@ # import unittest + import boto3 from airflow.providers.amazon.aws.operators.redshift_resume_cluster import RedshiftResumeClusterOperator From 0edb98d802a39e66e74eac11473b9cf2314c3ccd Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Thu, 18 Nov 2021 13:24:50 -0600 Subject: [PATCH 03/26] Adding changes for pre-commit success --- airflow/providers/amazon/aws/hooks/redshift.py | 1 + tests/providers/amazon/aws/hooks/test_redshift.py | 1 + 2 files changed, 2 insertions(+) diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py index 8919b951b255..849b85fe9554 100644 --- a/airflow/providers/amazon/aws/hooks/redshift.py +++ b/airflow/providers/amazon/aws/hooks/redshift.py @@ -158,6 +158,7 @@ def wait_for_state(self, cluster_identifier: str, target_state: str, check_inter time.sleep(check_interval) cluster_state = self.cluster_status(cluster_identifier=cluster_identifier) + class RedshiftSQLHook(DbApiHook): """ Execute statements against Amazon Redshift, using redshift_connector diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py b/tests/providers/amazon/aws/hooks/test_redshift.py index 9f926c04b1be..ee9440f30aab 100644 --- a/tests/providers/amazon/aws/hooks/test_redshift.py +++ b/tests/providers/amazon/aws/hooks/test_redshift.py @@ -117,6 +117,7 @@ def test_wait_for_state_waits_for_available_cluster(self): status = hook.cluster_status('test_cluster') assert status == 'available' + class TestRedshiftSQLHookConn(unittest.TestCase): def setUp(self): super().setUp() From 9911c728db426a28cefe762c6266372008b50d60 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Sat, 20 Nov 2021 09:39:07 -0500 Subject: [PATCH 04/26] Adding `ClusterStates` Enum to redshift hook to avoid having magic strings and remove `wait_for_state` method in favor of the sensor at `airflow/providers/amazon/aws/sensors/redshift.py` --- .../providers/amazon/aws/hooks/redshift.py | 36 +++++++++---------- .../aws/operators/redshift_pause_cluster.py | 11 ++---- .../aws/operators/redshift_resume_cluster.py | 11 ++---- .../amazon/aws/hooks/test_redshift.py | 9 ----- 4 files changed, 22 insertions(+), 45 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py index 849b85fe9554..f51cca41a756 100644 --- a/airflow/providers/amazon/aws/hooks/redshift.py +++ b/airflow/providers/amazon/aws/hooks/redshift.py @@ -24,6 +24,8 @@ except ImportError: from cached_property import cached_property +from enum import Enum + import redshift_connector from redshift_connector import Connection as RedshiftConnection from sqlalchemy import create_engine @@ -33,6 +35,20 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +class ClusterStates(Enum): + """Contains the possible State values of a Redshift Cluster.""" + + AVAILABLE = 'available' + CREATING = 'creating' + DELETING = 'deleting' + RESUMING = 'resuming' + MODIFYING = 'modifying' + PAUSED = 'paused' + REBOOTING = 'rebooting' + RENAMING = 'renaming' + RESIZING = 'resizing' + + class RedshiftHook(AwsBaseHook): """ Interact with AWS Redshift, using the boto3 library @@ -138,26 +154,6 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier: ) return response['Snapshot'] if response['Snapshot'] else None - def wait_for_state(self, cluster_identifier: str, target_state: str, check_interval: float) -> None: - """ - Wait Redshift Cluster until its state is equal to the target_state. - - :param cluster_identifier: unique identifier of a cluster - :type cluster_identifier: str - :param target_state: target state of instance - :type target_state: str - :param check_interval: time in seconds that the job should wait in - between each instance state checks until operation is completed - :type check_interval: float - :return: None - :rtype: None - """ - cluster_state = self.cluster_status(cluster_identifier=cluster_identifier) - while cluster_state != target_state: - self.log.info("instance state: %s", cluster_state) - time.sleep(check_interval) - cluster_state = self.cluster_status(cluster_identifier=cluster_identifier) - class RedshiftSQLHook(DbApiHook): """ diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py index f3305a58f663..9ab8e195b7c9 100644 --- a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py @@ -17,7 +17,7 @@ # under the License. from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook +from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook class RedshiftPauseClusterOperator(BaseOperator): @@ -53,11 +53,6 @@ def __init__( def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) - cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == 'available': + cluster_state = ClusterStates(redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)) + if cluster_state == ClusterStates.AVAILABLE: redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier) - redshift_hook.wait_for_state( - cluster_identifier=self.cluster_identifier, - target_state="paused", - check_interval=self.check_interval, - ) diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py index 74f7ad6808b0..459c742e1a52 100644 --- a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py @@ -17,7 +17,7 @@ # under the License. from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook +from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook class RedshiftResumeClusterOperator(BaseOperator): @@ -53,11 +53,6 @@ def __init__( def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) self.log.info("Starting Redshift cluster %s", self.cluster_identifier) - cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == 'paused': + cluster_state = ClusterStates(redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)) + if cluster_state == ClusterStates.PAUSED: redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier) - redshift_hook.wait_for_state( - cluster_identifier=self.cluster_identifier, - target_state="available", - check_interval=self.check_interval, - ) diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py b/tests/providers/amazon/aws/hooks/test_redshift.py index ee9440f30aab..35a1b5ecc8f0 100644 --- a/tests/providers/amazon/aws/hooks/test_redshift.py +++ b/tests/providers/amazon/aws/hooks/test_redshift.py @@ -108,15 +108,6 @@ def test_cluster_status_returns_available_cluster(self): status = hook.cluster_status('test_cluster') assert status == 'available' - @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') - @mock_redshift - def test_wait_for_state_waits_for_available_cluster(self): - self._create_clusters() - hook = RedshiftHook(aws_conn_id='aws_default') - hook.wait_for_state(cluster_identifier='test_cluster', target_state='available', check_interval=5) - status = hook.cluster_status('test_cluster') - assert status == 'available' - class TestRedshiftSQLHookConn(unittest.TestCase): def setUp(self): From a68cbbfca5fa09a3af1c58e5a4db8165cdb0509d Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Mon, 22 Nov 2021 09:09:08 -0500 Subject: [PATCH 05/26] Adding documentation to existing AWS operator docs --- .../operators/redshift.rst | 47 +++++++++++-------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/docs/apache-airflow-providers-amazon/operators/redshift.rst b/docs/apache-airflow-providers-amazon/operators/redshift.rst index 58ee26c3a9c5..43996044ee1f 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift.rst @@ -15,17 +15,19 @@ specific language governing permissions and limitations under the License. -.. _howto/operator:RedshiftSQLOperator: +Amazon Redshift Operators +================================================= -RedshiftSQLOperator -=================== +`Amazon Redshift `__ is a fully managed, +petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes +of data and scale to a petabyte or more. This enables you to use your data to acquire new insights +for your business and customers. -.. contents:: - :depth: 1 - :local: +Airflow provides operators to create and interact with the Redshift clusters and compute infrastructure. -Overview --------- +RedshiftSQLOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. _howto/operator:RedshiftSQLOperator: Use the :class:`RedshiftSQLOperator ` to execute statements against an Amazon Redshift cluster. @@ -34,16 +36,6 @@ statements against an Amazon Redshift cluster. :class:`RedshiftSQLHook ` to establish connections with Amazon Redshift. - -example_redshift.py -------------------- - -Purpose -""""""" - -This is a basic example dag for using :class:`RedshiftSQLOperator ` -to execute statements against an Amazon Redshift cluster. - Create a table """""""""""""" @@ -86,7 +78,7 @@ parameters into SQL statements. :end-before: [END howto_operator_redshift_get_with_filter] The complete RedshiftSQLOperator DAG ------------------------------------- +""""""""""""""""""""""""""""""""""""""""""" All together, here is our DAG: @@ -94,3 +86,20 @@ All together, here is our DAG: :language: python :start-after: [START redshift_operator_howto_guide] :end-before: [END redshift_operator_howto_guide] + + +.. _howto/operator:RedshiftResumeClusterOperator: +Resume a Redshift Cluster +""""""""""""""""""""""""""""""""""""""""""" +To resume an existing AWS Redshift Cluster you can use +:class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`. + +This Operator leverages the AWS CLI `resume-cluster `__ API + +.. _howto/operator:RedshiftPauseClusterOperator: +Pause a Redshift Cluster +""""""""""""""""""""""""""""""""""""""""""" +To pause an existing AWS Redshift Cluster you can use +:class:`~airflow.providers.amazon.aws.operators.redshift_pause_cluster.RedshiftPauseClusterOperator`. + +This Operator leverages the AWS CLI `pause-cluster `__ API From b20a38e5bbe00fd849ba50d209b772b5186bc316 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Mon, 22 Nov 2021 14:35:15 -0500 Subject: [PATCH 06/26] Moving the cast to `ClusterStates` Enum to inside the `cluster_status` method instead of doing the cast in all Operators that use the function. --- airflow/providers/amazon/aws/hooks/redshift.py | 7 ++++--- .../aws/operators/redshift_pause_cluster.py | 2 +- .../aws/operators/redshift_resume_cluster.py | 2 +- .../providers/amazon/aws/sensors/redshift.py | 7 +++++-- .../providers/amazon/aws/hooks/test_redshift.py | 6 +++--- .../amazon/aws/sensors/test_redshift.py | 17 ++++++++++++++++- 6 files changed, 30 insertions(+), 11 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py index f51cca41a756..f108aeeb3fae 100644 --- a/airflow/providers/amazon/aws/hooks/redshift.py +++ b/airflow/providers/amazon/aws/hooks/redshift.py @@ -47,6 +47,7 @@ class ClusterStates(Enum): REBOOTING = 'rebooting' RENAMING = 'renaming' RESIZING = 'resizing' + NONEXISTENT = 'nonexistent' class RedshiftHook(AwsBaseHook): @@ -68,7 +69,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) # TODO: Wrap create_cluster_snapshot - def cluster_status(self, cluster_identifier: str) -> str: + def cluster_status(self, cluster_identifier: str) -> ClusterStates: """ Return status of a cluster @@ -81,9 +82,9 @@ def cluster_status(self, cluster_identifier: str) -> str: """ try: response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters'] - return response[0]['ClusterStatus'] if response else None + return ClusterStates(response[0]['ClusterStatus']) if response else None except self.get_conn().exceptions.ClusterNotFoundFault: - return 'cluster_not_found' + return ClusterStates.NONEXISTENT def delete_cluster( self, diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py index 9ab8e195b7c9..3f022cc991f7 100644 --- a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py @@ -53,6 +53,6 @@ def __init__( def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) - cluster_state = ClusterStates(redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) if cluster_state == ClusterStates.AVAILABLE: redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier) diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py index 459c742e1a52..4b586a9705bc 100644 --- a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py @@ -53,6 +53,6 @@ def __init__( def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) self.log.info("Starting Redshift cluster %s", self.cluster_identifier) - cluster_state = ClusterStates(redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) if cluster_state == ClusterStates.PAUSED: redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier) diff --git a/airflow/providers/amazon/aws/sensors/redshift.py b/airflow/providers/amazon/aws/sensors/redshift.py index 669e6c0c86b8..7c8be7a596f0 100644 --- a/airflow/providers/amazon/aws/sensors/redshift.py +++ b/airflow/providers/amazon/aws/sensors/redshift.py @@ -17,7 +17,7 @@ # under the License. from typing import Optional -from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook +from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook from airflow.sensors.base import BaseSensorOperator @@ -43,7 +43,10 @@ def __init__( ): super().__init__(**kwargs) self.cluster_identifier = cluster_identifier - self.target_status = target_status + self.target_status = ( + target_status if isinstance(target_status, ClusterStates) else ClusterStates(str(target_status)) + ) + self.aws_conn_id = aws_conn_id self.hook: Optional[RedshiftHook] = None diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py b/tests/providers/amazon/aws/hooks/test_redshift.py index 35a1b5ecc8f0..ae59f97cc08e 100644 --- a/tests/providers/amazon/aws/hooks/test_redshift.py +++ b/tests/providers/amazon/aws/hooks/test_redshift.py @@ -26,7 +26,7 @@ from airflow.models import Connection from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook, RedshiftSQLHook +from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook, RedshiftSQLHook try: from moto import mock_redshift @@ -98,7 +98,7 @@ def test_cluster_status_returns_cluster_not_found(self): self._create_clusters() hook = RedshiftHook(aws_conn_id='aws_default') status = hook.cluster_status('test_cluster_not_here') - assert status == 'cluster_not_found' + assert status == ClusterStates.NONEXISTENT @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift @@ -106,7 +106,7 @@ def test_cluster_status_returns_available_cluster(self): self._create_clusters() hook = RedshiftHook(aws_conn_id='aws_default') status = hook.cluster_status('test_cluster') - assert status == 'available' + assert status == ClusterStates.AVAILABLE class TestRedshiftSQLHookConn(unittest.TestCase): diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py b/tests/providers/amazon/aws/sensors/test_redshift.py index ec7ae66ab531..ea9fb78676b5 100644 --- a/tests/providers/amazon/aws/sensors/test_redshift.py +++ b/tests/providers/amazon/aws/sensors/test_redshift.py @@ -21,6 +21,7 @@ import boto3 +from airflow.providers.amazon.aws.hooks.redshift import ClusterStates from airflow.providers.amazon.aws.sensors.redshift import AwsRedshiftClusterSensor try: @@ -56,6 +57,20 @@ def test_poke(self): ) assert op.poke(None) + @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') + @mock_redshift + def test_poke_with_cluster_state(self): + self._create_cluster() + op = AwsRedshiftClusterSensor( + task_id='test_cluster_sensor', + poke_interval=1, + timeout=5, + aws_conn_id='aws_default', + cluster_identifier='test_cluster', + target_status=ClusterStates.AVAILABLE, + ) + assert op.poke(None) + @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift def test_poke_false(self): @@ -81,7 +96,7 @@ def test_poke_cluster_not_found(self): timeout=5, aws_conn_id='aws_default', cluster_identifier='test_cluster_not_found', - target_status='cluster_not_found', + target_status=ClusterStates.NONEXISTENT, ) assert op.poke(None) From 81f2f75a46bc92a17924f352d1bb08a4ab40e9af Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Mon, 22 Nov 2021 14:58:27 -0500 Subject: [PATCH 07/26] Fixed logging to redshift sensor to show the actual value of the ENUM and change type of `target_status` --- airflow/providers/amazon/aws/sensors/redshift.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/redshift.py b/airflow/providers/amazon/aws/sensors/redshift.py index 7c8be7a596f0..8aefdd31cadc 100644 --- a/airflow/providers/amazon/aws/sensors/redshift.py +++ b/airflow/providers/amazon/aws/sensors/redshift.py @@ -28,7 +28,7 @@ class AwsRedshiftClusterSensor(BaseSensorOperator): :param cluster_identifier: The identifier for the cluster being pinged. :type cluster_identifier: str :param target_status: The cluster status desired. - :type target_status: str + :type target_status: ClusterStates """ template_fields = ('cluster_identifier', 'target_status') @@ -37,7 +37,7 @@ def __init__( self, *, cluster_identifier: str, - target_status: str = 'available', + target_status: ClusterStates = ClusterStates.AVAILABLE, aws_conn_id: str = 'aws_default', **kwargs, ): @@ -51,7 +51,7 @@ def __init__( self.hook: Optional[RedshiftHook] = None def poke(self, context): - self.log.info('Poking for status : %s\nfor cluster %s', self.target_status, self.cluster_identifier) + self.log.info('Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier) return self.get_hook().cluster_status(self.cluster_identifier) == self.target_status def get_hook(self) -> RedshiftHook: From 61b2935753e730659feb193a850d02199b516bec Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Mon, 29 Nov 2021 06:49:12 -0800 Subject: [PATCH 08/26] Fixed logging to redshift sensor to show the actual value of the ENUM and change type of `target_status` --- .../providers/amazon/aws/hooks/redshift.py | 9 ++--- .../aws/operators/redshift_pause_cluster.py | 4 +- .../aws/operators/redshift_resume_cluster.py | 4 +- .../providers/amazon/aws/sensors/redshift.py | 14 ++++--- .../operators/redshift.rst | 40 +++++++++++++------ .../amazon/aws/hooks/test_redshift.py | 6 +-- .../amazon/aws/sensors/test_redshift.py | 6 +-- 7 files changed, 50 insertions(+), 33 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py index f108aeeb3fae..727e720acfe3 100644 --- a/airflow/providers/amazon/aws/hooks/redshift.py +++ b/airflow/providers/amazon/aws/hooks/redshift.py @@ -16,7 +16,6 @@ # specific language governing permissions and limitations # under the License. """Interact with AWS Redshift clusters.""" -import time from typing import Dict, List, Optional, Union try: @@ -35,7 +34,7 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -class ClusterStates(Enum): +class RedshiftClusterStates(Enum): """Contains the possible State values of a Redshift Cluster.""" AVAILABLE = 'available' @@ -69,7 +68,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) # TODO: Wrap create_cluster_snapshot - def cluster_status(self, cluster_identifier: str) -> ClusterStates: + def cluster_status(self, cluster_identifier: str) -> RedshiftClusterStates: """ Return status of a cluster @@ -82,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> ClusterStates: """ try: response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters'] - return ClusterStates(response[0]['ClusterStatus']) if response else None + return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None except self.get_conn().exceptions.ClusterNotFoundFault: - return ClusterStates.NONEXISTENT + return RedshiftClusterStates.NONEXISTENT def delete_cluster( self, diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py index 3f022cc991f7..004a931f7a6e 100644 --- a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py @@ -17,7 +17,7 @@ # under the License. from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook +from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook class RedshiftPauseClusterOperator(BaseOperator): @@ -54,5 +54,5 @@ def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == ClusterStates.AVAILABLE: + if cluster_state == RedshiftClusterStates.AVAILABLE: redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier) diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py index 4b586a9705bc..405d9c7f471e 100644 --- a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py @@ -17,7 +17,7 @@ # under the License. from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook +from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook class RedshiftResumeClusterOperator(BaseOperator): @@ -54,5 +54,5 @@ def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) self.log.info("Starting Redshift cluster %s", self.cluster_identifier) cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == ClusterStates.PAUSED: + if cluster_state == RedshiftClusterStates.PAUSED: redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier) diff --git a/airflow/providers/amazon/aws/sensors/redshift.py b/airflow/providers/amazon/aws/sensors/redshift.py index 8aefdd31cadc..4530035ae922 100644 --- a/airflow/providers/amazon/aws/sensors/redshift.py +++ b/airflow/providers/amazon/aws/sensors/redshift.py @@ -17,7 +17,7 @@ # under the License. from typing import Optional -from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook +from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook from airflow.sensors.base import BaseSensorOperator @@ -28,7 +28,7 @@ class AwsRedshiftClusterSensor(BaseSensorOperator): :param cluster_identifier: The identifier for the cluster being pinged. :type cluster_identifier: str :param target_status: The cluster status desired. - :type target_status: ClusterStates + :type target_status: RedshiftClusterStates """ template_fields = ('cluster_identifier', 'target_status') @@ -37,21 +37,25 @@ def __init__( self, *, cluster_identifier: str, - target_status: ClusterStates = ClusterStates.AVAILABLE, + target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE, aws_conn_id: str = 'aws_default', **kwargs, ): super().__init__(**kwargs) self.cluster_identifier = cluster_identifier self.target_status = ( - target_status if isinstance(target_status, ClusterStates) else ClusterStates(str(target_status)) + target_status + if isinstance(target_status, RedshiftClusterStates) + else RedshiftClusterStates(str(target_status)) ) self.aws_conn_id = aws_conn_id self.hook: Optional[RedshiftHook] = None def poke(self, context): - self.log.info('Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier) + self.log.info( + 'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier + ) return self.get_hook().cluster_status(self.cluster_identifier) == self.target_status def get_hook(self) -> RedshiftHook: diff --git a/docs/apache-airflow-providers-amazon/operators/redshift.rst b/docs/apache-airflow-providers-amazon/operators/redshift.rst index 43996044ee1f..f99f16f1724e 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift.rst @@ -15,19 +15,17 @@ specific language governing permissions and limitations under the License. -Amazon Redshift Operators -================================================= +.. _howto/operator:RedshiftSQLOperator: -`Amazon Redshift `__ is a fully managed, -petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes -of data and scale to a petabyte or more. This enables you to use your data to acquire new insights -for your business and customers. +RedshiftSQLOperator +=================== -Airflow provides operators to create and interact with the Redshift clusters and compute infrastructure. +.. contents:: + :depth: 1 + :local: -RedshiftSQLOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. _howto/operator:RedshiftSQLOperator: +Overview +-------- Use the :class:`RedshiftSQLOperator ` to execute statements against an Amazon Redshift cluster. @@ -36,6 +34,16 @@ statements against an Amazon Redshift cluster. :class:`RedshiftSQLHook ` to establish connections with Amazon Redshift. + +example_redshift.py +------------------- + +Purpose +""""""" + +This is a basic example dag for using :class:`RedshiftSQLOperator ` +to execute statements against an Amazon Redshift cluster. + Create a table """""""""""""" @@ -78,7 +86,7 @@ parameters into SQL statements. :end-before: [END howto_operator_redshift_get_with_filter] The complete RedshiftSQLOperator DAG -""""""""""""""""""""""""""""""""""""""""""" +------------------------------------ All together, here is our DAG: @@ -89,17 +97,23 @@ All together, here is our DAG: .. _howto/operator:RedshiftResumeClusterOperator: + Resume a Redshift Cluster """"""""""""""""""""""""""""""""""""""""""" + To resume an existing AWS Redshift Cluster you can use :class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`. -This Operator leverages the AWS CLI `resume-cluster `__ API +This Operator leverages the AWS CLI +`resume-cluster `__ API .. _howto/operator:RedshiftPauseClusterOperator: + Pause a Redshift Cluster """"""""""""""""""""""""""""""""""""""""""" + To pause an existing AWS Redshift Cluster you can use :class:`~airflow.providers.amazon.aws.operators.redshift_pause_cluster.RedshiftPauseClusterOperator`. -This Operator leverages the AWS CLI `pause-cluster `__ API +This Operator leverages the AWS CLI +`pause-cluster `__ API diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py b/tests/providers/amazon/aws/hooks/test_redshift.py index ae59f97cc08e..c8e4ea00ff94 100644 --- a/tests/providers/amazon/aws/hooks/test_redshift.py +++ b/tests/providers/amazon/aws/hooks/test_redshift.py @@ -26,7 +26,7 @@ from airflow.models import Connection from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook, RedshiftSQLHook +from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook, RedshiftSQLHook try: from moto import mock_redshift @@ -98,7 +98,7 @@ def test_cluster_status_returns_cluster_not_found(self): self._create_clusters() hook = RedshiftHook(aws_conn_id='aws_default') status = hook.cluster_status('test_cluster_not_here') - assert status == ClusterStates.NONEXISTENT + assert status == RedshiftClusterStates.NONEXISTENT @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift @@ -106,7 +106,7 @@ def test_cluster_status_returns_available_cluster(self): self._create_clusters() hook = RedshiftHook(aws_conn_id='aws_default') status = hook.cluster_status('test_cluster') - assert status == ClusterStates.AVAILABLE + assert status == RedshiftClusterStates.AVAILABLE class TestRedshiftSQLHookConn(unittest.TestCase): diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py b/tests/providers/amazon/aws/sensors/test_redshift.py index ea9fb78676b5..8b728042ed51 100644 --- a/tests/providers/amazon/aws/sensors/test_redshift.py +++ b/tests/providers/amazon/aws/sensors/test_redshift.py @@ -21,7 +21,7 @@ import boto3 -from airflow.providers.amazon.aws.hooks.redshift import ClusterStates +from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates from airflow.providers.amazon.aws.sensors.redshift import AwsRedshiftClusterSensor try: @@ -67,7 +67,7 @@ def test_poke_with_cluster_state(self): timeout=5, aws_conn_id='aws_default', cluster_identifier='test_cluster', - target_status=ClusterStates.AVAILABLE, + target_status=RedshiftClusterStates.AVAILABLE, ) assert op.poke(None) @@ -96,7 +96,7 @@ def test_poke_cluster_not_found(self): timeout=5, aws_conn_id='aws_default', cluster_identifier='test_cluster_not_found', - target_status=ClusterStates.NONEXISTENT, + target_status=RedshiftClusterStates.NONEXISTENT, ) assert op.poke(None) From 3b3f1e09a8409677ed3f3bab00e37224447d2ba0 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Tue, 30 Nov 2021 13:09:22 -0800 Subject: [PATCH 09/26] Adding `seealso` to Operator documentation --- .../providers/amazon/aws/operators/redshift_pause_cluster.py | 4 ++++ .../providers/amazon/aws/operators/redshift_resume_cluster.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py index 004a931f7a6e..7aff945c21f7 100644 --- a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py @@ -24,6 +24,10 @@ class RedshiftPauseClusterOperator(BaseOperator): """ Pause an AWS Redshift Cluster using boto3. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RedshiftPauseClusterOperator` + :param cluster_identifier: id of the AWS Redshift Cluster :type cluster_identifier: str :param aws_conn_id: aws connection to use diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py index 405d9c7f471e..c97a2a53ee93 100644 --- a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py @@ -24,6 +24,10 @@ class RedshiftResumeClusterOperator(BaseOperator): """ Resume an AWS Redshift Cluster using boto3. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RedshiftResumeClusterOperator` + :param cluster_identifier: id of the AWS Redshift Cluster :type cluster_identifier: str :param aws_conn_id: aws connection to use From b1513379aa96029983fac4a5abe4bbb6794f804d Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 30 Nov 2021 13:23:47 -0800 Subject: [PATCH 10/26] Update airflow/providers/amazon/aws/operators/redshift_resume_cluster.py Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- .../providers/amazon/aws/operators/redshift_resume_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py index c97a2a53ee93..adc14b99938e 100644 --- a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py @@ -22,7 +22,7 @@ class RedshiftResumeClusterOperator(BaseOperator): """ - Resume an AWS Redshift Cluster using boto3. + Resume a paused AWS Redshift Cluster .. seealso:: For more information on how to use this operator, take a look at the guide: From ec92273b1c2eaf1a42c92efc771b06870a33d653 Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 30 Nov 2021 13:23:55 -0800 Subject: [PATCH 11/26] Update airflow/providers/amazon/aws/operators/redshift_pause_cluster.py Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- .../providers/amazon/aws/operators/redshift_pause_cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py index 7aff945c21f7..6959d66a3fd8 100644 --- a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py @@ -22,7 +22,7 @@ class RedshiftPauseClusterOperator(BaseOperator): """ - Pause an AWS Redshift Cluster using boto3. + Pause an AWS Redshift Cluster if it has status `available`. .. seealso:: For more information on how to use this operator, take a look at the guide: From 3ce29da7777c240486d01313bdb210f8b7417256 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Tue, 30 Nov 2021 13:26:53 -0800 Subject: [PATCH 12/26] Removing unused `check_interval` from operator --- .../providers/amazon/aws/operators/redshift_pause_cluster.py | 5 ----- .../amazon/aws/operators/redshift_resume_cluster.py | 5 ----- .../amazon/aws/operators/test_redshift_pause_cluster.py | 4 +--- .../amazon/aws/operators/test_redshift_resume_cluster.py | 4 +--- 4 files changed, 2 insertions(+), 16 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py index 7aff945c21f7..674b4db0d773 100644 --- a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py @@ -32,9 +32,6 @@ class RedshiftPauseClusterOperator(BaseOperator): :type cluster_identifier: str :param aws_conn_id: aws connection to use :type aws_conn_id: str - :param check_interval: time in seconds that the job should wait in - between each instance state checks until operation is completed - :type check_interval: float """ template_fields = ("cluster_identifier",) @@ -46,13 +43,11 @@ def __init__( *, cluster_identifier: str, aws_conn_id: str = "aws_default", - check_interval: float = 15, **kwargs, ): super().__init__(**kwargs) self.cluster_identifier = cluster_identifier self.aws_conn_id = aws_conn_id - self.check_interval = check_interval def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py index c97a2a53ee93..1dcc7268f7f5 100644 --- a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py @@ -32,9 +32,6 @@ class RedshiftResumeClusterOperator(BaseOperator): :type cluster_identifier: str :param aws_conn_id: aws connection to use :type aws_conn_id: str - :param check_interval: time in seconds that the job should wait in - between each instance state checks until operation is completed - :type check_interval: float """ template_fields = ("cluster_identifier",) @@ -46,13 +43,11 @@ def __init__( *, cluster_identifier: str, aws_conn_id: str = "aws_default", - check_interval: float = 15, **kwargs, ): super().__init__(**kwargs) self.cluster_identifier = cluster_identifier self.aws_conn_id = aws_conn_id - self.check_interval = check_interval def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) diff --git a/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py index 8d4e4dddcd4a..c8f839e86f83 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py @@ -52,10 +52,8 @@ def test_init(self): redshift_operator = RedshiftPauseClusterOperator( task_id="task_test", cluster_identifier="test_cluster", - aws_conn_id="aws_conn_test", - check_interval=3, + aws_conn_id="aws_conn_test" ) assert redshift_operator.task_id == "task_test" assert redshift_operator.cluster_identifier == "test_cluster" assert redshift_operator.aws_conn_id == "aws_conn_test" - assert redshift_operator.check_interval == 3 diff --git a/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py index b7eb0b76b22c..659b52ba5527 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py @@ -52,10 +52,8 @@ def test_init(self): redshift_operator = RedshiftResumeClusterOperator( task_id="task_test", cluster_identifier="test_cluster", - aws_conn_id="aws_conn_test", - check_interval=3, + aws_conn_id="aws_conn_test" ) assert redshift_operator.task_id == "task_test" assert redshift_operator.cluster_identifier == "test_cluster" assert redshift_operator.aws_conn_id == "aws_conn_test" - assert redshift_operator.check_interval == 3 From 4d6e4023ed5f2e7ae915b69f2073ecd451c1bc6a Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 30 Nov 2021 13:37:01 -0800 Subject: [PATCH 13/26] Update airflow/providers/amazon/aws/sensors/redshift.py Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- airflow/providers/amazon/aws/sensors/redshift.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/redshift.py b/airflow/providers/amazon/aws/sensors/redshift.py index 4530035ae922..513a7cdce724 100644 --- a/airflow/providers/amazon/aws/sensors/redshift.py +++ b/airflow/providers/amazon/aws/sensors/redshift.py @@ -43,12 +43,7 @@ def __init__( ): super().__init__(**kwargs) self.cluster_identifier = cluster_identifier - self.target_status = ( - target_status - if isinstance(target_status, RedshiftClusterStates) - else RedshiftClusterStates(str(target_status)) - ) - + self.target_status = RedshiftClusterStates(target_status) self.aws_conn_id = aws_conn_id self.hook: Optional[RedshiftHook] = None From 57c4c35d2c3efc8133e3bf0230cf5a2792d03e00 Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 30 Nov 2021 13:38:33 -0800 Subject: [PATCH 14/26] Update docs/apache-airflow-providers-amazon/operators/redshift.rst Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- docs/apache-airflow-providers-amazon/operators/redshift.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-amazon/operators/redshift.rst b/docs/apache-airflow-providers-amazon/operators/redshift.rst index f99f16f1724e..5511d8aa5e44 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift.rst @@ -101,7 +101,7 @@ All together, here is our DAG: Resume a Redshift Cluster """"""""""""""""""""""""""""""""""""""""""" -To resume an existing AWS Redshift Cluster you can use +To resume a 'paused' AWS Redshift Cluster you can use :class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`. This Operator leverages the AWS CLI From e03031f9e9faa5de0d7e9b21cac79b556969ecb8 Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 30 Nov 2021 13:38:49 -0800 Subject: [PATCH 15/26] Update docs/apache-airflow-providers-amazon/operators/redshift.rst Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- docs/apache-airflow-providers-amazon/operators/redshift.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-amazon/operators/redshift.rst b/docs/apache-airflow-providers-amazon/operators/redshift.rst index 5511d8aa5e44..054c783c05b3 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift.rst @@ -112,7 +112,7 @@ This Operator leverages the AWS CLI Pause a Redshift Cluster """"""""""""""""""""""""""""""""""""""""""" -To pause an existing AWS Redshift Cluster you can use +To pause an 'available' AWS Redshift Cluster you can use :class:`~airflow.providers.amazon.aws.operators.redshift_pause_cluster.RedshiftPauseClusterOperator`. This Operator leverages the AWS CLI From 92cfcf2d60303442b432bfa4a9fd3aba8899a430 Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 30 Nov 2021 13:39:51 -0800 Subject: [PATCH 16/26] Update tests/providers/amazon/aws/sensors/test_redshift.py Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- tests/providers/amazon/aws/sensors/test_redshift.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py b/tests/providers/amazon/aws/sensors/test_redshift.py index 8b728042ed51..d52a36462af5 100644 --- a/tests/providers/amazon/aws/sensors/test_redshift.py +++ b/tests/providers/amazon/aws/sensors/test_redshift.py @@ -99,4 +99,4 @@ def test_poke_cluster_not_found(self): target_status=RedshiftClusterStates.NONEXISTENT, ) - assert op.poke(None) + assert op.poke(None) is True From 06a7e3d777b6f8430a117e265c9e79b5d5ecdf5b Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Tue, 7 Dec 2021 14:49:08 -0600 Subject: [PATCH 17/26] Rolling back changes to not use enum and move all Operators into one unified file. --- .../providers/amazon/aws/hooks/redshift.py | 24 +----- .../amazon/aws/operators/redshift.py | 83 ++++++++++++++++++- .../aws/operators/redshift_pause_cluster.py | 57 ------------- .../aws/operators/redshift_resume_cluster.py | 57 ------------- .../providers/amazon/aws/sensors/redshift.py | 17 ++-- airflow/providers/amazon/provider.yaml | 2 - .../operators/redshift.rst | 5 +- .../amazon/aws/hooks/test_redshift.py | 6 +- .../amazon/aws/operators/test_redshift.py | 73 +++++++++++++++- .../operators/test_redshift_pause_cluster.py | 59 ------------- .../operators/test_redshift_resume_cluster.py | 59 ------------- 11 files changed, 168 insertions(+), 274 deletions(-) delete mode 100644 airflow/providers/amazon/aws/operators/redshift_pause_cluster.py delete mode 100644 airflow/providers/amazon/aws/operators/redshift_resume_cluster.py delete mode 100644 tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py delete mode 100644 tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py index 727e720acfe3..e9fefc022791 100644 --- a/airflow/providers/amazon/aws/hooks/redshift.py +++ b/airflow/providers/amazon/aws/hooks/redshift.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. """Interact with AWS Redshift clusters.""" + from typing import Dict, List, Optional, Union try: @@ -23,8 +24,6 @@ except ImportError: from cached_property import cached_property -from enum import Enum - import redshift_connector from redshift_connector import Connection as RedshiftConnection from sqlalchemy import create_engine @@ -34,21 +33,6 @@ from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -class RedshiftClusterStates(Enum): - """Contains the possible State values of a Redshift Cluster.""" - - AVAILABLE = 'available' - CREATING = 'creating' - DELETING = 'deleting' - RESUMING = 'resuming' - MODIFYING = 'modifying' - PAUSED = 'paused' - REBOOTING = 'rebooting' - RENAMING = 'renaming' - RESIZING = 'resizing' - NONEXISTENT = 'nonexistent' - - class RedshiftHook(AwsBaseHook): """ Interact with AWS Redshift, using the boto3 library @@ -68,7 +52,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) # TODO: Wrap create_cluster_snapshot - def cluster_status(self, cluster_identifier: str) -> RedshiftClusterStates: + def cluster_status(self, cluster_identifier: str) -> str: """ Return status of a cluster @@ -81,9 +65,9 @@ def cluster_status(self, cluster_identifier: str) -> RedshiftClusterStates: """ try: response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters'] - return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None + return response[0]['ClusterStatus'] if response else None except self.get_conn().exceptions.ClusterNotFoundFault: - return RedshiftClusterStates.NONEXISTENT + return 'cluster_not_found' def delete_cluster( self, diff --git a/airflow/providers/amazon/aws/operators/redshift.py b/airflow/providers/amazon/aws/operators/redshift.py index 9c1f8adfbf08..ff30e92b3f65 100644 --- a/airflow/providers/amazon/aws/operators/redshift.py +++ b/airflow/providers/amazon/aws/operators/redshift.py @@ -18,7 +18,7 @@ from typing import Dict, Iterable, Optional, Union from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook +from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook, RedshiftSQLHook class RedshiftSQLOperator(BaseOperator): @@ -71,3 +71,84 @@ def execute(self, context: dict) -> None: self.log.info(f"Executing statement: {self.sql}") hook = self.get_hook() hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters) + + +class RedshiftResumeClusterOperator(BaseOperator): + """ + Resume a paused AWS Redshift Cluster + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RedshiftResumeClusterOperator` + + :param cluster_identifier: id of the AWS Redshift Cluster + :type cluster_identifier: str + :param aws_conn_id: aws connection to use + :type aws_conn_id: str + """ + template_fields = ("cluster_identifier",) + ui_color = "#eeaa11" + ui_fgcolor = "#ffffff" + + def __init__( + self, + *, + cluster_identifier: str, + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.cluster_identifier = cluster_identifier + self.aws_conn_id = aws_conn_id + + def execute(self, context): + redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) + self.log.info("Starting Redshift cluster %s", self.cluster_identifier) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) + if cluster_state == 'paused': + redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier) + else: + self.log.warning( + "Unable to resume cluster since cluster is currently in status: %s", cluster_state + ) + + +class RedshiftPauseClusterOperator(BaseOperator): + """ + Pause an AWS Redshift Cluster if it has status `available`. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RedshiftPauseClusterOperator` + + :param cluster_identifier: id of the AWS Redshift Cluster + :type cluster_identifier: str + :param aws_conn_id: aws connection to use + :type aws_conn_id: str + """ + + template_fields = ("cluster_identifier",) + ui_color = "#eeaa11" + ui_fgcolor = "#ffffff" + + def __init__( + self, + *, + cluster_identifier: str, + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.cluster_identifier = cluster_identifier + self.aws_conn_id = aws_conn_id + + def execute(self, context): + redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) + self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) + if cluster_state == 'available': + redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier) + else: + self.log.warning( + "Unable to pause cluster since cluster is currently in status: %s", cluster_state + ) diff --git a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py b/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py deleted file mode 100644 index 6c4656c77392..000000000000 --- a/airflow/providers/amazon/aws/operators/redshift_pause_cluster.py +++ /dev/null @@ -1,57 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook - - -class RedshiftPauseClusterOperator(BaseOperator): - """ - Pause an AWS Redshift Cluster if it has status `available`. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:RedshiftPauseClusterOperator` - - :param cluster_identifier: id of the AWS Redshift Cluster - :type cluster_identifier: str - :param aws_conn_id: aws connection to use - :type aws_conn_id: str - """ - - template_fields = ("cluster_identifier",) - ui_color = "#eeaa11" - ui_fgcolor = "#ffffff" - - def __init__( - self, - *, - cluster_identifier: str, - aws_conn_id: str = "aws_default", - **kwargs, - ): - super().__init__(**kwargs) - self.cluster_identifier = cluster_identifier - self.aws_conn_id = aws_conn_id - - def execute(self, context): - redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) - cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == RedshiftClusterStates.AVAILABLE: - redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier) diff --git a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py b/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py deleted file mode 100644 index a124ce1ec5e7..000000000000 --- a/airflow/providers/amazon/aws/operators/redshift_resume_cluster.py +++ /dev/null @@ -1,57 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook - - -class RedshiftResumeClusterOperator(BaseOperator): - """ - Resume a paused AWS Redshift Cluster - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:RedshiftResumeClusterOperator` - - :param cluster_identifier: id of the AWS Redshift Cluster - :type cluster_identifier: str - :param aws_conn_id: aws connection to use - :type aws_conn_id: str - """ - - template_fields = ("cluster_identifier",) - ui_color = "#eeaa11" - ui_fgcolor = "#ffffff" - - def __init__( - self, - *, - cluster_identifier: str, - aws_conn_id: str = "aws_default", - **kwargs, - ): - super().__init__(**kwargs) - self.cluster_identifier = cluster_identifier - self.aws_conn_id = aws_conn_id - - def execute(self, context): - redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - self.log.info("Starting Redshift cluster %s", self.cluster_identifier) - cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) - if cluster_state == RedshiftClusterStates.PAUSED: - redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier) diff --git a/airflow/providers/amazon/aws/sensors/redshift.py b/airflow/providers/amazon/aws/sensors/redshift.py index 4530035ae922..79c8803ab163 100644 --- a/airflow/providers/amazon/aws/sensors/redshift.py +++ b/airflow/providers/amazon/aws/sensors/redshift.py @@ -17,7 +17,7 @@ # under the License. from typing import Optional -from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook +from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook from airflow.sensors.base import BaseSensorOperator @@ -28,7 +28,7 @@ class AwsRedshiftClusterSensor(BaseSensorOperator): :param cluster_identifier: The identifier for the cluster being pinged. :type cluster_identifier: str :param target_status: The cluster status desired. - :type target_status: RedshiftClusterStates + :type target_status: str """ template_fields = ('cluster_identifier', 'target_status') @@ -37,25 +37,18 @@ def __init__( self, *, cluster_identifier: str, - target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE, + target_status: str = 'available', aws_conn_id: str = 'aws_default', **kwargs, ): super().__init__(**kwargs) self.cluster_identifier = cluster_identifier - self.target_status = ( - target_status - if isinstance(target_status, RedshiftClusterStates) - else RedshiftClusterStates(str(target_status)) - ) - + self.target_status = target_status self.aws_conn_id = aws_conn_id self.hook: Optional[RedshiftHook] = None def poke(self, context): - self.log.info( - 'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier - ) + self.log.info('Checking cluster %r for status %r', self.cluster_identifier, self.target_status) return self.get_hook().cluster_status(self.cluster_identifier) == self.target_status def get_hook(self) -> RedshiftHook: diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 3184b53eadab..80555f142b90 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -245,8 +245,6 @@ operators: - integration-name: Amazon Redshift python-modules: - airflow.providers.amazon.aws.operators.redshift - - airflow.providers.amazon.aws.operators.redshift_pause_cluster - - airflow.providers.amazon.aws.operators.redshift_resume_cluster sensors: - integration-name: Amazon Athena diff --git a/docs/apache-airflow-providers-amazon/operators/redshift.rst b/docs/apache-airflow-providers-amazon/operators/redshift.rst index f99f16f1724e..ce984e378477 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift.rst @@ -102,7 +102,7 @@ Resume a Redshift Cluster """"""""""""""""""""""""""""""""""""""""""" To resume an existing AWS Redshift Cluster you can use -:class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`. +:class:`RedshiftResumeClusterOperator ` This Operator leverages the AWS CLI `resume-cluster `__ API @@ -113,7 +113,6 @@ Pause a Redshift Cluster """"""""""""""""""""""""""""""""""""""""""" To pause an existing AWS Redshift Cluster you can use -:class:`~airflow.providers.amazon.aws.operators.redshift_pause_cluster.RedshiftPauseClusterOperator`. - +:class:`RedshiftPauseClusterOperator ` This Operator leverages the AWS CLI `pause-cluster `__ API diff --git a/tests/providers/amazon/aws/hooks/test_redshift.py b/tests/providers/amazon/aws/hooks/test_redshift.py index c8e4ea00ff94..35a1b5ecc8f0 100644 --- a/tests/providers/amazon/aws/hooks/test_redshift.py +++ b/tests/providers/amazon/aws/hooks/test_redshift.py @@ -26,7 +26,7 @@ from airflow.models import Connection from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook, RedshiftSQLHook +from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook, RedshiftSQLHook try: from moto import mock_redshift @@ -98,7 +98,7 @@ def test_cluster_status_returns_cluster_not_found(self): self._create_clusters() hook = RedshiftHook(aws_conn_id='aws_default') status = hook.cluster_status('test_cluster_not_here') - assert status == RedshiftClusterStates.NONEXISTENT + assert status == 'cluster_not_found' @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift @@ -106,7 +106,7 @@ def test_cluster_status_returns_available_cluster(self): self._create_clusters() hook = RedshiftHook(aws_conn_id='aws_default') status = hook.cluster_status('test_cluster') - assert status == RedshiftClusterStates.AVAILABLE + assert status == 'available' class TestRedshiftSQLHookConn(unittest.TestCase): diff --git a/tests/providers/amazon/aws/operators/test_redshift.py b/tests/providers/amazon/aws/operators/test_redshift.py index de9206ef83ff..8916960af4c9 100644 --- a/tests/providers/amazon/aws/operators/test_redshift.py +++ b/tests/providers/amazon/aws/operators/test_redshift.py @@ -20,9 +20,14 @@ from unittest import mock from unittest.mock import MagicMock +import boto3 from parameterized import parameterized -from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator +from airflow.providers.amazon.aws.operators.redshift import ( + RedshiftPauseClusterOperator, + RedshiftResumeClusterOperator, + RedshiftSQLOperator, +) class TestRedshiftSQLOperator(unittest.TestCase): @@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook autocommit=test_autocommit, parameters=test_parameters, ) + + +class TestResumeClusterOperator(unittest.TestCase): + @staticmethod + def _create_clusters(): + client = boto3.client('redshift', region_name='us-east-1') + client.create_cluster( + ClusterIdentifier='test_cluster_to_pause', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + client.create_cluster( + ClusterIdentifier='test_cluster_to_resume', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + if not client.describe_clusters()['Clusters']: + raise ValueError('AWS not properly mocked') + + def test_init(self): + redshift_operator = RedshiftResumeClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + assert redshift_operator.task_id == "task_test" + assert redshift_operator.cluster_identifier == "test_cluster" + assert redshift_operator.aws_conn_id == "aws_conn_test" + + def test_resume_cluster(self): + # TODO: Add test once moto library supports pause_cluster() or resume_cluster() boto api calls: + # https://github.com/spulec/moto/issues/4591 + pass + + +class TestPauseClusterOperator(unittest.TestCase): + @staticmethod + def _create_clusters(): + client = boto3.client('redshift', region_name='us-east-1') + client.create_cluster( + ClusterIdentifier='test_cluster_to_pause', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + client.create_cluster( + ClusterIdentifier='test_cluster_to_resume', + NodeType='dc1.large', + MasterUsername='admin', + MasterUserPassword='mock_password', + ) + if not client.describe_clusters()['Clusters']: + raise ValueError('AWS not properly mocked') + + def test_init(self): + redshift_operator = RedshiftPauseClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + assert redshift_operator.task_id == "task_test" + assert redshift_operator.cluster_identifier == "test_cluster" + assert redshift_operator.aws_conn_id == "aws_conn_test" + + def test_pause_cluster(self): + # TODO: Add test once moto library supports pause_cluster() or resume_cluster() boto api calls: + # https://github.com/spulec/moto/issues/4591 + pass diff --git a/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py deleted file mode 100644 index c8f839e86f83..000000000000 --- a/tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py +++ /dev/null @@ -1,59 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest - -import boto3 - -from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator - -try: - from moto import mock_redshift -except ImportError: - mock_redshift = None - - -class TestPauseClusterOperator(unittest.TestCase): - @staticmethod - def _create_clusters(): - client = boto3.client('redshift', region_name='us-east-1') - client.create_cluster( - ClusterIdentifier='test_cluster_to_pause', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - client.create_cluster( - ClusterIdentifier='test_cluster_to_resume', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - if not client.describe_clusters()['Clusters']: - raise ValueError('AWS not properly mocked') - - def test_init(self): - redshift_operator = RedshiftPauseClusterOperator( - task_id="task_test", - cluster_identifier="test_cluster", - aws_conn_id="aws_conn_test" - ) - assert redshift_operator.task_id == "task_test" - assert redshift_operator.cluster_identifier == "test_cluster" - assert redshift_operator.aws_conn_id == "aws_conn_test" diff --git a/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py deleted file mode 100644 index 659b52ba5527..000000000000 --- a/tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py +++ /dev/null @@ -1,59 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest - -import boto3 - -from airflow.providers.amazon.aws.operators.redshift_resume_cluster import RedshiftResumeClusterOperator - -try: - from moto import mock_redshift -except ImportError: - mock_redshift = None - - -class TestResumeClusterOperator(unittest.TestCase): - @staticmethod - def _create_clusters(): - client = boto3.client('redshift', region_name='us-east-1') - client.create_cluster( - ClusterIdentifier='test_cluster_to_pause', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - client.create_cluster( - ClusterIdentifier='test_cluster_to_resume', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - if not client.describe_clusters()['Clusters']: - raise ValueError('AWS not properly mocked') - - def test_init(self): - redshift_operator = RedshiftResumeClusterOperator( - task_id="task_test", - cluster_identifier="test_cluster", - aws_conn_id="aws_conn_test" - ) - assert redshift_operator.task_id == "task_test" - assert redshift_operator.cluster_identifier == "test_cluster" - assert redshift_operator.aws_conn_id == "aws_conn_test" From e72bec0799d2207bcef365d0bd056420f3cc7361 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Tue, 7 Dec 2021 14:55:29 -0600 Subject: [PATCH 18/26] Rolling back changes to not use enum and move all Operators into one unified file. --- airflow/providers/amazon/aws/operators/redshift.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/amazon/aws/operators/redshift.py b/airflow/providers/amazon/aws/operators/redshift.py index ff30e92b3f65..04660240f339 100644 --- a/airflow/providers/amazon/aws/operators/redshift.py +++ b/airflow/providers/amazon/aws/operators/redshift.py @@ -86,6 +86,7 @@ class RedshiftResumeClusterOperator(BaseOperator): :param aws_conn_id: aws connection to use :type aws_conn_id: str """ + template_fields = ("cluster_identifier",) ui_color = "#eeaa11" ui_fgcolor = "#ffffff" From e659253d2437014ff263d5ef0f7611244ab4e9c4 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Tue, 7 Dec 2021 14:59:09 -0600 Subject: [PATCH 19/26] Rolling back changes to not use enum and move all Operators into one unified file. --- .../amazon/aws/sensors/test_redshift.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py b/tests/providers/amazon/aws/sensors/test_redshift.py index d52a36462af5..e28bc25eae21 100644 --- a/tests/providers/amazon/aws/sensors/test_redshift.py +++ b/tests/providers/amazon/aws/sensors/test_redshift.py @@ -21,7 +21,6 @@ import boto3 -from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates from airflow.providers.amazon.aws.sensors.redshift import AwsRedshiftClusterSensor try: @@ -85,18 +84,3 @@ def test_poke_false(self): ) assert not op.poke(None) - - @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') - @mock_redshift - def test_poke_cluster_not_found(self): - self._create_cluster() - op = AwsRedshiftClusterSensor( - task_id='test_cluster_sensor', - poke_interval=1, - timeout=5, - aws_conn_id='aws_default', - cluster_identifier='test_cluster_not_found', - target_status=RedshiftClusterStates.NONEXISTENT, - ) - - assert op.poke(None) is True From b303dc7eba808fbf05c3c6a065be8018f258c038 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Tue, 7 Dec 2021 15:00:13 -0600 Subject: [PATCH 20/26] Rolling back changes to not use enum and move all Operators into one unified file. --- .../providers/amazon/aws/sensors/test_redshift.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py b/tests/providers/amazon/aws/sensors/test_redshift.py index e28bc25eae21..ec7ae66ab531 100644 --- a/tests/providers/amazon/aws/sensors/test_redshift.py +++ b/tests/providers/amazon/aws/sensors/test_redshift.py @@ -58,21 +58,22 @@ def test_poke(self): @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift - def test_poke_with_cluster_state(self): + def test_poke_false(self): self._create_cluster() op = AwsRedshiftClusterSensor( task_id='test_cluster_sensor', poke_interval=1, timeout=5, aws_conn_id='aws_default', - cluster_identifier='test_cluster', - target_status=RedshiftClusterStates.AVAILABLE, + cluster_identifier='test_cluster_not_found', + target_status='available', ) - assert op.poke(None) + + assert not op.poke(None) @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift - def test_poke_false(self): + def test_poke_cluster_not_found(self): self._create_cluster() op = AwsRedshiftClusterSensor( task_id='test_cluster_sensor', @@ -80,7 +81,7 @@ def test_poke_false(self): timeout=5, aws_conn_id='aws_default', cluster_identifier='test_cluster_not_found', - target_status='available', + target_status='cluster_not_found', ) - assert not op.poke(None) + assert op.poke(None) From 6c63fed5c253aadedad4260632d311b4ff77a57a Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 7 Dec 2021 16:44:01 -0600 Subject: [PATCH 21/26] Update airflow/providers/amazon/aws/operators/redshift.py Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- airflow/providers/amazon/aws/operators/redshift.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/redshift.py b/airflow/providers/amazon/aws/operators/redshift.py index 04660240f339..0dec10aba660 100644 --- a/airflow/providers/amazon/aws/operators/redshift.py +++ b/airflow/providers/amazon/aws/operators/redshift.py @@ -104,9 +104,9 @@ def __init__( def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - self.log.info("Starting Redshift cluster %s", self.cluster_identifier) cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) if cluster_state == 'paused': + self.log.info("Starting Redshift cluster %s", self.cluster_identifier) redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier) else: self.log.warning( From 3ac8f331be3d6c7f7e64426c50a6f73b50614a48 Mon Sep 17 00:00:00 2001 From: Daniel Barrundia Gonzalez Date: Tue, 7 Dec 2021 16:44:10 -0600 Subject: [PATCH 22/26] Update airflow/providers/amazon/aws/operators/redshift.py Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- airflow/providers/amazon/aws/operators/redshift.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/redshift.py b/airflow/providers/amazon/aws/operators/redshift.py index 0dec10aba660..52d82b40fad9 100644 --- a/airflow/providers/amazon/aws/operators/redshift.py +++ b/airflow/providers/amazon/aws/operators/redshift.py @@ -145,9 +145,9 @@ def __init__( def execute(self, context): redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) if cluster_state == 'available': + self.log.info("Pausing Redshift cluster %s", self.cluster_identifier) redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier) else: self.log.warning( From 50becc631642ab6bd61874de910756725bca5956 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Tue, 7 Dec 2021 16:51:12 -0600 Subject: [PATCH 23/26] Removing no longer required subclass --- tests/providers/amazon/aws/operators/test_redshift.py | 4 ++-- tests/providers/amazon/aws/sensors/test_redshift.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_redshift.py b/tests/providers/amazon/aws/operators/test_redshift.py index 8916960af4c9..f47d25a3362a 100644 --- a/tests/providers/amazon/aws/operators/test_redshift.py +++ b/tests/providers/amazon/aws/operators/test_redshift.py @@ -49,7 +49,7 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook ) -class TestResumeClusterOperator(unittest.TestCase): +class TestResumeClusterOperator: @staticmethod def _create_clusters(): client = boto3.client('redshift', region_name='us-east-1') @@ -82,7 +82,7 @@ def test_resume_cluster(self): pass -class TestPauseClusterOperator(unittest.TestCase): +class TestPauseClusterOperator: @staticmethod def _create_clusters(): client = boto3.client('redshift', region_name='us-east-1') diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py b/tests/providers/amazon/aws/sensors/test_redshift.py index ec7ae66ab531..4bddf995ea27 100644 --- a/tests/providers/amazon/aws/sensors/test_redshift.py +++ b/tests/providers/amazon/aws/sensors/test_redshift.py @@ -39,8 +39,6 @@ def _create_cluster(): MasterUsername='admin', MasterUserPassword='mock_password', ) - if not client.describe_clusters()['Clusters']: - raise ValueError('AWS not properly mocked') @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift From f012c29430b3c286d3afa690804be3d5373385d1 Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Wed, 8 Dec 2021 19:33:26 -0600 Subject: [PATCH 24/26] Add unittests for TestPauseClusterOperator and TestResumeClusterOperator --- .../amazon/aws/operators/test_redshift.py | 64 ++++++++++++++++--- 1 file changed, 54 insertions(+), 10 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_redshift.py b/tests/providers/amazon/aws/operators/test_redshift.py index f47d25a3362a..c565be8c23ce 100644 --- a/tests/providers/amazon/aws/operators/test_redshift.py +++ b/tests/providers/amazon/aws/operators/test_redshift.py @@ -76,10 +76,33 @@ def test_init(self): assert redshift_operator.cluster_identifier == "test_cluster" assert redshift_operator.aws_conn_id == "aws_conn_test" - def test_resume_cluster(self): - # TODO: Add test once moto library supports pause_cluster() or resume_cluster() boto api calls: - # https://github.com/spulec/moto/issues/4591 - pass + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") + def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_get_conn, mock_cluster_status): + conn = MagicMock() + mock_run = conn.resume_cluster + mock_get_conn.return_value = conn + + mock_cluster_status.return_value = 'paused' + redshift_operator = RedshiftResumeClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + redshift_operator.execute(None) + mock_run.assert_called_once_with(ClusterIdentifier='test_cluster') + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") + def test_resume_cluster_not_called_when_cluster_is_not_paused(self, mock_get_conn, mock_cluster_status): + conn = MagicMock() + mock_run = conn.resume_cluster + mock_get_conn.return_value = conn + + mock_cluster_status.return_value = 'available' + redshift_operator = RedshiftResumeClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + redshift_operator.execute(None) + mock_run.assert_not_called() class TestPauseClusterOperator: @@ -98,8 +121,6 @@ def _create_clusters(): MasterUsername='admin', MasterUserPassword='mock_password', ) - if not client.describe_clusters()['Clusters']: - raise ValueError('AWS not properly mocked') def test_init(self): redshift_operator = RedshiftPauseClusterOperator( @@ -109,7 +130,30 @@ def test_init(self): assert redshift_operator.cluster_identifier == "test_cluster" assert redshift_operator.aws_conn_id == "aws_conn_test" - def test_pause_cluster(self): - # TODO: Add test once moto library supports pause_cluster() or resume_cluster() boto api calls: - # https://github.com/spulec/moto/issues/4591 - pass + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") + def test_pause_cluster_is_called_when_cluster_is_available(self, mock_get_conn, mock_cluster_status): + conn = MagicMock() + mock_run = conn.pause_cluster + mock_get_conn.return_value = conn + + mock_cluster_status.return_value = 'available' + redshift_operator = RedshiftPauseClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + redshift_operator.execute(None) + mock_run.assert_called_once_with(ClusterIdentifier='test_cluster') + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") + def test_pause_cluster_not_called_when_cluster_is_not_available(self, mock_get_conn, mock_cluster_status): + conn = MagicMock() + mock_run = conn.pause_cluster + mock_get_conn.return_value = conn + + mock_cluster_status.return_value = 'paused' + redshift_operator = RedshiftPauseClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + redshift_operator.execute(None) + mock_run.assert_not_called() From a7ff6caecb92dbf31f499f0ab3de036bd140690a Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Thu, 9 Dec 2021 09:13:50 -0600 Subject: [PATCH 25/26] Clean unittest for redshift Operators --- .../amazon/aws/operators/test_redshift.py | 59 ++----------------- 1 file changed, 4 insertions(+), 55 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_redshift.py b/tests/providers/amazon/aws/operators/test_redshift.py index c565be8c23ce..d43cf4a939b7 100644 --- a/tests/providers/amazon/aws/operators/test_redshift.py +++ b/tests/providers/amazon/aws/operators/test_redshift.py @@ -20,7 +20,6 @@ from unittest import mock from unittest.mock import MagicMock -import boto3 from parameterized import parameterized from airflow.providers.amazon.aws.operators.redshift import ( @@ -50,24 +49,6 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook class TestResumeClusterOperator: - @staticmethod - def _create_clusters(): - client = boto3.client('redshift', region_name='us-east-1') - client.create_cluster( - ClusterIdentifier='test_cluster_to_pause', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - client.create_cluster( - ClusterIdentifier='test_cluster_to_resume', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - if not client.describe_clusters()['Clusters']: - raise ValueError('AWS not properly mocked') - def test_init(self): redshift_operator = RedshiftResumeClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" @@ -79,49 +60,25 @@ def test_init(self): @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_get_conn, mock_cluster_status): - conn = MagicMock() - mock_run = conn.resume_cluster - mock_get_conn.return_value = conn - mock_cluster_status.return_value = 'paused' redshift_operator = RedshiftResumeClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_run.assert_called_once_with(ClusterIdentifier='test_cluster') + mock_get_conn.return_value.resume_cluster.assert_called_once_with(ClusterIdentifier='test_cluster') @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") def test_resume_cluster_not_called_when_cluster_is_not_paused(self, mock_get_conn, mock_cluster_status): - conn = MagicMock() - mock_run = conn.resume_cluster - mock_get_conn.return_value = conn - mock_cluster_status.return_value = 'available' redshift_operator = RedshiftResumeClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_run.assert_not_called() + mock_get_conn.return_value.resume_cluster.assert_not_called() class TestPauseClusterOperator: - @staticmethod - def _create_clusters(): - client = boto3.client('redshift', region_name='us-east-1') - client.create_cluster( - ClusterIdentifier='test_cluster_to_pause', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - client.create_cluster( - ClusterIdentifier='test_cluster_to_resume', - NodeType='dc1.large', - MasterUsername='admin', - MasterUserPassword='mock_password', - ) - def test_init(self): redshift_operator = RedshiftPauseClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" @@ -133,27 +90,19 @@ def test_init(self): @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") def test_pause_cluster_is_called_when_cluster_is_available(self, mock_get_conn, mock_cluster_status): - conn = MagicMock() - mock_run = conn.pause_cluster - mock_get_conn.return_value = conn - mock_cluster_status.return_value = 'available' redshift_operator = RedshiftPauseClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_run.assert_called_once_with(ClusterIdentifier='test_cluster') + mock_get_conn.return_value.pause_cluster.assert_called_once_with(ClusterIdentifier='test_cluster') @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") def test_pause_cluster_not_called_when_cluster_is_not_available(self, mock_get_conn, mock_cluster_status): - conn = MagicMock() - mock_run = conn.pause_cluster - mock_get_conn.return_value = conn - mock_cluster_status.return_value = 'paused' redshift_operator = RedshiftPauseClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" ) redshift_operator.execute(None) - mock_run.assert_not_called() + mock_get_conn.return_value.pause_cluster.assert_not_called() From f4ac7a486e204240248e51ed3549be2bcb14dccc Mon Sep 17 00:00:00 2001 From: dbarrundia3 Date: Thu, 9 Dec 2021 12:59:07 -0600 Subject: [PATCH 26/26] Small fixups --- airflow/providers/amazon/aws/sensors/redshift.py | 2 +- tests/providers/amazon/aws/sensors/test_redshift.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/sensors/redshift.py b/airflow/providers/amazon/aws/sensors/redshift.py index 79c8803ab163..669e6c0c86b8 100644 --- a/airflow/providers/amazon/aws/sensors/redshift.py +++ b/airflow/providers/amazon/aws/sensors/redshift.py @@ -48,7 +48,7 @@ def __init__( self.hook: Optional[RedshiftHook] = None def poke(self, context): - self.log.info('Checking cluster %r for status %r', self.cluster_identifier, self.target_status) + self.log.info('Poking for status : %s\nfor cluster %s', self.target_status, self.cluster_identifier) return self.get_hook().cluster_status(self.cluster_identifier) == self.target_status def get_hook(self) -> RedshiftHook: diff --git a/tests/providers/amazon/aws/sensors/test_redshift.py b/tests/providers/amazon/aws/sensors/test_redshift.py index 4bddf995ea27..ec7ae66ab531 100644 --- a/tests/providers/amazon/aws/sensors/test_redshift.py +++ b/tests/providers/amazon/aws/sensors/test_redshift.py @@ -39,6 +39,8 @@ def _create_cluster(): MasterUsername='admin', MasterUserPassword='mock_password', ) + if not client.describe_clusters()['Clusters']: + raise ValueError('AWS not properly mocked') @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present') @mock_redshift