diff --git a/src/main/java/net/openhft/chronicle/map/AbstractChannelReplicator.java b/src/main/java/net/openhft/chronicle/map/AbstractChannelReplicator.java index 4834549f2d..fd8d7c8854 100644 --- a/src/main/java/net/openhft/chronicle/map/AbstractChannelReplicator.java +++ b/src/main/java/net/openhft/chronicle/map/AbstractChannelReplicator.java @@ -451,7 +451,7 @@ public Bytes resizeBuffer(int size) { final ByteBuffer result = ByteBuffer.allocate(size).order(ByteOrder.nativeOrder()); final long bytesPosition = in.position(); - in = new ByteBufferBytes(result.slice()); + in = new ByteBufferBytes(result); out.position(0); out.limit((int) bytesPosition); diff --git a/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java b/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java index a6d2d04497..aae770ce55 100644 --- a/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java +++ b/src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java @@ -67,13 +67,13 @@ class StatelessChronicleMap implements ChronicleMap, Closeable, Clon private ByteBufferBytes inBytes; @NotNull - private final ReaderWithSize keyReaderWithSize; + private ReaderWithSize keyReaderWithSize; @NotNull - private final WriterWithSize keyWriterWithSize; + private WriterWithSize keyWriterWithSize; @NotNull - private final ReaderWithSize valueReaderWithSize; + private ReaderWithSize valueReaderWithSize; @NotNull - private final WriterWithSize valueWriterWithSize; + private WriterWithSize valueWriterWithSize; private SocketChannel clientChannel; @Nullable @@ -83,13 +83,20 @@ class StatelessChronicleMap implements ChronicleMap, Closeable, Clon private final Class kClass; private final Class vClass; - private final boolean putReturnsNull; - private final boolean removeReturnsNull; + private boolean putReturnsNull; + private boolean removeReturnsNull; private final ReentrantLock inBytesLock = new ReentrantLock(true); private final ReentrantLock outBytesLock = new ReentrantLock(); + private final BufferResizer outBufferResizer = new BufferResizer() { + @Override + public Bytes resizeBuffer(int newCapacity) { + return resizeBufferOutBuffer(newCapacity); + + } + }; static enum EventId { HEARTBEAT, @@ -119,7 +126,9 @@ static enum EventId { PUT_ALL_WITHOUT_ACC, HASH_CODE, MAP_FOR_KEY, - PUT_MAPPED + PUT_MAPPED, + KEY_BUILDER, + VALUE_BUILDER } @@ -141,38 +150,35 @@ void identifier(int identifier) { this.config = config; this.jsonConverters = chronicleMapBuilder.jsonConverters(); - final BufferResizer outBufferResizer = new BufferResizer() { - @Override - public Bytes resizeBuffer(int newCapacity) { - return resizeBufferOutBuffer(newCapacity); - - } - }; - - keyReaderWithSize = new ReaderWithSize(chronicleMapBuilder.keyBuilder); - keyWriterWithSize = new WriterWithSize(chronicleMapBuilder.keyBuilder, outBufferResizer); - - valueReaderWithSize = new ReaderWithSize(chronicleMapBuilder.valueBuilder); - valueWriterWithSize = new WriterWithSize(chronicleMapBuilder.valueBuilder, - outBufferResizer); + outBuffer = allocateDirect(128).order(ByteOrder.nativeOrder()); + outBytes = new ByteBufferBytes(outBuffer.slice()); - this.putReturnsNull = chronicleMapBuilder.putReturnsNull(); - this.removeReturnsNull = chronicleMapBuilder.removeReturnsNull(); + inBuffer = allocateDirect(128).order(ByteOrder.nativeOrder()); + inBytes = new ByteBufferBytes(inBuffer.slice()); this.vClass = chronicleMapBuilder.valueBuilder.eClass; this.kClass = chronicleMapBuilder.keyBuilder.eClass; this.name = config.name(); + attemptConnect(config.remoteAddress()); - outBuffer = allocateDirect(128).order(ByteOrder.nativeOrder()); - outBytes = new ByteBufferBytes(outBuffer.slice()); + checkVersion(); + loadKeyValueSerializers(); + this.putReturnsNull = chronicleMapBuilder.putReturnsNull(); + this.removeReturnsNull = chronicleMapBuilder.removeReturnsNull(); + } - inBuffer = allocateDirect(128).order(ByteOrder.nativeOrder()); - inBytes = new ByteBufferBytes(inBuffer.slice()); + private void loadKeyValueSerializers() { + final SerializationBuilder keyBuilder = fetchObject(SerializationBuilder.class, KEY_BUILDER); + final SerializationBuilder valueBuilder = fetchObject(SerializationBuilder.class, VALUE_BUILDER); - checkVersion(); + keyReaderWithSize = new ReaderWithSize(keyBuilder); + keyWriterWithSize = new WriterWithSize(keyBuilder, outBufferResizer); + + valueReaderWithSize = new ReaderWithSize(valueBuilder); + valueWriterWithSize = new WriterWithSize(valueBuilder, outBufferResizer); } @@ -229,8 +235,10 @@ private void checkTimeout(long timeoutTime) { throw new RemoteCallTimeoutException(); } - private synchronized SocketChannel lazyConnect(final long timeoutMs, - final InetSocketAddress remoteAddress) { + private synchronized void lazyConnect(final long timeoutMs, + final InetSocketAddress remoteAddress) { + if (clientChannel != null) + return; if (LOG.isDebugEnabled()) LOG.debug("attempting to connect to " + remoteAddress + " ,name=" + name); @@ -259,8 +267,34 @@ private synchronized SocketChannel lazyConnect(final long timeoutMs, throw e; } } + clientChannel = result; + byte[] bytes = copyBufferBytes(); - return result; + long position = outBytes.position(); + int limit = outBuffer.limit(); + + outBuffer.clear(); + outBytes.clear(); + // outBytesLock.unlock(); + // assert !outBytesLock.isHeldByCurrentThread(); + try { + checkVersion(); + loadKeyValueSerializers(); + } finally { + outBuffer.clear(); + outBuffer.put(bytes); + outBytes.limit(limit); + outBytes.position(position); + // outBytesLock.lock(); + } + + } + + private byte[] copyBufferBytes() { + byte[] bytes = new byte[outBuffer.limit()]; + outBuffer.position(0); + outBuffer.get(bytes); + return bytes; } /** @@ -453,6 +487,8 @@ public String toString() { @NotNull public String serverApplicationVersion() { + + return fetchObject(String.class, APPLICATION_VERSION); } @@ -618,7 +654,7 @@ private Bytes resizeBufferOutBuffer(int newCapacity, long start) { private void resizeBufferInBuffer(int newCapacity, long start) { - assert !outBytesLock.isHeldByCurrentThread(); +// assert !outBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread(); if (LOG.isDebugEnabled()) @@ -1053,7 +1089,7 @@ private long send(long sizeLocation, final long startTime) { for (; ; ) { if (clientChannel == null) { - clientChannel = lazyConnect(config.timeoutMs(), config.remoteAddress()); + lazyConnect(config.timeoutMs(), config.remoteAddress()); } try { @@ -1069,7 +1105,7 @@ private long send(long sizeLocation, final long startTime) { } catch (@NotNull java.nio.channels.ClosedChannelException | ClosedConnectionException e) { checkTimeout(timeoutTime); - clientChannel = lazyConnect(config.timeoutMs(), config.remoteAddress()); + lazyConnect(config.timeoutMs(), config.remoteAddress()); } } } catch (IOException e) { @@ -1084,10 +1120,11 @@ private long send(long sizeLocation, final long startTime) { private Bytes blockingFetchReadOnly(long timeoutTime, final long transactionId) { - assert !outBytesLock.isHeldByCurrentThread(); - assert inBytesLock.isHeldByCurrentThread(); + assert inBytesLock.isHeldByCurrentThread(); + // assert !outBytesLock.isHeldByCurrentThread(); try { + return blockingFetchThrowable(timeoutTime, transactionId); } catch (IOException e) { close(); @@ -1112,7 +1149,7 @@ private Bytes blockingFetchReadOnly(long timeoutTime, final long transactionId) private Bytes blockingFetchThrowable(long timeoutTime, long transactionId) throws IOException, InterruptedException { - assert !outBytesLock.isHeldByCurrentThread(); +// assert !outBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread(); int remainingBytes = nextEntry(timeoutTime, transactionId); @@ -1157,7 +1194,7 @@ private Bytes blockingFetchThrowable(long timeoutTime, long transactionId) throw } private int nextEntry(long timeoutTime, long transactionId) throws IOException { - assert !outBytesLock.isHeldByCurrentThread(); +// assert !outBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread(); int remainingBytes; @@ -1235,7 +1272,7 @@ private int nextEntry(long timeoutTime, long transactionId) throws IOException { } private void clearParked() { - assert !outBytesLock.isHeldByCurrentThread(); +// assert !outBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread(); parkedTransactionId = 0; parkedRemainingBytes = 0; @@ -1264,7 +1301,7 @@ private void pause() { @SuppressWarnings("UnusedReturnValue") private Bytes receiveBytesFromSocket(int requiredNumberOfBytes, long timeoutTime) throws IOException { - assert !outBytesLock.isHeldByCurrentThread(); +// assert !outBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread(); inBytes.position(0); diff --git a/src/main/java/net/openhft/chronicle/map/TcpReplicator.java b/src/main/java/net/openhft/chronicle/map/TcpReplicator.java index c78284b9a7..18e1c245c8 100644 --- a/src/main/java/net/openhft/chronicle/map/TcpReplicator.java +++ b/src/main/java/net/openhft/chronicle/map/TcpReplicator.java @@ -1022,6 +1022,23 @@ void ensureBufferSize(long size) { } } + void resizeToMessage(@NotNull IllegalStateException e) { + + String message = e.getMessage(); + if (message.startsWith("java.io.IOException: Not enough available space for writing ")) { + String substring = message.substring("java.io.IOException: Not enough available space for writing ".length(), message.length()); + int i = substring.indexOf(' '); + if (i != -1) { + int size = Integer.parseInt(substring.substring(0, i)); + + long requiresExtra = size - in().remaining(); + ensureBufferSize((int) (in().capacity() + requiresExtra)); + } else + throw e; + } else + throw e; + } + Bytes in() { return entryCallback.in(); } @@ -1475,13 +1492,13 @@ Work processStatelessEvent(final byte eventId, // client immediately switch (event) { case KEY_SET: - return keySet(reader, writer.in(), transactionId); + return keySet(reader, writer, transactionId); case VALUES: - return values(reader, writer.in(), transactionId); + return values(reader, writer, transactionId); case ENTRY_SET: - return entrySet(reader, writer.in(), transactionId); + return entrySet(reader, writer, transactionId); case PUT_WITHOUT_ACC: return put(reader, timestamp, identifier); @@ -1500,16 +1517,16 @@ Work processStatelessEvent(final byte eventId, switch (event) { case LONG_SIZE: - return longSize(writer.in(), sizeLocation); + return longSize(writer, sizeLocation); case IS_EMPTY: - return isEmpty(writer.in(), sizeLocation); + return isEmpty(writer, sizeLocation); case CONTAINS_KEY: - return containsKey(reader, writer.in(), sizeLocation); + return containsKey(reader, writer, sizeLocation); case CONTAINS_VALUE: - return containsValue(reader, writer.in(), sizeLocation); + return containsValue(reader, writer, sizeLocation); case GET: return get(reader, writer, sizeLocation, timestamp); @@ -1521,133 +1538,171 @@ Work processStatelessEvent(final byte eventId, return remove(reader, writer, sizeLocation, timestamp, identifier); case CLEAR: - return clear(writer.in(), sizeLocation, timestamp, identifier); + return clear(writer, sizeLocation, timestamp, identifier); case REPLACE: return replace(reader, writer, sizeLocation, timestamp, identifier); case REPLACE_WITH_OLD_AND_NEW_VALUE: - return replaceWithOldAndNew(reader, writer.in(), + return replaceWithOldAndNew(reader, writer, sizeLocation, timestamp, identifier); case PUT_IF_ABSENT: return putIfAbsent(reader, writer, sizeLocation, timestamp, identifier); case REMOVE_WITH_VALUE: - return removeWithValue(reader, writer.in(), sizeLocation, timestamp, identifier); + return removeWithValue(reader, writer, sizeLocation, timestamp, identifier); case TO_STRING: - return toString(writer.in(), sizeLocation); + return toString(writer, sizeLocation); case APPLICATION_VERSION: - return applicationVersion(writer.in(), sizeLocation); + return applicationVersion(writer, sizeLocation); case PERSISTED_DATA_VERSION: - return persistedDataVersion(writer.in(), sizeLocation); + return persistedDataVersion(writer, sizeLocation); case PUT_ALL: - return putAll(reader, writer.in(), sizeLocation, timestamp, identifier); + return putAll(reader, writer, sizeLocation, timestamp, identifier); case HASH_CODE: - return hashCode(writer.in(), sizeLocation); + return hashCode(writer, sizeLocation); case MAP_FOR_KEY: - return mapForKey(reader, writer.in(), sizeLocation); + return mapForKey(reader, writer, sizeLocation); case PUT_MAPPED: - return putMapped(reader, writer.in(), sizeLocation); + return putMapped(reader, writer, sizeLocation); + + case KEY_BUILDER: + return writeBuilder(writer, sizeLocation, map.keyBuilder); + + case VALUE_BUILDER: + return writeBuilder(writer, sizeLocation, map.valueBuilder); default: throw new IllegalStateException("unsupported event=" + event); } } + private void writeObject(TcpReplicator.TcpSocketChannelEntryWriter writer, Object o) { + for (; ; ) { + long position = writer.in().position(); + + try { + writer.in().writeObject(o); + return; + } catch (IllegalStateException e) { + if (e.getMessage().contains("Not enough available space")) { + writer.resizeToMessage(e); + writer.in().position(position); + } else + throw e; + } + } + } + + private Work writeBuilder(TcpReplicator.TcpSocketChannelEntryWriter writer, long sizeLocation, SerializationBuilder builder) { + + try { + writeObject(writer, builder); + } catch (Exception e) { + LOG.info("", e); + + return sendException(writer, sizeLocation, e); + } + + writeSizeAndFlags(sizeLocation, false, writer.in()); + return null; + } + + @Nullable - public Work mapForKey(@NotNull ByteBufferBytes reader, @NotNull Bytes writer, + public Work mapForKey(@NotNull ByteBufferBytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, long sizeLocation) { final K key = keyReaderWithSize.read(reader, null, null); final Function function = (Function) reader.readObject(); try { Object result = map.getMapped(key, function); - writer.writeObject(result); + writeObject(writer,result); } catch (Throwable e) { LOG.info("", e); return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - public Work putMapped(@NotNull ByteBufferBytes reader, @NotNull Bytes writer, + public Work putMapped(@NotNull ByteBufferBytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, long sizeLocation) { final K key = keyReaderWithSize.read(reader, null, null); final UnaryOperator unaryOperator = (UnaryOperator) reader.readObject(); try { Object result = map.putMapped(key, unaryOperator); - writer.writeObject(result); + writeObject(writer,result); } catch (Throwable e) { LOG.info("", e); return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work removeWithValue(Bytes reader, @NotNull Bytes writer, final long sizeLocation, + private Work removeWithValue(Bytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation, long timestamp, byte id) { try { - writer.writeBoolean(map.removeBytesEntry(reader)); + writer.in().writeBoolean(map.removeBytesEntry(reader)); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work replaceWithOldAndNew(Bytes reader, @NotNull Bytes writer, final long + private Work replaceWithOldAndNew(Bytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation, long timestamp, byte id) { try { - map.replaceWithOldAndNew(reader, writer); + map.replaceWithOldAndNew(reader, writer.in()); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work longSize(@NotNull Bytes writer, final long sizeLocation) { + private Work longSize(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation) { try { - writer.writeLong(map.longSize()); + writer.in().writeLong(map.longSize()); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work hashCode(@NotNull Bytes writer, final long sizeLocation) { + private Work hashCode(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation) { try { - writer.writeInt(map.hashCode()); + writer.in().writeInt(map.hashCode()); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work toString(@NotNull Bytes writer, final long sizeLocation) { + private Work toString(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation) { final String str; - final long remaining = writer.remaining(); + final long remaining = writer.in().remaining(); try { str = map.toString(); } catch (Throwable e) { @@ -1661,107 +1716,108 @@ private Work toString(@NotNull Bytes writer, final long sizeLocation) { str : str.substring(0, (int) (remaining - 4)) + "..."; - writer.writeObject(result); - writeSizeAndFlags(sizeLocation, false, writer); + writeObject(writer, result); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work applicationVersion(@NotNull Bytes writer, final long sizeLocation) { + private Work applicationVersion(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation) { - final long remaining = writer.remaining(); + final long remaining = writer.in().remaining(); try { String result = map.applicationVersion(); - writer.writeObject(result); + writeObject(writer, result); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } assert remaining > 4; - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work persistedDataVersion(@NotNull Bytes writer, final long sizeLocation) { + private Work persistedDataVersion(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation) { - final long remaining = writer.remaining(); + final long remaining = writer.in().remaining(); try { String result = map.persistedDataVersion(); - writer.writeObject(result); + writeObject(writer,result); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } assert remaining > 4; - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @SuppressWarnings("SameReturnValue") @Nullable - private Work sendException(@NotNull Bytes writer, long sizeLocation, @NotNull Throwable e) { + private Work sendException(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, + long sizeLocation, @NotNull Throwable e) { // move the position to ignore any bytes written so far - writer.position(sizeLocation + HEADER_SIZE); + writer.in().position(sizeLocation + HEADER_SIZE); writeException(writer, e); - writeSizeAndFlags(sizeLocation + SIZE_OF_TRANSACTION_ID, true, writer); - - e.printStackTrace(); + writeSizeAndFlags(sizeLocation + SIZE_OF_TRANSACTION_ID, true, writer.in()); return null; } @Nullable - private Work isEmpty(@NotNull Bytes writer, final long sizeLocation) { + private Work isEmpty(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, + final long sizeLocation) { try { - writer.writeBoolean(map.isEmpty()); + writer.in().writeBoolean(map.isEmpty()); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work containsKey(Bytes reader, @NotNull Bytes writer, final long sizeLocation) { + private Work containsKey(Bytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation) { try { - writer.writeBoolean(map.containsBytesKey(reader)); + writer.in().writeBoolean(map.containsBytesKey(reader)); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work containsValue(Bytes reader, @NotNull Bytes writer, final long sizeLocation) { + private Work containsValue(Bytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation) { // todo optimize -- eliminate final V v = valueReaderWithSize.read(reader, null, null); try { - writer.writeBoolean(map.containsValue(v)); + writer.in().writeBoolean(map.containsValue(v)); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work get(Bytes reader, TcpReplicator.TcpSocketChannelEntryWriter writer, + private Work get(Bytes reader, + TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation, long transactionId) { try { map.getBytes(reader, writer); } catch (Throwable e) { - return sendException(writer.in(), sizeLocation, e); + return sendException(writer, sizeLocation, e); } writeSizeAndFlags(sizeLocation, false, writer.in()); return null; @@ -1780,7 +1836,7 @@ private Work put(Bytes reader, TcpReplicator.TcpSocketChannelEntryWriter writer, try { map.put(reader, writer); } catch (Throwable e) { - return sendException(writer.in(), sizeLocation, e); + return sendException(writer, sizeLocation, e); } writeSizeAndFlags(sizeLocation, false, writer.in()); return null; @@ -1799,21 +1855,23 @@ private Work remove(Bytes reader, TcpReplicator.TcpSocketChannelEntryWriter writ try { map.removeBytesKeyOutputPrevValue(reader, writer); } catch (Throwable e) { - return sendException(writer.in(), sizeLocation, e); + return sendException(writer, sizeLocation, e); } writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work putAll(@NotNull Bytes reader, @NotNull Bytes writer, final long sizeLocation, + private Work putAll(@NotNull Bytes reader, + @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, + final long sizeLocation, long timestamp, byte id) { try { map.putAll(reader); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @@ -1825,19 +1883,19 @@ private Work putAll(@NotNull Bytes reader, long timestamp, byte id) { } @Nullable - private Work clear(@NotNull Bytes writer, final long sizeLocation, long timestamp, byte id) { + private Work clear(@NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long sizeLocation, long timestamp, byte id) { try { map.clear(); } catch (Throwable e) { return sendException(writer, sizeLocation, e); } - writeSizeAndFlags(sizeLocation, false, writer); + writeSizeAndFlags(sizeLocation, false, writer.in()); return null; } @Nullable - private Work values(@NotNull Bytes reader, @NotNull Bytes writer, final long transactionId) { + private Work values(@NotNull Bytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long transactionId) { Collection values; @@ -1879,7 +1937,7 @@ public boolean doWork(@NotNull Bytes out) { } @Nullable - private Work keySet(@NotNull Bytes reader, @NotNull final Bytes writer, + private Work keySet(@NotNull Bytes reader, @NotNull final TcpReplicator.TcpSocketChannelEntryWriter writer, final long transactionId) { Set ks; @@ -1922,7 +1980,8 @@ public boolean doWork(@NotNull Bytes out) { } @Nullable - private Work entrySet(@NotNull final Bytes reader, @NotNull Bytes writer, + private Work entrySet(@NotNull final Bytes reader, + @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, final long transactionId) { final Set> entries; @@ -1976,7 +2035,7 @@ private Work putIfAbsent(Bytes reader, TcpReplicator.TcpSocketChannelEntryWriter try { map.putIfAbsent(reader, writer); } catch (Throwable e) { - return sendException(writer.in(), sizeLocation, e); + return sendException(writer, sizeLocation, e); } writeSizeAndFlags(sizeLocation, false, writer.in()); return null; @@ -1988,7 +2047,7 @@ private Work replace(Bytes reader, TcpReplicator.TcpSocketChannelEntryWriter wri try { map.replaceKV(reader, writer); } catch (Throwable e) { - return sendException(writer.in(), sizeLocation, e); + return sendException(writer, sizeLocation, e); } writeSizeAndFlags(sizeLocation, false, writer.in()); return null; @@ -2028,8 +2087,8 @@ private void writeSizeAndFlags(long locationOfSize, boolean isException, @NotNul } - private void writeException(@NotNull Bytes out, Throwable e) { - out.writeObject(e); + private void writeException(@NotNull TcpReplicator.TcpSocketChannelEntryWriter out, Throwable e) { + writeObject(out, e); } @NotNull @@ -2051,8 +2110,8 @@ private Map readEntries(@NotNull Bytes reader) { } @Nullable - private Work sendException(@NotNull Bytes reader, @NotNull Bytes writer, @NotNull Throwable e) { - final long sizeLocation = reflectTransactionId(writer, reader.readLong()); + private Work sendException(@NotNull Bytes reader, @NotNull TcpReplicator.TcpSocketChannelEntryWriter writer, @NotNull Throwable e) { + final long sizeLocation = reflectTransactionId(writer.in(), reader.readLong()); return sendException(writer, sizeLocation, e); } diff --git a/src/main/java/net/openhft/chronicle/map/VanillaChronicleMap.java b/src/main/java/net/openhft/chronicle/map/VanillaChronicleMap.java index 0c95601d0c..c95c9fd634 100755 --- a/src/main/java/net/openhft/chronicle/map/VanillaChronicleMap.java +++ b/src/main/java/net/openhft/chronicle/map/VanillaChronicleMap.java @@ -112,10 +112,13 @@ class VanillaChronicleMap, transient BytesStore ms; transient long headerSize; transient Set> entrySet; + transient final SerializationBuilder keyBuilder; + transient final SerializationBuilder valueBuilder; + public VanillaChronicleMap(ChronicleMapBuilder builder) throws IOException { - SerializationBuilder keyBuilder = builder.keyBuilder; + keyBuilder = builder.keyBuilder; kClass = keyBuilder.eClass; keySizeMarshaller = keyBuilder.sizeMarshaller(); originalKeyReader = keyBuilder.reader(); @@ -127,7 +130,7 @@ public VanillaChronicleMap(ChronicleMapBuilder builder) throws IOException originalMetaKeyInterop = (MKI) keyBuilder.metaInterop(); metaKeyInteropProvider = (MetaProvider) keyBuilder.metaInteropProvider(); - SerializationBuilder valueBuilder = builder.valueBuilder; + valueBuilder = builder.valueBuilder; vClass = valueBuilder.eClass; if (vClass.getName().endsWith("$$Native")) { nativeValueClass = vClass; diff --git a/src/test/java/net/openhft/chronicle/map/CHMUseCasesTest.java b/src/test/java/net/openhft/chronicle/map/CHMUseCasesTest.java index 8dbc45a82d..f67628be14 100644 --- a/src/test/java/net/openhft/chronicle/map/CHMUseCasesTest.java +++ b/src/test/java/net/openhft/chronicle/map/CHMUseCasesTest.java @@ -38,7 +38,7 @@ public class CHMUseCasesTest { private static final String TMP = System.getProperty("java.io.tmpdir"); - enum TypeOfMap {SIMPLE, SIMPLE_PERSISTED, REPLICATED, STATELESS} + enum TypeOfMap {STATELESS,SIMPLE, SIMPLE_PERSISTED, REPLICATED} private final TypeOfMap typeOfMap;