Delta reset (#3637)
* Add TransferSegmentBuilderTest

* Rearrange code in TransferSegment

* Rearrange code in TransferSegment part 2

* Rearrange code in TransferSegment part 3

* RedundancyCalculatorTest small changes

* TransferSegmentRangeSingleBuilder clean ups

* clean-ups

* StreamLog reset for committed tail state transfer (delta reset)

* StreamLog reset for committed tail state transfer (delta reset). Small clean-ups

* fix RedundancyCalculatorTest

* StreamLogFilesTest get back to junit 4

* Full reset when not in prefix

* Fix StateTransferIT

* Fix runtime resource leaks in AbstractViewTest

* Fix corfu runtime leaks in integration tests

* Disable test logs output

* Fix reset test

* clean-ups

* get rid of fullreset

* Fix StreamingIT checkpointing method

---------

Co-authored-by: Pavel Zaytsev <zaytsev.pavel.a@gmail.com>
xnull and PavelZaytsev committed Jun 21, 2023
1 parent 24d37e5 commit ad01838
Showing 50 changed files with 909 additions and 762 deletions.
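
The core of the change, visible in the StreamLogDataStore and LogMetadata hunks below, is that the stream log's tail segment and starting address are no longer reset to a hardcoded zero; the caller now supplies the target values, so a state transfer based on the committed tail can keep already-committed data (a "delta" reset) instead of performing a full reset. The sketch below is a minimal, self-contained illustration of that API shape; DeltaResetSketch and its fields are hypothetical stand-ins, not Corfu code, and only the resetTailSegment(long) / resetStartingAddress(long) signatures mirror the diff.

// Minimal sketch (hypothetical stand-in class, not part of Corfu) of the reshaped reset API.
// Before this commit: resetTailSegment() / resetStartingAddress() always wrote zero.
// After: the caller passes the value, so a delta reset can preserve committed data.
public class DeltaResetSketch {

    private long tailSegment;
    private long startingAddress;

    void resetTailSegment(long latestAddress) {
        // The real StreamLogDataStore also persists the value in a KvDataStore record.
        tailSegment = latestAddress;
    }

    void resetStartingAddress(long newAddress) {
        startingAddress = newAddress;
    }

    public static void main(String[] args) {
        DeltaResetSketch store = new DeltaResetSketch();
        store.resetTailSegment(42L);     // caller-chosen value, no longer hardcoded 0
        store.resetStartingAddress(0L);  // passing 0 still gives the old full-reset behavior
        System.out.println("tailSegment=" + store.tailSegment
                + " startingAddress=" + store.startingAddress);
    }
}
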
@@ -185,8 +185,12 @@ private void process() {
                         "logunit.write.timer", "type", "single");
                 break;
             case RANGE_WRITE:
-                List<LogData> range = payload.getRangeWriteLogRequest().getLogDataList()
-                        .stream().map(CorfuProtocolLogData::getLogData).collect(Collectors.toList());
+                List<LogData> range = payload.getRangeWriteLogRequest()
+                        .getLogDataList()
+                        .stream()
+                        .map(CorfuProtocolLogData::getLogData)
+                        .collect(Collectors.toList());
+
                 MicroMeterUtils.time(() -> streamLog.append(range),
                         "logunit.write.timer", "type", "range");
                 break;
@@ -31,6 +31,7 @@
 import org.corfudb.runtime.proto.service.CorfuMessage.PriorityLevel;
 import org.corfudb.runtime.proto.service.CorfuMessage.RequestMsg;
 import org.corfudb.runtime.proto.service.CorfuMessage.RequestPayloadMsg.PayloadCase;
+import org.corfudb.runtime.proto.service.CorfuMessage.ResponseMsg;
 import org.corfudb.runtime.view.stream.StreamAddressSpace;
 import org.corfudb.util.Utils;
 
@@ -239,8 +240,11 @@ private void handleCommittedTailRequest(RequestMsg req, ChannelHandlerContext ct
 
         // Note: we reuse the request header as the ignore_cluster_id and
         // ignore_epoch fields are the same in both cases.
-        router.sendResponse(getResponseMsg(getHeaderMsg(req.getHeader()),
-                getCommittedTailResponseMsg(streamLog.getCommittedTail())), ctx);
+        ResponseMsg committedTailResp = getResponseMsg(
+                getHeaderMsg(req.getHeader()),
+                getCommittedTailResponseMsg(streamLog.getCommittedTail())
+        );
+        router.sendResponse(committedTailResp, ctx);
     }
 
     /**
@@ -433,8 +437,11 @@ private void handleKnownAddressRequest(RequestMsg req, ChannelHandlerContext ctx
 
             // Note: we reuse the request header as the ignore_cluster_id and
             // ignore_epoch fields are the same in both cases.
-            router.sendResponse(getResponseMsg(getHeaderMsg(req.getHeader()),
-                    getKnownAddressResponseMsg(knownAddresses)), ctx);
+            ResponseMsg responseMsg = getResponseMsg(
+                    getHeaderMsg(req.getHeader()),
+                    getKnownAddressResponseMsg(knownAddresses)
+            );
+            router.sendResponse(responseMsg, ctx);
         } catch (Exception e) {
             handleException(e, ctx, req, router);
         }
@@ -200,8 +200,8 @@ private boolean isBootstrapped(RequestMsg req) {
     public synchronized void handleOrchestratorMsg(@Nonnull RequestMsg req,
                                                    @Nonnull ChannelHandlerContext ctx,
                                                    @Nonnull IServerRouter r) {
-        log.debug("handleOrchestratorMsg: {}",
-                TextFormat.shortDebugString(req.getPayload().getOrchestratorRequest()));
+        String orchestratorMsg = TextFormat.shortDebugString(req.getPayload().getOrchestratorRequest());
+        log.debug("handleOrchestratorMsg: {}", orchestratorMsg);
         orchestrator.handle(req, ctx, r);
     }
 
@@ -73,10 +73,10 @@ public class DataStore implements KvDataStore {
      * @param opts map of option strings
      * @param cleanupTask method to cleanup DataStore files
      */
-    public DataStore(@Nonnull Map<String, Object> opts,
-                     @Nonnull Consumer<String> cleanupTask) {
+    public DataStore(@Nonnull Map<String, Object> opts, @Nonnull Consumer<String> cleanupTask) {
 
-        if ((opts.containsKey("--memory") && (Boolean) opts.get("--memory")) || !opts.containsKey("--log-path")) {
+        boolean isInMem = opts.containsKey("--memory") && (Boolean) opts.get("--memory");
+        if (isInMem || !opts.containsKey("--log-path")) {
             this.logDirPath = null;
             this.cleanupTask = fileName -> {};
             cache = buildMemoryDs();
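
The DataStore constructor change above only extracts the in-memory check into a named local; the decision itself is unchanged: the store is in-memory when "--memory" is present and true, or when no "--log-path" option is supplied. Below is a self-contained sketch of that predicate using the same option keys; the class and method names are illustrative, not Corfu code.

import java.util.HashMap;
import java.util.Map;

// Sketch of the option check performed by the DataStore constructor shown above.
// The predicate matches the diff; the class and method names are hypothetical.
public class DataStoreOptionsSketch {

    static boolean usesInMemoryStore(Map<String, Object> opts) {
        boolean isInMem = opts.containsKey("--memory") && (Boolean) opts.get("--memory");
        return isInMem || !opts.containsKey("--log-path");
    }

    public static void main(String[] args) {
        Map<String, Object> memoryOpts = new HashMap<>();
        memoryOpts.put("--memory", true);

        Map<String, Object> fileOpts = new HashMap<>();
        fileOpts.put("--log-path", "/tmp/corfu");

        System.out.println(usesInMemoryStore(memoryOpts)); // true
        System.out.println(usesInMemoryStore(fileOpts));   // false
    }
}
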
@@ -2,6 +2,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.corfudb.infrastructure.BatchProcessor.BatchProcessorContext;
+import org.corfudb.infrastructure.datastore.DataStore;
 import org.corfudb.infrastructure.log.FileSystemAgent.FileSystemConfig;
 import org.corfudb.protocols.wireprotocol.LogData;
 import org.corfudb.protocols.wireprotocol.StreamsAddressResponse;
@@ -37,6 +38,7 @@ public class InMemoryStreamLog implements StreamLog {
     private volatile LogMetadata logMetadata;
     private AtomicLong committedTail;
     private final FileSystemAgent fsAgent;
+    private final StreamLogDataStore dataStore;
 
     /**
      * Returns an object that stores a stream log in memory.
@@ -45,7 +47,11 @@ public InMemoryStreamLog(BatchProcessorContext batchProcessorContext) {
         logCache = new ConcurrentHashMap<>();
         trimmed = ConcurrentHashMap.newKeySet();
         startingAddress = 0;
-        logMetadata = new LogMetadata();
+        Map<String, Object> opts = new HashMap<>();
+        opts.put("--memory", true);
+        this.dataStore = new StreamLogDataStore(new DataStore(opts, fileName -> {
+        }));
+        logMetadata = new LogMetadata(dataStore);
         committedTail = new AtomicLong(Address.NON_ADDRESS);
 
         Path dummyLogDir = new File(".").toPath().toAbsolutePath();
@@ -223,7 +229,7 @@ public synchronized void compact() {
     @Override
     public void reset() {
         startingAddress = 0;
-        logMetadata = new LogMetadata();
+        logMetadata = new LogMetadata(dataStore);
         // Clear the trimmed addresses record.
         trimmed.clear();
         // Clearing all data from the cache.
@@ -15,6 +15,8 @@
 import java.util.Map;
 import java.util.UUID;
 
+import static org.corfudb.infrastructure.log.StreamLogFiles.RECORDS_PER_LOG_FILE;
+
 
 /**
  * A container object that holds log tail offsets and the global
@@ -29,6 +31,8 @@
 @Slf4j
 public class LogMetadata {
 
+    private final StreamLogDataStore dataStore;
+
     @Getter
     private volatile long globalTail;
 
@@ -38,10 +42,11 @@ public class LogMetadata {
     @Getter
     private final Map<UUID, Long> streamTails;
 
-    public LogMetadata() {
+    public LogMetadata(StreamLogDataStore dataStore) {
         this.globalTail = Address.NON_ADDRESS;
         this.streamTails = new HashMap<>();
         this.streamsAddressSpaceMap = new HashMap<>();
+        this.dataStore = dataStore;
     }
 
     public void update(List<LogData> entries) {
@@ -158,4 +163,14 @@ public void prefixTrim(long address) {
             streamAddressMap.getValue().trim(address);
         }
     }
+
+    public void syncTailSegment(long address) {
+        // TODO(Maithem) since writing a record and setting the tail segment is not
+        // an atomic operation, it is possible to set an incorrect tail segment. In
+        // that case we will need to scan more than one segment
+        updateGlobalTail(address);
+        long segment = address / RECORDS_PER_LOG_FILE;
+
+        dataStore.updateTailSegment(segment);
+    }
 }
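
The new syncTailSegment method maps a log address to a segment number by integer division with RECORDS_PER_LOG_FILE and persists that segment through the data store; as the TODO notes, the record write and the tail-segment update are not atomic, so recovery may need to scan beyond the recorded segment. A small worked example of the mapping follows, assuming a segment size of 10,000 records (an illustrative value; the real constant is defined in StreamLogFiles).

// Worked example of the address-to-segment arithmetic used by syncTailSegment.
// RECORDS_PER_LOG_FILE = 10_000 is an assumption for illustration only.
public class TailSegmentMathSketch {

    static final long RECORDS_PER_LOG_FILE = 10_000L;

    public static void main(String[] args) {
        long[] addresses = {0L, 9_999L, 10_000L, 25_000L};
        for (long address : addresses) {
            long segment = address / RECORDS_PER_LOG_FILE;
            System.out.println("address " + address + " -> tail segment " + segment);
        }
        // Prints segments 0, 0, 1, 2 for the addresses above.
    }
}
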
@@ -4,13 +4,15 @@
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.NonNull;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.IOUtils;
 import org.corfudb.common.metrics.micrometer.MicroMeterUtils;
 import org.corfudb.infrastructure.ResourceQuota;
 import org.corfudb.protocols.wireprotocol.LogData;
 import org.corfudb.runtime.exceptions.DataCorruptionException;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -50,7 +52,8 @@
  * @author Maithem
  */
 @Slf4j
-public class Segment {
+@ToString
+public class Segment implements Closeable {
     public static final int METADATA_SIZE = LogFormat.Metadata.newBuilder()
             .setLengthChecksum(-1)
             .setPayloadChecksum(-1)
@@ -66,9 +69,11 @@ public class Segment {
 
     final long id;
 
+    @ToString.Exclude
     @NonNull
     private FileChannel writeChannel;
 
+    @ToString.Exclude
     @NonNull
     private FileChannel readChannel;
 
@@ -77,10 +82,12 @@
 
     private boolean isDirty;
 
+    @ToString.Exclude
     private final Index index;
 
     private int refCount = 0;
 
+    @ToString.Exclude
     private final ResourceQuota logSize;
 
     public Segment(long segmentId, int segmentSize, Path segmentsDir, ResourceQuota logSize) {
@@ -533,6 +540,7 @@ public synchronized void release() {
         refCount--;
     }
 
+    @Override
     public void close() {
 
         Set<FileChannel> channels = new HashSet<>(
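
Declaring Segment as Closeable (and marking close() with @Override) means a segment can now be managed with try-with-resources, so its file channels are released even on an exception path. The following is a self-contained illustration of that pattern with a stand-in resource; whether Corfu itself opens segments this way is not shown in this diff.

import java.io.Closeable;

// Generic try-with-resources illustration; Resource is a stand-in for any Closeable,
// which after this change includes Segment and its underlying FileChannels.
public class CloseableSketch {

    static class Resource implements Closeable {
        @Override
        public void close() {
            System.out.println("close() invoked automatically");
        }
    }

    public static void main(String[] args) {
        try (Resource r = new Resource()) {
            System.out.println("using the resource");
        } // close() runs here, even if the body throws
    }
}
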
@@ -37,7 +37,7 @@ public class StreamLogDataStore {
             COMMITTED_TAIL_PREFIX, COMMITTED_TAIL_KEY, Long.class
     );
 
-    private static final long ZERO_ADDRESS = 0L;
+    public static final long ZERO_ADDRESS = 0L;
 
     @NonNull
     private final KvDataStore dataStore;
@@ -92,6 +92,15 @@ public void updateTailSegment(long newTailSegment) {
         tailSegment.set(newTailSegment);
     }
 
+    /**
+     * Reset starting address.
+     */
+    public void resetStartingAddress(long newAddress) {
+        log.info("Reset starting address. Current address: {}", startingAddress.get());
+        dataStore.put(STARTING_ADDRESS_RECORD, newAddress);
+        startingAddress.set(newAddress);
+    }
+
     /**
      * Returns the dataStore starting address.
      *
@@ -152,19 +161,10 @@ public synchronized void updateCommittedTail(long newCommittedTail) {
     /**
      * Reset tail segment.
      */
-    public void resetTailSegment() {
+    public void resetTailSegment(long latestAddress) {
         log.info("Reset tail segment. Current segment: {}", tailSegment.get());
-        dataStore.put(TAIL_SEGMENT_RECORD, ZERO_ADDRESS);
-        tailSegment.set(ZERO_ADDRESS);
-    }
-
-    /**
-     * Reset starting address.
-     */
-    public void resetStartingAddress() {
-        log.info("Reset starting address. Current address: {}", startingAddress.get());
-        dataStore.put(STARTING_ADDRESS_RECORD, ZERO_ADDRESS);
-        startingAddress.set(ZERO_ADDRESS);
+        dataStore.put(TAIL_SEGMENT_RECORD, latestAddress);
+        tailSegment.set(latestAddress);
     }
 
     /**
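
Both reset methods in StreamLogDataStore follow the same two-step pattern: persist the new value to the durable record first, then refresh the in-memory cached copy. Below is a minimal sketch of that pattern with a plain map standing in for the KvDataStore; the class name and record key are illustrative, not the Corfu implementation.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the persist-then-cache pattern used by resetTailSegment / resetStartingAddress.
// A HashMap stands in for the KvDataStore; names are hypothetical.
public class ResetPatternSketch {

    private final Map<String, Long> durableStore = new HashMap<>();
    private final AtomicLong tailSegment = new AtomicLong(0L);

    void resetTailSegment(long latestAddress) {
        durableStore.put("TAIL_SEGMENT", latestAddress); // persist first
        tailSegment.set(latestAddress);                   // then update the cached copy
    }

    public static void main(String[] args) {
        ResetPatternSketch sketch = new ResetPatternSketch();
        sketch.resetTailSegment(7L);
        System.out.println("cached=" + sketch.tailSegment.get()
                + " durable=" + sketch.durableStore.get("TAIL_SEGMENT"));
    }
}
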