Skip to content

Commit

Permalink
Revert "reset logunit server (#1052)"
Browse files Browse the repository at this point in the history
This reverts commit 8e63b38.
  • Loading branch information
no2chem committed Dec 8, 2017
1 parent 8e63b38 commit d5286de
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,18 +301,6 @@ private void rangeWrite(CorfuPayloadMsg<RangeWriteMsg> msg,
r.sendResponse(ctx, msg, CorfuMsgType.WRITE_OK.msg());
}

/**
* Resets the log unit server.
* Warning: Clears all data.
*/
@ServerHandler(type = CorfuMsgType.RESET_LOGUNIT, opTimer = metricsPrefix + "resetLogUnit")
private void resetLogUnit(CorfuMsg msg, ChannelHandlerContext ctx, IServerRouter r,
boolean isMetricsEnabled) {
streamLog.reset();
dataCache.invalidateAll();
r.sendResponse(ctx, msg, CorfuMsgType.ACK.msg());
}

/**
* Retrieve the LogUnitEntry from disk, given an address.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,4 @@ public synchronized void compact() {
}
}
}

@Override
public void reset() {
startingAddress = 0;
globalTail.set(0L);
// Clear the trimmed addresses record.
trimmed.clear();
// Clearing all data from the cache.
logCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,4 @@ public interface StreamLog {
* @param address address to release
*/
void release(long address, LogData entry);

/**
* Clears all data and resets all segment handlers.
*/
void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,19 +381,48 @@ private void trimPrefix() {
}

// Close segments before deleting their corresponding log files
closeSegmentHandlers(endSegment);
int numFiles = 0;
long freedBytes = 0;
for (SegmentHandle sh : writeChannels.values()) {
if (sh.getSegment() <= endSegment) {
if (sh.getRefCount() != 0) {
log.warn("trimPrefix: Segment {} is trimmed, but refCount is {},"
+ " attempting to trim anyways", sh.getSegment(),
sh.getRefCount());
}
sh.close();
writeChannels.remove(sh.getFileName());
}
}

deleteFilesMatchingFilter(file -> {
try {
String segmentStr = file.getName().split("\\.")[0];
return Long.parseLong(segmentStr) < endSegment;
} catch (Exception e) {
log.warn("trimPrefix: ignoring file {}", file.getName());
return false;
File dir = new File(logDir);
FileFilter fileFilter = new FileFilter() {
public boolean accept(File file) {
try {
String segmentStr = file.getName().split("\\.")[0];
return Long.parseLong(segmentStr) < endSegment;
} catch (Exception e) {
log.warn("trimPrefix: ignoring file {}", file.getName());
return false;
}
}
});
};

File[] files = dir.listFiles(fileFilter);

for (File file : files) {
long delta = file.length();

if (!file.delete()) {
log.error("trimPrefix: Couldn't delete file {}", file.getName());
} else {
freedBytes += delta;
numFiles++;
}
}

log.info("trimPrefix: completed, end segment {}", endSegment);
log.info("trimPrefix: completed, deleted {} files, freed {} bytes, end segment {}",
numFiles, freedBytes, endSegment);
}

