Skip to content
Permalink
Browse files

Change the initial persist wait time to user property

Change the persist wait time to user property so that users can pass in
the value when creating the file. Persist wait time is suggested to set
to a big enough value when using WriteType=ASYNC_THOUGH

pr-link: #9082
change-id: cid-6c18cd98ed3bfb6c2c26f6596d15b5e0d645e4ad
  • Loading branch information...
LuQQiu authored and alluxio-bot committed May 16, 2019
1 parent d460b7d commit db823875588dbc413aad7340a165f31bafd93544
Showing with 1,080 additions and 451 deletions.
  1. +2 −0 core/base/src/main/java/alluxio/Constants.java
  2. +5 −2 core/client/fs/src/main/java/alluxio/client/file/FileOutStream.java
  3. +15 −11 core/client/fs/src/main/java/alluxio/client/file/FileSystemUtils.java
  4. +24 −0 core/client/fs/src/main/java/alluxio/client/file/options/OutStreamOptions.java
  5. +12 −0 core/client/fs/src/main/java/alluxio/util/FileSystemOptions.java
  6. +13 −10 core/common/src/main/java/alluxio/conf/PropertyKey.java
  7. +28 −12 core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java
  8. +4 −1 core/server/master/src/main/java/alluxio/master/file/FileSystemMaster.java
  9. +3 −1 core/server/master/src/main/java/alluxio/master/file/FileSystemMasterClientServiceHandler.java
  10. +85 −0 core/server/master/src/main/java/alluxio/master/file/contexts/ScheduleAsyncPersistenceContext.java
  11. +5 −0 core/server/master/src/main/java/alluxio/master/file/meta/InodeFile.java
  12. +5 −0 core/server/master/src/main/java/alluxio/master/file/meta/InodeFileView.java
  13. +0 −1 core/server/master/src/main/java/alluxio/master/file/meta/InodeTree.java
  14. +23 −1 core/server/master/src/main/java/alluxio/master/file/meta/MutableInodeFile.java
  15. +3 −1 core/server/master/src/test/java/alluxio/master/file/FileSystemMasterTest.java
  16. +13 −7 core/server/master/src/test/java/alluxio/master/file/PersistenceTest.java
  17. +2 −0 core/transport/src/grpc/file_system_master.proto
  18. +79 −0 core/transport/src/main/java/alluxio/grpc/CreateFilePOptions.java
  19. +9 −0 core/transport/src/main/java/alluxio/grpc/CreateFilePOptionsOrBuilder.java
  20. +275 −274 core/transport/src/main/java/alluxio/grpc/FileSystemMasterProto.java
  21. +79 −0 core/transport/src/main/java/alluxio/grpc/ScheduleAsyncPersistencePOptions.java
  22. +9 −0 core/transport/src/main/java/alluxio/grpc/ScheduleAsyncPersistencePOptionsOrBuilder.java
  23. +125 −36 core/transport/src/main/java/alluxio/proto/journal/File.java
  24. +115 −26 core/transport/src/main/java/alluxio/proto/meta/InodeMeta.java
  25. +2 −1 core/transport/src/proto/journal/file.proto
  26. +2 −1 core/transport/src/proto/meta/inode_meta.proto
  27. +17 −0 shell/src/main/java/alluxio/cli/fs/FileSystemShellUtils.java
  28. +29 −7 shell/src/main/java/alluxio/cli/fs/command/PersistCommand.java
  29. +0 −1 tests/src/test/java/alluxio/client/cli/JournalToolTest.java
  30. +17 −33 tests/src/test/java/alluxio/client/cli/fs/command/PersistCommandTest.java
  31. +1 −2 tests/src/test/java/alluxio/client/fs/AbstractFileOutStreamIntegrationTest.java
  32. +76 −17 tests/src/test/java/alluxio/client/fs/FileOutStreamAsyncWriteIntegrationTest.java
  33. +2 −2 tests/src/test/java/alluxio/client/fs/FileSystemUtilsIntegrationTest.java
  34. +1 −2 tests/src/test/java/alluxio/client/fs/UfsFallbackFileOutStreamIntegrationTest.java
  35. +0 −2 tests/src/test/java/alluxio/server/ft/journal/ufs/UfsJournalIntegrationTest.java
@@ -196,6 +196,8 @@
public static final int REPLICATION_MAX_INFINITY = -1;

// Persistence
// The file should only be persisted after rename operation or persist CLI
public static final int NO_AUTO_PERSIST = -1;
public static final int PERSISTENCE_INVALID_JOB_ID = -1;
public static final String PERSISTENCE_INVALID_UFS_PATH = "";

