-
Notifications
You must be signed in to change notification settings - Fork 561
/
AutoBuffer.java
1520 lines (1425 loc) · 54.9 KB
/
AutoBuffer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package water;
import java.io.*;
import java.lang.reflect.Array;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.Random;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import water.util.Log;
/**
* A ByteBuffer backed mixed Input/OutputStream class.
*
* Reads/writes empty/fill the ByteBuffer as needed. When it is empty/full
* we go to the ByteChannel for more/less. Because DirectByteBuffers are
* expensive to make, we keep a few pooled.
*
* @author <a href="mailto:cliffc@h2o.ai"></a>
*/
public class AutoBuffer {
// The direct ByteBuffer for schlorping data about.
// Set to null to indicate the AutoBuffer is closed.
ByteBuffer _bb;
// True once the backing ByteBuffer has been released (i.e. _bb is null).
public boolean isClosed() { return _bb == null ; }
// The ByteChannel for moving data in or out. Could be a SocketChannel (for
// a TCP connection) or a FileChannel (spill-to-disk) or a DatagramChannel
// (for a UDP connection). Null on closed AutoBuffers. Null on initial
// remote-writing AutoBuffers which are still deciding UDP vs TCP. Not-null
// for open AutoBuffers doing file i/o or reading any TCP/UDP or having
// written at least one buffer to TCP/UDP.
private ByteChannel _chan;
// If we need a SocketChannel, raise the priority so we get the I/O over
// with. Do not want to have some TCP socket open, blocking the TCP channel
// and then have the thread stalled out. If we raise the priority - be sure
// to lower it again. Note this is for TCP channels ONLY, and only because
// we are blocking another Node with I/O.
// Holds the thread priority saved by raisePriority(); -1 means "not raised".
private int _oldPrior = -1;
// Where to send or receive data via TCP or UDP (choice made as we discover
// how big the message is); used to lazily create a Channel. If NULL, then
// _chan should be a pre-existing Channel, such as a FileChannel.
final H2ONode _h2o;
// TRUE for read-mode. FALSE for write-mode. Can be flipped for rapid turnaround.
private boolean _read;
// TRUE if this AutoBuffer has never advanced past the first "page" of data.
// The UDP-flavor, port# and task fields are only valid until we read over
// them when flipping the ByteBuffer to the next chunk of data. Used in
// asserts all over the place.
private boolean _firstPage;
// Total size written out from 'new' to 'close'. Only updated when actually
// reading or writing data, or after close(). For profiling only.
// _arys is bumped by the array put/get helpers; _zeros is exposed via zeros().
int _size, _zeros, _arys;
// More profiling: start->close msec, plus nano's spent in blocking I/O
// calls. The difference between (close-start) and i/o msec is the time the
// i/o thread spends doing other stuff (e.g. allocating Java objects or
// (de)serializing).
long _time_start_ms, _time_close_ms, _time_io_ns;
// I/O persistence flavor: Value.ICE, NFS, HDFS, S3, TCP. Used to record I/O time.
final byte _persist;
// The assumed max UDP packetsize
static final int MTU = 1500-8/*UDP packet header size*/;
// Enable this to test random TCP fails on open or write
static final Random RANDOM_TCP_DROP = null; //new Random();
// Incoming UDP request. Make a read-mode AutoBuffer from the open Channel,
// figure the originating H2ONode from the first few bytes read.
// Throws RuntimeException when the sender address is not an IPv4
// InetSocketAddress. No channel is kept: the single received packet is the
// whole content of this buffer.
AutoBuffer( DatagramChannel sock ) throws IOException {
_chan = null;
_bb = bbMake();
_read = true; // Reading by default
_firstPage = true;
// Read a packet; can get H2ONode from 'sad'?
Inet4Address addr = null;
SocketAddress sad = sock.receive(_bb);
if( sad instanceof InetSocketAddress ) {
InetAddress address = ((InetSocketAddress) sad).getAddress();
if( address instanceof Inet4Address ) {
addr = (Inet4Address) address;
}
}
_size = _bb.position(); // Bytes received in this packet
_bb.flip(); // Set limit=amount read, and position==0
if( addr == null ) throw new RuntimeException("Unhandled socket type: " + sad);
// Read Inet from socket, port from the stream, figure out H2ONode
_h2o = H2ONode.intern(addr, getPort());
// getPort() goes through getSz(), which may call getImpl() and clear the flag
_firstPage = true;
assert _h2o != null;
_persist = 0; // No persistence
}
// Incoming TCP request. Make a read-mode AutoBuffer from the open Channel,
// figure the originating H2ONode from the first few bytes read.
// Raises the current thread's priority (see raisePriority); it is lowered
// again when the buffer is closed.
AutoBuffer( SocketChannel sock ) throws IOException {
_chan = sock;
raisePriority(); // Make TCP priority high
_bb = bbMake();
_bb.flip(); // Empty read buffer: position==limit==0, so the first read refills
_read = true; // Reading by default
_firstPage = true;
// Read Inet from socket, port from the stream, figure out H2ONode
_h2o = H2ONode.intern(sock.socket().getInetAddress(), getPort());
// getPort() pulled bytes through getImpl(), which cleared _firstPage
_firstPage = true; // Yes, must reset this.
assert _h2o != null && _h2o != H2O.SELF;
_time_start_ms = System.currentTimeMillis();
_persist = Value.TCP;
}
// Write-mode AutoBuffer aimed at a remote H2ONode. No channel is opened up
// front: a message that stays under the MTU goes out via UDP at close(),
// while a message that fills the buffer lazily opens a TCP socket.
AutoBuffer( H2ONode h2o ) {
  _h2o = h2o;
  _persist = Value.TCP;
  _read = false;              // Writing by default
  _firstPage = true;          // Filling first page
  _chan = null;               // Channel made lazily only if we write a lot
  _bb = bbMake();
  assert _h2o != null;
  _time_start_ms = System.currentTimeMillis();
}
// Spill-to/from-disk request.
public AutoBuffer( FileChannel fc, boolean read, byte persist ) {
_bb = bbMake();
_chan = fc; // Write to read/write
_h2o = null; // File Channels never have an _h2o
_read = read; // Mostly assert reading vs writing
if( read ) _bb.flip();
_time_start_ms = System.currentTimeMillis();
_persist = persist; // One of Value.ICE, NFS, S3, HDFS
}
// Read from UDP multicast. Same as the byte[]-read variant, except there is an H2O.
AutoBuffer( DatagramPacket pack ) {
_size = pack.getLength();
_bb = ByteBuffer.wrap(pack.getData(), 0, pack.getLength()).order(ByteOrder.nativeOrder());
_bb.position(0);
_read = true;
_firstPage = true;
_chan = null;
_h2o = H2ONode.intern(pack.getAddress(), getPort());
_persist = 0; // No persistance
}
/** Read from a fixed byte[]; should not be closed. */
public AutoBuffer( byte[] buf ) { this(buf,0); }
/** Read from a fixed byte[]; should not be closed. */
AutoBuffer( byte[] buf, int off ) {
assert buf != null : "null fed to ByteBuffer.wrap";
_bb = ByteBuffer.wrap(buf).order(ByteOrder.nativeOrder());
_bb.position(off);
_chan = null;
_h2o = null;
_read = true;
_firstPage = true;
_persist = 0; // No persistance
}
/** Write-mode buffer over a growable byte[] (initially 16 bytes). Do not
 *  {@link #close()} it; call {@link #buf()} to retrieve the bytes written.
 */
public AutoBuffer( ) {
  _h2o = null;
  _chan = null;
  _persist = 0;               // No persistence
  _firstPage = true;
  _read = false;
  _bb = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder());
}
/** Write to a known sized byte[]. Instead of calling close(), call
* {@link #bufClose()} to retrieve the final byte[].
*/
public AutoBuffer( int len ) {
_bb = ByteBuffer.wrap(MemoryManager.malloc1(len)).order(ByteOrder.nativeOrder());
_chan = null;
_h2o = null;
_read = false;
_firstPage = true;
_persist = 0; // No persistance
}
/** Debug summary: mode, page state, peer node, persistence flavor and, while
 *  the buffer is still open, its position, limit and capacity. */
