Removing hardcoded value for num workers
gustavogaldinoo committed Jun 26, 2024
1 parent 77f1aa6 commit bf47d79
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions experiment/measurer/measure_manager.py
@@ -57,7 +57,6 @@
 FAIL_WAIT_SECONDS = 30
 SNAPSHOT_QUEUE_GET_TIMEOUT = 1
 SNAPSHOTS_BATCH_SAVE_SIZE = 100
-NUM_WORKERS = 4
 MEASURE_MANAGER_LOOP_TIMEOUT = 10


@@ -76,10 +75,9 @@ def measure_main(experiment_config):
     experiment = experiment_config['experiment']
     max_total_time = experiment_config['max_total_time']
     measurers_cpus = experiment_config['measurers_cpus']
-    runners_cpus = experiment_config['runners_cpus']
     region_coverage = experiment_config['region_coverage']
     measure_manager_loop(experiment, max_total_time, measurers_cpus,
-                         runners_cpus, region_coverage)
+                         region_coverage)
 
     # Clean up resources.
     gc.collect()
@@ -723,8 +721,6 @@ def measure_manager_inner_loop(experiment: str, max_cycle: int, request_queue,
         # Checking if snapshot already was queued so workers will not repeat
         # measurement for same snapshot
         if unmeasured_snapshot_identifier not in queued_snapshots:
-            # If corpus does not exist, don't put in measurer workers request
-            # queue.
             request_queue.put(unmeasured_snapshot)
             queued_snapshots.add(unmeasured_snapshot_identifier)

@@ -760,16 +756,17 @@ def get_pool_args(measurers_cpus, runners_cpus):
 def measure_manager_loop(experiment: str,
                          max_total_time: int,
                          measurers_cpus=None,
-                         runners_cpus=None,
                          region_coverage=False):
     # pylint: disable=too-many-locals
     """Measure manager loop. Creates request and response queues, request
     measurements tasks from workers, retrieve measurement results from response
     queue and writes measured snapshots in database."""
     logger.info('Starting measure manager loop.')
-    pool_args = get_pool_args(measurers_cpus, runners_cpus)
-    with multiprocessing.Pool(
-            *pool_args) as pool, multiprocessing.Manager() as manager:
+    if not measurers_cpus:
+        logger.info('Number of measurer CPUs not passed as argument. using %d',
+                    multiprocessing.cpu_count())
+        measurers_cpus = multiprocessing.cpu_count()
+    with multiprocessing.Pool() as pool, multiprocessing.Manager() as manager:
         logger.info('Setting up coverage binaries')
         set_up_coverage_binaries(pool, experiment)
         request_queue = manager.Queue()
@@ -778,14 +775,15 @@
         # Since each worker is gonna be in forever loop, we dont need result
         # return. Workers life scope will end automatically when there are no
         # more snapshots left to measure.
-        logger.info('Starting measure worker loop for %d workers', NUM_WORKERS)
+        logger.info('Starting measure worker loop for %d workers',
+                    measurers_cpus)
         config = {
            'request_queue': request_queue,
            'response_queue': response_queue,
            'region_coverage': region_coverage,
         }
         local_measure_worker = measure_worker.LocalMeasureWorker(config)
-        measure_trial_coverage_args = [()] * NUM_WORKERS
+        measure_trial_coverage_args = [()] * measurers_cpus
         _result = pool.starmap_async(local_measure_worker.measure_worker_loop,
                                      measure_trial_coverage_args)

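The net effect of the change: measure_manager_loop no longer spawns a fixed NUM_WORKERS = 4 worker loops, but sizes the pool of worker loops from the measurers_cpus argument and falls back to multiprocessing.cpu_count() when the caller does not pass one. The following is a minimal, self-contained sketch of that pattern, not FuzzBench code: worker_loop, run(), and the doubling "measurement" are hypothetical stand-ins for measure_worker.LocalMeasureWorker.measure_worker_loop and its config-based queue wiring.

import multiprocessing


def worker_loop(request_queue, response_queue):
    """Hypothetical stand-in for a measure worker loop: consume requests until a sentinel arrives."""
    while True:
        item = request_queue.get()
        if item is None:  # Sentinel value: no more work, exit the loop.
            return
        response_queue.put(item * 2)  # Placeholder for a real measurement.


def run(measurers_cpus=None):
    # Same fallback the commit introduces: if no measurer CPU count is
    # passed, use every CPU on the machine.
    if not measurers_cpus:
        measurers_cpus = multiprocessing.cpu_count()

    with multiprocessing.Pool() as pool, multiprocessing.Manager() as manager:
        request_queue = manager.Queue()
        response_queue = manager.Queue()

        # One long-running worker loop per measurer CPU, mirroring the
        # [()] * measurers_cpus argument list handed to starmap_async in the diff.
        worker_args = [(request_queue, response_queue)] * measurers_cpus
        result = pool.starmap_async(worker_loop, worker_args)

        for i in range(10):
            request_queue.put(i)
        for _ in range(measurers_cpus):  # One sentinel per worker loop.
            request_queue.put(None)
        result.wait()

        print(sorted(response_queue.get() for _ in range(10)))


if __name__ == '__main__':
    run()

As in the commit, each pool task is a long-running loop fed through managed queues, so starmap_async only needs a list of argument tuples whose length equals the desired number of workers; here a None sentinel per worker ends the loops, whereas FuzzBench's workers exit once there are no more snapshots left to measure.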
