Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions securitycenter/snippets_management_api/noxfile_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# Default TEST_CONFIG_OVERRIDE for python repos.


# You can copy this file into your directory, then it will be inported from
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "inported" should be "imported".

Suggested change
# You can copy this file into your directory, then it will be inported from
# You can copy this file into your directory, then it will be imported from

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "inported" should be "imported".

Suggested change
# You can copy this file into your directory, then it will be inported from
# the noxfile.py.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

# the noxfile.py.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
backoff==2.2.1
pytest==8.2.0
google-cloud-bigquery==3.11.4
google-cloud-bigquery==3.27.0
google-cloud-securitycentermanagement==0.1.17

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import uuid

from google.api_core.exceptions import GoogleAPICallError, NotFound
Expand Down Expand Up @@ -107,6 +108,7 @@ def get_security_health_analytics_custom_module(parent: str, module_id: str):
- organizations/{organization_id}/locations/{location_id}
- folders/{folder_id}/locations/{location_id}
- projects/{project_id}/locations/{location_id}
module_id: The unique identifier of the custom module.
Returns:
The retrieved Security Health Analytics custom module.
Raises:
Expand Down Expand Up @@ -238,3 +240,186 @@ def update_security_health_analytics_custom_module(parent: str, module_id: str):
raise

# [END securitycenter_update_security_health_analytics_custom_module]

# [START securitycenter_get_effective_security_health_analytics_custom_module]


def get_effective_security_health_analytics_custom_module(parent: str, module_id: str):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The docstring could be improved by following the Google Python Style Guide format. It should include an Args section and a Returns section. Consider adding a Raises section as well.

Suggested change
def get_effective_security_health_analytics_custom_module(parent: str, module_id: str):
def get_effective_security_health_analytics_custom_module(parent: str, module_id: str):
"""
Retrieves a Security Health Analytics custom module.
Args:
parent (str): Resource name of the parent to retrieve the effective custom module from.
module_id (str): The unique identifier of the custom module.
Returns:
EffectiveSecurityHealthAnalyticsCustomModule: The retrieved effective custom module.
Raises:
google.api_core.exceptions.NotFound: If the custom module is not found.
"""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""
Retrieves a Security Health Analytics custom module using parent and module id as parameters.
Args:
parent: Use any one of the following options:
- organizations/{organization_id}/locations/{location_id}
- folders/{folder_id}/locations/{location_id}
- projects/{project_id}/locations/{location_id}
module_id: The unique identifier of the custom module.
Returns:
The retrieved Security Health Analytics custom module.
Raises:
NotFound: If the specified custom module does not exist.
"""
client = securitycentermanagement_v1.SecurityCenterManagementClient()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the Google style guide, prefer using client as the variable name for client objects, where appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already doing it.


try:
request = securitycentermanagement_v1.GetEffectiveSecurityHealthAnalyticsCustomModuleRequest(
name=f"{parent}/effectiveSecurityHealthAnalyticsCustomModules/{module_id}",
)

response = client.get_effective_security_health_analytics_custom_module(request=request)
print(f"Retrieved Effective Security Health Analytics Custom Module: {response.name}")
return response
except NotFound as e:
print(f"Custom Module not found: {e}")
raise e
# [END securitycenter_get_effective_security_health_analytics_custom_module]

# [START securitycenter_list_descendant_security_health_analytics_custom_module]


def list_descendant_security_health_analytics_custom_module(parent: str):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The docstring could be improved by following the Google Python Style Guide format. It should include an Args section and a Returns section. Consider adding a Raises section as well.

Suggested change
def list_descendant_security_health_analytics_custom_module(parent: str):
def list_descendant_security_health_analytics_custom_module(parent: str):
"""
Retrieves a list of all descendant Security Health Analytics custom modules.
Args:
parent (str): Resource name of the parent to retrieve the descendant custom modules from.
Returns:
List[SecurityHealthAnalyticsCustomModule]: A list of descendant custom modules.
Raises:
google.api_core.exceptions.NotFound: If the parent resource is not found.
"""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""
Retrieves list of all resident Security Health Analytics custom modules and all of its descendants.
Args:
parent: Use any one of the following options:
- organizations/{organization_id}/locations/{location_id}
- folders/{folder_id}/locations/{location_id}
- projects/{project_id}/locations/{location_id}
Returns:
A list of all resident Security Health Analytics custom modules and all of its descendants.
Raises:
NotFound: If the parent resource is not found.
"""

