Skip to content

Commit

Permalink
Use ByteBuffer rather than byte[] for index/payload get
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 committed Dec 20, 2015
1 parent 4f1b191 commit 66a3d18
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 53 deletions.
Expand Up @@ -16,15 +16,11 @@

package tachyon.client.keyvalue;

import com.google.common.base.Preconditions;
import tachyon.TachyonURI;
import tachyon.client.file.TachyonFile;
import tachyon.client.file.TachyonFileSystem;
import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory;
import tachyon.exception.TachyonException;

import java.io.IOException;
import java.util.List;
import java.nio.ByteBuffer;

/**
* Interface of the reader class to access a Tachyon key-value file.
Expand All @@ -41,11 +37,7 @@ class Factory {
* @throws IOException
*/
public static KeyValueFileReader create(TachyonURI uri) throws TachyonException, IOException {
Preconditions.checkArgument(uri != null);
TachyonFileSystem tfs = TachyonFileSystemFactory.get();
TachyonFile tFile = tfs.open(uri);
List<Long> blockIds = tfs.getInfo(tFile).getBlockIds();
return null; // new RemoteKeyValueFileReader(blockIds);
return new ClientKeyValueFileReader(uri);
}
}

Expand All @@ -55,5 +47,7 @@ public static KeyValueFileReader create(TachyonURI uri) throws TachyonException,
* @param key key to get, cannot be null
* @return bytes of the value if found, null otherwise
*/
byte[] get(byte[] key) throws IOException;
byte[] get(byte[] key) throws IOException, TachyonException;

ByteBuffer get(ByteBuffer key) throws IOException, TachyonException;
}
Expand Up @@ -16,15 +16,16 @@
package tachyon.client.keyvalue;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

import tachyon.Constants;
import tachyon.exception.TachyonException;
import tachyon.util.io.BufferUtils;
import tachyon.util.io.ByteIOUtils;
import tachyon.worker.keyvalue.Index;
import tachyon.worker.keyvalue.LinearProbingIndex;
Expand All @@ -39,29 +40,39 @@ public final class RandomAccessKeyValueFileReader implements KeyValueFileReader

private Index mIndex;
private PayloadReader mPayloadReader;
private byte[] mBuf;
private ByteBuffer mBuf;
private int mBufferLength;

