From c4e02923a9139b5a4a46b76b8e63d49514eb7784 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Fri, 20 Dec 2024 00:37:47 +0000 Subject: [PATCH 01/12] feat(securitycenter): Add Resource SCC Mgt API Org SHA Cust Modules (GetEff, ListEff, ListDesc, Simulate) --- .../snippets_management_api/noxfile_config.py | 41 ++++++ .../requirements-test.txt | 4 + .../snippets_management_api/requirements.txt | 4 + ...ecurity_health_analytics_custom_modules.py | 51 +++++++ ...ty_health_analytics_custom_modules_test.py | 139 ++++++++++++++++++ 5 files changed, 239 insertions(+) create mode 100644 securitycenter/snippets_management_api/noxfile_config.py create mode 100644 securitycenter/snippets_management_api/requirements-test.txt create mode 100644 securitycenter/snippets_management_api/requirements.txt create mode 100644 securitycenter/snippets_management_api/security_health_analytics_custom_modules.py create mode 100644 securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py diff --git a/securitycenter/snippets_management_api/noxfile_config.py b/securitycenter/snippets_management_api/noxfile_config.py new file mode 100644 index 00000000000..4efdab3e923 --- /dev/null +++ b/securitycenter/snippets_management_api/noxfile_config.py @@ -0,0 +1,41 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be inported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.11"], + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT", + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": { + "GCLOUD_ORGANIZATION": "1081635000895", + "GCLOUD_PROJECT": "project-a-id", + "GCLOUD_PUBSUB_TOPIC": "projects/project-a-id/topics/notifications-sample-topic", + "GCLOUD_PUBSUB_SUBSCRIPTION": "projects/project-a-id/subscriptions/notification-sample-subscription", + "GCLOUD_LOCATION": "global", + }, +} diff --git a/securitycenter/snippets_management_api/requirements-test.txt b/securitycenter/snippets_management_api/requirements-test.txt new file mode 100644 index 00000000000..987e7bb0d89 --- /dev/null +++ b/securitycenter/snippets_management_api/requirements-test.txt @@ -0,0 +1,4 @@ +backoff==2.2.1 +pytest==8.2.0 +google-cloud-bigquery==3.27.0 +google-cloud-securitycentermanagement==0.1.17 diff --git a/securitycenter/snippets_management_api/requirements.txt b/securitycenter/snippets_management_api/requirements.txt new file mode 100644 index 00000000000..9d938d3231e --- /dev/null +++ b/securitycenter/snippets_management_api/requirements.txt @@ -0,0 +1,4 @@ +google-cloud-securitycentermanagement==0.1.17 +google-cloud-bigquery==3.27.0 +google-cloud-pubsub==2.21.5 +pytest diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py new file mode 100644 index 00000000000..40ee0afedb1 --- /dev/null +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python +# +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import time +from typing import Dict + +# [START securitycenter_get_effective_security_health_analytics_custom_module] +from google.api_core.exceptions import NotFound +from google.cloud import securitycentermanagement_v1 + +def get_effective_security_health_analytics_custom_module(parent: str, module_id: str): + """ + Retrieves a Security Health Analytics custom module. + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + Returns: + The retrieved Security Health Analytics custom module. + Raises: + NotFound: If the specified custom module does not exist. + """ + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + try: + request = securitycentermanagement_v1.GetEffectiveSecurityHealthAnalyticsCustomModuleRequest( + name=f"{parent}/effectiveSecurityHealthAnalyticsCustomModules/{module_id}", + ) + + response = client.get_effective_security_health_analytics_custom_module(request=request) + print(f"Retrieved Effective Security Health Analytics Custom Module: {response.name}") + return response + except NotFound as e: + print(f"Custom Module not found: {response.name}") + raise e +# [END securitycenter_get_effective_security_health_analytics_custom_module] diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py new file mode 100644 index 00000000000..85d4d200540 --- /dev/null +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +# +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import random + +import time + +import backoff + +from google.api_core.exceptions import InternalServerError, NotFound, ServiceUnavailable + +from google.cloud import securitycentermanagement_v1 + +import pytest + +import security_health_analytics_custom_modules + +# Replace these variables before running the sample. +# GCLOUD_ORGANIZATION: The organization ID. +ORGANIZATION_ID = os.environ["GCLOUD_ORGANIZATION"] +LOCATION = "global" +PREFIX = "python_sample_sha_custom_module" # Prefix used for identifying test modules + + +@pytest.fixture(scope="session", autouse=True) +def setup_environment(): + """Fixture to ensure a clean environment by removing test modules before running tests.""" + if not ORGANIZATION_ID: + pytest.fail("GCLOUD_ORGANIZATION environment variable is not set.") + + print(f"Cleaning up existing custom modules for organization: {ORGANIZATION_ID}") + cleanup_existing_custom_modules(ORGANIZATION_ID) + + +def cleanup_existing_custom_modules(org_id: str): + """ + Deletes all custom modules matching a specific naming pattern. + Args: + org_id: The organization ID. + """ + client = securitycentermanagement_v1.SecurityCenterManagementClient() + parent = f"organizations/{org_id}/locations/global" + print(f"Parent path: {parent}") + try: + custom_modules = client.list_security_health_analytics_custom_modules( + request={"parent": parent} + ) + for module in custom_modules: + if module.display_name.startswith(PREFIX): + client.delete_security_health_analytics_custom_module( + request={"name": module.name} + ) + print(f"Deleted custom module: {module.name}") + except NotFound as e: + print(f"Resource not found: {e}") + except Exception as e: + print(f"Unexpected error during cleanup: {e}") + raise + + +def add_custom_module(org_id: str): + + parent = f"organizations/{org_id}/locations/global" + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + # Generate a unique display name + unique_suffix = f"{int(time.time())}_{random.randint(0, 999)}" + display_name = f"python_sample_sha_custom_module_test_{unique_suffix}" + + # Define the custom module configuration + custom_module = { + "display_name": display_name, + "enablement_state": "ENABLED", + "custom_config": { + "description": "Sample custom module for testing purpose. Please do not delete.", + "predicate": { + "expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))", + "title": "GCE Instance High Severity", + "description": "Custom module to detect high severity issues on GCE instances.", + }, + "recommendation": "Ensure proper security configurations on GCE instances.", + "resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]}, + "severity": "CRITICAL", + "custom_output": { + "properties": [ + { + "name": "example_property", + "value_expression": { + "description": "The name of the instance", + "expression": "resource.name", + "location": "global", + "title": "Instance Name", + }, + } + ] + }, + }, + } + + request = securitycentermanagement_v1.CreateSecurityHealthAnalyticsCustomModuleRequest( + parent=parent, + security_health_analytics_custom_module=custom_module, + ) + response = client.create_security_health_analytics_custom_module(request=request) + print(f"Created Security Health Analytics Custom Module: {response.name}") + module_name = response.name + module_id = module_name.split("/")[-1] + return module_name, module_id + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_get_effective_security_health_analytics_custom_module(): + + module_name, module_id = add_custom_module(ORGANIZATION_ID) + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + + # Retrieve the custom module + response = security_health_analytics_custom_modules.get_effective_security_health_analytics_custom_module(parent, module_id) + + assert response is not None, "Failed to retrieve the custom module." + # Verify that the custom module was created + assert response.display_name.startswith(PREFIX) + assert response.enablement_state == securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED + print(f"Retrieved Custom Module: {response.name}") From 5b8d9a68b3ef7f773bf2461a896e90217477060d Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Mon, 30 Dec 2024 03:49:34 +0000 Subject: [PATCH 02/12] feat(securitycenter): Add Resource SCC Org Mgt API SHA Cust Modules (GetEff, ListEff, ListDesc, Simulate) --- ...ecurity_health_analytics_custom_modules.py | 159 +++++++++++++++++- ...ty_health_analytics_custom_modules_test.py | 67 ++++++++ 2 files changed, 221 insertions(+), 5 deletions(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py index 40ee0afedb1..f180dd3b058 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py @@ -14,14 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import random -import time -from typing import Dict - -# [START securitycenter_get_effective_security_health_analytics_custom_module] from google.api_core.exceptions import NotFound from google.cloud import securitycentermanagement_v1 +# [START securitycenter_get_effective_security_health_analytics_custom_module] + + def get_effective_security_health_analytics_custom_module(parent: str, module_id: str): """ Retrieves a Security Health Analytics custom module. @@ -49,3 +47,154 @@ def get_effective_security_health_analytics_custom_module(parent: str, module_id print(f"Custom Module not found: {response.name}") raise e # [END securitycenter_get_effective_security_health_analytics_custom_module] + +# [START securitycenter_list_descendant_security_health_analytics_custom_module] + + +def list_descendant_security_health_analytics_custom_module(parent: str): + """ + Retrieves list of all resident Security Health Analytics custom modules and all of its descendants. + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + Returns: + List of retrieved all resident Security Health Analytics custom modules and all of its descendants. + Raises: + NotFound: If the specified custom module does not exist. + """ + + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + try: + request = securitycentermanagement_v1.ListDescendantSecurityHealthAnalyticsCustomModulesRequest( + parent=parent, + ) + + response = client.list_descendant_security_health_analytics_custom_modules(request=request) + + custom_modules = [] + for custom_module in response: + print(f"Custom Module: {custom_module.name}") + custom_modules.append(custom_module) + return custom_modules + except NotFound as e: + print(f"Parent resource not found: {parent}") + raise e + except Exception as e: + print(f"An error occurred while listing custom modules: {e}") + raise e +# [END securitycenter_list_descendant_security_health_analytics_custom_module] + +# [START securitycenter_list_effective_security_health_analytics_custom_module] + + +def list_effective_security_health_analytics_custom_module(parent: str): + """ + Retrieves list of all Security Health Analytics custom modules. + This includes resident modules defined at the scope of the parent, + and inherited modules, inherited from ancestor organizations, folders, and projects (no descendants). + + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + Returns: + List of retrieved all Security Health Analytics custom modules. + Raises: + NotFound: If the specified custom module does not exist. + """ + + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + try: + request = securitycentermanagement_v1.ListEffectiveSecurityHealthAnalyticsCustomModulesRequest( + parent=parent, + ) + + response = client.list_effective_security_health_analytics_custom_modules(request=request) + + custom_modules = [] + for custom_module in response: + print(f"Custom Module: {custom_module.name}") + custom_modules.append(custom_module) + return custom_modules + except NotFound as e: + print(f"Parent resource not found: {parent}") + raise e + except Exception as e: + print(f"An error occurred while listing custom modules: {e}") + raise e +# [END securitycenter_list_effective_security_health_analytics_custom_module] + +# [START securitycenter_simulate_security_health_analytics_custom_module] + + +def simulate_security_health_analytics_custom_module(parent: str): + """ + Simulates the result of using a SecurityHealthAnalyticsCustomModule to check a resource. + + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + Returns: + Simulated Security Health Analytics custom module. + """ + + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + # Define the custom config configuration + custom_config = { + "description": ( + "Sample custom module for testing purposes. This custom module evaluates " + "Cloud KMS CryptoKeys to ensure their rotation period exceeds 30 days (2592000 seconds)." + ), + "predicate": { + "expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))", + "title": "Cloud KMS CryptoKey Rotation Period", + "description": ( + "Evaluates whether the rotation period of a Cloud KMS CryptoKey exceeds 30 days. " + "A longer rotation period might increase the risk of exposure." + ), + }, + "recommendation": ( + "Review and adjust the rotation period for Cloud KMS CryptoKeys to align with your security policies. " + "Consider setting a shorter rotation period if possible." + ), + "resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]}, + "severity": "CRITICAL", + "custom_output": { + "properties": [ + { + "name": "example_property", + "value_expression": { + "description": "The resource name of the CryptoKey being evaluated.", + "expression": "resource.name", + "location": "global", + "title": "CryptoKey Resource Name", + }, + } + ] + }, + } + + # Initialize request argument(s) + resource = securitycentermanagement_v1.types.SimulateSecurityHealthAnalyticsCustomModuleRequest.SimulatedResource() + resource.resource_type = "cloudkms.googleapis.com/CryptoKey" # Replace with the correct resource type + + request = securitycentermanagement_v1.SimulateSecurityHealthAnalyticsCustomModuleRequest( + parent=parent, + custom_config=custom_config, + resource=resource, + ) + + response = client.simulate_security_health_analytics_custom_module(request=request) + + print(f"Simulated Security Health Analytics Custom Module: {response}") + return response + +# [END securitycenter_simulate_security_health_analytics_custom_module] diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index 85d4d200540..ad7a7a44c7b 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -121,6 +121,7 @@ def add_custom_module(org_id: str): module_id = module_name.split("/")[-1] return module_name, module_id + @backoff.on_exception( backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) @@ -137,3 +138,69 @@ def test_get_effective_security_health_analytics_custom_module(): assert response.display_name.startswith(PREFIX) assert response.enablement_state == securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED print(f"Retrieved Custom Module: {response.name}") + + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_list_descendant_security_health_analytics_custom_module(): + + module_name, module_id = add_custom_module(ORGANIZATION_ID) + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + # Retrieve the list descendant custom modules + custom_modules = security_health_analytics_custom_modules.list_descendant_security_health_analytics_custom_module(parent) + + assert custom_modules is not None, "Failed to retrieve the custom modules." + assert len(custom_modules) > 0, "No custom modules were retrieved." + + # Verify the created module is in the list + created_module = next( + (module for module in custom_modules if module.name == module_name), None + ) + assert created_module is not None, "Created custom module not found in the list." + assert created_module.display_name.startswith(PREFIX) + assert ( + created_module.enablement_state + == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED + ) + + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_list_effective_security_health_analytics_custom_module(): + + module_name, module_id = add_custom_module(ORGANIZATION_ID) + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + # Retrieve the list of custom modules + custom_modules = security_health_analytics_custom_modules.list_effective_security_health_analytics_custom_module(parent) + + assert custom_modules is not None, "Failed to retrieve the custom modules." + assert len(custom_modules) > 0, "No custom modules were retrieved." + + # Verify the created module is in the list + created_module = next( + (module for module in custom_modules if (module.name.split("/")[-1]) == module_id), None + ) + assert created_module is not None, "Created custom module not found in the list." + assert created_module.display_name.startswith(PREFIX) + assert ( + created_module.enablement_state + == securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED + ) + + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_simulate_security_health_analytics_custom_module(): + + module_name, module_id = add_custom_module(ORGANIZATION_ID) + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + + simulated_custom_module = security_health_analytics_custom_modules.simulate_security_health_analytics_custom_module(parent) + + assert simulated_custom_module is not None, "Failed to retrieve the simulated custom module." + assert simulated_custom_module.result.no_violation is not None, ( + f"Expected no_violation to be present, got {simulated_custom_module.result}." + ) From fedcc2606be9d0660625ba69f2f5a8f44338cb49 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Mon, 30 Dec 2024 23:29:32 +0000 Subject: [PATCH 03/12] Address comments by code review bot --- .../snippets_management_api/noxfile_config.py | 2 +- ...ecurity_health_analytics_custom_modules.py | 13 +++---- ...ty_health_analytics_custom_modules_test.py | 35 ++++++++++++------- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/securitycenter/snippets_management_api/noxfile_config.py b/securitycenter/snippets_management_api/noxfile_config.py index 4efdab3e923..25766af1193 100644 --- a/securitycenter/snippets_management_api/noxfile_config.py +++ b/securitycenter/snippets_management_api/noxfile_config.py @@ -14,7 +14,7 @@ # Default TEST_CONFIG_OVERRIDE for python repos. -# You can copy this file into your directory, then it will be inported from +# You can copy this file into your directory, then it will be imported from # the noxfile.py. # The source of truth: diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py index f180dd3b058..7b22d6a7a6f 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py @@ -22,12 +22,13 @@ def get_effective_security_health_analytics_custom_module(parent: str, module_id: str): """ - Retrieves a Security Health Analytics custom module. + Retrieves a Security Health Analytics custom module using parent and module id as parameters. Args: parent: Use any one of the following options: - organizations/{organization_id}/locations/{location_id} - folders/{folder_id}/locations/{location_id} - projects/{project_id}/locations/{location_id} + module_id: The unique identifier of the custom module. Returns: The retrieved Security Health Analytics custom module. Raises: @@ -44,7 +45,7 @@ def get_effective_security_health_analytics_custom_module(parent: str, module_id print(f"Retrieved Effective Security Health Analytics Custom Module: {response.name}") return response except NotFound as e: - print(f"Custom Module not found: {response.name}") + print(f"Custom Module not found: {e}") raise e # [END securitycenter_get_effective_security_health_analytics_custom_module] @@ -60,9 +61,9 @@ def list_descendant_security_health_analytics_custom_module(parent: str): - folders/{folder_id}/locations/{location_id} - projects/{project_id}/locations/{location_id} Returns: - List of retrieved all resident Security Health Analytics custom modules and all of its descendants. + A list of all resident Security Health Analytics custom modules and all of its descendants. Raises: - NotFound: If the specified custom module does not exist. + NotFound: If the parent resource is not found. """ client = securitycentermanagement_v1.SecurityCenterManagementClient() @@ -104,7 +105,7 @@ def list_effective_security_health_analytics_custom_module(parent: str): Returns: List of retrieved all Security Health Analytics custom modules. Raises: - NotFound: If the specified custom module does not exist. + NotFound: If the parent resource is not found. """ client = securitycentermanagement_v1.SecurityCenterManagementClient() @@ -142,7 +143,7 @@ def simulate_security_health_analytics_custom_module(parent: str): - folders/{folder_id}/locations/{location_id} - projects/{project_id}/locations/{location_id} Returns: - Simulated Security Health Analytics custom module. + The simulation response of Security Health Analytics custom module. """ client = securitycentermanagement_v1.SecurityCenterManagementClient() diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index ad7a7a44c7b..a602740604c 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -38,7 +38,12 @@ @pytest.fixture(scope="session", autouse=True) def setup_environment(): - """Fixture to ensure a clean environment by removing test modules before running tests.""" + """ + Fixture to ensure a clean environment by removing test modules before running tests. + + This fixture lists all SHA custom modules in the organization and deletes any + that were created by previous test runs, identified by the PREFIX. + """ if not ORGANIZATION_ID: pytest.fail("GCLOUD_ORGANIZATION environment variable is not set.") @@ -73,7 +78,13 @@ def cleanup_existing_custom_modules(org_id: str): def add_custom_module(org_id: str): - + """ + Adds a new SHA custom module. + Args: + org_id (str): The organization ID. + Returns: + Tuple[str, str]: The name and ID of the created custom module. + """ parent = f"organizations/{org_id}/locations/global" client = securitycentermanagement_v1.SecurityCenterManagementClient() @@ -86,13 +97,13 @@ def add_custom_module(org_id: str): "display_name": display_name, "enablement_state": "ENABLED", "custom_config": { - "description": "Sample custom module for testing purpose. Please do not delete.", + "description": "Sample custom module for testing purposes. Please do not delete.", "predicate": { "expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))", - "title": "GCE Instance High Severity", - "description": "Custom module to detect high severity issues on GCE instances.", + "title": "Cloud KMS CryptoKey Rotation Period", + "description": "Custom module to detect CryptoKeys with rotation period greater than 30 days.", }, - "recommendation": "Ensure proper security configurations on GCE instances.", + "recommendation": "Review and adjust the rotation period for Cloud KMS CryptoKeys.", "resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]}, "severity": "CRITICAL", "custom_output": { @@ -100,10 +111,10 @@ def add_custom_module(org_id: str): { "name": "example_property", "value_expression": { - "description": "The name of the instance", + "description": "The resource name of the CryptoKey", "expression": "resource.name", "location": "global", - "title": "Instance Name", + "title": "CryptoKey Resource Name", }, } ] @@ -126,7 +137,7 @@ def add_custom_module(org_id: str): backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_get_effective_security_health_analytics_custom_module(): - + """Tests getting an effective SHA custom module.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" @@ -144,7 +155,7 @@ def test_get_effective_security_health_analytics_custom_module(): backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_list_descendant_security_health_analytics_custom_module(): - + """Tests listing descendant SHA custom modules.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the list descendant custom modules @@ -169,7 +180,7 @@ def test_list_descendant_security_health_analytics_custom_module(): backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_list_effective_security_health_analytics_custom_module(): - + """Tests listing effective SHA custom modules.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the list of custom modules @@ -194,7 +205,7 @@ def test_list_effective_security_health_analytics_custom_module(): backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_simulate_security_health_analytics_custom_module(): - + """Tests simulating an SHA custom module.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" From 38a91fa4f6c625a788d1f2c8c14165b3336ffe72 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Tue, 7 Jan 2025 00:57:23 +0000 Subject: [PATCH 04/12] Trigger CI pipeline From 44c259114e15eefa27abc6260276e51da51ee8f3 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Mon, 13 Jan 2025 21:12:36 +0000 Subject: [PATCH 05/12] Refactor the cleaning up of created custom modules in the current test session --- ...ty_health_analytics_custom_modules_test.py | 92 ++++++++++++------- 1 file changed, 60 insertions(+), 32 deletions(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index a602740604c..0ea775f880b 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -35,46 +35,26 @@ LOCATION = "global" PREFIX = "python_sample_sha_custom_module" # Prefix used for identifying test modules +# Global list to track created modules +created_modules = [] + @pytest.fixture(scope="session", autouse=True) def setup_environment(): - """ - Fixture to ensure a clean environment by removing test modules before running tests. - - This fixture lists all SHA custom modules in the organization and deletes any - that were created by previous test runs, identified by the PREFIX. - """ if not ORGANIZATION_ID: pytest.fail("GCLOUD_ORGANIZATION environment variable is not set.") - print(f"Cleaning up existing custom modules for organization: {ORGANIZATION_ID}") - cleanup_existing_custom_modules(ORGANIZATION_ID) +@pytest.fixture(scope="session", autouse=True) +def cleanup_after_tests(request): + """Fixture to clean up created custom modules after the test session.""" + def teardown(): + print("\nCreated Custom Modules:") + print_all_created_modules() + print("Cleaning up created custom modules...") + cleanup_created_custom_modules() -def cleanup_existing_custom_modules(org_id: str): - """ - Deletes all custom modules matching a specific naming pattern. - Args: - org_id: The organization ID. - """ - client = securitycentermanagement_v1.SecurityCenterManagementClient() - parent = f"organizations/{org_id}/locations/global" - print(f"Parent path: {parent}") - try: - custom_modules = client.list_security_health_analytics_custom_modules( - request={"parent": parent} - ) - for module in custom_modules: - if module.display_name.startswith(PREFIX): - client.delete_security_health_analytics_custom_module( - request={"name": module.name} - ) - print(f"Deleted custom module: {module.name}") - except NotFound as e: - print(f"Resource not found: {e}") - except Exception as e: - print(f"Unexpected error during cleanup: {e}") - raise + request.addfinalizer(teardown) def add_custom_module(org_id: str): @@ -139,6 +119,7 @@ def add_custom_module(org_id: str): def test_get_effective_security_health_analytics_custom_module(): """Tests getting an effective SHA custom module.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) + created_modules.append(module_name) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the custom module @@ -157,6 +138,7 @@ def test_get_effective_security_health_analytics_custom_module(): def test_list_descendant_security_health_analytics_custom_module(): """Tests listing descendant SHA custom modules.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) + created_modules.append(module_name) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the list descendant custom modules custom_modules = security_health_analytics_custom_modules.list_descendant_security_health_analytics_custom_module(parent) @@ -182,6 +164,7 @@ def test_list_descendant_security_health_analytics_custom_module(): def test_list_effective_security_health_analytics_custom_module(): """Tests listing effective SHA custom modules.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) + created_modules.append(module_name) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the list of custom modules custom_modules = security_health_analytics_custom_modules.list_effective_security_health_analytics_custom_module(parent) @@ -207,6 +190,7 @@ def test_list_effective_security_health_analytics_custom_module(): def test_simulate_security_health_analytics_custom_module(): """Tests simulating an SHA custom module.""" module_name, module_id = add_custom_module(ORGANIZATION_ID) + created_modules.append(module_name) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" simulated_custom_module = security_health_analytics_custom_modules.simulate_security_health_analytics_custom_module(parent) @@ -215,3 +199,47 @@ def test_simulate_security_health_analytics_custom_module(): assert simulated_custom_module.result.no_violation is not None, ( f"Expected no_violation to be present, got {simulated_custom_module.result}." ) + + +def print_all_created_modules(): + """Print all created custom modules.""" + if not created_modules: + print("No custom modules were created.") + else: + for module in created_modules: + print(module) + + +def cleanup_created_custom_modules(): + """ + Deletes all created custom modules in this test session. + """ + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + for module in list(created_modules): + if not custom_module_exists(module): + print(f"Module not found (already deleted): {module}") + created_modules.remove(module) + continue + try: + client.delete_security_health_analytics_custom_module( + request={"name": module} + ) + print(f"Deleted custom module: {module}") + created_modules.remove(module) + except Exception as e: + print(f"Failed to delete module {module}: {e}") + raise + + +def custom_module_exists(module_name): + client = securitycentermanagement_v1.SecurityCenterManagementClient() + try: + client.get_security_health_analytics_custom_module( + request={"name": module_name} + ) + return True + except Exception as e: + if "404" in str(e): + return False + raise From d48a67e33d9f10294127a36bc2d799650115b735 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Tue, 21 Jan 2025 23:09:39 +0000 Subject: [PATCH 06/12] Refactor the module creation and clean up --- ...ecurity_health_analytics_custom_modules.py | 2 +- ...ty_health_analytics_custom_modules_test.py | 171 +++++++++--------- 2 files changed, 87 insertions(+), 86 deletions(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py index 7b22d6a7a6f..81837fe5b18 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index 0ea775f880b..33e60d2c28b 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ import time +import uuid + import backoff from google.api_core.exceptions import InternalServerError, NotFound, ServiceUnavailable @@ -33,10 +35,10 @@ # GCLOUD_ORGANIZATION: The organization ID. ORGANIZATION_ID = os.environ["GCLOUD_ORGANIZATION"] LOCATION = "global" -PREFIX = "python_sample_sha_custom_module" # Prefix used for identifying test modules +PREFIX = "python_sample_sha_custom_module" -# Global list to track created modules -created_modules = [] +# Global list to track created shared modules +shared_modules = [] @pytest.fixture(scope="session", autouse=True) @@ -44,19 +46,92 @@ def setup_environment(): if not ORGANIZATION_ID: pytest.fail("GCLOUD_ORGANIZATION environment variable is not set.") + setup_shared_modules() + @pytest.fixture(scope="session", autouse=True) def cleanup_after_tests(request): """Fixture to clean up created custom modules after the test session.""" def teardown(): - print("\nCreated Custom Modules:") - print_all_created_modules() - print("Cleaning up created custom modules...") - cleanup_created_custom_modules() + print_all_shared_modules() + cleanup_shared_modules() request.addfinalizer(teardown) +def setup_shared_modules(): + for _ in range(3) : + _, module_id = add_custom_module(ORGANIZATION_ID) + if module_id != "" : + shared_modules.append(module_id) + + +def add_module_to_cleanup(module_id): + shared_modules.append(module_id) + + +def print_all_shared_modules(): + """Print all created custom modules.""" + if not shared_modules: + print("No custom modules were created.") + else: + print("\nCreated Custom Modules:") + for module_id in shared_modules: + print(module_id) + + +def cleanup_shared_modules(): + """ + Deletes all created custom modules in this test session. + """ + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + print("Cleaning up created custom modules...") + + for module_id in list(shared_modules): + if not custom_module_exists(module_id): + print(f"Module not found (already deleted): {module_id}") + shared_modules.remove(module_id) + continue + try: + client.delete_security_health_analytics_custom_module( + request={"name": f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}/securityHealthAnalyticsCustomModules/{module_id}"} + ) + print(f"Deleted custom module: {module_id}") + shared_modules.remove(module_id) + except Exception as e: + print(f"Failed to delete module {module_id}: {e}") + raise + + +def custom_module_exists(module_id): + client = securitycentermanagement_v1.SecurityCenterManagementClient() + try: + client.get_security_health_analytics_custom_module( + request={"name": f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}/securityHealthAnalyticsCustomModules/{module_id}"} + ) + return True + except Exception as e: + if "404" in str(e): + return False + raise + + +def get_random_shared_module(): + if not shared_modules: + return "" + random.seed(int(time.time() * 1000000)) + return shared_modules[random.randint(0, len(shared_modules) - 1)] + + +def extract_custom_module_id(module_name): + trimmed_full_name = module_name.strip() + parts = trimmed_full_name.split("/") + if parts: + return parts[-1] + return "" + + def add_custom_module(org_id: str): """ Adds a new SHA custom module. @@ -69,7 +144,7 @@ def add_custom_module(org_id: str): client = securitycentermanagement_v1.SecurityCenterManagementClient() # Generate a unique display name - unique_suffix = f"{int(time.time())}_{random.randint(0, 999)}" + unique_suffix = str(uuid.uuid4()).replace("-", "_") display_name = f"python_sample_sha_custom_module_test_{unique_suffix}" # Define the custom module configuration @@ -109,7 +184,7 @@ def add_custom_module(org_id: str): response = client.create_security_health_analytics_custom_module(request=request) print(f"Created Security Health Analytics Custom Module: {response.name}") module_name = response.name - module_id = module_name.split("/")[-1] + module_id = extract_custom_module_id(module_name) return module_name, module_id @@ -118,8 +193,7 @@ def add_custom_module(org_id: str): ) def test_get_effective_security_health_analytics_custom_module(): """Tests getting an effective SHA custom module.""" - module_name, module_id = add_custom_module(ORGANIZATION_ID) - created_modules.append(module_name) + module_id = get_random_shared_module() parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the custom module @@ -129,7 +203,6 @@ def test_get_effective_security_health_analytics_custom_module(): # Verify that the custom module was created assert response.display_name.startswith(PREFIX) assert response.enablement_state == securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED - print(f"Retrieved Custom Module: {response.name}") @backoff.on_exception( @@ -137,8 +210,6 @@ def test_get_effective_security_health_analytics_custom_module(): ) def test_list_descendant_security_health_analytics_custom_module(): """Tests listing descendant SHA custom modules.""" - module_name, module_id = add_custom_module(ORGANIZATION_ID) - created_modules.append(module_name) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the list descendant custom modules custom_modules = security_health_analytics_custom_modules.list_descendant_security_health_analytics_custom_module(parent) @@ -146,25 +217,12 @@ def test_list_descendant_security_health_analytics_custom_module(): assert custom_modules is not None, "Failed to retrieve the custom modules." assert len(custom_modules) > 0, "No custom modules were retrieved." - # Verify the created module is in the list - created_module = next( - (module for module in custom_modules if module.name == module_name), None - ) - assert created_module is not None, "Created custom module not found in the list." - assert created_module.display_name.startswith(PREFIX) - assert ( - created_module.enablement_state - == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED - ) - @backoff.on_exception( backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_list_effective_security_health_analytics_custom_module(): """Tests listing effective SHA custom modules.""" - module_name, module_id = add_custom_module(ORGANIZATION_ID) - created_modules.append(module_name) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the list of custom modules custom_modules = security_health_analytics_custom_modules.list_effective_security_health_analytics_custom_module(parent) @@ -172,25 +230,12 @@ def test_list_effective_security_health_analytics_custom_module(): assert custom_modules is not None, "Failed to retrieve the custom modules." assert len(custom_modules) > 0, "No custom modules were retrieved." - # Verify the created module is in the list - created_module = next( - (module for module in custom_modules if (module.name.split("/")[-1]) == module_id), None - ) - assert created_module is not None, "Created custom module not found in the list." - assert created_module.display_name.startswith(PREFIX) - assert ( - created_module.enablement_state - == securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED - ) - @backoff.on_exception( backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_simulate_security_health_analytics_custom_module(): """Tests simulating an SHA custom module.""" - module_name, module_id = add_custom_module(ORGANIZATION_ID) - created_modules.append(module_name) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" simulated_custom_module = security_health_analytics_custom_modules.simulate_security_health_analytics_custom_module(parent) @@ -199,47 +244,3 @@ def test_simulate_security_health_analytics_custom_module(): assert simulated_custom_module.result.no_violation is not None, ( f"Expected no_violation to be present, got {simulated_custom_module.result}." ) - - -def print_all_created_modules(): - """Print all created custom modules.""" - if not created_modules: - print("No custom modules were created.") - else: - for module in created_modules: - print(module) - - -def cleanup_created_custom_modules(): - """ - Deletes all created custom modules in this test session. - """ - client = securitycentermanagement_v1.SecurityCenterManagementClient() - - for module in list(created_modules): - if not custom_module_exists(module): - print(f"Module not found (already deleted): {module}") - created_modules.remove(module) - continue - try: - client.delete_security_health_analytics_custom_module( - request={"name": module} - ) - print(f"Deleted custom module: {module}") - created_modules.remove(module) - except Exception as e: - print(f"Failed to delete module {module}: {e}") - raise - - -def custom_module_exists(module_name): - client = securitycentermanagement_v1.SecurityCenterManagementClient() - try: - client.get_security_health_analytics_custom_module( - request={"name": module_name} - ) - return True - except Exception as e: - if "404" in str(e): - return False - raise From fc5dc161644deb0bc9cde028d89c5e32cef38c84 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Mon, 10 Feb 2025 22:27:44 +0000 Subject: [PATCH 07/12] fix the lint error --- .../security_health_analytics_custom_modules_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index 771b0545a11..04c2c227531 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -192,7 +192,6 @@ def add_custom_module(org_id: str): @backoff.on_exception( backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) - def test_create_security_health_analytics_custom_module(): parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" From 261805403c50ce3577cdd2ff02b9d9c95f194ec2 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Wed, 12 Feb 2025 19:44:55 +0000 Subject: [PATCH 08/12] Refactor the update test method --- .../security_health_analytics_custom_modules_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index 04c2c227531..883ee57d816 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -255,9 +255,11 @@ def test_list_security_health_analytics_custom_module(): backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_update_security_health_analytics_custom_module(): - - module_id = get_random_shared_module() parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent) + module_id = extract_custom_module_id(response.name) + add_module_to_cleanup(module_id) + # Retrieve the custom modules updated_custom_module = security_health_analytics_custom_modules.update_security_health_analytics_custom_module(parent, module_id) From 35fc246a89c61aad9fe5aa0cedb132ecb5306edd Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Wed, 12 Feb 2025 23:24:42 +0000 Subject: [PATCH 09/12] Refactor the test file to resolve the python version error --- .../security_health_analytics_custom_modules_test.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index 883ee57d816..ed77b0d1504 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -218,7 +218,8 @@ def test_get_security_health_analytics_custom_module(): assert response is not None, "Failed to retrieve the custom module." assert response.display_name.startswith(PREFIX) - assert response.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED + response_org_id = response.name.split("/")[1] # Extract organization ID from the name field + assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}." @backoff.on_exception( @@ -255,6 +256,7 @@ def test_list_security_health_analytics_custom_module(): backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 ) def test_update_security_health_analytics_custom_module(): + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent) module_id = extract_custom_module_id(response.name) @@ -283,7 +285,8 @@ def test_get_effective_security_health_analytics_custom_module(): assert response is not None, "Failed to retrieve the custom module." # Verify that the custom module was created assert response.display_name.startswith(PREFIX) - assert response.enablement_state == securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED + response_org_id = response.name.split("/")[1] # Extract organization ID from the name field + assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}." @backoff.on_exception( From 91c539ea4d347b0dd0d57a1e554e7e019fda932b Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Fri, 14 Feb 2025 01:57:33 +0000 Subject: [PATCH 10/12] add backoff to add custom module method --- .../security_health_analytics_custom_modules_test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index ed77b0d1504..4d2e9177b65 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -132,6 +132,9 @@ def extract_custom_module_id(module_name): return "" +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) def add_custom_module(org_id: str): """ Adds a new SHA custom module. From d26cab3c8f27d2ffdb8dc53782e5562dcc16d221 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Fri, 14 Feb 2025 06:32:26 +0000 Subject: [PATCH 11/12] Reduce the loop count and add logs --- .../security_health_analytics_custom_modules_test.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index 4d2e9177b65..cb307750b44 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -60,7 +60,7 @@ def teardown(): def setup_shared_modules(): - for _ in range(3) : + for _ in range(1) : _, module_id = add_custom_module(ORGANIZATION_ID) if module_id != "" : shared_modules.append(module_id) @@ -201,6 +201,7 @@ def test_create_security_health_analytics_custom_module(): # Run the function to create the custom module response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent) add_module_to_cleanup(extract_custom_module_id(response.name)) + print("Create module id::", extract_custom_module_id(response.name)) assert response is not None, "Custom module creation failed." # Verify that the custom module was created @@ -214,6 +215,7 @@ def test_create_security_health_analytics_custom_module(): def test_get_security_health_analytics_custom_module(): module_id = get_random_shared_module() + print("Get module id::", module_id) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the custom module @@ -240,6 +242,7 @@ def test_delete_security_health_analytics_custom_module(): assert response is None print(f"Custom module was deleted successfully: {module_id}") + shared_modules.remove(module_id) @backoff.on_exception( @@ -265,6 +268,7 @@ def test_update_security_health_analytics_custom_module(): module_id = extract_custom_module_id(response.name) add_module_to_cleanup(module_id) + print("Update module id::", module_id) # Retrieve the custom modules updated_custom_module = security_health_analytics_custom_modules.update_security_health_analytics_custom_module(parent, module_id) @@ -282,6 +286,7 @@ def test_get_effective_security_health_analytics_custom_module(): module_id = get_random_shared_module() parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + print("Get Effective module id::", module_id) # Retrieve the custom module response = security_health_analytics_custom_modules.get_effective_security_health_analytics_custom_module(parent, module_id) From a5f9be86bba2975780e1c488763282ab5e1a9f02 Mon Sep 17 00:00:00 2001 From: Vijaykanth Melugiri Date: Fri, 14 Feb 2025 19:20:51 +0000 Subject: [PATCH 12/12] Remove the loop and logs --- .../security_health_analytics_custom_modules_test.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index cb307750b44..a22a51eeb7c 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -60,10 +60,9 @@ def teardown(): def setup_shared_modules(): - for _ in range(1) : - _, module_id = add_custom_module(ORGANIZATION_ID) - if module_id != "" : - shared_modules.append(module_id) + _, module_id = add_custom_module(ORGANIZATION_ID) + if module_id != "" : + shared_modules.append(module_id) def add_module_to_cleanup(module_id): @@ -201,7 +200,6 @@ def test_create_security_health_analytics_custom_module(): # Run the function to create the custom module response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent) add_module_to_cleanup(extract_custom_module_id(response.name)) - print("Create module id::", extract_custom_module_id(response.name)) assert response is not None, "Custom module creation failed." # Verify that the custom module was created @@ -215,7 +213,6 @@ def test_create_security_health_analytics_custom_module(): def test_get_security_health_analytics_custom_module(): module_id = get_random_shared_module() - print("Get module id::", module_id) parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" # Retrieve the custom module @@ -268,7 +265,6 @@ def test_update_security_health_analytics_custom_module(): module_id = extract_custom_module_id(response.name) add_module_to_cleanup(module_id) - print("Update module id::", module_id) # Retrieve the custom modules updated_custom_module = security_health_analytics_custom_modules.update_security_health_analytics_custom_module(parent, module_id) @@ -286,7 +282,6 @@ def test_get_effective_security_health_analytics_custom_module(): module_id = get_random_shared_module() parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" - print("Get Effective module id::", module_id) # Retrieve the custom module response = security_health_analytics_custom_modules.get_effective_security_health_analytics_custom_module(parent, module_id)