
Commit

Merge dadaade into 8097b6a
JelleAalbers committed Mar 7, 2020
2 parents 8097b6a + dadaade commit 5852256
Showing 5 changed files with 155 additions and 44 deletions.
4 changes: 4 additions & 0 deletions strax/context.py
@@ -39,6 +39,9 @@
"If False, will use multithreading only."),
strax.Option(name='allow_shm', default=False,
help="Allow use of /dev/shm for interprocess communication."),
strax.Option(name='allow_lazy', default=True,
help='Allow "lazy" processing. Saves memory, but incompatible '
'with multiprocessing and perhaps slightly slower.'),
strax.Option(name='forbid_creation_of', default=tuple(),
help="If any of the following datatypes is requested to be "
"created, throw an error instead. Useful to limit "
@@ -747,6 +750,7 @@ def get_iter(self, run_id: str,
allow_shm=self.context_config['allow_shm'],
allow_multiprocess=self.context_config['allow_multiprocess'],
allow_rechunk=self.context_config['allow_rechunk'],
allow_lazy=self.context_config['allow_lazy'],
max_messages=self.context_config['max_messages'],
timeout=self.context_config['timeout']).iter()):
seen_a_chunk = True
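The new `allow_lazy` option is an ordinary context option, so it can be set when the context is created. A minimal sketch (the storage path is a placeholder, not from this commit):

```python
import strax

# allow_lazy=True (the new default) trades a little speed for memory;
# turn it off if you want multiprocessing, which lazy mode cannot use.
st = strax.Context(storage='./strax_data',
                   allow_lazy=True,
                   allow_multiprocess=False)
```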
120 changes: 98 additions & 22 deletions strax/mailbox.py
@@ -76,6 +76,7 @@ class Mailbox:
def __init__(self,
name='mailbox',
timeout=None,
lazy=False,
max_messages=None):
self.name = name
if timeout is None:
@@ -84,6 +85,10 @@ def __init__(self,
if max_messages is None:
max_messages = self.DEFAULT_MAX_MESSAGES
self.max_messages = max_messages
self.lazy = lazy

if self.lazy:
self.max_messages = float('inf')

self.closed = False
self.force_killed = False
@@ -92,12 +97,28 @@ def __init__(self,

self._mailbox = []
self._subscribers_have_read = []
self._subscriber_waiting_for = []
self._subscriber_can_drive = []
self._n_sent = 0
self._threads = []
self._lock = threading.RLock()

# Conditions to wait on
# Do NOT call notify_all when the condition is False!
# We use wait_for, which also returns False when the timeout expires:
# it returns the final value of the predicate, a documented
# standard-library design choice rather than an error.

# If you're waiting to read a new message that hasn't yet arrived:
self._read_condition = threading.Condition(lock=self._lock)

# If you're waiting to write a new message because the mailbox is full
self._write_condition = threading.Condition(lock=self._lock)

# If you're waiting to fetch a new element because the subscribers
# still have other things to do
self._fetch_new_condition = threading.Condition(lock=self._lock)

self.log = logging.getLogger(self.name)
self.log.debug("Initialized")

@@ -115,29 +136,32 @@ def add_sender(self, source, name=None):
args=(source,))
self._threads.append(t)

def add_reader(self, subscriber, name=None, can_drive=True, **kwargs):
"""Subscribe a function to the mailbox.
:param subscriber: Function which accepts a generator over messages
as the first argument. Any kwargs will also be passed to the function.
:param name: Name of the thread in which the function will run.
Defaults to read_<number>:<mailbox_name>
:param can_drive: Whether this reader can cause new messages to be
generated when in lazy mode.
"""
if name is None:
name = f'read_{self._n_subscribers}:{self.name}'
t = threading.Thread(target=subscriber,
name=name,
args=(self.subscribe(can_drive=can_drive),),
kwargs=kwargs)
self._threads.append(t)

def subscribe(self, can_drive=True):
"""Return generator over messages in the mailbox
"""
with self._lock:
subscriber_i = self._n_subscribers
self._subscriber_can_drive.append(can_drive)
self._subscribers_have_read.append(-1)
self._subscriber_waiting_for.append(None)
self.log.debug(f"Added subscriber {subscriber_i}")
return self._read(subscriber_i=subscriber_i)

@@ -173,20 +197,67 @@ def kill(self, upstream=True, reason=None):
self.killed_because = reason
self._read_condition.notify_all()
self._write_condition.notify_all()
self._fetch_new_condition.notify_all()

def cleanup(self):
for t in self._threads:
t.join(timeout=self.timeout)
if t.is_alive():
raise RuntimeError("Thread %s did not terminate!" % t.name)

def _can_fetch(self):
"""Return if we can fetch then send the next element from the source.
If not, it returns None (to distinguish from False, which means the
timeout was broken)"""
assert self.lazy

# The .send() knows how to handle the exception properly
# (if we raise here we will likely duplicate the exception)
if self.killed:
return True

# If someone is still waiting for a message we already have
# (so they just haven't woken up yet), don't fetch a new message.
if (len(self._mailbox)
and any([x is not None and x <= self._lowest_msg_number
for x in self._subscriber_waiting_for])):
return False

# No subscriber is still waiting for a message we already have.
# Fetch only if at least one driving subscriber is waiting.
for _i, waiting_for in enumerate(self._subscriber_waiting_for):
if self._subscriber_can_drive[_i] and waiting_for is not None:
return True
return False
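The decision above can be restated as a pure function over the mailbox state (an illustrative sketch; the helper name is hypothetical and `_lowest_msg_number` is written out as an explicit `min`):

```python
def can_fetch(killed, msg_numbers, waiting_for, can_drive):
    """Pure-function sketch of Mailbox._can_fetch, for illustration."""
    if killed:
        return True   # let the sender wake up; send() handles the kill
    # Someone still wants a message we already hold: don't fetch yet.
    if msg_numbers and any(w is not None and w <= min(msg_numbers)
                           for w in waiting_for):
        return False
    # Fetch only if at least one *driving* subscriber is waiting.
    return any(d and w is not None
               for d, w in zip(can_drive, waiting_for))

assert can_fetch(False, [2], [2, None], [True, False]) is False
assert can_fetch(False, [], [3, None], [True, False]) is True
```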

def _send_from(self, iterable):
"""Send to mailbox from iterable, exiting appropriately if an
exception is thrown
"""
try:
i = 0
while True:
if self.lazy:
with self._lock:
if not self._can_fetch():
self.log.debug(f"Waiting to fetch {i}, "
f"{self._subscriber_waiting_for}, "
f"{self._subscriber_can_drive}")
if not self._fetch_new_condition.wait_for(
self._can_fetch, timeout=self.timeout):
raise MailboxReadTimeout(
f"{self} could not progress beyond {i}, "
f"no driving subscriber requested it.")

try:
x = next(iterable)
except StopIteration:
# Nothing left to send; close() will signal the end of the stream
break
self.send(x)
i += 1

except Exception as e:
self.kill_from_exception(e)
else:
@@ -229,17 +300,16 @@ def send(self, msg, msg_number: typing.Union[int, None] = None):
f'subscribers already read {read_until}.')

def can_write():
return len(self._mailbox) < self.max_messages or self.killed

if not can_write():
self.log.debug("Subscribers have read: "
+ str(self._subscribers_have_read))
self.log.debug(f"Mailbox full, wait to send {msg_number}")
if not self._write_condition.wait_for(can_write,
timeout=self.timeout):
raise MailboxFullTimeout(
f"Mailbox buffer for {self.name} emptied too slow.")

if self.killed:
self.log.debug(f"Sender found {self.name} killed while waiting"
@@ -279,12 +349,16 @@ def next_ready():
return self._has_msg(next_number) or self.killed
if not next_ready():
self.log.debug(f"Checking/waiting for {next_number}")
self._subscriber_waiting_for[subscriber_i] = next_number
if self.lazy and self._can_fetch():
self._fetch_new_condition.notify_all()
if not self._read_condition.wait_for(next_ready,
self.timeout):
raise MailboxReadTimeout(
f"{self.name} did not get {next_number} in time.")
self._subscriber_waiting_for[subscriber_i] = None
if self.lazy and self._can_fetch():
self._fetch_new_condition.notify_all()

if self.killed:
self.log.debug(f"Reader finds {self.name} killed")
@@ -301,9 +375,11 @@ def next_ready():
next_number += 1

if len(to_yield) > 1:
self.log.debug(f"Read {to_yield[0][0]}-{to_yield[-1][0]} in subscriber {subscriber_i}")
self.log.debug(f"Read {to_yield[0][0]}-{to_yield[-1][0]}"
f" in subscriber {subscriber_i}")
else:
self.log.debug(f"Read {to_yield[0][0]} in subscriber {subscriber_i}")
self.log.debug(f"Read {to_yield[0][0]} "
f"in subscriber {subscriber_i}")

self._subscribers_have_read[subscriber_i] = next_number - 1

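Putting the mailbox pieces together, a hedged end-to-end sketch of lazy mode: the generator body runs only when the driving reader asks for the next message (this assumes `Mailbox.start()` launches the registered sender/reader threads, as it does elsewhere in strax):

```python
import strax

mb = strax.Mailbox(name='demo', lazy=True)

def source():
    for i in range(3):
        print(f"producing {i}")    # runs on demand in lazy mode
        yield i

def reader(msgs):
    for msg in msgs:
        print(f"consumed {msg}")

mb.add_sender(source(), name='send:demo')
mb.add_reader(reader, name='read:demo', can_drive=True)
mb.start()      # assumption: starts the threads created above
mb.cleanup()    # joins them; raises if one did not terminate
```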
9 changes: 9 additions & 0 deletions strax/plugin.py
@@ -288,6 +288,15 @@ def _fetch_chunk(d, hope_to_see=None):
else:
# Fetch the pacemaker, to figure out when this chunk ends
if not _fetch_chunk(pacemaker):
# Source is exhausted. The other sources should be exhausted too.
# This (a) checks that they are, and (b) ensures that the content
# of all sources is requested all the way to the end -- which
# lazy-mode processing requires.
for d in self.depends_on:
if _fetch_chunk(d):
raise RuntimeError(
f"{self} sees that {pacemaker} is exhausted "
f"before other dependency {d}!")
self.cleanup(wait_for=pending_futures)
return
this_chunk_end = self.input_buffer[pacemaker].end
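The same check in isolation: once the pacemaker ends, probing every other dependency once both verifies that they are exhausted and, in lazy mode, drives each source to its true end. A sketch with hypothetical names:

```python
def verify_all_exhausted(iterators, pacemaker):
    """Hypothetical helper mirroring the dependency check above."""
    for name, it in iterators.items():
        if name == pacemaker:
            continue
        try:
            next(it)                  # also forces lazy sources onward
        except StopIteration:
            continue                  # exhausted too, as expected
        raise RuntimeError(
            f"{pacemaker} is exhausted before other dependency {name}!")
```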
28 changes: 21 additions & 7 deletions strax/processor.py
@@ -34,8 +34,13 @@ class ProcessorComponents(ty.NamedTuple):


class MailboxDict(dict):
def __init__(self, *args, lazy=False, **kwargs):
super().__init__(*args, **kwargs)
self.lazy = lazy

def __missing__(self, key):
res = self[key] = strax.Mailbox(name=key + '_mailbox',
lazy=self.lazy)
return res
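Because `MailboxDict` defines `__missing__`, a mailbox springs into existence, with the right lazy flag, the first time a datatype key is touched; later lookups return the same instance. Roughly (assuming `MailboxDict` is in scope, as in this module):

```python
# Illustration of the __missing__ behaviour defined above.
mailboxes = MailboxDict(lazy=True)
mb = mailboxes['records']           # first access creates the mailbox
assert mb.name == 'records_mailbox'
assert mb.lazy
assert mailboxes['records'] is mb   # subsequent accesses reuse it
```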


@@ -47,12 +52,12 @@ def __init__(self,
components: ProcessorComponents,
allow_rechunk=True, allow_shm=False,
allow_multiprocess=False,
allow_lazy=True,
max_workers=None,
max_messages=4,
timeout=60):
self.log = logging.getLogger(self.__class__.__name__)
self.components = components

self.log.debug("Processor components are: " + str(components))

@@ -65,14 +70,16 @@ def __init__(self,
# Disable the executors: work in one process.
# Each plugin works completely in its own thread.
self.process_executor = self.thread_executor = None
lazy = allow_lazy
else:
lazy = False
# Use executors for parallelization of computations.
self.thread_executor = futures.ThreadPoolExecutor(
max_workers=max_workers)

mp_plugins = {d: p for d, p in components.plugins.items()
if p.parallel == 'process'}
if allow_multiprocess and len(mp_plugins):
_proc_ex = ProcessPoolExecutor
if allow_shm:
if SHMExecutor is None:
@@ -95,6 +102,8 @@ def __init__(self,
else:
self.process_executor = self.thread_executor

self.mailboxes = MailboxDict(lazy=lazy)

for d, loader in components.loaders.items():
assert d not in components.plugins
# If parallelizing, use threads for loading
@@ -141,16 +150,20 @@ def __init__(self,
executor=executor),
name=f'build:{d}')

dtypes_built = {d: p
for p in components.plugins.values()
for d in p.provides}
for d, savers in components.savers.items():
for s_i, saver in enumerate(savers):
if d in dtypes_built:
can_drive = not lazy
rechunk = (dtypes_built[d].rechunk_on_save
and allow_rechunk)
else:
# This is storage conversion mode
# TODO: Don't know how to get this info, for now,
# be conservative and don't rechunk
can_drive = True
rechunk = False

self.mailboxes[d].add_reader(
@@ -160,6 +173,7 @@ def __init__(self,
# the compressor releases the gil,
# and we have a lot of data transfer to do
executor=self.thread_executor),
can_drive=can_drive,
name=f'save_{s_i}:{d}')
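The `can_drive` wiring above encodes a simple rule: a saver attached to a plugin's output must not drive in lazy mode, since saving alone should never force computation, while in storage-conversion mode the saver is the only consumer and therefore has to drive the loader. As a condensed, illustrative function:

```python
def saver_can_drive(built_by_plugin: bool, lazy: bool) -> bool:
    """Illustrative restatement of the can_drive choice above."""
    if built_by_plugin:
        return not lazy   # lazy mode: only real consumers may drive
    return True           # storage conversion: the saver must drive
```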

# For multi-output plugins, an output may be neither saved nor
