Skip to content

Commit

Permalink
QUEUE-14 improved write buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Feb 7, 2015
1 parent e45c186 commit f176043
Showing 1 changed file with 23 additions and 7 deletions.
Expand Up @@ -52,11 +52,7 @@ public boolean offer(@NotNull Bytes bytes) throws InterruptedException {


long nextWriteLocation = queue.nextlocation(writeLocation, 8); long nextWriteLocation = queue.nextlocation(writeLocation, 8);


for (; bytes.remaining() > 0; nextWriteLocation = nextWriteLocation = queue.write(bytes, nextWriteLocation);
queue.nextWrite(nextWriteLocation)) {
assert nextWriteLocation != -1;
queue.write(nextWriteLocation, bytes.readByte());
}


writeLocation = nextWriteLocation; writeLocation = nextWriteLocation;
return true; return true;
Expand All @@ -76,6 +72,7 @@ public boolean offer(@NotNull Bytes bytes) throws InterruptedException {
} }
} }



/** /**
* Retrieves and removes the head of this queue, or returns {@code null} if this queue is * Retrieves and removes the head of this queue, or returns {@code null} if this queue is
* empty. * empty.
Expand Down Expand Up @@ -211,14 +208,15 @@ private static byte long0(long x) {
} }




private class VanillaByteQueue { private static class VanillaByteQueue {




final AtomicLong readLocation = new AtomicLong(); final AtomicLong readLocation = new AtomicLong();
final AtomicLong writeLocation = new AtomicLong(); final AtomicLong writeLocation = new AtomicLong();


final AtomicLong readupto = new AtomicLong(); final AtomicLong readupto = new AtomicLong();
final AtomicLong writeupto = new AtomicLong(); final AtomicLong writeupto = new AtomicLong();

private boolean isBytesBigEndian; private boolean isBytesBigEndian;
private final Bytes bytes; private final Bytes bytes;


Expand All @@ -229,6 +227,24 @@ public VanillaByteQueue(int size) {
isBytesBigEndian = isBytesBigEndian(); isBytesBigEndian = isBytesBigEndian();
} }


private long write(Bytes bytes, long offset) {

long endOffSet = nextlocation(offset, bytes.remaining());

if (endOffSet > offset) {
this.bytes.write(offset, bytes);
return endOffSet;
}


for (; bytes.remaining() > 0; offset =
nextWrite(offset)) {
assert offset != -1;
write(offset, bytes.readByte());
}

return offset;
}


boolean isBytesBigEndian() { boolean isBytesBigEndian() {
try { try {
Expand Down Expand Up @@ -360,7 +376,7 @@ long nextlocation(long location) {
return nextlocation(location, 1); return nextlocation(location, 1);
} }


long nextlocation(long location, int increment) { long nextlocation(long location, long increment) {


long result = location + increment; long result = location + increment;
if (result < capacity()) if (result < capacity())
Expand Down

0 comments on commit f176043

Please sign in to comment.