Skip to content

Commit

Permalink
Stash
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 committed Apr 20, 2017
1 parent 5eaaf05 commit d710f2b
Show file tree
Hide file tree
Showing 16 changed files with 284 additions and 41 deletions.
Expand Up @@ -108,14 +108,17 @@ public static BlockInStream createRemoteBlockInStream(FileSystemContext context,
* @param blockSize the block size
* @param blockStart the start position of the block in the UFS file
* @param address the worker network address
* @param alluxioMountPoint the mount point of the file in Alluxio
* @param mountTableVersion the version of Alluxio mount table
* @param options the in stream options
* @return the input stream
* @throws IOException if it fails to create the input stream
*/
public static BlockInStream createUfsBlockInStream(FileSystemContext context, String ufsPath,
long blockId, long blockSize, long blockStart, WorkerNetAddress address,
String alluxioMountPoint, long mountTableVersion,
InStreamOptions options) throws IOException {
return BlockInStream.createUfsBlockInStream(context, ufsPath, blockId, blockSize, blockStart,
address, options);
alluxioMountPoint, mountTableVersion, address, options);
}
}
Expand Up @@ -26,6 +26,8 @@ public final class LockBlockOptions {
private long mOffset;
private long mBlockSize;
private int mMaxUfsReadConcurrency;
private String mAlluxioMountPoint;
private long mMountTableVersion;

/**
* @return the default {@link LockBlockOptions}
Expand Down Expand Up @@ -67,6 +69,20 @@ public int getMaxUfsReadConcurrency() {
return mMaxUfsReadConcurrency;
}

/**
* @return the mount point path in Alluxio for the file this block belonging to
*/
public String getAlluxioMountPoint() {
return mAlluxioMountPoint;
}

/**
* @return the mount table version
*/
public long getMountTableVersion() {
return mMountTableVersion;
}

/**
* @param ufsPath the UFS path to set
* @return the updated options object
Expand Down Expand Up @@ -103,6 +119,24 @@ public LockBlockOptions setMaxUfsReadConcurrency(int maxUfsReadConcurrency) {
return this;
}

/**
* @param alluxioMountPoint the mount point path in Alluxio for this file
* @return the updated options object
*/
public LockBlockOptions setAlluxioMountPoint(String alluxioMountPoint) {
mAlluxioMountPoint = alluxioMountPoint;
return this;
}

/**
* @param mountTableVersion the mount table version
* @return the updated options object
*/
public LockBlockOptions setMountTableVersion(long mountTableVersion) {
mMountTableVersion = mountTableVersion;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -112,24 +146,30 @@ public boolean equals(Object o) {
return false;
}
LockBlockOptions that = (LockBlockOptions) o;
return Objects.equal(mUfsPath, that.mUfsPath)
return Objects.equal(mAlluxioMountPoint, that.mAlluxioMountPoint)
&& Objects.equal(mUfsPath, that.mUfsPath)
&& Objects.equal(mOffset, that.mOffset)
&& Objects.equal(mBlockSize, that.mBlockSize)
&& Objects.equal(mMaxUfsReadConcurrency, that.mMaxUfsReadConcurrency);
&& Objects.equal(mMaxUfsReadConcurrency, that.mMaxUfsReadConcurrency)
&& Objects.equal(mMountTableVersion, that.mMountTableVersion);
}

@Override
public int hashCode() {
return Objects.hashCode(mUfsPath, mOffset, mBlockSize, mMaxUfsReadConcurrency);
return Objects.hashCode(mAlluxioMountPoint, mBlockSize, mMaxUfsReadConcurrency,
mMountTableVersion, mOffset, mUfsPath);
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("alluxioMountPoint", mAlluxioMountPoint)
.add("blockSize", mBlockSize)
.add("maxUfsReadConcurrency", mMaxUfsReadConcurrency)
.add("mountTableVersion", mMountTableVersion)
.add("offset", mOffset)
.add("ufsPath", mUfsPath).toString();
.add("ufsPath", mUfsPath)
.toString();
}

/**
Expand All @@ -139,10 +179,12 @@ public String toString() {
*/
public LockBlockTOptions toThrift() {
LockBlockTOptions options = new LockBlockTOptions();
options.setUfsPath(mUfsPath);
options.setOffset(mOffset);
options.setAlluxioMountPoint(mAlluxioMountPoint);
options.setBlockSize(mBlockSize);
options.setMaxUfsReadConcurrency(mMaxUfsReadConcurrency);
options.setMountTableVersion(mMountTableVersion);
options.setOffset(mOffset);
options.setUfsPath(mUfsPath);
return options;
}
}
Expand Up @@ -143,22 +143,26 @@ public static BlockInStream createRemoteBlockInStream(long blockId, long blockSi
* @param blockId the block ID
* @param blockSize the block size
* @param blockStart the position at which the block starts in the file
* @param alluxioMountPoint the mount point of the file in Alluxio
* @param mountTableVersion the version of Alluxio mount table
* @param workerNetAddress the worker network address
* @param options the options
* @throws IOException if it fails to create an instance
* @return the {@link BlockInStream} created
*/
// TODO(peis): Use options idiom (ALLUXIO-2579).
public static BlockInStream createUfsBlockInStream(FileSystemContext context, String ufsPath,
long blockId, long blockSize, long blockStart,
long blockId, long blockSize, long blockStart, String alluxioMountPoint,
long mountTableVersion,
WorkerNetAddress workerNetAddress, InStreamOptions options) throws IOException {
Closer closer = Closer.create();
try {
BlockWorkerClient blockWorkerClient =
closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockOptions lockBlockOptions =
LockBlockOptions.defaults().setUfsPath(ufsPath).setOffset(blockStart)
.setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency());
.setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency())
.setAlluxioMountPoint(alluxioMountPoint).setMountTableVersion(mountTableVersion);

LockBlockResult lockBlockResult =
closer.register(blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions)).getResult();
Expand Down
Expand Up @@ -347,7 +347,8 @@ protected BlockInStream createUnderStoreBlockInStream(long blockId, long blockSt
.setBlockWorkerInfos(mBlockStore.getWorkerInfoList()).setBlockId(blockId)
.setBlockSize(length));
return StreamFactory.createUfsBlockInStream(mContext, path, blockId, length, blockStart,
address, mInStreamOptions);
address, mStatus.getAlluxioMountPoint(), mStatus.getMountTableVersion(),
mInStreamOptions);
} catch (AlluxioException e) {
throw new IOException(e);
}
Expand Down
14 changes: 14 additions & 0 deletions core/common/src/main/java/alluxio/client/file/URIStatus.java
Expand Up @@ -206,6 +206,20 @@ public boolean isMountPoint() {
return mInfo.isMountPoint();
}