private void spaseCompact() {
Expand Down Expand Up @@ -1161,82 +1190,6 @@ public void close() {
public void release(long address, LogData entry) {
}

/**
* Closes all segment handlers up to and including the handler for the endSegment.
*
* @param endSegment The segment index of the last segment up to (including) the end segment.
*/
private void closeSegmentHandlers(long endSegment) {
for (SegmentHandle sh : writeChannels.values()) {
if (sh.getSegment() <= endSegment) {
if (sh.getRefCount() != 0) {
log.warn("closeSegmentHandlers: Segment {} is trimmed, but refCount is {},"
+ " attempting to trim anyways", sh.getSegment(),
sh.getRefCount());
}
sh.close();
writeChannels.remove(sh.getFileName());
}
}
}

/**
* Deletes all files matching the given filter.
*
* @param fileFilter File filter to delete files.
*/
private void deleteFilesMatchingFilter(FileFilter fileFilter) {
int numFiles = 0;
long freedBytes = 0;
File dir = new File(logDir);
File[] files = dir.listFiles(fileFilter);
for (File file : files) {
long delta = file.length();

if (!file.delete()) {
log.error("deleteFilesMatchingFilter: Couldn't delete file {}", file.getName());
} else {
freedBytes += delta;
numFiles++;
}
}
log.info("deleteFilesMatchingFilter: completed, deleted {} files, freed {} bytes",
numFiles, freedBytes);
}

/**
* Resets the Stream log.
* Clears all data and resets the handlers.
* Usage: To heal a recovering node, we require to wipe off existing data.
*/
@Override
public void reset() {
// Trim all segments
long endSegment = (globalTail.get() / RECORDS_PER_LOG_FILE);
log.warn("Global Tail:{}, endSegment={}", globalTail.get(), endSegment);

// Close segments before deleting their corresponding log files
closeSegmentHandlers(endSegment);

deleteFilesMatchingFilter(file -> {
try {
String segmentStr = file.getName().split("\\.")[0];
return Long.parseLong(segmentStr) <= endSegment;
} catch (Exception e) {
log.warn("reset: ignoring file {}", file.getName());
return false;
}
});

serverContext.setStartingAddress(0L);
serverContext.setTailSegment(0L);
globalTail.set(0L);
initializeStartingAddress();
initializeMaxGlobalAddress();

log.info("reset: Completed, end segment {}", endSegment);
}

@VisibleForTesting
Set<FileChannel> getChannelsToSync() {
return channelsToSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public enum CorfuMsgType {
FLUSH_CACHE(44, TypeToken.of(CorfuMsg.class), true),
TRIM_MARK_REQUEST(45, TypeToken.of(CorfuMsg.class), true),
TRIM_MARK_RESPONSE(46, new TypeToken<CorfuPayloadMsg<Long>>(){}, true),
RESET_LOGUNIT(47, TypeToken.of(CorfuMsg.class)),

WRITE_OK(50, TypeToken.of(CorfuMsg.class)),
ERROR_TRIMMED(51, TypeToken.of(CorfuMsg.class)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,11 +475,4 @@ public CompletableFuture<Boolean> writeRange(List<LogData> range) {
return router.sendMessageAndGetCompletable(CorfuMsgType.RANGE_WRITE
.payloadMsg(new RangeWriteMsg(range)));
}

/**
* Send a reset request.
*/
public CompletableFuture<Boolean> resetLogUnit() {
return router.sendMessageAndGetCompletable(CorfuMsgType.RESET_LOGUNIT.msg());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -655,44 +655,4 @@ public void testPrefixTrimAfterRestart() {
final int lastTwoSegmentsFiles = 3 * 2;
assertThat(logs.list()).hasSize(lastTwoSegmentsFiles);
}

/**
* Generates and writes 3 files worth data. Then resets the stream log and verifies that the
* files and data is cleared.
*/
@Test
public void testResetStreamLog() {
String logDir = getContext().getServerConfig().get("--log-path") + File.separator + "log";
StreamLog log = new StreamLogFiles(getContext(), false);

final long numSegments = 3;
for (long x = 0; x < RECORDS_PER_LOG_FILE * numSegments; x++) {
writeToLog(log, x);
}
final long filesToBeTrimmed = 1;
log.prefixTrim(RECORDS_PER_LOG_FILE * (filesToBeTrimmed + 1));
log.compact();

File logsDir = new File(logDir);

final int expectedFilesBeforeReset = (int) ((numSegments - filesToBeTrimmed) * 3);
final long globalTailBeforeReset = (RECORDS_PER_LOG_FILE * numSegments) - 1;
final long trimMarkBeforeReset = (RECORDS_PER_LOG_FILE * (filesToBeTrimmed + 1)) + 1;
assertThat(logsDir.list()).hasSize(expectedFilesBeforeReset);
assertThat(log.getGlobalTail()).isEqualTo(globalTailBeforeReset);
assertThat(log.getTrimMark()).isEqualTo(trimMarkBeforeReset);

log.reset();

// Files have been deleted and new 0.log files are created due to reset.
Arrays.stream(logsDir.list()).forEach(
s -> assertThat(new File(s).length()).isEqualTo(0L));

final int expectedFilesAfterReset = 3;
final long globalTailAfterReset = 0L;
final long trimMarkAfterReset = 0L;
assertThat(logsDir.list()).hasSize(expectedFilesAfterReset);
assertThat(log.getGlobalTail()).isEqualTo(globalTailAfterReset);
assertThat(log.getTrimMark()).isEqualTo(trimMarkAfterReset);
}
}

0 comments on commit d5286de

Please sign in to comment.