/
RiakClient.java
483 lines (456 loc) · 19.2 KB
/
RiakClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
/*
* Copyright 2013 Basho Technologies Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.basho.riak.client.api;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.util.HostAndPort;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* <script src="https://google-code-prettify.googlecode.com/svn/loader/run_prettify.js"></script>
* The client used to perform operations on Riak.
* <p>
* The core of the Java client models a Riak cluster:
* </p>
* <img src="doc-files/client-image.png">
* </p>
* <p>
* The easiest way to get started with the client API is using one of the static
* methods provided to instantiate and start the client:
* </p>
* <pre class="prettyprint">
* {@code
* RiakClient client =
* RiakClient.newClient("192.168.1.1","192.168.1.2","192.168.1.3"); } </pre>
*
* Note that the Riak Java client uses the Riak Protocol Buffers API exclusively.
* <p>
* For more complex configurations you will instantiate one or more {@link com.basho.riak.client.core.RiakNode}s
* and build a {@link com.basho.riak.client.core.RiakCluster} to supply to the
* RiakClient constructor.
* </p>
* <pre class="prettyprint">
* {@code
* RiakNode.Builder builder = new RiakNode.Builder();
* builder.withMinConnections(10);
* builder.withMaxConnections(50);
*
* List<String> addresses = new LinkedList<String>();
* addresses.add("192.168.1.1");
* addresses.add("192.168.1.2");
* addresses.add("192.168.1.3");
*
* List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, addresses);
* RiakCluster cluster = new RiakCluster.Builder(nodes).build();
* cluster.start();
* RiakClient client = new RiakClient(cluster); }</pre>
* <p>
* Once you have a client, {@literal RiakCommand}s from the {@literal com.basho.riak.client.api.commands.*}
* packages are built then executed by the client:
* <pre class="prettyprint">
* {@code
* Namespace ns = new Namespace("default","my_bucket");
* Location loc = new Location(ns, "my_key");
* FetchValue fv = new FetchValue.Builder(loc).build();
* FetchValue.Response response = client.execute(fv);
* RiakObject obj = response.getValue(RiakObject.class);}</pre>
* </p>
* <p>
* You can also execute all {@literal RiakCommand}s asynchronously. A
* {@link RiakFuture} for the operation is immediately returned:
* <pre class="prettyprint">
* {@code
* Namespace ns = new Namespace("default","my_bucket");
* Location loc = new Location(ns, "my_key");
* FetchValue fv = new FetchValue.Builder(loc).build();
* RiakFuture<FetchValue.Response, Location> future = client.executeAsync(fv);
* future.await();
* if (future.isSuccess())
* {
* FetchValue.Response response = future.getNow();
* RiakObject obj = response.getValue(RiakObject.class);
* ...
* }
* else
* {
* Throwable error = future.cause();
* ...
* }}</pre>
* </p>
* <p>
* <h1>RiakCommand subclasses</h1>
* <h4>Fetching, storing and deleting objects</h4>
* <ul>
* <li>{@link com.basho.riak.client.api.commands.kv.FetchValue}</li>
* <li>{@link com.basho.riak.client.api.commands.kv.MultiFetch}</li>
* <li>{@link com.basho.riak.client.api.commands.kv.StoreValue}</li>
* <li>{@link com.basho.riak.client.api.commands.kv.UpdateValue}</li>
* <li>{@link com.basho.riak.client.api.commands.kv.DeleteValue}</li>
* </ul>
* <h4>Listing keys in a namespace</h4>
* <ul><li>{@link com.basho.riak.client.api.commands.kv.ListKeys}</li></ul>
* <h4>Secondary index (2i) commands</h4>
* <ul>
* <li>{@link com.basho.riak.client.api.commands.indexes.RawIndexQuery}</li>
* <li>{@link com.basho.riak.client.api.commands.indexes.BinIndexQuery}</li>
* <li>{@link com.basho.riak.client.api.commands.indexes.IntIndexQuery}</li>
* <li>{@link com.basho.riak.client.api.commands.indexes.BigIntIndexQuery}</li>
* </ul>
* <h4>Fetching and storing datatypes (CRDTs)</h4>
* <ul>
* <li>{@link com.basho.riak.client.api.commands.datatypes.FetchCounter}</li>
* <li>{@link com.basho.riak.client.api.commands.datatypes.FetchSet}</li>
* <li>{@link com.basho.riak.client.api.commands.datatypes.FetchMap}</li>
* <li>{@link com.basho.riak.client.api.commands.datatypes.UpdateCounter}</li>
* <li>{@link com.basho.riak.client.api.commands.datatypes.UpdateSet}</li>
* <li>{@link com.basho.riak.client.api.commands.datatypes.UpdateMap}</li>
* </ul>
* <h4>Querying and modifying buckets</h4>
* <ul>
* <li>{@link com.basho.riak.client.api.commands.buckets.FetchBucketProperties}</li>
* <li>{@link com.basho.riak.client.api.commands.buckets.StoreBucketProperties}</li>
* <li>{@link com.basho.riak.client.api.commands.buckets.ListBuckets}</li>
* </ul>
* <h4>Search commands</h4>
* <ul>
* <li>{@link com.basho.riak.client.api.commands.search.Search}</li>
* <li>{@link com.basho.riak.client.api.commands.search.FetchIndex}</li>
* <li>{@link com.basho.riak.client.api.commands.search.StoreIndex}</li>
* <li>{@link com.basho.riak.client.api.commands.search.DeleteIndex}</li>
* <li>{@link com.basho.riak.client.api.commands.search.FetchSchema}</li>
* <li>{@link com.basho.riak.client.api.commands.search.StoreSchema}</li>
* </ul>
* <h4>Map-Reduce</h4>
* <ul>
* <li>{@link com.basho.riak.client.api.commands.mapreduce.BucketMapReduce}</li>
* <li>{@link com.basho.riak.client.api.commands.mapreduce.BucketKeyMapReduce}</li>
* <li>{@link com.basho.riak.client.api.commands.mapreduce.IndexMapReduce}</li>
* <li>{@link com.basho.riak.client.api.commands.mapreduce.SearchMapReduce}</li>
* </ul>
* @author Dave Rusek <drusek at basho dot com>
* @author Brian Roach <roach at basho dot com>
* @author Alex Moore <amoore at basho.com>
* @author Sergey Galkin <srggal at gmail dot com>
* @since 2.0
*/
public class RiakClient
{
private final RiakCluster cluster;
/**
* Create a new RiakClient to perform operations on the given cluster.
* <p>
* The RiakClient provides a user API on top of the client core. Once
* instantiated, commands are submitted to it for execution on Riak.
* </p>
* @param cluster the started RiakCluster to use.
*/
public RiakClient(RiakCluster cluster)
{
this.cluster = cluster;
}
/**
* Static factory method to create a new client instance.
* This method produces a client that connects to 127.0.0.1 on the default
* protocol buffers port (8087).
*
* @return a new client instance.
* @throws UnknownHostException
*/
public static RiakClient newClient() throws UnknownHostException
{
RiakNode.Builder builder = new RiakNode.Builder()
.withMinConnections(10);
RiakCluster cluster = new RiakCluster.Builder(builder.build()).build();
cluster.start();
return new RiakClient(cluster);
}
/**
* Static factory method to create a new client instance.
* This method produces a client connected to the supplied addresses on
* the supplied port.
* @param remoteAddresses a list of IP addresses or hostnames
* @param port the (protocol buffers) port to connect to on the supplied hosts.
* @return a new client instance
* @throws UnknownHostException if a supplied hostname cannot be resolved.
*/
public static RiakClient newClient(int port, String... remoteAddresses) throws UnknownHostException
{
return newClient(port, Arrays.asList(remoteAddresses));
}
/**
* Static factory method to create a new client instance.
* This method produces a client connected to the supplied addresses on
* the default (protocol buffers) port (8087).
* @param remoteAddresses a list of IP addresses or hostnames
* @return a new client instance
* @throws UnknownHostException if a supplied hostname cannot be resolved.
*/
public static RiakClient newClient(List<String> remoteAddresses) throws UnknownHostException
{
return newClient(RiakNode.Builder.DEFAULT_REMOTE_PORT, remoteAddresses);
}
/**
* Static factory method to create a new client instance.
* This method produces a client connected to the supplied addresses on
* the default (protocol buffers) port (8087).
* @param remoteAddresses a list of IP addresses or hostnames
* @return a new client instance
* @throws UnknownHostException if a supplied hostname cannot be resolved.
*/
public static RiakClient newClient(String... remoteAddresses) throws UnknownHostException
{
return newClient(RiakNode.Builder.DEFAULT_REMOTE_PORT, Arrays.asList(remoteAddresses));
}
/**
* Static factory method to create a new client instance.
* This method produces a client connected to the supplied addresses on
* the supplied port.
* @param remoteAddresses a list of IP addresses or hostnames
* @param port the (protocol buffers) port to connect to on the supplied hosts.
* @return a new client instance
* @throws UnknownHostException if a supplied hostname cannot be resolved.
*/
public static RiakClient newClient(int port, List<String> remoteAddresses) throws UnknownHostException
{
RiakNode.Builder builder = createDefaultNodeBuilder()
.withRemotePort(port);
return newClient(builder, remoteAddresses);
}
/**
* Static factory method to create a new client instance.
* This method produces a client connected to the supplied addresses.
* @param addresses one or more addresses to connect to.
* @return a new RiakClient instance.
* @throws java.net.UnknownHostException if a supplied hostname cannot be resolved.
*/
public static RiakClient newClient(InetSocketAddress... addresses) throws UnknownHostException
{
final List<String> remoteAddresses = new ArrayList<>(addresses.length);
for (InetSocketAddress addy : addresses)
{
remoteAddresses.add(
String.format("%s:%s", addy.getHostName(), addy.getPort())
);
}
return newClient(createDefaultNodeBuilder(), remoteAddresses);
}
/**
* Static factory method to create a new client instance.
* This method produces a client connected to the supplied addresses and containing the {@link RiakNode}s
* that will be build by using provided builder.
* @param addresses one or more addresses to connect to.
* @return a new RiakClient instance.
* @throws java.net.UnknownHostException if a supplied hostname cannot be resolved.
* @since 2.0.3
* @see com.basho.riak.client.core.RiakCluster.Builder#RiakCluster.Builder(RiakNode.Builder, List)
*/
// NB: IntelliJ will see the above @see statement as invalid, but it's correct: https://bugs.openjdk.java.net/browse/JDK-8031625
public static RiakClient newClient(RiakNode.Builder nodeBuilder, List<String> addresses) throws UnknownHostException
{
final RiakCluster cluster = new RiakCluster.Builder(nodeBuilder, addresses).build();
cluster.start();
return new RiakClient(cluster);
}
/**
* Static factory method to create a new client instance.
*
* @since 2.0.3
* @see #newClient(RiakNode.Builder, List)
*/
public static RiakClient newClient(RiakNode.Builder nodeBuilder, String... addresses) throws UnknownHostException
{
return newClient(nodeBuilder, Arrays.asList(addresses));
}
/**
* Static factory method to create a new client instance.
*
* @since 2.0.6
*/
public static RiakClient newClient(Collection<HostAndPort> hosts) throws UnknownHostException
{
return newClient(hosts, createDefaultNodeBuilder());
}
/**
* Static factory method to create a new client instance.
*
* @since 2.0.6
*/
public static RiakClient newClient(Collection<HostAndPort> hosts, RiakNode.Builder nodeBuilder) throws UnknownHostException
{
final RiakCluster cluster = new RiakCluster.Builder(hosts, nodeBuilder).build();
cluster.start();
return new RiakClient(cluster);
}
/**
*
* @since 2.0.3
*/
public static RiakNode.Builder createDefaultNodeBuilder()
{
return new RiakNode.Builder()
.withMinConnections(10);
}
/**
* Execute a RiakCommand synchronously.
* <p>
* Calling this method causes the client to execute the provided RiakCommand synchronously.
* It will block until the operation completes then either return the response
* on success or throw an exception on failure.
* </p>
*
* @param command
* The RiakCommand to execute.
* @param <T>
* The RiakCommand's return type.
* @param <S> The RiakCommand's query info type.
* @return a response from Riak.
* @throws ExecutionException if the command fails for any reason.
* @throws InterruptedException
*/
public <T,S> T execute(RiakCommand<T,S> command) throws ExecutionException, InterruptedException
{
return command.execute(cluster);
}
/**
* Execute a RiakCommand synchronously with a specified client timeout.
* <p>
* Calling this method causes the client to execute the provided RiakCommand synchronously.
* It will block until the operation completes or up to the given timeout.
* It will either return the response on success or throw an
* exception on failure.
* Note: Using this timeout is different that setting a timeout on the command
* itself using the timeout() method of the command's associated builder.
* The command timeout is a Riak-side timeout value. This timeout is client-side.
* </p>
*
* @param command
* The RiakCommand to execute.
* @param timeout the amount of time to wait before returning an exception
* @param unit the unit of time.
* @param <T>
* The RiakCommand's return type.
* @param <S>
* The RiakCommand's query info type.
* @return a response from Riak.
* @throws ExecutionException
* if the command fails for any reason.
* @throws InterruptedException
* @throws TimeoutException
* if the call to execute the command did not finish within the time limit
*/
public <T, S> T execute(RiakCommand<T, S> command, long timeout, TimeUnit unit) throws ExecutionException,
InterruptedException, TimeoutException {
return command.execute(cluster, timeout, unit);
}
/**
* Execute a RiakCommand asynchronously.
* <p>
* Calling this method causes the client to execute the provided RiakCommand
* asynchronously. It will immediately return a RiakFuture that contains the
* running operation.
* @param <T> RiakCommand's return type.
* @param <S> The RiakCommand's query info type.
* @param command The RiakCommand to execute.
* @return a RiakFuture for the operation.
* @see RiakFuture
*/
public <T,S> RiakFuture<T,S> executeAsync(RiakCommand<T,S> command)
{
return command.executeAsync(cluster);
}
/**
* Execute a StreamableRiakCommand asynchronously, and stream the results back before
* the command {@link RiakFuture#isDone() is done}.
* <p>
* Calling this method causes the client to execute the provided
* StreamableRiakCommand asynchronously.
* It will immediately return a RiakFuture that contains an
* <b>immediately</b> available result (via {@link RiakFuture#get()}) that
* data will be streamed to.
* The RiakFuture will also keep track of the overall operation's progress
* with the {@link RiakFuture#isDone}, etc methods.
* </p>
* <p>
* Because the consumer thread will poll for new results, it is advisable to check the
* consumer thread's interrupted status via
* {@link Thread#isInterrupted() Thread.currentThread().isInterrupted() }, as the result
* iterator will not propagate an InterruptedException, but it will set the Thread's
* interrupted flag.
* </p>
* @param <I> StreamableRiakCommand's immediate return type, available before the command/operation is complete.
* @param <S> The RiakCommand's query info type.
* @param command The RiakCommand to execute.
* @param timeoutMS The polling timeout in milliseconds for each result chunk.
* If the timeout is reached it will try again, instead of blocking indefinitely.
* If the value is too small (less than the average chunk arrival time), the
* result iterator will essentially busy wait.
* If the timeout is too large (much greater than the average chunk arrival time),
* the result iterator can block the consuming thread from seeing the done()
* status until the timeout is reached.
* @return a RiakFuture for the operation
* @since 2.1.0
* @see RiakFuture
*/
public <I extends StreamableRiakCommand.StreamableResponse,S> RiakFuture<I,S> executeAsyncStreaming(StreamableRiakCommand<I, S, ?, ?> command, int timeoutMS)
{
return command.executeAsyncStreaming(cluster, timeoutMS);
}
/**
* Shut down the client and the underlying RiakCluster.
* <p>
* The underlying client core (RiakCluster) uses a number of threads as
* does Netty. Calling this method will shut down all those threads cleanly.
* Failure to do so may prevent your application from exiting.
* </p>
* @return a future that will complete when shutdown
*/
public Future<Boolean> shutdown()
{
return cluster.shutdown();
}
/**
* Get the RiakCluster being used by this client.
* <p>
* Allows for adding/removing nodes, etc.
* </p>
* @return The RiakCluster instance being used by this client.
*/
public RiakCluster getRiakCluster()
{
return cluster;
}
/**
* Cleans up any Thread-Local variables after shutdown.
* This operation is useful when you are in a container environment, and you
* do not want to leave the thread local variables in the threads you do not manage.
* Call this method when your application is being unloaded from the container, <b>after</b>
* all {@link RiakNode}, {@link RiakCluster}, and {@link com.basho.riak.client.api.RiakClient}
* objects are in the shutdown state.
*/
public void cleanup()
{
cluster.cleanup();
}
}