client = securitycentermanagement_v1.SecurityCenterManagementClient()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the Google style guide, prefer using client as the variable name for client objects, where appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already doing it.


try:
request = securitycentermanagement_v1.ListDescendantSecurityHealthAnalyticsCustomModulesRequest(
parent=parent,
)

response = client.list_descendant_security_health_analytics_custom_modules(request=request)

custom_modules = []
for custom_module in response:
print(f"Custom Module: {custom_module.name}")
custom_modules.append(custom_module)
return custom_modules
except NotFound as e:
print(f"Parent resource not found: {parent}")
raise e
except Exception as e:
print(f"An error occurred while listing custom modules: {e}")
Comment on lines +310 to +311
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching a generic Exception can mask unexpected errors. PEP 8 recommends catching specific exceptions whenever possible. Could you specify the type of exceptions you expect in this block, or provide more context? This will make debugging easier and improve the overall robustness of the code.

Suggested change
except Exception as e:
print(f"An error occurred while listing custom modules: {e}")
print(f"An error occurred while listing custom modules: {e}")
raise # Re-raise the exception after printing the message

raise e
# [END securitycenter_list_descendant_security_health_analytics_custom_module]

# [START securitycenter_list_effective_security_health_analytics_custom_module]


def list_effective_security_health_analytics_custom_module(parent: str):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

Consider using a more descriptive function name, such as list_effective_sha_custom_modules.

Suggested change
def list_effective_security_health_analytics_custom_module(parent: str):
def list_effective_sha_custom_modules(parent: str):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The docstring could be improved by following the Google Python Style Guide format. It should include an Args section and a Returns section. Consider adding a Raises section as well.

Suggested change
def list_effective_security_health_analytics_custom_module(parent: str):
def list_effective_security_health_analytics_custom_module(parent: str):
"""
Retrieves a list of all effective Security Health Analytics custom modules.
Args:
parent (str): Resource name of the parent to retrieve the effective custom modules from.
Returns:
List[EffectiveSecurityHealthAnalyticsCustomModule]: A list of effective custom modules.
Raises:
google.api_core.exceptions.NotFound: If the parent resource is not found.
"""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""
Retrieves list of all Security Health Analytics custom modules.
This includes resident modules defined at the scope of the parent,
and inherited modules, inherited from ancestor organizations, folders, and projects (no descendants).

Args:
parent: Use any one of the following options:
- organizations/{organization_id}/locations/{location_id}
- folders/{folder_id}/locations/{location_id}
- projects/{project_id}/locations/{location_id}
Returns:
List of retrieved all Security Health Analytics custom modules.
Raises:
NotFound: If the parent resource is not found.
"""

client = securitycentermanagement_v1.SecurityCenterManagementClient()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the Google style guide, prefer using client as the variable name for client objects, where appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already doing it.


try:
request = securitycentermanagement_v1.ListEffectiveSecurityHealthAnalyticsCustomModulesRequest(
parent=parent,
)

response = client.list_effective_security_health_analytics_custom_modules(request=request)

custom_modules = []
for custom_module in response:
print(f"Custom Module: {custom_module.name}")
custom_modules.append(custom_module)
return custom_modules
except NotFound as e:
print(f"Parent resource not found: {parent}")
raise e
except Exception as e:
print(f"An error occurred while listing custom modules: {e}")
Comment on lines +352 to +353
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Catching a generic Exception can mask unexpected errors. PEP 8 recommends catching specific exceptions whenever possible. Could you specify the type of exceptions you expect in this block, or provide more context? This practice enhances code clarity and aids in debugging.

Suggested change
except Exception as e:
print(f"An error occurred while listing custom modules: {e}")
print(f"An error occurred while listing custom modules: {e}")
raise # Re-raise the exception after printing the message

raise e
# [END securitycenter_list_effective_security_health_analytics_custom_module]

# [START securitycenter_simulate_security_health_analytics_custom_module]


def simulate_security_health_analytics_custom_module(parent: str):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

Consider using a more descriptive function name, such as simulate_sha_custom_module.

Suggested change
def simulate_security_health_analytics_custom_module(parent: str):
def simulate_sha_custom_module(parent: str):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The docstring could be improved by following the Google Python Style Guide format. It should include an Args section and a Returns section.

Suggested change
def simulate_security_health_analytics_custom_module(parent: str):
def simulate_security_health_analytics_custom_module(parent: str):
"""
Simulates the result of using a SecurityHealthAnalyticsCustomModule to check a resource.
Args:
parent (str): Resource name of the parent to simulate the custom module for.
Returns:
SimulateSecurityHealthAnalyticsCustomModuleResponse: The simulation response.
"""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""
Simulates the result of using a SecurityHealthAnalyticsCustomModule to check a resource.

Args:
parent: Use any one of the following options:
- organizations/{organization_id}/locations/{location_id}
- folders/{folder_id}/locations/{location_id}
- projects/{project_id}/locations/{location_id}
Returns:
The simulation response of Security Health Analytics custom module.
"""

