Skip to content
Permalink
Browse files
Merge pull request #139 from apache/MajorRefactorAndFix
Major refactor and fix
  • Loading branch information
leerho committed Aug 16, 2021
2 parents e3a0c37 + f5ddc7d commit bcc87fff8dfc4f2a2e415a55bfe7c22330680ccd
Show file tree
Hide file tree
Showing 57 changed files with 1,051 additions and 1,833 deletions.
@@ -21,10 +21,10 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

import org.apache.datasketches.memory.BaseState;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableHandle;
import org.apache.datasketches.memory.WritableMemory;
@@ -72,8 +72,13 @@ public void checkDefaultMemoryRequestServer() throws Exception {

int longs2 = 64;
int bytes2 = longs2 << 3;
MemoryRequestServer memReqSvr = origWmem.getMemoryRequestServer();
WritableMemory newWmem = memReqSvr.request(bytes2);
MemoryRequestServer memReqSvr;
if (BaseState.defaultMemReqSvr == null) {
memReqSvr = new DefaultMemoryRequestServer();
} else {
memReqSvr = origWmem.getMemoryRequestServer();
}
WritableMemory newWmem = memReqSvr.request(origWmem, bytes2);
assertFalse(newWmem.isDirect()); //on heap by default
for (int i = 0; i < longs2; i++) {
newWmem.putLong(i << 3, i);
@@ -85,18 +90,9 @@ public void checkDefaultMemoryRequestServer() throws Exception {
}
}

@Test
public void checkNullMemoryRequestServer() throws Exception {
try (WritableHandle wh = WritableMemory.allocateDirect(128, Util.nativeByteOrder, null)) {
WritableMemory wmem = wh.getWritable();
assertNotNull(wmem.getMemoryRequestServer());
}
}


@Test
public void checkNonNativeDirect() throws Exception {
try (WritableHandle h = WritableMemory.allocateDirect(128, Util.nonNativeByteOrder, null)) {
try (WritableHandle h = WritableMemory.allocateDirect(128, Util.NON_NATIVE_BYTE_ORDER, null)) {
WritableMemory wmem = h.getWritable();
wmem.putChar(0, (char) 1);
assertEquals(wmem.getByte(1), (byte) 1);
@@ -41,10 +41,10 @@
import org.apache.datasketches.memory.BaseState;
import org.apache.datasketches.memory.MapHandle;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.ReadOnlyException;
import org.apache.datasketches.memory.WritableHandle;
import org.apache.datasketches.memory.WritableMapHandle;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.memory.internal.ReadOnlyException;
import org.apache.datasketches.memory.internal.Util;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -147,7 +147,7 @@ public void checkNonNativeFile() throws Exception {
file.deleteOnExit(); //comment out if you want to examine the file.

final long bytes = 8;
try (WritableMapHandle h = WritableMemory.writableMap(file, 0L, bytes, Util.nonNativeByteOrder)) {
try (WritableMapHandle h = WritableMemory.writableMap(file, 0L, bytes, Util.NON_NATIVE_BYTE_ORDER)) {
WritableMemory wmem = h.getWritable();
wmem.putChar(0, (char) 1);
assertEquals(wmem.getByte(1), (byte) 1);
@@ -166,15 +166,17 @@ public void testMapException() throws IOException {
public void simpleMap2() throws Exception {
File file = getResourceFile("GettysburgAddress.txt");
assertTrue(isFileReadOnly(file));
try (WritableMapHandle rh = WritableMemory.writableMap(file)) { //throws
try (WritableMapHandle rh =
WritableMemory.writableMap(file)) { //throws
//
}
}

@Test(expectedExceptions = IllegalArgumentException.class)
@Test(expectedExceptions = ReadOnlyException.class)
public void checkOverLength() throws Exception {
File file = getResourceFile("GettysburgAddress.txt");
try (WritableMapHandle rh = WritableMemory.writableMap(file, 0, 1 << 20, ByteOrder.nativeOrder())) {
try (WritableMapHandle rh =
WritableMemory.writableMap(file, 0, 1 << 20, ByteOrder.nativeOrder())) {
//
} catch (IOException e) {
throw new RuntimeException(e);
@@ -21,9 +21,9 @@

import static org.testng.Assert.fail;

import org.apache.datasketches.memory.WritableHandle;
import org.apache.datasketches.memory.Buffer;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableHandle;
import org.apache.datasketches.memory.WritableMemory;
import org.testng.annotations.Test;

@@ -82,7 +82,9 @@ public void checkCheckValid() throws Exception {
wmem = hand.getWritable();
buf = wmem.asBuffer();
}
@SuppressWarnings("unused")
Memory mem = buf.asMemory();
try {
@SuppressWarnings("unused")
Memory mem = buf.asMemory();
} catch (AssertionError ae) { }
}
}
@@ -30,9 +30,9 @@
import org.apache.datasketches.memory.Buffer;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.ReadOnlyException;
import org.apache.datasketches.memory.WritableBuffer;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.memory.internal.ReadOnlyException;
import org.testng.annotations.Test;

@SuppressWarnings("javadoc")
@@ -24,9 +24,9 @@

import java.nio.ByteBuffer;

import org.apache.datasketches.memory.WritableHandle;
import org.apache.datasketches.memory.Buffer;
import org.apache.datasketches.memory.WritableBuffer;
import org.apache.datasketches.memory.WritableHandle;
import org.apache.datasketches.memory.WritableMemory;
import org.testng.annotations.Test;

@@ -22,7 +22,7 @@
import java.nio.ByteBuffer;

import org.apache.datasketches.memory.Buffer;
import org.apache.datasketches.memory.internal.ReadOnlyException;
import org.apache.datasketches.memory.ReadOnlyException;
import org.apache.datasketches.memory.WritableBuffer;
import org.apache.datasketches.memory.WritableMemory;
import org.testng.annotations.Test;
@@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
import org.testng.annotations.Test;
@@ -52,38 +53,58 @@ public class DruidIssue11544Test {
@Test
public void withByteBuffer() {
int initialLongs = 1000;
int initialMemSize = initialLongs * 8;
ByteBuffer bb = ByteBuffer.allocateDirect(initialMemSize);
int size1 = initialLongs * 8;

//Start with a ByteBuffer
ByteBuffer bb = ByteBuffer.allocateDirect(size1);
bb.order(ByteOrder.nativeOrder());

//Fill the byte buffer
for (int i = 0; i < initialLongs; i++) { bb.putLong(i * 8, i); }
//Wrap bb into WritableMemory
WritableMemory mem1 = WritableMemory.writableWrap(bb);
assertTrue(mem1.isDirect()); //confirm mem1 is off-heap

//Wrap, assuming default MemoryRequestServer
WritableMemory mem = WritableMemory.writableWrap(bb);
assertTrue(mem.isDirect()); //confirm mem is off-heap
//Acquire the DefaultMemoryRequestServer
//NOTE: it is a policy decision to allow the DefaultMemoryServer to be set as a default.
// It might be set to null. So we need to check what the current policy is.
MemoryRequestServer svr = mem1.getMemoryRequestServer();
if (svr == null) {
svr = new DefaultMemoryRequestServer();
}
assertNotNull(svr);

//Request Bigger Memory
MemoryRequestServer svr = mem.getMemoryRequestServer();
assertNotNull(svr); //before the fix, this was null.
int size2 = size1 * 2;
WritableMemory mem2 = svr.request(mem1, size2);

//Confirm that mem2 is on the heap (the default) and 2X size1
assertFalse(mem2.isDirect());
assertEquals(mem2.getCapacity(), size2);

//Move data to new memory
mem1.copyTo(0, mem2, 0, size1);

WritableMemory newMem = svr.request(initialMemSize * 2);
//Prepare to request deallocation
//In the DefaultMemoryRequestServer, this is a no-op, so nothing is actually deallocated.
svr.requestClose(mem1, mem2);
assertTrue(mem1.isValid());
assertTrue(mem2.isValid());

//Confirm that newMem is on the heap (the default) and 2X size
assertFalse(newMem.isDirect());
assertEquals(newMem.getCapacity(), 2 * initialMemSize);
//Now we are on the heap and need to grow again:
int size3 = size2 * 2;
WritableMemory mem3 = svr.request(mem2, size3);

//Confirm that mem3 is still on the heap and 2X of size2
assertFalse(mem3.isDirect());
assertEquals(mem3.getCapacity(), size3);

//Move data to new memory
mem.copyTo(0, newMem, 0, initialMemSize);
mem2.copyTo(0, mem3, 0, size2);

//Prepare to request deallocation
WritableMemory oldMem = mem;
mem = newMem;

//In the DefaultMemoryRequestServer, this is a no-op, so nothing is actually deallocated.
svr.requestClose(oldMem, newMem);
assertTrue(oldMem.isValid());
assertTrue(mem.isValid());
svr.requestClose(mem2, mem3); //No-op
assertTrue(mem2.isValid());
assertTrue(mem3.isValid());
}

}
@@ -37,21 +37,25 @@
public class ExampleMemoryRequestServerTest {

/**
* In this version all of the memory allocations are done through the MemoryRequestServer
* This version is without a TWR block.all of the memory allocations are done through the MemoryRequestServer
* and each is closed by the MemoryClient when it is done with each.
*/
@SuppressWarnings("resource")
@Test
public void checkExampleMemoryRequestServer1() {
public void checkExampleMemoryRequestServer1() throws Exception {
int bytes = 8;
ExampleMemoryRequestServer svr = new ExampleMemoryRequestServer();
WritableMemory wMem = svr.request(bytes);
MemoryClient client = new MemoryClient(wMem);
client.process();
svr.cleanup();
try (WritableHandle wh = WritableMemory.allocateDirect(8)) {
WritableMemory memStart = wh.getWritable();
WritableMemory wMem = svr.request(memStart, bytes);
MemoryClient client = new MemoryClient(wMem);
client.process();
svr.cleanup();
}
}

/**
* In this version the first memory allocation is done separately.
* In this version the first memory allocation is done up front in a TWR block.
* And then the MemoryClient allocates new memories as needed, which are then closed
* by the MemoryClient when it is done with the new memory allocations.
* The initial allocation stays open until the end where it is closed at the end of the
@@ -63,18 +67,20 @@ public void checkExampleMemoryRequestServer2() throws Exception {
int bytes = 8;
ExampleMemoryRequestServer svr = new ExampleMemoryRequestServer();
try (WritableHandle handle = WritableMemory.allocateDirect(bytes, ByteOrder.nativeOrder(), svr)) {
WritableMemory wMem = handle.getWritable();
MemoryClient client = new MemoryClient(wMem);
WritableMemory memStart = handle.getWritable();
MemoryClient client = new MemoryClient(memStart);
client.process();
svr.cleanup();
svr.cleanup(); //just to be sure all are closed.
}
}

@SuppressWarnings("resource")
@Test(expectedExceptions = IllegalArgumentException.class)
public void checkZeroCapacity() {
public void checkZeroCapacity() throws Exception {
ExampleMemoryRequestServer svr = new ExampleMemoryRequestServer();
WritableMemory.allocateDirect(0, ByteOrder.nativeOrder(), svr);
try (WritableHandle wh = WritableMemory.allocateDirect(0, ByteOrder.nativeOrder(), svr)) {

}
}

/**
@@ -88,25 +94,25 @@ static class MemoryClient {
WritableMemory smallMem;
MemoryRequestServer svr;

MemoryClient(WritableMemory wmem) {
smallMem = wmem;
svr = wmem.getMemoryRequestServer();
MemoryClient(WritableMemory memStart) {
smallMem = memStart;
svr = memStart.getMemoryRequestServer();
}

void process() {
long cap1 = smallMem.getCapacity();
smallMem.fill((byte) 1); //fill it, but not big enough
println(smallMem.toHexString("Small", 0, (int)cap1));

WritableMemory bigMem = svr.request(2 * cap1); //get bigger mem
WritableMemory bigMem = svr.request(smallMem, 2 * cap1); //get bigger mem
long cap2 = bigMem.getCapacity();
smallMem.copyTo(0, bigMem, 0, cap1); //copy data from small to big
svr.requestClose(smallMem, bigMem); //done with smallMem, release it

bigMem.fill(cap1, cap1, (byte) 2); //fill the rest of bigMem, still not big enough
println(bigMem.toHexString("Big", 0, (int)cap2));

WritableMemory giantMem = svr.request(2 * cap2); //get giant mem
WritableMemory giantMem = svr.request(bigMem, 2 * cap2); //get giant mem
long cap3 = giantMem.getCapacity();
bigMem.copyTo(0, giantMem, 0, cap2); //copy data from small to big
svr.requestClose(bigMem, giantMem); //done with bigMem, release it
@@ -119,15 +125,17 @@ void process() {

/**
* This example MemoryRequestServer is simplistic but demonstrates one of many ways to
* possibly manage the continuous requests for larger memory.
* possibly manage the continuous requests for larger memory and to track the associations between
* handles and their associated memory.
*/
public static class ExampleMemoryRequestServer implements MemoryRequestServer {
IdentityHashMap<WritableMemory, WritableHandle> map = new IdentityHashMap<>();

@SuppressWarnings("resource")
@Override
public WritableMemory request(long capacityBytes) {
WritableHandle handle = WritableMemory.allocateDirect(capacityBytes, ByteOrder.nativeOrder(), this);
public WritableMemory request(WritableMemory currentWMem, long capacityBytes) {
ByteOrder order = currentWMem.getTypeByteOrder();
WritableHandle handle = WritableMemory.allocateDirect(capacityBytes, order, this);
WritableMemory wmem = handle.getWritable();
map.put(wmem, handle); //We track the newly allocated memory and its handle.
return wmem;
@@ -137,14 +145,12 @@ public WritableMemory request(long capacityBytes) {
@Override
//here we actually release it, in reality it might be a lot more complex.
public void requestClose(WritableMemory memToRelease, WritableMemory newMemory) {
if (memToRelease != null) {
WritableHandle handle = map.get(memToRelease);
if (handle != null && handle.getWritable() == memToRelease) {
try {
handle.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
WritableHandle handle = map.get(memToRelease);
if (handle != null && handle.getWritable() == memToRelease) {
try {
handle.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@@ -156,7 +162,7 @@ public void cleanup() {
v.close();
} catch (Exception e) {
throw new RuntimeException(e);
} //harmless
}
});
}
}

0 comments on commit bcc87ff

Please sign in to comment.