Skip to content

Commit

Permalink
CHRON-115 - A send an event during disconnect/reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Apr 12, 2015
1 parent 0b0b29c commit 68c8553
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 37 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -780,6 +780,10 @@ public TimeUnit heartbeatIntervalUnit() {
return this.heartbeatIntervalUnit; return this.heartbeatIntervalUnit;
} }


public int defaultSocketBufferSize() {
return DEFAULT_SOCKET_BUFFER_SIZE;
}

public int receiveBufferSize() { public int receiveBufferSize() {
return receiveBufferSize; return receiveBufferSize;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import net.openhft.chronicle.tcp.AppenderAdapter; import net.openhft.chronicle.tcp.AppenderAdapter;
import net.openhft.chronicle.tcp.ChronicleTcp; import net.openhft.chronicle.tcp.ChronicleTcp;
import net.openhft.chronicle.tcp.SinkTcp; import net.openhft.chronicle.tcp.SinkTcp;
import net.openhft.chronicle.tools.ResizableDirectByteBufferBytes;
import net.openhft.chronicle.tools.WrappedChronicle; import net.openhft.chronicle.tools.WrappedChronicle;
import net.openhft.chronicle.tools.WrappedExcerpt; import net.openhft.chronicle.tools.WrappedExcerpt;
import net.openhft.lang.io.ByteBufferBytes; import net.openhft.lang.io.ByteBufferBytes;
import net.openhft.lang.io.DirectByteBufferBytes;
import net.openhft.lang.io.IByteBufferBytes;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -96,8 +99,7 @@ private ExcerptCommon createExcerpt0() throws IOException {


private abstract class AbstractStatefulExcerpt extends WrappedExcerpt { private abstract class AbstractStatefulExcerpt extends WrappedExcerpt {
protected final Logger logger; protected final Logger logger;
protected final ByteBuffer writeBuffer; protected final ResizableDirectByteBufferBytes writeBuffer;
protected final ByteBufferBytes writeBufferBytes;
protected final ByteBuffer readBuffer; protected final ByteBuffer readBuffer;
private long lastReconnectionAttemptMS; private long lastReconnectionAttemptMS;
private long reconnectionIntervalMS; private long reconnectionIntervalMS;
Expand All @@ -107,8 +109,7 @@ protected AbstractStatefulExcerpt(final ExcerptCommon excerpt) {
super(excerpt); super(excerpt);


this.logger = LoggerFactory.getLogger(getClass().getName() + "@" + connection.toString()); this.logger = LoggerFactory.getLogger(getClass().getName() + "@" + connection.toString());
this.writeBuffer = ChronicleTcp.createBuffer(16); this.writeBuffer = new ResizableDirectByteBufferBytes(builder.minBufferSize());
this.writeBufferBytes = new ByteBufferBytes(writeBuffer);
this.readBuffer = ChronicleTcp.createBuffer(builder.minBufferSize()); this.readBuffer = ChronicleTcp.createBuffer(builder.minBufferSize());
this.reconnectionIntervalMS = builder.reconnectionIntervalMillis(); this.reconnectionIntervalMS = builder.reconnectionIntervalMillis();
this.lastReconnectionAttemptMS = 0; this.lastReconnectionAttemptMS = 0;
Expand Down Expand Up @@ -147,6 +148,7 @@ protected boolean openConnection() {


boolean connected = connection.isOpen(); boolean connected = connection.isOpen();
if(connected) { if(connected) {
builder.connectionListener().onConnect(connection.socketChannel());
this.lastReconnectionAttempt = 0; this.lastReconnectionAttempt = 0;
this.lastReconnectionAttemptMS = 0; this.lastReconnectionAttemptMS = 0;
} else { } else {
Expand Down Expand Up @@ -177,39 +179,33 @@ protected boolean shouldConnect() {
} }


protected void subscribe(long index) throws IOException { protected void subscribe(long index) throws IOException {
writeBuffer.clear(); writeBuffer.clearAll();
writeBufferBytes.clear(); writeBuffer.writeLong(ChronicleTcp.ACTION_SUBSCRIBE);

writeBuffer.writeLong(index);
writeBufferBytes.writeLong(ChronicleTcp.ACTION_SUBSCRIBE);
writeBufferBytes.writeLong(index);


MappingFunction mapping = withMapping(); MappingFunction mapping = withMapping();
if (mapping != null) { if (mapping != null) {
// write with mapping and len // write with mapping and len
writeBufferBytes.writeLong(ChronicleTcp.ACTION_WITH_MAPPING); writeBuffer.writeLong(ChronicleTcp.ACTION_WITH_MAPPING);
long pos = writeBufferBytes.position(); long pos = writeBuffer.position();

writeBufferBytes.skip(4);
long start = writeBufferBytes.position();

writeBufferBytes.writeObject(mapping);
int len = (int) (writeBufferBytes.position() - start);


writeBufferBytes.writeInt(pos, len); writeBuffer.skip(4);
long start = writeBuffer.position();


writeBuffer.writeObject(mapping);
writeBuffer.writeInt(pos, (int) (writeBuffer.position() - start));
} }


writeBuffer.position(0); writeBuffer.setBufferPositionAndLimit(0, writeBuffer.position());
writeBuffer.limit((int) writeBufferBytes.position());


connection.writeAllOrEOF(writeBuffer); connection.writeAllOrEOF(writeBuffer);
} }


protected void query(long index) throws IOException { protected void query(long index) throws IOException {
writeBuffer.clear(); writeBuffer.clearAll();
writeBuffer.putLong(ChronicleTcp.ACTION_QUERY); writeBuffer.writeLong(ChronicleTcp.ACTION_QUERY);
writeBuffer.putLong(index); writeBuffer.writeLong(index);
writeBuffer.flip(); writeBuffer.setBufferPositionAndLimit(0, writeBuffer.position());


connection.writeAllOrEOF(writeBuffer); connection.writeAllOrEOF(writeBuffer);
} }
Expand All @@ -221,7 +217,7 @@ protected boolean readNext() {
} catch(IOException e) { } catch(IOException e) {
logIOException(logger, "Exception reading from socket", e); logIOException(logger, "Exception reading from socket", e);
if(!closed) { if(!closed) {
//builder.connectionListener().onError(connection.socket(), e); builder.connectionListener().onError(connection.socketChannel(), e);
} }
} }
} }
Expand All @@ -237,12 +233,12 @@ protected boolean readNextExcerpt() {
} catch (IOException e) { } catch (IOException e) {
logIOException(logger, "Exception reading from socket", e); logIOException(logger, "Exception reading from socket", e);
if(!closed) { if(!closed) {
//builder.connectionListener().onError(connection.socket(), e); builder.connectionListener().onError(connection.socketChannel(), e);
} }


try { try {
connection.close(); connection.close();
//builder.connectionListener().onDisconnect(connection.socket()); builder.connectionListener().onDisconnect(connection.socketChannel());
} catch (IOException e2) { } catch (IOException e2) {
logger.warn("Error closing socketChannel", e2); logger.warn("Error closing socketChannel", e2);
} }
Expand Down Expand Up @@ -282,14 +278,12 @@ protected boolean doReadNextExcerpt() throws IOException {


if (connection.readUpTo(readBuffer, ChronicleTcp.HEADER_SIZE, readSpinCount)) { if (connection.readUpTo(readBuffer, ChronicleTcp.HEADER_SIZE, readSpinCount)) {
final int size = readBuffer.getInt(); final int size = readBuffer.getInt();
// consume data readBuffer.getLong(); // consume data
readBuffer.getLong();


switch (size) { switch (size) {
case ChronicleTcp.IN_SYNC_LEN: case ChronicleTcp.IN_SYNC_LEN:
return false; return false;
case ChronicleTcp.PADDED_LEN: case ChronicleTcp.PADDED_LEN:
//TODO: Indexed Vs Vanilla
return false; return false;
case ChronicleTcp.SYNC_IDX_LEN: case ChronicleTcp.SYNC_IDX_LEN:
return true; return true;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ public static ByteBuffer createBuffer(int minSize) {
} }


public static ByteBuffer createBuffer(int minSize, ByteOrder byteOrder) { public static ByteBuffer createBuffer(int minSize, ByteOrder byteOrder) {
int newSize = (minSize + INITIAL_BUFFER_SIZE - 1) / INITIAL_BUFFER_SIZE * INITIAL_BUFFER_SIZE;
return createBufferOfSize(newSize, byteOrder); return createBufferOfSize(minBufferSize(minSize), byteOrder);
}

public static int minBufferSize(int minSize) {
return (minSize + INITIAL_BUFFER_SIZE - 1) / INITIAL_BUFFER_SIZE * INITIAL_BUFFER_SIZE;
} }


public static String connectionName(String name, final InetSocketAddress bindAddress, final InetSocketAddress connectAddress) { public static String connectionName(String name, final InetSocketAddress bindAddress, final InetSocketAddress connectAddress) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected Runnable createSessionHandler(final @NotNull SocketChannel socketChann
private abstract class SessionHandler implements Runnable, Closeable { private abstract class SessionHandler implements Runnable, Closeable {
private final SocketChannel socketChannel; private final SocketChannel socketChannel;


private long lastUnpausedNS; private long lastUnPausedNS;


protected final TcpConnection connection; protected final TcpConnection connection;
protected ExcerptTailer tailer; protected ExcerptTailer tailer;
Expand All @@ -139,7 +139,7 @@ private SessionHandler(final @NotNull SocketChannel socketChannel) {
this.tailer = null; this.tailer = null;
this.appender = null; this.appender = null;
this.lastHeartbeat = 0; this.lastHeartbeat = 0;
this.lastUnpausedNS = 0; this.lastUnPausedNS = 0;


this.readBuffer = new ResizableDirectByteBufferBytes(16); this.readBuffer = new ResizableDirectByteBufferBytes(16);
this.readBuffer.clearThreadAssociation(); this.readBuffer.clearThreadAssociation();
Expand Down Expand Up @@ -282,12 +282,12 @@ protected boolean hasRoomFor(ByteBuffer buffer, long size) {
} }


protected void pauseReset() { protected void pauseReset() {
lastUnpausedNS = System.nanoTime(); lastUnPausedNS = System.nanoTime();
pauser.reset(); pauser.reset();
} }


protected void pause() { protected void pause() {
if (lastUnpausedNS + ChronicleTcp.BUSY_WAIT_TIME_NS > System.nanoTime()) { if (lastUnPausedNS + ChronicleTcp.BUSY_WAIT_TIME_NS > System.nanoTime()) {
return; return;
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/ */
package net.openhft.chronicle.tcp; package net.openhft.chronicle.tcp;


import net.openhft.lang.io.DirectByteBufferBytes;
import net.openhft.lang.model.constraints.NotNull; import net.openhft.lang.model.constraints.NotNull;


import java.io.EOFException; import java.io.EOFException;
Expand All @@ -43,6 +44,10 @@ protected void setSocketChannel(SocketChannel socketChannel) throws IOException
this.socketChannel = socketChannel; this.socketChannel = socketChannel;
} }


public SocketChannel socketChannel() {
return this.socketChannel;
}

public boolean isOpen() { public boolean isOpen() {
if(this.socketChannel != null) { if(this.socketChannel != null) {
return this.socketChannel.isOpen(); return this.socketChannel.isOpen();
Expand Down Expand Up @@ -81,14 +86,21 @@ public int write(final ByteBuffer buffer) throws IOException {
return this.socketChannel.write(buffer); return this.socketChannel.write(buffer);
} }


public void writeAllOrEOF(final DirectByteBufferBytes bb) throws IOException {
writeAllOrEOF(bb.buffer());
}

public void writeAllOrEOF(final ByteBuffer bb) throws IOException { public void writeAllOrEOF(final ByteBuffer bb) throws IOException {
// System.out.println("w - "+ChronicleTools.asString(bb));
writeAll(bb); writeAll(bb);
if (bb.remaining() > 0) { if (bb.remaining() > 0) {
throw new EOFException(); throw new EOFException();
} }
} }


public void writeAll(final DirectByteBufferBytes bb) throws IOException {
writeAll(bb.buffer());
}

public void writeAll(final ByteBuffer bb) throws IOException { public void writeAll(final ByteBuffer bb) throws IOException {
int bw = 0; int bw = 0;
while (bb.remaining() > 0) { while (bb.remaining() > 0) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ public ResizableDirectByteBufferBytes(ByteBuffer buffer, int start, int capacity
super(buffer, start, capacity); super(buffer, start, capacity);
} }


public void clearAll() {
buffer().clear();
clear();
}

public void setBufferPositionAndLimit(int position, int limit) {
buffer().position(position);
buffer().limit(limit);
}

public void setBufferPositionAndLimit(int position, long limit) {
buffer().position(position);
buffer().limit((int)limit);
}

public ResizableDirectByteBufferBytes resizeIfNeeded(int newCapacity) { public ResizableDirectByteBufferBytes resizeIfNeeded(int newCapacity) {
if (capacity() < newCapacity) { if (capacity() < newCapacity) {
resize(newCapacity, false, false); resize(newCapacity, false, false);
Expand Down

0 comments on commit 68c8553

Please sign in to comment.