@Override public String toString() {
  final String mode = _read ? "read " : "write ";
  final String page = _firstPage ? "first " : "2nd ";
  final StringBuilder out = new StringBuilder("[AB ");
  out.append(mode).append(page).append(_h2o);
  out.append(" ").append(Value.nameOfPersist(_persist));
  if( _bb != null ) {
    out.append(" 0 <= ").append(_bb.position());
    out.append(" <= ").append(_bb.limit());
    out.append(" <= ").append(_bb.capacity());
  }
  out.append("]");
  return out.toString();
}
// Fetch a DBB from an object pool... they are fairly expensive to make
// because a native call is required to get the backing memory. I've
// included BB count tracking code to help track leaks. As of 12/17/2012 the
// leaks are under control, but figure this may happen again so keeping these
// counters around.
// Counting is only active when the JVM is started with
// -Dh2o.find-ByteBuffer-leaks=true (see bbstats).
private static final boolean DEBUG = Boolean.getBoolean("h2o.find-ByteBuffer-leaks");
// Buffers freshly allocated / returned to the pool / served from the pool.
private static final AtomicInteger BBMAKE = new AtomicInteger(0);
private static final AtomicInteger BBFREE = new AtomicInteger(0);
private static final AtomicInteger BBCACHE= new AtomicInteger(0);
// The pool itself: bbMake() takes from the front, bbFree() puts back on the front.
private static final LinkedBlockingDeque<ByteBuffer> BBS = new LinkedBlockingDeque<ByteBuffer>();
static final int BBSIZE = 64*1024; // Bytebuffer "common big size"
public static int TCP_BUF_SIZ = BBSIZE;
// Leak-tracking helper: when DEBUG is on, bump the given counter and log a
// summary of all pool counters whenever the low 9 bits of the count are all set.
private static void bbstats( AtomicInteger ai ) {
  if( DEBUG ) {
    final int count = ai.incrementAndGet();
    if( (count & 511) == 511 )
      Log.warn("BB make="+BBMAKE.get()+" free="+BBFREE.get()+" cache="+BBCACHE.get()+" size="+BBS.size());
  }
}
// Hand out a ByteBuffer: reuse a pooled one when available, otherwise
// allocate a new direct buffer of BBSIZE bytes in native byte order.
// If direct memory is exhausted, sleep briefly and retry (another thread may
// return a buffer to the pool in the meantime); any other OutOfMemoryError
// is rethrown.
private static ByteBuffer bbMake() {
  while( true ) {               // Repeat loop for DBB OutOfMemory errors
    // Non-blocking poll. The timed variant with a zero timeout is
    // interruptible, so it failed (and swallowed the interrupt) whenever the
    // calling thread already had its interrupt flag set.
    ByteBuffer bb = BBS.pollFirst();
    if( bb != null ) {
      bbstats(BBCACHE);
      return bb;
    }
    try {
      bb = ByteBuffer.allocateDirect(BBSIZE).order(ByteOrder.nativeOrder());
      bbstats(BBMAKE);
      return bb;
    } catch( OutOfMemoryError oome ) {
      // java.lang.OutOfMemoryError: Direct buffer memory
      if( !"Direct buffer memory".equals(oome.getMessage()) ) throw oome;
      System.out.println("Sleeping & retrying");
      // Best-effort back-off; waking early just retries sooner
      try { Thread.sleep(100); } catch( InterruptedException ignore ) { }
    }
  }
}
// Return a buffer to the pool: reset its position/limit, count the free,
// and push it on the front of the deque so it is the next one reused.
private static void bbFree(ByteBuffer bb) {
  bbstats(BBFREE);
  final ByteBuffer recycled = bb;
  recycled.clear();
  BBS.offerFirst(recycled);
}
// Release this AutoBuffer's ByteBuffer: direct buffers go back to the pool,
// heap-backed ones are simply dropped. Leaves the AutoBuffer closed.
// Always returns 0 so callers can use it in flow-coding style.
private int bbFree() {
  final ByteBuffer current = _bb;
  if( current != null ) {
    if( current.isDirect() ) bbFree(current);
  }
  _bb = null;
  return 0;
}
// You thought TCP was a reliable protocol, right? WRONG! Fails 100% of the
// time under heavy network load. Connection-reset-by-peer & connection
// timeouts abound, even after a socket open and after a 1st successful
// ByteBuffer write. It *appears* that the reader is unaware that a writer
// was told "go ahead and write" by the TCP stack, so all these fails are
// only on the writer-side.
//
// Unchecked wrapper for an IOException hit during AutoBuffer I/O. The
// original exception is kept in _ioe and is also installed as the cause, so
// stack traces and getCause() expose the underlying I/O failure.
static class AutoBufferException extends RuntimeException {
  final IOException _ioe;
  AutoBufferException( IOException ioe ) {
    super(ioe);                 // Preserve the root cause for logging/diagnosis
    _ioe = ioe;
  }
}
// For reads, just assert all was read and close and release resources.
// (release ByteBuffer back to the common pool). For writes, force any final
// bytes out. If the write is to an H2ONode and is short, send via UDP.
// AutoBuffer close calls order; i.e. a reader close() will block until the
// writer does a close().
//
// Always returns 0 (flow-coding). Calling it on an already-closed buffer is
// a no-op. Any IOException is rethrown wrapped in an AutoBufferException.
// Whatever happens, the outer finally releases the ByteBuffer, stamps the
// close time and records the event in the TimeLine.
public final int close() {
//if( _size > 2048 ) System.out.println("Z="+_zeros+" / "+_size+", A="+_arys);
if( isClosed() ) return 0; // Already closed
assert _h2o != null || _chan != null; // Byte-array backed should not be closed
try {
if( _chan == null ) { // No channel?
if( _read ) return 0; // Channel-less reader: nothing to flush
// For small-packet write, send via UDP. Since nothing is sent until
// now, this close() call trivially orders - since the reader will not
// even start (much less close()) until this packet is sent.
if( _bb.position() < MTU ) return udpSend();
}
// Force AutoBuffer 'close' calls to order; i.e. block readers until
// writers do a 'close' - by writing 1 more byte in the close-call which
// the reader will have to wait for.
if( hasTCP() ) { // TCP connection?
try {
if( _read ) { // Reader?
int x = get1(); // Read 1 more byte
assert x == 0xab : "AB.close instead of 0xab sentinel got "+x+", "+this;
assert _chan != null; // chan set by incoming reader, since we KNOW it is a TCP
// Write the reader-handshake-byte.
((SocketChannel)_chan).socket().getOutputStream().write(0xcd);
// do not close actually reader socket; recycle it in TCPReader thread
} else { // Writer?
put1(0xab); // Write one-more byte ; might set _chan from null to not-null
sendPartial(); // Finish partial writes; might set _chan from null to not-null
assert _chan != null; // _chan is set not-null now!
// Read the writer-handshake-byte.
int x = ((SocketChannel)_chan).socket().getInputStream().read();
// either TCP con was dropped or other side closed connection without reading/confirming (e.g. task was cancelled).
if( x == -1 ) throw new IOException("Other side closed connection before handshake byte read");
assert x == 0xcd : "Handshake; writer expected a 0xcd from reader but got "+x;
}
} catch( IOException ioe ) {
try { _chan.close(); } catch( IOException ignore ) {} // Silently close
_chan = null; // No channel now, since i/o error
throw ioe; // Rethrow after close
} finally {
// Note: after an IOException above _chan is already null here, so null is
// what gets handed to freeTCPSocket.
if( !_read ) _h2o.freeTCPSocket((SocketChannel)_chan); // Recycle writable TCP channel
restorePriority(); // And if we raised priority, lower it back
}
} else { // FileChannel
if( !_read ) sendPartial(); // Finish partial file-system writes
_chan.close();
_chan = null; // Closed file channel
}
} catch( IOException e ) { // Dunno how to handle so crash-n-burn
throw new AutoBufferException(e);
} finally {
bbFree();
_time_close_ms = System.currentTimeMillis();
TimeLine.record_IOclose(this,_persist); // Profile AutoBuffer connections
assert isClosed();
}
return 0;
}
// A big write needs a TCP socket: obtain one from the target node and bump
// this thread's priority while it is held. Only legal on the first page,
// with no channel yet and a freshly flipped buffer.
private void tcpOpen() throws IOException {
  assert _firstPage;
  assert _bb.limit() >= 1+2+4;  // At least something written
  assert _chan == null;
  assert _bb.position()==0;
  _chan = _h2o.getTCPSocket();
  raisePriority();
}
// Just close the channel here without reading anything. Without the task
// object at hand we do not know what (how many bytes) should we read from
// the channel. And since the other side will try to read confirmation from
// us before closing the channel, we can not read till the end. So we just
// close the channel and let the other side to deal with it and figure out
// the task has been cancelled (still sending ack ack back).
//
// No-op on an already-closed buffer. Unlike close(), no sentinel/handshake
// bytes are exchanged and pending writes are not flushed.
void drainClose() {
if( isClosed() ) return; // Already closed
assert _h2o != null || _chan != null; // Byte-array backed should not be closed
if( _chan != null ) { // Channel assumed sick from prior IOException
ByteChannel chan = _chan; // Read before closing
try { chan.close(); } catch( IOException ignore ) {} // Silently close
_chan = null; // No channel now!
// The (now closed) socket is still handed back to the node's free-list
if( !_read && chan instanceof SocketChannel) _h2o.freeTCPSocket((SocketChannel)chan); // Recycle writable TCP channel
}
restorePriority(); // And if we raised priority, lower it back
bbFree();
_time_close_ms = System.currentTimeMillis();
TimeLine.record_IOclose(this,_persist); // Profile AutoBuffer connections
assert isClosed();
}
// True if a TCP channel is already open, or if one will be needed at close
// time because a node-targeted buffer already holds at least MTU bytes.
boolean hasTCP() {
  assert !isClosed();
  if( _chan instanceof SocketChannel ) return true;
  return _h2o != null && _bb.position() >= MTU;
}
// True if we are in read-mode
boolean readMode() { return _read; }
// Size in bytes sent, after a close()
int size() { return _size; }
// Value of the _zeros profiling counter
int zeros() { return _zeros; }
// Current position (not the remaining byte count) of the underlying ByteBuffer
public int position () { return _bb.position (); }
// Move the underlying ByteBuffer's position to 'pos'
public void position(int pos) { _bb.position(pos); }
// Copy out the bytes written so far by a byte[]-backed, write-mode
// AutoBuffer: the range from the array offset up to the current position.
public final byte[] buf() {
  assert _h2o==null && _chan==null && !_read && !_bb.isDirect();
  final byte[] backing = _bb.array();
  final int from = _bb.arrayOffset();
  final int to = _bb.position();
  return MemoryManager.arrayCopyOfRange(backing, from, to);
}
// Hand back the backing byte[] itself (no copy, full length) and close
// this AutoBuffer.
public final byte[] bufClose() {
  final byte[] backing = _bb.array();
  bbFree();
  return backing;
}
// True when a byte[]-backed buffer has nothing left between its position
// and its limit.
final boolean eof() {
  assert _h2o==null && _chan==null;
  return !_bb.hasRemaining();
}
// For TCP sockets ONLY: bump the current thread to MAX_PRIORITY-1 and
// remember the previous priority in _oldPrior. We assume we are blocking
// other Nodes with our network I/O, so try to get the I/O over with.
// Does nothing if the priority was already raised.
private void raisePriority() {
  if( _oldPrior != -1 ) return;   // Already raised
  assert _chan instanceof SocketChannel;
  final Thread self = Thread.currentThread();
  _oldPrior = self.getPriority();
  self.setPriority(Thread.MAX_PRIORITY-1);
}
// Undo raisePriority(): put the saved priority back on the current thread
// and mark it as no longer raised. No-op if it was never raised.
private void restorePriority() {
  if( _oldPrior != -1 ) {
    Thread.currentThread().setPriority(_oldPrior);
    _oldPrior = -1;
  }
}
// Ship the whole buffer as one UDP packet. A single global datagram socket
// is used for sending; it is neither connected up-front nor closed here.
// A target of H2O.SELF means "multicast". Returns 0 (flow-coding).
private int udpSend() throws IOException {
  assert _chan == null;
  TimeLine.record_send(this,false);
  _size += _bb.position();
  _bb.flip();                     // Flip for sending
  if( _h2o != H2O.SELF ) {        // Single-cast send
    H2O.CLOUD_DGRAM.send(_bb, _h2o._key);
  } else {                        // SELF-send is the multi-cast signal
    H2O.multicast(_bb);
  }
  return 0;
}
// Switch a read-mode buffer to write-mode: the ByteBuffer is cleared and
// we are back on the first page. Returns this for chaining.
AutoBuffer clearForWriting() {
  assert _read;
  _bb.clear();
  _firstPage = true;
  _read = false;
  return this;
}
// Switch a write-mode buffer to read-mode: the ByteBuffer is flipped so
// what was written can be read back, and we are back on the first page.
// Returns this for chaining.
public AutoBuffer flipForReading() {
  assert !_read;
  _bb.flip();
  _firstPage = true;
  _read = true;
  return this;
}
/** Advance the position by {@code sz} bytes; the bytes must already be in the buffer. */
public void skip( int sz ) {
  final int avail = _bb.remaining();
  assert sz <= avail : "Requested skip: "+sz+" bytes, BUT AB contains only: "+avail + " bytes";
  final int target = _bb.position() + sz;
  _bb.position(target);
}
/** Make sure at least {@code sz} unread bytes are available, pulling more
 *  from the channel when the buffer runs short. */
