Skip to content

Commit

Permalink
HCOLL-291 Remove unnecessary entry size calculation in tcp replication
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Jan 12, 2015
1 parent eda4a3a commit 58bf971
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,52 +474,69 @@ public boolean shouldBeIgnored(final Bytes entry, final int chronicleId) {
@Override
public boolean onEntry(final Bytes entry, final int chronicleId) {

long startOfEntry = entry.position();
long pos0 = in.position();
long start = 0;
try {
// used to denote that this is not a stateless map event
in.writeByte(StatelessChronicleMap.EventId.STATEFUL_UPDATE.ordinal());

long sizeLocation = in.position();

// used to denote that this is not a stateless map event
in.writeByte(StatelessChronicleMap.EventId.STATEFUL_UPDATE.ordinal());
// this is where we will store the size of the entry
in.skip(SIZE_OF_SIZE);

long sizeLocation = in.position();
start = in.position();

// this is where we will store the size of the entry
in.skip(SIZE_OF_SIZE);
externalizable.writeExternalEntry(entry, in, chronicleId);

int entrySize = externalizable.sizeOfEntry(entry, chronicleId);
if (in.position() == start) {
in.position(pos0);
return false;
}

if (entrySize > in.remaining()) {
// write the length of the entry, just before the start, so when we read it back
// we read the length of the entry first and hence know how many preceding writer to read
final long bytesWritten = (int) (in.position() - start);

long newSize = in.position() + entrySize;
if (bytesWritten > Integer.MAX_VALUE)
throw new IllegalStateException("entry too large, " +
"entries are limited to a size of " + Integer.MAX_VALUE);

// This can occur when we pack a number of entries into the buffer and the
// last entry is very large.
if (newSize > Integer.MAX_VALUE)
return false;
if (LOG.isDebugEnabled())
LOG.debug("sending entry of entrySize=" + (int) bytesWritten);

resizeBuffer((int) newSize);
}
in.writeInt(sizeLocation, (int) bytesWritten);

final long start = in.position();
} catch (IllegalArgumentException e) {

externalizable.writeExternalEntry(entry, in, chronicleId);
// reset the entries position
entry.position(startOfEntry);

if (in.position() == start) {
// reset the in-buffers position
in.position(pos0);
return false;
}
long remaining = in.remaining();
int entrySize = externalizable.sizeOfEntry(entry, chronicleId);

// write the length of the entry, just before the start, so when we read it back
// we read the length of the entry first and hence know how many preceding writer to read
final long bytesWritten = (int) (in.position() - start);

if (bytesWritten > Integer.MAX_VALUE)
throw new IllegalStateException("entry too large, the entry size=" + entrySize + ", " +
"entries are limited to a size of " + Integer.MAX_VALUE);
if (entrySize > remaining) {

if (LOG.isDebugEnabled())
LOG.debug("sending entry of entrySize=" + (int) bytesWritten);
long newSize = start + entrySize;

// This can occur when we pack a number of entries into the buffer and the
// last entry is very large.
if (newSize > Integer.MAX_VALUE)
return false;

in.writeInt(sizeLocation, (int) bytesWritten);
resizeBuffer((int) newSize);

in.position(pos0);
entry.position(startOfEntry);
return onEntry(entry, chronicleId);
} else
throw e;

}
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,7 @@ public boolean nextEntry(@NotNull final EntryCallback entryCallback, final int c
continue;
}

// it may not be successful if the buffer can not be resided so we will
// it may not be successful if the buffer can not be re-sized so we will
// process it later, by NOT clearing the changes.clear(position)
final boolean success = entryCallback.onEntry(entry, chronicleId);
entryCallback.onAfterEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public void testReplicationWithAcquireUsingLocked() throws Exception {

try (ChronicleMap<CharSequence, TestInstrumentVOInterface> map2 =
ChronicleMapBuilder.of(CharSequence.class,
TestInstrumentVOInterface.class)
.replication((byte) 2, config2)
.create()) {
TestInstrumentVOInterface.class)
.replication((byte) 2, config2)
.create()) {

map2a = map2;

Expand All @@ -59,6 +59,7 @@ public void testReplicationWithAcquireUsingLocked() throws Exception {
Thread.sleep(1);
}


Assert.assertEquals(map1a, map2a);
}
}
Expand Down Expand Up @@ -91,11 +92,11 @@ public void testReplicationWithEmptyOffHeapObject() throws Exception {

try (ChronicleMap<CharSequence, TestInstrumentVOInterface> map2 =
ChronicleMapBuilder.of(CharSequence.class,
TestInstrumentVOInterface.class)
.putReturnsNull(true)
.removeReturnsNull(true)
.replication((byte) 2, config2)
.entries(5000L).averageKeySize("hello".length()).create()) {
TestInstrumentVOInterface.class)
.putReturnsNull(true)
.removeReturnsNull(true)
.replication((byte) 2, config2)
.entries(5000L).averageKeySize("hello".length()).create()) {

map2a = map2;

Expand Down Expand Up @@ -144,11 +145,11 @@ public void testReplicationWithOffHeapObject() throws Exception {

try (ChronicleMap<CharSequence, TestInstrumentVOInterface> map2 =
ChronicleMapBuilder.of(CharSequence.class,
TestInstrumentVOInterface.class)
.putReturnsNull(true)
.removeReturnsNull(true)
.replication((byte) 2, config2)
.entries(5000L).averageKeySize("hello".length()).create()) {
TestInstrumentVOInterface.class)
.putReturnsNull(true)
.removeReturnsNull(true)
.replication((byte) 2, config2)
.entries(5000L).averageKeySize("hello".length()).create()) {

map2a = map2;

Expand Down

0 comments on commit 58bf971

Please sign in to comment.