More memory optimizations #248

Merged 5 commits on Mar 19, 2020
43 changes: 37 additions & 6 deletions strax/mailbox.py
@@ -357,8 +357,6 @@ def next_ready():
                         raise MailboxReadTimeout(
                             f"{self.name} did not get {next_number} in time.")
                 self._subscriber_waiting_for[subscriber_i] = None
-                if self.lazy and self._can_fetch():
-                    self._fetch_new_condition.notify_all()
 
                 if self.killed:
                     self.log.debug(f"Reader finds {self.name} killed")
@@ -388,6 +386,9 @@ def next_ready():
                        and (min(self._subscribers_have_read)
                             >= self._lowest_msg_number)):
                     heapq.heappop(self._mailbox)
+
+                if self.lazy and self._can_fetch():
+                    self._fetch_new_condition.notify_all()
                 self._write_condition.notify_all()
 
             for msg_number, msg in to_yield:
@@ -423,8 +424,7 @@ def _get_msg(self, number):
         for msg_number, msg in self._mailbox:
             if msg_number == number:
                 return msg
-        else:
-            raise RuntimeError(f"Could not find message {number}")
+        raise RuntimeError(f"Could not find message {number}")
 
     def _has_msg(self, number):
         """Return if mailbox has message number.
@@ -447,7 +447,11 @@ def _lowest_msg_number(self):
 
 
 @export
-def divide_outputs(source, mailboxes, outputs=None):
+def divide_outputs(source,
+                   mailboxes: typing.Dict[str, Mailbox],
+                   lazy=False,
+                   flow_freely=tuple(),
+                   outputs=None):
     """This code is a 'mail sorter' which gets dicts of arrays from source
     and sends the right array to the right mailbox.
     """
@@ -457,10 +461,37 @@ def divide_outputs(source, mailboxes, outputs=None):
     mbs_to_kill = [mailboxes[d] for d in outputs]
     # TODO: this code duplicates exception handling and cleanup
     # from Mailbox.send_from! Can we avoid that somehow?
+    i = 0
     try:
-        for result in source:
+        while True:
+            for d in outputs:
+                if d in flow_freely:
+                    # Do not block on account of these guys
+                    continue
+
+                m = mailboxes[d]
+                if lazy:
+                    with m._lock:
+                        if not m._can_fetch():
+                            m.log.debug(f"Waiting to fetch {i}, "
+                                        f"{m._subscriber_waiting_for}, "
+                                        f"{m._subscriber_can_drive}")
+                            if not m._fetch_new_condition.wait_for(
+                                    m._can_fetch, timeout=m.timeout):
+                                raise MailboxReadTimeout(
+                                    f"{m} could not progress beyond {i}, "
+                                    f"no driving subscriber requested it.")
+
+            try:
+                result = next(source)
+            except StopIteration:
+                # No need to send this yet, close will do that
+                break
+
             for d, x in result.items():
                 mailboxes[d].send(x)
+            i += 1
+
     except Exception as e:
         for m in mbs_to_kill:
             m.kill_from_exception(e, reraise=False)
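The lazy-mode handshake added to divide_outputs above is a condition-variable backpressure pattern: before pulling the next result from source, the producer waits until a driving subscriber can accept more, and the mailbox now notifies waiting producers only after read messages have actually been cleared (second hunk) rather than as soon as a reader stops waiting (first hunk). Below is a minimal, self-contained sketch of that pattern, assuming nothing from strax; SlowQueue, capacity, and timeout are illustrative names, not strax API.

import threading
import queue

class SlowQueue:
    """Single-producer queue in which push() blocks until the consumer
    has drained earlier items -- a toy version of Mailbox's lazy mode."""

    def __init__(self, capacity=1, timeout=5):
        self._q = queue.Queue()
        self._capacity = capacity
        self._timeout = timeout
        self._can_push_condition = threading.Condition()

    def _can_push(self):
        # Stands in for Mailbox._can_fetch()
        return self._q.qsize() < self._capacity

    def push(self, item):
        with self._can_push_condition:
            # Like divide_outputs in lazy mode: wait until a consumer
            # has made room before producing more
            if not self._can_push_condition.wait_for(
                    self._can_push, timeout=self._timeout):
                raise TimeoutError("no consumer requested more input")
            self._q.put(item)

    def pop(self):
        item = self._q.get()
        with self._can_push_condition:
            # Like Mailbox._read after this PR: notify producers only
            # once the message is actually cleared from the buffer
            self._can_push_condition.notify_all()
        return item

With capacity=1 the producer stays exactly one item ahead of the consumer, which is the memory bound lazy mode is after.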
60 changes: 30 additions & 30 deletions strax/plugin.py
@@ -237,13 +237,8 @@ def iter(self, iters, executor=None):
         """
         pending_futures = []
         last_input_received = time.time()
-
         self.input_buffer = {d: None
                              for d in self.depends_on}
-        if len(self.depends_on):
-            pacemaker = self.depends_on[0]
-        else:
-            pacemaker = None
 
         def _fetch_chunk(d, hope_to_see=None):
             try:
@@ -262,6 +257,16 @@ def _fetch_chunk(d, hope_to_see=None):
                         f"ended prematurely at {self.input_buffer[d].end}")
                 return False
 
+        # Fetch chunks from all inputs. Whoever is the slowest becomes the
+        # pacemaker
+        pacemaker = None
+        _end = float('inf')
+        for d in self.depends_on:
+            _fetch_chunk(d)
+            if self.input_buffer[d].end < _end:
+                pacemaker = d
+                _end = self.input_buffer[d].end
+
         for chunk_i in itertools.count():
 
             # Online input support
@@ -286,19 +291,21 @@ def _fetch_chunk(d, hope_to_see=None):
             if pacemaker is None:
                 inputs_merged = dict()
             else:
-                # Fetch the pacemaker, to figure out when this chunk ends
-                if not _fetch_chunk(pacemaker):
-                    # Source is exhausted. The other sources should also be
-                    # exhausted. This (a) checks this, and (b) ensures that
-                    # the content of all sources are requested all the way to
-                    # the end -- which lazy-mode processing requires
-                    for d in self.depends_on:
-                        if _fetch_chunk(d):
-                            raise RuntimeError(
-                                f"{self} sees that {pacemaker} is exhausted "
-                                f"before other dependency {d}!")
-                    self.cleanup(wait_for=pending_futures)
-                    return
+                if chunk_i != 0:
+                    # Fetch the pacemaker, to figure out when this chunk ends
+                    # (don't do it for chunk 0, for which we already fetched)
+                    if not _fetch_chunk(pacemaker):
+                        # Source is exhausted. The other sources should also be
+                        # exhausted. This (a) checks this, and (b) ensures that
+                        # the content of all sources are requested all the way
+                        # to the end -- which lazy-mode processing requires
+                        for d in self.depends_on:
+                            if _fetch_chunk(d):
+                                raise RuntimeError(
+                                    f"{self} sees that {pacemaker} is exhausted "
+                                    f"before other dependency {d}!")
+                        self.cleanup(wait_for=pending_futures)
+                        return
                 this_chunk_end = self.input_buffer[pacemaker].end
 
             inputs = dict()
@@ -504,18 +511,11 @@ def get_window_size(self):
         raise NotImplementedError
 
     def iter(self, iters, executor=None):
-        # Keep one chunk in reserve, since we have to do something special
-        # if we see the last chunk.
-        last_result = None
-        for x in super().iter(iters, executor=executor):
-            if last_result is not None:
-                yield last_result
-            last_result = x
-
-        if self.cached_results is not None and last_result is not None:
-            # Note we do this even if the cached_result is only emptiness,
-            # to make sure our final result ends at the right time.
-            yield strax.Chunk.concatenate([last_result, self.cached_results])
+        yield from super().iter(iters, executor=executor)
+
+        # Yield final results, kept at bay in fear of a new chunk
+        if self.cached_results is not None:
+            yield self.cached_results
 
     def do_compute(self, chunk_i=None, **kwargs):
         if not len(kwargs):
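The plugin.py changes replace "the pacemaker is always the first dependency" with "the pacemaker is whichever input's first chunk ends earliest", so every output chunk spans only the time range all inputs have covered; OverlapWindowPlugin.iter likewise stops holding a chunk in reserve and simply yields its cached tail once the parent iterator is exhausted. A minimal sketch of the new selection rule, using a stand-in Chunk class rather than strax.Chunk:

from dataclasses import dataclass

@dataclass
class Chunk:
    start: int
    end: int

def pick_pacemaker(chunks):
    """Return the input whose buffered chunk ends first, or None if
    there are no inputs (a pure source plugin has no pacemaker)."""
    pacemaker, end = None, float('inf')
    for name, chunk in chunks.items():
        if chunk.end < end:
            pacemaker, end = name, chunk.end
    return pacemaker

# 'records' ends at 80 while 'peaks' reaches 100, so 'records' paces
# the first output chunk: it spans at most [0, 80).
assert pick_pacemaker({'peaks': Chunk(0, 100),
                       'records': Chunk(0, 80)}) == 'records'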
28 changes: 18 additions & 10 deletions strax/processor.py
@@ -2,11 +2,8 @@
 from functools import partial
 import logging
 import typing as ty
-import psutil
 import os
 import signal
-import sys
-import time
 from concurrent.futures import ProcessPoolExecutor
 
 import numpy as np
@@ -102,6 +99,19 @@ def __init__(self,
         else:
             self.process_executor = self.thread_executor
 
+        # Figure out which outputs
+        # - we should exclude from the flow control in lazy mode,
+        #   because they are produced but not required.
+        # - we should discard (produced but neither required nor saved)
+        produced = set(components.loaders)
+        required = set(components.targets)
+        saved = set(components.savers.keys())
+        for p in components.plugins.values():
+            produced.update(p.provides)
+            required.update(p.depends_on)
+        to_flow_freely = produced - required
+        to_discard = to_flow_freely - saved
+
         self.mailboxes = MailboxDict(lazy=lazy)
 
         for d, loader in components.loaders.items():
@@ -139,7 +149,9 @@ def __init__(self,
 
                 self.mailboxes[mname].add_reader(
                     partial(strax.divide_outputs,
+                            lazy=lazy,
                             mailboxes=self.mailboxes,
+                            flow_freely=to_flow_freely,
                             outputs=p.provides))
 
             else:
@@ -183,13 +195,9 @@ def __init__(self,
         def discarder(source):
             for _ in source:
                 pass
-
-        for p in multi_output_seen:
-            for d in p.provides:
-                if d in components.targets or self.mailboxes[d]._n_subscribers:
-                    continue
-                self.mailboxes[d].add_reader(
-                    discarder, name=f'discard_{d}')
+        for d in to_discard:
+            self.mailboxes[d].add_reader(
+                discarder, name=f'discard_{d}')
 
         # Set to preferred number of maximum messages
         # TODO: may not work if plugins are inlined??
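The processor.py hunk derives both the lazy-mode exemptions and the discard targets from plain set arithmetic over what is produced, required, and saved. A worked example with hypothetical data type names, not taken from the PR:

# Hypothetical run: which outputs flow freely, and which get a discarder
produced = {'records', 'peaks', 'peak_basics', 'veto_regions'}
required = {'records', 'peaks', 'peak_basics'}  # targets + all plugin inputs
saved = {'peak_basics'}                         # outputs with a saver

to_flow_freely = produced - required  # nobody waits on these in lazy mode
to_discard = to_flow_freely - saved   # produced, but unrequired and unsaved

assert to_flow_freely == {'veto_regions'}
assert to_discard == {'veto_regions'}

Anything in to_discard still needs a reader so its mailbox drains, hence the discarder loop above; anything saved but unrequired keeps flowing to its saver without ever blocking the pipeline.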