private ByteBuffer getSp( int sz ) {
  if( _bb.remaining() >= sz ) return _bb;
  return getImpl(sz);
}
/** Make sure the first {@code sz} bytes of the first page are present,
 *  then leave the position just past them so later reads continue there. */
private ByteBuffer getSz(int sz) {
  assert _firstPage : "getSz() is only valid for early UDP bytes";
  final boolean needMore = _bb.limit() < sz;
  if( needMore ) getImpl(sz);
  _bb.position(sz);
  return _bb;
}
// Refill the read buffer from the channel until at least 'sz' bytes are
// available. Unread bytes are first moved to the front of the buffer.
// Throws AutoBufferException on end-of-stream or on a recognized
// connection-reset; other IOExceptions are logged and rethrown unchecked via
// Log.errRTExcept; a zero-byte read is a RuntimeException. Marks the first
// page as consumed and accounts the time spent in _time_io_ns.
private ByteBuffer getImpl( int sz ) {
  assert _read : "Reading from a buffer in write mode";
  assert _chan != null : "Read to much data from a byte[] backed buffer, AB="+this;
  _bb.compact();            // Move remaining unread bytes to start of buffer; prep for reading
  // Its got to fit or we asked for too much
  assert _bb.position()+sz <= _bb.capacity() : "("+_bb.position()+"+"+sz+" <= "+_bb.capacity()+")";
  long ns = System.nanoTime();
  while( _bb.position() < sz ) { // Read until we got enuf
    try {
      int res = _chan.read(_bb); // Read more
      // Readers are supposed to be strongly typed and read the exact expected bytes.
      // However, if a TCP connection fails mid-read we'll get a short-read.
      // This is indistinguishable from a mis-alignment between the writer and reader!
      if( res == -1 )
        throw new AutoBufferException(new EOFException("Reading "+sz+" bytes, AB="+this));
      if( res == 0 ) throw new RuntimeException("Reading zero bytes - so no progress?");
      _size += res;            // What we read
    } catch( IOException e ) { // Dunno how to handle so crash-n-burn
      // The message may be null; compare constant-first so a message-less
      // IOException is reported as itself instead of as a NullPointerException.
      final String msg = e.getMessage();
      // Known reset-channel messages
      if( "An existing connection was forcibly closed by the remote host".equals(msg) )
        throw new AutoBufferException(e);
      if( "An established connection was aborted by the software in your host machine".equals(msg) )
        throw new AutoBufferException(e);
      throw Log.errRTExcept(e);
    }
  }
  _time_io_ns += (System.nanoTime()-ns);
  _bb.flip();                 // Prep for handing out bytes
  //for( int i=0; i < _bb.limit(); i++ ) if( _bb.get(i)==0 ) _zeros++;
  _firstPage = false;         // First page of data is gone gone gone
  return _bb;
}
/** Make room for {@code sz} more bytes before a write.
 *  If the buffer already has the room it is returned as is. Otherwise
 *  sendPartial() either ships the current contents (I/O-backed buffers) or
 *  doubles the backing byte[]. A single doubling may not be enough for a
 *  small byte[]-backed buffer, so growth is repeated until the request fits
 *  or the array can no longer grow. */
