From c0565a98a88c72fb5cd449b84409d931a60978a2 Mon Sep 17 00:00:00 2001 From: Javan Lacerda Date: Thu, 8 Jan 2026 20:11:16 +0000 Subject: [PATCH 1/3] fix enabled feature glag Signed-off-by: Javan Lacerda --- .../_internal/remote_task/__init__.py | 35 ++++++++----------- .../_internal/remote_task/job_frequency.py | 29 ++------------- 2 files changed, 16 insertions(+), 48 deletions(-) diff --git a/src/clusterfuzz/_internal/remote_task/__init__.py b/src/clusterfuzz/_internal/remote_task/__init__.py index 261bed8174c..fdd7841c2cf 100644 --- a/src/clusterfuzz/_internal/remote_task/__init__.py +++ b/src/clusterfuzz/_internal/remote_task/__init__.py @@ -76,13 +76,13 @@ def __init__(self): self._gcp_batch_service = GcpBatchService() self._kubernetes_service = KubernetesService() - def _should_use_kubernetes(self, job_type: str) -> bool: + def _should_use_kubernetes(self) -> bool: """Determines whether to use the Kubernetes backend for a given job. The decision is made based on a random roll and the configured frequency for the given job type. """ - frequencies = job_frequency.get_job_frequency(job_type) + frequencies = job_frequency.get_job_frequency() return random.random() < frequencies['kubernetes'] def create_uworker_main_batch_job(self, module: str, job_type: str, @@ -91,7 +91,7 @@ def create_uworker_main_batch_job(self, module: str, job_type: str, The choice of backend is determined by the `_should_use_kubernetes` method. """ - if self._should_use_kubernetes(job_type): + if self._should_use_kubernetes(): return self._kubernetes_service.create_uworker_main_batch_job( module, job_type, input_download_url) return self._gcp_batch_service.create_uworker_main_batch_job( @@ -106,28 +106,21 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]): gcp_batch_tasks = [] kubernetes_tasks = [] - # Group tasks by job_type to respect per-job frequencies - tasks_by_job = collections.defaultdict(list) - for task in remote_tasks: - tasks_by_job[task.job_type].append(task) - - for job_type, tasks in tasks_by_job.items(): - # Use random distribution if there is only one task - if len(tasks) == 1: - if self._should_use_kubernetes(job_type): - kubernetes_tasks.extend(tasks) - else: - gcp_batch_tasks.extend(tasks) - continue - + # Use random distribution if there is only one task + if len(remote_tasks) == 1: + if self._should_use_kubernetes(): + kubernetes_tasks.extend(remote_tasks) + else: + gcp_batch_tasks.extend(remote_tasks) + else: # Use deterministic slicing for multiple tasks - frequencies = job_frequency.get_job_frequency(job_type) + frequencies = job_frequency.get_job_frequency() k8s_ratio = frequencies['kubernetes'] - k8s_count = int(len(tasks) * k8s_ratio) + k8s_count = int(len(remote_tasks) * k8s_ratio) # We take the first chunk for Kubernetes - kubernetes_tasks.extend(tasks[:k8s_count]) - gcp_batch_tasks.extend(tasks[k8s_count:]) + kubernetes_tasks.extend(remote_tasks[:k8s_count]) + gcp_batch_tasks.extend(remote_tasks[k8s_count:]) logs.info(f'Sending {len(gcp_batch_tasks)} tasks to GCP Batch.') logs.info(f'Sending {len(kubernetes_tasks)} tasks to Kubernetes.') diff --git a/src/clusterfuzz/_internal/remote_task/job_frequency.py b/src/clusterfuzz/_internal/remote_task/job_frequency.py index 123f0d661de..b433d1c004b 100644 --- a/src/clusterfuzz/_internal/remote_task/job_frequency.py +++ b/src/clusterfuzz/_internal/remote_task/job_frequency.py @@ -26,35 +26,10 @@ DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.1} -def _get_job_frequencies_from_env(): - """Parses the `K8S_JOBS_FREQUENCY` environment variable. - - The variable should be a comma-separated list of key-value pairs, where the - key is the job name and the value is the frequency (a float between 0 and 1). - For example: `libfuzzer_asan_chrome=0.5,libfuzzer_msan_chrome=0.2`. - """ - job_frequencies = {} - frequency_string = environment.get_value('K8S_JOBS_FREQUENCY') - if not frequency_string: - return {} - - for item in frequency_string.split(','): - key, value = item.split('=') - job_frequencies[key] = float(value) - return job_frequencies - - -def get_job_frequency(job_name): +def get_job_frequency(): """Returns the frequency for a given job. If the frequency is not explicitly defined in the `K8S_JOBS_FREQUENCY` environment variable, the default frequency is returned. """ - job_frequencies = _get_job_frequencies_from_env() - if job_name in job_frequencies: - kubernetes_frequency = job_frequencies[job_name] - return { - 'gcp_batch': 1.0 - kubernetes_frequency, - 'kubernetes': kubernetes_frequency - } - return DEFAULT_FREQUENCY + return DEFAULT_FREQUENCY \ No newline at end of file From d77df8eddc22b3ea1caefb989fb901d38af0fc24 Mon Sep 17 00:00:00 2001 From: Javan Lacerda Date: Fri, 9 Jan 2026 11:23:01 -0300 Subject: [PATCH 2/3] Pr/cleanup scripts (#5119) Signed-off-by: Javan Lacerda --- butler.py | 1 - src/clusterfuzz/_internal/base/tasks/__init__.py | 6 ++++++ src/clusterfuzz/_internal/batch/service.py | 7 +------ src/clusterfuzz/_internal/k8s/service.py | 3 +++ src/clusterfuzz/_internal/remote_task/job_frequency.py | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/butler.py b/butler.py index 15bc9f0ef8f..f8757dbac00 100644 --- a/butler.py +++ b/butler.py @@ -435,7 +435,6 @@ def main(): 'clean_indexes', help='Clean up undefined indexes (in index.yaml).') parser_clean_indexes.add_argument( '-c', '--config-dir', required=True, help='Path to application config.') - parser_create_config = subparsers.add_parser( 'create_config', help='Create a new deployment config.') parser_create_config.add_argument( diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 1220b8a2085..acb70f89ca1 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -64,6 +64,12 @@ 'regression': 24 * 60 * 60, } + +def get_task_duration(command): + """Gets the duration of a task.""" + return TASK_LEASE_SECONDS_BY_COMMAND.get(command, TASK_LEASE_SECONDS) + + TASK_QUEUE_DISPLAY_NAMES = { 'LINUX': 'Linux', 'LINUX_WITH_GPU': 'Linux (with GPU)', diff --git a/src/clusterfuzz/_internal/batch/service.py b/src/clusterfuzz/_internal/batch/service.py index a04106a0560..c11aea0a6da 100644 --- a/src/clusterfuzz/_internal/batch/service.py +++ b/src/clusterfuzz/_internal/batch/service.py @@ -238,11 +238,6 @@ def _get_config_names(batch_tasks: List[RemoteTask]): return config_map -def _get_task_duration(command): - return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command, - tasks.TASK_LEASE_SECONDS) - - WeightedSubconfig = collections.namedtuple('WeightedSubconfig', ['name', 'weight']) @@ -293,7 +288,7 @@ def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict: # Lower numbers are a lower priority, meaning less likely to run From: # https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs priority = 0 if task.command == 'fuzz' else 1 - max_run_duration = f'{_get_task_duration(task.command)}s' + max_run_duration = f'{tasks.get_task_duration(task.command)}s' # This saves us time and reduces fragementation, e.g. every linux fuzz task # run in this call will run in the same zone. if config_name not in subconfig_map: diff --git a/src/clusterfuzz/_internal/k8s/service.py b/src/clusterfuzz/_internal/k8s/service.py index 98d74e324fb..c371b247205 100644 --- a/src/clusterfuzz/_internal/k8s/service.py +++ b/src/clusterfuzz/_internal/k8s/service.py @@ -26,6 +26,7 @@ from kubernetes import client as k8s_client from kubernetes import config as k8s_config +from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.config import local_config @@ -146,6 +147,7 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str, 'name': job_name }, 'spec': { + 'activeDeadlineSeconds': tasks.get_task_duration(config.command), 'template': { 'spec': { 'serviceAccountName': @@ -395,6 +397,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig, 'name': job_name }, 'spec': { + 'activeDeadlineSeconds': tasks.get_task_duration(config.command), 'template': { 'metadata': { 'labels': { diff --git a/src/clusterfuzz/_internal/remote_task/job_frequency.py b/src/clusterfuzz/_internal/remote_task/job_frequency.py index b433d1c004b..4a37c5446d6 100644 --- a/src/clusterfuzz/_internal/remote_task/job_frequency.py +++ b/src/clusterfuzz/_internal/remote_task/job_frequency.py @@ -23,7 +23,7 @@ # By default, all jobs are sent to the GCP Batch backend. This can be # overridden on a per-job basis by setting the `K8S_JOBS_FREQUENCY` # environment variable. -DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.1} +DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.0} def get_job_frequency(): From 961e4a70b088f0fd58c07bb6a6e1a6828d300327 Mon Sep 17 00:00:00 2001 From: Javan Lacerda Date: Fri, 9 Jan 2026 14:34:54 +0000 Subject: [PATCH 3/3] add endline Signed-off-by: Javan Lacerda --- src/clusterfuzz/_internal/remote_task/job_frequency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clusterfuzz/_internal/remote_task/job_frequency.py b/src/clusterfuzz/_internal/remote_task/job_frequency.py index 4a37c5446d6..0347738cbb2 100644 --- a/src/clusterfuzz/_internal/remote_task/job_frequency.py +++ b/src/clusterfuzz/_internal/remote_task/job_frequency.py @@ -32,4 +32,4 @@ def get_job_frequency(): If the frequency is not explicitly defined in the `K8S_JOBS_FREQUENCY` environment variable, the default frequency is returned. """ - return DEFAULT_FREQUENCY \ No newline at end of file + return DEFAULT_FREQUENCY