Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/clusterfuzz/_internal/swarming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.protos import swarming_pb2
from clusterfuzz._internal.system import environment

Expand Down Expand Up @@ -67,7 +68,7 @@ def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list
""" Gets all swarming dimensions for a task.
Job dimensions have more precedence than static dimensions"""
unique_dimensions = {}
unique_dimensions['os'] = job.platform
unique_dimensions['os'] = str(job.platform).capitalize()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the call to str() actually needed? Isn't job.platform already a string? I suppose we need to capitalize it since Swarming is not accepting the string as is, correct? Is it because it's not a string or because it's not capitalized?

Copy link
Copy Markdown
Collaborator Author

@IvanBM18 IvanBM18 Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the str() casting is not necessary, but i just add it to avoid one more error from displaying in the screen:
image

Worth noting that platform is never empty because is a requirement to add it to the data store

unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool')

for dimension in platform_specific_dimensions:
Expand Down Expand Up @@ -133,6 +134,7 @@ def _get_new_task_spec(command: str, job_name: str,
swarming_pb2.StringPair(key='UWORKER', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair(key='SWARMING_BOT', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair( # pylint: disable=no-member
key='LOGGING_CLOUD_PROJECT_ID',
value=logs_project_id),
Expand Down Expand Up @@ -181,7 +183,11 @@ def push_swarming_task(command, download_url, job_type):
raise ValueError('invalid job_name')

task_spec = _get_new_task_spec(command, job_type, download_url)
creds, _ = credentials.get_default(_SWARMING_SCOPES)
creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES)
if not creds:
logs.error(
'[Swarming] Failed to push task into swarming. Reason: No credentials.')
return

if not creds.token:
creds.refresh(requests.Request())
Expand All @@ -193,5 +199,8 @@ def push_swarming_task(command, download_url, job_type):
}
swarming_server = _get_swarming_config().get('swarming_server')
url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask'
utils.post_url(
url=url, data=json_format.MessageToJson(task_spec), headers=headers)
message_body = json_format.MessageToJson(task_spec)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a security concern to log the whole body to the console?
Might want to consider logging it conditionally depending on the environment.

Copy link
Copy Markdown
Collaborator Author

@IvanBM18 IvanBM18 Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess not since logs are only readable trough the GCP project, but im not 100% sure of this affirmation, since im not fully aware of who has access to this projects.

@javanlacerda wdyt?

logs.info(f"""[Swarming] Pushing task to {url}
as {creds.service_account_email} with {message_body}""")
response = utils.post_url(url=url, data=message_body, headers=headers)
logs.info(f'[Swarming] Response: {response}')
34 changes: 22 additions & 12 deletions src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def setUp(self):
'clusterfuzz._internal.base.utils.post_url',
'clusterfuzz._internal.swarming._get_task_name',
'clusterfuzz._internal.google_cloud_utils.credentials.get_default',
'clusterfuzz._internal.google_cloud_utils.credentials.get_scoped_service_account_credentials',
'google.auth.transport.requests.Request',
'clusterfuzz._internal.swarming.FeatureFlags',
])
Expand Down Expand Up @@ -62,7 +63,8 @@ def test_get_spec_from_config_with_docker_image(self):
'luci-auth', 'context', '--', './linux_entry_point.sh'
],
dimensions=[
swarming_pb2.StringPair(key='os', value=job.platform),
swarming_pb2.StringPair(
key='os', value=str(job.platform).capitalize()),
swarming_pb2.StringPair(key='pool', value='pool-name')
],
cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member
Expand All @@ -82,12 +84,13 @@ def test_get_spec_from_config_with_docker_image(self):
swarming_pb2.StringPair(
key='DOCKER_ENV_VARS',
value=
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
),
swarming_pb2.StringPair(key='UWORKER', value='True'),
swarming_pb2.StringPair(
key='SWARMING_BOT', value='True'),
swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'),
swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'),
swarming_pb2.StringPair(
key='LOGGING_CLOUD_PROJECT_ID', value='project_id'),
],
Expand Down Expand Up @@ -126,7 +129,8 @@ def test_get_spec_from_config_without_docker_image(self):
'luci-auth', 'context', '--', './mac_entry_point.sh'
],
dimensions=[
swarming_pb2.StringPair(key='os', value=job.platform),
swarming_pb2.StringPair(
key='os', value=str(job.platform).capitalize()),
swarming_pb2.StringPair(key='pool', value='pool-name'),
swarming_pb2.StringPair(key='key1', value='value1'),
swarming_pb2.StringPair(key='key2', value='value2'),
Expand Down Expand Up @@ -155,12 +159,13 @@ def test_get_spec_from_config_without_docker_image(self):
swarming_pb2.StringPair(
key='DOCKER_ENV_VARS',
value=
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
),
swarming_pb2.StringPair(key='UWORKER', value='True'),
swarming_pb2.StringPair(
key='SWARMING_BOT', value='True'),
swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'),
swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'),
swarming_pb2.StringPair(
key='LOGGING_CLOUD_PROJECT_ID', value='project_id'),
],
Expand Down Expand Up @@ -196,7 +201,8 @@ def test_get_spec_from_config_for_fuzz_task(self):
'luci-auth', 'context', '--', './linux_entry_point.sh'
],
dimensions=[
swarming_pb2.StringPair(key='os', value=job.platform),
swarming_pb2.StringPair(
key='os', value=str(job.platform).capitalize()),
swarming_pb2.StringPair(key='pool', value='pool-name')
],
cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member
Expand All @@ -216,12 +222,13 @@ def test_get_spec_from_config_for_fuzz_task(self):
swarming_pb2.StringPair(
key='DOCKER_ENV_VARS',
value=
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
),
swarming_pb2.StringPair(key='UWORKER', value='True'),
swarming_pb2.StringPair(
key='SWARMING_BOT', value='True'),
swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'),
swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'),
swarming_pb2.StringPair(
key='LOGGING_CLOUD_PROJECT_ID', value='project_id'),
],
Expand All @@ -234,7 +241,7 @@ def test_push_swarming_task(self):
"""Tests that push_swarming_task works as expected."""
mock_creds = mock.MagicMock()
mock_creds.token = 'fake_token'
self.mock.get_default.return_value = (mock_creds, None)
self.mock.get_scoped_service_account_credentials.return_value = mock_creds