private ByteBuffer putSp( int sz ) {
  assert !_read;
  if( sz <= _bb.remaining() ) return _bb;
  ByteBuffer bb = sendPartial();
  // Only byte[]-backed buffers (no node, no channel) can still be short here
  while( sz > bb.remaining() && _h2o == null && _chan == null ) {
    final int before = bb.capacity();
    bb = sendPartial();
    if( bb.capacity() <= before ) break; // Cannot grow (e.g. zero-length array); let the put fail
  }
  return bb;
}
// Do something with partial results, because the ByteBuffer is full.
// If we are byte[] backed, double the backing array size.
// If we are doing I/O, ship the bytes we have now and flip the ByteBuffer.
// Returns the ByteBuffer to keep writing into (which may be a new object).
// I/O failures surface as an unchecked AutoBufferException.
private ByteBuffer sendPartial() {
// Writing into an expanding byte[]?
if( _h2o==null && _chan == null ) {
// This is a byte[] backed buffer; expand the backing byte[].
byte[] ary = _bb.array();
int newlen = ary.length<<1; // New size is 2x old size
int oldpos = _bb.position();
// Re-wrap the enlarged copy with the write position preserved
_bb = ByteBuffer.wrap(MemoryManager.arrayCopyOfRange(ary,0,newlen),oldpos,newlen-oldpos)
.order(ByteOrder.nativeOrder());
return _bb;
}
// Doing I/O with the full ByteBuffer - ship partial results
_size += _bb.position();
if( _chan == null ) // First flush of a node-targeted buffer: record the send
TimeLine.record_send(this,true);
_bb.flip(); // Prep for writing.
try {
if( _chan == null )
tcpOpen(); // This is a big operation. Open a TCP socket as-needed.
//for( int i=0; i < _bb.limit(); i++ ) if( _bb.get(i)==0 ) _zeros++;
long ns = System.nanoTime();
while( _bb.hasRemaining() ) { // A single write() may be partial; loop until drained
_chan.write(_bb);
// Fault injection; inactive while RANDOM_TCP_DROP is null
if( RANDOM_TCP_DROP != null &&_chan instanceof SocketChannel && RANDOM_TCP_DROP.nextInt(100) == 0 )
throw new IOException("Random TCP Write Fail");
}
_time_io_ns += (System.nanoTime()-ns);
} catch( IOException e ) { // Some kind of TCP fail?
// Change to an unchecked exception (so we don't have to annotate every
// frick'n put1/put2/put4/read/write call). Retry & recovery happens at
// a higher level. AutoBuffers are used for many things including e.g.
// disk i/o & UDP writes; this exception only happens on a failed TCP
// write - and we don't want to make the other AutoBuffer users have to
// declare (and then ignore) this exception.
throw new AutoBufferException(e);
}
// A buffer smaller than 16K is swapped for one from bbMake()
if( _bb.capacity() < 16*1024 ) _bb = bbMake();
_firstPage = false;
_bb.clear();
return _bb;
}
/** Build a String from {@code len} bytes of the backing array, starting
 *  {@code off} bytes past the array offset. Uses the platform default
 *  charset and does not move the position. */
