diff --git a/securitycenter/snippets_management_api/noxfile_config.py b/securitycenter/snippets_management_api/noxfile_config.py index 4efdab3e923..be0c07ead7f 100644 --- a/securitycenter/snippets_management_api/noxfile_config.py +++ b/securitycenter/snippets_management_api/noxfile_config.py @@ -14,6 +14,7 @@ # Default TEST_CONFIG_OVERRIDE for python repos. + # You can copy this file into your directory, then it will be inported from # the noxfile.py. diff --git a/securitycenter/snippets_management_api/requirements-test.txt b/securitycenter/snippets_management_api/requirements-test.txt index c8175a54cd3..987e7bb0d89 100644 --- a/securitycenter/snippets_management_api/requirements-test.txt +++ b/securitycenter/snippets_management_api/requirements-test.txt @@ -1,5 +1,4 @@ backoff==2.2.1 pytest==8.2.0 -google-cloud-bigquery==3.11.4 +google-cloud-bigquery==3.27.0 google-cloud-securitycentermanagement==0.1.17 - diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py index 937d42f2b77..bffe8a623f8 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + import uuid from google.api_core.exceptions import GoogleAPICallError, NotFound @@ -107,6 +108,7 @@ def get_security_health_analytics_custom_module(parent: str, module_id: str): - organizations/{organization_id}/locations/{location_id} - folders/{folder_id}/locations/{location_id} - projects/{project_id}/locations/{location_id} + module_id: The unique identifier of the custom module. Returns: The retrieved Security Health Analytics custom module. Raises: @@ -238,3 +240,186 @@ def update_security_health_analytics_custom_module(parent: str, module_id: str): raise # [END securitycenter_update_security_health_analytics_custom_module] + +# [START securitycenter_get_effective_security_health_analytics_custom_module] + + +def get_effective_security_health_analytics_custom_module(parent: str, module_id: str): + """ + Retrieves a Security Health Analytics custom module using parent and module id as parameters. + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + module_id: The unique identifier of the custom module. + Returns: + The retrieved Security Health Analytics custom module. + Raises: + NotFound: If the specified custom module does not exist. + """ + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + try: + request = securitycentermanagement_v1.GetEffectiveSecurityHealthAnalyticsCustomModuleRequest( + name=f"{parent}/effectiveSecurityHealthAnalyticsCustomModules/{module_id}", + ) + + response = client.get_effective_security_health_analytics_custom_module(request=request) + print(f"Retrieved Effective Security Health Analytics Custom Module: {response.name}") + return response + except NotFound as e: + print(f"Custom Module not found: {e}") + raise e +# [END securitycenter_get_effective_security_health_analytics_custom_module] + +# [START securitycenter_list_descendant_security_health_analytics_custom_module] + + +def list_descendant_security_health_analytics_custom_module(parent: str): + """ + Retrieves list of all resident Security Health Analytics custom modules and all of its descendants. + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + Returns: + A list of all resident Security Health Analytics custom modules and all of its descendants. + Raises: + NotFound: If the parent resource is not found. + """ + + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + try: + request = securitycentermanagement_v1.ListDescendantSecurityHealthAnalyticsCustomModulesRequest( + parent=parent, + ) + + response = client.list_descendant_security_health_analytics_custom_modules(request=request) + + custom_modules = [] + for custom_module in response: + print(f"Custom Module: {custom_module.name}") + custom_modules.append(custom_module) + return custom_modules + except NotFound as e: + print(f"Parent resource not found: {parent}") + raise e + except Exception as e: + print(f"An error occurred while listing custom modules: {e}") + raise e +# [END securitycenter_list_descendant_security_health_analytics_custom_module] + +# [START securitycenter_list_effective_security_health_analytics_custom_module] + + +def list_effective_security_health_analytics_custom_module(parent: str): + """ + Retrieves list of all Security Health Analytics custom modules. + This includes resident modules defined at the scope of the parent, + and inherited modules, inherited from ancestor organizations, folders, and projects (no descendants). + + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + Returns: + List of retrieved all Security Health Analytics custom modules. + Raises: + NotFound: If the parent resource is not found. + """ + + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + try: + request = securitycentermanagement_v1.ListEffectiveSecurityHealthAnalyticsCustomModulesRequest( + parent=parent, + ) + + response = client.list_effective_security_health_analytics_custom_modules(request=request) + + custom_modules = [] + for custom_module in response: + print(f"Custom Module: {custom_module.name}") + custom_modules.append(custom_module) + return custom_modules + except NotFound as e: + print(f"Parent resource not found: {parent}") + raise e + except Exception as e: + print(f"An error occurred while listing custom modules: {e}") + raise e +# [END securitycenter_list_effective_security_health_analytics_custom_module] + +# [START securitycenter_simulate_security_health_analytics_custom_module] + + +def simulate_security_health_analytics_custom_module(parent: str): + """ + Simulates the result of using a SecurityHealthAnalyticsCustomModule to check a resource. + + Args: + parent: Use any one of the following options: + - organizations/{organization_id}/locations/{location_id} + - folders/{folder_id}/locations/{location_id} + - projects/{project_id}/locations/{location_id} + Returns: + The simulation response of Security Health Analytics custom module. + """ + + client = securitycentermanagement_v1.SecurityCenterManagementClient() + + # Define the custom config configuration + custom_config = { + "description": ( + "Sample custom module for testing purposes. This custom module evaluates " + "Cloud KMS CryptoKeys to ensure their rotation period exceeds 30 days (2592000 seconds)." + ), + "predicate": { + "expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))", + "title": "Cloud KMS CryptoKey Rotation Period", + "description": ( + "Evaluates whether the rotation period of a Cloud KMS CryptoKey exceeds 30 days. " + "A longer rotation period might increase the risk of exposure." + ), + }, + "recommendation": ( + "Review and adjust the rotation period for Cloud KMS CryptoKeys to align with your security policies. " + "Consider setting a shorter rotation period if possible." + ), + "resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]}, + "severity": "CRITICAL", + "custom_output": { + "properties": [ + { + "name": "example_property", + "value_expression": { + "description": "The resource name of the CryptoKey being evaluated.", + "expression": "resource.name", + "location": "global", + "title": "CryptoKey Resource Name", + }, + } + ] + }, + } + + # Initialize request argument(s) + resource = securitycentermanagement_v1.types.SimulateSecurityHealthAnalyticsCustomModuleRequest.SimulatedResource() + resource.resource_type = "cloudkms.googleapis.com/CryptoKey" # Replace with the correct resource type + + request = securitycentermanagement_v1.SimulateSecurityHealthAnalyticsCustomModuleRequest( + parent=parent, + custom_config=custom_config, + resource=resource, + ) + + response = client.simulate_security_health_analytics_custom_module(request=request) + + print(f"Simulated Security Health Analytics Custom Module: {response}") + return response + +# [END securitycenter_simulate_security_health_analytics_custom_module] diff --git a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py index 756b66ef607..a22a51eeb7c 100644 --- a/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py +++ b/securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py @@ -60,10 +60,9 @@ def teardown(): def setup_shared_modules(): - for _ in range(3) : - _, module_id = add_custom_module(ORGANIZATION_ID) - if module_id != "" : - shared_modules.append(module_id) + _, module_id = add_custom_module(ORGANIZATION_ID) + if module_id != "" : + shared_modules.append(module_id) def add_module_to_cleanup(module_id): @@ -132,7 +131,17 @@ def extract_custom_module_id(module_name): return "" +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) def add_custom_module(org_id: str): + """ + Adds a new SHA custom module. + Args: + org_id (str): The organization ID. + Returns: + Tuple[str, str]: The name and ID of the created custom module. + """ parent = f"organizations/{org_id}/locations/global" client = securitycentermanagement_v1.SecurityCenterManagementClient() @@ -146,13 +155,13 @@ def add_custom_module(org_id: str): "display_name": display_name, "enablement_state": "ENABLED", "custom_config": { - "description": "Sample custom module for testing purpose. Please do not delete.", + "description": "Sample custom module for testing purposes. Please do not delete.", "predicate": { "expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))", - "title": "GCE Instance High Severity", - "description": "Custom module to detect high severity issues on GCE instances.", + "title": "Cloud KMS CryptoKey Rotation Period", + "description": "Custom module to detect CryptoKeys with rotation period greater than 30 days.", }, - "recommendation": "Ensure proper security configurations on GCE instances.", + "recommendation": "Review and adjust the rotation period for Cloud KMS CryptoKeys.", "resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]}, "severity": "CRITICAL", "custom_output": { @@ -160,10 +169,10 @@ def add_custom_module(org_id: str): { "name": "example_property", "value_expression": { - "description": "The name of the instance", + "description": "The resource name of the CryptoKey", "expression": "resource.name", "location": "global", - "title": "Instance Name", + "title": "CryptoKey Resource Name", }, } ] @@ -211,7 +220,8 @@ def test_get_security_health_analytics_custom_module(): assert response is not None, "Failed to retrieve the custom module." assert response.display_name.startswith(PREFIX) - assert response.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED + response_org_id = response.name.split("/")[1] # Extract organization ID from the name field + assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}." @backoff.on_exception( @@ -229,6 +239,7 @@ def test_delete_security_health_analytics_custom_module(): assert response is None print(f"Custom module was deleted successfully: {module_id}") + shared_modules.remove(module_id) @backoff.on_exception( @@ -249,8 +260,11 @@ def test_list_security_health_analytics_custom_module(): ) def test_update_security_health_analytics_custom_module(): - module_id = get_random_shared_module() parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent) + module_id = extract_custom_module_id(response.name) + add_module_to_cleanup(module_id) + # Retrieve the custom modules updated_custom_module = security_health_analytics_custom_modules.update_security_health_analytics_custom_module(parent, module_id) @@ -258,3 +272,62 @@ def test_update_security_health_analytics_custom_module(): response_org_id = updated_custom_module.name.split("/")[1] # Extract organization ID from the name field assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}." assert updated_custom_module.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.DISABLED + + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_get_effective_security_health_analytics_custom_module(): + """Tests getting an effective SHA custom module.""" + module_id = get_random_shared_module() + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + + # Retrieve the custom module + response = security_health_analytics_custom_modules.get_effective_security_health_analytics_custom_module(parent, module_id) + + assert response is not None, "Failed to retrieve the custom module." + # Verify that the custom module was created + assert response.display_name.startswith(PREFIX) + response_org_id = response.name.split("/")[1] # Extract organization ID from the name field + assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}." + + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_list_descendant_security_health_analytics_custom_module(): + """Tests listing descendant SHA custom modules.""" + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + # Retrieve the list descendant custom modules + custom_modules = security_health_analytics_custom_modules.list_descendant_security_health_analytics_custom_module(parent) + + assert custom_modules is not None, "Failed to retrieve the custom modules." + assert len(custom_modules) > 0, "No custom modules were retrieved." + + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_list_effective_security_health_analytics_custom_module(): + """Tests listing effective SHA custom modules.""" + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + # Retrieve the list of custom modules + custom_modules = security_health_analytics_custom_modules.list_effective_security_health_analytics_custom_module(parent) + + assert custom_modules is not None, "Failed to retrieve the custom modules." + assert len(custom_modules) > 0, "No custom modules were retrieved." + + +@backoff.on_exception( + backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3 +) +def test_simulate_security_health_analytics_custom_module(): + """Tests simulating an SHA custom module.""" + parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}" + + simulated_custom_module = security_health_analytics_custom_modules.simulate_security_health_analytics_custom_module(parent) + + assert simulated_custom_module is not None, "Failed to retrieve the simulated custom module." + assert simulated_custom_module.result.no_violation is not None, ( + f"Expected no_violation to be present, got {simulated_custom_module.result}." + )