Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiple Ray supervisor actors to process IQ in SEA action #85

Merged
merged 8 commits into from
Jun 28, 2023
25 changes: 19 additions & 6 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
FFT_WINDOW_ECF = get_fft_window_correction(FFT_WINDOW, "energy")
IMPEDANCE_OHMS = 50.0
DATA_REFERENCE_POINT = "noise source output"
NUM_ACTORS = 3 # Number of ray actors to initialize

# Create power detectors
TD_DETECTOR = create_statistical_detector("TdMeanMaxDetector", ["mean", "max"])
Expand Down Expand Up @@ -429,7 +430,6 @@ def run(self, iqdata: np.ndarray) -> list:
# Do not wait until they finish. Yield references to their results.
yield [worker.run.remote(iqdata) for worker in self.workers]
del iqdata
gc.collect()


class NasctnSeaDataProduct(Action):
Expand Down Expand Up @@ -514,15 +514,27 @@ def __call__(self, schedule_entry, task_id):
)
self.create_global_data_product_metadata()

# Initialize remote supervisor actors for IQ processing
tic = perf_counter()
# This uses iteration_params[0] because
iq_processors = [
IQProcessor.remote(self.iteration_params[0], self.iir_sos)
for _ in range(NUM_ACTORS)
]
toc = perf_counter()
logger.debug(f"Spawned {NUM_ACTORS} supervisor actors in {toc-tic:.2f} s")

# Collect all IQ data and spawn data product computation processes
dp_procs, cpu_speed = [], []
capture_tic = perf_counter()
iq_processor = IQProcessor.remote(self.iteration_params[0], self.iir_sos)
for i, parameters in enumerate(self.iteration_params):
measurement_result = self.capture_iq(parameters)
# Start data product processing but do not block next IQ capture
tic = perf_counter()
dp_procs.append(iq_processor.run.remote(measurement_result["data"]))

dp_procs.append(
iq_processors[i % NUM_ACTORS].run.remote(measurement_result["data"])
)
del measurement_result["data"]
toc = perf_counter()
logger.debug(f"IQ data delivered for processing in {toc-tic:.2f} s")
Expand All @@ -540,7 +552,7 @@ def __call__(self, schedule_entry, task_id):
# Collect processed data product results
all_data, max_max_ch_pwrs, med_mean_ch_pwrs = [], [], []
result_tic = perf_counter()
for channel_data_process in dp_procs:
for i, channel_data_process in enumerate(dp_procs):
# Retrieve object references for channel data
channel_data_refs = ray.get(channel_data_process)
channel_data = []
Expand All @@ -563,9 +575,10 @@ def __call__(self, schedule_entry, task_id):
toc = perf_counter()
logger.debug(f"Waited {toc-tic} s for channel data")
all_data.extend(NasctnSeaDataProduct.transform_data(channel_data))
for ray_actor in iq_processors:
ray.kill(ray_actor)
result_toc = perf_counter()
del dp_procs, iq_processor, channel_data, channel_data_refs
gc.collect()
del dp_procs, iq_processors, channel_data, channel_data_refs
logger.debug(f"Got all processed data in {result_toc-result_tic:.2f} s")

# Build metadata and convert data to compressed bytes
Expand Down