Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ class ClusterGenerator:
:param auto_delete_ttl: The life duration of cluster, the cluster will be
auto-deleted at the end of this duration.
A duration in seconds. (If auto_delete_time is set this parameter will be ignored)
:param idle_stop_ttl: The longest duration that cluster would keep alive while
staying idle. Passing this threshold will cause cluster to be auto-stopped.
A duration in seconds.
:param auto_stop_time: The time when cluster will be auto-stopped.
:param auto_stop_ttl: The life duration of cluster, the cluster will be
auto-stopped at the end of this duration.
A duration in seconds. (If auto_stop_time is set this parameter will be ignored)
:param customer_managed_key: The customer-managed key used for disk encryption
``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]`` # noqa
:param enable_component_gateway: Provides access to the web interfaces of default and selected optional
Expand All @@ -210,6 +217,8 @@ class ClusterGenerator:
identify the driver group in future operations, such as resizing the node group.
:param secondary_worker_instance_flexibility_policy: Instance flexibility Policy allowing a mixture of VM
shapes and provisioning models.
:param master_instance_flexibility_policy: Instance flexibility Policy for master nodes.
:param worker_instance_flexibility_policy: Instance flexibility Policy for worker nodes.
:param secondary_worker_accelerator_type: Type of the accelerator card (GPU) to attach to the secondary workers,
see https://cloud.google.com/dataproc/docs/reference/rest/v1/InstanceGroupConfig#acceleratorconfig
:param secondary_worker_accelerator_count: Number of accelerator cards (GPUs) to attach to the secondary workers
Expand Down Expand Up @@ -257,11 +266,16 @@ def __init__(
idle_delete_ttl: int | None = None,
auto_delete_time: datetime | None = None,
auto_delete_ttl: int | None = None,
idle_stop_ttl: int | None = None,
auto_stop_time: datetime | None = None,
auto_stop_ttl: int | None = None,
customer_managed_key: str | None = None,
enable_component_gateway: bool | None = False,
driver_pool_size: int = 0,
driver_pool_id: str | None = None,
secondary_worker_instance_flexibility_policy: InstanceFlexibilityPolicy | None = None,
master_instance_flexibility_policy: InstanceFlexibilityPolicy | None = None,
worker_instance_flexibility_policy: InstanceFlexibilityPolicy | None = None,
secondary_worker_accelerator_type: str | None = None,
secondary_worker_accelerator_count: int | None = None,
*,
Expand Down Expand Up @@ -307,12 +321,17 @@ def __init__(
self.idle_delete_ttl = idle_delete_ttl
self.auto_delete_time = auto_delete_time
self.auto_delete_ttl = auto_delete_ttl
self.idle_stop_ttl = idle_stop_ttl
self.auto_stop_time = auto_stop_time
self.auto_stop_ttl = auto_stop_ttl
self.customer_managed_key = customer_managed_key
self.enable_component_gateway = enable_component_gateway
self.single_node = num_workers == 0
self.driver_pool_size = driver_pool_size
self.driver_pool_id = driver_pool_id
self.secondary_worker_instance_flexibility_policy = secondary_worker_instance_flexibility_policy
self.master_instance_flexibility_policy = master_instance_flexibility_policy
self.worker_instance_flexibility_policy = worker_instance_flexibility_policy
self.secondary_worker_accelerator_type = secondary_worker_accelerator_type
self.secondary_worker_accelerator_count = secondary_worker_accelerator_count
self.cluster_tier = cluster_tier
Expand Down Expand Up @@ -404,6 +423,17 @@ def _build_lifecycle_config(self, cluster_data):
elif self.auto_delete_ttl:
cluster_data[lifecycle_config]["auto_delete_ttl"] = {"seconds": int(self.auto_delete_ttl)}

if self.idle_stop_ttl:
cluster_data[lifecycle_config]["idle_stop_ttl"] = {"seconds": self.idle_stop_ttl}

if self.auto_stop_time:
utc_auto_stop_time = timezone.convert_to_utc(self.auto_stop_time)
cluster_data[lifecycle_config]["auto_stop_time"] = utc_auto_stop_time.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)
elif self.auto_stop_ttl:
cluster_data[lifecycle_config]["auto_stop_ttl"] = {"seconds": int(self.auto_stop_ttl)}

return cluster_data

def _build_driver_pool(self):
Expand Down Expand Up @@ -454,6 +484,19 @@ def _build_cluster_data(self):
"autoscaling_config": {},
"endpoint_config": {},
}
if self.master_instance_flexibility_policy:
cluster_data["master_config"]["instance_flexibility_policy"] = {
"instance_selection_list": [
vars(s) for s in self.master_instance_flexibility_policy.instance_selection_list
]
}

