Replace mailbox system #81

Open
tunnell opened this issue Aug 29, 2018 · 14 comments

Comments

@tunnell
Member

tunnell commented Aug 29, 2018

Mailbox should be factored out into an independent repository rather than using something else. Closes #62.

@tunnell tunnell added the wontfix This will not be worked on label Sep 11, 2019
@tunnell tunnell closed this as completed Sep 11, 2019
@JelleAalbers
Member

By now I've come to think the mailbox approach is fundamentally flawed. The original idea was to build a simple class where one function could push and another could iterate. But it has turned into a complicated piece of custom threading code, with lots of special cases, and always the specter of a hang or timeout if things go really wrong.

I'm now working on replacing the mailbox system with a single scheduler / work distributor. This will be a medium-term project of a few weeks or months most likely, since I can only work on it off and on. There is a possibility it won't work out. The mailbox system has worked well for us so far fortunately, but it does make debugging complicated problems (e.g. Joran's comment in #212), well, complicated.

@JelleAalbers JelleAalbers reopened this Oct 12, 2019
@JelleAalbers JelleAalbers changed the title Factor out mailbox Replace mailbox system Oct 12, 2019
@JelleAalbers JelleAalbers removed the wontfix This will not be worked on label Oct 12, 2019
@tunnell
Member Author

tunnell commented Oct 14, 2019

This might be a good project for a CS student, you think? At the very least, we can speak with some CS systems people who might have ideas on how to replace it. Pivarski might also be a resource.

@JelleAalbers
Member

Indeed this would be an interesting project, especially to generalize it to a multi-machine system, perhaps using zeromq or something similar.

I've put what I have so far here: https://github.com/JelleAalbers/plarx
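
For the multi-machine direction, a minimal pyzmq PUSH/PULL worker could look like the sketch below (the addresses and the process() step are hypothetical placeholders, not plarx code):

import zmq

def worker(source_addr="tcp://scheduler:5557", sink_addr="tcp://scheduler:5558"):
    # Pull chunks from a scheduler, process them, push results downstream
    ctx = zmq.Context()
    source = ctx.socket(zmq.PULL)
    source.connect(source_addr)
    sink = ctx.socket(zmq.PUSH)
    sink.connect(sink_addr)
    while True:
        chunk = source.recv_pyobj()      # pickled chunk from the scheduler
        sink.send_pyobj(process(chunk))  # process() = the plugin's compute step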

@jmosbacher
Contributor

Is this issue being worked on? If not, I can take a look at it.

@tunnell
Member Author

tunnell commented Feb 14, 2020

Doubt it... It's a bit of a black hole that is ripe for overengineering, so be careful :) Be aware that the DAQ use case drives the performance requirements.

@jmosbacher
Contributor

The Mailboxectomy

I made a quick review of the options we have for using third-party tooling for processing instead of the ThreadedMailboxProcessor. Here are my findings.

Frameworks looked at

  • Pyleus (Apache Storm)
  • streamparse (Apache Storm)
  • Wallaroo
  • Apache Pulsar
  • Ray
  • Heron
  • Faust

Details

Apache Storm and its successor Heron

The topology of spouts and bolts is conceptually very similar to strax plugins, and there are extremely battle-tested Python implementations. However, they are mostly suited to distributed processing on large clusters and would not fit a "one guy with a laptop" processing situation.

Apache Pulsar

If strax moved to a pub-sub configuration, this would be suitable for the online processing mode, running on a cluster of high-performance machines. It boasts great throughput and reliability. Probably overkill for what XENONnT needs.

Wallaroo

Very interesting project, but it is implemented in a pre-version-1 language called Pony and has a limited Python API. May be worth revisiting for DARWIN.

Faust

Really nice project, built as a pure-Python alternative to the legendary Java-based Kafka Streams. It's a framework for distributed stream processing based on kafka topics and ktables (distributed dictionaries backed by a changelog topic on kafka, used for preserving state when stream processing). Kafka is known for its high throughput, eventual-consistency guarantees, reliability and rich ecosystem. Its throughput is limited only by disk write speed, since it is essentially just writing a stream of bytes to disk and recording offsets. Using Faust can be done in two ways:

  1. Implement generic "workers" that grab processing jobs off a kafka job topic and submit the results to another kafka topic. If the dependencies for a job are not yet met, the job is simply returned to the pile of jobs to be processed later. Other workers grab processed data off the result topic and store the results in a distributed dictionary for consumption by jobs that have dependencies.
    Such an implementation would look something like this:
import asyncio
import itertools
from typing import AsyncIterable

import faust
import numpy as np
from faust.types import StreamT

import strax
import straxen


class DataKey(faust.Record, serializer='json'):
	'''
	The data uid should be unique to a single processing result in the pipeline
	'''
	# non-unique key used for sharding
	run_id: str  
	target: str

	# single unique key across all datastore instances
	uid: str 

class DataChunk(faust.Record, serializer='pickle'):
	uid: str
	run_id: str
	data: np.ndarray


class ProcessRequest(faust.Record, serializer='pickle'):
	run_id: str
	inputs: dict
	outputs: dict
	plugin: strax.Plugin

app = faust.App(
	"strax_workers",
 	broker='kafka://192.168.99.104',
 	store='memory://',
    version=2,
    topic_partitions=1,
	table_partitions=1,
	producer_max_request_size=int(1e7),
	consumer_max_fetch_size=int(1e7),
    stream_buffer_maxsize=int(1e7), #16368,
	)

data_cache = app.Table("data_cache",
						key_type=str, 
						value_type=DataChunk,
						partitions=1,
						value_serializer='pickle',
	)

# data_requests = app.topic('data_requests', value_type=DataRequest)
process_requests = app.topic('process_requests', value_type=ProcessRequest)
processed_data = app.topic('processed_data', value_type=DataChunk)

@app.agent(processed_data, concurrency=2)
async def save(chunks: StreamT[DataChunk]) -> None:
	async for chunk in chunks:
		
		print(f"Saving chunk {chunk.uid}")
		data_cache[chunk.uid] = chunk
		print(data_cache.as_ansitable())

@app.agent(process_requests, concurrency=10)
async def processor(requests: StreamT[ProcessRequest]) -> AsyncIterable[DataChunk]:
	async for request in requests.group_by(ProcessRequest.run_id):
		#FIXME: add check if all outputs exist in cache 
		if any([any([uid not in data_cache for uid in deps])
							 for deps in request.inputs.values()]):
			# print("Delaying request for ",request.plugin.__class__.__name__, "requirements not met.")
			await process_requests.send(value=request)
			continue
		
		print("Processing request for ", request.plugin.__class__.__name__)
		kwargs = {kind: np.concatenate([data_cache[uid].data for uid in deps]) 
										for kind, deps in request.inputs.items()}
		plugin = request.plugin
		output = plugin.compute(**kwargs)
		if output is None:
			continue
		if isinstance(output, dict):
			for k, v in output.items():
				chunk = DataChunk(
					uid = request.outputs[k],
					data = v
				)		
				await processed_data.send(value=chunk)
		else:
			chunk = DataChunk(
					uid = list(request.outputs.values())[0],
					data = output,
				)		
			await processed_data.send(value=chunk)
		print("Done processing request: ", request)
		yield output



async def generate_requests(ctx, run_ids, targets):
	plugins = {}
	for run_id, target in itertools.product(run_ids, targets):
		cmps = ctx.get_components(run_id, (target,))
		plugins.update(cmps.plugins)
		for name, loader in cmps.loaders.items():
			data = np.concatenate(list(loader()))
			uid = ctx.key_for(run_id, name)
			chunk = DataChunk(
					uid = uid,
					data = data,
				)
			await processed_data.send(value=chunk)
	
	for plugin in plugins.values():
		inputs = {
				kind: [ ctx.key_for(run_id, dep) for dep in deps]
							for kind, deps in plugin.dependencies_by_kind().items()}
		outputs ={ name: ctx.key_for(run_id, name) for name in plugin.provides }

		request = ProcessRequest(
				run_id = run_id,
				inputs = inputs,
				outputs = outputs,
				plugin = plugin,

			)
		await process_requests.send(value=request)


async def get_data(context_name, run_ids, targets, timeout: int = 100) -> AsyncIterable[DataChunk]:
	run_ids, targets = strax.to_str_tuple(run_ids), strax.to_str_tuple(targets)
	context = getattr(straxen.contexts, context_name)()
	
	await generate_requests(context, run_ids, targets)
	waiting_for = list(itertools.product(run_ids, targets))
	for _ in range(timeout):
		for run_id, target in list(waiting_for):
			uid = context.key_for(run_id, target)
			if uid in data_cache:
				print(f"{uid} finished")
				result = data_cache[uid]
				waiting_for.remove((run_id, target))
				yield result
		if waiting_for:
			print(data_cache.as_ansitable())
			print(f"Still waiting for {waiting_for}")
			
			await asyncio.sleep(1)
		else:
			break
	else:
		raise TimeoutError("Timeout was reached without receiving results.")

The main change required for such an implementation would be to enforce serializability of plugins even after initialization. It would be trivial to implement priority topics. Rechunking can be done at the job level. Sharding could be done based on the run_id key or some other key that ensures independence of each shard.
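
A sketch of what enforcing that serializability could look like (a hypothetical guard, not existing strax code):

import pickle

def check_serializable(plugin):
    """Verify a plugin survives a pickle round-trip after initialization,
    so it can be shipped through kafka topics to remote workers."""
    clone = pickle.loads(pickle.dumps(plugin))
    assert type(clone) is type(plugin), f"{plugin} does not round-trip cleanly"
    return clone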

  2. Go all in and use faust as a stream-processing framework as it was intended to be used. This would require a serious change in the topology of our processing. We would just dump all the raw acquisitions from the DAQ directly into kafka topics. All data streams would be aligned by timestamp and processed independently by an army of workers managed by faust (including worker replacement policy etc.). Such an implementation would look something like this:
from datetime import datetime, timedelta


class DigitizerReadRequest(faust.Record):
	uid: str
	timestamp: datetime
	digitizers: list 
	metadata: dict
	digitizer: int

class ReadDAQRequest(faust.Record):
	run_id: str
	uid: str
	timestamp: datetime
	digitizers: list 
	metadata: dict

class Waveform(faust.Record):
	run_id: str
	uid: str
	timestamp: datetime
	pmts: list 
	metadata: dict
	data: np.ndarray

class Peaklet(faust.Record, serializer='pickle'):
	run_id: str
	uid: str
	timestamp: datetime
	metadata: dict

class Peak(faust.Record, serializer='pickle'):
	run_id: str
	uid: str
	timestamp: datetime
	metadata: dict

class Event(faust.Record, serializer='pickle'):
	run_id: str
	uid: str
	timestamp: datetime
	metadata: dict

app = faust.App(
	"xenon_processor",
 	broker='kafka://192.168.99.104',
 	store='memory://',
    version=1,
	producer_max_request_size=int(1e6),
	consumer_max_fetch_size=int(1e6),
    stream_buffer_maxsize=16368,
	)


@app.agent(concurrency=100)
async def read_digitizer(requests: StreamT[DigitizerReadRequest]) -> AsyncIterable[Waveform]:
	async for request in requests:
		# read the digitizer for this request
		yield Waveform()  # placeholder: fill in the actual read

read_request_topic = app.topic('read_requests', value_type=ReadDAQRequest)


# the waveform topic is just a kafka topic which is a stream 
# of waveforms that need to be sharded and then processed
# I would probably have these 
waveform_topic = app.topic('waveforms', value_type=Waveform)

# a table allows the waveform processing agents to keep state information within a window
waveform_data = app.Table("waveform_data",
						key_type=str, 
						value_type=Waveform) \
							.hopping(timedelta(microseconds=10), expires=timedelta(seconds=1),) \
							.relative_to_field(Waveform.timestamp)

peaklet_topic = app.topic('peaklets', value_type=Peaklet)
# windowed so that the .now()/.delta() calls in count_peaks below work
peaklet_data = app.Table("peaklet_data", key_type=str, value_type=float, default=float) \
							.hopping(timedelta(milliseconds=5), expires=timedelta(hours=1))
							
peak_topic = app.topic('peaks', value_type=Peak)
peak_data = app.Table("peak_data", key_type=str, value_type=Peak) \
							.tumbling(timedelta(seconds=1),expires=timedelta(hours=1))

event_topic = app.topic('events', value_type=Event)
event_data = app.Table("event_data", key_type=str, value_type=Event) \
							.hopping(size=100,step=10,)


@app.agent(read_request_topic, concurrency=1)
async def read_daq(requests: StreamT[ReadDAQRequest]) -> None:
	async for request in requests:
		# read daq async since most likely io bound
		async for wf in read_digitizer.map(request.digitizers):
			await waveform_topic.send(key=request.uid, value=wf)


def add_metadata(wf):
	# do some smoothing here
	return wf


@app.agent(waveform_topic, concurrency=100, processors=[add_metadata])
async def make_peaklets(waveforms: StreamT[Waveform]) -> AsyncIterable[Peaklet]:
	async for wf in waveforms.group_by(Waveform.run_id):
		# do some processing and yield peaklets
		peaklets = []  # placeholder: build peaklets from the waveform
		for peaklet in peaklets:
			await peaklet_topic.send(key=wf.run_id, value=peaklet)
			yield peaklet

@app.agent(peaklet_topic, concurrency=1, processors=[add_metadata])
async def count_peaks(peaklets: StreamT[Peaklet]) -> None:
	async for peaklet in peaklets.group_by(Peaklet.run_id):
		# track the peaklet rate within a sliding window
		peaklet_data["peaklet_count"] += 1
		count = peaklet_data["peaklet_count"]
		peaklet_data["peaklets_per_ms"] = \
			(count.now() - count.delta(timedelta(milliseconds=5))) / 5
		rate = peaklet_data["peaklets_per_ms"]
		if rate.delta(timedelta(milliseconds=5)) > rate.now():
			print("nearing end of peaklet train")

@app.agent(peaklet_topic, concurrency=100, processors=[add_metadata])
async def make_peaks(peaklets: StreamT[Peaklet]) -> AsyncIterable[Peak]:
	async for peaklet_batch in peaklets.group_by(Peaklet.run_id).take(100, within=0.05):
		# do some processing and yield a peak or not
		if len(peaklet_batch) > 10:
			peak = Peak()  # placeholder: fill fields from the batch
			await peak_topic.send(key=peak.run_id, value=peak)
			yield peak

@app.agent(peak_topic, concurrency=100, processors=[add_metadata])
async def make_events(peaks: StreamT[Peak]) -> AsyncIterable[Event]:
	async for peak_batch in peaks.group_by(Peak.run_id).take(100, within=1.0):
		# do some processing and yield an event or not
		if len(peak_batch) > 10:
			event = Event()  # placeholder: fill fields from the batch
			await event_topic.send(key=event.run_id, value=event)
			yield event

This would require a complete shift in the way we think about runs and processing, and anyone working on it would need to get intimately familiar with async programming, eventually consistent distributed systems and stream processing. There are huge benefits to be gained by working this way, but also a great amount of effort. The stream processors here act on sharded streams and tables, which makes the code a bit more complex; it is best to go over and explain this in a telecon.

Ray

This is a fascinating project and, in my opinion, the best fit for strax. It essentially combines an efficient shared object store (for interprocess communication) with a pretty smart centralized worker manager. It can easily scale to as many processors on as many machines as you can throw at it (I think the maximum tested was 800, but it should theoretically be able to control more) and has excellent tooling, including a web-based dashboard and deterministic replay for debugging. It leans heavily on the actor model, and you have two building blocks to define your computation: remote functions (stateless) and remote actors (stateful) that you submit work to and receive a future back. You then use those futures as arguments to other remote functions and actors. Ray will only execute a remote function once all the futures passed to it as arguments are ready, and it then passes the results seamlessly to the remote function in one of its workers. Ray also provides a multiprocessing.Pool API and a stream API that are wrappers around this core functionality.
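
As a minimal illustration of those two building blocks (a sketch with made-up function and actor names, not strax code):

import numpy as np
import ray

ray.init()

@ray.remote
def baseline(records):
    # stateless remote function
    return records - records.mean()

@ray.remote
class RunningSum:
    # stateful remote actor
    def __init__(self):
        self.total = 0.0

    def add(self, arr):
        self.total += arr.sum()
        return self.total

fut = baseline.remote(np.ones(10))  # schedules work, returns a future (ObjectRef) immediately
acc = RunningSum.remote()
total = acc.add.remote(fut)         # futures can be fed straight into other calls
print(ray.get(total))               # Ray resolves the whole chain and prints 0.0

There are three possible implementations using ray: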

  1. Use the ray Pool API to implement the concurrent.futures Executor API and simply plug the executor into strax as is. This is technically not a mailboxectomy, but it would solve the scaling limitation of using threads and processes locally. Such an implementation of the executor API is almost trivial; it would look something like this:
from concurrent.futures import Executor, Future

import ray
from ray.util.multiprocessing import Pool


class FutureRay(Future):
	def __init__(self, result=None):
		super().__init__()
		self._async_result = result  # the underlying pool AsyncResult

	def result(self, timeout=None):
		if self._async_result is not None:
			return self._async_result.get(timeout)

	def cancel(self):
		pass

	def cancelled(self):
		pass

	def running(self):
		pass

	def done(self):
		pass

	def exception(self, timeout=None):
		pass

	def add_done_callback(self, fn):
		pass


class Exray(Executor):
	def __init__(self):
		if not ray.is_initialized():
			ray.init()
		self._pool = Pool()

	def submit(self, fn, *args, **kwargs):
		fut = FutureRay()
		res = self._pool.apply_async(fn, args, kwargs,
									 callback=fut.set_result,
									 error_callback=fut.set_exception)
		fut._async_result = res
		return fut

	def map(self, func, *iterables, timeout=None, chunksize=1):
		# executor API callback guarantees may be a bit tricky;
		# translation probably a bit more complex but doable
		for res in self._pool.imap_unordered(func, *iterables):
			yield res

	def shutdown(self, wait=True):
		self._pool = None

Where each of these methods would have to be implemented in a robust manner.

  2. Add a method to the context that sets up a ray processing topology on request for a given plugin and run_id. This would look like:
import time
import typing as ty

import numpy as np
import ray
import strax

# Assumes `concatenate`, `merge` and `extract` are small @ray.remote helpers
# (concatenate chunks, merge arrays of different kinds, extract one output
# from a multi-output result) defined elsewhere.

def get_array(self, run_id: ty.Union[str, tuple, list],
                targets, save=tuple(), max_workers=None,
                **kwargs) -> np.ndarray:
    targets = strax.to_str_tuple(targets)
    run_ids = strax.to_str_tuple(run_id)
    if not ray.is_initialized():
        ray.init()
    
    cmps = self.get_components(run_id, targets)
    futures = {}

    # just for now preload all cached stuff
    for name, loader in cmps.loaders.items():
        futures[name] = concatenate.remote(*list(loader()))

    class FakePlugin(strax.Plugin):
        __name__ = "Fake"
        def __init__(self, **kwargs):
            for k, v in kwargs.items():
                setattr(self, k, v)
    to_add = ["run_id", "lineage", "config", "dtype", "data_kind", "provides", "depends_on"]
    actors = {}

    for name, plugin in cmps.plugins.items():
        # generate a single actor for each plugin
        # Here i create one but we can create multiple actors
        # if multiprocessing enabled for plugin and do a round robin on chunks
        actors[name] = ray.remote(plugin.__class__).remote()
        #--------------------------------------------------------------------------------#
        # Do some workarounds for plugins being heavily dependent on existing processor

        # set values on actor that are set by the get_components method.
        actors[name].set_values.remote({
            "deps": {k:FakePlugin(**{attr:getattr(v,attr) for attr in to_add}) for k,v in plugin.deps.items()},
            "provides": strax.to_str_tuple(plugin.provides),
            "depends_on": strax.to_str_tuple(plugin.depends_on),
        })
        actors[name].set_values.remote(
            {attr:getattr(plugin,attr) for attr in to_add}
        )
        
        # Run the setup method before we submit the compute method to the actor
        actors[name].setup.remote()

    start = time.time()
    remaining = list(cmps.plugins)
    for _ in range(len(cmps.plugins)**2): 
        if not remaining:
            break
        for name in list(remaining):  # copy, since we remove items below
            plugin = cmps.plugins[name]
            plugin.provides = strax.to_str_tuple(plugin.provides)
            
            # if futures exist for all dependencies,
            # submit job to appropriate actor and add to future dict.
            if all([dep in futures for dep in strax.to_str_tuple(plugin.depends_on)]):
                print(f"can compute {name}")

                # collect dependency futures 
                kwargs = {}
                for kind, ps in plugin.dependencies_by_kind().items():
                    print(f"Dependencies: {kind}: {ps} met")
                    if len(ps)==1:
                        kwargs[kind] = futures[ps[0]]
                    else:
                        kwargs[kind] = merge.remote(*[futures[p] for p in ps])
                print(f"Provides: {plugin.provides}\n\n")
                
                # call the do_compute method on the actor with its dependencies as futures
                future = actors[name].do_compute.remote(**kwargs)
                if plugin.multi_output:
                    for p in plugin.provides:
                        if p not in futures:
                            futures[p] = extract.remote(future, p)
                else:
                    if name not in futures:
                        futures[name] = future
                print(f"submited {name}")
                remaining.remove(name)
    
    result = ray.get(futures[targets[0]])
    futures = None
    actors = None
    print(f"Took {time.time()-start}, No mailboxes were harmed in this experiment.")
    return result
    

This is actually a working example. It can be generalized quite easily to scale to any number of processors/machines.
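
Scaling beyond one machine should then (assuming a Ray cluster was started beforehand, e.g. with `ray start` on each node) only require changing the init call:

# attach to a running Ray cluster instead of starting a local instance
ray.init(address="auto")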

  3. Use the ray streaming API. This can be an elegant implementation of the "get_iter" topology in strax. It would look something like this:
import time

import ray
from ray.streaming.streaming import Environment

env = Environment()
env.set_parallelism(2)  # Each operator will be executed by two actors

# LoaderSource (a custom source wrapping strax loaders or a DAQ reader) and
# by_name (a filter factory selecting one output by name) are assumed to be
# defined elsewhere.
source = LoaderSource([])


# Build a processing pipeline and apply it to a loader or a DAQ reader source

streams = {
    # add a source stream that reads from the DAQ in a round robin
    "raw_records": env.source(source)
                      .round_robin()
                      .key_by("run_id")
}

# set up the dataflow topology, assuming plugins are already ordered by dependencies
for plugin in plugins:
    # in practice we would merge multiple input streams here with a gather operation
    stream = streams[plugin.depends_on[0]] \
                 .set_parallelism(plugin.parallelism) \
                 .flat_map(plugin.compute)  # etc...
    if plugin.multi_output:
        # apply a filter to the output for multi-output streams
        for p in plugin.provides:
            streams[p] = stream.filter(by_name(p))
    else:
        streams[plugin.provides[0]] = stream

start = time.time()
env_handle = env.execute()  # Deploys and executes the dataflow
ray.get(env_handle)  # Stay alive until execution finishes
env.wait_finish()
end = time.time()

This approach is very suitable for the online processing use case.

@tunnell
Member Author

tunnell commented Feb 20, 2020

The Faust approach looks like a big change, though I was always surprised that we didn't leverage async more, so maybe we have a conceptual problem on our side. I agree that Ray looks more promising given the current code. @JelleAalbers is the expert though.

I'm curious whether @jpivarski has any experience with any of these, i.e. using frameworks to pass data to plugins for signal processing and event reconstruction. We can ask other experiments and Python experts for advice if you can formulate the question, or ask them to sanity-check your implementation if you write it out as a proposal. I did a review long ago for pax or the event builder (I forget which), and things have clearly changed a lot since those days.

@jpivarski

I haven't tried any of these projects directly. (For mailbox-like systems, akka is something I've used once or twice.)

@nsmith- has experience with a largish project using async everywhere (an async version of Uproot). Uproot itself uses functions as poor-man's futures (because I didn't want to glue to any particular framework or give up on Python 2 support).
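
A minimal sketch of that "functions as poor-man's futures" idea as I read it (my illustration, not Uproot's actual code): a plain callable stands in for the future, and calling it blocks until the result is ready.

import threading

def submit(f, *args):
    box = {}
    def run():
        box["value"] = f(*args)
    t = threading.Thread(target=run)
    t.start()
    def get():
        # "awaiting" the poor-man's future is just calling it
        t.join()
        return box["value"]
    return get

future = submit(sum, [1, 2, 3])  # work starts in the background
print(future())                  # blocks if needed, then prints 6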

@tunnell
Member Author

tunnell commented Feb 21, 2020

@nsmith- mind if @jmosbacher contacts you for your expertise before we redo parts of the XENONnT core software?

@jpivarski

Okay, I can give some ideas if it's something that I know about.

@JelleAalbers
Member

JelleAalbers commented Feb 21, 2020

Thanks for your research @jmosbacher! Let me first add a bit of background on mailboxes and strax generally. If we end up keeping mailboxes, I will add this to what we already have in the docs.

Strax is made of multiple actors (plugins, loaders and savers) that produce or consume numpy arrays with some metadata (a schematic toy plugin follows the list below).

  • Depending on the user's request and available data, there are many possible networks of actors we need to run. Networks generally have multiple sources and multiple sinks.
  • Each actor runs in its own thread, but may punt computational work to a ThreadPoolExecutor or a ProcessPoolExecutor (the default one or the one from npshmex).
  • Plugins can depend on more than one other actor / datatype and often do a bit of work to merge/pace their inputs consistently.
  • Some plugins are stateful, i.e. their output can depend on all previous inputs. Some plugins output more than one type of data. Some plugins produce multiple chunks of output of one type given one input.
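
For orientation, a schematic plugin might look roughly like this toy example (field names illustrative; not a real straxen plugin):

import numpy as np
import strax

class LargePeakFlag(strax.Plugin):
    """Toy plugin: one output row per input peak."""
    depends_on = ('peaks',)
    provides = 'large_peak_flag'
    dtype = [(('Peak has a large area', 'large'), np.bool_)] + strax.time_fields

    def compute(self, peaks):
        result = np.zeros(len(peaks), dtype=self.dtype)
        result['time'] = peaks['time']
        result['endtime'] = strax.endtime(peaks)
        result['large'] = peaks['area'] > 100
        return result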

Mailboxes receive the outputs of plugins and loaders and pass them to other plugins and savers. They do the following tasks (a toy sketch follows the list):

  • Receive messages (results or futures that make results) from a producer. Producers can be either generators or call .send on the mailbox.
  • Pass results to multiple consumers, in the order of the data taking time they represent (I think this is also the order in which messages are submitted. It is definitely not the order in which futures complete.). Consumers always iterate over a mailbox (there is no .receive).
  • Block producer threads (wait on a threading lock) when a certain number of buffered messages (i.e. those not yet passed to all consumers) is reached.
  • Block consumer threads (wait on a threading lock OR a future) when the next message they want is missing or its future hasn't completed yet.
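
To make those tasks concrete, here is a toy sketch of the ordering and blocking behaviour (illustrative only; the real implementation also handles futures, exception propagation and timeouts):

import threading

class MiniMailbox:
    """Ordered, bounded buffer with one producer and many consumers."""

    def __init__(self, max_messages=4):
        self.buffer = {}              # message number -> payload
        self.max_messages = max_messages
        self.read_pos = {}            # consumer name -> next message wanted
        self.next_number = 0
        self.closed = False
        self.cond = threading.Condition()

    def send(self, payload):
        with self.cond:
            # Block the producer while too many messages are unconsumed
            self.cond.wait_for(lambda: len(self.buffer) < self.max_messages)
            self.buffer[self.next_number] = payload
            self.next_number += 1
            self.cond.notify_all()

    def close(self):
        with self.cond:
            self.closed = True
            self.cond.notify_all()

    def subscribe(self, name):
        """Iterate over messages in send order, blocking until each arrives."""
        self.read_pos[name] = 0
        while True:
            with self.cond:
                n = self.read_pos[name]
                # Block the consumer until its next message (or a close) arrives
                self.cond.wait_for(lambda: n in self.buffer or self.closed)
                if n not in self.buffer:
                    return            # mailbox closed and drained
                payload = self.buffer[n]
                self.read_pos[name] = n + 1
                # Discard messages that every subscriber has now read
                if all(pos > n for pos in self.read_pos.values()):
                    del self.buffer[n]
                self.cond.notify_all()
            yield payload

A producer thread calls send() repeatedly and close() at the end; each consumer thread iterates over subscribe('some_name').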

Mailboxes have a couple of unit tests, are integrated with strax and we've used them for a while. However, there are several problems, arising mostly from the fact that each mailbox operates independently of the others:

  • There is no scheduling or prioritization. Each plugin thread just pulls on its dependency's mailboxes, so it's up to the OS to schedule which thread runs when. In the worst case this means all internal buffers are filled to the brim before the first output is emitted.
  • The hard maximum buffer size implies deadlocks are possible. The lack of scheduling means we must set the buffer size to a low number to limit memory consumption. As a result, deadlocks happen regularly when someone tries some new processing topology.
  • To catch the deadlocks we use timeouts (and crash when these are exceeded) on each wait. If these are set too low (or e.g. some processing is very slow), we will crash without a good reason.
  • Exceptions are passed as messages through the data stream and, depending on what exception is thrown where, they may not always reach all mailboxes/plugins. Part of the processing then keeps running as a headless chicken until some timeout is exceeded, after which I get an email asking how to debug a 'mailbox timeout exceeded' error.

@jmosbacher
Contributor

@JelleAalbers @tunnell I didn't mean to force a discussion here; I know everyone is super busy right now, I just put this up as a reference. Jelle and I were planning on going over the options next week so he can decide whether it's worth making changes at this late stage. In any case, I think it's fair to assume that any attempt at this would be done in parallel to current strax development and only merged if a working framework is produced that gets the same results with better performance/stability. I see no reason to replace a working system; I just want to be prepared for a scenario where the complexity starts getting out of hand and debugging without Jelle's help becomes impossible.

@nsmith-

nsmith- commented Feb 23, 2020

I'm happy to comment where I can, but I may already be behind you after your thorough survey. From a far-away view, here are some random ideas:

  • The ray definition of 'Actor' as a stateful unit that receives data is key, and it's probably worth understanding how many modules are actually stateful and how many are pure functions of the inputs. Along with ray, you may be interested in dask, which has recently added actors, and has well-developed support for pure-function DAGs.
  • One big challenge for large legacy codebases with adopting async/await is the function color issue: adopting async mandates that any function which awaits will need to be called exclusively by other async functions.
  • Backpressure mechanisms might help with the scheduling issue, i.e. adding to output sinks (which feed subsequent inputs) could block until the internal queue is below some size.
  • An event loop with async/await might make scheduling all this a bit simpler (no thread-safety issues!), although clearly all 'real' work needs to be executed in thread or process pools. (A toy sketch combining the last two points follows.)
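
A toy sketch of those last two points together (toy payloads, not strax code): a bounded asyncio.Queue provides the backpressure, while the 'real' work runs in a process pool.

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def producer(queue):
    for chunk in range(100):
        await queue.put(chunk)  # awaits (backpressure) once 10 items are queued

async def consumer(queue, pool):
    loop = asyncio.get_running_loop()
    while True:
        chunk = await queue.get()
        # 'real' work runs off the event loop, in the process pool
        result = await loop.run_in_executor(pool, pow, chunk, 2)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)  # the bound is the backpressure knob
    with ProcessPoolExecutor() as pool:
        worker = asyncio.create_task(consumer(queue, pool))
        await producer(queue)
        await queue.join()             # wait until every chunk is processed
        worker.cancel()

asyncio.run(main())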

@jpivarski

This is function color: https://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function/

(At least, it's the article I think of when referring to "function color.")
