Skip to content

Commit

Permalink
Refactor UfsJournal
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Apr 4, 2017
1 parent 06c21e9 commit 57ae51d
Show file tree
Hide file tree
Showing 15 changed files with 556 additions and 452 deletions.
Expand Up @@ -24,16 +24,11 @@
import alluxio.util.URIUtils;
import alluxio.util.UnderFileSystemUtils;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -142,15 +137,10 @@ public void format() throws IOException {
LOG.info("Formatting {}", location);
if (mUfs.isDirectory(location.toString())) {
for (UnderFileStatus status : mUfs.listStatus(location.toString())) {
URI childPath = URIUtils.appendPathOrDie(location, status.getName());
boolean failedToDelete;
if (status.isDirectory()) {
failedToDelete = !mUfs
.deleteDirectory(childPath.toString(), DeleteOptions.defaults().setRecursive(true));
} else {
failedToDelete = !mUfs.deleteFile(childPath.toString());
}
if (failedToDelete) {
String childPath = URIUtils.appendPathOrDie(location, status.getName()).toString();
if (status.isDirectory()
&& !mUfs.deleteDirectory(childPath, DeleteOptions.defaults().setRecursive(true))
|| status.isFile() && !mUfs.deleteFile(childPath)) {
throw new IOException(String.format("Failed to delete %s", childPath));
}
}
Expand Down Expand Up @@ -191,224 +181,4 @@ URI getTmpDir() {
UnderFileSystem getUfs() {
return mUfs;
}

/**
* Encodes a checkpoint location under the checkpoint directory.
*
* @param end the end sequence number (exclusive)
* @return the location
*/
URI encodeCheckpointFileLocation(long end) {
String filename = String.format("0x%x-0x%x", 0, end);
URI location = URIUtils.appendPathOrDie(getCheckpointDir(), filename);
return location;
}

/**
* Creates a log location under the log directory.
*
* @param start the start sequence number (inclusive)
* @param end the end sequence number (exclusive)
* @return the location
*/
URI encodeLogFileLocation(long start, long end) {
String filename = String.format("0x%x-0x%x", start, end);
URI location = URIUtils.appendPathOrDie(getLogDir(), filename);
return location;
}

/**
* Creates a temporary location under the temporary directory.
*
* @return the location
*/
URI encodeTemporaryCheckpointFileLocation() {
return URIUtils.appendPathOrDie(getTmpDir(), UUID.randomUUID().toString());
}

/**
* Decodes a checkpoint or a log file name into a {@link UfsJournalFile}.
*
* @param filename the filename
* @return the instance of {@link UfsJournalFile}, null if the file invalid
*/
UfsJournalFile decodeLogFile(String filename) {
URI location = URIUtils.appendPathOrDie(getLogDir(), filename);
try {
String[] parts = filename.split("-");

// There can be temporary files in logs directory. Skip them.
if (parts.length != 2) {
return null;
}
long start = Long.decode(parts[0]);
long end = Long.decode(parts[1]);
return UfsJournalFile.createLogFile(location, start, end);
} catch (IllegalStateException e) {
LOG.error("Illegal journal file {}.", location);
throw e;
} catch (NumberFormatException e) {
// There can be temporary files (e.g. created for rename).
return null;
}
}

/**
* Decodes a checkpoint file name into a {@link UfsJournalFile}.
*
* @param filename the filename
* @return the instance of {@link UfsJournalFile}, null if the file invalid
*/
UfsJournalFile decodeCheckpointFile(String filename) {
URI location = URIUtils.appendPathOrDie(getCheckpointDir(), filename);
try {
String[] parts = filename.split("-");

// There can be temporary files in logs directory. Skip them.
if (parts.length != 2) {
return null;
}
long start = Long.decode(parts[0]);
long end = Long.decode(parts[1]);

Preconditions.checkState(start == 0);
return UfsJournalFile.createCheckpointFile(location, end);
} catch (IllegalStateException e) {
LOG.error("Illegal journal file {}.", location);
throw e;
} catch (NumberFormatException e) {
// There can be temporary files (e.g. created for rename).
return null;
}
}

/**
* Decodes a temporary checkpoint file name into a {@link UfsJournalFile}.
*
* @param filename the temporary checkpoint file name
* @return the instance of {@link UfsJournalFile}
*/
UfsJournalFile decodeTemporaryCheckpointFile(String filename) {
URI location = URIUtils.appendPathOrDie(getTmpDir(), filename);
return UfsJournalFile.createTmpCheckpointFile(location);
}

/**
* A snapshot of everything in the journal.
*/
static class Snapshot {
/** The committed checkpoints. */
final List<UfsJournalFile> mCheckpoints;
/** The journal edit logs including the incomplete log. */
final List<UfsJournalFile> mLogs;
/** The temporary checkpoint files. */
final List<UfsJournalFile> mTemporaryCheckpoints;

/**
* Creates an instance of the journal snapshot.
*
* @param checkpoints the checkpoints
* @param logs the logs including the incomplete log
* @param temporaryCheckpoints the temporary checkpoint files
*/
Snapshot(List<UfsJournalFile> checkpoints, List<UfsJournalFile> logs,
List<UfsJournalFile> temporaryCheckpoints) {
mCheckpoints = checkpoints;
mLogs = logs;
mTemporaryCheckpoints = temporaryCheckpoints;
}
}

/**
* Creates a snapshot of the journal.
*
* @return the journal snapshot
* @throws IOException if any I/O errors occur
*/
Snapshot getSnapshot() throws IOException {
// Checkpoints.
List<UfsJournalFile> checkpoints = new ArrayList<>();
UnderFileStatus[] statuses = mUfs.listStatus(getCheckpointDir().toString());
if (statuses != null) {
for (UnderFileStatus status : statuses) {
UfsJournalFile file = decodeCheckpointFile(status.getName());
if (file != null) {
checkpoints.add(file);
}
}
Collections.sort(checkpoints);
}

List<UfsJournalFile> logs = new ArrayList<>();
statuses = mUfs.listStatus(getLogDir().toString());
if (statuses != null) {
for (UnderFileStatus status : statuses) {
UfsJournalFile file = decodeLogFile(status.getName());
if (file != null) {
logs.add(file);
}
}
Collections.sort(logs);
}

List<UfsJournalFile> tmpCheckpoints = new ArrayList<>();
statuses = mUfs.listStatus(getTmpDir().toString());
if (statuses != null) {
for (UnderFileStatus status : statuses) {
tmpCheckpoints.add(decodeTemporaryCheckpointFile(status.getName()));
}
}

return new Snapshot(checkpoints, logs, tmpCheckpoints);
}

/**
* Gets the current log (the incomplete log) that is being written to.
*
* @return the current log
* @throws IOException if any I/O errors occur
*/
UfsJournalFile getCurrentLog() throws IOException {
List<UfsJournalFile> logs = new ArrayList<>();
UnderFileStatus[] statuses = mUfs.listStatus(getLogDir().toString());
if (statuses != null) {
for (UnderFileStatus status : statuses) {
UfsJournalFile file = decodeLogFile(status.getName());
if (file != null) {
logs.add(file);
}
}
if (!logs.isEmpty()) {
UfsJournalFile file = Collections.max(logs);
if (file.isIncompleteLog()) {
return file;
}
}
}
return null;
}

/**
* Gets the first journal log sequence number that is not yet checkpointed.
*
* @return the first journal log sequence number that is not yet checkpointed
* @throws IOException if any I/O errors occur
*/
long getNextLogSequenceToCheckpoint() throws IOException {
List<UfsJournalFile> checkpoints = new ArrayList<>();
UnderFileStatus[] statuses = mUfs.listStatus(getCheckpointDir().toString());
if (statuses != null) {
for (UnderFileStatus status : statuses) {
UfsJournalFile file = decodeCheckpointFile(status.getName());
if (file != null) {
checkpoints.add(file);
}
}
Collections.sort(checkpoints);
}
if (checkpoints.isEmpty()) {
return 0;
}
return checkpoints.get(checkpoints.size() - 1).getEnd();
}
}
Expand Up @@ -67,10 +67,10 @@ final class UfsJournalCheckpointWriter implements JournalWriter {
throws IOException {
mJournal = Preconditions.checkNotNull(journal);

mTmpCheckpointFileLocation = mJournal.encodeTemporaryCheckpointFileLocation();
mTmpCheckpointFileLocation = UfsJournalFile.encodeTemporaryCheckpointFileLocation(mJournal);
mTmpCheckpointStream = mJournal.getUfs().create(mTmpCheckpointFileLocation.toString());
mCheckpointFile = UfsJournalFile.createCheckpointFile(
mJournal.encodeCheckpointFileLocation(options.getNextSequenceNumber()),
UfsJournalFile.encodeCheckpointFileLocation(mJournal, options.getNextSequenceNumber()),
options.getNextSequenceNumber());
}

Expand Down Expand Up @@ -103,10 +103,10 @@ public void close() throws IOException {
mTmpCheckpointStream.close();

// Delete the temporary checkpoint if there is a newer checkpoint committed.
UfsJournal.Snapshot snapshot = mJournal.getSnapshot();
if (snapshot != null && !snapshot.mCheckpoints.isEmpty()) {
UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal);
if (snapshot != null && !snapshot.getCheckpoints().isEmpty()) {
UfsJournalFile checkpoint =
snapshot.mCheckpoints.get(snapshot.mCheckpoints.size() - 1);
snapshot.getCheckpoints().get(snapshot.getCheckpoints().size() - 1);
if (mCheckpointFile.getEnd() <= checkpoint.getEnd()) {
mJournal.getUfs().deleteFile(mTmpCheckpointFileLocation.toString());
return;
Expand Down

0 comments on commit 57ae51d

Please sign in to comment.