client = securitycentermanagement_v1.SecurityCenterManagementClient()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the Google style guide, prefer using client as the variable name for client objects, where appropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already doing it.


# Define the custom config configuration
custom_config = {
"description": (
"Sample custom module for testing purposes. This custom module evaluates "
"Cloud KMS CryptoKeys to ensure their rotation period exceeds 30 days (2592000 seconds)."
),
"predicate": {
"expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))",
"title": "Cloud KMS CryptoKey Rotation Period",
"description": (
"Evaluates whether the rotation period of a Cloud KMS CryptoKey exceeds 30 days. "
"A longer rotation period might increase the risk of exposure."
),
},
"recommendation": (
"Review and adjust the rotation period for Cloud KMS CryptoKeys to align with your security policies. "
"Consider setting a shorter rotation period if possible."
),
"resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]},
"severity": "CRITICAL",
"custom_output": {
"properties": [
{
"name": "example_property",
"value_expression": {
"description": "The resource name of the CryptoKey being evaluated.",
"expression": "resource.name",
"location": "global",
"title": "CryptoKey Resource Name",
},
}
]
},
}

# Initialize request argument(s)
resource = securitycentermanagement_v1.types.SimulateSecurityHealthAnalyticsCustomModuleRequest.SimulatedResource()
resource.resource_type = "cloudkms.googleapis.com/CryptoKey" # Replace with the correct resource type
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The resource type is hardcoded here. Consider making this a parameter to the function or retrieving it from the custom_config. This would make the function more flexible and reusable. For example, you could add a resource_type field to the custom_config dictionary and retrieve it within the function.

Suggested change
resource.resource_type = "cloudkms.googleapis.com/CryptoKey" # Replace with the correct resource type
resource.resource_type = custom_config.get("resource_selector", {}).get("resource_types", ["YOUR_RESOURCE_TYPE"])[0]


request = securitycentermanagement_v1.SimulateSecurityHealthAnalyticsCustomModuleRequest(
parent=parent,
custom_config=custom_config,
resource=resource,
)

response = client.simulate_security_health_analytics_custom_module(request=request)

print(f"Simulated Security Health Analytics Custom Module: {response}")
return response

# [END securitycenter_simulate_security_health_analytics_custom_module]
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ def teardown():


def setup_shared_modules():
for _ in range(3) :
_, module_id = add_custom_module(ORGANIZATION_ID)
if module_id != "" :
shared_modules.append(module_id)
_, module_id = add_custom_module(ORGANIZATION_ID)
if module_id != "" :
shared_modules.append(module_id)


def add_module_to_cleanup(module_id):
Expand Down Expand Up @@ -132,7 +131,17 @@ def extract_custom_module_id(module_name):
return ""


@backoff.on_exception(
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
)
def add_custom_module(org_id: str):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

This function is missing a docstring explaining its purpose and parameters.

