Skip to content

Commit

Permalink
QUEUE-14 improved bytes queue ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Feb 8, 2015
1 parent a404283 commit 14e75cb
Showing 1 changed file with 27 additions and 19 deletions.
Expand Up @@ -50,11 +50,11 @@ public boolean offer(@NotNull Bytes bytes) throws InterruptedException {
queue.readupto.set(writeLocation); queue.readupto.set(writeLocation);
queue.write(writeLocation, bytes.remaining()); queue.write(writeLocation, bytes.remaining());


long nextWriteLocation = queue.nextOffset(writeLocation, 8); long offset = queue.nextOffset(writeLocation, 8);


nextWriteLocation = queue.write(bytes, nextWriteLocation); offset = queue.write(bytes, offset);


writeLocation = nextWriteLocation; writeLocation = offset;
return true; return true;


} }
Expand Down Expand Up @@ -95,37 +95,26 @@ public Bytes poll(@NotNull Bytes using) throws InterruptedException, IllegalStat
continue; continue;


long elementSize = queue.readLong(offset); long elementSize = queue.readLong(offset);
if (elementSize == -1) {
using.position(0);
using.limit(0);
return using;
}


// checks that the 'using' bytes is large enough // checks that the 'using' bytes is large enough
checkSize(using, elementSize); checkSize(using, elementSize);


offset = queue.nextOffset(offset, 8); offset = queue.nextOffset(offset, 8);
assert offset < queue.capacity(); assert offset < queue.capacity();
for (int i = 0; i < elementSize; offset = queue.blockForReadSpace(offset), i++) {

if (offset == -1) {
using.position(0);
using.limit(0);
return null;
}


byte b = queue.read(offset); using.limit(using.position() + elementSize);
using.write(b);
}


offset = queue.read(using, offset);
queue.writeupto.set(offset); queue.writeupto.set(offset);
queue.readLocation.set(offset); queue.readLocation.set(offset);


return using.flip(); return using;
} }


} }




private static void checkSize(@NotNull Bytes using, long elementSize) { private static void checkSize(@NotNull Bytes using, long elementSize) {
if (using.remaining() < elementSize) if (using.remaining() < elementSize)
throw new IllegalStateException("requires size=" + elementSize + " " + throw new IllegalStateException("requires size=" + elementSize + " " +
Expand Down Expand Up @@ -176,6 +165,25 @@ private long write(@NotNull Bytes bytes, long offset) {


} }


private long read(@NotNull Bytes bytes, long offset) {

long endOffSet = nextOffset(offset, bytes.remaining());

if (endOffSet >= offset) {
bytes.write(buffer, offset, bytes.remaining());
bytes.flip();
return endOffSet;
}

bytes.write(buffer, offset, capacity() - offset);
bytes.write(buffer, 1, bytes.remaining());
bytes.flip();

return endOffSet;

}


boolean isBytesBigEndian() { boolean isBytesBigEndian() {
try { try {
putLongB(0, 1); putLongB(0, 1);
Expand Down

0 comments on commit 14e75cb

Please sign in to comment.