/*
* Copyright (C) 2010-2018 The Async HBase Authors. All rights reserved.
* This file is part of Async HBase.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the StumbleUpon nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.hbase.async;
import java.net.Inet6Address;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.InvalidProtocolBufferException;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import com.stumbleupon.async.DeferredGroupException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.hbase.async.generated.ZooKeeperPB;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.channel.socket.nio.NioChannelConfig;
import org.jboss.netty.channel.socket.nio.NioClientBossPool;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
import org.jboss.netty.handler.timeout.IdleStateEvent;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* A fully asynchronous, thread-safe, modern HBase client.
* <p>
* Unlike the traditional HBase client ({@code HTable}), this client should be
* instantiated only once. You can use it with any number of tables at the
* same time. The only case where you should have multiple instances is when
* you want to use multiple different clusters at the same time.
* <p>
* If you play by the rules, this client is (in theory {@code :D}) completely
* thread-safe. Read the documentation carefully to know what the requirements
* are for this guarantee to apply.
* <p>
* This client is fully non-blocking: any operation that would otherwise
* block returns a {@link Deferred} instance to which you can attach a
* {@link Callback} chain that will execute when the asynchronous operation
* completes.
*
* <h1>Note regarding {@code HBaseRpc} instances passed to this class</h1>
* Every {@link HBaseRpc} passed to a method of this class should not be
* changed or re-used until the {@code Deferred} returned by that method
* calls you back. <strong>Changing or re-using any {@link HBaseRpc} for
* an RPC in flight will lead to <em>unpredictable</em> results and voids
* your warranty</strong>.
*
* <h1>Data Durability</h1>
* Some methods or RPC types take a {@code durable} argument. When an edit
* requests to be durable, the success of the RPC guarantees that the edit is
* safely and durably stored by HBase and won't be lost. In case of server
* failures, the edit won't be lost although it may become momentarily
* unavailable. Setting the {@code durable} argument to {@code false} makes
* the operation complete faster (and puts a lot less strain on HBase), but
* removes this durability guarantee. In case of a server failure, the edit
* may (or may not) be lost forever. When in doubt, leave it to {@code true}
* (or use the corresponding method that doesn't accept a {@code durable}
* argument as it will default to {@code true}). Setting it to {@code false}
* is useful in cases where data-loss is acceptable, e.g. during batch imports
* (where you can re-run the whole import in case of a failure), or when you
* intend to do statistical analysis on the data (in which case some missing
* data won't affect the results as long as the data loss caused by machine
* failures preserves the distribution of your data, which depends on how
* you're building your row keys and how you're using HBase, so be careful).
* <p>
* Bear in mind that this durability guarantee holds only once the RPC has
* completed successfully. Any edit temporarily buffered on the client side
* or in-flight will be lost if the client itself crashes. You can control
* how much buffering is done by the client by using {@link #setFlushInterval}
* and you can force-flush the buffered edits by calling {@link #flush}. When
* you're done using HBase, you <strong>must not</strong> just give up your
* reference to your {@code HBaseClient}, you must shut it down gracefully by
* calling {@link #shutdown}. If you fail to do this, then all edits still
* buffered by the client will be lost.
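* <p>
* For instance, a graceful teardown could look like this minimal sketch,
* using {@link Deferred#join} to block (which you should only do in
* non-asynchronous code, such as a {@code main} method):
* <pre>
* client.flush().join();    // Send out any edits still buffered.
* client.shutdown().join(); // Then release all resources held by the client.
* </pre>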
* <p>
* <b>NOTE</b>: This entire section assumes that you use a distributed file
* system that provides HBase with the required durability semantics. If
* you use HDFS, make sure you have a version of HDFS that provides HBase
* with the necessary API and semantics to durably store its data.
*
* <h1>{@code throws} clauses</h1>
* None of the asynchronous methods in this API are expected to throw an
* exception. But the {@link Deferred} object they return to you can carry an
* exception that you should handle (using "errbacks", see the javadoc of
* {@link Deferred}). In order to be able to do proper asynchronous error
* handling, you need to know what types of exceptions you're expected to face
* in your errbacks. In order to document that, the methods of this API use
* javadoc's {@code @throws} to spell out the exception types you should
* handle in your errback. Asynchronous exceptions will be indicated as such
* in the javadoc with "(deferred)".
* <p>
* For instance, if a method {@code foo} pretends to throw an
* {@link UnknownScannerException} and returns a {@code Deferred<Whatever>},
* then you should use the method like so:
* <pre>
* HBaseClient client = ...;
* {@link Deferred}{@code <Whatever>} d = client.foo();
* d.addCallbacks(new {@link Callback}{@code <Whatever, SomethingElse>}() {
* SomethingElse call(Whatever arg) {
* LOG.info("Yay, RPC completed successfully!");
* return new SomethingElse(arg.getWhateverResult());
* }
* String toString() {
* return "handle foo response";
* }
* },
* new {@link Callback}{@code <Exception, Object>}() {
* Object call(Exception arg) {
* if (arg instanceof {@link UnknownScannerException}) {
* LOG.error("Oops, we used the wrong scanner?", arg);
* return otherAsyncOperation(); // returns a {@code Deferred<Blah>}
* }
* LOG.error("Sigh, the RPC failed and we don't know what to do", arg);
* return arg; // Pass on the error to the next errback (if any);
* }
* String toString() {
* return "foo errback";
* }
* });
* </pre>
* This code calls {@code foo}, and upon successful completion transforms the
* result from a {@code Whatever} to a {@code SomethingElse} (which will then
* be given to the next callback in the chain, if any). When there's a
* failure, the errback is called instead and it attempts to handle a
* particular type of exception by retrying the operation differently.
*/
public final class HBaseClient {
/*
* TODO(tsuna): Address the following.
*
* - Properly handle disconnects.
* - Attempt to reconnect a couple of times, see if it was a transient
* network blip.
* - If the -ROOT- region is unavailable when we start, we should
* put a watch in ZK instead of polling it every second.
* - Stats:
* - QPS per RPC type.
* - Latency histogram per RPC type (requires open-sourcing the SU Java
* stats classes that I wrote in a separate package).
* - Cache hit rate in the local META cache.
* - RPC errors and retries.
* - Typical batch size when flushing edits (is that useful?).
* - Write unit tests and benchmarks!
*/
private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
/**
* An empty byte array you can use. This can be useful for instance with
* {@link Scanner#setStartKey} and {@link Scanner#setStopKey}.
*/
public static final byte[] EMPTY_ARRAY = new byte[0];
/** A byte array containing a single zero byte. */
private static final byte[] ZERO_ARRAY = new byte[] { 0 };
protected static final byte[] ROOT = new byte[] { '-', 'R', 'O', 'O', 'T', '-' };
protected static final byte[] ROOT_REGION = new byte[] { '-', 'R', 'O', 'O', 'T', '-', ',', ',', '0' };
/** HBase 0.98 and up: -ROOT- is now hbase:root */
static final byte[] HBASE98_ROOT =
new byte[] { 'h', 'b', 'a', 's', 'e', ':', 'r', 'o', 'o', 't'};
static final byte[] HBASE98_ROOT_REGION =
new byte[] { 'h', 'b', 'a', 's', 'e', ':', 'r', 'o', 'o', 't', ',', ',', '0' };
protected static final byte[] META = new byte[] { '.', 'M', 'E', 'T', 'A', '.' };
protected static final byte[] INFO = new byte[] { 'i', 'n', 'f', 'o' };
protected static final byte[] REGIONINFO = new byte[] { 'r', 'e', 'g', 'i', 'o', 'n', 'i', 'n', 'f', 'o' };
protected static final byte[] SERVER = new byte[] { 's', 'e', 'r', 'v', 'e', 'r' };
/** HBase 0.95 and up: .META. is now hbase:meta */
protected static final byte[] HBASE96_META =
new byte[] { 'h', 'b', 'a', 's', 'e', ':', 'm', 'e', 't', 'a' };
/** New for HBase 0.95 and up: the name of META is fixed. */
protected static final byte[] META_REGION_NAME =
new byte[] { 'h', 'b', 'a', 's', 'e', ':', 'm', 'e', 't', 'a', ',', ',', '1' };
/** New for HBase 0.95 and up: the region info for META is fixed. */
protected static final RegionInfo META_REGION =
new RegionInfo(HBASE96_META, META_REGION_NAME, EMPTY_ARRAY);
/**
* In HBase 0.95 and up, this magic number is found in a couple places.
* It's used in the znode that points to the .META. region, to
* indicate that the contents of the znode is a protocol buffer.
* It's also used in the value of the KeyValue found in the .META. table
* that contain a {@link RegionInfo}, to indicate that the value contents
* is a protocol buffer.
*/
static final int PBUF_MAGIC = 1346524486; // 4 bytes: "PBUF"
/**
* Timer we use to handle all our timeouts.
* TODO(tsuna): Get it through the ctor to share it with others.
*/
private final HashedWheelTimer timer;
/** A separate timer thread used for processing RPC timeout callbacks. We
* keep it separate as a bad HBase server can cause a timeout storm and we
* don't want to block any flushes and operations on other region servers.
*/
private final HashedWheelTimer rpc_timeout_timer;
/** Up to how many milliseconds can we buffer an edit on the client side. */
private volatile short flush_interval;
/** How many different counters do we want to keep in memory for buffering. */
private volatile int increment_buffer_size;
/**
* Low and high watermarks when buffering RPCs due to an NSRE.
* @see #handleNSRE
*/
private int nsre_low_watermark;
private int nsre_high_watermark;
/**
* Factory through which this client creates all of its channels / sockets.
*/
private final ClientSocketChannelFactory channel_factory;
/** Watcher to keep track of the -ROOT- region in ZooKeeper. */
private final ZKClient zkclient;
/**
* The client currently connected to the -ROOT- region.
* If this is {@code null} then we currently don't know where the -ROOT-
* region is and we're waiting for a notification from ZooKeeper to tell
* us where it is.
* Note that with HBase 0.95, {@link #has_root} would be false, and this
* would instead point to the .META. region.
*/
private volatile RegionClient rootregion;
/**
* Whether or not there is a -ROOT- region.
* When connecting to HBase 0.95 and up, this would be set to false, so we
* would go straight to .META. instead (except in the case of Split meta
* where there are multiple meta regions and a root to route to them).
*/
volatile boolean has_root = true;
/**
* Maps {@code (table, start_key)} pairs to the {@link RegionInfo} that
* serves this key range for this table.
* <p>
* The keys in this map are region names.
* @see #createRegionSearchKey
* Because it's a sorted map, we can efficiently find a region given an
* arbitrary key.
* @see #getRegion
* <p>
* This map and the next 2 maps contain the same data, but indexed
* differently. There is no consistency guarantee across the maps.
* They are not updated all at the same time atomically. This map
* is always the first to be updated, because that's the map from
* which all the lookups are done in the fast-path of the requests
* that need to locate a region. The second map to be updated is
* {@link region2client}, because it comes second in the fast-path
* of every request that needs to locate a region. The third map
* is only used to handle RegionServer disconnections gracefully.
* <p>
* Note: before using the {@link RegionInfo} you pull out of this map,
* you <b>must</b> ensure that {@link RegionInfo#table} doesn't return
* {@link #EMPTY_ARRAY}. If it does, it means you got a special entry
* used to indicate that this region is known to be unavailable right
* now due to an NSRE. You must not use this {@link RegionInfo} as
* if it was a normal entry.
* @see #handleNSRE
*/
private final ConcurrentSkipListMap<byte[], RegionInfo> regions_cache =
new ConcurrentSkipListMap<byte[], RegionInfo>(RegionInfo.REGION_NAME_CMP);
/**
* Maps a {@link RegionInfo} to the client currently connected to the
* RegionServer that serves this region.
* <p>
* The opposite mapping is stored in {@link #client2regions}.
* There's no consistency guarantee with that other map.
* See the javadoc for {@link #regions_cache} regarding consistency.
*/
private final ConcurrentHashMap<RegionInfo, RegionClient> region2client =
new ConcurrentHashMap<RegionInfo, RegionClient>();
/**
* Maps a client connected to a RegionServer to the list of regions we know
* it's serving so far.
* <p>
* The opposite mapping is stored in {@link #region2client}.
* There's no consistency guarantee with that other map.
* See the javadoc for {@link #regions_cache} regarding consistency.
* <p>
* Each list in the map is protected by its own monitor lock.
*/
private final ConcurrentHashMap<RegionClient, ArrayList<RegionInfo>>
client2regions = new ConcurrentHashMap<RegionClient, ArrayList<RegionInfo>>();
/**
* Cache that maps a RegionServer address ("ip:port") to the client
* connected to it.
* <p>
* Access to this map must be synchronized by locking its monitor.
* Lock ordering: when locking both this map and a RegionClient, the
* RegionClient must always be locked first to avoid deadlocks. Logging
* the contents of this map (or calling toString) requires copying it first.
* <p>
* This isn't a {@link ConcurrentHashMap} because we don't use it frequently
* (just when connecting to / disconnecting from RegionServers) and when we
* add something to it, we want to do an atomic get-and-put, but
* {@code putIfAbsent} isn't a good fit for us since it requires creating
* an object that may be "wasted" in case another thread wins the insertion
* race, and we don't want to create unnecessary connections.
* <p>
* Upon disconnection, clients are automatically removed from this map.
* We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does
* the clean-up on the {@code channelClosed} event, which is actually the
* 3rd and last event to be fired when a channel gets disconnected. The
* first one to get fired is {@code channelDisconnected}. This matters to
* us because we want to purge disconnected clients from the cache as
* quickly as possible after the disconnection, to avoid handing out clients
* that are going to cause unnecessary errors.
* @see RegionClientPipeline#handleDisconnect
*/
private final HashMap<String, RegionClient> ip2client =
new HashMap<String, RegionClient>();
/**
* Map of region name to list of pending RPCs for this region.
* <p>
* The array-list isn't expected to be empty, except during rare race
* conditions. When the list is non-empty, the first element in the
* list should be a special "probe" RPC we build to detect when the
* region NSRE'd is back online.
* <p>
* For more details on how this map is used, please refer to the
* documentation of {@link #handleNSRE}.
* <p>
* Each list in the map is protected by its own monitor lock.
*/
private final ConcurrentSkipListMap<byte[], ArrayList<HBaseRpc>> got_nsre =
new ConcurrentSkipListMap<byte[], ArrayList<HBaseRpc>>(RegionInfo.REGION_NAME_CMP);
/**
* Buffer for atomic increment coalescing.
* This buffer starts out null, and remains so until the first time we need
* to buffer an increment. Once lazily initialized, this buffer will never
* become null again.
* <p>
* We do this so that we can lazily schedule the flush timer only if we ever
* have buffered increments. Applications without buffered increments don't
* need to pay any memory for the buffer or any CPU time for a useless timer.
* @see #setupIncrementCoalescing
*/
private volatile LoadingCache<BufferedIncrement, BufferedIncrement.Amount> increment_buffer;
private volatile LoadingCache<BufferedMultiColumnIncrement, BufferedMultiColumnIncrement.Amounts> multi_column_increment_buffer;
/** The configuration for this client */
private final Config config;
/** Integers for thread naming */
final static AtomicInteger BOSS_THREAD_ID = new AtomicInteger();
final static AtomicInteger WORKER_THREAD_ID = new AtomicInteger();
final static AtomicInteger TIMER_THREAD_ID = new AtomicInteger();
/** Default RPC timeout in milliseconds from the config */
private final int rpc_timeout;
/** Whether or not we have to scan meta instead of making getClosestBeforeRow calls. */
private volatile boolean scan_meta;
/** Whether or not split meta support is in force. */
protected boolean split_meta;
private boolean increment_buffer_durable = false;
// ------------------------ //
// Client usage statistics. //
// ------------------------ //
/** Number of connections created by {@link #newClient}. */
private final Counter num_connections_created = new Counter();
/** How many {@code -ROOT-} lookups were made. */
private final Counter root_lookups = new Counter();
/** How many {@code .META.} lookups were made (with a permit). */
private final Counter meta_lookups_with_permit = new Counter();
/** How many {@code .META.} lookups were made (without a permit). */
private final Counter meta_lookups_wo_permit = new Counter();
/** Number of calls to {@link #flush}. */
private final Counter num_flushes = new Counter();
/** Number of NSREs handled by {@link #handleNSRE}. */
private final Counter num_nsres = new Counter();
/** Number of RPCs delayed by {@link #handleNSRE}. */
private final Counter num_nsre_rpcs = new Counter();
/** Number of {@link MultiAction} sent to the network. */
final Counter num_multi_rpcs = new Counter();
/** Number of calls to {@link #get}. */
private final Counter num_gets = new Counter();
/** Number of calls to {@link #openScanner}. */
private final Counter num_scanners_opened = new Counter();
/** Number of calls to {@link #scanNextRows}. */
private final Counter num_scans = new Counter();
/** Number of calls to {@link #put}. */
private final Counter num_puts = new Counter();
/** Number of calls to {@link #append}. */
private final Counter num_appends = new Counter();
/** Number of calls to {@link #lockRow}. */
private final Counter num_row_locks = new Counter();
/** Number of calls to {@link #delete}. */
private final Counter num_deletes = new Counter();
/** Number of {@link AtomicIncrementRequest} sent. */
private final Counter num_atomic_increments = new Counter();
/** Number of region clients closed due to being idle. */
private final Counter idle_connections_closed = new Counter();
/**
* Constructor.
* @param quorum_spec The specification of the quorum, e.g.
* {@code "host1,host2,host3"}.
*/
public HBaseClient(final String quorum_spec) {
this(quorum_spec, "/hbase");
}
/**
* Constructor.
* @param quorum_spec The specification of the quorum, e.g.
* {@code "host1,host2,host3"}.
* @param base_path The base path under which is the znode for the
* -ROOT- region.
*/
public HBaseClient(final String quorum_spec, final String base_path) {
this(quorum_spec, base_path, defaultChannelFactory(new Config()));
}
/**
* Constructor for advanced users with special needs.
* <p>
* <strong>NOTE:</strong> Only advanced users who really know what they're
* doing should use this constructor. Passing an inappropriate thread
* pool, or blocking its threads will prevent this {@code HBaseClient}
* from working properly or lead to poor performance.
* @param quorum_spec The specification of the quorum, e.g.
* {@code "host1,host2,host3"}.
* @param base_path The base path under which is the znode for the
* -ROOT- region.
* @param executor The executor from which to obtain threads for NIO
* operations. It is <strong>strongly</strong> encouraged to use a
* {@link Executors#newCachedThreadPool} or something equivalent unless
* you're sure you understand how Netty creates and uses threads.
* Using a fixed-size thread pool will not work the way you expect.
* <p>
* Note that calling {@link #shutdown} on this client will <b>NOT</b>
* shut down the executor.
* @see NioClientSocketChannelFactory
* @since 1.2
*/
public HBaseClient(final String quorum_spec, final String base_path,
final Executor executor) {
this(quorum_spec, base_path, new CustomChannelFactory(executor));
}
/**
* Constructor for advanced users with special needs.
* <p>
* Most users don't need to use this constructor.
* @param quorum_spec The specification of the quorum, e.g.
* {@code "host1,host2,host3"}.
* @param base_path The base path under which is the znode for the
* -ROOT- region.
* @param channel_factory A custom factory to use to create sockets.
* <p>
* Note that calling {@link #shutdown} on this client will also cause the
* shutdown and release of the factory and its underlying thread pool.
* @since 1.2
*/
public HBaseClient(final String quorum_spec, final String base_path,
final ClientSocketChannelFactory channel_factory) {
this.channel_factory = channel_factory;
zkclient = new ZKClient(quorum_spec, base_path);
config = new Config();
rpc_timeout = config.getInt("hbase.rpc.timeout");
timer = newTimer(config, "HBaseClient");
rpc_timeout_timer = newTimer(config, "RPC Timeout Timer");
flush_interval = config.getShort("hbase.rpcs.buffered_flush_interval");
increment_buffer_size = config.getInt("hbase.increments.buffer_size");
nsre_low_watermark = config.getInt("hbase.nsre.low_watermark");
nsre_high_watermark = config.getInt("hbase.nsre.high_watermark");
if (config.properties.containsKey("hbase.increments.durable")) {
increment_buffer_durable = config.getBoolean("hbase.increments.durable");
}
if (config.hasProperty("hbase.meta.scan")) {
scan_meta = config.getBoolean("hbase.meta.scan");
} else {
scan_meta = Boolean.parseBoolean(
System.getProperty("hbase.meta.scan", "false"));
}
if (config.hasProperty("hbase.meta.split")) {
split_meta = config.getBoolean("hbase.meta.split");
} else {
split_meta = Boolean.parseBoolean(
System.getProperty("hbase.meta.split", "false"));
}
}
/**
* Constructor accepting a configuration object with at least the
* "hbase.zookeeper.quorum" specified in the format {@code "host1,host2,host3"}.
* @param config A configuration object
* @since 1.7
*/
public HBaseClient(final Config config) {
this(config, defaultChannelFactory(config));
}
/**
* Constructor accepting a configuration object with at least the
* "hbase.zookeeper.quorum" specified in the format {@code "host1,host2,host3"}
* and an executor thread pool.
* @param config A configuration object
* @param executor The executor from which to obtain threads for NIO
* operations. It is <strong>strongly</strong> encouraged to use a
* {@link Executors#newCachedThreadPool} or something equivalent unless
* you're sure you understand how Netty creates and uses threads.
* Using a fixed-size thread pool will not work the way you expect.
* <p>
* Note that calling {@link #shutdown} on this client will <b>NOT</b>
* shut down the executor.
* @see NioClientSocketChannelFactory
* @since 1.7
*/
public HBaseClient(final Config config, final Executor executor) {
this(config, new CustomChannelFactory(executor));
}
/**
* Constructor accepting a configuration object with at least the
* "hbase.zookeeper.quorum" specified in the format {@code "host1,host2,host3"}
* and a custom channel factory for advanced users.
* <p>
* Most users don't need to use this constructor.
* @param config A configuration object
* @param channel_factory A custom factory to use to create sockets.
* <p>
* Note that calling {@link #shutdown} on this client will also cause the
* shutdown and release of the factory and its underlying thread pool.
* @since 1.7
*/
public HBaseClient(final Config config,
final ClientSocketChannelFactory channel_factory) {
this.channel_factory = channel_factory;
zkclient = new ZKClient(config.getString("hbase.zookeeper.quorum"),
config.getString("hbase.zookeeper.znode.parent"));
this.config = config;
rpc_timeout = config.getInt("hbase.rpc.timeout");
timer = newTimer(config, "HBaseClient");
rpc_timeout_timer = newTimer(config, "RPC Timeout Timer");
flush_interval = config.getShort("hbase.rpcs.buffered_flush_interval");
increment_buffer_size = config.getInt("hbase.increments.buffer_size");
nsre_low_watermark = config.getInt("hbase.nsre.low_watermark");
nsre_high_watermark = config.getInt("hbase.nsre.high_watermark");
if (config.properties.containsKey("hbase.increments.durable")) {
increment_buffer_durable = config.getBoolean("hbase.increments.durable");
}
if (config.hasProperty("hbase.meta.scan")) {
scan_meta = config.getBoolean("hbase.meta.scan");
} else {
scan_meta = Boolean.parseBoolean(
System.getProperty("hbase.meta.scan", "false"));
}
if (config.hasProperty("hbase.meta.split")) {
split_meta = config.getBoolean("hbase.meta.split");
} else {
split_meta = Boolean.parseBoolean(
System.getProperty("hbase.meta.split", "false"));
}
}
/**
* Package private timer constructor that provides a useful name for the
* timer thread.
* @param config The config object used to pull out the tick interval
* @param name A name to stash in the timer
* @return A timer
*/
static HashedWheelTimer newTimer(final Config config, final String name) {
class TimerThreadNamer implements ThreadNameDeterminer {
@Override
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return "AsyncHBase Timer " + name + " #" + TIMER_THREAD_ID.incrementAndGet();
}
}
if (config == null) {
return new HashedWheelTimer(Executors.defaultThreadFactory(),
new TimerThreadNamer(), 100, MILLISECONDS, 512);
}
return new HashedWheelTimer(Executors.defaultThreadFactory(),
new TimerThreadNamer(), config.getShort("hbase.timer.tick"),
MILLISECONDS, config.getInt("hbase.timer.ticks_per_wheel"));
}
/** Creates a default channel factory in case we haven't been given one.
* The factory will use Netty defaults and provide thread naming rules for
* easier debugging.
* @param config The config to pull settings from
*/
private static NioClientSocketChannelFactory defaultChannelFactory(
final Config config) {
class BossThreadNamer implements ThreadNameDeterminer {
@Override
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return "AsyncHBase I/O Boss #" + BOSS_THREAD_ID.incrementAndGet();
}
}
class WorkerThreadNamer implements ThreadNameDeterminer {
@Override
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return "AsyncHBase I/O Worker #" + WORKER_THREAD_ID.incrementAndGet();
}
}
final Executor executor = Executors.newCachedThreadPool();
final NioClientBossPool boss_pool =
new NioClientBossPool(executor, 1, newTimer(config, "Boss Pool"),
new BossThreadNamer());
final int num_workers = config.hasProperty("hbase.workers.size") ?
config.getInt("hbase.workers.size") :
Runtime.getRuntime().availableProcessors() * 2;
final NioWorkerPool worker_pool = new NioWorkerPool(executor,
num_workers, new WorkerThreadNamer());
return new NioClientSocketChannelFactory(boss_pool, worker_pool);
}
/** A custom channel factory that doesn't shutdown its executor. */
private static final class CustomChannelFactory
extends NioClientSocketChannelFactory {
CustomChannelFactory(final Executor executor) {
super(executor, executor);
}
@Override
public void releaseExternalResources() {
// Do nothing, we don't want to shut down the executor.
}
}
/**
* Returns a snapshot of usage statistics for this client.
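* <p>
* A sketch of how you might report these periodically ({@code LOG} being
* your own logger):
* <pre>
* LOG.info("HBaseClient stats: " + client.stats());
* </pre>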
* @since 1.3
*/
public ClientStats stats() {
final LoadingCache<BufferedIncrement, BufferedIncrement.Amount> cache =
increment_buffer;
long inflight_rpcs = 0;
long pending_rpcs = 0;
long pending_batched_rpcs = 0;
int dead_region_clients = 0;
final Collection<RegionClient> region_clients = client2regions.keySet();
for (final RegionClient rc : region_clients) {
final RegionClientStats stats = rc.stats();
inflight_rpcs += stats.inflightRPCs();
pending_rpcs += stats.pendingRPCs();
pending_batched_rpcs += stats.pendingBatchedRPCs();
if (stats.isDead()) {
dead_region_clients++;
}
}
return new ClientStats(
num_connections_created.get(),
root_lookups.get(),
meta_lookups_with_permit.get(),
meta_lookups_wo_permit.get(),
num_flushes.get(),
num_nsres.get(),
num_nsre_rpcs.get(),
num_multi_rpcs.get(),
num_gets.get(),
num_scanners_opened.get(),
num_scans.get(),
num_puts.get(),
num_appends.get(),
num_row_locks.get(),
num_deletes.get(),
num_atomic_increments.get(),
cache != null ? cache.stats() : BufferedIncrement.ZERO_STATS,
inflight_rpcs,
pending_rpcs,
pending_batched_rpcs,
dead_region_clients,
region_clients.size(),
idle_connections_closed.get()
);
}
/**
* Returns a list of region client stats objects for debugging.
* @return A list of region client statistics
* @since 1.7
*/
public List<RegionClientStats> regionStats() {
final Collection<RegionClient> region_clients = client2regions.keySet();
final List<RegionClientStats> stats =
new ArrayList<RegionClientStats>(region_clients.size());
for (final RegionClient rc : region_clients) {
stats.add(rc.stats());
}
return stats;
}
/**
* Flushes to HBase any buffered client-side write operation.
* <p>
* @return A {@link Deferred}, whose callback chain will be invoked when
* everything that was buffered at the time of the call has been flushed.
* <p>
* Note that this doesn't guarantee that <b>ALL</b> outstanding RPCs have
* completed. This doesn't introduce any sort of global sync point. All
* it really does is send any buffered RPCs to HBase.
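* <p>
* A usage sketch, logging once the flush completes ({@code LOG} being your
* own logger):
* <pre>
* client.flush().addCallback(new {@link Callback}{@code <Object, Object>}() {
*   public Object call(final Object arg) {
*     LOG.info("All edits buffered so far are now in HBase");
*     return arg;
*   }
* });
* </pre>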
*/
public Deferred<Object> flush() {
{
// If some RPCs are waiting for -ROOT- to be discovered, we too must wait
// because some of those RPCs could be edits that we must wait on.
final Deferred<Object> d = zkclient.getDeferredRootIfBeingLookedUp();
if (d != null) {
LOG.debug("Flush needs to wait on {} to come back",
has_root ? (split_meta ? new String(HBASE98_ROOT_REGION) : new String(ROOT))
: (split_meta ? new String(HBASE96_META) : new String(META)));
final class RetryFlush implements Callback<Object, Object> {
public Object call(final Object arg) {
LOG.debug("Flush retrying after {} came back",
has_root ? (split_meta ? new String(HBASE98_ROOT_REGION) : new String(ROOT))
: (split_meta ? new String(HBASE96_META) : new String(META)));
return flush();
}
public String toString() {
return "retry flush";
}
}
return d.addBoth(new RetryFlush());
}
}
num_flushes.increment();
final boolean need_sync;
{
final LoadingCache<BufferedIncrement, BufferedIncrement.Amount> buf =
increment_buffer; // Single volatile-read.
if (buf != null && !buf.asMap().isEmpty()) {
flushBufferedIncrements(buf);
need_sync = true;
} else {
final LoadingCache<BufferedMultiColumnIncrement, BufferedMultiColumnIncrement.Amounts> multiColumnBuf =
multi_column_increment_buffer; // Single volatile-read.
if (multiColumnBuf != null && !multiColumnBuf.asMap().isEmpty()) {
flushBufferedMultiColumnIncrements(multiColumnBuf);
need_sync = true;
} else {
need_sync = false;
}
}
}
final ArrayList<Deferred<Object>> d =
new ArrayList<Deferred<Object>>(client2regions.size()
+ got_nsre.size() * 8);
// Bear in mind that we're traversing a ConcurrentHashMap, so we may get
// clients that have been removed from the map since we started iterating.
for (final RegionClient client : client2regions.keySet()) {
d.add(need_sync ? client.sync() : client.flush());
}
for (final ArrayList<HBaseRpc> nsred : got_nsre.values()) {
synchronized (nsred) {
for (final HBaseRpc rpc : nsred) {
if (rpc instanceof HBaseRpc.IsEdit) {
d.add(rpc.getDeferred());
}
}
}
}
@SuppressWarnings("unchecked")
final Deferred<Object> flushed = (Deferred) Deferred.group(d);
return flushed;
}
/**
* Sets the maximum time (in milliseconds) for which edits can be buffered.
* <p>
* This interval will be honored on a "best-effort" basis. Edits can be
* buffered for longer than that due to GC pauses, the resolution of the
* underlying timer, thread scheduling at the OS level (particularly if the
* OS is overloaded with concurrent requests for CPU time), any low-level
* buffering in the TCP/IP stack of the OS, etc.
* <p>
* Setting a longer interval allows the code to batch requests more
* efficiently but puts you at risk of greater data loss if the JVM
* or machine were to fail. It also means that some edits will take longer
* to reach HBase, which can be troublesome if you have other applications
* that need to read the "latest" changes.
* <p>
* Setting this interval to 0 disables this feature.
* <p>
* The change is guaranteed to take effect at most after a full interval
* has elapsed, <i>using the previous interval</i> (which is returned).
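* <p>
* For example, to disable client-side buffering entirely (a sketch):
* <pre>
* final short previous = client.setFlushInterval((short) 0);
* </pre>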
* @param flush_interval A positive time interval in milliseconds.
* @return The previous flush interval.
* @throws IllegalArgumentException if {@code flush_interval < 0}.
*/
public short setFlushInterval(final short flush_interval) {
// Note: if we have buffered increments, they'll pick up the new flush
// interval next time the current timer fires.
if (flush_interval < 0) {
throw new IllegalArgumentException("Negative: " + flush_interval);
}
final short prev = config.getShort("hbase.rpcs.buffered_flush_interval");
config.overrideConfig("hbase.rpcs.buffered_flush_interval",
Short.toString(flush_interval));
this.flush_interval = flush_interval;
return prev;
}
/**
* Changes the size of the increment buffer.
* <p>
* <b>NOTE:</b> because there is no way to resize the existing buffer,
* this method will flush the existing buffer and create a new one.
* This side effect might be unexpected but is unfortunately required.
* <p>
* This determines the maximum number of counters this client will keep
* in-memory to allow increment coalescing through
* {@link #bufferAtomicIncrement}.
* <p>
* The greater this number, the more memory will be used to buffer
* increments, and the more efficient increment coalescing can be
* if you have a high-throughput application with a large working
* set of counters.
* <p>
* If your application has excessively large keys or qualifiers, you might
* consider using a lower number in order to reduce memory usage.
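* <p>
* As a sketch, a coalesced increment looks like any other increment to the
* caller ({@code table}, {@code key}, {@code family} and {@code qualifier}
* being byte arrays of your own):
* <pre>
* client.bufferAtomicIncrement(
*   new AtomicIncrementRequest(table, key, family, qualifier));
* </pre>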
* @param increment_buffer_size The new size of the buffer.
* @return The previous size of the buffer.
* @throws IllegalArgumentException if {@code increment_buffer_size < 0}.
* @since 1.3
*/
public int setIncrementBufferSize(final int increment_buffer_size) {
if (increment_buffer_size < 0) {
throw new IllegalArgumentException("Negative: " + increment_buffer_size);
}
final int current = config.getInt("hbase.increments.buffer_size");
if (current == increment_buffer_size) {
return current;
}
config.overrideConfig("hbase.increments.buffer_size",
Integer.toString(increment_buffer_size));
this.increment_buffer_size = increment_buffer_size;
final LoadingCache<BufferedIncrement, BufferedIncrement.Amount> prev =
increment_buffer; // Volatile-read.
if (prev != null) { // Need to resize.
makeIncrementBuffer(); // Volatile-write.
flushBufferedIncrements(prev);
}
return current;
}
/**
* Returns the timer used by this client.
* <p>
* All timeouts, retries and other things that need to "sleep
* asynchronously" use this timer. This method is provided so
* that you can also schedule your own timeouts using this timer,
* if you wish to share this client's timer instead of creating
* your own.
* <p>
* The precision of this timer is implementation-defined but is
* guaranteed to be no greater than 20ms.
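* <p>
* For example, to piggy-back your own timeout on this timer (a sketch):
* <pre>
* client.getTimer().newTimeout(new TimerTask() {
*   public void run(final Timeout timeout) {
*     // Do your one-shot, delayed work here.
*   }
* }, 500, TimeUnit.MILLISECONDS);
* </pre>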
* @since 1.2
*/
public Timer getTimer() {
return timer;
}
/**
* Return the configuration object for this client
* @return The config object for this client
* @since 1.7
*/
public Config getConfig() {
return config;
}
/**
* Schedules a new timeout.
* @param task The task to execute when the timer times out.
* @param timeout_ms The timeout, in milliseconds (strictly positive).
*/
void newTimeout(final TimerTask task, final long timeout_ms) {
try {
timer.newTimeout(task, timeout_ms, MILLISECONDS);
} catch (IllegalStateException e) {
// This can happen if the timer fires just before shutdown()
// is called from another thread, and due to how threads get
// scheduled we tried to call newTimeout() after timer.stop().
LOG.warn("Failed to schedule timer."
+ " Ignore this if we're shutting down.", e);
}
}
/**
* Returns the maximum time (in milliseconds) for which edits can be buffered.
* <p>
* The default value is unspecified and implementation-dependent, but is
* guaranteed to be non-zero.
* <p>
* A return value of 0 indicates that edits are sent directly to HBase
* without being buffered.
* @see #setFlushInterval
*/
public short getFlushInterval() {