From a8386233aa73790e109cf69f453daee97b1f7daf Mon Sep 17 00:00:00 2001 From: Szymon Przedwojski Date: Thu, 13 Dec 2018 02:15:43 +0100 Subject: [PATCH] [AIRFLOW-3310] Google Cloud Spanner deploy / delete operators (#4286) --- .../example_dags/example_gcp_spanner.py | 92 +++++++++ airflow/contrib/hooks/gcp_spanner_hook.py | 183 ++++++++++++++++++ .../contrib/operators/gcp_spanner_operator.py | 132 +++++++++++++ docs/howto/operator.rst | 88 +++++++++ docs/integration.rst | 24 +++ setup.py | 2 + .../operators/test_gcp_spanner_operator.py | 178 +++++++++++++++++ 7 files changed, 699 insertions(+) create mode 100644 airflow/contrib/example_dags/example_gcp_spanner.py create mode 100644 airflow/contrib/hooks/gcp_spanner_hook.py create mode 100644 airflow/contrib/operators/gcp_spanner_operator.py create mode 100644 tests/contrib/operators/test_gcp_spanner_operator.py diff --git a/airflow/contrib/example_dags/example_gcp_spanner.py b/airflow/contrib/example_dags/example_gcp_spanner.py new file mode 100644 index 0000000000000..dd8b8c52b9d61 --- /dev/null +++ b/airflow/contrib/example_dags/example_gcp_spanner.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that creates, updates and deletes a Cloud Spanner instance. + +This DAG relies on the following environment variables +* PROJECT_ID - Google Cloud Platform project for the Cloud Spanner instance. +* INSTANCE_ID - Cloud Spanner instance ID. +* CONFIG_NAME - The name of the instance's configuration. Values are of the form + projects//instanceConfigs/. + See also: + https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig + https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs +* NODE_COUNT - Number of nodes allocated to the instance. +* DISPLAY_NAME - The descriptive name for this instance as it appears in UIs. + Must be unique per project and between 4 and 30 characters in length. +""" + +import os + +import airflow +from airflow import models +from airflow.contrib.operators.gcp_spanner_operator import \ + CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDeleteOperator + +# [START howto_operator_spanner_arguments] +PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project') +INSTANCE_ID = os.environ.get('INSTANCE_ID', 'testinstance') +CONFIG_NAME = os.environ.get('CONFIG_NAME', + 'projects/example-project/instanceConfigs/eur3') +NODE_COUNT = os.environ.get('NODE_COUNT', '1') +DISPLAY_NAME = os.environ.get('DISPLAY_NAME', 'Test Instance') +# [END howto_operator_spanner_arguments] + +default_args = { + 'start_date': airflow.utils.dates.days_ago(1) +} + +with models.DAG( + 'example_gcp_spanner', + default_args=default_args, + schedule_interval=None # Override to match your needs +) as dag: + # Create + # [START howto_operator_spanner_deploy] + spanner_instance_create_task = CloudSpannerInstanceDeployOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + configuration_name=CONFIG_NAME, + node_count=int(NODE_COUNT), + display_name=DISPLAY_NAME, + task_id='spanner_instance_create_task' + ) + # [END howto_operator_spanner_deploy] + + # Update + spanner_instance_update_task = CloudSpannerInstanceDeployOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + configuration_name=CONFIG_NAME, + node_count=int(NODE_COUNT) + 1, + display_name=DISPLAY_NAME + '_updated', + task_id='spanner_instance_update_task' + ) + + # [START howto_operator_spanner_delete] + spanner_instance_delete_task = CloudSpannerInstanceDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + task_id='spanner_instance_delete_task' + ) + # [END howto_operator_spanner_delete] + + spanner_instance_create_task >> spanner_instance_update_task \ + >> spanner_instance_delete_task diff --git a/airflow/contrib/hooks/gcp_spanner_hook.py b/airflow/contrib/hooks/gcp_spanner_hook.py new file mode 100644 index 0000000000000..fc73562e8b9fe --- /dev/null +++ b/airflow/contrib/hooks/gcp_spanner_hook.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from google.longrunning.operations_grpc_pb2 import Operation # noqa: F401 +from typing import Optional, Callable # noqa: F401 + +from google.api_core.exceptions import GoogleAPICallError +from google.cloud.spanner_v1.client import Client +from google.cloud.spanner_v1.instance import Instance # noqa: F401 + +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook + + +# noinspection PyAbstractClass +class CloudSpannerHook(GoogleCloudBaseHook): + """ + Hook for Google Cloud Spanner APIs. + """ + _client = None + + def __init__(self, + gcp_conn_id='google_cloud_default', + delegate_to=None): + super(CloudSpannerHook, self).__init__(gcp_conn_id, delegate_to) + + def get_client(self, project_id): + # type: (str) -> Client + """ + Provides a client for interacting with Cloud Spanner API. + + :param project_id: The ID of the project which owns the instances, tables and data. + :type project_id: str + :return: Client for interacting with Cloud Spanner API. See: + https://googleapis.github.io/google-cloud-python/latest/spanner/client-api.html#google.cloud.spanner_v1.client.Client + :rtype: object + """ + if not self._client: + self._client = Client(project=project_id, credentials=self._get_credentials()) + return self._client + + def get_instance(self, project_id, instance_id): + # type: (str, str) -> Optional[Instance] + """ + Gets information about a particular instance. + + :param project_id: The ID of the project which owns the instances, tables and data. + :type project_id: str + :param instance_id: The ID of the instance. + :type instance_id: str + :return: Representation of a Cloud Spanner Instance. See: + https://googleapis.github.io/google-cloud-python/latest/spanner/instance-api.html#google.cloud.spanner_v1.instance.Instance + :rtype: object + """ + client = self.get_client(project_id) + instance = client.instance(instance_id) + if not instance.exists(): + return None + return instance + + def create_instance(self, project_id, instance_id, configuration_name, node_count, + display_name): + # type: (str, str, str, int, str) -> bool + """ + Creates a new Cloud Spanner instance. + + :param project_id: The ID of the project which owns the instances, tables and + data. + :type project_id: str + :param instance_id: The ID of the instance. + :type instance_id: str + :param configuration_name: Name of the instance configuration defining how the + instance will be created. Required for instances which do not yet exist. + :type configuration_name: str + :param node_count: (Optional) Number of nodes allocated to the instance. + :type node_count: int + :param display_name: (Optional) The display name for the instance in the Cloud + Console UI. (Must be between 4 and 30 characters.) If this value is not set + in the constructor, will fall back to the instance ID. + :type display_name: str + :return: True if the operation succeeded, raises an exception otherwise. + :rtype: bool + """ + return self._apply_to_instance(project_id, instance_id, configuration_name, + node_count, display_name, lambda x: x.create()) + + def update_instance(self, project_id, instance_id, configuration_name, node_count, + display_name): + # type: (str, str, str, int, str) -> bool + """ + Updates an existing Cloud Spanner instance. + + :param project_id: The ID of the project which owns the instances, tables and + data. + :type project_id: str + :param instance_id: The ID of the instance. + :type instance_id: str + :param configuration_name: Name of the instance configuration defining how the + instance will be created. Required for instances which do not yet exist. + :type configuration_name: str + :param node_count: (Optional) Number of nodes allocated to the instance. + :type node_count: int + :param display_name: (Optional) The display name for the instance in the Cloud + Console UI. (Must be between 4 and 30 characters.) If this value is not set + in the constructor, will fall back to the instance ID. + :type display_name: str + :return: True if the operation succeeded, raises an exception otherwise. + :rtype: bool + """ + return self._apply_to_instance(project_id, instance_id, configuration_name, + node_count, display_name, lambda x: x.update()) + + def _apply_to_instance(self, project_id, instance_id, configuration_name, node_count, + display_name, func): + # type: (str, str, str, int, str, Callable[[Instance], Operation]) -> bool + """ + Invokes a method on a given instance by applying a specified Callable. + + :param project_id: The ID of the project which owns the instances, tables and + data. + :type project_id: str + :param instance_id: The ID of the instance. + :type instance_id: str + :param configuration_name: Name of the instance configuration defining how the + instance will be created. Required for instances which do not yet exist. + :type configuration_name: str + :param node_count: (Optional) Number of nodes allocated to the instance. + :type node_count: int + :param display_name: (Optional) The display name for the instance in the Cloud + Console UI. (Must be between 4 and 30 characters.) If this value is not set + in the constructor, will fall back to the instance ID. + :type display_name: str + :param func: Method of the instance to be called. + :type func: Callable + """ + client = self.get_client(project_id) + instance = client.instance(instance_id, + configuration_name=configuration_name, + node_count=node_count, + display_name=display_name) + try: + operation = func(instance) # type: Operation + except GoogleAPICallError as e: + self.log.error('An error occurred: %s. Aborting.', e.message) + raise e + + if operation: + result = operation.result() + self.log.info(result) + return True + + def delete_instance(self, project_id, instance_id): + # type: (str, str) -> bool + """ + Deletes an existing Cloud Spanner instance. + + :param project_id: The ID of the project which owns the instances, tables and data. + :type project_id: str + :param instance_id: The ID of the instance. + :type instance_id: str + """ + client = self.get_client(project_id) + instance = client.instance(instance_id) + try: + instance.delete() + return True + except GoogleAPICallError as e: + self.log.error('An error occurred: %s. Aborting.', e.message) + raise e diff --git a/airflow/contrib/operators/gcp_spanner_operator.py b/airflow/contrib/operators/gcp_spanner_operator.py new file mode 100644 index 0000000000000..7b329a38490bb --- /dev/null +++ b/airflow/contrib/operators/gcp_spanner_operator.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from airflow import AirflowException +from airflow.contrib.hooks.gcp_spanner_hook import CloudSpannerHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class CloudSpannerInstanceDeployOperator(BaseOperator): + """ + Creates a new Cloud Spanner instance or, if an instance with the same instance_id + exists in the specified project, updates it. + + :param project_id: The ID of the project which owns the instances, tables and data. + :type project_id: str + :param instance_id: Cloud Spanner instance ID. + :type instance_id: str + :param configuration_name: Name of the instance configuration defining + how the instance will be created. Required for instances which do not yet exist. + :type configuration_name: str + :param node_count: (Optional) Number of nodes allocated to the instance. + :type node_count: int + :param display_name: (Optional) The display name for the instance in the + Cloud Console UI. (Must be between 4 and 30 characters.) If this value is not + set in the constructor, will fall back to the instance ID. + :type display_name: str + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + """ + # [START gcp_spanner_deploy_template_fields] + template_fields = ('project_id', 'instance_id', 'configuration_name', 'display_name', + 'gcp_conn_id') + # [END gcp_spanner_deploy_template_fields] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + configuration_name, + node_count, + display_name, + gcp_conn_id='google_cloud_default', + *args, **kwargs): + self.instance_id = instance_id + self.project_id = project_id + self.configuration_name = configuration_name + self.node_count = node_count + self.display_name = display_name + self.gcp_conn_id = gcp_conn_id + self._validate_inputs() + self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id) + super(CloudSpannerInstanceDeployOperator, self).__init__(*args, **kwargs) + + def _validate_inputs(self): + if not self.project_id: + raise AirflowException("The required parameter 'project_id' is empty") + if not self.instance_id: + raise AirflowException("The required parameter 'instance_id' is empty") + + def execute(self, context): + if not self._hook.get_instance(self.project_id, self.instance_id): + self.log.info("Creating Cloud Spanner instance '%s'", self.instance_id) + func = self._hook.create_instance + else: + self.log.info("Updating Cloud Spanner instance '%s'", self.instance_id) + func = self._hook.update_instance + return func(self.project_id, + self.instance_id, + self.configuration_name, + self.node_count, + self.display_name) + + +class CloudSpannerInstanceDeleteOperator(BaseOperator): + """ + Deletes a Cloud Spanner instance. + If an instance does not exist, no action will be taken and the operator will succeed. + + :param project_id: The ID of the project which owns the instances, tables and data. + :type project_id: str + :param instance_id: Cloud Spanner instance ID. + :type instance_id: str + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + """ + # [START gcp_spanner_delete_template_fields] + template_fields = ('project_id', 'instance_id', 'gcp_conn_id') + # [END gcp_spanner_delete_template_fields] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + gcp_conn_id='google_cloud_default', + *args, **kwargs): + self.instance_id = instance_id + self.project_id = project_id + self.gcp_conn_id = gcp_conn_id + self._validate_inputs() + self._hook = CloudSpannerHook(gcp_conn_id=gcp_conn_id) + super(CloudSpannerInstanceDeleteOperator, self).__init__(*args, **kwargs) + + def _validate_inputs(self): + if not self.project_id: + raise AirflowException("The required parameter 'project_id' is empty") + if not self.instance_id: + raise AirflowException("The required parameter 'instance_id' is empty") + + def execute(self, context): + if self._hook.get_instance(self.project_id, self.instance_id): + return self._hook.delete_instance(self.project_id, + self.instance_id) + else: + self.log.info("Instance '%s' does not exist in project '%s'. " + "Aborting delete.", self.instance_id, self.project_id) + return True diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst index cd6e73b1f2474..095553b3ac6ab 100644 --- a/docs/howto/operator.rst +++ b/docs/howto/operator.rst @@ -545,6 +545,94 @@ See `Google Cloud Functions API documentation Google Cloud Sql Operators -------------------------- +CloudSpannerInstanceDeployOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Creates a new Cloud Spanner instance or, if an instance with the same name exists, +updates it. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeployOperator`. + +Arguments +""""""""" + +Some arguments in the example DAG are taken from environment variables: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py + :language: python + :start-after: [START howto_operator_spanner_arguments] + :end-before: [END howto_operator_spanner_arguments] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_spanner_deploy] + :end-before: [END howto_operator_spanner_deploy] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_spanner_deploy_template_fields] + :end-before: [END gcp_spanner_deploy_template_fields] + +More information +"""""""""""""""" + +See Google Cloud Spanner API documentation for instance `create +`_ +and `update +`_. + +CloudSpannerInstanceDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Deletes a Cloud Spanner instance. +If an instance does not exist, no action will be taken and the operator will succeed. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator`. + +Arguments +""""""""" + +Some arguments in the example DAG are taken from environment variables: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py + :language: python + :start-after: [START howto_operator_spanner_arguments] + :end-before: [END howto_operator_spanner_arguments] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_spanner.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_spanner_delete] + :end-before: [END howto_operator_spanner_delete] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_spanner_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_spanner_delete_template_fields] + :end-before: [END gcp_spanner_delete_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Spanner API documentation for instance delete +`_. + CloudSqlInstanceDatabaseCreateOperator ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/integration.rst b/docs/integration.rst index dfa001256a482..0afe55530927a 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -636,6 +636,30 @@ BigQueryHook .. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook :members: +Cloud Spanner +''''''''''''' + +Cloud Spanner Operators +""""""""""""""""""""""" + +- :ref:`CloudSpannerInstanceDeployOperator` : creates a new Cloud Spanner instance or, + if an instance with the same name exists, updates it. +- :ref:`CloudSpannerInstanceDeleteOperator` : deletes a Cloud Spanner instance. + +.. _CloudSpannerInstanceDeployOperator: + +CloudSpannerInstanceDeployOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeployOperator + +.. _CloudSpannerInstanceDeleteOperator: + +CloudSpannerInstanceDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator + Cloud SQL ''''''''' diff --git a/setup.py b/setup.py index b6a4f2d240a04..56b9bb2873c76 100644 --- a/setup.py +++ b/setup.py @@ -189,6 +189,8 @@ def write_version(filename=os.path.join(*['airflow', 'google-auth>=1.0.0, <2.0.0dev', 'google-auth-httplib2>=0.0.1', 'google-cloud-container>=0.1.1', + 'google-cloud-spanner>=1.6.0', + 'grpcio-gcp>=0.2.2', 'PyOpenSSL', 'pandas-gbq' ] diff --git a/tests/contrib/operators/test_gcp_spanner_operator.py b/tests/contrib/operators/test_gcp_spanner_operator.py new file mode 100644 index 0000000000000..ff2b82fd16cc1 --- /dev/null +++ b/tests/contrib/operators/test_gcp_spanner_operator.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from parameterized import parameterized + +from airflow import AirflowException +from airflow.contrib.operators.gcp_spanner_operator import \ + CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDeleteOperator +from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \ + SKIP_TEST_WARNING, GCP_SPANNER_KEY + +try: + # noinspection PyProtectedMember + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +PROJECT_ID = 'project-id' +INSTANCE_ID = 'instance-id' +DB_NAME = 'db1' +CONFIG_NAME = 'projects/project-id/instanceConfigs/eur3' +NODE_COUNT = '1' +DISPLAY_NAME = 'Test Instance' + + +class CloudSpannerTest(unittest.TestCase): + @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook") + def test_instance_create(self, mock_hook): + mock_hook.return_value.get_instance.return_value = None + op = CloudSpannerInstanceDeployOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + configuration_name=CONFIG_NAME, + node_count=int(NODE_COUNT), + display_name=DISPLAY_NAME, + task_id="id" + ) + result = op.execute(None) + mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default") + mock_hook.return_value.create_instance.assert_called_once_with( + PROJECT_ID, INSTANCE_ID, CONFIG_NAME, int(NODE_COUNT), DISPLAY_NAME + ) + mock_hook.return_value.update_instance.assert_not_called() + self.assertTrue(result) + + @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook") + def test_instance_update(self, mock_hook): + mock_hook.return_value.get_instance.return_value = {"name": INSTANCE_ID} + op = CloudSpannerInstanceDeployOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + configuration_name=CONFIG_NAME, + node_count=int(NODE_COUNT), + display_name=DISPLAY_NAME, + task_id="id" + ) + result = op.execute(None) + mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default") + mock_hook.return_value.update_instance.assert_called_once_with( + PROJECT_ID, INSTANCE_ID, CONFIG_NAME, int(NODE_COUNT), DISPLAY_NAME + ) + mock_hook.return_value.create_instance.assert_not_called() + self.assertTrue(result) + + @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook") + def test_instance_create_aborts_and_succeeds_if_instance_exists(self, mock_hook): + mock_hook.return_value.get_instance.return_value = {"name": INSTANCE_ID} + op = CloudSpannerInstanceDeployOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + configuration_name=CONFIG_NAME, + node_count=int(NODE_COUNT), + display_name=DISPLAY_NAME, + task_id="id" + ) + result = op.execute(None) + mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default") + mock_hook.return_value.create_instance.assert_not_called() + self.assertTrue(result) + + @parameterized.expand([ + ("", INSTANCE_ID, "project_id"), + (PROJECT_ID, "", "instance_id"), + ]) + @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook") + def test_instance_create_ex_if_param_missing(self, project_id, instance_id, + exp_msg, mock_hook): + with self.assertRaises(AirflowException) as cm: + CloudSpannerInstanceDeployOperator( + project_id=project_id, + instance_id=instance_id, + configuration_name=CONFIG_NAME, + node_count=int(NODE_COUNT), + display_name=DISPLAY_NAME, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter '{}' is empty".format(exp_msg), str(err)) + mock_hook.assert_not_called() + + @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook") + def test_instance_delete(self, mock_hook): + mock_hook.return_value.get_instance.return_value = {"name": INSTANCE_ID} + op = CloudSpannerInstanceDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + task_id="id" + ) + result = op.execute(None) + mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default") + mock_hook.return_value.delete_instance.assert_called_once_with( + PROJECT_ID, INSTANCE_ID + ) + self.assertTrue(result) + + @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook") + def test_instance_delete_aborts_and_succeeds_if_instance_does_not_exist(self, + mock_hook): + mock_hook.return_value.get_instance.return_value = None + op = CloudSpannerInstanceDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + task_id="id" + ) + result = op.execute(None) + mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default") + mock_hook.return_value.delete_instance.assert_not_called() + self.assertTrue(result) + + @parameterized.expand([ + ("", INSTANCE_ID, "project_id"), + (PROJECT_ID, "", "instance_id"), + ]) + @mock.patch("airflow.contrib.operators.gcp_spanner_operator.CloudSpannerHook") + def test_instance_delete_ex_if_param_missing(self, project_id, instance_id, exp_msg, + mock_hook): + with self.assertRaises(AirflowException) as cm: + CloudSpannerInstanceDeleteOperator( + project_id=project_id, + instance_id=instance_id, + task_id="id" + ) + err = cm.exception + self.assertIn("The required parameter '{}' is empty".format(exp_msg), str(err)) + mock_hook.assert_not_called() + + +@unittest.skipIf( + BaseGcpIntegrationTestCase.skip_check(GCP_SPANNER_KEY), SKIP_TEST_WARNING) +class CloudSpannerExampleDagsTest(BaseGcpIntegrationTestCase): + def __init__(self, method_name='runTest'): + super(CloudSpannerExampleDagsTest, self).__init__( + method_name, + dag_id='example_gcp_spanner', + gcp_key=GCP_SPANNER_KEY) + + def test_run_example_dag_cloudsql_query(self): + self._run_dag()