Merge pull request #85 from NTIA/troubleshoot-segfaults
Use multiple Ray supervisor actors to process IQ in SEA action
dboulware committed Jun 28, 2023
2 parents 38cc859 + cbaddd6 commit 71c3545
Showing 1 changed file with 19 additions and 6 deletions.
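
The diff below replaces the single IQProcessor Ray actor with a small pool of supervisor actors and dispatches each channel's IQ capture to them round-robin, so the next capture can start while earlier captures are still being processed. The following standalone sketch illustrates that pattern under assumed names (Supervisor, process, and the placeholder workload are illustrative, not this repository's API):

    # Minimal sketch of the pool-of-supervisor-actors pattern adopted below.
    # Supervisor, process, and the dummy workload are assumptions for illustration.
    import numpy as np
    import ray

    NUM_ACTORS = 3  # mirrors the new module-level constant

    ray.init(ignore_reinit_error=True)

    @ray.remote
    class Supervisor:
        """Stand-in for IQProcessor: holds processing state reused across captures."""

        def __init__(self, params: dict):
            self.params = params

        def process(self, iq: np.ndarray) -> float:
            # Placeholder workload; the real IQProcessor computes the spectral
            # and time-domain data products for each channel.
            return float(np.mean(np.abs(iq) ** 2))

    actors = [Supervisor.remote({"nffts": 64}) for _ in range(NUM_ACTORS)]

    # Round-robin dispatch: capture i goes to actor i % NUM_ACTORS, so IQ
    # capture is not blocked while earlier blocks are still being processed.
    refs = []
    for i in range(10):
        iq = (np.random.randn(4096) + 1j * np.random.randn(4096)).astype(np.complex64)
        refs.append(actors[i % NUM_ACTORS].process.remote(iq))

    results = ray.get(refs)

    # Explicit teardown (as the diff does with ray.kill) releases actor memory
    # promptly instead of relying on garbage collection.
    for actor in actors:
        ray.kill(actor)
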
scos_actions/actions/acquire_sea_data_product.py: 19 additions & 6 deletions
@@ -109,6 +109,7 @@
FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy")
IMPEDANCE_OHMS = 50.0
DATA_REFERENCE_POINT = "noise source output"
+ NUM_ACTORS = 3  # Number of ray actors to initialize

# Create power detectors
TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["mean", "max"])
@@ -429,7 +430,6 @@ def run(self, iqdata: np.ndarray) -> list:
# Do not wait until they finish. Yield references to their results.
yield [worker.run.remote(iqdata) for worker in self.workers]
del iqdata
- gc.collect()


class NasctnSeaDataProduct(Action):
@@ -514,15 +514,27 @@ def __call__(self, schedule_entry, task_id):
)
self.create_global_data_product_metadata()

+ # Initialize remote supervisor actors for IQ processing
+ tic = perf_counter()
+ # This uses iteration_params[0] because the parameters used for IQ processing are identical across all channels
+ iq_processors = [
+     IQProcessor.remote(self.iteration_params[0], self.iir_sos)
+     for _ in range(NUM_ACTORS)
+ ]
+ toc = perf_counter()
+ logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s")
+
# Collect all IQ data and spawn data product computation processes
dp_procs, cpu_speed = [], []
capture_tic = perf_counter()
- iq_processor = IQProcessor.remote(self.iteration_params[0], self.iir_sos)
for i, parameters in enumerate(self.iteration_params):
measurement_result = self.capture_iq(parameters)
# Start data product processing but do not block next IQ capture
tic = perf_counter()
- dp_procs.append(iq_processor.run.remote(measurement_result["data"]))
+
+ dp_procs.append(
+     iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
+ )
del measurement_result["data"]
toc = perf_counter()
logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s")
@@ -540,7 +552,7 @@ def __call__(self, schedule_entry, task_id):
# Collect processed data product results
all_data, max_max_ch_pwrs, med_mean_ch_pwrs = [], [], []
result_tic = perf_counter()
- for channel_data_process in dp_procs:
+ for i, channel_data_process in enumerate(dp_procs):
# Retrieve object references for channel data
channel_data_refs = ray.get(channel_data_process)
channel_data = []
@@ -563,9 +575,10 @@
toc = perf_counter()
logger.debug(f"Waited {toc-tic} s for channel data")
all_data.extend(NasctnSeaDataProduct.transform_data(channel_data))
+ for ray_actor in iq_processors:
+     ray.kill(ray_actor)
result_toc = perf_counter()
- del dp_procs, iq_processor, channel_data, channel_data_refs
- gc.collect()
+ del dp_procs, iq_processors, channel_data, channel_data_refs
logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s")

# Build metadata and convert data to compressed bytes
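
In the final hunk, each entry of dp_procs resolves to a list of object references (one per worker spawned inside IQProcessor.run), which is why results are fetched in two steps: ray.get on the process handle, then ray.get on each channel data reference. A minimal sketch of that nested-reference pattern, with illustrative task names (supervisor, worker_task); the real code does the equivalent with actor methods:

    # Sketch of retrieving nested Ray object references; names are assumptions.
    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    def worker_task(x: int) -> int:
        return x * x

    @ray.remote
    def supervisor(xs: list) -> list:
        # Return ObjectRefs rather than values so the caller can pull each
        # worker's output individually (e.g., with per-item timeouts).
        return [worker_task.remote(x) for x in xs]

    outer_ref = supervisor.remote([1, 2, 3])
    inner_refs = ray.get(outer_ref)             # first get: a list of per-worker refs
    results = [ray.get(r) for r in inner_refs]  # second get: the actual values
    print(results)  # [1, 4, 9]
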