public String getStr(int off, int len) {
  final byte[] bytes = _bb.array();
  final int start = _bb.arrayOffset() + off;
  return new String(bytes, start, len);
}
// -----------------------------------------------
// Utility functions to get various Java primitives
// Each reads at the current position (advancing it), first making sure via
// getSp() that enough bytes are buffered.
// Boolean: any non-zero byte is true
public boolean getZ() { return get1()!=0; }
// One byte, returned unsigned (0..255)
public int get1 () { return getSp(1).get ()&0xFF; }
// Two bytes as a char
public char get2 () { return getSp(2).getChar (); }
public int get4 () { return getSp(4).getInt (); }
public float get4f() { return getSp(4).getFloat (); }
public long get8 () { return getSp(8).getLong (); }
public double get8d() { return getSp(8).getDouble(); }
/** Read a 3-byte value, least-significant byte first; result is in 0..2^24-1. */
public int get3() {
  final int lo  = get1();
  final int mid = get1();
  final int hi  = get1();
  return (lo & 0xff) | (mid & 0xff) << 8 | (hi & 0xff) << 16;
}
/** Write the low 3 bytes of {@code x}, least-significant byte first.
 *  Accepts any value in [-2^24, 2^24). Every byte is masked before being
 *  handed to put1(); previously the top byte was not, so values below -2^23
 *  passed the check here but tripped put1()'s byte-range assertion. */
public AutoBuffer put3( int x ) {
  assert (-1<<24) <= x && x < (1<<24);
  return put1(x & 0xFF).put1((x >> 8) & 0xFF).put1((x >> 16) & 0xFF);
}
// Absolute-offset accessors: read or write directly at index 'off' of the
// current ByteBuffer. They do not move the position and never refill or
// flush the buffer, so 'off' must lie within what is currently buffered.
public int get1 (int off) { return _bb.get (off)&0xFF; }
public char get2 (int off) { return _bb.getChar (off); }
public int get4 (int off) { return _bb.getInt (off); }
public float get4f(int off) { return _bb.getFloat (off); }
public long get8 (int off) { return _bb.getLong (off); }
public double get8d(int off) { return _bb.getDouble(off); }
// Only the low 8 bits of v are stored
public AutoBuffer put1 (int off, int v) { _bb.put (off, (byte)(v&0xFF)); return this; }
public AutoBuffer put2 (int off, char v) { _bb.putChar (off, v); return this; }
public AutoBuffer put2 (int off, short v) { _bb.putShort (off, v); return this; }
public AutoBuffer put4 (int off, int v) { _bb.putInt (off, v); return this; }
public AutoBuffer put4f(int off, float v) { _bb.putFloat (off, v); return this; }
public AutoBuffer put8 (int off, long v) { _bb.putLong (off, v); return this; }
public AutoBuffer put8d(int off, double v) { _bb.putDouble(off, v); return this; }
// Relative writers: each makes room via putSp() (which may flush or grow
// the buffer), writes at the current position and returns this for chaining.
// Boolean as one byte: 1 for true, 0 for false
public AutoBuffer putZ (boolean b){ return put1(b?1:0); }
// One byte; accepts either the signed (-128..127) or unsigned (0..255) form
public AutoBuffer put1 ( int b) { assert b >= -128 && b <= 255 : ""+b+" is not a byte";
putSp(1).put((byte)b); return this; }
public AutoBuffer put2 ( char c) { putSp(2).putChar (c); return this; }
public AutoBuffer put2 ( short s) { putSp(2).putShort (s); return this; }
public AutoBuffer put4 ( int i) { putSp(4).putInt (i); return this; }
public AutoBuffer put4f( float f) { putSp(4).putFloat (f); return this; }
public AutoBuffer put8 ( long l) { putSp(8).putLong (l); return this; }
public AutoBuffer put8d(double d) { putSp(8).putDouble(d); return this; }
/** Write a Freezable: its 2-byte type id followed by whatever its own
 *  write() emits. A null is written as the TypeMap.NULL id alone. */
public AutoBuffer put(Freezable f) {
  if( f != null ) {
    assert f.frozenType() > 0 : "No TypeMap for "+f.getClass().getName();
    put2((short)f.frozenType());
    return f.write(this);
  }
  return put2(TypeMap.NULL);
}
/** Write an Iced: its 2-byte type id followed by whatever its own write()
 *  emits. A null is written as the TypeMap.NULL id alone. */
public AutoBuffer put(Iced f) {
  if( f != null ) {
    assert f.frozenType() > 0;
    put2((short)f.frozenType());
    return f.write(this);
  }
  return put2(TypeMap.NULL);
}
// Put a (compressed) integer. Values from -1 to 252 take a single byte
// holding x+1; values that fit a short take a 255 marker plus 2 bytes;
// everything else takes a 254 marker plus 4 bytes. The scheme is tuned for
// small integers, including -1 which is often used as an "array is null"
// flag when passing the array length.
AutoBuffer putInt( int x ) {
  final int biased = x + 1;
  if( biased >= 0 && biased <= 253 ) {
    return put1(biased);
  }
  final boolean fitsShort = x >= Short.MIN_VALUE && x <= Short.MAX_VALUE;
  if( fitsShort ) {
    put1(255);
    return put2((short)x);
  }
  put1(254);
  return put4(x);
}
// Get a (compressed) integer written by putInt: the first byte is either
// the value plus one (0..253), or a marker for a following short (255) or
// int (254).
int getInt( ) {
  final int tag = get1();
  if( tag == 255 ) return (short)get2();
  if( tag > 253 ) {
    assert tag==254;
    return get4();
  }
  return tag-1;
}
// Put a zero-compressed array header. Compression is:
// If null : putInt(-1)
// Else
// putInt(# of leading nulls)
// putInt(# of non-nulls)
// If # of non-nulls is > 0, putInt( # of trailing nulls)
// Returns -1 for a null array (matching getZA() and the "xy == -1" test the
// callers perform); otherwise (leading nulls << 32) | middle length.
long putZA( Object[] A ) {
  if( A==null ) { putInt(-1); return -1; }
  int x=0; for( ; x<A.length; x++ ) if( A[x ]!=null ) break;
  int y=A.length; for( ; y>x; y-- ) if( A[y-1]!=null ) break;
  putInt(x);                    // Leading nulls to skip
  putInt(y-x);                  // Mixed non-null guts in middle
  if( y > x )                   // Only when there is a non-empty middle
    putInt(A.length-y);         // Trailing nulls
  return ((long)x<<32)|(y-x);   // Return both leading nulls, and middle length
}
// Read the header of a zero-compressed array.
// Returns -1 for a null array, else (leading nulls << 32) | middle length.
// When the middle length is non-zero the caller must still read the
// trailing-null count.
long getZA( ) {
  final int lead = getInt();      // Length of leading nulls, or -1 for null
  if( lead == -1 ) return -1;
  final int mid = getInt();       // Length of the middle section
  final long hi = (long)lead << 32;
  return hi | (long)mid;
}
/** Write an Iced array: zero-compressed header, then each element of the
 *  middle (between the leading and trailing nulls). */
