Skip to content

Commit

Permalink
Add append to JournalContext
Browse files Browse the repository at this point in the history
  • Loading branch information
gpang committed Mar 31, 2017
1 parent fef01c5 commit 9ba1de2
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 83 deletions.
63 changes: 21 additions & 42 deletions core/server/common/src/main/java/alluxio/master/AbstractMaster.java
Expand Up @@ -18,7 +18,7 @@
import alluxio.exception.PreconditionMessage; import alluxio.exception.PreconditionMessage;
import alluxio.master.journal.AsyncJournalWriter; import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.Journal; import alluxio.master.journal.Journal;
import alluxio.master.journal.JournalEntryAppender; import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalInputStream; import alluxio.master.journal.JournalInputStream;
import alluxio.master.journal.JournalOutputStream; import alluxio.master.journal.JournalOutputStream;
import alluxio.master.journal.JournalTailer; import alluxio.master.journal.JournalTailer;
Expand Down Expand Up @@ -258,7 +258,7 @@ protected void flushJournal() {
*/ */
protected void appendJournalEntry(JournalEntry entry, JournalContext journalContext) { protected void appendJournalEntry(JournalEntry entry, JournalContext journalContext) {
Preconditions.checkNotNull(mAsyncJournalWriter, PreconditionMessage.ASYNC_JOURNAL_WRITER_NULL); Preconditions.checkNotNull(mAsyncJournalWriter, PreconditionMessage.ASYNC_JOURNAL_WRITER_NULL);
journalContext.setFlushCounter(mAsyncJournalWriter.appendEntry(entry)); journalContext.append(entry);
} }


/** /**
Expand Down Expand Up @@ -306,64 +306,43 @@ protected ExecutorService getExecutorService() {
* @return new instance of {@link JournalContext} * @return new instance of {@link JournalContext}
*/ */
protected JournalContext createJournalContext() { protected JournalContext createJournalContext() {
return new JournalContext(); return new MasterJournalContext(mAsyncJournalWriter);
}

/**
* @return new instance of {@link JournalEntryAppender}
*/
protected JournalEntryAppender createJournalAppender(JournalContext journalContext) {
return new MasterJournalEntryAppender(journalContext, mAsyncJournalWriter);
} }


/** /**
* Context for storing journaling information. * Context for storing journaling information.
*/ */
@NotThreadSafe @NotThreadSafe
public final class JournalContext implements AutoCloseable { public final class MasterJournalContext implements JournalContext {
private final AsyncJournalWriter mAsyncJournalWriter;
private long mFlushCounter; private long mFlushCounter;


private JournalContext() { /**
* Constructs a {@link MasterJournalContext}.
*
* @param asyncJournalWriter a {@link AsyncJournalWriter}
*/
private MasterJournalContext(AsyncJournalWriter asyncJournalWriter) {
mAsyncJournalWriter = asyncJournalWriter;
mFlushCounter = INVALID_FLUSH_COUNTER; mFlushCounter = INVALID_FLUSH_COUNTER;
} }


private long getFlushCounter() { @Override
public long getFlushCounter() {
return mFlushCounter; return mFlushCounter;
} }


private void setFlushCounter(long counter) {
mFlushCounter = counter;
}

@Override @Override
public void close() { public void append(alluxio.proto.journal.Journal.JournalEntry entry) {
waitForJournalFlush(this); if (mAsyncJournalWriter != null) {
} mFlushCounter = mAsyncJournalWriter.appendEntry(entry);
} }

/**
* This class appends journal entries to the async journal.
*/
public final class MasterJournalEntryAppender implements JournalEntryAppender {
private final JournalContext mJournalContext;
private final AsyncJournalWriter mAsyncJournalWriter;

/**
* Constructs a {@link MasterJournalEntryAppender}.
*
* @param journalContext a {@link JournalContext}
* @param asyncJournalWriter a {@link AsyncJournalWriter}
*/
public MasterJournalEntryAppender(JournalContext journalContext,
AsyncJournalWriter asyncJournalWriter) {
mJournalContext = journalContext;
mAsyncJournalWriter = Preconditions.checkNotNull(asyncJournalWriter);
} }


@Override @Override
public void append(alluxio.proto.journal.Journal.JournalEntry entry) { public void close() {
if (mJournalContext != null) { if (mAsyncJournalWriter != null) {
mJournalContext.setFlushCounter(mAsyncJournalWriter.appendEntry(entry)); waitForJournalFlush(this);
} }
} }
} }
Expand Down
Expand Up @@ -13,12 +13,22 @@


import alluxio.proto.journal.Journal; import alluxio.proto.journal.Journal;


import java.io.Closeable;

/** /**
* This interface enables appending entries to the journal. * Context for storing journaling information.
*/ */
public interface JournalEntryAppender { public interface JournalContext extends Closeable {
/** /**
* @param entry the {@link Journal.JournalEntry} to append to the journal * @param entry the {@link Journal.JournalEntry} to append to the journal
*/ */
void append(Journal.JournalEntry entry); void append(Journal.JournalEntry entry);

/**
* @return the journal flush counter
*/
long getFlushCounter();

@Override
void close();
} }
Expand Up @@ -9,21 +9,33 @@
* See the NOTICE file distributed with this work for information regarding copyright ownership. * See the NOTICE file distributed with this work for information regarding copyright ownership.
*/ */


package alluxio.master; package alluxio.master.journal;


import alluxio.master.journal.JournalEntryAppender;
import alluxio.proto.journal.Journal; import alluxio.proto.journal.Journal;


/** /**
* NOOP version of JournalEntryAppender. * Noop version of JournalContext.
*/ */
public final class NoopJournalAppender implements JournalEntryAppender { public final class NoopJournalContext implements JournalContext {
public NoopJournalAppender() { /**
* Constructs the {@link NoopJournalContext}.
*/
public NoopJournalContext() {
// Do nothing // Do nothing
} }


@Override @Override
public void append(Journal.JournalEntry entry) { public void append(Journal.JournalEntry entry) {
// Do nothing // Do nothing
} }

@Override
public long getFlushCounter() {
return -1;
}

@Override
public void close() {
// Do nothing
}
} }
Expand Up @@ -32,6 +32,7 @@
import alluxio.master.block.meta.MasterBlockInfo; import alluxio.master.block.meta.MasterBlockInfo;
import alluxio.master.block.meta.MasterBlockLocation; import alluxio.master.block.meta.MasterBlockLocation;
import alluxio.master.block.meta.MasterWorkerInfo; import alluxio.master.block.meta.MasterWorkerInfo;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalFactory; import alluxio.master.journal.JournalFactory;
import alluxio.master.journal.JournalInputStream; import alluxio.master.journal.JournalInputStream;
import alluxio.master.journal.JournalOutputStream; import alluxio.master.journal.JournalOutputStream;
Expand Down
Expand Up @@ -66,6 +66,7 @@
import alluxio.master.file.options.MountOptions; import alluxio.master.file.options.MountOptions;
import alluxio.master.file.options.RenameOptions; import alluxio.master.file.options.RenameOptions;
import alluxio.master.file.options.SetAttributeOptions; import alluxio.master.file.options.SetAttributeOptions;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalFactory; import alluxio.master.journal.JournalFactory;
import alluxio.master.journal.JournalOutputStream; import alluxio.master.journal.JournalOutputStream;
import alluxio.metrics.MetricsSystem; import alluxio.metrics.MetricsSystem;
Expand Down Expand Up @@ -1150,7 +1151,7 @@ InodeTree.CreatePathResult createFileInternal(LockedInodePath inodePath,
options.setCacheable(true); options.setCacheable(true);
} }
InodeTree.CreatePathResult createResult = InodeTree.CreatePathResult createResult =
mInodeTree.createPath(inodePath, options, createJournalAppender(journalContext)); mInodeTree.createPath(inodePath, options, journalContext);
// If the create succeeded, the list of created inodes will not be empty. // If the create succeeded, the list of created inodes will not be empty.
List<Inode<?>> created = createResult.getCreated(); List<Inode<?>> created = createResult.getCreated();
InodeFile inode = (InodeFile) created.get(created.size() - 1); InodeFile inode = (InodeFile) created.get(created.size() - 1);
Expand Down Expand Up @@ -1430,7 +1431,7 @@ private void deleteInternal(LockedInodePath inodePath, JournalContext journalCon
mBlockMaster.removeBlocks(((InodeFile) delInode).getBlockIds(), true /* delete */); mBlockMaster.removeBlocks(((InodeFile) delInode).getBlockIds(), true /* delete */);
} }


