Skip to content

Commit

Permalink
Merge pull request #676 from CorfuDB/checkpointStack
Browse files Browse the repository at this point in the history
keep track of snapshot addresses, fix issue with fresh streams
  • Loading branch information
no2chem committed Jun 8, 2017
2 parents c2dd8b3 + 0c42668 commit 4b0caa4
Show file tree
Hide file tree
Showing 10 changed files with 477 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum CheckpointDictKey {
END_TIME(1),
START_LOG_ADDRESS(2),
ENTRY_COUNT(3),
BYTE_COUNT(4);
BYTE_COUNT(4),
SNAPSHOT_ADDRESS(5);

public final int type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public long startCheckpoint() {
ICorfuSMR<SMRMap> corfuObject = (ICorfuSMR<SMRMap>) this.map;
this.mdKV.put(CheckpointEntry.CheckpointDictKey.START_LOG_ADDRESS,
Long.toString(corfuObject.getCorfuSMRProxy().getVersion()));
this.mdKV.put(CheckpointEntry.CheckpointDictKey.SNAPSHOT_ADDRESS,
Long.toString(txBeginGlobalAddress));

ImmutableMap<CheckpointEntry.CheckpointDictKey,String> mdKV = ImmutableMap.copyOf(this.mdKV);
CheckpointEntry cp = new CheckpointEntry(CheckpointEntry.CheckpointEntryType.START,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,12 @@
* Created by mwei on 12/14/15.
*/
public class LogUnitException extends RuntimeException {

public LogUnitException() {

}

public LogUnitException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,10 @@
* that has been trimmed.
*/
public class TrimmedException extends LogUnitException {
public TrimmedException() {

}
public TrimmedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.corfudb.runtime.exceptions;

/**
* This exception is thrown when a client attempts to resolve
* an upcall, but the address is trimmed before the upcall
* result can be resolved.
*
* Created by mwei on 6/7/17.
*/
public class TrimmedUpcallException extends TrimmedException {

public TrimmedUpcallException(long address) {
super("Attempted to get upcall result @" + address +
" but it was trimmed before we could read it");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import org.corfudb.protocols.logprotocol.SMREntry;
import org.corfudb.protocols.wireprotocol.TxResolutionInfo;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.AbortCause;
import org.corfudb.runtime.exceptions.NetworkException;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.exceptions.*;
import org.corfudb.runtime.object.transactions.AbstractTransactionalContext;
import org.corfudb.runtime.object.transactions.TransactionalContext;
import org.corfudb.util.MetricsUtils;
Expand Down Expand Up @@ -252,24 +249,28 @@ private <R> R getUpcallResultInner(long timestamp, Object[] conflictObject) {
return ret == VersionLockedObject.NullValue.NULL_VALUE ? null : ret;
}

return underlyingObject.update(o-> {
o.syncObjectUnsafe(timestamp);
if (o.upcallResults.containsKey(timestamp)) {
log.trace("Upcall[{}] {} Sync'd", this, timestamp);
R ret = (R) o.upcallResults.get(timestamp);
o.upcallResults.remove(timestamp);
return ret == VersionLockedObject.NullValue.NULL_VALUE ? null : ret;
}
try {
return underlyingObject.update(o -> {
o.syncObjectUnsafe(timestamp);
if (o.upcallResults.containsKey(timestamp)) {
log.trace("Upcall[{}] {} Sync'd", this, timestamp);
R ret = (R) o.upcallResults.get(timestamp);
o.upcallResults.remove(timestamp);
return ret == VersionLockedObject.NullValue.NULL_VALUE ? null : ret;
}

// The version is already ahead, but we don't have the result.
// The only way to get the correct result
// of the upcall would be to rollback. For now, we throw an exception
// since this is generally not expected. --- and probably a bug if it happens.
throw new RuntimeException("Attempted to get the result " +
"of an upcall@" + timestamp + " but we are @"
+ underlyingObject.getVersionUnsafe() +
" and we don't have a copy");
});
// The version is already ahead, but we don't have the result.
// The only way to get the correct result
// of the upcall would be to rollback. For now, we throw an exception
// since this is generally not expected. --- and probably a bug if it happens.
throw new RuntimeException("Attempted to get the result " +
"of an upcall@" + timestamp + " but we are @"
+ underlyingObject.getVersionUnsafe() +
" and we don't have a copy");
});
} catch (TrimmedException ex) {
throw new TrimmedUpcallException(timestamp);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ static class QueuedStreamContext extends AbstractStreamContext {
long checkpointSuccessBytes = 0L;
// No need to keep track of # of DATA entries, use context.resolvedQueue.size()?
long resolvedEstBytes = 0L;
/** The address the current checkpoint snapshot was taken at.
* The checkpoint guarantees for this stream there are no entries
* between checkpointSuccessStartAddr and checkpointSnapshotAddress.
*/
long checkpointSnapshotAddress = Address.NEVER_READ;

/** Create a new stream context with the given ID and maximum address
* to read to.
Expand All @@ -395,6 +400,7 @@ void reset() {
checkpointSuccessID = null;
checkpointSuccessStartAddr = Address.NEVER_READ;
checkpointSuccessEndAddr = Address.NEVER_READ;
checkpointSnapshotAddress = Address.NEVER_READ;
checkpointSuccessNumEntries = 0;
checkpointSuccessBytes = 0;
resolvedEstBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.corfudb.protocols.logprotocol.CheckpointEntry;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.view.Address;
import org.corfudb.util.Utils;

Expand Down Expand Up @@ -241,6 +242,10 @@ else if (data.getCheckpointID().equals(context.checkpointSuccessID)) {
cpStartAddr = data.getGlobalAddress();
}
context.checkpointSuccessStartAddr = cpStartAddr;
if (cpEntry.getDict().get(CheckpointEntry.CheckpointDictKey.SNAPSHOT_ADDRESS) != null) {
context.checkpointSnapshotAddress = Long.decode(cpEntry.getDict()
.get(CheckpointEntry.CheckpointDictKey.SNAPSHOT_ADDRESS));
}
log.trace("Checkpoint[{}] HALT due to START at address {} startAddr {} type {} id {} author {}",
this, data.getGlobalAddress(), cpStartAddr, cpEntry.getCpType(),
Utils.toReadableID(cpEntry.getCheckpointID()), cpEntry.getCheckpointAuthorID());
Expand Down Expand Up @@ -271,14 +276,20 @@ protected boolean fillReadQueue(final long maxGlobal,
final UUID checkpointID = CorfuRuntime
.getStreamID(context.id.toString() + "_cp");
// Find the checkpoint, if present
if (followBackpointers(checkpointID, context.readCpQueue,
runtime.getSequencerView()
.nextToken(Collections.singleton(checkpointID), 0)
.getToken().getTokenValue()
, Address.NEVER_READ, d -> resolveCheckpoint(context, d))) {
log.trace("Read_Fill_Queue[{}] Using checkpoint with {} entries",
this, context.readCpQueue.size());
return true;
try {
if (followBackpointers(checkpointID, context.readCpQueue,
runtime.getSequencerView()
.nextToken(Collections.singleton(checkpointID), 0)
.getToken().getTokenValue()
, Address.NEVER_READ, d -> resolveCheckpoint(context, d))) {
log.trace("Read_Fill_Queue[{}] Using checkpoint with {} entries",
this, context.readCpQueue.size());
return true;
}
} catch (TrimmedException te) {
// If we reached a trim and didn't hit a checkpoint, this might be okay,
// if the stream was created recently and no checkpoint exists yet.
log.trace("Read_Fill_Queue[{}] Trim encountered and no checkpoint detected.", this);
}
}

Expand Down Expand Up @@ -335,12 +346,14 @@ protected boolean fillReadQueue(final long maxGlobal,

// Now we start traversing backpointers, if they are available. We
// start at the latest token and go backward, until we reach the
// log pointer. For each address which is less than
// log pointer -or- the checkpoint snapshot address, because all
// values from the beginning of the stream up to the snapshot address
// should be reflected. For each address which is less than
// maxGlobalAddress, we insert it into the read queue.

followBackpointers(context.id, context.readQueue,
latestTokenValue,
context.globalPointer,
Long.max(context.globalPointer, context.checkpointSnapshotAddress),
d -> BackpointerOp.INCLUDE);

return ! context.readCpQueue.isEmpty() || !context.readQueue.isEmpty();
Expand Down

0 comments on commit 4b0caa4

Please sign in to comment.