Skip to content

Commit

Permalink
CHRON-103 - Chronicle-Queue Subscriber stop working after Publisher r…
Browse files Browse the repository at this point in the history
…estarted
  • Loading branch information
lburgazzoli committed Jan 19, 2015
1 parent 19edfdb commit 47f7ac1
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 61 deletions.
Expand Up @@ -132,32 +132,31 @@ public synchronized void close() {
} }


protected void subscribe(long index) throws IOException { protected void subscribe(long index) throws IOException {

writeBuffer.clear(); writeBuffer.clear();
writeBufferBytes.clear(); writeBufferBytes.clear();


writeBufferBytes.writeLong(ChronicleTcp.ACTION_SUBSCRIBE); writeBufferBytes.writeLong(ChronicleTcp.ACTION_SUBSCRIBE);
writeBufferBytes.writeLong(index); writeBufferBytes.writeLong(index);


MappingFunction mapping = withMapping(); MappingFunction mapping = withMapping();

if (mapping != null) { if (mapping != null) {
// write with mapping and len // write with mapping and len
writeBufferBytes.writeLong(ChronicleTcp.ACTION_WITH_MAPPING); writeBufferBytes.writeLong(ChronicleTcp.ACTION_WITH_MAPPING);
long pos = writeBufferBytes.position(); long pos = writeBufferBytes.position();

writeBufferBytes.skip(4); writeBufferBytes.skip(4);
long start = writeBufferBytes.position(); long start = writeBufferBytes.position();


writeBufferBytes.writeObject(mapping); writeBufferBytes.writeObject(mapping);
int len = (int) (writeBufferBytes.position() - start); int len = (int) (writeBufferBytes.position() - start);

writeBufferBytes.writeInt(pos, len); writeBufferBytes.writeInt(pos, len);


} }


writeBuffer.position(0); writeBuffer.position(0);
writeBuffer.limit((int) writeBufferBytes.position()); writeBuffer.limit((int) writeBufferBytes.position());



connection.writeAllOrEOF(writeBuffer); connection.writeAllOrEOF(writeBuffer);
} }


Expand Down Expand Up @@ -214,9 +213,14 @@ private boolean readNextExcerpt() {
} }
} }
} }
} catch (IOException e) { } catch (IOException e1) {
logger.warn("", e); logger.warn("Exception reading nextExcerpt", e1);
close();
try {
connection.close();
} catch (IOException e2) {
logger.warn("Error closing socketChannel", e2);
}
} }


return false; return false;
Expand Down Expand Up @@ -312,9 +316,14 @@ private boolean readNextExcerpt() {
readBuffer.position(readBuffer.position() + size); readBuffer.position(readBuffer.position() + size);
return readNextExcerpt(); return readNextExcerpt();
} }
} catch (IOException e) { } catch (IOException e1) {
logger.warn("", e); logger.warn("Exception reading nextExcerpt", e1);
close();
try {
connection.close();
} catch (IOException e2) {
logger.warn("Error closing socketChannel", e2);
}
} }


