Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xfel striping fix #844

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
83 changes: 59 additions & 24 deletions xfel/command_line/striping.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# dials.combine_experiments (optionally with clustering and selecting clusters).
#
from dials.util import show_mail_on_error
from dxtbx.model import ExperimentList
from libtbx.phil import parse
from libtbx.utils import Sorry
from libtbx import easy_run
Expand All @@ -22,7 +23,11 @@

# PHIL text for the ``mp`` scope: selects the ``local`` method and turns MPI
# use off.
# NOTE(review): presumably parsed and layered over user-supplied mp settings
# elsewhere in this file -- confirm at the point of use.
multiprocessing_override_str = '''
mp {
method = local
use_mpi = False
mpi_command = source
mpi_option = ""
local.include_mp_in_command = False
}
'''

Expand All @@ -47,7 +52,7 @@
.help = "Enable to select results evenly spaced across each rungroup"
"(stripes) as opposed to contiguous chunks."
chunk_size = 1000
.type = float
.type = int(value_min=1)
.help = "Maximum number of images per chunk or stripe."
respect_rungroup_barriers = True
.type = bool
Expand Down Expand Up @@ -243,6 +248,46 @@
for interactive unit cell clustering, use combine_experiments.clustering.dendrogram=True
"""


def chunk_pairs(expt_paths, refl_paths, max_size=1000):
  """Distribute matching expt-refl pairs into contiguous chunks.

  Files are never split: each expt file (with the refl file at the same
  index) goes into exactly one chunk, and files keep their input order.
  The number of chunks is ``ceil(total experiments / max_size)``, so
  `max_size` is a target for the chunk size rather than a hard cap; a chunk
  can exceed it when individual files are large, and a chunk may stay empty.

  :param expt_paths: paths of experiment list files, in the desired order
  :param refl_paths: paths of reflection files; ``refl_paths[i]`` is paired
    with ``expt_paths[i]``
  :param max_size: target maximum number of experiments per chunk
  :return: tuple ``(chunked_expts, chunked_refls, chunk_lengths)``: two lists
    of per-chunk path lists and a list with the number of experiments in
    each chunk. All three are empty when no files are given.
  """
  expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False))
                  for expt_path in expt_paths]
  if not expt_lengths:
    # Nothing to distribute; avoids dividing by a zero chunk count below.
    return [], [], []
  total_length = sum(expt_lengths)
  # At least one chunk is needed to hold the files even when every file is
  # empty (total_length == 0); int() keeps range() happy if ceil gives a float.
  chunk_count = max(1, int(math.ceil(total_length / max_size)))
  estimated_fill = total_length / chunk_count
  chunks_indices = [[] for _ in range(chunk_count)]
  chunk_lengths = [0] * chunk_count
  currently_filled_chunk = 0
  for len_index, len_ in enumerate(expt_lengths):
    # Move on to the next chunk once more than half of this file would
    # overshoot the even-fill estimate; the last chunk absorbs any remainder.
    if len_ / 2 + chunk_lengths[currently_filled_chunk] > estimated_fill:
      currently_filled_chunk = min(chunk_count - 1, currently_filled_chunk + 1)
    chunks_indices[currently_filled_chunk].append(len_index)
    chunk_lengths[currently_filled_chunk] += len_
  chunked_expts = [[expt_paths[i] for i in chunk_indices]
                   for chunk_indices in chunks_indices]
  chunked_refls = [[refl_paths[i] for i in chunk_indices]
                   for chunk_indices in chunks_indices]
  return chunked_expts, chunked_refls, chunk_lengths


def stripe_pairs(expt_paths, refl_paths, max_size=1000):
  """Distribute matching expt-refl pairs into evenly filled stripes.

  Files are never split: each expt file (with the refl file at the same
  index) is assigned, in input order, to whichever stripe currently holds
  the fewest experiments (the first such stripe on ties). The number of
  stripes is ``ceil(total experiments / max_size)``, so `max_size` is a
  target for the stripe size rather than a hard cap.

  :param expt_paths: paths of experiment list files
  :param refl_paths: paths of reflection files; ``refl_paths[i]`` is paired
    with ``expt_paths[i]``
  :param max_size: target maximum number of experiments per stripe
  :return: tuple ``(striped_expts, striped_refls, stripe_lengths)``: two lists
    of per-stripe path lists and a list with the number of experiments in
    each stripe. All three are empty when no files are given.
  """
  expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False))
                  for expt_path in expt_paths]
  if not expt_lengths:
    return [], [], []
  # At least one stripe is needed to hold the files even when every file is
  # empty; otherwise min() below would be called on an empty list.
  stripe_count = max(1, int(math.ceil(sum(expt_lengths) / max_size)))
  stripe_indices = [[] for _ in range(stripe_count)]
  stripe_lengths = [0] * stripe_count
  for len_index, len_ in enumerate(expt_lengths):
    currently_filled_stripe = stripe_lengths.index(min(stripe_lengths))
    stripe_indices[currently_filled_stripe].append(len_index)
    stripe_lengths[currently_filled_stripe] += len_
  striped_expts = [[expt_paths[i] for i in indices]
                   for indices in stripe_indices]
  striped_refls = [[refl_paths[i] for i in indices]
                   for indices in stripe_indices]
  return striped_expts, striped_refls, stripe_lengths


def allocate_chunks(results_dir,
trial_no,
rgs_selected=None,
Expand Down Expand Up @@ -273,7 +318,6 @@ def allocate_chunks(results_dir,
rgs[rg] = [run]
else:
rgs[rg].append(run)
batch_chunk_nums_sizes = {}
batch_contents = {}
if respect_rungroup_barriers:
batchable = {rg:{rg:runs} for rg, runs in six.iteritems(rgs)}
Expand Down Expand Up @@ -312,35 +356,26 @@ def allocate_chunks(results_dir,
print("no images found for %s" % batch)
del batch_contents[batch]
continue
n_chunks = int(math.ceil(n_img/max_size))
chunk_size = int(math.ceil(n_img/n_chunks))
batch_chunk_nums_sizes[batch] = (n_chunks, chunk_size)
if len(batch_contents) == 0:
raise Sorry("no DIALS integration results found.")
refl_ending += extension
batch_chunks = {}
for batch, num_size_tuple in six.iteritems(batch_chunk_nums_sizes):
num, size = num_size_tuple
for batch in batchable:
batch_chunks[batch] = []
contents = batch_contents[batch]
expts = [c for c in contents if c.endswith(expt_ending)]
refls = [c for c in contents if c.endswith(refl_ending)]
expts = sorted([c for c in contents if c.endswith(expt_ending)])
refls = sorted([c for c in contents if c.endswith(refl_ending)])
expts, refls = match_dials_files(expts, refls, expt_ending, refl_ending)
if stripe:
for i in range(num):
expts_stripe = expts[i::num]
refls_stripe = refls[i::num]
batch_chunks[batch].append((expts_stripe, refls_stripe))
print("striped %d experiments in %s with %d experiments per stripe and %d stripes" % \
(len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch])))
else:
for i in range(num):
expts_chunk = expts[i*size:(i+1)*size]
refls_chunk = refls[i*size:(i+1)*size]
batch_chunks[batch].append((expts_chunk, refls_chunk))
print("chunked %d experiments in %s with %d experiments per chunk and %d chunks" % \
(len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch])))
return batch_chunks
pack_func = stripe_pairs if stripe else chunk_pairs
expts_packs, refls_packs, pack_lengths = pack_func(expts, refls, max_size)
for expts_pack, refls_pack in zip(expts_packs, refls_packs):
batch_chunks[batch].append((expts_pack, refls_pack))
r = '{} {} experiments from {} files in {} into {} {} with sizes = {}'
print(r.format("Striped" if stripe else "Chunked", sum(pack_lengths),
len(expts), batch, len(pack_lengths),
"stripes" if stripe else "chunks", pack_lengths))
return batch_chunks


def parse_retaining_scope(args, phil_scope=phil_scope):
if "-c" in args:
Expand Down