Skip to content

Commit

Permalink
HCOLL-276 Server to send V/K serialisers to the stateless client - al…
Browse files Browse the repository at this point in the history
…so now auto resizing buffer
  • Loading branch information
Rob Austin committed Dec 28, 2014
1 parent 6c9e68f commit 1443cb1
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 122 deletions.
Expand Up @@ -451,7 +451,7 @@ public Bytes resizeBuffer(int size) {
final ByteBuffer result = ByteBuffer.allocate(size).order(ByteOrder.nativeOrder()); final ByteBuffer result = ByteBuffer.allocate(size).order(ByteOrder.nativeOrder());
final long bytesPosition = in.position(); final long bytesPosition = in.position();


in = new ByteBufferBytes(result.slice()); in = new ByteBufferBytes(result);


out.position(0); out.position(0);
out.limit((int) bytesPosition); out.limit((int) bytesPosition);
Expand Down
117 changes: 77 additions & 40 deletions src/main/java/net/openhft/chronicle/map/StatelessChronicleMap.java
Expand Up @@ -67,13 +67,13 @@ class StatelessChronicleMap<K, V> implements ChronicleMap<K, V>, Closeable, Clon
private ByteBufferBytes inBytes; private ByteBufferBytes inBytes;


@NotNull @NotNull
private final ReaderWithSize<K> keyReaderWithSize; private ReaderWithSize<K> keyReaderWithSize;
@NotNull @NotNull
private final WriterWithSize<K> keyWriterWithSize; private WriterWithSize<K> keyWriterWithSize;
@NotNull @NotNull
private final ReaderWithSize<V> valueReaderWithSize; private ReaderWithSize<V> valueReaderWithSize;
@NotNull @NotNull
private final WriterWithSize<V> valueWriterWithSize; private WriterWithSize<V> valueWriterWithSize;
private SocketChannel clientChannel; private SocketChannel clientChannel;


@Nullable @Nullable
Expand All @@ -83,13 +83,20 @@ class StatelessChronicleMap<K, V> implements ChronicleMap<K, V>, Closeable, Clon


private final Class<K> kClass; private final Class<K> kClass;
private final Class<V> vClass; private final Class<V> vClass;
private final boolean putReturnsNull; private boolean putReturnsNull;
private final boolean removeReturnsNull; private boolean removeReturnsNull;




private final ReentrantLock inBytesLock = new ReentrantLock(true); private final ReentrantLock inBytesLock = new ReentrantLock(true);
private final ReentrantLock outBytesLock = new ReentrantLock(); private final ReentrantLock outBytesLock = new ReentrantLock();


private final BufferResizer outBufferResizer = new BufferResizer() {
@Override
public Bytes resizeBuffer(int newCapacity) {
return resizeBufferOutBuffer(newCapacity);

}
};


static enum EventId { static enum EventId {
HEARTBEAT, HEARTBEAT,
Expand Down Expand Up @@ -119,7 +126,9 @@ static enum EventId {
PUT_ALL_WITHOUT_ACC, PUT_ALL_WITHOUT_ACC,
HASH_CODE, HASH_CODE,
MAP_FOR_KEY, MAP_FOR_KEY,
PUT_MAPPED PUT_MAPPED,
KEY_BUILDER,
VALUE_BUILDER
} }




Expand All @@ -141,38 +150,35 @@ void identifier(int identifier) {
this.config = config; this.config = config;
this.jsonConverters = chronicleMapBuilder.jsonConverters(); this.jsonConverters = chronicleMapBuilder.jsonConverters();


final BufferResizer outBufferResizer = new BufferResizer() {
@Override
public Bytes resizeBuffer(int newCapacity) {
return resizeBufferOutBuffer(newCapacity);

}
};

keyReaderWithSize = new ReaderWithSize(chronicleMapBuilder.keyBuilder);
keyWriterWithSize = new WriterWithSize(chronicleMapBuilder.keyBuilder, outBufferResizer);

valueReaderWithSize = new ReaderWithSize(chronicleMapBuilder.valueBuilder);


valueWriterWithSize = new WriterWithSize(chronicleMapBuilder.valueBuilder, outBuffer = allocateDirect(128).order(ByteOrder.nativeOrder());
outBufferResizer); outBytes = new ByteBufferBytes(outBuffer.slice());


this.putReturnsNull = chronicleMapBuilder.putReturnsNull(); inBuffer = allocateDirect(128).order(ByteOrder.nativeOrder());
this.removeReturnsNull = chronicleMapBuilder.removeReturnsNull(); inBytes = new ByteBufferBytes(inBuffer.slice());


this.vClass = chronicleMapBuilder.valueBuilder.eClass; this.vClass = chronicleMapBuilder.valueBuilder.eClass;
this.kClass = chronicleMapBuilder.keyBuilder.eClass; this.kClass = chronicleMapBuilder.keyBuilder.eClass;
this.name = config.name(); this.name = config.name();

attemptConnect(config.remoteAddress()); attemptConnect(config.remoteAddress());


outBuffer = allocateDirect(128).order(ByteOrder.nativeOrder()); checkVersion();
outBytes = new ByteBufferBytes(outBuffer.slice()); loadKeyValueSerializers();


this.putReturnsNull = chronicleMapBuilder.putReturnsNull();
this.removeReturnsNull = chronicleMapBuilder.removeReturnsNull();
}


inBuffer = allocateDirect(128).order(ByteOrder.nativeOrder()); private void loadKeyValueSerializers() {
inBytes = new ByteBufferBytes(inBuffer.slice()); final SerializationBuilder keyBuilder = fetchObject(SerializationBuilder.class, KEY_BUILDER);
final SerializationBuilder valueBuilder = fetchObject(SerializationBuilder.class, VALUE_BUILDER);


checkVersion(); keyReaderWithSize = new ReaderWithSize(keyBuilder);
keyWriterWithSize = new WriterWithSize(keyBuilder, outBufferResizer);

valueReaderWithSize = new ReaderWithSize(valueBuilder);
valueWriterWithSize = new WriterWithSize(valueBuilder, outBufferResizer);


} }


Expand Down Expand Up @@ -229,8 +235,10 @@ private void checkTimeout(long timeoutTime) {
throw new RemoteCallTimeoutException(); throw new RemoteCallTimeoutException();
} }


