Skip to content
Permalink
Browse files

Add Pinning to specific tier

There are several things in this PR.

Change Inode metadata to include a set of strings that are the desired
pinning medium for the file.
Pass a desired pinning storage medium type to loadBlock method of
JobUtils.java.
This storage medium type eventually gets passed to the createBlock
method on the worker, and worker finds a storage medium which matches
the desired storage medium type.
Change the blockInfo in fileInfo to contain medium type information.

TODO (defer to phase 2):
BlockHeartBeatReport related changes

pr-link: #9084
change-id: cid-4f9ba15be23d9228b8ccf78fac609a529d648c3b
  • Loading branch information...
yuzhu authored and alluxio-bot committed May 15, 2019
1 parent 9c74c07 commit c8e5dbf678def9da51f8ebb2ef9217a06e26e11f
Showing with 2,859 additions and 613 deletions.
  1. +1 −0 core/base/src/main/java/alluxio/exception/ExceptionMessage.java
  2. +2 −1 core/client/fs/src/main/java/alluxio/client/block/stream/GrpcDataWriter.java
  3. +2 −1 core/client/fs/src/main/java/alluxio/client/block/stream/LocalFileDataWriter.java
  4. +23 −0 core/client/fs/src/main/java/alluxio/client/file/options/OutStreamOptions.java
  5. +8 −0 core/common/src/main/java/alluxio/client/file/URIStatus.java
  6. +32 −0 core/common/src/main/java/alluxio/conf/PropertyKey.java
  7. +4 −1 core/common/src/main/java/alluxio/grpc/GrpcUtils.java
  8. +24 −3 core/common/src/main/java/alluxio/wire/BlockLocation.java
  9. +24 −3 core/common/src/main/java/alluxio/wire/FileInfo.java
  10. +2 −0 core/common/src/test/java/alluxio/wire/BlockLocationTest.java
  11. +3 −2 core/server/master/src/main/java/alluxio/master/block/BlockMaster.java
  12. +3 −1 core/server/master/src/main/java/alluxio/master/block/BlockMasterWorkerServiceHandler.java
  13. +4 −4 core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java
  14. +2 −1 core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java
  15. +6 −0 core/server/master/src/main/java/alluxio/master/file/meta/Inode.java
  16. +18 −3 core/server/master/src/main/java/alluxio/master/file/meta/InodeTree.java
  17. +12 −0 core/server/master/src/main/java/alluxio/master/file/meta/InodeTreePersistentState.java
  18. +6 −0 core/server/master/src/main/java/alluxio/master/file/meta/InodeView.java
  19. +28 −2 core/server/master/src/main/java/alluxio/master/file/meta/MutableInode.java
  20. +7 −1 core/server/master/src/main/java/alluxio/master/file/meta/MutableInodeDirectory.java
  21. +7 −1 core/server/master/src/main/java/alluxio/master/file/meta/MutableInodeFile.java
  22. +2 −2 core/server/master/src/main/java/alluxio/master/file/replication/ReplicationChecker.java
  23. +6 −5 core/server/master/src/test/java/alluxio/master/block/BlockMasterTest.java
  24. +3 −3 core/server/master/src/test/java/alluxio/master/file/FileSystemMasterTest.java
  25. +4 −3 core/server/master/src/test/java/alluxio/master/file/meta/InodeTreeTest.java
  26. +2 −2 core/server/master/src/test/java/alluxio/master/file/replication/ReplicationCheckerTest.java
  27. +1 −1 core/server/worker/src/main/java/alluxio/worker/block/AsyncCacheRequestManager.java
  28. +5 −2 core/server/worker/src/main/java/alluxio/worker/block/BlockMasterClient.java
  29. +60 −9 core/server/worker/src/main/java/alluxio/worker/block/BlockStoreLocation.java
  30. +7 −3 core/server/worker/src/main/java/alluxio/worker/block/BlockWorker.java
  31. +19 −6 core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java
  32. +14 −0 core/server/worker/src/main/java/alluxio/worker/block/allocator/GreedyAllocator.java
  33. +16 −4 core/server/worker/src/main/java/alluxio/worker/block/allocator/MaxFreeAllocator.java
  34. +21 −4 core/server/worker/src/main/java/alluxio/worker/block/allocator/RoundRobinAllocator.java
  35. +2 −1 core/server/worker/src/main/java/alluxio/worker/block/evictor/GreedyEvictor.java
  36. +2 −1 core/server/worker/src/main/java/alluxio/worker/block/evictor/PartialLRUEvictor.java
  37. +1 −1 core/server/worker/src/main/java/alluxio/worker/block/meta/AbstractBlockMeta.java
  38. +16 −4 core/server/worker/src/main/java/alluxio/worker/block/meta/StorageDir.java
  39. +9 −0 core/server/worker/src/main/java/alluxio/worker/block/meta/StorageDirView.java
  40. +6 −1 core/server/worker/src/main/java/alluxio/worker/block/meta/StorageTier.java
  41. +2 −1 core/server/worker/src/main/java/alluxio/worker/grpc/BlockWriteHandler.java
  42. +9 −0 core/server/worker/src/main/java/alluxio/worker/grpc/BlockWriteRequest.java
  43. +2 −1 core/server/worker/src/main/java/alluxio/worker/grpc/ShortCircuitBlockWriteHandler.java
  44. +2 −1 core/server/worker/src/main/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandler.java
  45. +2 −1 core/server/worker/src/test/java/alluxio/worker/block/BlockMetadataManagerTest.java
  46. +8 −1 core/server/worker/src/test/java/alluxio/worker/block/BlockStoreLocationTest.java
  47. +12 −9 core/server/worker/src/test/java/alluxio/worker/block/BlockWorkerTest.java
  48. +9 −4 core/server/worker/src/test/java/alluxio/worker/block/SpaceReserverTest.java
  49. +20 −1 core/server/worker/src/test/java/alluxio/worker/block/TieredBlockStoreTest.java
  50. +20 −6 core/server/worker/src/test/java/alluxio/worker/block/TieredBlockStoreTestUtils.java
  51. +3 −1 core/server/worker/src/test/java/alluxio/worker/block/allocator/AllocatorTestBase.java
  52. +3 −3 core/server/worker/src/test/java/alluxio/worker/block/evictor/LRUEvictorTest.java
  53. +4 −2 core/server/worker/src/test/java/alluxio/worker/block/meta/AbstractBlockMetaTest.java
  54. +3 −1 core/server/worker/src/test/java/alluxio/worker/block/meta/BlockMetaTest.java
  55. +6 −5 core/server/worker/src/test/java/alluxio/worker/block/meta/StorageDirTest.java
  56. +3 −2 core/server/worker/src/test/java/alluxio/worker/block/meta/StorageTierTest.java
  57. +2 −1 core/server/worker/src/test/java/alluxio/worker/block/meta/TempBlockMetaTest.java
  58. +1 −1 core/server/worker/src/test/java/alluxio/worker/grpc/BlockWriteHandlerTest.java
  59. +1 −0 core/transport/src/grpc/block_master.proto
  60. +4 −2 core/transport/src/grpc/block_worker.proto
  61. +1 −0 core/transport/src/grpc/common.proto
  62. +1 −0 core/transport/src/grpc/file_system_master.proto
  63. +151 −0 core/transport/src/main/java/alluxio/grpc/BlockLocation.java
  64. +14 −0 core/transport/src/main/java/alluxio/grpc/BlockLocationOrBuilder.java
  65. +82 −82 core/transport/src/main/java/alluxio/grpc/BlockMasterProto.java
  66. +40 −39 core/transport/src/main/java/alluxio/grpc/BlockWorkerProto.java
  67. +151 −0 core/transport/src/main/java/alluxio/grpc/CommitBlockPRequest.java
  68. +14 −0 core/transport/src/main/java/alluxio/grpc/CommitBlockPRequestOrBuilder.java
  69. +29 −28 core/transport/src/main/java/alluxio/grpc/CommonProto.java
  70. +153 −2 core/transport/src/main/java/alluxio/grpc/CreateLocalBlockRequest.java
  71. +14 −0 core/transport/src/main/java/alluxio/grpc/CreateLocalBlockRequestOrBuilder.java
  72. +148 −147 core/transport/src/main/java/alluxio/grpc/FileSystemMasterProto.java
  73. +169 −0 core/transport/src/main/java/alluxio/grpc/SetAttributePOptions.java
  74. +19 −0 core/transport/src/main/java/alluxio/grpc/SetAttributePOptionsOrBuilder.java
  75. +153 −2 core/transport/src/main/java/alluxio/grpc/WriteRequestCommand.java
  76. +14 −0 core/transport/src/main/java/alluxio/grpc/WriteRequestCommandOrBuilder.java
  77. +646 −81 core/transport/src/main/java/alluxio/proto/journal/File.java
  78. +170 −5 core/transport/src/main/java/alluxio/proto/meta/Block.java
  79. +286 −97 core/transport/src/main/java/alluxio/proto/meta/InodeMeta.java
  80. +6 −3 core/transport/src/proto/journal/file.proto
  81. +2 −1 core/transport/src/proto/meta/block.proto
  82. +2 −1 core/transport/src/proto/meta/inode_meta.proto
  83. +1 −0 job/server/src/main/java/alluxio/job/replicate/ReplicateDefinition.java
  84. +9 −0 job/server/src/main/java/alluxio/job/util/JobUtils.java
  85. +7 −2 shell/src/main/java/alluxio/cli/fs/command/FileSystemCommandUtils.java
  86. +26 −2 shell/src/main/java/alluxio/cli/fs/command/PinCommand.java
  87. +2 −1 shell/src/main/java/alluxio/cli/fs/command/UnpinCommand.java
  88. +18 −0 tests/src/test/java/alluxio/client/cli/fs/command/PinCommandIntegrationTest.java
  89. +1 −1 tests/src/test/java/alluxio/client/rest/BlockMasterClientRestApiTest.java