public RandomAccessKeyValueFileReader(byte[] fileBytes) {
public RandomAccessKeyValueFileReader(ByteBuffer fileBytes) {
mBuf = Preconditions.checkNotNull(fileBytes);
mBufferLength = mBuf.length;
mBufferLength = mBuf.limit();
mIndex = createIndex();
mPayloadReader = createPayloadReader();
}

public Index createIndex() {
int indexOffset = ByteIOUtils.readInt(mBuf, mBufferLength - 4);
// TODO(binfan): this array copy might be expensive and unnecessary
byte[] indexBytes = Arrays.copyOfRange(mBuf, indexOffset, mBufferLength - 4);
ByteBuffer indexBytes =
BufferUtils.sliceByteBuffer(mBuf, indexOffset, mBufferLength - 4 - indexOffset);
return LinearProbingIndex.loadFromByteArray(indexBytes);
}

/**
 * Builds the payload reader for this file.
 *
 * @return a new {@link RandomAccessPayloadReader} constructed over this reader's buffer
 */
public RandomAccessPayloadReader createPayloadReader() {
  final RandomAccessPayloadReader payloadReader = new RandomAccessPayloadReader(mBuf);
  return payloadReader;
}

// This could be slow when the value size is large; use with caution.
@Override
public byte[] get(byte[] key) throws IOException {
public byte[] get(byte[] key) throws IOException, TachyonException {
ByteBuffer valueBuffer = get(ByteBuffer.wrap(key));
if (valueBuffer == null) {
return null;
}
return BufferUtils.newByteArrayFromByteBuffer(valueBuffer);
}

/**
 * Looks up the value for the given key by delegating to the index, which reads keys and
 * values through this file's payload reader.
 *
 * @param key key to look up
 * @return whatever the index returns for this key
 * @throws IOException if the index lookup fails
 */
@Override
public ByteBuffer get(ByteBuffer key) throws IOException {
  LOG.trace("get: key");
  final ByteBuffer value = mIndex.get(key, mPayloadReader);
  return value;
}
Expand Down
Expand Up @@ -22,6 +22,8 @@
import tachyon.worker.keyvalue.Index;
import tachyon.worker.keyvalue.RandomAccessPayloadReader;

import java.nio.ByteBuffer;

/**
* unit tests of {@link OutStreamKeyValueFileWriter} and {@link RandomAccessKeyValueFileReader}
*/
Expand Down Expand Up @@ -51,7 +53,7 @@ public void buildAndLoadTest() throws Exception {
mWriter.put(KEY2, VALUE2);
mWriter.build();
byte[] fileData = mOutStream.toByteArray();
mReader = new RandomAccessKeyValueFileReader(fileData);
mReader = new RandomAccessKeyValueFileReader(ByteBuffer.wrap(fileData));
Assert.assertArrayEquals(VALUE1, mReader.get(KEY1));
Assert.assertArrayEquals(VALUE2, mReader.get(KEY2));

Expand Down
30 changes: 30 additions & 0 deletions common/src/main/java/tachyon/util/io/BufferUtils.java
Expand Up @@ -317,5 +317,35 @@ public static void fastCopy(final ReadableByteChannel src, final WritableByteCha
}
}

/**
 * Copies the remaining bytes of a ByteBuffer (from its current position up to its limit) into
 * a freshly allocated byte array. The position, limit and mark of the given buffer are left
 * untouched because the copy is performed through a duplicate view.
 *
 * @param buf source ByteBuffer
 * @return a newly created byte array holding {@code buf.remaining()} bytes
 */
public static byte[] newByteArrayFromByteBuffer(ByteBuffer buf) {
  // Read through a duplicate so the relative get() advances the view, not the caller's buffer.
  final ByteBuffer view = buf.duplicate();
  final byte[] copy = new byte[buf.remaining()];
  view.get(copy, 0, copy.length);
  return copy;
}

/**
 * Returns a view over part of an existing ByteBuffer. The view shares the underlying content
 * with the source buffer but carries its own position, mark and limit. The returned buffer
 * starts at position 0 with its limit set to {@code length}; the source buffer itself is not
 * modified.
 *
 * @param buffer source ByteBuffer to slice
 * @param position index in the source ByteBuffer at which the slice begins
 * @param length number of bytes the slice exposes (its limit)
 * @return sliced ByteBuffer
 */
public static ByteBuffer sliceByteBuffer(ByteBuffer buffer, int position, int length) {
  // Work on a duplicate so that moving the position does not disturb the caller's buffer.
  final ByteBuffer view = buffer.duplicate();
  view.position(position);
  final ByteBuffer sliced = view.slice();
  sliced.limit(length);
  return sliced;
}

private BufferUtils() {} // prevent instantiation
}
21 changes: 21 additions & 0 deletions common/src/main/java/tachyon/util/io/ByteIOUtils.java
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;


/**
Expand All @@ -33,6 +34,10 @@ public static byte readByte(byte[] buf, int pos) {
return (byte) (buf[pos] & 0xff);
}

/**
 * Reads one byte from the given ByteBuffer at an absolute index. The buffer's own position is
 * not changed.
 *
 * @param buf the buffer to read from
 * @param pos absolute index of the byte in the buffer
 * @return the byte stored at {@code pos}
 */
public static byte readByte(ByteBuffer buf, int pos) {
  final byte value = buf.get(pos);
  return value;
}

public static short readShort(byte[] buf, int pos) {
checkBoundary(buf, pos, 1);
return (short) (((short) (buf[pos ++] & 0xff) << 8) | ((short) (buf[pos] & 0xff)));
Expand All @@ -44,6 +49,12 @@ public static int readInt(byte[] buf, int pos) {
| ((buf[pos ++] & 0xff) << 8) | (buf[pos] & 0xff));
}

/**
 * Reads a 4-byte integer from the given ByteBuffer starting at an absolute index, with the
 * most significant byte first. Bytes are fetched one at a time, so the result does not depend
 * on the buffer's configured byte order, and the buffer's position is not changed.
 *
 * @param buf the buffer to read from
 * @param pos absolute index of the first (most significant) byte
 * @return the decoded integer
 */
public static int readInt(ByteBuffer buf, int pos) {
  int value = 0;
  // Fold the four bytes in from most to least significant.
  for (int i = 0; i < 4; i++) {
    value = (value << 8) | (buf.get(pos + i) & 0xff);
  }
  return value;
}


public static long readLong(byte[] buf, int pos) {
checkBoundary(buf, pos, 7);
return (((long) (buf[pos ++] & 0xff) << 56) | ((long) (buf[pos ++] & 0xff) << 48)
Expand All @@ -57,6 +68,10 @@ public static void writeByte(byte[] buf, int pos, byte v) {
buf[pos] = v;
}

/**
 * Stores one byte into the given ByteBuffer at an absolute index. The buffer's own position
 * is not changed.
 *
 * @param buf the buffer to write into
 * @param pos absolute index at which the byte is stored
 * @param v the byte value to store
 */
public static void writeByte(ByteBuffer buf, int pos, byte v) {
  final byte value = v;
  buf.put(pos, value);
}

/**
* Writes a single byte value (1 byte) to the output stream. This is equivalent of
* {@link OutputStream#write(int)}.
Expand Down Expand Up @@ -97,6 +112,12 @@ public static void writeInt(byte[] buf, int pos, int v) {
buf[pos] = (byte) (0xff & v);
}

/**
 * Stores a 4-byte integer into the given ByteBuffer starting at an absolute index, most
 * significant byte first. Bytes are written one at a time in ascending index order, so the
 * layout does not depend on the buffer's configured byte order, and the buffer's position is
 * not changed.
 *
 * @param buf the buffer to write into
 * @param pos absolute index at which the first (most significant) byte is stored
 * @param v the integer value to store
 */
public static void writeInt(ByteBuffer buf, int pos, int v) {
  int index = pos;
  // Emit the bytes from the highest-order byte down to the lowest.
  for (int shift = 24; shift >= 0; shift -= 8) {
    buf.put(index, (byte) (0xff & (v >> shift)));
    index++;
  }
}

/**
* Writes a specific integer value (4 bytes) to the output stream.
Expand Down
5 changes: 3 additions & 2 deletions common/src/main/java/tachyon/worker/keyvalue/Index.java
Expand Up @@ -16,6 +16,7 @@
package tachyon.worker.keyvalue;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Interface of key-value Index.
Expand All @@ -38,9 +39,9 @@ public interface Index {
*
* @param key bytes of key
* @param reader reader used to access the stored key-value payload
* @return bytes of value, or null if not found
* @return ByteBuffer of value, or null if not found
*/
byte[] get(byte[] key, PayloadReader reader) throws IOException ;
ByteBuffer get(ByteBuffer key, PayloadReader reader) throws IOException;

/**
* @return byte array of this index
Expand Down
Expand Up @@ -16,11 +16,13 @@
package tachyon.worker.keyvalue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import tachyon.util.io.BufferUtils;
import tachyon.util.io.ByteIOUtils;

/**
Expand Down Expand Up @@ -48,7 +50,7 @@ public final class LinearProbingIndex implements Index {
/** Size of each bucket in bytes */
private static final int BUCKET_SIZE_BYTES = (Byte.SIZE + Integer.SIZE) / Byte.SIZE;

private byte[] mBuf;
private ByteBuffer mBuf;
private int mNumBuckets;
private int mKeyCount;

Expand All @@ -59,16 +61,16 @@ public static LinearProbingIndex createEmptyIndex() {
int numBuckets = 1 << 15;
byte[] buffer = new byte[numBuckets * BUCKET_SIZE_BYTES];
Arrays.fill(buffer, (byte) 0);
return new LinearProbingIndex(buffer, numBuckets, 0);
return new LinearProbingIndex(ByteBuffer.wrap(buffer), numBuckets, 0);
}

public static LinearProbingIndex loadFromByteArray(byte[] buffer) {
int numBuckets = buffer.length / BUCKET_SIZE_BYTES;
public static LinearProbingIndex loadFromByteArray(ByteBuffer buffer) {
int numBuckets = buffer.limit() / BUCKET_SIZE_BYTES;
// TODO(binfan): fix the key count which is wrong now.
return new LinearProbingIndex(buffer, numBuckets, 0);
}

private LinearProbingIndex(byte[] buf, int numBuckets, int keyCount) {
private LinearProbingIndex(ByteBuffer buf, int numBuckets, int keyCount) {
mBuf = buf;
mNumBuckets = numBuckets;
mKeyCount = keyCount;
Expand Down Expand Up @@ -106,7 +108,7 @@ public boolean put(byte[] key, byte[] value, PayloadWriter writer) throws IOExce
}

@Override
public byte[] get(byte[] key, PayloadReader reader) {
public ByteBuffer get(ByteBuffer key, PayloadReader reader) {
int bucketIndex = indexHash(key);
byte fingerprint = fingerprintHash(key);

Expand All @@ -115,8 +117,8 @@ public byte[] get(byte[] key, PayloadReader reader) {
int pos = bucketIndex * BUCKET_SIZE_BYTES;
if (fingerprint == ByteIOUtils.readByte(mBuf, pos)) {
int offset = ByteIOUtils.readInt(mBuf, pos + 1);
byte[] keyStored = reader.getKey(offset);
if (Arrays.equals(key, keyStored)) {
ByteBuffer keyStored = reader.getKey(offset);
if (key.equals(keyStored)) {
return reader.getValue(offset);
}
}
Expand All @@ -127,7 +129,7 @@ public byte[] get(byte[] key, PayloadReader reader) {

@Override
public byte[] getBytes() {
return mBuf;
return mBuf.array();
}

/**
Expand All @@ -142,6 +144,11 @@ public int indexHash(byte[] key) {
return (v >= 0) ? v : -v;
}

/**
 * Computes the index hash of a key held in a ByteBuffer. The key's remaining bytes are copied
 * into a byte array and hashed by {@link #indexHash(byte[])}.
 *
 * @param key key to hash
 * @return the value of {@link #indexHash(byte[])} for the copied key bytes
 */
public int indexHash(ByteBuffer key) {
  return indexHash(BufferUtils.newByteArrayFromByteBuffer(key));
}


/**
* Hashes the key into a non-zero fingerprint in byte.
Expand All @@ -154,4 +161,9 @@ public byte fingerprintHash(byte[] key) {
hash = (hash >> 24) & 0xff; // use high-order bits
return (byte) ((hash == 0) ? 1 : hash);
}

/**
 * Computes the fingerprint of a key held in a ByteBuffer. The key's remaining bytes are
 * copied into a byte array and hashed by {@link #fingerprintHash(byte[])}.
 *
 * @param key key to hash
 * @return the value of {@link #fingerprintHash(byte[])} for the copied key bytes
 */
public byte fingerprintHash(ByteBuffer key) {
  return fingerprintHash(BufferUtils.newByteArrayFromByteBuffer(key));
}
}
10 changes: 6 additions & 4 deletions common/src/main/java/tachyon/worker/keyvalue/PayloadReader.java
Expand Up @@ -15,6 +15,8 @@

package tachyon.worker.keyvalue;

import java.nio.ByteBuffer;

/**
* Interface to access key and value from payload storage.
*/
Expand All @@ -24,15 +26,15 @@ public interface PayloadReader {
* Gets the key given the position of payload storage.
*
* @param pos position in the payload storage in bytes
* @return bytes of the key
* @return key in ByteBuffer
*/
byte[] getKey(int pos);
ByteBuffer getKey(int pos);

/**
* Gets the value given the position of payload storage.
*
* @param pos position in the payload storage in bytes
* @return bytes of the key
* @return value in ByteBuffer
*/
byte[] getValue(int pos);
ByteBuffer getValue(int pos);
}

0 comments on commit 66a3d18

Please sign in to comment.