-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
BufferUtils.java
381 lines (350 loc) · 12.7 KB
/
BufferUtils.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package tachyon.util.io;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import tachyon.Constants;
/**
* Utilities related to buffers, not only {@link ByteBuffer}.
*/
@NotThreadSafe
// TODO(jsimsa): Make this class thread-safe.
public final class BufferUtils {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private static Method sCleanerCleanMethod;
private static Method sByteBufferCleanerMethod;
/**
 * Converts a byte to its unsigned integer value.
 *
 * @param b the byte to convert
 * @return the value of {@code b} read as an unsigned byte, i.e. in the range 0 to 255
 */
public static int byteToInt(byte b) {
  // Masking after the implicit widening strips the sign-extension bits.
  final int lowEightBits = 0xFF;
  return lowEightBits & b;
}
/**
 * Forces a direct buffer to be unmapped once it is no longer used. After calling this method the
 * buffer must be discarded by the caller. This is an unsafe operation and currently a workaround
 * to avoid the huge memory occupation caused by memory mapping.
 *
 * <p>
 * NOTE: DirectByteBuffers are not guaranteed to be garbage-collected immediately after their
 * references are released and may lead to OutOfMemoryError. This function helps by calling the
 * Cleaner method of a DirectByteBuffer explicitly. See <a
 * href="http://stackoverflow.com/questions/1854398/how-to-garbage-collect-a-direct-buffer-java"
 * >more discussion</a>.
 *
 * <p>
 * The cleanup is best effort: any failure of the reflective calls is logged as a warning and is
 * not propagated to the caller.
 *
 * @param buffer the byte buffer to be unmapped, this must be a direct buffer
 */
public static void cleanDirectBuffer(ByteBuffer buffer) {
  Preconditions.checkNotNull(buffer);
  Preconditions.checkArgument(buffer.isDirect(), "buffer isn't a DirectByteBuffer");
  try {
    if (sByteBufferCleanerMethod == null) {
      // Cache the method only after it has been made accessible, so that a failure in
      // setAccessible() does not leave an unusable entry behind for later calls.
      Method cleanerMethod = buffer.getClass().getMethod("cleaner");
      cleanerMethod.setAccessible(true);
      sByteBufferCleanerMethod = cleanerMethod;
    }
    final Object cleaner = sByteBufferCleanerMethod.invoke(buffer);
    if (cleaner == null) {
      // The cleaner could be null when the buffer is initialized as zero capacity; only a
      // non-empty buffer without a cleaner is worth a warning.
      if (buffer.capacity() > 0) {
        LOG.warn("Failed to get cleaner for ByteBuffer: {}", buffer.getClass().getName());
      }
      return;
    }
    if (sCleanerCleanMethod == null) {
      sCleanerCleanMethod = cleaner.getClass().getMethod("clean");
    }
    sCleanerCleanMethod.invoke(cleaner);
  } catch (Exception e) {
    // Deliberately broad: reflection can fail in several ways and none of them should make the
    // caller fail.
    LOG.warn("Failed to unmap direct ByteBuffer: {}", buffer.getClass().getName(), e);
  }
  // Note: assigning null to the parameter here would have no effect on the caller's reference,
  // so the caller is responsible for dropping its own reference to the buffer.
}
/**
 * Clones the remaining content of a {@link ByteBuffer}.
 * <p>
 * The new buffer contains a copy of the bytes between the position and the limit of {@code buf}.
 * It is always allocated on the heap, so its type may differ from the type of {@code buf}. The
 * position, limit and mark of {@code buf} are left unchanged.
 *
 * @param buf the ByteBuffer to clone
 * @return a new ByteBuffer, positioned at zero, whose remaining bytes equal those of {@code buf}
 */
public static ByteBuffer cloneByteBuffer(ByteBuffer buf) {
  // Copy through a duplicate: it shares the content but has its own position, so the source is
  // not consumed. Reading via the buffer API (instead of indexing buf.array() by position())
  // also gives the right bytes for array-backed buffers with a non-zero arrayOffset(), such as
  // the result of slice().
  ByteBuffer source = buf.duplicate();
  ByteBuffer ret = ByteBuffer.allocate(source.remaining());
  ret.put(source);
  ret.flip();
  return ret;
}
/**
 * Clones every {@link ByteBuffer} of a list, preserving the order of the elements.
 *
 * @param source the list of ByteBuffers to clone
 * @return a new list holding one clone per element of {@code source}
 */
public static List<ByteBuffer> cloneByteBufferList(List<ByteBuffer> source) {
  // Pre-size the result, since exactly one clone is produced per input buffer.
  final List<ByteBuffer> clones = new ArrayList<ByteBuffer>(source.size());
  for (ByteBuffer original : source) {
    final ByteBuffer copy = cloneByteBuffer(original);
    clones.add(copy);
  }
  return clones;
}
/**
 * Extracts a correct {@link ByteBuffer} from a Thrift RPC result by copying its remaining bytes
 * into a freshly allocated buffer. The copy consumes {@code data}: its position is advanced to
 * its limit.
 *
 * @param data result of Thrift RPC
 * @return a new ByteBuffer, positioned at zero, holding the bytes that remained in {@code data}
 */
public static ByteBuffer generateNewByteBufferFromThriftRPCResults(ByteBuffer data) {
  // TODO(cc): This is a trick to fix the issue in thrift. Change the code to use metadata
  // directly when thrift fixes the issue.
  final int payloadLength = data.remaining();
  final ByteBuffer copy = ByteBuffer.allocate(payloadLength);
  copy.put(data);
  copy.flip();
  return copy;
}
/**
 * Puts the least-significant byte of an integer into a {@link ByteBuffer} at its current
 * position.
 *
 * @param buf ByteBuffer to use
 * @param b integer whose lowest eight bits are written into the ByteBuffer
 */
public static void putIntByteBuffer(ByteBuffer buf, int b) {
  // The narrowing cast keeps exactly the lowest eight bits of the integer.
  final byte lowestByte = (byte) b;
  buf.put(lowestByte);
}
/**
 * Gets an increasing sequence of bytes starting at zero.
 *
 * @param len the target length of the sequence
 * @return an increasing sequence of bytes
 */
public static byte[] getIncreasingByteArray(int len) {
  final int firstValue = 0;
  return getIncreasingByteArray(firstValue, len);
}
/**
 * Gets an increasing sequence of bytes starting with the given value. Every element is the
 * running value narrowed to a byte, so the sequence wraps around once it passes the byte range.
 *
 * @param start the starting value to use
 * @param len the target length of the sequence
 * @return an increasing sequence of bytes
 */
public static byte[] getIncreasingByteArray(int start, int len) {
  final byte[] sequence = new byte[len];
  int value = start;
  for (int i = 0; i < sequence.length; i ++) {
    sequence[i] = (byte) value;
    value ++;
  }
  return sequence;
}
/**
 * Checks if the given byte array is exactly an increasing sequence of bytes starting at zero
 * with the given length.
 *
 * @param len the expected length of the array
 * @param arr the byte array to check
 * @return true if {@code arr} is non-null, has length {@code len} and holds an increasing
 *         sequence of bytes starting at zero
 */
public static boolean equalIncreasingByteArray(int len, byte[] arr) {
  final int firstValue = 0;
  return equalIncreasingByteArray(firstValue, len, arr);
}
/**
 * Checks if the given byte array is exactly an increasing sequence of bytes starting at the
 * given value with the given length. The array length must be equal to {@code len}; a longer or
 * shorter array never matches.
 *
 * @param start the starting value to use
 * @param len the expected length of the array
 * @param arr the byte array to check
 * @return true if {@code arr} is non-null, has length {@code len} and holds an increasing
 *         sequence of bytes starting at {@code start}
 */
public static boolean equalIncreasingByteArray(int start, int len, byte[] arr) {
  if (arr == null) {
    return false;
  }
  if (arr.length != len) {
    return false;
  }
  // The expected value is tracked as a byte so that it wraps around like the array content.
  byte expected = (byte) start;
  for (byte actual : arr) {
    if (actual != expected) {
      return false;
    }
    expected ++;
  }
  return true;
}
/**
 * Gets a {@link ByteBuffer} containing an increasing sequence of bytes starting at zero.
 *
 * @param len the target length of the sequence
 * @return ByteBuffer containing an increasing sequence of bytes
 */
public static ByteBuffer getIncreasingByteBuffer(int len) {
  final int firstValue = 0;
  return getIncreasingByteBuffer(firstValue, len);
}
/**
 * Gets a {@link ByteBuffer} containing an increasing sequence of bytes starting at the given
 * value. The returned buffer wraps a newly generated byte array.
 *
 * @param start the starting value to use
 * @param len the target length of the sequence
 * @return ByteBuffer containing an increasing sequence of bytes
 */
public static ByteBuffer getIncreasingByteBuffer(int start, int len) {
  final byte[] content = getIncreasingByteArray(start, len);
  return ByteBuffer.wrap(content);
}
/**
 * Checks if the given {@link ByteBuffer} holds exactly an increasing sequence of bytes starting
 * at the given value with the given length.
 * <p>
 * The buffer is rewound before the check, so the comparison always starts at index zero and the
 * number of bytes up to the limit must be equal to {@code len}. The position of {@code buf} is
 * modified by this call.
 *
 * @param start the starting value to use
 * @param len the expected number of bytes between index zero and the limit of the buffer
 * @param buf the ByteBuffer to check
 * @return true if {@code buf} is non-null, holds {@code len} bytes after rewinding and those
 *         bytes are an increasing sequence starting at {@code start}
 */
public static boolean equalIncreasingByteBuffer(int start, int len, ByteBuffer buf) {
  if (buf == null) {
    return false;
  }
  buf.rewind();
  if (buf.remaining() != len) {
    return false;
  }
  // Exactly len bytes remain at this point, so draining the buffer visits each of them once.
  byte expected = (byte) start;
  while (buf.hasRemaining()) {
    if (buf.get() != expected) {
      return false;
    }
    expected ++;
  }
  return true;
}
/**
 * Gets a {@link ByteBuffer} containing an increasing sequence of integers starting at zero.
 *
 * @param len the target length of the sequence
 * @return ByteBuffer containing an increasing sequence of integers
 */
public static ByteBuffer getIncreasingIntBuffer(int len) {
  final int firstValue = 0;
  return getIncreasingIntBuffer(firstValue, len);
}
/**
 * Gets a {@link ByteBuffer} containing an increasing sequence of integers starting at the given
 * value. Each integer occupies four bytes and the returned buffer is flipped, i.e. ready to be
 * read from position zero.
 *
 * @param start the starting value to use
 * @param len the target length of the sequence, counted in integers
 * @return ByteBuffer containing an increasing sequence of integers
 */
public static ByteBuffer getIncreasingIntBuffer(int start, int len) {
  final ByteBuffer ret = ByteBuffer.allocate(len * (Integer.SIZE / Byte.SIZE));
  int value = start;
  for (int pending = len; pending > 0; pending --) {
    ret.putInt(value);
    value ++;
  }
  ret.flip();
  return ret;
}
/**
 * Writes the whole byte array to the file at the given path. The output stream is closed before
 * the method returns, whether or not the write succeeds.
 *
 * @param path file path to write the data
 * @param buffer raw data
 * @throws IOException if the operation fails
 */
public static void writeBufferToFile(String path, byte[] buffer) throws IOException {
  final FileOutputStream out = new FileOutputStream(path);
  try {
    out.write(buffer, 0, buffer.length);
  } finally {
    // Always release the file handle, even when the write fails.
    out.close();
  }
}
/**
 * Copies everything from the source channel to the destination channel through a fixed-size
 * direct buffer, until the source reports end-of-stream.
 *
 * @param src the source channel
 * @param dest the destination channel
 * @throws IOException if the copy fails
 */
public static void fastCopy(final ReadableByteChannel src, final WritableByteChannel dest)
    throws IOException {
  // TODO(yupeng): make the buffer size configurable
  final ByteBuffer chunk = ByteBuffer.allocateDirect(16 * 1024);
  int bytesRead = src.read(chunk);
  while (bytesRead != -1) {
    // Switch to draining mode, write what the destination accepts, then keep the unwritten
    // remainder at the front of the buffer for the next round.
    chunk.flip();
    dest.write(chunk);
    chunk.compact();
    bytesRead = src.read(chunk);
  }
  // End of stream: flush whatever partial writes left behind.
  chunk.flip();
  while (chunk.remaining() > 0) {
    dest.write(chunk);
  }
}
/**
 * Creates a byte array holding the remaining bytes of the given {@link ByteBuffer}. The bytes
 * are read through a duplicate, so the position of the input buffer remains unchanged.
 *
 * @param buf source ByteBuffer
 * @return a newly created byte array
 */
public static byte[] newByteArrayFromByteBuffer(ByteBuffer buf) {
  // The duplicate shares the content but owns its position, which the bulk get advances.
  final ByteBuffer view = buf.duplicate();
  final byte[] copied = new byte[view.remaining()];
  view.get(copied);
  return copied;
}
/**
 * Creates a new ByteBuffer sliced from a given ByteBuffer. The new ByteBuffer shares the content
 * of the existing one, but has an independent position/mark/limit. After slicing, the new
 * ByteBuffer has position 0 and limit {@code length}, and the input ByteBuffer is unmodified.
 *
 * @param buffer source ByteBuffer to slice
 * @param position position in the source ByteBuffer to slice
 * @param length length of the sliced ByteBuffer
 * @return the sliced ByteBuffer
 */
public static ByteBuffer sliceByteBuffer(ByteBuffer buffer, int position, int length) {
  // Move the position on a duplicate so that the caller's buffer is never touched.
  final ByteBuffer view = buffer.duplicate();
  view.position(position);
  final ByteBuffer window = view.slice();
  window.limit(length);
  return window;
}
/**
 * Convenience method for {@link #sliceByteBuffer(ByteBuffer, int, int)} where the length of the
 * new buffer is the number of bytes between {@code position} and the limit of the source.
 *
 * @param buffer source {@link ByteBuffer} to slice
 * @param position position in the source {@link ByteBuffer} to slice
 * @return the sliced {@link ByteBuffer}
 */
public static ByteBuffer sliceByteBuffer(ByteBuffer buffer, int position) {
  // This deliberately does not delegate to sliceByteBuffer(ByteBuffer, int, int): that would
  // require computing the length and setting the limit explicitly, both of which slice()
  // already takes care of.
  final ByteBuffer view = buffer.duplicate();
  view.position(position);
  return view.slice();
}
/** Private constructor: this class only exposes static helpers and is never instantiated. */
private BufferUtils() {} // prevent instantiation
}