Skip to content

Commit

Permalink
ray debug
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanielloNTIA committed Apr 26, 2023
1 parent 9faf084 commit 585d5b1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ dev = [
"scos-actions[test]",
"hatchling>=1.6.0,<2.0",
"pre-commit>=2.20.0",
"ray[default]>=2.4.0",
"twine>=4.0.1,<5.0",
]

Expand Down
31 changes: 16 additions & 15 deletions scos_actions/actions/acquire_sea_data_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@
logger = logging.getLogger(__name__)

if not ray.is_initialized():
ray.init(
include_dashboard=False,
)
ray.init() # Dashboard is enabled if ray[default] is installed

# Define parameter keys
RF_PATH = "rf_path"
Expand Down Expand Up @@ -306,20 +304,19 @@ def get_periodic_frame_power(

@ray.remote
def generate_data_product(
iqdata: np.ndarray, params: dict, iir_sos: np.ndarray
iqdata: ray.ObjectRef, params: dict, iir_sos: np.ndarray
) -> list:
"""Process IQ data and generate the SEA data product."""
iqdata = sosfilt(iir_sos, iqdata)

# Explicitly call ray.put to pass filtered IQ to nested remote procs
iqdata_id = ray.put(iqdata)
del iqdata
print(
f"GEN_DP @ {params[FREQUENCY]/1e6:.1f}: IQ data is {type(iqdata)}, {iqdata[:5]}"
)

remote_procs = [
get_fft_results.remote(iqdata_id, params),
get_td_power_results.remote(iqdata_id, params),
get_periodic_frame_power.remote(iqdata_id, params),
get_apd_results.remote(iqdata_id, params),
get_fft_results.remote(iqdata, params),
get_td_power_results.remote(iqdata, params),
get_periodic_frame_power.remote(iqdata, params),
get_apd_results.remote(iqdata, params),
]

# Return identifiers to avoid waiting for processing to complete
Expand Down Expand Up @@ -412,10 +409,14 @@ def __call__(self, schedule_entry, task_id):
for parameters in iteration_params:
measurement_result = self.capture_iq(parameters)
# Start data product processing but do not block next IQ capture
tic = perf_counter()
iqdata_ref = ray.put(measurement_result["data"])
toc = perf_counter()
logger.debug(
f"Called ray.put for channel IQ in {toc-tic:.2f} s: {measurement_result['data'][:5]}"
)
dp_procs.append(
generate_data_product.remote(
measurement_result["data"], parameters, self.iir_sos
)
generate_data_product.remote(iqdata_ref, parameters, self.iir_sos)
)
# Generate capture metadata before sigan reconfigured
cap_meta_tuple = self.create_channel_metadata(measurement_result)
Expand Down

0 comments on commit 585d5b1

Please sign in to comment.