return true; return true;
Expand Down
Expand Up @@ -75,7 +75,6 @@ public class IndexedChronicle implements Chronicle {
*/ */
IndexedChronicle(@NotNull ChronicleQueueBuilder.IndexedChronicleQueueBuilder builder) throws IOException { IndexedChronicle(@NotNull ChronicleQueueBuilder.IndexedChronicleQueueBuilder builder) throws IOException {


//this.config = config.clone();
this.builder = builder.clone(); this.builder = builder.clone();
this.basePath = builder.path().getAbsolutePath(); this.basePath = builder.path().getAbsolutePath();


Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


Expand Down
Expand Up @@ -341,7 +341,6 @@ protected boolean onSelectionKey(final SelectionKey key) throws IOException {
protected boolean onRead(final SelectionKey key) throws IOException { protected boolean onRead(final SelectionKey key) throws IOException {
try { try {
final long action = readUpTo(8).readLong(); final long action = readUpTo(8).readLong();

switch ((int) action) { switch ((int) action) {
case (int) ChronicleTcp.ACTION_WITH_MAPPING: case (int) ChronicleTcp.ACTION_WITH_MAPPING:
return onMapping(key, readUpTo(4).readInt()); return onMapping(key, readUpTo(4).readInt());
Expand All @@ -365,7 +364,7 @@ protected boolean onRead(final SelectionKey key) throws IOException {
} }
protected boolean onWrite(final SelectionKey key) throws IOException { protected boolean onWrite(final SelectionKey key) throws IOException {
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
Object attachment = key.attachment(); final Object attachment = key.attachment();


if (running.get() && !write(attachment)) { if (running.get() && !write(attachment)) {
if (lastHeartbeat <= now) { if (lastHeartbeat <= now) {
Expand Down Expand Up @@ -712,7 +711,9 @@ protected boolean write(Object attached) throws IOException {
* @see * @see
*/ */
private static Bytes applyMapping(@NotNull final ExcerptTailer source, private static Bytes applyMapping(@NotNull final ExcerptTailer source,
@Nullable Object attached, Bytes withMappedBuffer) { @Nullable Object attached,
Bytes withMappedBuffer) {

if (attached == null) { if (attached == null) {
return source; return source;
} }
Expand Down
Expand Up @@ -22,6 +22,7 @@


import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;


Expand Down Expand Up @@ -62,6 +63,22 @@ public void close() throws IOException {
} }
} }


public String debugString() {
if(this.socketChannel != null) {
try {
StringBuilder sb = new StringBuilder();
sb.append("[");
sb.append(this.socketChannel.getLocalAddress());
sb.append(" -> ");
sb.append(this.socketChannel.getRemoteAddress());
sb.append("]");
} catch(IOException e) {
}
}

return "[] -> []";
}

public int write(final ByteBuffer buffer) throws IOException { public int write(final ByteBuffer buffer) throws IOException {
return this.socketChannel.write(buffer); return this.socketChannel.write(buffer);
} }
Expand Down
Expand Up @@ -123,11 +123,11 @@ public int port() {
return this.port.get(); return this.port.get();
} }


public int getAndCheckPort() { public int getAndAssertOnError() {
final int port = port(); final int port = port();
assertNotEquals(-1, port); assertNotEquals(-1, port);


LOGGER.info("{} : listening on port {}", getTestName(), port); //LOGGER.info("{} : listening on port {}", getTestName(), port);


return port; return port;
} }
Expand Down
Expand Up @@ -84,7 +84,7 @@ public void testJira77(Chronicle chronicleSrc, Chronicle chronicleTarget) throws
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final Chronicle chronicleSink = ChronicleQueueBuilder.sink(chronicleTarget) final Chronicle chronicleSink = ChronicleQueueBuilder.sink(chronicleTarget)
.minBufferSize(2 * BYTES_LENGTH) .minBufferSize(2 * BYTES_LENGTH)
.connectAddress("localhost", port) .connectAddress("localhost", port)
Expand Down Expand Up @@ -134,7 +134,7 @@ public void testJira80(final ChronicleQueueBuilder chronicleMasterBuilder, final


chronicleSource.clear(); chronicleSource.clear();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final ExcerptAppender appender = chronicleSource.createAppender(); final ExcerptAppender appender = chronicleSource.createAppender();
for (long i = 0; i < (chunks * itemsPerChunk); i++) { for (long i = 0; i < (chunks * itemsPerChunk); i++) {
appender.startExcerpt(); appender.startExcerpt();
Expand Down
Expand Up @@ -36,13 +36,17 @@
import java.io.ObjectOutput; import java.io.ObjectOutput;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import static net.openhft.chronicle.ChronicleQueueBuilder.ReplicaChronicleQueueBuilder; import static net.openhft.chronicle.ChronicleQueueBuilder.ReplicaChronicleQueueBuilder;
import static net.openhft.chronicle.ChronicleQueueBuilder.indexed; import static net.openhft.chronicle.ChronicleQueueBuilder.indexed;
import static net.openhft.chronicle.ChronicleQueueBuilder.sink;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;


/** /**
Expand All @@ -66,7 +70,7 @@ public void testOverTCP() throws IOException, InterruptedException {
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final Chronicle sink = indexed(basePathSink) final Chronicle sink = indexed(basePathSink)
.sink() .sink()
.connectAddress("localhost", port) .connectAddress("localhost", port)
Expand Down Expand Up @@ -137,7 +141,7 @@ public void testPricePublishing1() throws IOException, InterruptedException {
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final Chronicle sink = indexed(basePathSink) final Chronicle sink = indexed(basePathSink)
.sink() .sink()
.connectAddress("localhost", port) .connectAddress("localhost", port)
Expand Down Expand Up @@ -189,7 +193,7 @@ public void testPricePublishing2() throws IOException, InterruptedException {
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final Chronicle sink = indexed(basePathSink) final Chronicle sink = indexed(basePathSink)
.sink() .sink()
.connectAddress("localhost", port) .connectAddress("localhost", port)
Expand Down Expand Up @@ -243,7 +247,7 @@ public void testPricePublishing3() throws IOException, InterruptedException {
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final Chronicle sink = indexed(basePathSink) final Chronicle sink = indexed(basePathSink)
.sink() .sink()
.connectAddress("localhost", port) .connectAddress("localhost", port)
Expand Down Expand Up @@ -454,6 +458,10 @@ public void testIndexedJira80() throws IOException {
indexed(basePath + "-slave") indexed(basePath + "-slave")
); );
} }

// *************************************************************************
//
// *************************************************************************


static final int RATE = Integer.getInteger("rate", 100000); static final int RATE = Integer.getInteger("rate", 100000);
static final int COUNT = Integer.getInteger("count", RATE * 2); static final int COUNT = Integer.getInteger("count", RATE * 2);
Expand Down Expand Up @@ -558,7 +566,7 @@ public void testIndexedNonBlockingClient() throws Exception {


final ReplicaChronicleQueueBuilder sinkBuilder = indexed(basePathSink) final ReplicaChronicleQueueBuilder sinkBuilder = indexed(basePathSink)
.sink() .sink()
.connectAddress("localhost", portSupplier.getAndCheckPort()) .connectAddress("localhost", portSupplier.getAndAssertOnError())
.readSpinCount(5); .readSpinCount(5);


final Chronicle sinnk = sinkBuilder.build(); final Chronicle sinnk = sinkBuilder.build();
Expand All @@ -569,4 +577,98 @@ public void testIndexedNonBlockingClient() throws Exception {
sinkBuilder.heartbeatIntervalMillis() sinkBuilder.heartbeatIntervalMillis()
); );
} }

@Test
public void testIndexedClientReconnection() throws Exception {
final String basePathSource = getIndexedTestPath("-source");
final String basePathSink = getIndexedTestPath("-sink");
final PortSupplier portSupplier = new PortSupplier();
final int items = 20;
final CountDownLatch latch = new CountDownLatch(items);

Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
final Chronicle sink = indexed(basePathSink)
.sink()
.connectAddressProvider(new AddressProvider() {
@Override
public InetSocketAddress get() {
return new InetSocketAddress(
"localhost",
portSupplier.getAndAssertOnError());
}
})
.build();

ExcerptTailer tailer = sink.createTailer();
while(latch.getCount() > 0) {
if(tailer.nextIndex()) {
assertEquals(items - latch.getCount(), tailer.readLong());
tailer.finish();
latch.countDown();
} else {
Thread.sleep(1000);
}
}

tailer.clear();
sink.close();
sink.clear();
} catch (Exception e) {
LOGGER.warn("", e);
errorCollector.addError(e);
}
}
});

t.start();

// Source 1
Chronicle source1 = indexed(basePathSource)
.source()
.bindAddress(0)
.connectionListener(portSupplier)
.build();

ExcerptAppender appender1 = source1.createAppender();
for(long i=0; i < items / 2 ; i++) {
appender1.startExcerpt(8);
appender1.writeLong(i);
appender1.finish();
}

appender1.close();

while(latch.getCount() > 10) {
Thread.sleep(250);
}

source1.close();

portSupplier.reset();

// Source 2
Chronicle source2 = indexed(basePathSource)
.source()
.bindAddress(0)
.connectionListener(portSupplier)
.build();

ExcerptAppender appender2 = source2.createAppender();
for(long i=items / 2; i < items; i++) {
appender2.startExcerpt(8);
appender2.writeLong(i);
appender2.finish();
}

appender2.close();

latch.await(5, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());

source2.close();
source2.clear();
}
} }
Expand Up @@ -42,7 +42,7 @@ public void testPersistedLocalIndexedSink_001() throws Exception {
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final Chronicle sink = ChronicleQueueBuilder.sink(chronicle) final Chronicle sink = ChronicleQueueBuilder.sink(chronicle)
.sharedChronicle(true) .sharedChronicle(true)
.connectAddress("localhost", port) .connectAddress("localhost", port)
Expand Down
Expand Up @@ -40,7 +40,7 @@ public void testPersistedLocalVanillaSink_001() throws Exception {
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndAssertOnError();
final Chronicle sink = ChronicleQueueBuilder.sink(chronicle) final Chronicle sink = ChronicleQueueBuilder.sink(chronicle)
.sharedChronicle(true) .sharedChronicle(true)
.connectAddress("localhost",port) .connectAddress("localhost",port)
Expand Down

0 comments on commit 47f7ac1

Please sign in to comment.