Suggested change
def add_custom_module(org_id: str):
def add_custom_module(org_id: str):
"""
Adds a new SHA custom module.
Args:
org_id (str): The organization ID.
Returns:
Tuple[str, str]: The name and ID of the created custom module.
"""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""
Adds a new SHA custom module.
Args:
org_id (str): The organization ID.
Returns:
Tuple[str, str]: The name and ID of the created custom module.
"""

parent = f"organizations/{org_id}/locations/global"
client = securitycentermanagement_v1.SecurityCenterManagementClient()
Expand All @@ -146,24 +155,24 @@ def add_custom_module(org_id: str):
"display_name": display_name,
"enablement_state": "ENABLED",
"custom_config": {
"description": "Sample custom module for testing purpose. Please do not delete.",
"description": "Sample custom module for testing purposes. Please do not delete.",
"predicate": {
"expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))",
"title": "GCE Instance High Severity",
"description": "Custom module to detect high severity issues on GCE instances.",
"title": "Cloud KMS CryptoKey Rotation Period",
"description": "Custom module to detect CryptoKeys with rotation period greater than 30 days.",
},
"recommendation": "Ensure proper security configurations on GCE instances.",
"recommendation": "Review and adjust the rotation period for Cloud KMS CryptoKeys.",
"resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]},
"severity": "CRITICAL",
"custom_output": {
"properties": [
{
"name": "example_property",
"value_expression": {
"description": "The name of the instance",
"description": "The resource name of the CryptoKey",
"expression": "resource.name",
"location": "global",
"title": "Instance Name",
"title": "CryptoKey Resource Name",
},
}
]
Expand Down Expand Up @@ -211,7 +220,8 @@ def test_get_security_health_analytics_custom_module():

assert response is not None, "Failed to retrieve the custom module."
assert response.display_name.startswith(PREFIX)
assert response.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
response_org_id = response.name.split("/")[1] # Extract organization ID from the name field
assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}."


@backoff.on_exception(
Expand All @@ -229,6 +239,7 @@ def test_delete_security_health_analytics_custom_module():
assert response is None

print(f"Custom module was deleted successfully: {module_id}")
shared_modules.remove(module_id)


@backoff.on_exception(
Expand All @@ -249,12 +260,74 @@ def test_list_security_health_analytics_custom_module():
)
def test_update_security_health_analytics_custom_module():

module_id = get_random_shared_module()
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
response = security_health_analytics_custom_modules.create_security_health_analytics_custom_module(parent)
module_id = extract_custom_module_id(response.name)
add_module_to_cleanup(module_id)

# Retrieve the custom modules
updated_custom_module = security_health_analytics_custom_modules.update_security_health_analytics_custom_module(parent, module_id)

assert updated_custom_module is not None, "Failed to retrieve the updated custom module."
response_org_id = updated_custom_module.name.split("/")[1] # Extract organization ID from the name field
assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}."
assert updated_custom_module.enablement_state == securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.DISABLED


@backoff.on_exception(
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
)
def test_get_effective_security_health_analytics_custom_module():
"""Tests getting an effective SHA custom module."""
module_id = get_random_shared_module()
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"

# Retrieve the custom module
response = security_health_analytics_custom_modules.get_effective_security_health_analytics_custom_module(parent, module_id)

assert response is not None, "Failed to retrieve the custom module."
# Verify that the custom module was created
assert response.display_name.startswith(PREFIX)
response_org_id = response.name.split("/")[1] # Extract organization ID from the name field
assert response_org_id == ORGANIZATION_ID, f"Organization ID mismatch: Expected {ORGANIZATION_ID}, got {response_org_id}."


@backoff.on_exception(
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
)
def test_list_descendant_security_health_analytics_custom_module():
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

This function is missing a docstring explaining its purpose.

Suggested change
def test_list_descendant_security_health_analytics_custom_module():
def test_list_descendant_security_health_analytics_custom_module():
"""Tests listing descendant SHA custom modules."""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""Tests listing descendant SHA custom modules."""
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
# Retrieve the list descendant custom modules
custom_modules = security_health_analytics_custom_modules.list_descendant_security_health_analytics_custom_module(parent)

assert custom_modules is not None, "Failed to retrieve the custom modules."
assert len(custom_modules) > 0, "No custom modules were retrieved."


@backoff.on_exception(
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
)
def test_list_effective_security_health_analytics_custom_module():
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

This function is missing a docstring explaining its purpose.

Suggested change
def test_list_effective_security_health_analytics_custom_module():
def test_list_effective_security_health_analytics_custom_module():
"""Tests listing effective SHA custom modules."""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""Tests listing effective SHA custom modules."""
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
# Retrieve the list of custom modules
custom_modules = security_health_analytics_custom_modules.list_effective_security_health_analytics_custom_module(parent)

assert custom_modules is not None, "Failed to retrieve the custom modules."
assert len(custom_modules) > 0, "No custom modules were retrieved."


@backoff.on_exception(
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
)
def test_simulate_security_health_analytics_custom_module():
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

This function is missing a docstring explaining its purpose.

Suggested change
def test_simulate_security_health_analytics_custom_module():
def test_simulate_security_health_analytics_custom_module():
"""Tests simulating an SHA custom module."""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

"""Tests simulating an SHA custom module."""
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"

simulated_custom_module = security_health_analytics_custom_modules.simulate_security_health_analytics_custom_module(parent)

assert simulated_custom_module is not None, "Failed to retrieve the simulated custom module."
assert simulated_custom_module.result.no_violation is not None, (
f"Expected no_violation to be present, got {simulated_custom_module.result}."
)