
Add DAG scheduling (e.g. Dagger.jl) to training of learning networks #72

Open
ablaom opened this issue Feb 5, 2019 · 14 comments
Labels
enhancement New feature or request

Comments

@ablaom
Member

ablaom commented Feb 5, 2019

Currently each node and machine in a learning network has a simple linear "tape" to track dependencies on machines in the network. I had in mind to replace these tapes with directed acyclic graphs, which (hopefully) makes scheduling amenable to Dagger.jl or similar.

A thorough understanding of the learning network interface at src/networks.jl will be needed. If someone has experience with scheduling, I could provide guidance, but this is probably not a small project.

@ablaom added the enhancement (New feature or request) label on Feb 5, 2019
@fkiraly
Collaborator

fkiraly commented Feb 5, 2019

(fixed typo acrylic -> acyclic)

@jpsamaroo
Collaborator

I'd be happy to take this one on; I have very recent experience working with Dagger's scheduler (and plan to keep improving on it). Before I dive in, how would you like things to work dependency-wise? Would MLJ.jl directly depend on Dagger, or would you want to do the Requires.jl approach (or something else)?

@ablaom
Member Author

ablaom commented May 27, 2019

That would be awesome.

Re your question. Is there a reason why Dagger should not be a dependency? I don't have any prior experience using it, so maybe this is a dumb question.

@jpsamaroo
Collaborator

Not really, it's pretty lightweight and loads reasonably quickly. I guess you'd want it as a direct dep since you're already doing multiprocessing in MLJ.

I'll get started on this today and post a PR sometime this week once things start working 😄

@ablaom
Member Author

ablaom commented May 28, 2019

For some orientation, I would suggest the following:

Learning Networks docs, especially the last section, "The learning network API". Note that both NodalMachine objects and Node objects have linear dependency "tapes" (which presumably become DAGs in your reimplementation). There are two simple examples of learning networks there. These are also spread across two notebooks in /examples, namely tour.ipynb and ames.ipynb. The second example blends two predictors, which should train in parallel in your implementation (but do not currently).

Beware of the fact that learning network training is "smart" in the sense that components of a network are only retrained if upstream changes make this necessary. This should be clear enough from the docs. This doesn't directly affect you, I guess, but it could confuse you if you did not know about it.

The code you will be messing with lives primarily in src/networks.jl, but note that the NodalMachine fit! method lives in src/machines.jl because it simultaneously fits ordinary Machine objects.

There's a lot to get your head around here. (I sometimes forget myself how all this works!) Also, designing meaningful tests could take a bit of work. Many thanks for having a go at this. It certainly would be very cool if you can get this to work.

Tip: To get a recursively defined "developers' dump" of any MLJ object at the REPL, just type "@more" after defining the object. The numbers you see in REPL representations of objects (as in Machine{PCA} @ 1…18) refer to the objectid. Handy in debugging.

Some components of a learning network (eg, ones wrapped as TunedModels with a cv strategy) will have the option of running in a Distributed mode. Is this compatible with the Dagger scheduling?

@jpsamaroo
Collaborator

Thanks for the links to the Learning Networks docs and src/networks.jl; these helped my understanding tremendously!

Quick primer on Dagger computation: Dagger has a Thunk object which represents a computational kernel (a regular function with a fixed arity), as well as the inputs to the kernel. This Thunk can be scheduled on any worker, regardless of where its inputs are located. A DAG of Thunks is intended for single usage; if you need another DAG similar to one you just finished executing, then you need to construct it again (this should be relatively cheap).
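For concreteness, here is a tiny stand-alone illustration of that model using Dagger's delayed API (nothing MLJ-specific, just the Thunk/DAG idea described above):

    using Dagger

    # Each `delayed` call wraps a function and its inputs into a Thunk.
    a = Dagger.delayed(+)(1, 2)      # Thunk computing 1 + 2
    b = Dagger.delayed(*)(a, 10)     # Thunk whose input is the result of `a`

    # Collecting the terminal Thunk schedules and executes the whole DAG.
    collect(b)                       # -> 30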

Back to MLJ: The current tape semantics don't allow one to determine the inputs to a certain DAG node, only the order of execution. I'm going to leave this mechanism alone for now and just recursively compute the DAG at each call to fit!, which should be inexpensive and also lets us dynamically take into account the staleness of each DAG node at DAG construction (we will just pass the fitresult or source if not stale). This is where the requirement to reconstruct the DAG at each execution becomes a good thing; we can create it to be maximally efficient based on staleness.
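In toy form, the construction I have in mind looks something like the sketch below. Everything here (CacheNode, its field names, build_thunk) is a hypothetical stand-in, not existing MLJ or Dagger API, apart from Dagger.delayed itself:

    using Dagger

    # A toy node that knows its inputs, whether it is stale, and a cached result.
    struct CacheNode
        f::Function
        inputs::Vector{CacheNode}
        stale::Bool
        cached::Any
    end

    # Recursively build the Thunk DAG: stale nodes are wrapped for recomputation,
    # non-stale nodes just contribute their existing (cached) result.
    function build_thunk(n::CacheNode)
        n.stale || return n.cached
        ins = map(build_thunk, n.inputs)
        return Dagger.delayed(n.f)(ins...)
    end

    src   = CacheNode(identity, CacheNode[], false, 3)   # "source": never stale
    inner = CacheNode(x -> 2x, [src], true, nothing)     # needs recomputation
    outer = CacheNode(x -> x + 1, [inner], true, nothing)

    collect(build_thunk(outer))                          # -> 7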

I think that we shouldn't have problems creating tests, but I haven't yet looked into what's currently in place; I'll raise a flag if I need help constructing a good test suite.

Some components of a learning network (eg, ones wrapped as TunedModels with a cv strategy) will have the option of running in a Distributed mode. Is this compatible with the Dagger scheduling?

Can you point me to an example of where this is done? I don't think it should be a problem to mix raw Distributed with Dagger DAGs; you can use as little or as much of Dagger as you want, and if we want to just turn it off for all or part of a learning network, I can make that happen.

I've got a few things going on this week, but am already working on the integration of Dagger right now; hopefully I can have a PR posted by tomorrow so that you can see what changes are required to make this work.

@ablaom
Member Author

ablaom commented May 29, 2019

Thanks for the explanations!

Fitting an EnsembleModel has an option to run in parallel: search for "# build in parallel" at src/ensembles.jl.

And evaluate! will run in parallel if given a CV resampling strategy with parallel=true: search for "TODO: progress" in src/resampling.jl.

And external models often have an option to run in parallel, or wrap compiled code that runs on multiple processors, eg XGBoost.jl
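For orientation only, the kind of Distributed pattern involved in those places looks roughly like this toy example (fit_member is just a stand-in; the actual code in src/ensembles.jl and src/resampling.jl differs in detail):

    using Distributed
    addprocs(2)

    # Stand-in for fitting one ensemble member (or evaluating one CV fold).
    @everywhere fit_member(i) = sum(rand(100)) / i

    # Each unit of work runs on whichever worker is free.
    fitresults = pmap(fit_member, 1:10)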

Warning and apology: I have just detected a flaw in the logic for fitting nodal machines. Sometimes a call to fit a machine is calling update when it should be calling fit. Determining the possible impact on what you are doing is a top priority, but I also have the Sprint applications to process today. My humble apologies for this.

@ablaom
Member Author

ablaom commented May 30, 2019

Okay, I've had to make a few changes, to resolve #146 (and to a lesser extent, #147). The issues give a few details but re-reading the updated documentation should help also.

Key differences are:

  1. All the logic about whether or not to train a machine (and whether to fit the corresponding model or just update it) has been moved into fit!(::NodalMachine). When fitting a Node, fit! gets called on every machine in the dependency tape. The doc string for fit!(::NodalMachine) is at the end of this comment.

  2. Each (nodal) machine now tracks the number of times a fit-result has been computed (its state), and records the complete state of the machines in its tape (the upstream_state) each time it computes a fit-result. The current state of the tape, and the previous state, are used in the logic above.

I've worked quite a bit adding tests to fill the hole that existed before, and then some. It may be instructive to run over the tests in /test/networks.jl. Still, you could not be blamed for waiting a few days for the dust to settle, in case I think of something I overlooked.

Perhaps worth remarking that the new NodalMachine fields state and upstream_state (state of the tape) do not determine whether or not a new fit-result is to be computed, only how it is computed (MLJBase.fit or MLJBase.update).


    fit!(mach::NodalMachine; rows=nothing, verbosity=1, force=false)

When called for the first time, attempt to call MLJBase.fit on
mach.model. This will fail if a machine in the dependency tape of
mach has not been trained yet, which can be resolved by fitting any
downstream node instead. Subsequent fit! calls do nothing unless:
(i) force=true, or (ii) some machine in the dependency tape of
mach has computed a new fit-result since mach last computed its
fit-result, or (iii) the specified rows have changed since a
fit-result was last computed, or (iv) mach is stale (see
below). In cases (i), (ii) or (iii), MLJBase.fit is
called. Otherwise MLJBase.update is called.

A machine mach is stale if mach.model has changed since the last
time a fit-result was computed, or if one of its training arguments
is stale. A node N is stale if N.machine is stale or one of its
arguments is stale. Source nodes are never stale.

Note that a nodal machine obtains its training data by calling its
node arguments on the specified rows (rather than indexing its arguments
on those rows) and that this calling is a recursive operation on nodes
upstream of those arguments.
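To spell out the decision rule in the doc string as code, here it is over plain booleans (illustrative only; the real logic lives in fit!(::NodalMachine) in src/machines.jl):

    # Which action fit! takes, given conditions (i)-(iv) above plus "first call".
    function fit_action(; first_call=false, force=false, upstream_changed=false,
                          rows_changed=false, stale=false)
        if first_call || force || upstream_changed || rows_changed
            return :fit      # MLJBase.fit is called
        elseif stale
            return :update   # MLJBase.update is called
        else
            return :noop     # machine is up to date; fit! does nothing
        end
    end

    fit_action(upstream_changed=true)   # -> :fit
    fit_action(stale=true)              # -> :update
    fit_action()                        # -> :noop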

@ablaom
Member Author

ablaom commented Jun 8, 2019

Let me have another go at describing the scheduler requirements, as unambiguously as possible. I'm guessing the key part is describing the edges of the dependency DAG, which is called "Crucial bit" below.

For brevity, a "machine" is either a NodalMachine object or nothing. A call to fit!(nothing) means "do nothing".

Reduction of the scheduling problem to specification of a DAG

Let N be a Node object. Below I define a DAG, denoted dag(N), whose nodes are labelled by machines, with this property: Each call fit!(N, args...) should call fit!(M, args...) for every machine M labelling a node of dag(N), subject to the constraint that a machine M1 cannot be fit before M2 if M2 labels a node of dag(N) downstream of a node labelled by M1.

The DAG dag(N) is just a formal object for now, which may or may not require an explicit representation in the scheduler implementation. It is to play a similar role to the current (linear) tape field of a Node object.

Note that multiple nodes of dag(N) may have the same machine label, but that there is no harm in calling fit! multiple times on the same machine, because internally, the machine's fit method should recognise it is up-to-date and perform a no-op. (However, it is also possible that this internal logic is duplicating stuff the scheduler will track and we can get rid of this?)

Aside: The present tape objects used to play a role in the internal fitting logic of a machine but this has just been removed on master branch.

The DAG specification

Crucial bit: Let univ be the DAG whose nodes are all defined Node and Source objects, and let N1 -> N2 be an edge of univ if N2 in N1.args or N2 in N1.machine.args - that is, if N2 is an argument of N1 or if N2 is a training argument of N1's machine. Then dag(N) is defined to be the full sub-DAG of univ whose nodes are all the nodes downstream of N, including N itself.

Labels: If a node N' of dag(N) is a Source, its label is nothing; if it is a Node, the label is N'.machine (which could also be nothing if N' is static).
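To make the edge rule concrete, here is a toy, self-contained sketch (ToySource, ToyMachine, ToyNode and dag_nodes! are made-up stand-ins; only the field names args and machine.args are taken from the description above):

    struct ToySource end

    struct ToyMachine
        args::Vector{Any}                     # training arguments
    end

    struct ToyNode
        args::Vector{Any}                     # arguments of the node
        machine::Union{ToyMachine, Nothing}   # nothing if the node is static
    end

    # Collect every node reachable from N via the edges N1 -> N2 defined above.
    function dag_nodes!(N, seen=Any[])
        N in seen && return seen
        push!(seen, N)
        N isa ToySource && return seen
        foreach(C -> dag_nodes!(C, seen), N.args)
        N.machine === nothing || foreach(C -> dag_nodes!(C, seen), N.machine.args)
        return seen
    end

    s  = ToySource()
    m  = ToyMachine([s])
    n1 = ToyNode([s], m)
    n2 = ToyNode([n1], nothing)
    dag_nodes!(n2)    # -> [n2, n1, s], the nodes of dag(n2)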

@ablaom
Member Author

ablaom commented Jun 14, 2019

@jpsamaroo I have pushed a branch called composite in which I have made more changes to the learning networks design that affect the scheduling project. I wonder if, before you work further on the scheduler, you could determine the degree of conflict with your work so far, and tell me whether you would be happy for me to merge this onto master now, or whether I should wait for your contribution and mop up later.

The main change is that I have abandoned the "machine" tapes in favour of "node" tapes. This is because:

  • this is simpler: before, each machine and each node required a tape; now only nodes have a tape (now called nodes)

  • the node tapes actually carry more information, and I needed them to implement superior, user-friendly "exporting" of learning networks as stand-alone models, which I am quite happy about. The basic problem there is to replicate a network from a terminal node, splicing in new component models and sources.

  • these tapes (which are just "linearizations" of the basic DAG) are easier to reason about than the old ones. (By "linearization" of the DAG, I just mean an order-preserving injection into the integers - what is usually called a topological ordering.) See the sketch after this list.
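Here is what I mean by a linearization, as a tiny self-contained example over a toy DAG given as dependency lists (the names are made up, not MLJ code):

    # Map each toy node to the nodes it depends on.
    deps = Dict(:yhat => [:model_node], :model_node => [:Xs, :ys],
                :Xs => Symbol[], :ys => Symbol[])

    # A post-order walk lists every dependency before any node that uses it.
    function tape(node, deps, acc=Symbol[])
        node in acc && return acc
        foreach(d -> tape(d, deps, acc), deps[node])
        push!(acc, node)
        return acc
    end

    tape(:yhat, deps)   # -> [:Xs, :ys, :model_node, :yhat]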

@jpsamaroo
Collaborator

jpsamaroo commented Jun 14, 2019

That's totally fine, merge it whenever you deem it most appropriate. I'll rebase and re-adjust my code once I get back to working on my PR a bit more, and I'll check the diff to get an idea for how the various bits and pieces have moved around.

I've already gotten a somewhat working prototype for using Dagger for training on just the master process, and in the process I didn't have to touch the tapes at all. So if your branch is just moving around things at the tape level, it shouldn't be too terribly impactful to the code I've already written. (And even if it is very impactful, I'm happy to take up the challenge of rebasing on top of your changes 😄 ).

Thanks for the heads-up!

@jpsamaroo
Collaborator

I've been thinking on this a lot since I started #151, and I'm starting to come to the conclusion that I might be implementing parallel computing support a bit backwards, both in terms of difficulty level, and in terms of utility to the end user. While just dumping the MLJ DAG into Dagger sounds trivial, it's rife with complications around data sharing and synchronization. It also isn't guaranteed to actually be faster than running on the head node; in fact, I bet my PR will be a regression in performance in almost every case. There's also the mental factor to consider: this is a difficult PR that's been pretty stressful for me to wrap my head around, and the end goal feels like it keeps stepping away from me every time I take a step towards it.

However, I don't want to just drop the work I've done, and leave MLJ without the ability to make good use of Julia's excellent multiprocessing (and new multithreading) utilities. Instead, I think it would be better to tackle some lower-hanging fruit first, and gradually work back up to the work in #151 once those are working well (if that ends up still being desired).

So far, the functionality that seems obviously ripe for parallel exploitation is tuning, and internal fitting or evaluation of the model itself. Tuning feels pretty straightforward ("MT" means multithreaded): just copy the model to another worker (MT: just copy it), set the appropriate parameters, fit, and send back the measurement result (MT: no need to serialize anything, just return the result).
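In toy form, the MT variant looks something like this (the quadratic loss is just a stand-in for "fit the model and return the measurement"):

    toy_fit_and_measure(lambda) = (lambda - 0.3)^2   # stand-in for fit + evaluate

    grid   = collect(range(0.0, stop=1.0, length=20))
    losses = similar(grid)

    # Each thread handles its own slice of the grid; results are written in place,
    # so nothing needs to be serialized between threads.
    Threads.@threads for i in eachindex(grid)
        losses[i] = toy_fit_and_measure(grid[i])
    end

    best = grid[argmin(losses)]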

Model parallelization is maybe less straightforward, but also will probably involve less work on MLJ's part, because it's only an API change. We only need to tell the model whether or not it's permitted/expected to run in parallel, and pass in information about how it should parallelize (workers or threads, and how many of each?).

I'd like to know especially what you think @ablaom , but if anyone else has thoughts or suggestions for additional parallelization opportunities, I'm all ears 😄

@ablaom
Member Author

ablaom commented Aug 14, 2019

@jpsamaroo Thanks for the update. Appreciate all the hard work put in so far.

Would certainly appreciate contributions around MT and parallelisation elsewhere in the package.

Here's where we are at with parallelisation (aka distributed computing). At present there is no MT implementation in MLJ:

models

Generally, parallelism and MT at the model level would be the responsibility of those who implement the model algorithms, which mostly happens in external packages. I guess one issue is how the model "meta-algorithms" in MLJ play with the parallel/MT stuff happening in model training.

tuning

Tuning comes in different strategies (TuningStrategy objects); so far only Grid (for "grid search") is implemented. This is very naturally parallelizable, but parallelization is not yet implemented at all.

resampling

Every tuning strategy repeatedly evaluates a model's performance with different hyperparameter values. One gets an estimate of performance using the evaluate! method. This method is parallelized, but this is not tested and could possibly be improved. If you are interested, I would suggest you start with a review, add tests, and maybe investigate MT. I've opened #207 as a start - assign yourself (after accepting the collaborator invitation) if you are interested.

@ablaom
Member Author

ablaom commented Jan 19, 2021

Update: A revamp of the composition code means that learning networks are now trained "asynchronously" using tasks; see https://github.com/alan-turing-institute/MLJBase.jl/blob/6eb7eab3ffaded8a0c74b3d9782d22943c7b5311/src/composition/learning_networks/nodes.jl#L198

It should be far easier now to make this training multi-threaded, for example.
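For anyone landing here later, the task-based pattern is roughly the following minimal stand-alone sketch (not the linked MLJBase code): each upstream machine is fit in its own task, and the node waits for all of them. Replacing @async with Threads.@spawn is the obvious route to multithreading.

    # Stand-ins for fitting the machines upstream of a node.
    fit_jobs = [() -> (sleep(0.1); :machine1), () -> (sleep(0.2); :machine2)]

    # Launch one task per upstream fit, then wait on all of them.
    tasks = [@async f() for f in fit_jobs]
    results = fetch.(tasks)   # with Threads.@spawn instead of @async, the fits run in parallel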