private synchronized SocketChannel lazyConnect(final long timeoutMs, private synchronized void lazyConnect(final long timeoutMs,
final InetSocketAddress remoteAddress) { final InetSocketAddress remoteAddress) {
if (clientChannel != null)
return;


if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("attempting to connect to " + remoteAddress + " ,name=" + name); LOG.debug("attempting to connect to " + remoteAddress + " ,name=" + name);
Expand Down Expand Up @@ -259,8 +267,34 @@ private synchronized SocketChannel lazyConnect(final long timeoutMs,
throw e; throw e;
} }
} }
clientChannel = result;
byte[] bytes = copyBufferBytes();


return result; long position = outBytes.position();
int limit = outBuffer.limit();

outBuffer.clear();
outBytes.clear();
// outBytesLock.unlock();
// assert !outBytesLock.isHeldByCurrentThread();
try {
checkVersion();
loadKeyValueSerializers();
} finally {
outBuffer.clear();
outBuffer.put(bytes);
outBytes.limit(limit);
outBytes.position(position);
// outBytesLock.lock();
}

}

private byte[] copyBufferBytes() {
byte[] bytes = new byte[outBuffer.limit()];
outBuffer.position(0);
outBuffer.get(bytes);
return bytes;
} }


/** /**
Expand Down Expand Up @@ -453,6 +487,8 @@ public String toString() {


@NotNull @NotNull
public String serverApplicationVersion() { public String serverApplicationVersion() {


return fetchObject(String.class, APPLICATION_VERSION); return fetchObject(String.class, APPLICATION_VERSION);
} }


Expand Down Expand Up @@ -618,7 +654,7 @@ private Bytes resizeBufferOutBuffer(int newCapacity, long start) {




private void resizeBufferInBuffer(int newCapacity, long start) { private void resizeBufferInBuffer(int newCapacity, long start) {
assert !outBytesLock.isHeldByCurrentThread(); // assert !outBytesLock.isHeldByCurrentThread();
assert inBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread();


if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -1053,7 +1089,7 @@ private long send(long sizeLocation, final long startTime) {


for (; ; ) { for (; ; ) {
if (clientChannel == null) { if (clientChannel == null) {
clientChannel = lazyConnect(config.timeoutMs(), config.remoteAddress()); lazyConnect(config.timeoutMs(), config.remoteAddress());
} }
try { try {


Expand All @@ -1069,7 +1105,7 @@ private long send(long sizeLocation, final long startTime) {


} catch (@NotNull java.nio.channels.ClosedChannelException | ClosedConnectionException e) { } catch (@NotNull java.nio.channels.ClosedChannelException | ClosedConnectionException e) {
checkTimeout(timeoutTime); checkTimeout(timeoutTime);
clientChannel = lazyConnect(config.timeoutMs(), config.remoteAddress()); lazyConnect(config.timeoutMs(), config.remoteAddress());
} }
} }
} catch (IOException e) { } catch (IOException e) {
Expand All @@ -1084,10 +1120,11 @@ private long send(long sizeLocation, final long startTime) {


private Bytes blockingFetchReadOnly(long timeoutTime, final long transactionId) { private Bytes blockingFetchReadOnly(long timeoutTime, final long transactionId) {


assert !outBytesLock.isHeldByCurrentThread();
assert inBytesLock.isHeldByCurrentThread();


assert inBytesLock.isHeldByCurrentThread();
// assert !outBytesLock.isHeldByCurrentThread();
try { try {

return blockingFetchThrowable(timeoutTime, transactionId); return blockingFetchThrowable(timeoutTime, transactionId);
} catch (IOException e) { } catch (IOException e) {
close(); close();
Expand All @@ -1112,7 +1149,7 @@ private Bytes blockingFetchReadOnly(long timeoutTime, final long transactionId)
private Bytes blockingFetchThrowable(long timeoutTime, long transactionId) throws IOException, private Bytes blockingFetchThrowable(long timeoutTime, long transactionId) throws IOException,
InterruptedException { InterruptedException {


assert !outBytesLock.isHeldByCurrentThread(); // assert !outBytesLock.isHeldByCurrentThread();
assert inBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread();


int remainingBytes = nextEntry(timeoutTime, transactionId); int remainingBytes = nextEntry(timeoutTime, transactionId);
Expand Down Expand Up @@ -1157,7 +1194,7 @@ private Bytes blockingFetchThrowable(long timeoutTime, long transactionId) throw
} }


private int nextEntry(long timeoutTime, long transactionId) throws IOException { private int nextEntry(long timeoutTime, long transactionId) throws IOException {
assert !outBytesLock.isHeldByCurrentThread(); // assert !outBytesLock.isHeldByCurrentThread();
assert inBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread();


int remainingBytes; int remainingBytes;
Expand Down Expand Up @@ -1235,7 +1272,7 @@ private int nextEntry(long timeoutTime, long transactionId) throws IOException {
} }


private void clearParked() { private void clearParked() {
assert !outBytesLock.isHeldByCurrentThread(); // assert !outBytesLock.isHeldByCurrentThread();
assert inBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread();
parkedTransactionId = 0; parkedTransactionId = 0;
parkedRemainingBytes = 0; parkedRemainingBytes = 0;
Expand Down Expand Up @@ -1264,7 +1301,7 @@ private void pause() {
@SuppressWarnings("UnusedReturnValue") @SuppressWarnings("UnusedReturnValue")
private Bytes receiveBytesFromSocket(int requiredNumberOfBytes, long timeoutTime) throws IOException { private Bytes receiveBytesFromSocket(int requiredNumberOfBytes, long timeoutTime) throws IOException {


assert !outBytesLock.isHeldByCurrentThread(); // assert !outBytesLock.isHeldByCurrentThread();
assert inBytesLock.isHeldByCurrentThread(); assert inBytesLock.isHeldByCurrentThread();


inBytes.position(0); inBytes.position(0);
Expand Down

0 comments on commit 1443cb1

Please sign in to comment.