job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
job.put()
Expand All @@ -253,7 +260,8 @@ def test_push_swarming_task(self):
'luci-auth', 'context', '--', './linux_entry_point.sh'
],
dimensions=[
swarming_pb2.StringPair(key='os', value=job.platform),
swarming_pb2.StringPair(
key='os', value=str(job.platform).capitalize()),
swarming_pb2.StringPair(key='pool', value='pool-name')
],
cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member
Expand All @@ -273,20 +281,22 @@ def test_push_swarming_task(self):
swarming_pb2.StringPair(
key='DOCKER_ENV_VARS',
value=
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
'{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}'
),
swarming_pb2.StringPair(key='UWORKER', value='True'),
swarming_pb2.StringPair(
key='SWARMING_BOT', value='True'),
swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'),
swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'),
swarming_pb2.StringPair(
key='LOGGING_CLOUD_PROJECT_ID', value='project_id'),
],
secret_bytes=base64.b64encode(
'https://download_url'.encode('utf-8'))))
])

self.mock.get_default.assert_called_with(swarming._SWARMING_SCOPES) # pylint: disable=protected-access
self.mock.get_scoped_service_account_credentials.assert_called_with(
swarming._SWARMING_SCOPES) # pylint: disable=protected-access
expected_headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
Expand All @@ -302,7 +312,7 @@ def test_push_swarming_task_with_refresh(self):
"""Tests that push_swarming_task refreshes credentials if token is missing."""
mock_creds = mock.MagicMock()
mock_creds.token = None
self.mock.get_default.return_value = (mock_creds, None)
self.mock.get_scoped_service_account_credentials.return_value = mock_creds

def refresh_side_effect(_):
mock_creds.token = 'refreshed_token'
Expand Down Expand Up @@ -370,7 +380,7 @@ def test_get_task_dimensions_job_precedence(self):
dimensions = spec.task_slices[0].properties.dimensions

expected_dimensions = [
swarming_pb2.StringPair(key='os', value='MAC'),
swarming_pb2.StringPair(key='os', value='Mac'),
swarming_pb2.StringPair(key='pool', value='pool-name'),
swarming_pb2.StringPair(key='key1', value='job_value1'),
swarming_pb2.StringPair(key='key2', value='value2'),
Expand Down
Loading