@@ -12,6 +12,7 @@
package alluxio.client.file;

import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.annotation.PublicApi;
import alluxio.client.AbstractOutStream;
import alluxio.client.AlluxioStorageType;
@@ -180,7 +181,8 @@ public void close() throws IOException {
}
}

if (!mCanceled && mUnderStorageType.isAsyncPersist()) {
if (!mCanceled && mUnderStorageType.isAsyncPersist()
&& mOptions.getPersistenceWaitTime() != Constants.NO_AUTO_PERSIST) {
// only schedule the persist for completed files.
scheduleAsyncPersist();
}
@@ -313,7 +315,8 @@ protected void scheduleAsyncPersist() throws IOException {
.acquireMasterClientResource()) {
ScheduleAsyncPersistencePOptions persistOptions =
FileSystemOptions.scheduleAsyncPersistDefaults(mContext.getPathConf(mUri)).toBuilder()
.setCommonOptions(mOptions.getCommonOptions()).build();
.setCommonOptions(mOptions.getCommonOptions())
.setPersistenceWaitTime(mOptions.getPersistenceWaitTime()).build();
masterClient.get().scheduleAsyncPersist(mUri, persistOptions);
}
}
@@ -16,6 +16,7 @@
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.FileDoesNotExistException;
import alluxio.grpc.ScheduleAsyncPersistencePOptions;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;

@@ -124,17 +125,18 @@ public static boolean waitCompleted(final FileSystem fs, final AlluxioURI uri,
}

