From 02b06d2a7b413f6495ff954d07e4ec03115edb1c Mon Sep 17 00:00:00 2001 From: sravani-bobbala Date: Wed, 22 Apr 2026 05:34:10 +0000 Subject: [PATCH 1/3] feat: add idle/auto stop TTLs and master/worker instance flexibility policies to Dataproc cluster configuration --- .../google/cloud/operators/dataproc.py | 36 +++ .../google/cloud/operators/test_dataproc.py | 211 ++++++++++++++++++ 2 files changed, 247 insertions(+) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py index 2348c61f285c9..5c63d69030184 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py @@ -256,11 +256,16 @@ def __init__( idle_delete_ttl: int | None = None, auto_delete_time: datetime | None = None, auto_delete_ttl: int | None = None, + idle_stop_ttl: int | None = None, + auto_stop_time: datetime | None = None, + auto_stop_ttl: int | None = None, customer_managed_key: str | None = None, enable_component_gateway: bool | None = False, driver_pool_size: int = 0, driver_pool_id: str | None = None, secondary_worker_instance_flexibility_policy: InstanceFlexibilityPolicy | None = None, + master_instance_flexibility_policy: InstanceFlexibilityPolicy | None = None, + worker_instance_flexibility_policy: InstanceFlexibilityPolicy | None = None, secondary_worker_accelerator_type: str | None = None, secondary_worker_accelerator_count: int | None = None, *, @@ -305,12 +310,17 @@ def __init__( self.idle_delete_ttl = idle_delete_ttl self.auto_delete_time = auto_delete_time self.auto_delete_ttl = auto_delete_ttl + self.idle_stop_ttl = idle_stop_ttl + self.auto_stop_time = auto_stop_time + self.auto_stop_ttl = auto_stop_ttl self.customer_managed_key = customer_managed_key self.enable_component_gateway = enable_component_gateway self.single_node = num_workers == 0 self.driver_pool_size = driver_pool_size self.driver_pool_id = driver_pool_id self.secondary_worker_instance_flexibility_policy = secondary_worker_instance_flexibility_policy + self.master_instance_flexibility_policy = master_instance_flexibility_policy + self.worker_instance_flexibility_policy = worker_instance_flexibility_policy self.secondary_worker_accelerator_type = secondary_worker_accelerator_type self.secondary_worker_accelerator_count = secondary_worker_accelerator_count self.cluster_tier = cluster_tier @@ -401,6 +411,17 @@ def _build_lifecycle_config(self, cluster_data): elif self.auto_delete_ttl: cluster_data[lifecycle_config]["auto_delete_ttl"] = {"seconds": int(self.auto_delete_ttl)} + if self.idle_stop_ttl: + cluster_data[lifecycle_config]["idle_stop_ttl"] = {"seconds": self.idle_stop_ttl} + + if self.auto_stop_time: + utc_auto_stop_time = timezone.convert_to_utc(self.auto_stop_time) + cluster_data[lifecycle_config]["auto_stop_time"] = utc_auto_stop_time.strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + elif self.auto_stop_ttl: + cluster_data[lifecycle_config]["auto_stop_ttl"] = {"seconds": int(self.auto_stop_ttl)} + return cluster_data def _build_driver_pool(self): @@ -451,6 +472,21 @@ def _build_cluster_data(self): "autoscaling_config": {}, "endpoint_config": {}, } + if self.master_instance_flexibility_policy: + cluster_data["master_config"]["instance_flexibility_policy"] = { + "instance_selection_list": [ + vars(s) + for s in self.master_instance_flexibility_policy.instance_selection_list + ] + } + + if self.worker_instance_flexibility_policy: + cluster_data["worker_config"]["instance_flexibility_policy"] = { + "instance_selection_list": [ + vars(s) + for s in self.worker_instance_flexibility_policy.instance_selection_list + ] + } if self.min_num_workers: cluster_data["worker_config"]["min_num_instances"] = self.min_num_workers diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py index 20e622ac3560c..7780a0ace2683 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py +++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py @@ -323,6 +323,120 @@ "endpoint_config": {}, } +CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG = { + "gce_cluster_config": { + "zone_uri": "https://www.googleapis.com/compute/v1/projects/project_id/zones/zone", + "metadata": {"metadata": "data"}, + "network_uri": "network_uri", + "subnetwork_uri": "subnetwork_uri", + "internal_ip_only": True, + "tags": ["tags"], + "service_account": "service_account", + "service_account_scopes": ["service_account_scopes"], + }, + "master_config": { + "num_instances": 2, + "machine_type_uri": "projects/project_id/zones/zone/machineTypes/master_machine_type", + "disk_config": {"boot_disk_type": "master_disk_type", "boot_disk_size_gb": 128}, + "image_uri": "https://www.googleapis.com/compute/beta/projects/" + "custom_image_project_id/global/images/custom_image", + "instance_flexibility_policy": { + "instance_selection_list": [ + { + "machine_types": [ + "projects/project_id/zones/zone/machineTypes/machine1", + "projects/project_id/zones/zone/machineTypes/machine2", + ], + "rank": 0, + }, + {"machine_types": ["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1}, + ], + }, + }, + "worker_config": { + "num_instances": 3, + "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type", + "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256}, + "image_uri": "https://www.googleapis.com/compute/beta/projects/" + "custom_image_project_id/global/images/custom_image", + "instance_flexibility_policy": { + "instance_selection_list": [ + { + "machine_types": [ + "projects/project_id/zones/zone/machineTypes/machine1", + "projects/project_id/zones/zone/machineTypes/machine2", + ], + "rank": 0, + }, + {"machine_types": ["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1}, + ], + }, + }, + "secondary_worker_config": { + "num_instances": 4, + "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type", + "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256}, + "is_preemptible": True, + "preemptibility": "SPOT", + }, + "software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]}, + "lifecycle_config": {}, + "encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"}, + "autoscaling_config": {"policy_uri": "autoscaling_policy"}, + "config_bucket": "storage_bucket", + "initialization_actions": [ + {"executable_file": "init_actions_uris", "execution_timeout": {"seconds": 600}} + ], + "endpoint_config": {}, +} + + +CONFIG_WITH_STOP_TTL = { + "gce_cluster_config": { + "zone_uri": "https://www.googleapis.com/compute/v1/projects/project_id/zones/zone", + "metadata": {"metadata": "data"}, + "network_uri": "network_uri", + "subnetwork_uri": "subnetwork_uri", + "internal_ip_only": True, + "tags": ["tags"], + "service_account": "service_account", + "service_account_scopes": ["service_account_scopes"], + }, + "master_config": { + "num_instances": 2, + "machine_type_uri": "projects/project_id/zones/zone/machineTypes/master_machine_type", + "disk_config": {"boot_disk_type": "master_disk_type", "boot_disk_size_gb": 128}, + "image_uri": "https://www.googleapis.com/compute/beta/projects/" + "custom_image_project_id/global/images/custom_image", + }, + "worker_config": { + "num_instances": 2, + "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type", + "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256}, + "image_uri": "https://www.googleapis.com/compute/beta/projects/" + "custom_image_project_id/global/images/custom_image", + }, + "secondary_worker_config": { + "num_instances": 4, + "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type", + "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256}, + "is_preemptible": True, + "preemptibility": "SPOT", + }, + "software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]}, + "lifecycle_config": { + "idle_stop_ttl": {"seconds": 300}, + "auto_stop_time": "2019-09-12T00:00:00.000000Z", + }, + "encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"}, + "autoscaling_config": {"policy_uri": "autoscaling_policy"}, + "config_bucket": "storage_bucket", + "initialization_actions": [ + {"executable_file": "init_actions_uris", "execution_timeout": {"seconds": 600}} + ], + "endpoint_config": {}, +} + LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION_LABEL} LABELS.update({"airflow-version": AIRFLOW_VERSION_LABEL}) @@ -696,6 +810,103 @@ def test_build_with_flex_migs(self): cluster = generator.make() assert cluster == CONFIG_WITH_FLEX_MIG + def test_build_with_master_and_worker_flex_migs(self): + generator = ClusterGenerator( + project_id="project_id", + num_workers=3, + zone="zone", + network_uri="network_uri", + subnetwork_uri="subnetwork_uri", + internal_ip_only=True, + tags=["tags"], + storage_bucket="storage_bucket", + init_actions_uris=["init_actions_uris"], + init_action_timeout="10m", + metadata={"metadata": "data"}, + custom_image="custom_image", + custom_image_project_id="custom_image_project_id", + autoscaling_policy="autoscaling_policy", + properties={"properties": "data"}, + optional_components=["optional_components"], + num_masters=2, + master_machine_type="master_machine_type", + master_disk_type="master_disk_type", + master_disk_size=128, + worker_machine_type="worker_machine_type", + worker_disk_type="worker_disk_type", + worker_disk_size=256, + num_preemptible_workers=4, + preemptibility="Spot", + region="region", + service_account="service_account", + service_account_scopes=["service_account_scopes"], + customer_managed_key="customer_managed_key", + master_instance_flexibility_policy=InstanceFlexibilityPolicy( + [ + InstanceSelection( + [ + "projects/project_id/zones/zone/machineTypes/machine1", + "projects/project_id/zones/zone/machineTypes/machine2", + ], + 0, + ), + InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1), + ] + ), + worker_instance_flexibility_policy=InstanceFlexibilityPolicy( + [ + InstanceSelection( + [ + "projects/project_id/zones/zone/machineTypes/machine1", + "projects/project_id/zones/zone/machineTypes/machine2", + ], + 0, + ), + InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1), + ] + ), + ) + cluster = generator.make() + assert cluster == CONFIG_WITH_MASTER_AND_WORKER_FLEX_MIG + + def test_build_with_stop_ttl(self): + generator = ClusterGenerator( + project_id="project_id", + num_workers=2, + zone="zone", + network_uri="network_uri", + subnetwork_uri="subnetwork_uri", + internal_ip_only=True, + tags=["tags"], + storage_bucket="storage_bucket", + init_actions_uris=["init_actions_uris"], + init_action_timeout="10m", + metadata={"metadata": "data"}, + custom_image="custom_image", + custom_image_project_id="custom_image_project_id", + autoscaling_policy="autoscaling_policy", + properties={"properties": "data"}, + optional_components=["optional_components"], + num_masters=2, + master_machine_type="master_machine_type", + master_disk_type="master_disk_type", + master_disk_size=128, + worker_machine_type="worker_machine_type", + worker_disk_type="worker_disk_type", + worker_disk_size=256, + num_preemptible_workers=4, + preemptibility="Spot", + region="region", + service_account="service_account", + service_account_scopes=["service_account_scopes"], + idle_stop_ttl=300, + auto_stop_time=timezone.datetime(2019, 9, 12), + customer_managed_key="customer_managed_key", + ) + cluster = generator.make() + assert cluster == CONFIG_WITH_STOP_TTL + + def test_build_with_gpu_accelerator(self): generator = ClusterGenerator( project_id="project_id", From 239a6d68aef7718702288e796ddc591ce755cb15 Mon Sep 17 00:00:00 2001 From: Sravani Bobbala Date: Mon, 11 May 2026 09:03:04 +0000 Subject: [PATCH 2/3] fixing static test failures --- .../google/cloud/operators/dataproc.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py index 21166807541b4..860ddafed8595 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py @@ -201,6 +201,13 @@ class ClusterGenerator: :param auto_delete_ttl: The life duration of cluster, the cluster will be auto-deleted at the end of this duration. A duration in seconds. (If auto_delete_time is set this parameter will be ignored) + :param idle_stop_ttl: The longest duration that cluster would keep alive while + staying idle. Passing this threshold will cause cluster to be auto-stopped. + A duration in seconds. + :param auto_stop_time: The time when cluster will be auto-stopped. + :param auto_stop_ttl: The life duration of cluster, the cluster will be + auto-stopped at the end of this duration. + A duration in seconds. (If auto_stop_time is set this parameter will be ignored) :param customer_managed_key: The customer-managed key used for disk encryption ``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]`` # noqa :param enable_component_gateway: Provides access to the web interfaces of default and selected optional @@ -210,6 +217,8 @@ class ClusterGenerator: identify the driver group in future operations, such as resizing the node group. :param secondary_worker_instance_flexibility_policy: Instance flexibility Policy allowing a mixture of VM shapes and provisioning models. + :param master_instance_flexibility_policy: Instance flexibility Policy for master nodes. + :param worker_instance_flexibility_policy: Instance flexibility Policy for worker nodes. :param secondary_worker_accelerator_type: Type of the accelerator card (GPU) to attach to the secondary workers, see https://cloud.google.com/dataproc/docs/reference/rest/v1/InstanceGroupConfig#acceleratorconfig :param secondary_worker_accelerator_count: Number of accelerator cards (GPUs) to attach to the secondary workers @@ -478,16 +487,14 @@ def _build_cluster_data(self): if self.master_instance_flexibility_policy: cluster_data["master_config"]["instance_flexibility_policy"] = { "instance_selection_list": [ - vars(s) - for s in self.master_instance_flexibility_policy.instance_selection_list + vars(s) for s in self.master_instance_flexibility_policy.instance_selection_list ] } if self.worker_instance_flexibility_policy: cluster_data["worker_config"]["instance_flexibility_policy"] = { "instance_selection_list": [ - vars(s) - for s in self.worker_instance_flexibility_policy.instance_selection_list + vars(s) for s in self.worker_instance_flexibility_policy.instance_selection_list ] } @@ -3016,4 +3023,4 @@ def execute(self, context: Context): self.log.info("Canceling operation: %s", self.operation_name) hook.get_operations_client(region=self.region).cancel_operation(name=self.operation_name) - self.log.info("Operation canceled.") + self.log.info("Operation canceled.") \ No newline at end of file From 83ea49ad35abc15751ba206c367b01cb55d3e0d1 Mon Sep 17 00:00:00 2001 From: Sravani Bobbala Date: Thu, 21 May 2026 08:19:48 +0000 Subject: [PATCH 3/3] fixing static test failures --- .../src/airflow/providers/google/cloud/operators/dataproc.py | 2 +- .../google/tests/unit/google/cloud/operators/test_dataproc.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py index 860ddafed8595..e41e9b8550c5d 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py @@ -3023,4 +3023,4 @@ def execute(self, context: Context): self.log.info("Canceling operation: %s", self.operation_name) hook.get_operations_client(region=self.region).cancel_operation(name=self.operation_name) - self.log.info("Operation canceled.") \ No newline at end of file + self.log.info("Operation canceled.") diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py index 35cb390a6e66d..24284abfc8d43 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py +++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py @@ -908,7 +908,6 @@ def test_build_with_stop_ttl(self): cluster = generator.make() assert cluster == CONFIG_WITH_STOP_TTL - def test_build_with_gpu_accelerator(self): generator = ClusterGenerator( project_id="project_id",