public AutoBuffer putA(Iced[] fs) {
  _arys++;
  final long packed = putZA(fs);
  if( packed != -1 ) {
    final int first = (int)(packed>>32);
    final int end = first + (int)packed;
    for( int idx = first; idx < end; idx++ )
      put(fs[idx]);
  }
  return this;
}
/** Write a 2-D Iced array: zero-compressed header, then each middle row
 *  via putA. */
public AutoBuffer putAA(Iced[][] fs) {
  _arys++;
  final long packed = putZA(fs);
  if( packed != -1 ) {
    final int first = (int)(packed>>32);
    final int end = first + (int)packed;
    for( int row = first; row < end; row++ )
      putA(fs[row]);
  }
  return this;
}
/** Write a 3-D Iced array: zero-compressed header, then each middle plane
 *  via putAA. */
public AutoBuffer putAAA(Iced[][][] fs) {
  _arys++;
  final long packed = putZA(fs);
  if( packed != -1 ) {
    final int first = (int)(packed>>32);
    final int end = first + (int)packed;
    for( int plane = first; plane < end; plane++ )
      putAA(fs[plane]);
  }
  return this;
}
/** Write a Freezable array: zero-compressed header, then each element of
 *  the middle (between the leading and trailing nulls). */
public AutoBuffer putA(Freezable[] fs) {
  _arys++;
  final long packed = putZA(fs);
  if( packed != -1 ) {
    final int first = (int)(packed>>32);
    final int end = first + (int)packed;
    for( int idx = first; idx < end; idx++ )
      put(fs[idx]);
  }
  return this;
}
/** Read a Freezable: a 2-byte type id, then the object's own read().
 *  Returns null when the id is TypeMap.NULL. The class argument is only
 *  used for type inference. */
public <T extends Freezable> T get(Class<T> t) {
  final short typeId = (short)get2();
  if( typeId != TypeMap.NULL ) {
    assert typeId > 0 : "Bad type id "+typeId;
    return TypeMap.newFreezable(typeId).read(this);
  }
  return null;
}
/** Read an Iced: a 2-byte type id, then the object's own read().
 *  Returns null when the id is TypeMap.NULL. */
public <T extends Iced> T get() {
  final short typeId = (short)get2();
  if( typeId != TypeMap.NULL ) {
    assert typeId > 0 : "Bad type id "+typeId;
    return TypeMap.newInstance(typeId).read(this);
  }
  return null;
}
/** Read an array written by putA: null stays null; otherwise the array is
 *  sized leading+middle+trailing and only the middle slots are filled. */
public <T extends Freezable> T[] getA(Class<T> tc) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead = (int)(packed>>32);         // Leading nulls
  final int mid  = (int)packed;               // Middle section
  final int tail = mid==0 ? 0 : getInt();     // Trailing nulls
  final T[] out = (T[]) Array.newInstance(tc, lead+mid+tail);
  final int end = lead+mid;
  for( int idx = lead; idx < end; idx++ )
    out[idx] = get(tc);
  return out;
}
/** Read a 2-D array written by putAA: null stays null; otherwise only the
 *  middle rows are filled, each via getA. */
public <T extends Iced> T[][] getAA(Class<T> tc) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead = (int)(packed>>32);         // Leading nulls
  final int mid  = (int)packed;               // Middle section
  final int tail = mid==0 ? 0 : getInt();     // Trailing nulls
  final Class<T[]> rowType = (Class<T[]>) Array.newInstance(tc, 0).getClass();
  final T[][] rows = (T[][]) Array.newInstance(rowType, lead+mid+tail);
  final int end = lead+mid;
  for( int r = lead; r < end; r++ )
    rows[r] = getA(tc);
  return rows;
}
/** Read a 3-D array written by putAAA: null stays null; otherwise only the
 *  middle planes are filled, each via getAA. */
public <T extends Iced> T[][][] getAAA(Class<T> tc) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead = (int)(packed>>32);         // Leading nulls
  final int mid  = (int)packed;               // Middle section
  final int tail = mid==0 ? 0 : getInt();     // Trailing nulls
  final Class<T[]  > rowType   = (Class<T[]  >) Array.newInstance(tc     , 0).getClass();
  final Class<T[][]> planeType = (Class<T[][]>) Array.newInstance(rowType, 0).getClass();
  final T[][][] planes = (T[][][]) Array.newInstance(planeType, lead+mid+tail);
  final int end = lead+mid;
  for( int p = lead; p < end; p++ )
    planes[p] = getAA(tc);
  return planes;
}
/** Write a String array: zero-compressed header, then each middle element
 *  via putStr. */
public AutoBuffer putAStr(String[] fs) {
  _arys++;
  final long packed = putZA(fs);
  if( packed != -1 ) {
    final int first = (int)(packed>>32);
    final int end = first + (int)packed;
    for( int idx = first; idx < end; idx++ )
      putStr(fs[idx]);
  }
  return this;
}
/** Read a String array written by putAStr: null stays null; otherwise only
 *  the middle slots are filled, each via getStr(). */
public String[] getAStr() {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead = (int)(packed>>32);         // Leading nulls
  final int mid  = (int)packed;               // Middle section
  final int tail = mid==0 ? 0 : getInt();     // Trailing nulls
  final String[] out = new String[lead+mid+tail];
  final int end = lead+mid;
  for( int idx = lead; idx < end; idx++ )
    out[idx] = getStr();
  return out;
}
/** Write a 2-D String array: zero-compressed header, then each middle row
 *  via putAStr. */
