Skip to content

Commit

Permalink
#421 writes meta data message to sync queue when repliction connectio…
Browse files Browse the repository at this point in the history
…n is drop
  • Loading branch information
Rob Austin committed Jan 17, 2018
1 parent bcb7d42 commit 2f5e5a7
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 124 deletions.
16 changes: 0 additions & 16 deletions src/main/java/net/openhft/chronicle/queue/impl/WireStore.java
Expand Up @@ -85,20 +85,4 @@ long sequenceForPosition(ExcerptContext ec, long position, boolean inclusive)

ScanResult linearScanTo(long index, long knownIndex, ExcerptContext ec, long knownAddress);

boolean isSyncQueueConnectedViaTcpIp();

/**
* used only by the sync queue replicator
* @param isConnected {@code true} is a socket connect existing between the sync and the
* source hosts
*/
void isSyncQueueConnectedViaTcpIp(boolean isConnected);

long timeLastMessageReceivedViaTcpIp();

/**
* used only by the sync queue replicator
* @param timeMs the last time a message ( including a heartbeat message ) is sent
*/
void timeLastMessageReceivedViaTcpIp(long timeMs);
}
Expand Up @@ -14,11 +14,7 @@ public enum MetaDataField implements WireKey {
deltaCheckpointInterval,
encodedSequence,
lastIndexReplicated,
sourceId,

// one used in replication for sync queues
isSyncQueueConnectedViaTcpIp,
timeLastMessageReceivedViaTcpIp;
sourceId;

@Nullable
@Override
Expand Down
Expand Up @@ -41,15 +41,14 @@
import java.io.StreamCorruptedException;
import java.nio.BufferOverflowException;
import java.text.ParseException;
import java.util.concurrent.TimeUnit;

import static net.openhft.chronicle.queue.TailerDirection.*;
import static net.openhft.chronicle.queue.TailerState.*;
import static net.openhft.chronicle.queue.impl.single.ScanResult.*;
import static net.openhft.chronicle.queue.impl.single.ScanResult.END_OF_FILE;
import static net.openhft.chronicle.queue.impl.single.ScanResult.FOUND;

public class SingleChronicleQueueExcerpts {
private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueueExcerpts.class);
private static final long RELEASE_WARNING_THRESHOLD_NS = TimeUnit.MILLISECONDS.toNanos(300L);

@FunctionalInterface
interface WireWriter<T> {
Expand All @@ -62,13 +61,12 @@ interface WireWriter<T> {
//
// *************************************************************************

@FunctionalInterface
public interface InternalAppender {
void writeBytes(long index, BytesStore bytes);
}

static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalAppender {
public static final int REPEAT_WHILE_ROLLING = 128;
static final int REPEAT_WHILE_ROLLING = 128;
@NotNull
private final SingleChronicleQueue queue;
@NotNull
Expand Down Expand Up @@ -480,58 +478,13 @@ private void position(long position) {
this.position = position;
}

// only called for writeBytes(long index, BytesStore)
private void moveToIndexForWrite(long index) throws EOFException {
if (wire != null && wire.headerNumber() == index)
return;
int cycle = queue.rollCycle().toCycle(index);

ScanResult scanResult = moveToIndex(cycle, queue.rollCycle().toSequenceNumber(index));
switch (scanResult) {
case FOUND:
throw new IllegalStateException("Unable to move to index " + Long.toHexString(index) + " as the index already exists");
case NOT_REACHED:
throw new IllegalStateException("Unable to move to index " + Long.toHexString(index) + " beyond the end of the queue");
case NOT_FOUND:
case END_OF_FILE:
break;
default:
throw new IllegalStateException("Unknown ScanResult: " + scanResult);
}
}

ScanResult moveToIndex(int cycle, long sequenceNumber) throws UnrecoverableTimeoutException {
if (LOG.isDebugEnabled()) {
Jvm.debug().on(getClass(), "moveToIndex: " + Long.toHexString(cycle) + " " + Long.toHexString(sequenceNumber));
}

if (this.cycle != cycle) {
if (cycle > this.cycle)
rollCycleTo(cycle);
else
setCycle2(cycle, true);
}

ScanResult scanResult = this.store.moveToIndexForRead(this, sequenceNumber);
Bytes<?> bytes = wire.bytes();
if (scanResult == NOT_FOUND) {
// so you won't read any if it ran out of data.
bytes.writePosition(bytes.readPosition());
return scanResult;
}

bytes.readLimit(bytes.readPosition());
return scanResult;
}

@Override
public long lastIndexAppended() {

if (lastIndex != Long.MIN_VALUE)
return lastIndex;

if (lastPosition == Long.MIN_VALUE || wire == null) {

throw new IllegalStateException("nothing has been appended, so there is no last index");
}

Expand Down Expand Up @@ -1886,11 +1839,12 @@ public void lastAcknowledgedIndexReplicated(long acknowledgeIndex) {
return;
}

if (temp.store == null) {
WireStore store = temp.store;
if (store == null) {
Jvm.warn().on(getClass(), "Got an acknowledge index " + Long.toHexString(acknowledgeIndex) + " discarded.");
return;
}
temp.store.lastAcknowledgedIndexReplicated(acknowledgeIndex);
store.lastAcknowledgedIndexReplicated(acknowledgeIndex);
} finally {
temp.release();
}
Expand Down
Expand Up @@ -69,13 +69,6 @@ public class SingleChronicleQueueStore implements WireStore {
private LongValue lastIndexReplicated;
private int sourceId;

// used in sync replication
private BooleanValue isSyncQueueConnectedViaTcpIp;

// used in sync replication - time in milliseconds
private LongValue timeLastMessageReceivedViaTcpIp;


@NotNull

private transient Sequence sequence;
Expand Down Expand Up @@ -133,23 +126,6 @@ private SingleChronicleQueueStore(@NotNull WireIn wire) {
sourceId = wire.read(MetaDataField.sourceId).int32();
}

if (wire.bytes().readRemaining() > 0) {
this.isSyncQueueConnectedViaTcpIp = wire.read(MetaDataField.isSyncQueueConnectedViaTcpIp)
.boolForBinding(null);
} else {
this.isSyncQueueConnectedViaTcpIp = null; // disabled.
}

if (wire.bytes().readRemaining() > 0) {
this.timeLastMessageReceivedViaTcpIp = wire.read(MetaDataField
.timeLastMessageReceivedViaTcpIp)
.int64ForBinding(null);
} else {
this.timeLastMessageReceivedViaTcpIp = null; // disabled.
}



} finally {
assert wire.endUse();
}
Expand Down Expand Up @@ -227,8 +203,7 @@ public SingleChronicleQueueStore(@Nullable RollCycle rollCycle,
this.deltaCheckpointInterval = deltaCheckpointInterval;
this.lastIndexReplicated = wireType.newLongReference().get();
this.sourceId = sourceId;
this.isSyncQueueConnectedViaTcpIp = wireType.newBooleanReference().get();
this.timeLastMessageReceivedViaTcpIp = wireType.newLongReference().get();

}

public static void dumpStore(@NotNull Wire wire) {
Expand Down Expand Up @@ -273,28 +248,6 @@ public void lastAcknowledgedIndexReplicated(long newValue) {
lastAcknowledgedIndexReplicated.setMaxValue(newValue);
}

@Override
public boolean isSyncQueueConnectedViaTcpIp() {
return isSyncQueueConnectedViaTcpIp == null ? false : isSyncQueueConnectedViaTcpIp.getValue();
}

@Override
public void isSyncQueueConnectedViaTcpIp(boolean isConnected) {
if (isSyncQueueConnectedViaTcpIp != null)
isSyncQueueConnectedViaTcpIp.setValue(isConnected);
}

@Override
public long timeLastMessageReceivedViaTcpIp() {
return timeLastMessageReceivedViaTcpIp == null ? 0 : timeLastMessageReceivedViaTcpIp.getValue();
}

@Override
public void timeLastMessageReceivedViaTcpIp(long timeMs) {
if (timeLastMessageReceivedViaTcpIp != null)
timeLastMessageReceivedViaTcpIp.setValue(timeMs);
}


/**
* when using replication to another host, this is the last index that has been sent to the remote host.
Expand Down Expand Up @@ -452,9 +405,6 @@ public void writeMarshallable(@NotNull WireOut wire) {
if (Boolean.getBoolean("includeQueueHeaderField-lastIndexReplicated-and-sourceId")) {
wire.write(MetaDataField.lastIndexReplicated).int64forBinding(-1L, lastIndexReplicated);
wire.write(MetaDataField.sourceId).int32(sourceId);
wire.write(MetaDataField.isSyncQueueConnectedViaTcpIp).boolForBinding
(false, isSyncQueueConnectedViaTcpIp);
wire.write(MetaDataField.timeLastMessageReceivedViaTcpIp).int64forBinding(0L, timeLastMessageReceivedViaTcpIp);
}

wire.padToCacheAlign();
Expand Down

0 comments on commit 2f5e5a7

Please sign in to comment.