if (i == 0 && !replayed) { if (i == 0) {
// Journal right before deleting the "root" of the sub-tree from the parent, since the // Journal right before deleting the "root" of the sub-tree from the parent, since the
// parent is read locked. // parent is read locked.
DeleteFileEntry deleteFile = DeleteFileEntry.newBuilder().setId(delInode.getId()) DeleteFileEntry deleteFile = DeleteFileEntry.newBuilder().setId(delInode.getId())
Expand Down Expand Up @@ -1706,7 +1707,7 @@ private InodeTree.CreatePathResult createDirectoryInternal(LockedInodePath inode
FileDoesNotExistException { FileDoesNotExistException {
try { try {
InodeTree.CreatePathResult createResult = InodeTree.CreatePathResult createResult =
mInodeTree.createPath(inodePath, options, createJournalAppender(journalContext)); mInodeTree.createPath(inodePath, options, journalContext);
InodeDirectory inodeDirectory = (InodeDirectory) inodePath.getInode(); InodeDirectory inodeDirectory = (InodeDirectory) inodePath.getInode();
// If inodeDirectory's ttl not equals Constants.NO_TTL, it should insert into mTtlBuckets // If inodeDirectory's ttl not equals Constants.NO_TTL, it should insert into mTtlBuckets
if (createResult.getCreated().size() > 0) { if (createResult.getCreated().size() > 0) {
Expand Down Expand Up @@ -1911,8 +1912,7 @@ private void renameInternal(LockedInodePath srcInodePath, LockedInodePath dstIno
while (!sameMountDirs.empty()) { while (!sameMountDirs.empty()) {
InodeDirectory dir = sameMountDirs.pop(); InodeDirectory dir = sameMountDirs.pop();
if (!dir.isPersisted()) { if (!dir.isPersisted()) {
InodeUtils.syncPersistDirectory(dir, mInodeTree, mMountTable, InodeUtils.syncPersistDirectory(dir, mInodeTree, mMountTable, journalContext);
createJournalAppender(journalContext));
} }
} }


Expand Down
Expand Up @@ -13,7 +13,7 @@


import alluxio.master.block.BlockId; import alluxio.master.block.BlockId;
import alluxio.master.block.ContainerIdGenerable; import alluxio.master.block.ContainerIdGenerable;
import alluxio.master.journal.JournalEntryAppender; import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalEntryRepresentable; import alluxio.master.journal.JournalEntryRepresentable;
import alluxio.proto.journal.File.InodeDirectoryIdGeneratorEntry; import alluxio.proto.journal.File.InodeDirectoryIdGeneratorEntry;
import alluxio.proto.journal.Journal.JournalEntry; import alluxio.proto.journal.Journal.JournalEntry;
Expand Down Expand Up @@ -52,10 +52,10 @@ synchronized long getNewDirectoryId() {
/** /**
* Returns the next directory id, and journals the state. * Returns the next directory id, and journals the state.
* *
* @param journalAppender the appender to journal to, if not null * @param journalContext the journal context
* @return the next directory id * @return the next directory id
*/ */
synchronized long getNewDirectoryId(JournalEntryAppender journalAppender) { synchronized long getNewDirectoryId(JournalContext journalContext) {
initialize(); initialize();
long directoryId = BlockId.createBlockId(mContainerId, mSequenceNumber); long directoryId = BlockId.createBlockId(mContainerId, mSequenceNumber);
if (mSequenceNumber == BlockId.getMaxSequenceNumber()) { if (mSequenceNumber == BlockId.getMaxSequenceNumber()) {
Expand All @@ -65,8 +65,8 @@ synchronized long getNewDirectoryId(JournalEntryAppender journalAppender) {
} else { } else {
mSequenceNumber++; mSequenceNumber++;
} }
if (journalAppender != null) { if (journalContext != null) {
journalAppender.append(toJournalEntry()); journalContext.append(toJournalEntry());
} }
return directoryId; return directoryId;
} }
Expand Down
Expand Up @@ -29,7 +29,7 @@
import alluxio.master.file.options.CreateFileOptions; import alluxio.master.file.options.CreateFileOptions;
import alluxio.master.file.options.CreatePathOptions; import alluxio.master.file.options.CreatePathOptions;
import alluxio.master.journal.JournalCheckpointStreamable; import alluxio.master.journal.JournalCheckpointStreamable;
import alluxio.master.journal.JournalEntryAppender; import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalOutputStream; import alluxio.master.journal.JournalOutputStream;
import alluxio.proto.journal.File; import alluxio.proto.journal.File;
import alluxio.proto.journal.File.InodeDirectoryEntry; import alluxio.proto.journal.File.InodeDirectoryEntry;
Expand Down Expand Up @@ -466,7 +466,7 @@ public InodeDirectory getRoot() {
* *
* @param inodePath the path * @param inodePath the path
* @param options method options * @param options method options
* @param journalAppender the journal appender * @param journalContext the journal context
* @return a {@link CreatePathResult} representing the modified inodes and created inodes during * @return a {@link CreatePathResult} representing the modified inodes and created inodes during
* path creation * path creation
* @throws FileAlreadyExistsException when there is already a file at path if we want to create a * @throws FileAlreadyExistsException when there is already a file at path if we want to create a
Expand All @@ -480,7 +480,7 @@ public InodeDirectory getRoot() {
* option is false * option is false
*/ */
public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<?> options, public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<?> options,
JournalEntryAppender journalAppender) JournalContext journalContext)
throws FileAlreadyExistsException, BlockInfoException, InvalidPathException, IOException, throws FileAlreadyExistsException, BlockInfoException, InvalidPathException, IOException,
FileDoesNotExistException { FileDoesNotExistException {
AlluxioURI path = inodePath.getUri(); AlluxioURI path = inodePath.getUri();
Expand Down Expand Up @@ -538,7 +538,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
if (options.isPersisted()) { if (options.isPersisted()) {
// Synchronously persist directories. These inodes are already READ locked. // Synchronously persist directories. These inodes are already READ locked.
for (Inode inode : traversalResult.getNonPersisted()) { for (Inode inode : traversalResult.getNonPersisted()) {
InodeUtils.syncPersistDirectory((InodeDirectory) inode, this, mMountTable, journalAppender); InodeUtils.syncPersistDirectory((InodeDirectory) inode, this, mMountTable, journalContext);
} }
} }
if (pathIndex < (pathComponents.length - 1) || currentInodeDirectory.getChild(name) == null) { if (pathIndex < (pathComponents.length - 1) || currentInodeDirectory.getChild(name) == null) {
Expand All @@ -551,7 +551,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
File.InodeLastModificationTimeEntry inodeLastModificationTime = File.InodeLastModificationTimeEntry inodeLastModificationTime =
File.InodeLastModificationTimeEntry.newBuilder().setId(currentInodeDirectory.getId()) File.InodeLastModificationTimeEntry.newBuilder().setId(currentInodeDirectory.getId())
.setLastModificationTimeMs(options.getOperationTimeMs()).build(); .setLastModificationTimeMs(options.getOperationTimeMs()).build();
journalAppender.append( journalContext.append(
Journal.JournalEntry.newBuilder().setInodeLastModificationTime(inodeLastModificationTime) Journal.JournalEntry.newBuilder().setInodeLastModificationTime(inodeLastModificationTime)
.build()); .build());
} }
Expand All @@ -569,7 +569,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
for (int k = pathIndex; k < (pathComponents.length - 1); k++) { for (int k = pathIndex; k < (pathComponents.length - 1); k++) {
InodeDirectory dir = null; InodeDirectory dir = null;
while (dir == null) { while (dir == null) {
dir = InodeDirectory.create(mDirectoryIdGenerator.getNewDirectoryId(journalAppender), dir = InodeDirectory.create(mDirectoryIdGenerator.getNewDirectoryId(journalContext),
currentInodeDirectory.getId(), pathComponents[k], missingDirOptions); currentInodeDirectory.getId(), pathComponents[k], missingDirOptions);
// Lock the newly created inode before subsequent operations, and add it to the lock group. // Lock the newly created inode before subsequent operations, and add it to the lock group.
lockList.lockWriteAndCheckNameAndParent(dir, currentInodeDirectory, pathComponents[k]); lockList.lockWriteAndCheckNameAndParent(dir, currentInodeDirectory, pathComponents[k]);
Expand All @@ -593,7 +593,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
InodeUtils.syncPersistDirectory(dir, this, mMountTable, null); InodeUtils.syncPersistDirectory(dir, this, mMountTable, null);
} }
// Journal the new inode. // Journal the new inode.
journalAppender.append(dir.toJournalEntry()); journalContext.append(dir.toJournalEntry());


// After creation and journaling, downgrade to a read lock. // After creation and journaling, downgrade to a read lock.
lockList.unlockLast(); lockList.unlockLast();
Expand Down Expand Up @@ -625,7 +625,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
// The final path component already exists and is not persisted, so it should be added // The final path component already exists and is not persisted, so it should be added
// to the non-persisted Inodes of traversalResult. // to the non-persisted Inodes of traversalResult.
InodeUtils InodeUtils
.syncPersistDirectory((InodeDirectory) lastInode, this, mMountTable, journalAppender); .syncPersistDirectory((InodeDirectory) lastInode, this, mMountTable, journalContext);
} else if (!lastInode.isDirectory() || !(options instanceof CreateDirectoryOptions } else if (!lastInode.isDirectory() || !(options instanceof CreateDirectoryOptions
&& ((CreateDirectoryOptions) options).isAllowExists())) { && ((CreateDirectoryOptions) options).isAllowExists())) {
String errorMessage = ExceptionMessage.FILE_ALREADY_EXISTS.getMessage(path); String errorMessage = ExceptionMessage.FILE_ALREADY_EXISTS.getMessage(path);
Expand All @@ -636,7 +636,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
// create the new inode // create the new inode
if (options instanceof CreateDirectoryOptions) { if (options instanceof CreateDirectoryOptions) {
CreateDirectoryOptions directoryOptions = (CreateDirectoryOptions) options; CreateDirectoryOptions directoryOptions = (CreateDirectoryOptions) options;
lastInode = InodeDirectory.create(mDirectoryIdGenerator.getNewDirectoryId(journalAppender), lastInode = InodeDirectory.create(mDirectoryIdGenerator.getNewDirectoryId(journalContext),
currentInodeDirectory.getId(), name, directoryOptions); currentInodeDirectory.getId(), name, directoryOptions);
// Lock the created inode before subsequent operations, and add it to the lock group. // Lock the created inode before subsequent operations, and add it to the lock group.
lockList.lockWriteAndCheckNameAndParent(lastInode, currentInodeDirectory, name); lockList.lockWriteAndCheckNameAndParent(lastInode, currentInodeDirectory, name);
Expand Down Expand Up @@ -670,7 +670,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
} }


// Journal the new inode. // Journal the new inode.
journalAppender.append(lastInode.toJournalEntry()); journalContext.append(lastInode.toJournalEntry());


createdInodes.add(lastInode); createdInodes.add(lastInode);
extensibleInodePath.getInodes().add(lastInode); extensibleInodePath.getInodes().add(lastInode);
Expand Down
Expand Up @@ -12,7 +12,7 @@
package alluxio.master.file.meta; package alluxio.master.file.meta;


import alluxio.AlluxioURI; import alluxio.AlluxioURI;
import alluxio.master.journal.JournalEntryAppender; import alluxio.master.journal.JournalContext;
import alluxio.proto.journal.File; import alluxio.proto.journal.File;
import alluxio.proto.journal.Journal; import alluxio.proto.journal.Journal;
import alluxio.security.authorization.Mode; import alluxio.security.authorization.Mode;
Expand All @@ -38,10 +38,10 @@ private InodeUtils() {} // prevent instantiation
* @param dir the {@link InodeDirectory} to persist * @param dir the {@link InodeDirectory} to persist
* @param inodeTree the {@link InodeTree} * @param inodeTree the {@link InodeTree}
* @param mountTable the {@link MountTable} * @param mountTable the {@link MountTable}
* @param journalAppender the appender to journal the persist entry to, if not null * @param journalContext the journal context
*/ */
public static void syncPersistDirectory(InodeDirectory dir, InodeTree inodeTree, public static void syncPersistDirectory(InodeDirectory dir, InodeTree inodeTree,
MountTable mountTable, JournalEntryAppender journalAppender) { MountTable mountTable, JournalContext journalContext) {
// TODO(gpang): use a max timeout. // TODO(gpang): use a max timeout.
while (dir.getPersistenceState() != PersistenceState.PERSISTED) { while (dir.getPersistenceState() != PersistenceState.PERSISTED) {
if (dir.compareAndSwapPersistenceState(PersistenceState.NOT_PERSISTED, if (dir.compareAndSwapPersistenceState(PersistenceState.NOT_PERSISTED,
Expand All @@ -58,13 +58,11 @@ public static void syncPersistDirectory(InodeDirectory dir, InodeTree inodeTree,
ufs.mkdirs(ufsUri, mkdirsOptions); ufs.mkdirs(ufsUri, mkdirsOptions);
dir.setPersistenceState(PersistenceState.PERSISTED); dir.setPersistenceState(PersistenceState.PERSISTED);


if (journalAppender != null) { // Append the persist entry to the journal.
// Append the persist entry to the journal. File.PersistDirectoryEntry persistDirectory =
File.PersistDirectoryEntry persistDirectory = File.PersistDirectoryEntry.newBuilder().setId(dir.getId()).build();
File.PersistDirectoryEntry.newBuilder().setId(dir.getId()).build(); journalContext.append(
journalAppender.append( Journal.JournalEntry.newBuilder().setPersistDirectory(persistDirectory).build());
Journal.JournalEntry.newBuilder().setPersistDirectory(persistDirectory).build());
}
success = true; success = true;
} catch (Exception e) { } catch (Exception e) {
// Ignore // Ignore
Expand Down

0 comments on commit 9ba1de2

Please sign in to comment.