Commit

Dataproc metastore assets (#21267)
wojsamjan committed Feb 15, 2022
1 parent 6692e91 commit 56365b1
Showing 4 changed files with 223 additions and 11 deletions.
156 changes: 145 additions & 11 deletions airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -18,6 +18,7 @@
#
"""This module contains Google Dataproc Metastore operators."""

from datetime import datetime
from time import sleep
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union

@@ -29,13 +30,119 @@
from googleapiclient.errors import HttpError

from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.models.xcom import XCom
from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
from airflow.providers.google.common.links.storage import StorageLink

if TYPE_CHECKING:
from airflow.utils.context import Context


BASE_LINK = "https://console.cloud.google.com"
METASTORE_BASE_LINK = BASE_LINK + "/dataproc/metastore/services/{region}/{service_id}"
METASTORE_BACKUP_LINK = METASTORE_BASE_LINK + "/backups/{resource}?project={project_id}"
METASTORE_BACKUPS_LINK = METASTORE_BASE_LINK + "/backuprestore?project={project_id}"
METASTORE_EXPORT_LINK = METASTORE_BASE_LINK + "/importexport?project={project_id}"
METASTORE_IMPORT_LINK = METASTORE_BASE_LINK + "/imports/{resource}?project={project_id}"
METASTORE_SERVICE_LINK = METASTORE_BASE_LINK + "/config?project={project_id}"


class DataprocMetastoreLink(BaseOperatorLink):
"""Helper class for constructing Dataproc Metastore resource link"""

name = "Dataproc Metastore"
key = "conf"

@staticmethod
def persist(
context: "Context",
task_instance: Union[
"DataprocMetastoreCreateServiceOperator",
"DataprocMetastoreGetServiceOperator",
"DataprocMetastoreRestoreServiceOperator",
"DataprocMetastoreUpdateServiceOperator",
"DataprocMetastoreListBackupsOperator",
"DataprocMetastoreExportMetadataOperator",
],
url: str,
):
task_instance.xcom_push(
context=context,
key=DataprocMetastoreLink.key,
value={
"region": task_instance.region,
"service_id": task_instance.service_id,
"project_id": task_instance.project_id,
"url": url,
},
)

def get_link(self, operator: BaseOperator, dttm: datetime):
conf = XCom.get_one(
dag_id=operator.dag.dag_id,
task_id=operator.task_id,
execution_date=dttm,
key=DataprocMetastoreLink.key,
)
return (
conf["url"].format(
region=conf["region"],
service_id=conf["service_id"],
project_id=conf["project_id"],
)
if conf
else ""
)


class DataprocMetastoreDetailedLink(BaseOperatorLink):
"""Helper class for constructing Dataproc Metastore detailed resource link"""

name = "Dataproc Metastore resource"
key = "config"

@staticmethod
def persist(
context: "Context",
task_instance: Union[
"DataprocMetastoreCreateBackupOperator",
"DataprocMetastoreCreateMetadataImportOperator",
],
url: str,
resource: str,
):
task_instance.xcom_push(
context=context,
key=DataprocMetastoreDetailedLink.key,
value={
"region": task_instance.region,
"service_id": task_instance.service_id,
"project_id": task_instance.project_id,
"url": url,
"resource": resource,
},
)

def get_link(self, operator: BaseOperator, dttm: datetime):
conf = XCom.get_one(
dag_id=operator.dag.dag_id,
task_id=operator.task_id,
execution_date=dttm,
key=DataprocMetastoreDetailedLink.key,
)
return (
conf["url"].format(
region=conf["region"],
service_id=conf["service_id"],
project_id=conf["project_id"],
resource=conf["resource"],
)
if conf
else ""
)


class DataprocMetastoreCreateBackupOperator(BaseOperator):
"""
Creates a new backup in a given project and location.
@@ -81,6 +188,7 @@ class DataprocMetastoreCreateBackupOperator(BaseOperator):
'impersonation_chain',
)
template_fields_renderers = {'backup': 'json'}
operator_extra_links = (DataprocMetastoreDetailedLink(),)

def __init__(
self,
@@ -111,7 +219,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context') -> dict:
def execute(self, context: "Context") -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -144,6 +252,9 @@ def execute(self, context: 'Context') -> dict:
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreDetailedLink.persist(
context=context, task_instance=self, url=METASTORE_BACKUP_LINK, resource=self.backup_id
)
return Backup.to_dict(backup)


@@ -192,6 +303,7 @@ class DataprocMetastoreCreateMetadataImportOperator(BaseOperator):
'impersonation_chain',
)
template_fields_renderers = {'metadata_import': 'json'}
operator_extra_links = (DataprocMetastoreDetailedLink(),)

def __init__(
self,
@@ -222,7 +334,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context'):
def execute(self, context: "Context"):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -240,6 +352,10 @@ def execute(self, context: 'Context'):
)
metadata_import = hook.wait_for_operation(self.timeout, operation)
self.log.info("Metadata import %s created successfully", self.metadata_import_id)

DataprocMetastoreDetailedLink.persist(
context=context, task_instance=self, url=METASTORE_IMPORT_LINK, resource=self.metadata_import_id
)
return MetadataImport.to_dict(metadata_import)


@@ -282,6 +398,7 @@ class DataprocMetastoreCreateServiceOperator(BaseOperator):
'impersonation_chain',
)
template_fields_renderers = {'service': 'json'}
operator_extra_links = (DataprocMetastoreLink(),)

def __init__(
self,
@@ -310,7 +427,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context') -> dict:
def execute(self, context: "Context") -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -340,6 +457,7 @@ def execute(self, context: 'Context') -> dict:
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)
return Service.to_dict(service)


@@ -409,7 +527,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context') -> None:
def execute(self, context: "Context") -> None:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -469,7 +587,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context'):
def execute(self, context: "Context"):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -521,6 +639,7 @@ class DataprocMetastoreExportMetadataOperator(BaseOperator):
'project_id',
'impersonation_chain',
)
operator_extra_links = (DataprocMetastoreLink(), StorageLink())

def __init__(
self,
@@ -551,7 +670,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context'):
def execute(self, context: "Context"):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -569,8 +688,15 @@ def execute(self, context: 'Context'):
)
metadata_export = self._wait_for_export_metadata(hook)
self.log.info("Metadata from service %s exported successfully", self.service_id)

DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_EXPORT_LINK)
uri = self._get_uri_from_destination(MetadataExport.to_dict(metadata_export)["destination_gcs_uri"])
StorageLink.persist(context=context, task_instance=self, uri=uri)
return MetadataExport.to_dict(metadata_export)

def _get_uri_from_destination(self, destination_uri: str):
return destination_uri[5:] if destination_uri.startswith("gs://") else destination_uri

def _wait_for_export_metadata(self, hook: DataprocMetastoreHook):
"""
Workaround to check that export was created successfully.
@@ -627,6 +753,7 @@ class DataprocMetastoreGetServiceOperator(BaseOperator):
'project_id',
'impersonation_chain',
)
operator_extra_links = (DataprocMetastoreLink(),)

def __init__(
self,
@@ -651,7 +778,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context') -> dict:
def execute(self, context: "Context") -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -664,6 +791,7 @@ def execute(self, context: 'Context') -> dict:
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)
return Service.to_dict(result)


@@ -698,6 +826,7 @@ class DataprocMetastoreListBackupsOperator(BaseOperator):
'project_id',
'impersonation_chain',
)
operator_extra_links = (DataprocMetastoreLink(),)

def __init__(
self,
@@ -730,7 +859,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context') -> List[dict]:
def execute(self, context: "Context") -> List[dict]:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -747,6 +876,7 @@ def execute(self, context: 'Context') -> List[dict]:
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_BACKUPS_LINK)
return [Backup.to_dict(backup) for backup in backups]


@@ -793,6 +923,7 @@ class DataprocMetastoreRestoreServiceOperator(BaseOperator):
'project_id',
'impersonation_chain',
)
operator_extra_links = (DataprocMetastoreLink(),)

def __init__(
self,
@@ -829,7 +960,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context'):
def execute(self, context: "Context"):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
@@ -852,6 +983,7 @@ def execute(self, context: 'Context'):
)
self._wait_for_restore_service(hook)
self.log.info("Service %s restored from backup %s", self.service_id, self.backup_id)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)

def _wait_for_restore_service(self, hook: DataprocMetastoreHook):
"""
@@ -921,6 +1053,7 @@ class DataprocMetastoreUpdateServiceOperator(BaseOperator):
'project_id',
'impersonation_chain',
)
operator_extra_links = (DataprocMetastoreLink(),)

def __init__(
self,
Expand Down Expand Up @@ -951,7 +1084,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: 'Context'):
def execute(self, context: "Context"):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
Expand All @@ -970,3 +1103,4 @@ def execute(self, context: 'Context'):
)
hook.wait_for_operation(self.timeout, operation)
self.log.info("Service %s updated successfully", self.service.get("name"))
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)
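
For orientation, a minimal sketch (not part of the diff) of how the templated console URLs defined at the top of this file resolve: DataprocMetastoreLink.persist stores region, service_id, and project_id in XCom, and get_link substitutes them into the stored template. The concrete values below are hypothetical.

# Sketch only: reproduces two of the URL templates above and formats them the
# same way get_link() does with the persisted XCom payload.
METASTORE_BASE_LINK = (
    "https://console.cloud.google.com/dataproc/metastore/services/{region}/{service_id}"
)
METASTORE_SERVICE_LINK = METASTORE_BASE_LINK + "/config?project={project_id}"

url = METASTORE_SERVICE_LINK.format(
    region="europe-west1",         # hypothetical region
    service_id="demo-service",     # hypothetical service ID
    project_id="example-project",  # hypothetical project ID
)
print(url)
# https://console.cloud.google.com/dataproc/metastore/services/europe-west1/demo-service/config?project=example-project
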
16 changes: 16 additions & 0 deletions airflow/providers/google/common/links/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
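
As a usage note, here is a hedged sketch (not part of this commit) of how the new extra links surface in practice: once operator_extra_links is set on an operator, the task instance view in the Airflow UI shows the corresponding link button after the persist call has run during execute. The DAG and resource names below are hypothetical; the operator arguments mirror the attributes that DataprocMetastoreLink.persist reads (region, project_id, service_id).

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreGetServiceOperator,
)

with DAG(
    dag_id="dataproc_metastore_links_demo",  # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:
    # After this task runs, its task instance page should expose the
    # "Dataproc Metastore" button resolving to METASTORE_SERVICE_LINK.
    get_service = DataprocMetastoreGetServiceOperator(
        task_id="get_service",
        region="europe-west1",         # hypothetical region
        project_id="example-project",  # hypothetical project
        service_id="demo-service",     # hypothetical service
    )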