Navigation Menu

Skip to content

Commit

Permalink
This is to fix DS-Java issue 358, which relates to Druid Issue 11544.
Browse files Browse the repository at this point in the history
  • Loading branch information
leerho committed Aug 8, 2021
1 parent bdbb636 commit 310a8b7
Show file tree
Hide file tree
Showing 26 changed files with 148 additions and 107 deletions.
Expand Up @@ -227,7 +227,7 @@ private static void checkByteBuffer(WritableMemory mem, long off, long cap, bool

assertTrue(mem.getByteBuffer() != null);
assertTrue(mem.getTypeByteOrder() == Util.nativeByteOrder);
assertTrue(mem.getMemoryRequestServer() == null);
assertNotNull(mem.getMemoryRequestServer());
Object obj = ReflectUtil.getUnsafeObject(mem);
//Object obj = mem.getUnsafeObject();
if (direct) {
Expand Down Expand Up @@ -269,7 +269,7 @@ private static void checkByteBuffer(WritableMemory mem, long off, long cap, bool

assertTrue(nnMem.getByteBuffer() != null);
assertTrue(nnMem.getTypeByteOrder() == Util.nonNativeByteOrder);
assertTrue(nnMem.getMemoryRequestServer() == null);
assertNotNull(nnMem.getMemoryRequestServer());
obj = ReflectUtil.getUnsafeObject(nnMem);
//obj = nnMem.getUnsafeObject();
if (direct) {
Expand Down
Expand Up @@ -47,7 +47,7 @@ public void wrapBigEndian() {
public void wrapBigEndianAsLittle() {
ByteBuffer bb = ByteBuffer.allocate(64);
bb.putChar(0, (char)1); //as BE
WritableMemory wmem = WritableMemory.writableWrap(bb, ByteOrder.LITTLE_ENDIAN);
WritableMemory wmem = WritableMemory.writableWrap(bb, ByteOrder.LITTLE_ENDIAN, null);
assertEquals(wmem.getChar(0), 256);
}

Expand Down
Expand Up @@ -32,6 +32,7 @@
* @author Lee Rhodes
*/
public interface BaseState {
static final MemoryRequestServer defaultMemReqSvr = new DefaultMemoryRequestServer();

//Byte Order Related

Expand Down
Expand Up @@ -37,7 +37,7 @@ public interface WritableBuffer extends Buffer {
* @return a new WritableBuffer for write operations on the given ByteBuffer.
*/
static WritableBuffer writableWrap(ByteBuffer byteBuf) {
return WritableBufferImpl.writableWrap(byteBuf, byteBuf.order());
return WritableBufferImpl.writableWrap(byteBuf);
}

/**
Expand All @@ -48,10 +48,13 @@ static WritableBuffer writableWrap(ByteBuffer byteBuf) {
* @param byteBuf the given ByteBuffer, must not be null
* @param byteOrder the byte order to be used, which may be independent of the byte order
* state of the given ByteBuffer
* @param memReqSvr A user-specified MemoryRequestServer.
* This is a callback mechanism for a user client to request a larger Memory.
* @return a new WritableBuffer for write operations on the given ByteBuffer.
*/
static WritableBuffer writableWrap(ByteBuffer byteBuf, ByteOrder byteOrder) {
return WritableBufferImpl.writableWrap(byteBuf, byteOrder);
static WritableBuffer writableWrap(ByteBuffer byteBuf, ByteOrder byteOrder, MemoryRequestServer memReqSvr) {
MemoryRequestServer mReqSvr = (memReqSvr == null) ? defaultMemReqSvr : memReqSvr;
return WritableBufferImpl.writableWrap(byteBuf, byteOrder, mReqSvr);
}

//DUPLICATES
Expand Down Expand Up @@ -363,10 +366,11 @@ void putBooleanArray(boolean[] srcArray, int srcOffsetBooleans,

//OTHER WRITABLE API METHODS
/**
* For Direct Memory only. Other types of backing resources will return null.
* Gets the MemoryRequestServer object used by dynamic off-heap (Direct) memory objects
* to request additional memory.
* Set using {@link WritableMemory#allocateDirect(long, MemoryRequestServer)}.
* For ByteBuffer and Direct Memory backed resources only. Heap and Map backed resources will return null.
* Gets the MemoryRequestServer object used by dynamic Memory-backed objects
* to request additional memory. To customize the actions of the MemoryRequestServer,
* extend the MemoryRequestServer interfact and
* set using {@link WritableMemory#allocateDirect(long, MemoryRequestServer)}.
* If not explicity set, this returns the {@link DefaultMemoryRequestServer}.
* @return the MemoryRequestServer object (if direct memory) or null.
*/
Expand Down
Expand Up @@ -34,31 +34,28 @@ public interface WritableMemory extends Memory {
* the same byte order, as the given ByteBuffer, unless the capacity of the given ByteBuffer is
* zero, then byte order of the returned WritableMemory object, as well as backing storage and
* read-only status are unspecified.
*
* <p><b>Note:</b> Always qualify this method with the class name, e.g.,
* <i>WritableMemory.wrap(...)</i>.
* @param byteBuf the given ByteBuffer
* @return a new WritableMemory for write operations on the given ByteBuffer.
*/
static WritableMemory writableWrap(ByteBuffer byteBuf) {
return WritableMemoryImpl.writableWrap(byteBuf, byteBuf.order());
return WritableMemoryImpl.writableWrap(byteBuf);
}

/**
* Accesses the given ByteBuffer for write operations. The returned WritableMemory object has
* the given byte order, ignoring the byte order of the given ByteBuffer. If the capacity of
* the given ByteBuffer is zero the byte order of the returned WritableMemory object
* (as well as backing storage) is unspecified.
*
* <p><b>Note:</b> Always qualify this method with the class name, e.g.,
* <i>WritableMemory.wrap(...)</i>.
* @param byteBuf the given ByteBuffer, must not be null
* @param byteOrder the byte order to be used, which may be independent of the byte order
* state of the given ByteBuffer
* @param memReqSvr A user-specified MemoryRequestServer. If null, the DefaultMemoryRequestServer is used.
* This is a callback mechanism for a user client to request a larger Memory.
* @return a new WritableMemory for write operations on the given ByteBuffer.
*/
static WritableMemory writableWrap(ByteBuffer byteBuf, ByteOrder byteOrder) {
return WritableMemoryImpl.writableWrap(byteBuf, byteOrder);
static WritableMemory writableWrap(ByteBuffer byteBuf, ByteOrder byteOrder, MemoryRequestServer memReqSvr) {
MemoryRequestServer mReqSvr = (memReqSvr == null) ? defaultMemReqSvr : memReqSvr;
return WritableMemoryImpl.writableWrap(byteBuf, byteOrder, mReqSvr);
}

//MAP
Expand Down Expand Up @@ -134,7 +131,8 @@ static WritableHandle allocateDirect(long capacityBytes) {
* Please read Javadocs for {@link Handle}.
*/
static WritableHandle allocateDirect(long capacityBytes, MemoryRequestServer memReqSvr) {
return WritableMemoryImpl.allocateDirect(capacityBytes, memReqSvr);
MemoryRequestServer mReqSvr = (memReqSvr == null) ? defaultMemReqSvr : memReqSvr;
return WritableMemoryImpl.allocateDirect(capacityBytes, mReqSvr);
}

//REGIONS
Expand Down Expand Up @@ -602,10 +600,11 @@ static WritableMemory writableWrap(double[] arr) {

//OTHER WRITABLE API METHODS
/**
* For Direct Memory only. Other types of backing resources will return null.
* Gets the MemoryRequestServer object used by dynamic off-heap (Direct) memory objects
* to request additional memory.
* Set using {@link WritableMemory#allocateDirect(long, MemoryRequestServer)}.
* For ByteBuffer and Direct Memory backed resources only. Heap and Map backed resources will return null.
* Gets the MemoryRequestServer object used by dynamic Memory-backed objects
* to request additional memory. To customize the actions of the MemoryRequestServer,
* extend the MemoryRequestServer interfact and
* set using {@link WritableMemory#allocateDirect(long, MemoryRequestServer)}.
* If not explicity set, this returns the {@link DefaultMemoryRequestServer}.
* @return the MemoryRequestServer object (if direct memory) or null.
*/
Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.datasketches.memory.MemoryRequestServer;

/**
* Implementation of {@link WritableBufferImpl} for ByteBuffer, non-native byte order.
*
Expand All @@ -33,6 +35,7 @@ final class BBNonNativeWritableBufferImpl extends NonNativeWritableBufferImpl {
private final Object unsafeObj;
private final long nativeBaseOffset; //used to compute cumBaseOffset
private final ByteBuffer byteBuf; //holds a reference to a ByteBuffer until we are done with it.
private MemoryRequestServer memReqSvr = null; //cannot be final;
private final byte typeId;

BBNonNativeWritableBufferImpl(
Expand Down Expand Up @@ -81,6 +84,12 @@ public ByteBuffer getByteBuffer() {
return byteBuf;
}

@Override
public MemoryRequestServer getMemoryRequestServer() {
assertValid();
return memReqSvr; //cannot be null
}

@Override
long getNativeBaseOffset() {
return nativeBaseOffset;
Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.datasketches.memory.MemoryRequestServer;

/**
* Implementation of {@link WritableMemoryImpl} for ByteBuffer, non-native byte order.
*
Expand All @@ -33,6 +35,7 @@ final class BBNonNativeWritableMemoryImpl extends NonNativeWritableMemoryImpl {
private final Object unsafeObj;
private final long nativeBaseOffset; //used to compute cumBaseOffset
private final ByteBuffer byteBuf; //holds a reference to a ByteBuffer until we are done with it.
private MemoryRequestServer memReqSvr = null; //cannot be final;
private final byte typeId;

BBNonNativeWritableMemoryImpl(
Expand All @@ -41,11 +44,13 @@ final class BBNonNativeWritableMemoryImpl extends NonNativeWritableMemoryImpl {
final long regionOffset,
final long capacityBytes,
final int typeId,
final ByteBuffer byteBuf) {
final ByteBuffer byteBuf,
final MemoryRequestServer memReqSvr) {
super(unsafeObj, nativeBaseOffset, regionOffset, capacityBytes);
this.unsafeObj = unsafeObj;
this.nativeBaseOffset = nativeBaseOffset;
this.byteBuf = byteBuf;
this.memReqSvr = (memReqSvr == null) ? defaultMemReqSvr : memReqSvr;
this.typeId = (byte) (id | (typeId & 0x7));
}

Expand All @@ -56,10 +61,10 @@ BaseWritableMemoryImpl toWritableRegion(final long offsetBytes, final long capac
return Util.isNativeByteOrder(byteOrder)
? new BBWritableMemoryImpl(
unsafeObj, nativeBaseOffset, getRegionOffset(offsetBytes), capacityBytes,
type, getByteBuffer())
type, getByteBuffer(), memReqSvr)
: new BBNonNativeWritableMemoryImpl(
unsafeObj, nativeBaseOffset, getRegionOffset(offsetBytes), capacityBytes,
type, getByteBuffer());
type, getByteBuffer(), memReqSvr);
}

@Override
Expand All @@ -80,6 +85,12 @@ public ByteBuffer getByteBuffer() {
return byteBuf;
}

@Override
public MemoryRequestServer getMemoryRequestServer() {
assertValid();
return memReqSvr; //cannot be null
}

@Override
long getNativeBaseOffset() {
return nativeBaseOffset;
Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.datasketches.memory.MemoryRequestServer;

/**
* Implementation of {@link WritableBufferImpl} for ByteBuffer, native byte order.
*
Expand All @@ -33,6 +35,7 @@ final class BBWritableBufferImpl extends NativeWritableBufferImpl {
private final Object unsafeObj;
private final long nativeBaseOffset; //used to compute cumBaseOffset
private final ByteBuffer byteBuf; //holds a reference to a ByteBuffer until we are done with it.
private MemoryRequestServer memReqSvr = null; //cannot be final;
private final byte typeId;

BBWritableBufferImpl(
Expand Down Expand Up @@ -81,6 +84,12 @@ public ByteBuffer getByteBuffer() {
return byteBuf;
}

@Override
public MemoryRequestServer getMemoryRequestServer() {
assertValid();
return memReqSvr; //cannot be null
}

@Override
long getNativeBaseOffset() {
return nativeBaseOffset;
Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.datasketches.memory.MemoryRequestServer;

/**
* Implementation of {@link WritableMemoryImpl} for ByteBuffer, native byte order.
*
Expand All @@ -33,6 +35,7 @@ final class BBWritableMemoryImpl extends NativeWritableMemoryImpl {
private final Object unsafeObj;
private final long nativeBaseOffset; //used to compute cumBaseOffset
private final ByteBuffer byteBuf; //holds a reference to a ByteBuffer until we are done with it.
private MemoryRequestServer memReqSvr = null; //cannot be final;
private final byte typeId;

BBWritableMemoryImpl(
Expand All @@ -41,11 +44,13 @@ final class BBWritableMemoryImpl extends NativeWritableMemoryImpl {
final long regionOffset,
final long capacityBytes,
final int typeId,
final ByteBuffer byteBuf) {
final ByteBuffer byteBuf,
final MemoryRequestServer memReqSvr) {
super(unsafeObj, nativeBaseOffset, regionOffset, capacityBytes);
this.unsafeObj = unsafeObj;
this.nativeBaseOffset = nativeBaseOffset;
this.byteBuf = byteBuf;
this.memReqSvr = (memReqSvr == null) ? defaultMemReqSvr : memReqSvr;
this.typeId = (byte) (id | (typeId & 0x7));
}

Expand All @@ -56,10 +61,10 @@ BaseWritableMemoryImpl toWritableRegion(final long offsetBytes, final long capac
return Util.isNativeByteOrder(byteOrder)
? new BBWritableMemoryImpl(
unsafeObj, nativeBaseOffset, getRegionOffset(offsetBytes), capacityBytes,
type, getByteBuffer())
type, getByteBuffer(), memReqSvr)
: new BBNonNativeWritableMemoryImpl(
unsafeObj, nativeBaseOffset, getRegionOffset(offsetBytes), capacityBytes,
type, getByteBuffer());
type, getByteBuffer(), memReqSvr);
}

@Override
Expand All @@ -80,6 +85,12 @@ public ByteBuffer getByteBuffer() {
return byteBuf;
}

@Override
public MemoryRequestServer getMemoryRequestServer() {
assertValid();
return memReqSvr; //cannot be null
}

@Override
long getNativeBaseOffset() {
return nativeBaseOffset;
Expand Down
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.datasketches.memory.BaseState;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;

/**
Expand All @@ -47,7 +46,7 @@ public abstract class BaseStateImpl implements BaseState {
static final AtomicLong currentDirectMemoryMapAllocations_ = new AtomicLong();
static final AtomicLong currentDirectMemoryMapAllocated_ = new AtomicLong();

static final MemoryRequestServer defaultMemReqSvr = new DefaultMemoryRequestServer();


//class type IDs. Do not change the bit orders
// 0000 0XXX
Expand Down
Expand Up @@ -89,17 +89,18 @@ static BaseWritableMemoryImpl wrapHeapArray(final Object arr, final long offsetB
}

static BaseWritableMemoryImpl wrapByteBuffer(
final ByteBuffer byteBuf, final boolean localReadOnly, final ByteOrder byteOrder) {
final ByteBuffer byteBuf, final boolean localReadOnly, final ByteOrder byteOrder,
final MemoryRequestServer memReqSvr) {
final AccessByteBuffer abb = new AccessByteBuffer(byteBuf);
if (abb.resourceReadOnly && !localReadOnly) {
throw new ReadOnlyException("ByteBuffer is Read Only");
}
final int typeId = (abb.resourceReadOnly || localReadOnly) ? READONLY : 0;
return Util.isNativeByteOrder(byteOrder)
? new BBWritableMemoryImpl(abb.unsafeObj, abb.nativeBaseOffset,
abb.regionOffset, abb.capacityBytes, typeId, byteBuf)
abb.regionOffset, abb.capacityBytes, typeId, byteBuf, memReqSvr)
: new BBNonNativeWritableMemoryImpl(abb.unsafeObj, abb.nativeBaseOffset,
abb.regionOffset, abb.capacityBytes, typeId, byteBuf);
abb.regionOffset, abb.capacityBytes, typeId, byteBuf, memReqSvr);
}

@SuppressWarnings("resource")
Expand Down
Expand Up @@ -48,14 +48,14 @@ public static BufferImpl wrap(final ByteBuffer byteBuf) {

public static BufferImpl wrap(final ByteBuffer byteBuf, final ByteOrder byteOrder) {
final BaseWritableMemoryImpl wmem =
BaseWritableMemoryImpl.wrapByteBuffer(byteBuf, true, byteOrder);
BaseWritableMemoryImpl.wrapByteBuffer(byteBuf, true, byteOrder, defaultMemReqSvr);
final WritableBufferImpl wbuf = wmem.asWritableBuffer(true, byteOrder);
wbuf.setStartPositionEnd(0, byteBuf.position(), byteBuf.limit());
return wbuf;
}

//MAP
//Use MemoryImpl for mapping files and the asBuffer()
//Use MemoryImpl for mapping files and then call asBuffer()

//DUPLICATES
@Override
Expand Down
Expand Up @@ -98,11 +98,4 @@ public boolean isValid() {
return valid.get();
}

@Override
void checkValid() {
if (!this.isValid()) {
throw new IllegalStateException("MemoryImpl not valid.");
}
}

}
Expand Up @@ -96,11 +96,4 @@ public boolean isValid() {
return valid.get();
}

@Override
void checkValid() {
if (!this.isValid()) {
throw new IllegalStateException("MemoryImpl not valid.");
}
}

}

0 comments on commit 310a8b7

Please sign in to comment.