Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 253 additions & 0 deletions airflow/contrib/operators/dataproc_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,262 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import time

from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from googleapiclient.errors import HttpError


class DataprocClusterCreateOperator(BaseOperator):
"""
Create a new cluster on Google Cloud Dataproc. The operator will wait until the
creation is successful or an error occurs in the creation process.

The parameters allow to configure the cluster. Please refer to

https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters

for a detailed explanation on the different parameters. Most of the configuration
parameters detailed in the link are available as a parameter to this operator.
"""

template_fields = ['cluster_name',]

@apply_defaults
def __init__(self,
cluster_name,
project_id,
num_workers,
zone,
storage_bucket=None,
init_actions_uris=None,
metadata=None,
properties=None,
master_machine_type='n1-standard-4',
master_disk_size=500,
worker_machine_type='n1-standard-4',
worker_disk_size=500,
num_preemptible_workers=0,
labels=None,
region='global',
google_cloud_conn_id='google_cloud_default',
delegate_to=None,
*args,
**kwargs):
"""
Create a new DataprocClusterCreateOperator.

For more info on the creation of a cluster through the API, have a look at:

https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters

:param cluster_name: The name of the cluster to create
:type cluster_name: string
:param project_id: The ID of the google cloud project in which
to create the cluster
:type project_id: string
:param num_workers: The # of workers to spin up
:type num_workers: int
:param storage_bucket: The storage bucket to use, setting to None lets dataproc
generate a custom one for you
:type storage_bucket: string
:param init_actions_uris: List of GCS uri's containing
dataproc initialization scripts
:type init_actions_uris: list[string]
:param metadata: dict of key-value google compute engine metadata entries
to add to all instances
:type metadata: dict
:param properties: dict of properties to set on
config files (e.g. spark-defaults.conf), see
https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
projects.regions.clusters#SoftwareConfig
:type properties: dict
:param master_machine_type: Compute engine machine type to use for the master node
:type master_machine_type: string
:param master_disk_size: Disk size for the master node
:type int
:param worker_machine_type:Compute engine machine type to use for the worker nodes
:type worker_machine_type: string
:param worker_disk_size: Disk size for the worker nodes
:type worker_disk_size: int
:param num_preemptible_workers: The # of preemptible worker nodes to spin up
:type num_preemptible_workers: int
:param labels: dict of labels to add to the cluster
:type labels: dict
:param zone: The zone where the cluster will be located
:type zone: string
:param region: leave as 'global', might become relevant in the future
:param google_cloud_conn_id: The connection id to use when connecting to dataproc
:type google_cloud_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
"""
super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
self.google_cloud_conn_id = google_cloud_conn_id
self.delegate_to = delegate_to
self.cluster_name = cluster_name
self.project_id = project_id
self.num_workers = num_workers
self.num_preemptible_workers = num_preemptible_workers
self.storage_bucket = storage_bucket
self.init_actions_uris = init_actions_uris
self.metadata = metadata
self.properties = properties
self.master_machine_type = master_machine_type
self.master_disk_size = master_disk_size
self.worker_machine_type = worker_machine_type
self.worker_disk_size = worker_disk_size
self.labels = labels
self.zone = zone
self.region = region

def _get_cluster_list_for_project(self, service):
result = service.projects().regions().clusters().list(
projectId=self.project_id,
region=self.region
).execute()
return result.get('clusters', [])

def _get_cluster(self, service):
cluster_list = self._get_cluster_list_for_project(service)
cluster = [c for c in cluster_list if c['clusterName'] == self.cluster_name]
if cluster:
return cluster[0]
return None

def _get_cluster_state(self, service):
cluster = self._get_cluster(service)
if 'status' in cluster:
return cluster['status']['state']
else:
return None

def _cluster_ready(self, state, service):
if state == 'RUNNING':
return True
if state == 'ERROR':
cluster = self._get_cluster(service)
try:
error_details = cluster['status']['details']
except KeyError:
error_details = 'Unknown error in cluster creation, ' \
'check Google Cloud console for details.'
raise Exception(error_details)
return False

def _wait_for_done(self, service):
while True:
state = self._get_cluster_state(service)
if state is None:
logging.info("No state for cluster '%s'", self.cluster_name)
time.sleep(15)
else:
logging.info("State for cluster '%s' is %s", self.cluster_name, state)
if self._cluster_ready(state, service):
logging.info("Cluster '%s' successfully created",
self.cluster_name)
return
time.sleep(15)

def execute(self, context):
hook = DataProcHook(
gcp_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to
)
service = hook.get_conn()

if self._get_cluster(service):
logging.info('Cluster {} already exists... Checking status...'.format(
self.cluster_name
))
self._wait_for_done(service)
return True

zone_uri = \
'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
self.project_id, self.zone
)
master_type_uri = \
"https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
self.project_id, self.zone, self.master_machine_type
)
worker_type_uri = \
"https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
self.project_id, self.zone, self.worker_machine_type
)
cluster_data = {
'projectId': self.project_id,
'clusterName': self.cluster_name,
'config': {
'gceClusterConfig': {
'zoneUri': zone_uri
},
'masterConfig': {
'numInstances': 1,
'machineTypeUri': master_type_uri,
'diskConfig': {
'bootDiskSizeGb': self.master_disk_size
}
},
'workerConfig': {
'numInstances': self.num_workers,
'machineTypeUri': worker_type_uri,
'diskConfig': {
'bootDiskSizeGb': self.worker_disk_size
}
},
'secondaryWorkerConfig': {},
'softwareConfig': {}
}
}
if self.num_preemptible_workers > 0:
cluster_data['config']['secondaryWorkerConfig'] = {
'numInstances': self.num_preemptible_workers,
'machineTypeUri': worker_type_uri,
'diskConfig': {
'bootDiskSizeGb': self.worker_disk_size
},
'isPreemptible': True
}
if self.labels:
cluster_data['labels'] = self.labels
if self.storage_bucket:
cluster_data['config']['configBucket'] = self.storage_bucket
if self.metadata:
cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
if self.properties:
cluster_data['config']['softwareConfig']['properties'] = self.properties
if self.init_actions_uris:
init_actions_dict = [
{'executableFile': uri} for uri in self.init_actions_uris
]
cluster_data['config']['initializationActions'] = init_actions_dict

try:
service.projects().regions().clusters().create(
projectId=self.project_id,
region=self.region,
body=cluster_data
).execute()
except HttpError as e:
# probably two cluster start commands at the same time
time.sleep(10)
if self._get_cluster(service):
logging.info('Cluster {} already exists... Checking status...'.format(
self.cluster_name
))
self._wait_for_done(service)
return True
else:
raise e

self._wait_for_done(service)


class DataProcPigOperator(BaseOperator):
Expand Down