Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[AIRFLOW-3310] Google Cloud Spanner deploy / delete operators (#4286)
- Loading branch information
1 parent
96b8db6
commit a838623
Showing
7 changed files
with
699 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
""" | ||
Example Airflow DAG that creates, updates and deletes a Cloud Spanner instance. | ||
This DAG relies on the following environment variables | ||
* PROJECT_ID - Google Cloud Platform project for the Cloud Spanner instance. | ||
* INSTANCE_ID - Cloud Spanner instance ID. | ||
* CONFIG_NAME - The name of the instance's configuration. Values are of the form | ||
projects/<project>/instanceConfigs/<configuration>. | ||
See also: | ||
https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs#InstanceConfig | ||
https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instanceConfigs/list#google.spanner.admin.instance.v1.InstanceAdmin.ListInstanceConfigs | ||
* NODE_COUNT - Number of nodes allocated to the instance. | ||
* DISPLAY_NAME - The descriptive name for this instance as it appears in UIs. | ||
Must be unique per project and between 4 and 30 characters in length. | ||
""" | ||
|
||
import os | ||
|
||
import airflow | ||
from airflow import models | ||
from airflow.contrib.operators.gcp_spanner_operator import \ | ||
CloudSpannerInstanceDeployOperator, CloudSpannerInstanceDeleteOperator | ||
|
||
# [START howto_operator_spanner_arguments] | ||
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project') | ||
INSTANCE_ID = os.environ.get('INSTANCE_ID', 'testinstance') | ||
CONFIG_NAME = os.environ.get('CONFIG_NAME', | ||
'projects/example-project/instanceConfigs/eur3') | ||
NODE_COUNT = os.environ.get('NODE_COUNT', '1') | ||
DISPLAY_NAME = os.environ.get('DISPLAY_NAME', 'Test Instance') | ||
# [END howto_operator_spanner_arguments] | ||
|
||
default_args = { | ||
'start_date': airflow.utils.dates.days_ago(1) | ||
} | ||
|
||
with models.DAG( | ||
'example_gcp_spanner', | ||
default_args=default_args, | ||
schedule_interval=None # Override to match your needs | ||
) as dag: | ||
# Create | ||
# [START howto_operator_spanner_deploy] | ||
spanner_instance_create_task = CloudSpannerInstanceDeployOperator( | ||
project_id=PROJECT_ID, | ||
instance_id=INSTANCE_ID, | ||
configuration_name=CONFIG_NAME, | ||
node_count=int(NODE_COUNT), | ||
display_name=DISPLAY_NAME, | ||
task_id='spanner_instance_create_task' | ||
) | ||
# [END howto_operator_spanner_deploy] | ||
|
||
# Update | ||
spanner_instance_update_task = CloudSpannerInstanceDeployOperator( | ||
project_id=PROJECT_ID, | ||
instance_id=INSTANCE_ID, | ||
configuration_name=CONFIG_NAME, | ||
node_count=int(NODE_COUNT) + 1, | ||
display_name=DISPLAY_NAME + '_updated', | ||
task_id='spanner_instance_update_task' | ||
) | ||
|
||
# [START howto_operator_spanner_delete] | ||
spanner_instance_delete_task = CloudSpannerInstanceDeleteOperator( | ||
project_id=PROJECT_ID, | ||
instance_id=INSTANCE_ID, | ||
task_id='spanner_instance_delete_task' | ||
) | ||
# [END howto_operator_spanner_delete] | ||
|
||
spanner_instance_create_task >> spanner_instance_update_task \ | ||
>> spanner_instance_delete_task |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from google.longrunning.operations_grpc_pb2 import Operation # noqa: F401 | ||
from typing import Optional, Callable # noqa: F401 | ||
|
||
from google.api_core.exceptions import GoogleAPICallError | ||
from google.cloud.spanner_v1.client import Client | ||
from google.cloud.spanner_v1.instance import Instance # noqa: F401 | ||
|
||
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook | ||
|
||
|
||
# noinspection PyAbstractClass | ||
class CloudSpannerHook(GoogleCloudBaseHook): | ||
""" | ||
Hook for Google Cloud Spanner APIs. | ||
""" | ||
_client = None | ||
|
||
def __init__(self, | ||
gcp_conn_id='google_cloud_default', | ||
delegate_to=None): | ||
super(CloudSpannerHook, self).__init__(gcp_conn_id, delegate_to) | ||
|
||
def get_client(self, project_id): | ||
# type: (str) -> Client | ||
""" | ||
Provides a client for interacting with Cloud Spanner API. | ||
:param project_id: The ID of the project which owns the instances, tables and data. | ||
:type project_id: str | ||
:return: Client for interacting with Cloud Spanner API. See: | ||
https://googleapis.github.io/google-cloud-python/latest/spanner/client-api.html#google.cloud.spanner_v1.client.Client | ||
:rtype: object | ||
""" | ||
if not self._client: | ||
self._client = Client(project=project_id, credentials=self._get_credentials()) | ||
return self._client | ||
|
||
def get_instance(self, project_id, instance_id): | ||
# type: (str, str) -> Optional[Instance] | ||
""" | ||
Gets information about a particular instance. | ||
:param project_id: The ID of the project which owns the instances, tables and data. | ||
:type project_id: str | ||
:param instance_id: The ID of the instance. | ||
:type instance_id: str | ||
:return: Representation of a Cloud Spanner Instance. See: | ||
https://googleapis.github.io/google-cloud-python/latest/spanner/instance-api.html#google.cloud.spanner_v1.instance.Instance | ||
:rtype: object | ||
""" | ||
client = self.get_client(project_id) | ||
instance = client.instance(instance_id) | ||
if not instance.exists(): | ||
return None | ||
return instance | ||
|
||
def create_instance(self, project_id, instance_id, configuration_name, node_count, | ||
display_name): | ||
# type: (str, str, str, int, str) -> bool | ||
""" | ||
Creates a new Cloud Spanner instance. | ||
:param project_id: The ID of the project which owns the instances, tables and | ||
data. | ||
:type project_id: str | ||
:param instance_id: The ID of the instance. | ||
:type instance_id: str | ||
:param configuration_name: Name of the instance configuration defining how the | ||
instance will be created. Required for instances which do not yet exist. | ||
:type configuration_name: str | ||
:param node_count: (Optional) Number of nodes allocated to the instance. | ||
:type node_count: int | ||
:param display_name: (Optional) The display name for the instance in the Cloud | ||
Console UI. (Must be between 4 and 30 characters.) If this value is not set | ||
in the constructor, will fall back to the instance ID. | ||
:type display_name: str | ||
:return: True if the operation succeeded, raises an exception otherwise. | ||
:rtype: bool | ||
""" | ||
return self._apply_to_instance(project_id, instance_id, configuration_name, | ||
node_count, display_name, lambda x: x.create()) | ||
|
||
def update_instance(self, project_id, instance_id, configuration_name, node_count, | ||
display_name): | ||
# type: (str, str, str, int, str) -> bool | ||
""" | ||
Updates an existing Cloud Spanner instance. | ||
:param project_id: The ID of the project which owns the instances, tables and | ||
data. | ||
:type project_id: str | ||
:param instance_id: The ID of the instance. | ||
:type instance_id: str | ||
:param configuration_name: Name of the instance configuration defining how the | ||
instance will be created. Required for instances which do not yet exist. | ||
:type configuration_name: str | ||
:param node_count: (Optional) Number of nodes allocated to the instance. | ||
:type node_count: int | ||
:param display_name: (Optional) The display name for the instance in the Cloud | ||
Console UI. (Must be between 4 and 30 characters.) If this value is not set | ||
in the constructor, will fall back to the instance ID. | ||
:type display_name: str | ||
:return: True if the operation succeeded, raises an exception otherwise. | ||
:rtype: bool | ||
""" | ||
return self._apply_to_instance(project_id, instance_id, configuration_name, | ||
node_count, display_name, lambda x: x.update()) | ||
|
||
def _apply_to_instance(self, project_id, instance_id, configuration_name, node_count, | ||
display_name, func): | ||
# type: (str, str, str, int, str, Callable[[Instance], Operation]) -> bool | ||
""" | ||
Invokes a method on a given instance by applying a specified Callable. | ||
:param project_id: The ID of the project which owns the instances, tables and | ||
data. | ||
:type project_id: str | ||
:param instance_id: The ID of the instance. | ||
:type instance_id: str | ||
:param configuration_name: Name of the instance configuration defining how the | ||
instance will be created. Required for instances which do not yet exist. | ||
:type configuration_name: str | ||
:param node_count: (Optional) Number of nodes allocated to the instance. | ||
:type node_count: int | ||
:param display_name: (Optional) The display name for the instance in the Cloud | ||
Console UI. (Must be between 4 and 30 characters.) If this value is not set | ||
in the constructor, will fall back to the instance ID. | ||
:type display_name: str | ||
:param func: Method of the instance to be called. | ||
:type func: Callable | ||
""" | ||
client = self.get_client(project_id) | ||
instance = client.instance(instance_id, | ||
configuration_name=configuration_name, | ||
node_count=node_count, | ||
display_name=display_name) | ||
try: | ||
operation = func(instance) # type: Operation | ||
except GoogleAPICallError as e: | ||
self.log.error('An error occurred: %s. Aborting.', e.message) | ||
raise e | ||
|
||
if operation: | ||
result = operation.result() | ||
self.log.info(result) | ||
return True | ||
|
||
def delete_instance(self, project_id, instance_id): | ||
# type: (str, str) -> bool | ||
""" | ||
Deletes an existing Cloud Spanner instance. | ||
:param project_id: The ID of the project which owns the instances, tables and data. | ||
:type project_id: str | ||
:param instance_id: The ID of the instance. | ||
:type instance_id: str | ||
""" | ||
client = self.get_client(project_id) | ||
instance = client.instance(instance_id) | ||
try: | ||
instance.delete() | ||
return True | ||
except GoogleAPICallError as e: | ||
self.log.error('An error occurred: %s. Aborting.', e.message) | ||
raise e |
Oops, something went wrong.