adding queue support for parallel processing. This allows you to take advantage of a long running set of parallel workers with little effort. Output can also be in the form of a queue. #17
Conversation
Just an idea:
Good point on the multiple-machine scenario. I have been using Resque as well. The simplicity of this library for the user is very nice. I develop a lot of scripts that talk to MongoDB to get the work out, so my scripts can be launched on multiple boxes to get the same effect. Parallel allows me to multi-thread things without the complexity of separate code for the generator and the consumer, so I think this addition is a perfect fit for my use case.
If you do not need the results, maybe the simplest thing would be to use:
I don't always need the results, but in many cases I need to know if it completed successfully. I also want to take advantage of your separate process management and support for an unknown amount of upfront work.
I did some work on an enumerable implementation; I think this could be a very good base for your queue hacks. It can now use any object that responds to each.
It is getting closer. Any way to expose the producer queue and the consumer queue?
I think the queue logic can stay outside of parallel, e.g.

class QueueWrapper
  def initialize(queue)
    @queue = queue
  end

  def each
    loop { yield @queue.pop(true) }
  rescue ThreadError
    # queue is empty, time to quit
  end
end

Parallel.map(QueueWrapper.new(queue)) { ... }
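A minimal, self-contained check of the wrapper idea, using a plain Thread::Queue and Enumerable from the standard library instead of the parallel gem (the names and the map call here are only illustrative):

```ruby
# Wraps a Thread::Queue so it can be iterated with each;
# a non-blocking pop raises ThreadError on an empty queue,
# which we treat as "no more work".
class QueueWrapper
  include Enumerable

  def initialize(queue)
    @queue = queue
  end

  def each
    loop { yield @queue.pop(true) } # non-blocking pop
  rescue ThreadError
    # queue is empty, time to quit
  end
end

queue = Queue.new
5.times { |i| queue << i }

# Enumerable#map drives each, stand-in here for Parallel.map
results = QueueWrapper.new(queue).map { |n| n * 2 }
# => [0, 2, 4, 6, 8]
```

The same object should drop straight into Parallel.map, since the enumerable branch only needs something that responds to each.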
Right, but how do you give the queue more work after you have called Parallel.map? In this case you could continue to feed the queue more work, but if it ever emptied out it would shut down.
Then use the blocking pop, so the workers will hang and you can add work at any time.
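A sketch of that blocking-pop variant with a single stdlib thread as the worker (the :done sentinel is my own invention, just to give the demo a way to stop):

```ruby
queue   = Queue.new  # work in
results = Queue.new  # results out

# Worker blocks on pop, so the producer can add work at any time.
worker = Thread.new do
  loop do
    item = queue.pop        # blocking pop: waits for more work
    break if item == :done  # illustrative sentinel to stop the worker
    results << item * 2
  end
end

# Producer keeps feeding the queue after the worker has started.
3.times { |i| queue << i }
sleep 0.1
queue << 10     # late work is still picked up
queue << :done  # tell the worker we are finished

worker.join
```

With one worker this is enough; the next comment points out why it gets awkward with several workers blocked on the same queue.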
Blocking pop will cause issues with being able to shut down the queue at some point. If you have 5 workers and only one item remaining, 4 will block and need a way to know when they are done. That is why I added the closed? method and the non-blocking pop.
Then add the closed? checks inside each, e.g.
Or something similar; the main idea was to keep the queue logic out of parallel and inside a wrapper, so you do not have to maintain a fairly complex modification of parallel, just a simple wrapper.
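One way that shutdown-aware wrapper could look today, sketched with Ruby's own Queue#close/closed? (available since Ruby 2.3, so not something the original discussion had) instead of a custom closed? method:

```ruby
# Wrapper whose each keeps blocking for work until the queue is
# closed *and* drained; pop on a closed, empty Queue returns nil.
class ClosableQueueWrapper
  include Enumerable

  def initialize(queue)
    @queue = queue
  end

  def each
    until @queue.closed? && @queue.empty?
      item = @queue.pop          # blocks while open, nil once closed and empty
      yield item unless item.nil?
    end
  end
end

queue = Queue.new

# Producer feeds the queue from another thread, then closes it
# to signal "no more work" without a sentinel value.
producer = Thread.new do
  4.times { |i| queue << i }
  queue.close
end

results = ClosableQueueWrapper.new(queue).map { |n| n + 1 }
producer.join
```

Closing the queue wakes every blocked consumer, which is exactly the "let the other 4 workers know they are done" problem from the comment above.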
Are you at a stopping point? I can give it a shot and see if it meets my needs. My fork is working well. I also don't mind having a new gem.
I am currently not planning on merging the enumerable branch, since it would degrade performance and make it impossible to work on non-dumpable items. But I want to keep it around as a kind of official 'if you really need it' alternative for cases like this queue, or another old issue with Mongo, so people do not get stuck and do not have to fork the project.
I understand, and I think either way we go it should be an easily installable gem. My previous question was more to see if you were at a stopping point, so I could see what issues I have. The gem would be a new project, but it could still be a fork, and it would be easy to merge any of your future updates into it.
Sounds like a good plan :)
Here you go. Take a look and let me know what you think.
I mainly left things in place and put some checks around where you were writing to and reading from the array.
I added some tests and some comments.
If you want to refactor for having a consistent order in and out, that is fine. My use case doesn't need it and I would like the ability to bypass it.
Take a look at my other gem that I am using to feed parallel:
https://github.com/pbrumm/queue_bundle
I am using that in a thread to watch for events on MongoDB and feed them in to get processed, and using the output queue to get the results out.