/**
* Convenience method for {@code #persistAndWait(fs, uri, -1)}. i.e. wait for an indefinite period
* of time to persist. This will block for an indefinite period of time if the path is never
* persisted. Use with care.
* Convenience method for {@code #persistAndWait(fs, uri, persistenceWaitTime, -1)}.
* i.e. wait for an indefinite period of time to persist. This will block
* for an indefinite period of time if the path is never persisted. Use with care.
*
* @param fs {@link FileSystem} to carry out Alluxio operations
* @param uri the uri of the file to persist
* @param persistenceWaitTime the persistence wait time
*/
public static void persistAndWait(final FileSystem fs, final AlluxioURI uri)
throws FileDoesNotExistException, IOException, AlluxioException, TimeoutException,
InterruptedException {
persistAndWait(fs, uri, -1);
public static void persistAndWait(final FileSystem fs, final AlluxioURI uri,
long persistenceWaitTime) throws FileDoesNotExistException, IOException, AlluxioException,
TimeoutException, InterruptedException {
persistAndWait(fs, uri, persistenceWaitTime, -1);
}

/**
@@ -143,14 +145,16 @@ public static void persistAndWait(final FileSystem fs, final AlluxioURI uri)
*
* @param fs {@link FileSystem} to carry out Alluxio operations
* @param uri the uri of the file to persist
* @param persistenceWaitTime the initial persistence wait time
* @param timeoutMs max amount of time to wait for persist in milliseconds. -1 to wait
* indefinitely
* @throws TimeoutException if the persist takes longer than the timeout
*/
public static void persistAndWait(final FileSystem fs, final AlluxioURI uri, int timeoutMs)
throws FileDoesNotExistException, IOException, AlluxioException, TimeoutException,
InterruptedException {
fs.persist(uri);
public static void persistAndWait(final FileSystem fs, final AlluxioURI uri,
long persistenceWaitTime, int timeoutMs) throws FileDoesNotExistException, IOException,
AlluxioException, TimeoutException, InterruptedException {
fs.persist(uri, ScheduleAsyncPersistencePOptions
.newBuilder().setPersistenceWaitTime(persistenceWaitTime).build());
CommonUtils.waitFor(String.format("%s to be persisted", uri) , () -> {
try {
return fs.getStatus(uri).isPersisted();
@@ -47,6 +47,7 @@
private String mGroup;
private Mode mMode;
private AccessControlList mAcl;
private long mPersistenceWaitTime;
private int mReplicationDurable;
private int mReplicationMax;
private int mReplicationMin;
@@ -80,6 +81,9 @@ public OutStreamOptions(CreateFilePOptions options, AlluxioConfiguration alluxio
if (options.hasMode()) {
mMode = Mode.fromProto(options.getMode());
}
if (options.hasPersistenceWaitTime()) {
mPersistenceWaitTime = options.getPersistenceWaitTime();
}
if (options.hasReplicationDurable()) {
mReplicationDurable = options.getReplicationDurable();
}
@@ -116,6 +120,7 @@ private OutStreamOptions(AlluxioConfiguration alluxioConf) {
mMode = ModeUtils.applyFileUMask(Mode.defaults(), alluxioConf
.get(PropertyKey.SECURITY_AUTHORIZATION_PERMISSION_UMASK));
mMountId = IdUtils.INVALID_MOUNT_ID;
mPersistenceWaitTime = alluxioConf.getMs(PropertyKey.USER_FILE_PERSISTENCE_INITIAL_WAIT_TIME);
mReplicationDurable = alluxioConf.getInt(PropertyKey.USER_FILE_REPLICATION_DURABLE);
mReplicationMax = alluxioConf.getInt(PropertyKey.USER_FILE_REPLICATION_MAX);
mReplicationMin = alluxioConf.getInt(PropertyKey.USER_FILE_REPLICATION_MIN);
@@ -192,6 +197,13 @@ public Mode getMode() {
return mMode;
}

/**
* @return the persistence initial wait time
*/
public long getPersistenceWaitTime() {
return mPersistenceWaitTime;
}

/**
* @return the number of block replication for durable write
*/
@@ -342,6 +354,15 @@ public OutStreamOptions setGroup(String group) {
return this;
}

/**
* @param persistenceWaitTime the persistence initial wait time
* @return the updated options object
*/
public OutStreamOptions setPersistenceWaitTime(long persistenceWaitTime) {
mPersistenceWaitTime = persistenceWaitTime;
return this;
}

/**
* @param replicationDurable the number of block replication for durable write
* @return the updated options object
@@ -396,6 +417,7 @@ public boolean equals(Object o) {
&& Objects.equal(mMode, that.mMode)
&& Objects.equal(mMountId, that.mMountId)
&& Objects.equal(mOwner, that.mOwner)
&& Objects.equal(mPersistenceWaitTime, that.mPersistenceWaitTime)
&& Objects.equal(mReplicationDurable, that.mReplicationDurable)
&& Objects.equal(mReplicationMax, that.mReplicationMax)
&& Objects.equal(mReplicationMin, that.mReplicationMin)
@@ -416,6 +438,7 @@ public int hashCode() {
mMode,
mMountId,
mOwner,
mPersistenceWaitTime,
mReplicationDurable,
mReplicationMax,
mReplicationMin,
@@ -440,6 +463,7 @@ public String toString() {
.add("ufsPath", mUfsPath)
.add("writeTier", mWriteTier)
.add("writeType", mWriteType)
.add("persistenceWaitTime", mPersistenceWaitTime)
.add("replicationDurable", mReplicationDurable)
.add("replicationMax", mReplicationMax)
.add("replicationMin", mReplicationMin)
@@ -42,6 +42,17 @@
* will populate the gRPC options objects with the proper values based on the given configuration.
*/
public class FileSystemOptions {
/**
* @param conf Alluxio configuration
* @return options based on the configuration
*/
public static ScheduleAsyncPersistencePOptions scheduleAsyncPersistenceDefaults(
AlluxioConfiguration conf) {
return ScheduleAsyncPersistencePOptions.newBuilder()
.setCommonOptions(commonDefaults(conf))
.setPersistenceWaitTime(0)
.build();
}

/**
* @param conf Alluxio configuration
@@ -78,6 +89,7 @@ public static CreateFilePOptions createFileDefaults(AlluxioConfiguration conf) {
.setCommonOptions(commonDefaults(conf))
.setMode(ModeUtils.applyFileUMask(Mode.defaults(),
conf.get(PropertyKey.SECURITY_AUTHORIZATION_PERMISSION_UMASK)).toProto())
.setPersistenceWaitTime(conf.getMs(PropertyKey.USER_FILE_PERSISTENCE_INITIAL_WAIT_TIME))
.setRecursive(false)
.setReplicationDurable(conf.getInt(PropertyKey.USER_FILE_REPLICATION_DURABLE))
.setReplicationMax(conf.getInt(PropertyKey.USER_FILE_REPLICATION_MAX))
@@ -1544,14 +1544,6 @@ public String toString() {
new Builder(Name.MASTER_PERSISTENCE_INITIAL_INTERVAL_MS)
.setDefaultValue(Constants.SECOND_MS)
.build();
public static final PropertyKey MASTER_PERSISTENCE_INITIAL_WAIT_TIME_MS =
new Builder(Name.MASTER_PERSISTENCE_INITIAL_WAIT_TIME_MS)
.setDefaultValue("5min")
.setDescription(String.format("Time to wait before starting the persistence job. "
+ "When %s is set to %s, set to a big enough value "
+ "to avoid conflicts between cache and through job.",
Name.USER_FILE_WRITE_TYPE_DEFAULT, WritePType.ASYNC_THROUGH))
.build();
public static final PropertyKey MASTER_PERSISTENCE_MAX_INTERVAL_MS =
new Builder(Name.MASTER_PERSISTENCE_MAX_INTERVAL_MS)
.setDefaultValue(Constants.HOUR_MS)
@@ -2786,6 +2778,17 @@ public String toString() {
+ "to commit results.")
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_FILE_PERSISTENCE_INITIAL_WAIT_TIME =
new Builder(Name.USER_FILE_PERSISTENCE_INITIAL_WAIT_TIME)
.setDefaultValue("0")
.setDescription(String.format("Time to wait before starting the persistence job. "
+ "When the value is set to -1, the file will be persisted by rename operation "
+ "or persist CLI but will not be automatically persisted in other cases. "
+ "This is to avoid the heavy object copy in rename operation when %s is set to %s. "
+ "This value should be smaller than the value of %s",
Name.USER_FILE_WRITE_TYPE_DEFAULT, WritePType.ASYNC_THROUGH,
Name.MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS))
.build();
public static final PropertyKey USER_FILE_WAITCOMPLETED_POLL_MS =
new Builder(Name.USER_FILE_WAITCOMPLETED_POLL_MS)
.setAlias(new String[]{"alluxio.user.file.waitcompleted.poll.ms"})
@@ -3809,8 +3812,6 @@ private static String javadocLink(String fullyQualifiedClassname) {
"alluxio.master.metrics.time.series.interval";
public static final String MASTER_PERSISTENCE_INITIAL_INTERVAL_MS =
"alluxio.master.persistence.initial.interval.ms";
public static final String MASTER_PERSISTENCE_INITIAL_WAIT_TIME_MS =
"alluxio.master.persistence.initial.wait.time.ms";
public static final String MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS =
"alluxio.master.persistence.max.total.wait.time.ms";
public static final String MASTER_PERSISTENCE_MAX_INTERVAL_MS =
@@ -4073,6 +4074,8 @@ private static String javadocLink(String fullyQualifiedClassname) {
"alluxio.user.file.load.ttl.action";
public static final String USER_FILE_READ_TYPE_DEFAULT = "alluxio.user.file.readtype.default";
public static final String USER_FILE_PERSIST_ON_RENAME = "alluxio.user.file.persist.on.rename";
public static final String USER_FILE_PERSISTENCE_INITIAL_WAIT_TIME =
"alluxio.user.file.persistence.initial.wait.time";
public static final String USER_FILE_REPLICATION_MAX = "alluxio.user.file.replication.max";
public static final String USER_FILE_REPLICATION_MIN = "alluxio.user.file.replication.min";
public static final String USER_FILE_REPLICATION_DURABLE =
@@ -77,6 +77,7 @@
import alluxio.master.file.contexts.LoadMetadataContext;
import alluxio.master.file.contexts.MountContext;
import alluxio.master.file.contexts.RenameContext;
import alluxio.master.file.contexts.ScheduleAsyncPersistenceContext;
import alluxio.master.file.contexts.SetAclContext;
import alluxio.master.file.contexts.SetAttributeContext;
import alluxio.master.file.contexts.WorkerHeartbeatContext;
@@ -567,18 +568,17 @@ public void start(Boolean isPrimary) throws IOException {
// (mPersistRequests)
for (Long id : mInodeTree.getToBePersistedIds()) {
Inode inode = mInodeStore.get(id).get();
if (inode.isDirectory()) {
continue;
}
if (inode.getPersistenceState() != PersistenceState.TO_BE_PERSISTED) {
if (inode.isDirectory()
|| inode.getPersistenceState() != PersistenceState.TO_BE_PERSISTED
|| inode.asFile().getShouldPersistTime() == Constants.NO_AUTO_PERSIST) {
continue;
}
InodeFile inodeFile = inode.asFile();
if (inodeFile.getPersistJobId() == Constants.PERSISTENCE_INVALID_JOB_ID) {
mPersistRequests.put(inodeFile.getId(), new alluxio.time.ExponentialTimer(
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_WAIT_TIME_MS),
getPersistenceWaitTime(inodeFile.getShouldPersistTime()),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS)));
} else {
AlluxioURI path;
@@ -588,7 +588,9 @@ public void start(Boolean isPrimary) throws IOException {
LOG.error("Failed to determine path for inode with id {}", id, e);
continue;
}
addPersistJob(id, inodeFile.getPersistJobId(), path, inodeFile.getTempUfsPath());
addPersistJob(id, inodeFile.getPersistJobId(),
getPersistenceWaitTime(inodeFile.getShouldPersistTime()),
path, inodeFile.getTempUfsPath());
}
}
if (ServerConfiguration
@@ -1267,7 +1269,7 @@ public FileInfo createFile(AlluxioURI path, CreateFileContext context)
* @return the list of created inodes
*/
List<Inode> createFileInternal(RpcContext rpcContext, LockedInodePath inodePath,
CreateFileContext context)
CreateFileContext context)
throws InvalidPathException, FileAlreadyExistsException, BlockInfoException, IOException,
FileDoesNotExistException {
if (mWhitelist.inList(inodePath.getUri().toString())) {
@@ -2007,10 +2009,13 @@ private void renameInternal(RpcContext rpcContext, LockedInodePath srcInodePath,
.setId(srcInode.getId())
.setPersistenceState(PersistenceState.TO_BE_PERSISTED.name())
.build());
long shouldPersistTime = srcInode.asFile().getShouldPersistTime();
long persistenceWaitTime = shouldPersistTime == Constants.NO_AUTO_PERSIST ? 0
: getPersistenceWaitTime(shouldPersistTime);
mPersistRequests.put(srcInode.getId(), new alluxio.time.ExponentialTimer(
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_WAIT_TIME_MS),
persistenceWaitTime,
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS)));
}
}
@@ -3007,7 +3012,7 @@ private void setAttributeInternal(RpcContext rpcContext, LockedInodePath inodePa
}

@Override
public void scheduleAsyncPersistence(AlluxioURI path)
public void scheduleAsyncPersistence(AlluxioURI path, ScheduleAsyncPersistenceContext context)
throws AlluxioException, UnavailableException {
try (RpcContext rpcContext = createRpcContext();
LockedInodePath inodePath = mInodeTree.lockFullInodePath(path, LockPattern.WRITE_INODE)) {
@@ -3023,7 +3028,7 @@ public void scheduleAsyncPersistence(AlluxioURI path)
mPersistRequests.put(inode.getId(), new alluxio.time.ExponentialTimer(
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_WAIT_TIME_MS),
context.getPersistenceWaitTime(),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS)));
}
}
@@ -3712,21 +3717,32 @@ public void stopSync(AlluxioURI syncPoint)
/**
* @param fileId file ID
* @param jobId persist job ID
* @param persistenceWaitTime persistence initial wait time
* @param uri Alluxio Uri of the file
* @param tempUfsPath temp UFS path
*/
private void addPersistJob(long fileId, long jobId, AlluxioURI uri, String tempUfsPath) {
private void addPersistJob(long fileId, long jobId, long persistenceWaitTime, AlluxioURI uri,
String tempUfsPath) {
alluxio.time.ExponentialTimer timer = mPersistRequests.remove(fileId);
if (timer == null) {
timer = new alluxio.time.ExponentialTimer(
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_INTERVAL_MS),
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_INITIAL_WAIT_TIME_MS),
persistenceWaitTime,
ServerConfiguration.getMs(PropertyKey.MASTER_PERSISTENCE_MAX_TOTAL_WAIT_TIME_MS));
}
mPersistJobs.put(fileId, new PersistJob(jobId, fileId, uri, tempUfsPath, timer));
}

private long getPersistenceWaitTime(long shouldPersistTime) {
long currentTime = System.currentTimeMillis();
if (shouldPersistTime >= currentTime) {
return shouldPersistTime - currentTime;
} else {
return 0;
}
}

/**
* Periodically schedules jobs to persist files and updates metadata accordingly.
*/
@@ -37,6 +37,7 @@
import alluxio.master.file.contexts.ListStatusContext;
import alluxio.master.file.contexts.MountContext;
import alluxio.master.file.contexts.RenameContext;
import alluxio.master.file.contexts.ScheduleAsyncPersistenceContext;
import alluxio.master.file.contexts.SetAclContext;
import alluxio.master.file.contexts.SetAttributeContext;
import alluxio.master.file.contexts.WorkerHeartbeatContext;
@@ -436,8 +437,10 @@ void setAttribute(AlluxioURI path, SetAttributeContext options)
* Schedules a file for async persistence.
*
* @param path the path of the file for persistence
* @param context the schedule async persistence context
*/
void scheduleAsyncPersistence(AlluxioURI path) throws AlluxioException, UnavailableException;
void scheduleAsyncPersistence(AlluxioURI path, ScheduleAsyncPersistenceContext context)
throws AlluxioException, UnavailableException;

/**
* Update the operation mode for the given ufs path under one or more mount points.

0 comments on commit db82387

Please sign in to comment.
You can’t perform that action at this time.