if self.worker_instance_flexibility_policy:
cluster_data["worker_config"]["instance_flexibility_policy"] = {
"instance_selection_list": [
vars(s) for s in self.worker_instance_flexibility_policy.instance_selection_list
]
}

if self.min_num_workers:
cluster_data["worker_config"]["min_num_instances"] = self.min_num_workers
Expand Down
210 changes: 210 additions & 0 deletions providers/google/tests/unit/google/cloud/operators/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,120 @@
"endpoint_config": {},
}

CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG = {
"gce_cluster_config": {
"zone_uri": "https://www.googleapis.com/compute/v1/projects/project_id/zones/zone",
"metadata": {"metadata": "data"},
"network_uri": "network_uri",
"subnetwork_uri": "subnetwork_uri",
"internal_ip_only": True,
"tags": ["tags"],
"service_account": "service_account",
"service_account_scopes": ["service_account_scopes"],
},
"master_config": {
"num_instances": 2,
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/master_machine_type",
"disk_config": {"boot_disk_type": "master_disk_type", "boot_disk_size_gb": 128},
"image_uri": "https://www.googleapis.com/compute/beta/projects/"
"custom_image_project_id/global/images/custom_image",
"instance_flexibility_policy": {
"instance_selection_list": [
{
"machine_types": [
"projects/project_id/zones/zone/machineTypes/machine1",
"projects/project_id/zones/zone/machineTypes/machine2",
],
"rank": 0,
},
{"machine_types": ["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1},
],
},
},
"worker_config": {
"num_instances": 3,
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"image_uri": "https://www.googleapis.com/compute/beta/projects/"
"custom_image_project_id/global/images/custom_image",
"instance_flexibility_policy": {
"instance_selection_list": [
{
"machine_types": [
"projects/project_id/zones/zone/machineTypes/machine1",
"projects/project_id/zones/zone/machineTypes/machine2",
],
"rank": 0,
},
{"machine_types": ["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1},
],
},
},
"secondary_worker_config": {
"num_instances": 4,
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"is_preemptible": True,
"preemptibility": "SPOT",
},
"software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
"lifecycle_config": {},
"encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"},
"autoscaling_config": {"policy_uri": "autoscaling_policy"},
"config_bucket": "storage_bucket",
"initialization_actions": [
{"executable_file": "init_actions_uris", "execution_timeout": {"seconds": 600}}
],
"endpoint_config": {},
}


CONFIG_WITH_STOP_TTL = {
"gce_cluster_config": {
"zone_uri": "https://www.googleapis.com/compute/v1/projects/project_id/zones/zone",
"metadata": {"metadata": "data"},
"network_uri": "network_uri",
"subnetwork_uri": "subnetwork_uri",
"internal_ip_only": True,
"tags": ["tags"],
"service_account": "service_account",
"service_account_scopes": ["service_account_scopes"],
},
"master_config": {
"num_instances": 2,
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/master_machine_type",
"disk_config": {"boot_disk_type": "master_disk_type", "boot_disk_size_gb": 128},
"image_uri": "https://www.googleapis.com/compute/beta/projects/"
"custom_image_project_id/global/images/custom_image",
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"image_uri": "https://www.googleapis.com/compute/beta/projects/"
"custom_image_project_id/global/images/custom_image",
},
"secondary_worker_config": {
"num_instances": 4,
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"is_preemptible": True,
"preemptibility": "SPOT",
},
"software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
"lifecycle_config": {
"idle_stop_ttl": {"seconds": 300},
"auto_stop_time": "2019-09-12T00:00:00.000000Z",
},
"encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"},
"autoscaling_config": {"policy_uri": "autoscaling_policy"},
"config_bucket": "storage_bucket",
"initialization_actions": [
{"executable_file": "init_actions_uris", "execution_timeout": {"seconds": 600}}
],
"endpoint_config": {},
}

LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION_LABEL}

LABELS.update({"airflow-version": AIRFLOW_VERSION_LABEL})
Expand Down Expand Up @@ -698,6 +812,102 @@ def test_build_with_flex_migs(self):
cluster = generator.make()
assert cluster == CONFIG_WITH_FLEX_MIG

def test_build_with_master_and_worker_flex_migs(self):
generator = ClusterGenerator(
project_id="project_id",
num_workers=3,
zone="zone",
network_uri="network_uri",
subnetwork_uri="subnetwork_uri",
internal_ip_only=True,
tags=["tags"],
storage_bucket="storage_bucket",
init_actions_uris=["init_actions_uris"],
init_action_timeout="10m",
metadata={"metadata": "data"},
custom_image="custom_image",
custom_image_project_id="custom_image_project_id",
autoscaling_policy="autoscaling_policy",
properties={"properties": "data"},
optional_components=["optional_components"],
num_masters=2,
master_machine_type="master_machine_type",
master_disk_type="master_disk_type",
master_disk_size=128,
worker_machine_type="worker_machine_type",
worker_disk_type="worker_disk_type",
worker_disk_size=256,
num_preemptible_workers=4,
preemptibility="Spot",
region="region",
service_account="service_account",
service_account_scopes=["service_account_scopes"],
customer_managed_key="customer_managed_key",
master_instance_flexibility_policy=InstanceFlexibilityPolicy(
[
InstanceSelection(
[
"projects/project_id/zones/zone/machineTypes/machine1",
"projects/project_id/zones/zone/machineTypes/machine2",
],
0,
),
InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1),
]
),
worker_instance_flexibility_policy=InstanceFlexibilityPolicy(
[
InstanceSelection(
[
"projects/project_id/zones/zone/machineTypes/machine1",
"projects/project_id/zones/zone/machineTypes/machine2",
],
0,
),
InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1),
]
),
)
cluster = generator.make()
assert cluster == CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG

def test_build_with_stop_ttl(self):
generator = ClusterGenerator(
project_id="project_id",
num_workers=2,
zone="zone",
network_uri="network_uri",
subnetwork_uri="subnetwork_uri",
internal_ip_only=True,
tags=["tags"],
storage_bucket="storage_bucket",
init_actions_uris=["init_actions_uris"],
init_action_timeout="10m",
metadata={"metadata": "data"},
custom_image="custom_image",
custom_image_project_id="custom_image_project_id",
autoscaling_policy="autoscaling_policy",
properties={"properties": "data"},
optional_components=["optional_components"],
num_masters=2,
master_machine_type="master_machine_type",
master_disk_type="master_disk_type",
master_disk_size=128,
worker_machine_type="worker_machine_type",
worker_disk_type="worker_disk_type",
worker_disk_size=256,
num_preemptible_workers=4,
preemptibility="Spot",
region="region",
service_account="service_account",
service_account_scopes=["service_account_scopes"],
idle_stop_ttl=300,
auto_stop_time=timezone.datetime(2019, 9, 12),
customer_managed_key="customer_managed_key",
)
cluster = generator.make()
assert cluster == CONFIG_WITH_STOP_TTL

def test_build_with_gpu_accelerator(self):
generator = ClusterGenerator(
project_id="project_id",
Expand Down