@@ -285,6 +285,7 @@

// block worker
FAILED_COMMIT_BLOCK_TO_MASTER("Failed to commit block with blockId {0,number,#} to master"),
PINNED_TO_MULTIPLE_MEDIUMTYPES("File {0} pinned to multiple medium types"),

// ufs maintenance
UFS_OP_NOT_ALLOWED("Operation {0} not allowed on ufs path {1} under maintenance mode {2}"),
@@ -128,7 +128,8 @@ private GrpcDataWriter(FileSystemContext context, final WorkerNetAddress address
mWriterFlushTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_WRITER_FLUSH_TIMEOUT);

WriteRequestCommand.Builder builder =
WriteRequestCommand.newBuilder().setId(id).setTier(options.getWriteTier()).setType(type);
WriteRequestCommand.newBuilder().setId(id).setTier(options.getWriteTier()).setType(type)
.setMediumType(options.getMediumType());
if (type == RequestType.UFS_FILE) {
Protocol.CreateUfsFileOptions ufsFileOptions =
Protocol.CreateUfsFileOptions.newBuilder().setUfsPath(options.getUfsPath())
@@ -82,7 +82,8 @@ public static LocalFileDataWriter create(final FileSystemContext context,

CreateLocalBlockRequest.Builder builder =
CreateLocalBlockRequest.newBuilder().setBlockId(blockId)
.setTier(options.getWriteTier()).setSpaceToReserve(fileBufferByes);
.setTier(options.getWriteTier()).setSpaceToReserve(fileBufferByes)
.setMediumType(options.getMediumType());
if (options.getWriteType() == WriteType.ASYNC_THROUGH
&& conf.getBoolean(PropertyKey.USER_FILE_UFS_TIER_ENABLED)) {
builder.setCleanupOnFailure(false);
@@ -52,6 +52,7 @@
private int mReplicationMin;
private String mUfsPath;
private long mMountId;
private String mMediumType;

/**
* @param alluxioConf Alluxio configuration
@@ -118,6 +119,14 @@ private OutStreamOptions(AlluxioConfiguration alluxioConf) {
mReplicationDurable = alluxioConf.getInt(PropertyKey.USER_FILE_REPLICATION_DURABLE);
mReplicationMax = alluxioConf.getInt(PropertyKey.USER_FILE_REPLICATION_MAX);
mReplicationMin = alluxioConf.getInt(PropertyKey.USER_FILE_REPLICATION_MIN);
mMediumType = "";
}

/**
* @return the write medium type
*/
public String getMediumType() {
return mMediumType;
}

/**
@@ -232,6 +241,17 @@ public WriteType getWriteType() {
return mWriteType;
}

/**
* Set the write medium type of the file.
*
* @param mediumType write medium type
* @return the updated options object
*/
public OutStreamOptions setMediumType(String mediumType) {
mMediumType = mediumType;
return this;
}

/**
* Sets the acl of the file.
*
@@ -372,6 +392,7 @@ public boolean equals(Object o) {
&& Objects.equal(mCommonOptions, that.mCommonOptions)
&& Objects.equal(mGroup, that.mGroup)
&& Objects.equal(mLocationPolicy, that.mLocationPolicy)
&& Objects.equal(mMediumType, that.mMediumType)
&& Objects.equal(mMode, that.mMode)
&& Objects.equal(mMountId, that.mMountId)
&& Objects.equal(mOwner, that.mOwner)
@@ -391,6 +412,7 @@ public int hashCode() {
mCommonOptions,
mGroup,
mLocationPolicy,
mMediumType,
mMode,
mMountId,
mOwner,
@@ -411,6 +433,7 @@ public String toString() {
.add("commonOptions", mCommonOptions)
.add("group", mGroup)
.add("locationPolicy", mLocationPolicy)
.add("mediumType", mMediumType)
.add("mode", mMode)
.add("mountId", mMountId)
.add("owner", mOwner)
@@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;

import java.util.List;
import java.util.Set;

import javax.annotation.concurrent.ThreadSafe;

@@ -236,6 +237,13 @@ public boolean isPinned() {
return mInfo.isPinned();
}

/**
* @return the pinned location list
*/
public Set<String> getPinnedMediumTypes() {
return mInfo.getMediumTypes();
}

/**
* @return whether the entity referenced by this uri is a mount point
*/
@@ -1648,6 +1648,13 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPES =
new Builder(Name.MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPES)
.setDefaultValue("MEM, SSD, HDD")
.setDescription("The list of medium types we support in the system.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_TTL_CHECKER_INTERVAL_MS =
new Builder(Name.MASTER_TTL_CHECKER_INTERVAL_MS)
.setAlias(new String[]{"alluxio.master.ttl.checker.interval.ms"})
@@ -2291,6 +2298,13 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_TIERED_STORE_LEVEL0_DIRS_MEDIATYPE =
new Builder(Template.WORKER_TIERED_STORE_LEVEL_DIRS_MEDIUMTYPE, 0)
.setDefaultValue("MEM")
.setDescription("The media type for the top storage tier directories ")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_TIERED_STORE_LEVEL0_DIRS_QUOTA =
new Builder(Template.WORKER_TIERED_STORE_LEVEL_DIRS_QUOTA, 0)
.setDefaultValue("${alluxio.worker.memory.size}")
@@ -2326,6 +2340,13 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_TIERED_STORE_LEVEL1_DIRS_MEDIATYPE =
new Builder(Template.WORKER_TIERED_STORE_LEVEL_DIRS_MEDIUMTYPE, 1)
.setDefaultValue("SSD")
.setDescription("The media type for the second storage tier directories ")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_TIERED_STORE_LEVEL1_DIRS_QUOTA =
new Builder(Template.WORKER_TIERED_STORE_LEVEL_DIRS_QUOTA, 1)
.setDescription("The capacity of the second storage tier.")
@@ -2360,6 +2381,13 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_TIERED_STORE_LEVEL2_DIRS_MEDIATYPE =
new Builder(Template.WORKER_TIERED_STORE_LEVEL_DIRS_MEDIUMTYPE, 2)
.setDefaultValue("HDD")
.setDescription("The media type for the third storage tier directories ")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
public static final PropertyKey WORKER_TIERED_STORE_LEVEL2_DIRS_QUOTA =
new Builder(Template.WORKER_TIERED_STORE_LEVEL_DIRS_QUOTA, 2)
.setDescription("The capacity of the third storage tier.")
@@ -3817,6 +3845,8 @@ private static String javadocLink(String fullyQualifiedClassname) {
"alluxio.master.tieredstore.global.level2.alias";
public static final String MASTER_TIERED_STORE_GLOBAL_LEVELS =
"alluxio.master.tieredstore.global.levels";
public static final String MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPES =
"alluxio.master.tieredstore.global.media";
public static final String MASTER_TTL_CHECKER_INTERVAL_MS =
"alluxio.master.ttl.checker.interval";
public static final String MASTER_ACTIVE_UFS_SYNC_INTERVAL =
@@ -4240,6 +4270,8 @@ private Name() {} // prevent instantiation
"alluxio\\.worker\\.tieredstore\\.level(\\d+)\\.alias"),
WORKER_TIERED_STORE_LEVEL_DIRS_PATH("alluxio.worker.tieredstore.level%d.dirs.path",
"alluxio\\.worker\\.tieredstore\\.level(\\d+)\\.dirs\\.path"),
WORKER_TIERED_STORE_LEVEL_DIRS_MEDIUMTYPE("alluxio.worker.tieredstore.level%d.dirs.mediumtype",
"alluxio\\.worker\\.tieredstore\\.level(\\d+)\\.dirs\\.mediumtype"),
WORKER_TIERED_STORE_LEVEL_DIRS_QUOTA("alluxio.worker.tieredstore.level%d.dirs.quota",
"alluxio\\.worker\\.tieredstore\\.level(\\d+)\\.dirs\\.quota"),
WORKER_TIERED_STORE_LEVEL_HIGH_WATERMARK_RATIO(
@@ -183,6 +183,7 @@ public static BlockLocation fromProto(alluxio.grpc.BlockLocation blockPLocation)
blockLocation.setWorkerId(blockPLocation.getWorkerId());
blockLocation.setWorkerAddress(fromProto(blockPLocation.getWorkerAddress()));
blockLocation.setTierAlias(blockPLocation.getTierAlias());
blockLocation.setMediumType(blockPLocation.getMediumType());
return blockLocation;
}

@@ -422,7 +423,9 @@ public static PAclEntryType toProto(AclEntryType aclEntryType) {
public static alluxio.grpc.BlockLocation toProto(BlockLocation blockLocation) {
return alluxio.grpc.BlockLocation.newBuilder().setWorkerId(blockLocation.getWorkerId())
.setWorkerAddress(toProto(blockLocation.getWorkerAddress()))
.setTierAlias(blockLocation.getTierAlias()).build();
.setTierAlias(blockLocation.getTierAlias())
.setMediumType(blockLocation.getMediumType())
.build();
}

/**
@@ -32,6 +32,7 @@
private long mWorkerId;
private WorkerNetAddress mWorkerAddress = new WorkerNetAddress();
private String mTierAlias = "";
private String mMediumType = "";

/**
* Creates a new instance of {@link BlockLocation}.
@@ -59,6 +60,13 @@ public String getTierAlias() {
return mTierAlias;
}

/**
* @return the medium type
*/
public String getMediumType() {
return mMediumType;
}

/**
* @param workerId the worker id to use
* @return the block location
@@ -88,6 +96,17 @@ public BlockLocation setTierAlias(String tierAlias) {
return this;
}

/**
*
* @param mediumType the medium type to use
* @return the block location
*/
public BlockLocation setMediumType(String mediumType) {
Preconditions.checkNotNull(mediumType, "mediumType");
mMediumType = mediumType;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -98,19 +117,21 @@ public boolean equals(Object o) {
}
BlockLocation that = (BlockLocation) o;
return mWorkerId == that.mWorkerId && mWorkerAddress.equals(that.mWorkerAddress)
&& mTierAlias.equals(that.mTierAlias);
&& mTierAlias.equals(that.mTierAlias) && mMediumType.equals(that.mMediumType);
}

@Override
public int hashCode() {
return Objects.hashCode(mWorkerId, mWorkerAddress, mTierAlias);
return Objects.hashCode(mWorkerId, mWorkerAddress, mTierAlias, mMediumType);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("workerId", mWorkerId)
.add("address", mWorkerAddress)
.add("tierAlias", mTierAlias).toString();
.add("tierAlias", mTierAlias)
.add("mediumType", mMediumType)
.toString();
}
}
@@ -22,7 +22,9 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.concurrent.NotThreadSafe;

@@ -45,6 +47,7 @@
private boolean mCompleted;
private boolean mFolder;
private boolean mPinned;
private Set<String> mMediumTypes = new HashSet<>();
private boolean mCacheable;
private boolean mPersisted;
private ArrayList<Long> mBlockIds = new ArrayList<>();
@@ -298,6 +301,13 @@ public DefaultAccessControlList getDefaultAcl() {
return (mDefaultAcl == null) ? new ArrayList<>() : mDefaultAcl.toStringEntries();
}

/**
* @return a set of pinned locations
*/
public Set<String> getMediumTypes() {
return mMediumTypes;
}

/**
* @param fileId the file id to use
* @return the file information
@@ -575,6 +585,15 @@ public FileInfo setDefaultAcl(DefaultAccessControlList defaultAcl) {
return this;
}

/**
* @param mediumTypes the pinned locations
* @return the file information
*/
public FileInfo setMediumTypes(Set<String> mediumTypes) {
mMediumTypes = mediumTypes;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -598,7 +617,8 @@ public boolean equals(Object o) {
&& mMountId == that.mMountId && mInAlluxioPercentage == that.mInAlluxioPercentage
&& mUfsFingerprint.equals(that.mUfsFingerprint)
&& Objects.equal(mAcl, that.mAcl)
&& Objects.equal(mDefaultAcl, that.mDefaultAcl);
&& Objects.equal(mDefaultAcl, that.mDefaultAcl)
&& Objects.equal(mMediumTypes, that.mMediumTypes);
}

@Override
@@ -607,7 +627,7 @@ public int hashCode() {
mCreationTimeMs, mCompleted, mFolder, mPinned, mCacheable, mPersisted, mBlockIds,
mInMemoryPercentage, mLastModificationTimeMs, mTtl, mOwner, mGroup, mMode, mReplicationMax,
mReplicationMin, mPersistenceState, mMountPoint, mFileBlockInfos, mTtlAction,
mInAlluxioPercentage, mUfsFingerprint, mAcl, mDefaultAcl);
mInAlluxioPercentage, mUfsFingerprint, mAcl, mDefaultAcl, mMediumTypes);
}

@Override
@@ -618,7 +638,8 @@ public String toString() {
.add("path", mPath)
.add("ufsPath", mUfsPath).add("length", mLength).add("blockSizeBytes", mBlockSizeBytes)
.add("creationTimeMs", mCreationTimeMs).add("completed", mCompleted).add("folder", mFolder)
.add("pinned", mPinned).add("cacheable", mCacheable).add("persisted", mPersisted)
.add("pinned", mPinned).add("pinnedlocation", mMediumTypes)
.add("cacheable", mCacheable).add("persisted", mPersisted)
.add("blockIds", mBlockIds).add("inMemoryPercentage", mInMemoryPercentage)
.add("lastModificationTimesMs", mLastModificationTimeMs).add("ttl", mTtl)
.add("ttlAction", mTtlAction).add("owner", mOwner).add("group", mGroup).add("mode", mMode)
@@ -52,10 +52,12 @@ public static BlockLocation createRandom() {
long workerId = random.nextLong();
WorkerNetAddress workerAddress = WorkerNetAddressTest.createRandom();
String tierAlias = CommonUtils.randomAlphaNumString(random.nextInt(10));
String mediumType = CommonUtils.randomAlphaNumString(random.nextInt(3));

result.setWorkerId(workerId);
result.setWorkerAddress(workerAddress);
result.setTierAlias(tierAlias);
result.setMediumType(mediumType);

return result;
}
@@ -115,13 +115,14 @@ void validateBlocks(Function<Long, Boolean> validator, boolean repair)
* @param workerId the worker id committing the block
* @param usedBytesOnTier the updated used bytes on the tier of the worker
* @param tierAlias the alias of the storage tier where the worker is committing the block to
* @param mediumType the medium type where the worker is committing the block to
* @param blockId the committing block id
* @param length the length of the block
* @throws NotFoundException if the workerId is not active
*/
// TODO(binfan): check the logic is correct or not when commitBlock is a retry
void commitBlock(long workerId, long usedBytesOnTier, String tierAlias, long blockId, long
length) throws NotFoundException, UnavailableException;
void commitBlock(long workerId, long usedBytesOnTier, String tierAlias, String mediumType,
long blockId, long length) throws NotFoundException, UnavailableException;

/**
* Marks a block as committed, but without a worker location. This means the block is only in ufs.
@@ -89,10 +89,12 @@ public void commitBlock(CommitBlockPRequest request,
final long usedBytesOnTier = request.getUsedBytesOnTier();
final String tierAlias = request.getTierAlias();
final long blockId = request.getBlockId();
final String mediumType = request.getMediumType();
final long length = request.getLength();

RpcUtils.call(LOG, (RpcUtils.RpcCallableThrowsIOException<CommitBlockPResponse>) () -> {
mBlockMaster.commitBlock(workerId, usedBytesOnTier, tierAlias, blockId, length);
mBlockMaster.commitBlock(workerId, usedBytesOnTier, tierAlias, mediumType,
blockId, length);
return CommitBlockPResponse.getDefaultInstance();
}, "commitBlock", "request=%s", responseObserver, request);
}

0 comments on commit c8e5dbf

Please sign in to comment.
You can’t perform that action at this time.