Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve how pycbc_live handles localization-only detectors #4635

Merged
119 changes: 74 additions & 45 deletions bin/pycbc_live
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ class LiveEventManager(object):
def __init__(self, args, bank):
self.low_frequency_cutoff = args.low_frequency_cutoff
self.bank = bank
self.skymap_only_ifos = [] if args.skymap_only_ifos is None else list(set(args.skymap_only_ifos))
# all interferometers involved in the analysis, whether for generating
# candidates or doing localization only
self.ifos = set(args.channel_name.keys())
# interferometers used for localization only
self.skymap_only_ifos = set(args.skymap_only_ifos or [])
# subset of the interferometers allowed to produce candidates
self.trigg_ifos = self.ifos - self.skymap_only_ifos

# Figure out what we are supposed to process within the pool of MPI processes
self.comm = mpi.COMM_WORLD
Expand Down Expand Up @@ -117,7 +123,7 @@ class LiveEventManager(object):
if self.run_snr_optimization:
# preestimate the number of CPU cores that we can afford giving
# to followup processes without slowing down the main search
bg_cores = len(tuple(itertools.combinations(ifos, 2)))
bg_cores = len(tuple(itertools.combinations(self.trigg_ifos, 2)))
analysis_cores = 1 + bg_cores
if platform.system() != 'Darwin':
available_cores = len(os.sched_getaffinity(0))
Expand Down Expand Up @@ -502,7 +508,7 @@ class LiveEventManager(object):
logging.info('computing followup data for coinc')
coinc_ifos = coinc_results['foreground/type'].split('-')
followup_ifos = set(ifos) - set(coinc_ifos)
followup_ifos = list(followup_ifos | set(self.skymap_only_ifos))
followup_ifos = list(followup_ifos | self.skymap_only_ifos)

double_ifar = coinc_results['foreground/ifar']
if double_ifar < args.ifar_double_followup_threshold:
Expand Down Expand Up @@ -537,13 +543,20 @@ class LiveEventManager(object):
logging.info('Coincident candidate! Saving as %s', fname)

# Verbally explain some details not obvious from the other info
comment = ('Trigger produced as a {} coincidence. '
'FAR is based on all listed detectors.<br />'
'Two-detector ranking statistic: {}<br />'
'Followup detectors: {}')
comment = comment.format(ppdets(coinc_ifos),
args.ranking_statistic,
ppdets(followup_ifos))
comment = (
'Trigger produced as a {} coincidence.<br />'
'Two-detector ranking statistic: {}<br />'
'Detectors used for FAR calculation: {}.<br />'
'Detectors used for localization: {}.<br />'
'Detectors used only for localization: {}.'
)
comment = comment.format(
ppdets(coinc_ifos),
args.ranking_statistic,
set(ifos) - self.skymap_only_ifos,
ppdets(followup_ifos),
ppdets(self.skymap_only_ifos)
)

ifar = coinc_results['foreground/ifar']
upload_checks = self.enable_gracedb_upload and self.ifar_upload_threshold < ifar
Expand Down Expand Up @@ -604,7 +617,7 @@ class LiveEventManager(object):
logging.info(f'Found {ifo} single with ifar {sifar}')

followup_ifos = [i for i in active if i is not ifo]
followup_ifos = list(set(followup_ifos) | set(self.skymap_only_ifos))
followup_ifos = list(set(followup_ifos) | self.skymap_only_ifos)
# Don't recompute ifar considering other ifos
sld = self.compute_followup_data(
[ifo],
Expand Down Expand Up @@ -636,10 +649,15 @@ class LiveEventManager(object):
logging.info('Single-detector candidate! Saving as %s', fname)

# Verbally explain some details not obvious from the other info
comment = ('Trigger produced as a {0} single. '
'FAR is based on {0} only.<br />'
'Followup detectors: {1}')
comment = comment.format(ifo, ppdets(followup_ifos))
comment = (
'Trigger produced as a {0} single.<br />'
'Detectors used for FAR calculation: {0}.<br />'
'Detectors used for localization: {1}.<br />'
'Detectors used only for localization: {2}.'
)
comment = comment.format(
ifo, ppdets(followup_ifos), ppdets(self.skymap_only_ifos)
)

# Has a coinc event at this time been uploaded recently?
# If so, skip upload - Note that this means that we _always_
Expand Down Expand Up @@ -1017,8 +1035,6 @@ args = parser.parse_args()
scheme.verify_processing_options(args, parser)
fft.verify_fft_options(args, parser)
Coincer.verify_args(args, parser)
ifos = set(args.channel_name.keys())
analyze_singles = LiveSingle.verify_args(args, parser, ifos)

if args.output_background is not None and len(args.output_background) != 2:
parser.error('--output-background takes two parameters: period and path')
Expand All @@ -1045,14 +1061,16 @@ if bank.min_f_lower < args.low_frequency_cutoff:
'minimum f_lower across all templates '
'({} Hz)'.format(args.low_frequency_cutoff, bank.min_f_lower))

logging.info('Analyzing data from detectors %s', ppdets(ifos))

evnt = LiveEventManager(args, bank)
logging.info('Detectors that only aid in the sky localization %s', ppdets(evnt.skymap_only_ifos))

logging.info('Analyzing data from detectors %s', ppdets(evnt.ifos))
logging.info('Using %s for localization only', ppdets(evnt.skymap_only_ifos))

analyze_singles = LiveSingle.verify_args(args, parser, evnt.trigg_ifos)

