-
Notifications
You must be signed in to change notification settings - Fork 158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ready - Add MultiDelete Command, refactor some KV Command types #650
Conversation
…eletes. thanks to the generic interface for executeAsync, it was easy to plug this in.
…into tu2011-develop
…lock up the command future
This reverts commit 2d3bfaa.
@@ -75,8 +75,8 @@ | |||
{ | |||
|
|||
private final Location location; | |||
private final Map<Option<?>, Object> options = | |||
new HashMap<Option<?>, Object>(); | |||
private final Map<RiakOption<?>, Object> options = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is anyone going to care that Option
changed to RiakOption
?
Also, RiakOption
is in the riak
package, making the Riak
prefix redundant. Why the change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TL;DR: It allows us to use either the FetchValue.Options or DeleteValue.options in the base MultiCommand class, for when we build the individual commands.
riak-java-client/src/main/java/com/basho/riak/client/api/commands/kv/MultiCommand.java
Lines 194 to 206 in 788e071
/** | |
* A {@link RiakOption} to use with each operation. | |
* | |
* @param option an option | |
* @param value the option's associated value | |
* @param <U> the type of the option's value | |
* @return a reference to this object. | |
*/ | |
public <U> ConcreteBuilder withOption(RiakOption<U> option, U value) | |
{ | |
this.options.put(option, value); | |
return self(); | |
} |
I also had to switch FetchValue and DeleteValue's options to use the shared superclass RiakOption
over the local/subclass Option
type, but this just lets us reuse the options on the individual commands without casting:
riak-java-client/src/main/java/com/basho/riak/client/api/commands/kv/MultiCommand.java
Lines 70 to 75 in 788e071
BaseBuilder builder = createBaseBuilderType(location); | |
for (RiakOption<?> option : options.keySet()) | |
{ | |
builder.addOption(option, options.get(option)); | |
} |
We can also then use it in the same way for the MultiCommand Builder:
riak-java-client/src/main/java/com/basho/riak/client/api/commands/kv/MultiCommand.java
Line 132 in 788e071
private Map<RiakOption<?>, Object> options = new HashMap<>(); |
Yay reuse!
Also, RiakOption is in the riak package, making the Riak prefix redundant. Why the change?
RiakOption's always been named as such, no changes there.
Just a couple comments |
|
||
Submitter submitter = new Submitter(operations, maxInFlight, cluster, future); | ||
|
||
Thread t = new Thread(submitter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, but it might be a good case to use Netty's Thread pool here
t.setDaemon(true); | ||
t.start(); | ||
new Thread(submitter).start(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the threads that are created here aren't kept track of and join()
-ed correctly, they could keep the entire JVM from shutting down cleanly.
I am assuming that is why setDaemon(true)
was used previously - it's the lazy way of not having to deal with clean shutdown.
Is there a way that at the end of the MultiCommand
these threads can be cleaned up? Does a Submitter
know when all of its work is done and is able to exit and let the thread complete?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the only thing the new thread is used for is kicking off the individual scalar commands, one at a time until it reaches it's max concurrency. If we're interrupted while doing that we set fail
on the multicommand future, and let the thread die. This thread shouldn't have ownership over anything so letting it die is clean and humane.
riak-java-client/src/main/java/com/basho/riak/client/api/commands/kv/MultiCommand.java
Lines 253 to 271 in 17aa829
@Override | |
public void run() | |
{ | |
for (BaseCommand command : commands) | |
{ | |
try | |
{ | |
inFlight.acquire(); | |
} | |
catch (InterruptedException ex) | |
{ | |
multiFuture.setFailed(ex); | |
break; | |
} | |
RiakFuture<BaseResponseType, Location> future = executeBaseCommandAsync(command, cluster); | |
future.addListener(this); | |
} | |
} |
Once it reaches the end of the run()
block, the JVM will kill the thread, so no join()
is needed.
As for shutting it down... I'll have to think on that :) Maybe daemon is the way to go there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right ... if there is a close()
or stop()
(shutdown()
?) method on the client object, something in the run()
block needs to know to make an early exit, and, most likely, the threads should be join()
-ed so that client shutdown can be deterministic. May be more work than it's worth now, so a note could be made and setDaemon(true)
put back in.
Thanks to @gerardstannard for the original PR! (#487). |
Superscedes #487, and removes duplication between MultiFetch and MultiDelete.