Skip to content

Commit

Permalink
delay ray.get using generators
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanielloNTIA committed Apr 27, 2023
1 parent eb3de44 commit 3b8f941
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,9 @@ def generate_data_product(
procs.append(get_td_power_results.remote(iqdata_ref, params))
procs.append(get_periodic_frame_power.remote(iqdata_ref, params))
procs.append(get_apd_results.remote(iqdata_ref, params))
yield procs

for p in procs:
yield ray.get(p)

del iqdata, procs, p
del iqdata, procs
gc.collect()


Expand Down Expand Up @@ -427,7 +425,7 @@ def __call__(self, schedule_entry, task_id):
# Start data product processing but do not block next IQ capture
tic = perf_counter()
dp_procs.append(
generate_data_product.remote(
generate_data_product.remote( # Copies IQ to Ray object store
measurement_result["data"], parameters, self.iir_sos
)
)
Expand Down Expand Up @@ -457,19 +455,23 @@ def __call__(self, schedule_entry, task_id):
extra_entries=cap_entries[i],
)

# Now wait for channel data to be processed
# Retrieve object references for channel data
channel_data_refs = ray.get(channel_data_process)
channel_data = []
tic = perf_counter()
for j, d in enumerate(channel_data_process):
if j == 1: # Power-vs-Time results
data = ray.get(d)
for j, data_ref in enumerate(channel_data_refs):
# Now block until the data is ready
data = ray.get(data_ref)
if j == 0:
# Power-vs-Time results
channel_data.extend(data[:2])
max_max_ch_pwrs.append(DATA_TYPE(data[2]))
med_mean_ch_pwrs.append(DATA_TYPE(data[3]))
if j == 3: # APD results
channel_data.append(ray.get(d))
elif j == 3:
# APD results
channel_data.append(data)
else:
channel_data.extend(ray.get(d))
channel_data.extend(data)

toc = perf_counter()
logger.debug(f"Waited {toc-tic} s for channel {i} data")
all_data.extend(NasctnSeaDataProduct.transform_data(channel_data))
Expand Down

0 comments on commit 3b8f941

Please sign in to comment.