# include MPI rank and functional description into proctitle
task_name = 'root' if evnt.rank == 0 else 'filtering'
setproctitle('PyCBC Live rank {:d} [{}]'.format(evnt.rank, task_name))
setproctitle(f'PyCBC Live rank {evnt.rank:d} [{task_name}]')

sg_chisq = SingleDetSGChisq.from_cli(args, bank, args.chisq_bins)

Expand Down Expand Up @@ -1111,23 +1129,27 @@ with ctx:
args.round_start_time
logging.info('Starting from: %s', args.start_time)

# initialize the data readers for all detectors
# Initialize the data readers for all detectors. For rank 0, we need data
# from all detectors, including the localization-only ones. For higher
# ranks, we only need the detectors that can generate candidates.
if args.max_length is not None:
maxlen = args.max_length
maxlen = int(maxlen)
data_reader = {ifo: StrainBuffer.from_cli(ifo, args, maxlen)
for ifo in ifos}
titodalcanton marked this conversation as resolved.
Show resolved Hide resolved
data_reader = {
ifo: StrainBuffer.from_cli(ifo, args, maxlen)
for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos)
}
evnt.data_readers = data_reader

# create single-detector background "estimators"
if analyze_singles and evnt.rank == 0:
sngl_estimator = {ifo: LiveSingle.from_cli(args, ifo)
for ifo in ifos}
for ifo in evnt.trigg_ifos}

# Create double coincident background estimator for every combo
# Create double coincident background estimator
# for every pair of triggering interferometers
if args.enable_background_estimation and evnt.rank == 0:
trigg_ifos = [ifo for ifo in ifos if ifo not in evnt.skymap_only_ifos]
ifo_combos = itertools.combinations(trigg_ifos, 2)
ifo_combos = itertools.combinations(evnt.trigg_ifos, 2)
estimators = []
for combo in ifo_combos:
logging.info('Will calculate %s background', ppdets(combo, "-"))
Expand Down Expand Up @@ -1167,12 +1189,14 @@ with ctx:
# main analysis loop
data_end = lambda: data_reader[tuple(data_reader.keys())[0]].end_time
last_bg_dump_time = int(data_end())
psd_count = {ifo:0 for ifo in ifos}
psd_count = {ifo:0 for ifo in evnt.ifos}

# Create dicts to track whether the psd has been recalculated and to hold
# psd variation filters
psd_recalculated = {ifo: True for ifo in ifos}
psd_var_filts = {ifo: None for ifo in ifos}
psd_recalculated = {
ifo: True for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos)
}
psd_var_filts = {ifo: None for ifo in evnt.trigg_ifos}

while data_end() < args.end_time:
t1 = pycbc.gps_now()
Expand All @@ -1181,7 +1205,7 @@ with ctx:
results = {}
evnt.live_detectors = set()

for ifo in ifos:
for ifo in (evnt.ifos if evnt.rank == 0 else evnt.trigg_ifos):
results[ifo] = False
status = data_reader[ifo].advance(
valid_pad,
Expand Down Expand Up @@ -1259,23 +1283,27 @@ with ctx:

# Calculate and add the psd variation for the results
if args.psd_variation:

for ifo in results:
logging.info(f"Calculating PSD Variation Statistic for {ifo}")
logging.info("Calculating PSD Variation Statistic for %s", ifo)

# A new filter is needed if the PSD has been recalculated
if psd_recalculated[ifo] is True:
psd_var_filts[ifo] = variation.live_create_filter(data_reader[ifo].psd,
args.psd_segment_length,
int(args.sample_rate))
psd_var_filts[ifo] = variation.live_create_filter(
data_reader[ifo].psd,
args.psd_segment_length,
int(args.sample_rate)
)
psd_recalculated[ifo] = False

psd_var_ts = variation.live_calc_psd_variation(data_reader[ifo].strain,
psd_var_filts[ifo],
args.increment)
psd_var_ts = variation.live_calc_psd_variation(
data_reader[ifo].strain,
psd_var_filts[ifo],
args.increment
)

psd_var_vals = variation.live_find_var_value(results[ifo],
psd_var_ts)
psd_var_vals = variation.live_find_var_value(
results[ifo], psd_var_ts
)

results[ifo]['psd_var_val'] = psd_var_vals

Expand All @@ -1295,7 +1323,7 @@ with ctx:
gates = {ifo: data_reader[ifo].gate_params for ifo in data_reader}

# map the results file to an hdf file
prefix = '{}-{}-{}-{}'.format(''.join(sorted(ifos)),
prefix = '{}-{}-{}-{}'.format(''.join(sorted(evnt.ifos)),
args.file_prefix,
data_end() - args.analysis_chunk,
valid_pad)
Expand All @@ -1310,8 +1338,9 @@ with ctx:
data_end() - last_bg_dump_time > float(args.output_background[0]):
last_bg_dump_time = int(data_end())
bg_dists = coinc_pool.broadcast(output_background, None)
bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(''.join(sorted(ifos)),
last_bg_dump_time)
bg_fn = '{}-LIVE_BACKGROUND-{}.hdf'.format(
''.join(sorted(evnt.trigg_ifos)), last_bg_dump_time
)
bg_fn = os.path.join(args.output_background[1], bg_fn)
with h5py.File(bg_fn, 'w') as bgf:
for bg_ifos, bg_data, bg_time in bg_dists:
Expand Down
Loading