From 26c8dea4f81ceaa41d84cbb4fa9b1a2aa3005cba Mon Sep 17 00:00:00 2001 From: Dorian Kurzaj Date: Fri, 3 Oct 2025 12:09:46 +0200 Subject: [PATCH 1/4] feat(composer): pause/unpause DAGs in one operation --- composer/tools/composer_dags.py | 104 ++++++++++++++++++++++++++------ 1 file changed, 85 insertions(+), 19 deletions(-) diff --git a/composer/tools/composer_dags.py b/composer/tools/composer_dags.py index a5306fa52d5..dac32362cf6 100644 --- a/composer/tools/composer_dags.py +++ b/composer/tools/composer_dags.py @@ -111,6 +111,28 @@ def pause_dag( logger.info("Unable to pause DAG %s", dag_id) logger.info(command_output[1]) + @staticmethod + def pause_all_dags( + project_name: str, + environment: str, + location: str, + sdk_endpoint: str, + ) -> str: + """Pause all the DAGs in the given environment.""" + command = ( + f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={sdk_endpoint} gcloud composer environments" + f" run {environment} --project={project_name} --location={location}" + f" dags pause -- \"^(?!airflow_monitoring$).*\" --treat-dag-id-as-regex -y" + ) + command_output = DAG._run_shell_command_locally_once(command=command) + if command_output[0] == 1: + logger.info(command_output[1]) + logger.info("Error pausing DAGs, Retrying...") + command_output = DAG._run_shell_command_locally_once(command=command) + if command_output[0] == 1: + logger.info("Unable to pause DAGs") + logger.info(command_output[1]) + @staticmethod def unpause_dag( project_name: str, @@ -136,6 +158,28 @@ def unpause_dag( logger.info("Unable to Unpause DAG %s", dag_id) logger.info(command_output[1]) + @staticmethod + def unpause_all_dags( + project_name: str, + environment: str, + location: str, + sdk_endpoint: str, + ) -> str: + """UnPause all the DAGs in the given environment.""" + command = ( + f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={sdk_endpoint} gcloud composer environments" + f" run {environment} --project={project_name} --location={location}" + f" dags unpause -- \".*\" --treat-dag-id-as-regex -y" + ) + command_output = DAG._run_shell_command_locally_once(command=command) + if command_output[0] == 1: + logger.info(command_output[1]) + logger.info("Error Unpausing DAGs, Retrying...") + command_output = DAG._run_shell_command_locally_once(command=command) + if command_output[0] == 1: + logger.info("Unable to Unpause DAGs") + logger.info(command_output[1]) + @staticmethod def describe_environment( project_name: str, environment: str, location: str, sdk_endpoint: str @@ -151,25 +195,7 @@ def describe_environment( logger.info("Environment Info:\n %s", environment_json["name"]) return environment_json - -def main( - project_name: str, environment: str, location: str, operation: str, sdk_endpoint=str -) -> int: - logger.info("DAG Pause/UnPause Script for Cloud Composer") - environment_info = DAG.describe_environment( - project_name=project_name, - environment=environment, - location=location, - sdk_endpoint=sdk_endpoint, - ) - versions = DAG.COMPOSER_AF_VERSION_RE.match( - environment_info["config"]["softwareConfig"]["imageVersion"] - ).groups() - logger.info( - "Image version: %s", - environment_info["config"]["softwareConfig"]["imageVersion"], - ) - airflow_version = (int(versions[3]), int(versions[4]), int(versions[5])) +def legacy_operations(project_name: str, environment: str, location: str, sdk_endpoint: str, airflow_version: list[int], operation: str) -> None: list_of_dags = DAG.get_list_of_dags( project_name=project_name, environment=environment, @@ -201,6 +227,46 @@ def main( dag_id=dag, airflow_version=airflow_version, ) + +def modern_operations(project_name: str, environment: str, location: str, sdk_endpoint: str, operation: str) -> None: + if operation == "pause": + DAG.pause_all_dags( + project_name=project_name, + environment=environment, + location=location, + sdk_endpoint=sdk_endpoint, + ) + else: + DAG.unpause_all_dags( + project_name=project_name, + environment=environment, + location=location, + sdk_endpoint=sdk_endpoint, + ) + +def main( + project_name: str, environment: str, location: str, operation: str, sdk_endpoint=str +) -> int: + logger.info("DAG Pause/UnPause Script for Cloud Composer") + environment_info = DAG.describe_environment( + project_name=project_name, + environment=environment, + location=location, + sdk_endpoint=sdk_endpoint, + ) + versions = DAG.COMPOSER_AF_VERSION_RE.match( + environment_info["config"]["softwareConfig"]["imageVersion"] + ).groups() + logger.info( + "Image version: %s", + environment_info["config"]["softwareConfig"]["imageVersion"], + ) + airflow_version = (int(versions[3]), int(versions[4]), int(versions[5])) + if airflow_version < (2, 9, 0): + legacy_operations(project_name, environment, location, sdk_endpoint, airflow_version, operation) + else: + modern_operations(project_name, environment, location, sdk_endpoint, operation) + return 0 From 1c6c444c6278f003caf84ceb9f3557c018babaa1 Mon Sep 17 00:00:00 2001 From: Dorian Kurzaj Date: Fri, 3 Oct 2025 15:06:31 +0200 Subject: [PATCH 2/4] Fix return type --- composer/tools/composer_dags.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer/tools/composer_dags.py b/composer/tools/composer_dags.py index dac32362cf6..b618494d047 100644 --- a/composer/tools/composer_dags.py +++ b/composer/tools/composer_dags.py @@ -117,7 +117,7 @@ def pause_all_dags( environment: str, location: str, sdk_endpoint: str, - ) -> str: + ) -> None: """Pause all the DAGs in the given environment.""" command = ( f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={sdk_endpoint} gcloud composer environments" @@ -164,7 +164,7 @@ def unpause_all_dags( environment: str, location: str, sdk_endpoint: str, - ) -> str: + ) -> None: """UnPause all the DAGs in the given environment.""" command = ( f"CLOUDSDK_API_ENDPOINT_OVERRIDES_COMPOSER={sdk_endpoint} gcloud composer environments" From 001a873d6492be42a9dd6b9a0f615ceb85daf34a Mon Sep 17 00:00:00 2001 From: Dorian Kurzaj Date: Fri, 3 Oct 2025 15:11:26 +0200 Subject: [PATCH 3/4] Remove useless retry --- composer/tools/composer_dags.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/composer/tools/composer_dags.py b/composer/tools/composer_dags.py index b618494d047..f9ed0d1fdb3 100644 --- a/composer/tools/composer_dags.py +++ b/composer/tools/composer_dags.py @@ -125,12 +125,6 @@ def pause_all_dags( f" dags pause -- \"^(?!airflow_monitoring$).*\" --treat-dag-id-as-regex -y" ) command_output = DAG._run_shell_command_locally_once(command=command) - if command_output[0] == 1: - logger.info(command_output[1]) - logger.info("Error pausing DAGs, Retrying...") - command_output = DAG._run_shell_command_locally_once(command=command) - if command_output[0] == 1: - logger.info("Unable to pause DAGs") logger.info(command_output[1]) @staticmethod @@ -172,12 +166,6 @@ def unpause_all_dags( f" dags unpause -- \".*\" --treat-dag-id-as-regex -y" ) command_output = DAG._run_shell_command_locally_once(command=command) - if command_output[0] == 1: - logger.info(command_output[1]) - logger.info("Error Unpausing DAGs, Retrying...") - command_output = DAG._run_shell_command_locally_once(command=command) - if command_output[0] == 1: - logger.info("Unable to Unpause DAGs") logger.info(command_output[1]) @staticmethod From cd4a38ab83e9d3f71b4841c9cd01472d860c1c3c Mon Sep 17 00:00:00 2001 From: Dorian Kurzaj Date: Fri, 3 Oct 2025 15:13:53 +0200 Subject: [PATCH 4/4] Linting --- composer/tools/composer_dags.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/composer/tools/composer_dags.py b/composer/tools/composer_dags.py index f9ed0d1fdb3..ed08b0e429b 100644 --- a/composer/tools/composer_dags.py +++ b/composer/tools/composer_dags.py @@ -183,7 +183,7 @@ def describe_environment( logger.info("Environment Info:\n %s", environment_json["name"]) return environment_json -def legacy_operations(project_name: str, environment: str, location: str, sdk_endpoint: str, airflow_version: list[int], operation: str) -> None: +def legacy_operations(project_name: str, environment: str, location: str, sdk_endpoint: str, airflow_version: tuple[int, int, int], operation: str) -> None: list_of_dags = DAG.get_list_of_dags( project_name=project_name, environment=environment, @@ -233,7 +233,7 @@ def modern_operations(project_name: str, environment: str, location: str, sdk_en ) def main( - project_name: str, environment: str, location: str, operation: str, sdk_endpoint=str + project_name: str, environment: str, location: str, operation: str, sdk_endpoint: str ) -> int: logger.info("DAG Pause/UnPause Script for Cloud Composer") environment_info = DAG.describe_environment( @@ -254,7 +254,6 @@ def main( legacy_operations(project_name, environment, location, sdk_endpoint, airflow_version, operation) else: modern_operations(project_name, environment, location, sdk_endpoint, operation) - return 0