public AutoBuffer putAAStr(String[][] fs) {
  _arys++;
  final long packed = putZA(fs);
  if( packed != -1 ) {
    final int first = (int)(packed>>32);
    final int end = first + (int)packed;
    for( int row = first; row < end; row++ )
      putAStr(fs[row]);
  }
  return this;
}
// Reads a String[][] stored with its null rows compressed.  getZA() returns -1
// for a null array; otherwise leading-null count in the high 32 bits and
// middle-run length in the low 32 bits.  The trailing-null count is read as an
// int only when the middle run is non-empty.  Middle rows come from getAStr().
public String[][] getAAStr() {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead  = (int)(packed>>32);      // Leading nulls
  final int mid   = (int)packed;            // Middle run
  final int trail = (mid == 0) ? 0 : getInt(); // Trailing nulls
  final String[][] rows = new String[lead+mid+trail][];
  final int stop = lead+mid;
  int idx = lead;
  while( idx < stop ) rows[idx++] = getAStr();
  return rows;
}
// Read the smaller of _bb.remaining() and len into buf.
// Return bytes read, which could be zero.
public int read( byte[] buf, int off, int len ) {
  // Copy only what _bb currently holds; nothing is requested beyond that here.
  final int avail = _bb.remaining();
  final int count = len < avail ? len : avail;
  _bb.get(buf, off, count);
  return count;
}
// -----------------------------------------------
// Utility functions to handle common UDP packet tasks.
// Get the 1st control byte
int getCtrl( ) {
  // Ask getSz for 1 byte, then read offset 0 masked to 0..255.
  final int ctrlOff = 0;
  return 0xFF & getSz(ctrlOff+1).get(ctrlOff);
}
// Get the port in next 2 bytes
int getPort( ) {
  // The port is the 2-byte char located right after the 1-byte ctrl.
  final int portOff = 1;
  return getSz(portOff+2).getChar(portOff);
}
// Get the task# in the next 4 bytes
int getTask( ) {
  // The task number is the 4-byte int after the 1-byte ctrl and 2-byte port.
  final int taskOff = 1+2;
  return getSz(taskOff+4).getInt(taskOff);
}
// Get the flag in the next 1 byte
int getFlag( ) {
  // The flag is the single byte after ctrl(1) + port(2) + task(4).  It is
  // returned without an &0xFF mask (contrast getCtrl).
  final int flagOff = 1+2+4;
  return getSz(flagOff+1).get(flagOff);
}
// Set the ctrl, port, task. Ready to write more bytes afterwards
AutoBuffer putUdp (UDP.udp type) {
  // Must be called on a buffer with nothing written yet.
  assert _bb.position()==0;
  final int hdrLen = 1+2;               // 1 ctrl byte + 2 port bytes
  putSp(hdrLen);
  // ctrl byte is the enum ordinal; outgoing port is always the sender's (me) port
  _bb.put((byte)type.ordinal()).putChar((char)H2O.H2O_PORT);
  assert _bb.position()==hdrLen;
  return this;
}
// Writes the ctrl/port header via putUdp, then appends the task number with put4.
AutoBuffer putTask(UDP.udp type, int tasknum) {
  final AutoBuffer withHeader = putUdp(type);
  return withHeader.put4(tasknum);
}
// Writes ctrl byte, this node's port and the task number straight into _bb,
// taking the ctrl value as a raw int.  Requires nothing to have been written yet.
AutoBuffer putTask(int ctrl, int tasknum) {
  assert _bb.position()==0;
  putSp(1+2+4);                         // 1 ctrl + 2 port + 4 task bytes
  _bb.put((byte)ctrl);
  _bb.putChar((char)H2O.H2O_PORT);
  _bb.putInt(tasknum);
  return this;
}
// -----------------------------------------------
// Utility functions to read & write arrays
// Reads a length-prefixed byte[]: an int length, where -1 stands for a null
// array, followed by that many bytes (read by getA1(int)).
public byte[] getA1( ) {
  _arys++;
  final int len = getInt();
  if( len == -1 ) return null;
  return getA1(len);
}
// Reads exactly len bytes into a fresh array from MemoryManager.malloc1.
// Drains whatever _bb holds, and while bytes are still owed asks getSp for
// the remainder, capped at the buffer capacity.
public byte[] getA1( int len ) {
  final byte[] dst = MemoryManager.malloc1(len);
  for( int filled = 0; filled < len; ) {
    final int chunk = Math.min(_bb.remaining(), len - filled);
    _bb.get(dst, filled, chunk);
    filled += chunk;
    if( filled >= len ) break;
    getSp(Math.min(_bb.capacity(), len-filled));
  }
  return dst;
}
// Reads a length-prefixed short[]: an int length (-1 stands for a null array)
// followed by the shorts.  The data is pulled through a ShortBuffer view of
// _bb in as many rounds as needed; after each round _bb's position is advanced
// by the bytes the view consumed and, if shorts are still owed, getSp is asked
// for the outstanding bytes capped at capacity-1.
public short[] getA2( ) {
  _arys++;
  int len = getInt(); if( len == -1 ) return null;
  short[] buf = MemoryManager.malloc2(len);
  int sofar = 0;
  while( sofar < len ) {
    ShortBuffer as = _bb.asShortBuffer();
    int more = Math.min(as.remaining(), len - sofar);
    as.get(buf, sofar, more);
    sofar += more;
    // The view has its own position; mirror its progress onto _bb (2 bytes per short).
    _bb.position(_bb.position() + as.position()*2);
    // Size the request in long arithmetic: (len-sofar)*2 wraps as an int once
    // more than 2^30 shorts remain, which would hand getSp a bogus size.
    // The min is bounded by the capacity term, so narrowing back to int is safe.
    if( sofar < len ) getSp((int)Math.min(_bb.capacity()-1L, (len-sofar)*2L));
  }
  return buf;
}
// Reads a length-prefixed int[]: an int length (-1 stands for a null array)
// followed by the ints.  The data is pulled through an IntBuffer view of _bb
// in as many rounds as needed; after each round _bb's position is advanced by
// the bytes the view consumed and, if ints are still owed, getSp is asked for
// the outstanding bytes capped at capacity-3.
public int[] getA4( ) {
  _arys++;
  int len = getInt(); if( len == -1 ) return null;
  int[] buf = MemoryManager.malloc4(len);
  int sofar = 0;
  while( sofar < len ) {
    IntBuffer as = _bb.asIntBuffer();
    int more = Math.min(as.remaining(), len - sofar);
    as.get(buf, sofar, more);
    sofar += more;
    // The view has its own position; mirror its progress onto _bb (4 bytes per int).
    _bb.position(_bb.position() + as.position()*4);
    // Size the request in long arithmetic: (len-sofar)*4 wraps as an int once
    // more than 2^29 ints remain, which would hand getSp a bogus size.
    // The min is bounded by the capacity term, so narrowing back to int is safe.
    if( sofar < len ) getSp((int)Math.min(_bb.capacity()-3L, (len-sofar)*4L));
  }
  return buf;
}
// Reads a length-prefixed float[]: an int length (-1 stands for a null array)
// followed by the floats.  The data is pulled through a FloatBuffer view of
// _bb in as many rounds as needed; after each round _bb's position is advanced
// by the bytes the view consumed and, if floats are still owed, getSp is asked
// for the outstanding bytes capped at capacity-3.
public float[] getA4f( ) {
  _arys++;
  int len = getInt(); if( len == -1 ) return null;
  float[] buf = MemoryManager.malloc4f(len);
  int sofar = 0;
  while( sofar < len ) {
    FloatBuffer as = _bb.asFloatBuffer();
    int more = Math.min(as.remaining(), len - sofar);
    as.get(buf, sofar, more);
    sofar += more;
    // The view has its own position; mirror its progress onto _bb (4 bytes per float).
    _bb.position(_bb.position() + as.position()*4);
    // Size the request in long arithmetic: (len-sofar)*4 wraps as an int once
    // more than 2^29 floats remain, which would hand getSp a bogus size.
    // The min is bounded by the capacity term, so narrowing back to int is safe.
    if( sofar < len ) getSp((int)Math.min(_bb.capacity()-3L, (len-sofar)*4L));
  }
  return buf;
}
// Reads a long[] stored with its zero runs compressed.  Layout as consumed
// here: int x (leading zeros, -1 stands for a null array), int y (length of
// the middle section), int z (trailing zeros, present only when y != 0), one
// byte giving the width (1, 2, 4 or 8) used for each middle element, then the
// y middle elements.  Slots outside the middle section are never written and
// keep whatever MemoryManager.malloc8 returned.
public long[] getA8( ) {
  _arys++;
  // Get the lengths of lead & trailing zero sections, and the non-zero
  // middle section.
  int x = getInt(); if( x == -1 ) return null;
  int y = getInt(); // Non-zero in the middle
  int z = y==0 ? 0 : getInt();// Trailing zeros
  long[] buf = MemoryManager.malloc8(x+y+z);
  switch( get1() ) { // 1,2,4 or 8 for how the middle section is passed
  case 1: for( int i=x; i<x+y; i++ ) buf[i] = get1(); return buf;
  case 2: for( int i=x; i<x+y; i++ ) buf[i] = (short)get2(); return buf;
  case 4: for( int i=x; i<x+y; i++ ) buf[i] = get4(); return buf;
  case 8: break;                // Full-width longs: bulk path below
  default: throw H2O.fail();
  }
  int sofar = x;
  while( sofar < x+y ) {
    LongBuffer as = _bb.asLongBuffer();
    int more = Math.min(as.remaining(), x+y - sofar);
    as.get(buf, sofar, more);
    sofar += more;
    // The view has its own position; mirror its progress onto _bb (8 bytes per long).
    _bb.position(_bb.position() + as.position()*8);
    // Size the request in long arithmetic: (x+y-sofar)*8 wraps as an int once
    // more than 2^28 longs remain, which would hand getSp a bogus size.
    // The min is bounded by the capacity term, so narrowing back to int is safe.
    if( sofar < x+y ) getSp((int)Math.min(_bb.capacity()-7L, (x+y-sofar)*8L));
  }
  return buf;
}
// Reads a length-prefixed double[]: an int length (-1 stands for a null array)
// followed by the doubles.  The data is pulled through a DoubleBuffer view of
// _bb in as many rounds as needed; after each round _bb's position is advanced
// by the bytes the view consumed and, if doubles are still owed, getSp is
// asked for the outstanding bytes capped at capacity-7.
public double[] getA8d( ) {
  _arys++;
  int len = getInt(); if( len == -1 ) return null;
  double[] buf = MemoryManager.malloc8d(len);
  int sofar = 0;
  while( sofar < len ) {
    DoubleBuffer as = _bb.asDoubleBuffer();
    int more = Math.min(as.remaining(), len - sofar);
    as.get(buf, sofar, more);
    sofar += more;
    // The view has its own position; mirror its progress onto _bb (8 bytes per double).
    _bb.position(_bb.position() + as.position()*8);
    // Size the request in long arithmetic: (len-sofar)*8 wraps as an int once
    // more than 2^28 doubles remain, which would hand getSp a bogus size.
    // The min is bounded by the capacity term, so narrowing back to int is safe.
    if( sofar < len ) getSp((int)Math.min(_bb.capacity()-7L, (len-sofar)*8L));
  }
  return buf;
}
// Reads a byte[][] stored with its null rows compressed.  getZA() returns -1
// for a null array; otherwise leading-null count in the high 32 bits and
// middle-run length in the low 32 bits.  The trailing-null count is read as an
// int only when the middle run is non-empty.  Middle rows come from getA1().
public byte[][] getAA1( ) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead  = (int)(packed>>32);      // Leading nulls
  final int mid   = (int)packed;            // Middle run
  final int trail = (mid == 0) ? 0 : getInt(); // Trailing nulls
  final byte[][] rows = new byte[lead+mid+trail][];
  final int stop = lead+mid;
  int idx = lead;
  while( idx < stop ) rows[idx++] = getA1();
  return rows;
}
// Reads a short[][] stored with its null rows compressed.  getZA() returns -1
// for a null array; otherwise leading-null count in the high 32 bits and
// middle-run length in the low 32 bits.  The trailing-null count is read as an
// int only when the middle run is non-empty.  Middle rows come from getA2().
public short[][] getAA2( ) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead  = (int)(packed>>32);      // Leading nulls
  final int mid   = (int)packed;            // Middle run
  final int trail = (mid == 0) ? 0 : getInt(); // Trailing nulls
  final short[][] rows = new short[lead+mid+trail][];
  final int stop = lead+mid;
  int idx = lead;
  while( idx < stop ) rows[idx++] = getA2();
  return rows;
}
// Reads an int[][] stored with its null rows compressed.  getZA() returns -1
// for a null array; otherwise leading-null count in the high 32 bits and
// middle-run length in the low 32 bits.  The trailing-null count is read as an
// int only when the middle run is non-empty.  Middle rows come from getA4().
public int[][] getAA4( ) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead  = (int)(packed>>32);      // Leading nulls
  final int mid   = (int)packed;            // Middle run
  final int trail = (mid == 0) ? 0 : getInt(); // Trailing nulls
  final int[][] rows = new int[lead+mid+trail][];
  final int stop = lead+mid;
  int idx = lead;
  while( idx < stop ) rows[idx++] = getA4();
  return rows;
}
// Reads a float[][] stored with its null rows compressed.  getZA() returns -1
// for a null array; otherwise leading-null count in the high 32 bits and
// middle-run length in the low 32 bits.  The trailing-null count is read as an
// int only when the middle run is non-empty.  Middle rows come from getA4f().
public float[][] getAA4f( ) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead  = (int)(packed>>32);      // Leading nulls
  final int mid   = (int)packed;            // Middle run
  final int trail = (mid == 0) ? 0 : getInt(); // Trailing nulls
  final float[][] rows = new float[lead+mid+trail][];
  final int stop = lead+mid;
  int idx = lead;
  while( idx < stop ) rows[idx++] = getA4f();
  return rows;
}
// Reads a long[][] stored with its null rows compressed.  getZA() returns -1
// for a null array; otherwise leading-null count in the high 32 bits and
// middle-run length in the low 32 bits.  The trailing-null count is read as an
// int only when the middle run is non-empty.  Middle rows come from getA8().
public long[][] getAA8( ) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead  = (int)(packed>>32);      // Leading nulls
  final int mid   = (int)packed;            // Middle run
  final int trail = (mid == 0) ? 0 : getInt(); // Trailing nulls
  final long[][] rows = new long[lead+mid+trail][];
  final int stop = lead+mid;
  int idx = lead;
  while( idx < stop ) rows[idx++] = getA8();
  return rows;
}
// Reads a double[][] stored with its null rows compressed.  getZA() returns -1
// for a null array; otherwise leading-null count in the high 32 bits and
// middle-run length in the low 32 bits.  The trailing-null count is read as an
// int only when the middle run is non-empty.  Middle rows come from getA8d().
public double[][] getAA8d( ) {
  _arys++;
  final long packed = getZA();
  if( packed == -1 ) return null;
  final int lead  = (int)(packed>>32);      // Leading nulls
  final int mid   = (int)packed;            // Middle run
  final int trail = (mid == 0) ? 0 : getInt(); // Trailing nulls
  final double[][] rows = new double[lead+mid+trail][];
  final int stop = lead+mid;
  int idx = lead;
  while( idx < stop ) rows[idx++] = getA8d();
  return rows;
}
public int[][][] getAAA4( ) {
_arys++;
long xy = getZA();
if( xy == -1 ) return null;
int x=(int)(xy>>32); // Leading nulls