Skip to content

Commit

Permalink
Merge pull request #680 from CorfuDB/split-of-pr670-part-1
Browse files Browse the repository at this point in the history
Part 1 of splitting apart PR #670: non-unit-test stuff
  • Loading branch information
no2chem committed Jun 9, 2017
2 parents 25d14c4 + 4efbb97 commit d26a75f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.corfudb.runtime;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.protocols.logprotocol.CheckpointEntry;
import org.corfudb.runtime.collections.SMRMap;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.object.CorfuCompileProxy;
import org.corfudb.runtime.object.ICorfuSMR;
import org.corfudb.util.Utils;
import org.corfudb.util.serializer.ISerializer;

import java.util.ArrayList;
Expand All @@ -17,7 +20,7 @@
/**
* Checkpoint multiple SMRMaps serially as a prerequisite for a later log trim.
*/

@Slf4j
public class MultiCheckpointWriter {
@Getter
private List<ICorfuSMR<Map>> maps = new ArrayList<>();
Expand Down Expand Up @@ -64,20 +67,37 @@ public long appendCheckpoints(CorfuRuntime rt, String author,
BiConsumer<CheckpointEntry,Long> postAppendFunc)
throws Exception {
long globalAddress = CheckpointWriter.startGlobalSnapshotTxn(rt);
log.trace("appendCheckpoints: author '{}' at globalAddress {} begins",
author, globalAddress);

try {
for (ICorfuSMR<Map> map : maps) {
UUID streamID = map.getCorfuStreamID();
CheckpointWriter cpw = new CheckpointWriter(rt, streamID, author, (SMRMap) map);
ISerializer serializer =
((CorfuCompileProxy<Map>) map.getCorfuSMRProxy())
.getSerializer();
cpw.setSerializer(serializer);
cpw.setPostAppendFunc(postAppendFunc);
List<Long> addresses = cpw.appendCheckpoint();
checkpointLogAddresses.addAll(addresses);
while (true) {
CheckpointWriter cpw = new CheckpointWriter(rt, streamID, author, (SMRMap) map);
ISerializer serializer =
((CorfuCompileProxy<Map>) map.getCorfuSMRProxy())
.getSerializer();
cpw.setSerializer(serializer);
cpw.setPostAppendFunc(postAppendFunc);
log.trace("appendCheckpoints: checkpoint map {} begin",
Utils.toReadableID(map.getCorfuStreamID()));
try {
List<Long> addresses = cpw.appendCheckpoint();
log.trace("appendCheckpoints: checkpoint map {} end",
Utils.toReadableID(map.getCorfuStreamID()));
checkpointLogAddresses.addAll(addresses);
break;
} catch (TransactionAbortedException ae) {
log.trace("appendCheckpoints: checkpoint map {} TransactionAbortedException, retry",
Utils.toReadableID(map.getCorfuStreamID()));
// Don't break!
}
}
}
} finally {
log.trace("appendCheckpoints: author '{}' at globalAddress {} finished",
author, globalAddress);
rt.getObjectsView().TXEnd();
}
return globalAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ protected ILogData getNextEntry(QueuedStreamContext context,
}

// Otherwise we remove entries one at a time from the read queue.
// The entry may not actually be part of the stream, so we might
// have to perform several reads.
if (getFrom.size() > 0) {
final long thisRead = getFrom.pollFirst();
ILogData ld = read(thisRead);
Expand Down

0 comments on commit d26a75f

Please sign in to comment.