/**
* @return the mount point path in Alluxio for this file
*/
public String getAlluxioMountPoint() {
return mInfo.getAlluxioMountPoint();
}

/**
* @return the mount table version
*/
public long getMountTableVersion() {
return mInfo.getMountTableVersion();
}

/**
* @return the list of file block descriptors
*/
Expand Down
46 changes: 43 additions & 3 deletions core/common/src/main/java/alluxio/wire/FileInfo.java
Expand Up @@ -51,6 +51,8 @@ public final class FileInfo implements Serializable {
private String mPersistenceState = "";
private boolean mMountPoint;
private ArrayList<FileBlockInfo> mFileBlockInfos = new ArrayList<>();
private String mAlluxioMountPoint;
private long mMountTableVersion;

/**
* Creates a new instance of {@link FileInfo}.
Expand Down Expand Up @@ -91,6 +93,8 @@ protected FileInfo(alluxio.thrift.FileInfo fileInfo) {
mFileBlockInfos.add(new FileBlockInfo(fileBlockInfo));
}
}
mAlluxioMountPoint = fileInfo.getAlluxioMountPoint();
mMountTableVersion = fileInfo.getMountTableVersion();
}

/**
Expand Down Expand Up @@ -254,6 +258,20 @@ public List<FileBlockInfo> getFileBlockInfos() {
return mFileBlockInfos;
}

/**
* @return the mount point path in Alluxio for this file
*/
public String getAlluxioMountPoint() {
return mAlluxioMountPoint;
}

/**
* @return the mount table version
*/
public long getMountTableVersion() {
return mMountTableVersion;
}

/**
* @param fileId the file id to use
* @return the file information
Expand Down Expand Up @@ -468,6 +486,22 @@ public FileInfo setFileBlockInfos(List<FileBlockInfo> fileBlockInfos) {
return this;
}

/**
* @param alluxioMountPoint the mount point path in Alluxio for this file
* @return the file information
*/
public void setAlluxioMountPoint(String alluxioMountPoint) {
mAlluxioMountPoint = alluxioMountPoint;
}

/**
* @param mountTableVersion the mount table version
* @return the file information
*/
public void setMountTableVersion(long mountTableVersion) {
mMountTableVersion = mountTableVersion;
}

/**
* @return thrift representation of the file information
*/
Expand All @@ -481,7 +515,8 @@ protected alluxio.thrift.FileInfo toThrift() {
new alluxio.thrift.FileInfo(mFileId, mName, mPath, mUfsPath, mLength, mBlockSizeBytes,
mCreationTimeMs, mCompleted, mFolder, mPinned, mCacheable, mPersisted, mBlockIds,
mInMemoryPercentage, mLastModificationTimeMs, mTtl, mOwner, mGroup, mMode,
mPersistenceState, mMountPoint, fileBlockInfos, ThriftUtils.toThrift(mTtlAction));
mPersistenceState, mMountPoint, fileBlockInfos, ThriftUtils.toThrift(mTtlAction),
mAlluxioMountPoint, mMountTableVersion);
return info;
}

