From 14e75cbbacd59ad661c65d1a08caa904009c7b03 Mon Sep 17 00:00:00 2001 From: Rob Austin Date: Sun, 8 Feb 2015 16:47:17 +0000 Subject: [PATCH] QUEUE-14 improved bytes queue ring buffer --- .../queue/impl/ringbuffer/BytesQueue.java | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesQueue.java b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesQueue.java index 6fc51781ca..c6cc8ae881 100644 --- a/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesQueue.java +++ b/chronicle-queue/src/main/java/net/openhft/chronicle/queue/impl/ringbuffer/BytesQueue.java @@ -50,11 +50,11 @@ public boolean offer(@NotNull Bytes bytes) throws InterruptedException { queue.readupto.set(writeLocation); queue.write(writeLocation, bytes.remaining()); - long nextWriteLocation = queue.nextOffset(writeLocation, 8); + long offset = queue.nextOffset(writeLocation, 8); - nextWriteLocation = queue.write(bytes, nextWriteLocation); + offset = queue.write(bytes, offset); - writeLocation = nextWriteLocation; + writeLocation = offset; return true; } @@ -95,37 +95,26 @@ public Bytes poll(@NotNull Bytes using) throws InterruptedException, IllegalStat continue; long elementSize = queue.readLong(offset); - if (elementSize == -1) { - using.position(0); - using.limit(0); - return using; - } // checks that the 'using' bytes is large enough checkSize(using, elementSize); offset = queue.nextOffset(offset, 8); assert offset < queue.capacity(); - for (int i = 0; i < elementSize; offset = queue.blockForReadSpace(offset), i++) { - - if (offset == -1) { - using.position(0); - using.limit(0); - return null; - } - byte b = queue.read(offset); - using.write(b); - } + using.limit(using.position() + elementSize); + offset = queue.read(using, offset); queue.writeupto.set(offset); queue.readLocation.set(offset); - return using.flip(); + return using; } } + + private static void checkSize(@NotNull Bytes using, long elementSize) { if (using.remaining() < elementSize) throw new IllegalStateException("requires size=" + elementSize + " " + @@ -176,6 +165,25 @@ private long write(@NotNull Bytes bytes, long offset) { } + private long read(@NotNull Bytes bytes, long offset) { + + long endOffSet = nextOffset(offset, bytes.remaining()); + + if (endOffSet >= offset) { + bytes.write(buffer, offset, bytes.remaining()); + bytes.flip(); + return endOffSet; + } + + bytes.write(buffer, offset, capacity() - offset); + bytes.write(buffer, 1, bytes.remaining()); + bytes.flip(); + + return endOffSet; + + } + + boolean isBytesBigEndian() { try { putLongB(0, 1);