Skip to content

Commit

Permalink
ARROW-8168:[Java][Plasma] Improve Java Plasma client off-heap memory …
Browse files Browse the repository at this point in the history
…usage

Implement create() and getByteBuffer() method, Java client could use off-heap ByteBuffer to avoid memory copy.

Closes #6715 from jikunshang/ARROW-8168

Lead-authored-by: Ji Kunshang <kunshang.ji@intel.com>
Co-authored-by: kunshang <kunshang.ji@intel.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
jikunshang authored and wesm committed Mar 30, 2020
1 parent 0facdc7 commit a81aacd
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,38 @@ public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
return ret;
}

/**
* Get an object in Plasma Store with objectId. Will return an off-heap ByteBuffer.
*
* @param objectId used to identify an object.
* @param timeoutMs time in milliseconfs to wait before this request time out.
* @param isMetadata get this object's metadata or data.
*/
public ByteBuffer getObjAsByteBuffer(byte[] objectId, int timeoutMs, boolean isMetadata) {
byte[][] objectIds = new byte[][]{objectId};
ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
return bufs[0][isMetadata ? 1 : 0];
}

@Override
public long evict(long numBytes) {
return PlasmaClientJNI.evict(conn, numBytes);
}

// wrapper methods --------------------

/**
* Create an object in Plasma Store with particular size. Will return an off-heap ByteBuffer.
*
* @param objectId used to identify an object.
* @param size size in bytes to be allocated for this object.
* @param metadata this object's metadata. It should be null if there is no metadata.
*/
public ByteBuffer create(byte[] objectId, int size, byte[] metadata)
throws DuplicateObjectException, PlasmaOutOfMemoryException {
return PlasmaClientJNI.create(conn, objectId, size, metadata);
}

/**
* Seal the buffer in the PlasmaStore for a particular object ID.
* Once a buffer has been sealed, the buffer is immutable and can only be accessed through get.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.arrow.plasma;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -237,6 +238,30 @@ public void doTest() {

}

public void doByteBufferTest() {
System.out.println("Start ByteBuffer test.");
PlasmaClient client = (PlasmaClient)pLink;
byte[] id = new byte[20];
Arrays.fill(id, (byte)10);
ByteBuffer buf = client.create(id, 100, null);
assert buf.isDirect();
for (int i = 0; i < 10; i++) {
buf.putInt(i);
}
client.seal(id);
client.release(id);
// buf is not available now.
assert client.contains(id);
System.out.println("Plasma java client create test success.");

ByteBuffer buf1 = client.getObjAsByteBuffer(id, -1, false);
assert buf1.limit() == 100;
for (int i = 0; i < 10; i++) {
assert buf1.getInt() == i;
}
System.out.println("Plasma java client getObjAsByteBuffer test success");
client.release(id);
}

private byte[] getArrayFilledWithValue(int arrayLength, byte val) {
byte[] arr = new byte[arrayLength];
Expand All @@ -251,6 +276,7 @@ public String getStoreAddress() {
public static void main(String[] args) throws Exception {

PlasmaClientTest plasmaClientTest = new PlasmaClientTest();
plasmaClientTest.doByteBufferTest();
plasmaClientTest.doTest();

}
Expand Down

0 comments on commit a81aacd

Please sign in to comment.