Expand All @@ -503,7 +538,9 @@ public boolean equals(Object o) {
&& mLastModificationTimeMs == that.mLastModificationTimeMs && mTtl == that.mTtl
&& mOwner.equals(that.mOwner) && mGroup.equals(that.mGroup) && mMode == that.mMode
&& mPersistenceState.equals(that.mPersistenceState) && mMountPoint == that.mMountPoint
&& mFileBlockInfos.equals(that.mFileBlockInfos) && mTtlAction == that.mTtlAction;
&& mFileBlockInfos.equals(that.mFileBlockInfos) && mTtlAction == that.mTtlAction
&& mAlluxioMountPoint.equals(that.mAlluxioMountPoint)
&& mMountTableVersion == that.mMountTableVersion;
}

@Override
Expand All @@ -524,6 +561,9 @@ public String toString() {
.add("lastModificationTimesMs", mLastModificationTimeMs).add("ttl", mTtl)
.add("ttlAction", mTtlAction).add("owner", mOwner).add("group", mGroup).add("mode", mMode)
.add("persistenceState", mPersistenceState).add("mountPoint", mMountPoint)
.add("fileBlockInfos", mFileBlockInfos).toString();
.add("fileBlockInfos", mFileBlockInfos)
.add("alluxioMountPoint", mAlluxioMountPoint)
.add("mountTableVersion", mMountTableVersion)
.toString();
}
}
Expand Up @@ -688,10 +688,9 @@ private FileInfo getFileInfoInternal(LockedInodePath inodePath)
throw new FileDoesNotExistException(e.getMessage(), e);
}
AlluxioURI resolvedUri = resolution.getUri();
// Only set the UFS path if the path is nested under a mount point.
if (!uri.equals(resolvedUri)) {
fileInfo.setUfsPath(resolvedUri.toString());
}
fileInfo.setUfsPath(resolvedUri.toString());
fileInfo.setAlluxioMountPoint(resolution.getMountPoint().toString());
fileInfo.setMountTableVersion(resolution.getVersion());
Metrics.FILE_INFOS_GOT.inc();
return fileInfo;
}
Expand Down

0 comments on commit d710f2b

Please sign in to comment.