# khmer multiprocessing + seqan #655

Open — camillescott wants to merge 117 commits into `master` from `feature/threading_seqan`
## Conversation
Commit messages and review notes:

- Bring in fixes to threading in do-partition.py
- This reverts commit ac399a5.
- Re: my previous comment about that serious memory leak, it has been resolved in #692.
- Uses boost's included bcp utility to extract relevant files. Command used was: `bcp boost/lockfree/queue.hpp --namespace=pkgboost --boost=/usr/include/ third-party/boost`. This places the packaged boost into its own namespace, `pkgboost`, to avoid collisions with existing boost installations.
- …for all async module
- …issue, add explicit check for is_threadsafe, add more explicit state management to async_sequence_processor
- ping @camillescott
Multiprocessing for khmer.
Addresses #76 and #92; greatly extends khmer's multiprocessing capabilities.
See #638 for original PR.
See #656 for a pull against the seqan branch which masks those changes and improves readability.
See khmer-metrics for some performance profiling.
From original pull:
Example usage from Python can be found here: https://github.com/camillescott/khmer-metrics/blob/master/test_async_diginorm.py
@ctb @mr-c @luizirber thoughts on the Python interface are welcome. As of now, interaction with processed reads is mediated by an iterator over the output queue, which returns khmer::read_parsers::Read objects.
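To make the "iterator over the output queue" idea concrete without requiring khmer itself, here is a small self-contained Python sketch of the pattern: worker threads push processed items onto an output queue, and the caller consumes them through an iterator that only stops once processing is complete. All names here are illustrative, not the PR's actual API.

```python
import queue
import threading

def process(item):
    # Stand-in for per-read processing (illustrative only).
    return item.upper()

def run_pipeline(items, n_threads=2):
    """Feed items through worker threads; yield results from an output queue."""
    in_q, out_q = queue.Queue(), queue.Queue()
    for item in items:
        in_q.put(item)

    def consume():
        # Each worker pulls from the input queue until it drains.
        while True:
            try:
                item = in_q.get(timeout=0.1)
            except queue.Empty:
                return
            out_q.put(process(item))

    workers = [threading.Thread(target=consume) for _ in range(n_threads)]
    for w in workers:
        w.start()

    # The iterator side: yield results until all workers have finished
    # and the output queue is fully drained.
    while any(w.is_alive() for w in workers) or not out_q.empty():
        try:
            yield out_q.get(timeout=0.1)
        except queue.Empty:
            continue
    for w in workers:
        w.join()

results = sorted(run_pipeline(["acgt", "ttga"]))  # -> ['ACGT', 'TTGA']
```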
## Design

The current design builds off the following assumptions:

With these constraints in mind, I have begun by focusing on streaming tasks and taking advantage of asynchronous IO and hashtable access. The basic building block is thus the `Async` abstract base class, which:

- takes a `khmer::Hashtable` instance on construction
- is templated on its input type: `HashIntoType`, `Read`, or `const char *`
- declares a `consume` method, which is expected to be threadsafe and pull from the input queue
- has a `start(int n_threads)` method which launches the specified number of threads running `consume`
- has a `stop()` method which stops the running `consume` threads

All the actual async implementations build off this class. For example, the `AsyncSequenceWriter` inherits from `Async<const char *>`, and its `consume` method breaks down the input sequences into k-mers and writes them to the given hashtable.
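A Python analogue of this base class may help make the shape concrete. The real implementation is templated C++; the class and method names below are illustrative only, with a toy k-mer counter standing in for `AsyncSequenceWriter`.

```python
import queue
import threading

class Async:
    """Illustrative analogue of the C++ `Async` base class."""

    def __init__(self):
        self.input_q = queue.Queue()
        self._workers = []
        self._running = False

    def consume(self):
        # Threadsafe consumer loop; pulls from the input queue until
        # stop() is called and the queue has drained.
        while self._running or not self.input_q.empty():
            try:
                item = self.input_q.get(timeout=0.05)
            except queue.Empty:
                continue
            self.process_item(item)

    def process_item(self, item):
        raise NotImplementedError

    def start(self, n_threads):
        # Launch n_threads threads running consume().
        self._running = True
        self._workers = [threading.Thread(target=self.consume)
                         for _ in range(n_threads)]
        for w in self._workers:
            w.start()

    def stop(self):
        # Stop the running consume threads (after draining the queue).
        self._running = False
        for w in self._workers:
            w.join()

class KmerCounter(Async):
    """Toy stand-in for AsyncSequenceWriter: counts k-mers from sequences."""

    def __init__(self, k):
        super().__init__()
        self.k = k
        self.counts = {}
        self._lock = threading.Lock()

    def process_item(self, seq):
        for i in range(len(seq) - self.k + 1):
            kmer = seq[i:i + self.k]
            with self._lock:
                self.counts[kmer] = self.counts.get(kmer, 0) + 1

counter = KmerCounter(k=3)
counter.start(2)
counter.input_q.put("ACGTACGT")
counter.stop()
```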
The `AsyncSequenceProcessor` is another abstract class which builds off `Async<Read>`, adding an output queue and an additional reader thread; the reader thread parses reads from a file given to `start` and asynchronously pushes them to the input queue. The consume threads still pull off this queue, and are expected to push their results to the output queue. It also declares a `stop_iter` method, which returns false when the conditions indicate that all parsing and processing is complete; this is used for the Python interface.

`AsyncDiginorm` (and any other future read processors, say, abundfilt) inherits from `AsyncSequenceProcessor`. Its `consume` method implements digital normalization with a cutoff value given to the `start` method.

## Python Interface
As expected, the various processors are exposed as Python objects. For now, only `AsyncDiginorm` is fully wrapped, though `AsyncSequenceProcessor` is partially wrapped. Their `new` methods pull the pointer to a `Hashtable` object out of that object's Python wrapper and pass it to the constructor. The progress getters and the start and stop methods are exposed. A user creates a counting hash, then an `AsyncDiginorm` object, and passes the table in. Then, they call `start` with the desired cutoff, filename, and number of threads, which launches the parser thread and consume threads, which run asynchronously. The final piece is output, which is the reason for `AsyncSequenceProcessor` to be exposed: it defines an iterator over the output queue, which calls `iter_stop` to determine status. Maintaining the class hierarchy in Python-land not only adds structure, but also avoids needing to redefine this iterator for every processor class.

## Boost
This implementation uses `boost::lockfree::queue.hpp`. This is a non-blocking, lock-free, multi-producer multi-consumer queue. Queues are a potentially huge bottleneck, and these lockfree queues are considerably faster in this case than a trivial locked queue. Their implementation means that they have a max size of 65535; this doesn't really matter, as I have limited the max queue length to 50000 as is. This is to avoid the read parser getting ahead of the processor threads and filling up main memory. The parser thread simply spins until it can push to the queue again.

There is some debate to be had as to whether boost is a good solution, but for now I'd rather spend time working on khmer's internals than reinventing the data structure wheel. At the request of @mr-c, I have packaged a subset of boost in third-party. Conveniently, the boost devs provide a tool called bcp for just that. The command I ran to extract the relevant files is:
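This is the same invocation recorded in the bcp commit message above (paths reflect my local boost install):

```
bcp boost/lockfree/queue.hpp --namespace=pkgboost --boost=/usr/include/ third-party/boost
```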
The `--namespace` option renames all the boost namespaces. I have done this to make sure users are linking to our version of boost and not a local version they have installed. This once again adds a pile of new files, but I think avoiding the hassle of implementing the queue ourselves is worthwhile. Note that going this direction also opens up the option of using boost to tackle the streaming problem, but that's for @mr-c, @ctb, and @bocajnotnef to figure out :)

NOTE: I rolled back this change for now because it once again made the PR impossible to review. However, this is the process we can/should use for the final merge.
## Further Considerations

An important consideration is that the asynchronous nature of this method means results are not replicable. In particular, digital normalization on smaller file sizes can run into considerable variability (+/- 20000 reads kept on a 1m read input) because of the async hash writer thread. However, as the hashtable becomes more saturated, the writer thread "catches up" to the processor threads, and the results (should) converge toward what one would expect from a normal, serial run. Curiously, this also means that the program runs faster the longer it runs, approaching the IO speed (given the number of threads, disk speed, etc.), because the processor threads are no longer waiting for the writer thread to write out the reads in its queue.
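The bounded-queue behaviour described above (the parser held back until the consumers catch up, so reads never pile up in main memory) can be sketched in Python with a `maxsize`-limited queue. This is illustrative only: the PR's implementation spins on a C++ lockfree queue rather than blocking, and the cap of 5 here stands in for the 50000-read limit.

```python
import queue
import threading

MAX_QUEUE = 5          # stands in for the PR's 50000-read cap
in_q = queue.Queue(maxsize=MAX_QUEUE)
kept = []
lock = threading.Lock()

def parser(reads):
    # Producer: blocks (rather than spins) whenever the queue is full,
    # so it can never run far ahead of the consumers.
    for read in reads:
        in_q.put(read)        # blocks while in_q holds MAX_QUEUE items
    in_q.put(None)            # sentinel: parsing finished

def consumer():
    while True:
        read = in_q.get()
        if read is None:
            in_q.put(None)    # propagate sentinel to other consumers, if any
            return
        with lock:
            kept.append(read)

reads = [f"read{i}" for i in range(100)]
threads = [threading.Thread(target=parser, args=(reads,)),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With a single consumer the FIFO queue preserves input order; with multiple consumers, outputs would interleave, which is exactly the non-replicability discussed above.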
## Project TODO

- Discuss `boost::lockfree` with @ctb
- Move `boost::lockfree` into third-party

## Merge Checklist

- Check for code coverage.
- Check `make pep8`, `make diff_pylint_report`, `make cppcheck`, and `make doc` output. Use `make format` and manual fixing as needed.
- … changes were made?