diff --git a/core/client/src/main/java/alluxio/client/block/StreamFactory.java b/core/client/src/main/java/alluxio/client/block/StreamFactory.java index b31754c4f543..3750c3fb558c 100644 --- a/core/client/src/main/java/alluxio/client/block/StreamFactory.java +++ b/core/client/src/main/java/alluxio/client/block/StreamFactory.java @@ -108,14 +108,17 @@ public static BlockInStream createRemoteBlockInStream(FileSystemContext context, * @param blockSize the block size * @param blockStart the start position of the block in the UFS file * @param address the worker network address + * @param alluxioMountPoint the mount point of the file in Alluxio + * @param mountTableVersion the version of Alluxio mount table * @param options the in stream options * @return the input stream * @throws IOException if it fails to create the input stream */ public static BlockInStream createUfsBlockInStream(FileSystemContext context, String ufsPath, long blockId, long blockSize, long blockStart, WorkerNetAddress address, + String alluxioMountPoint, long mountTableVersion, InStreamOptions options) throws IOException { return BlockInStream.createUfsBlockInStream(context, ufsPath, blockId, blockSize, blockStart, - address, options); + alluxioMountPoint, mountTableVersion, address, options); } } diff --git a/core/client/src/main/java/alluxio/client/block/options/LockBlockOptions.java b/core/client/src/main/java/alluxio/client/block/options/LockBlockOptions.java index 5e7bf9f5bb7f..0792ec61a445 100644 --- a/core/client/src/main/java/alluxio/client/block/options/LockBlockOptions.java +++ b/core/client/src/main/java/alluxio/client/block/options/LockBlockOptions.java @@ -26,6 +26,8 @@ public final class LockBlockOptions { private long mOffset; private long mBlockSize; private int mMaxUfsReadConcurrency; + private String mAlluxioMountPoint; + private long mMountTableVersion; /** * @return the default {@link LockBlockOptions} @@ -67,6 +69,20 @@ public int getMaxUfsReadConcurrency() { return mMaxUfsReadConcurrency; } + /** + * @return the mount point path in Alluxio for the file this block belonging to + */ + public String getAlluxioMountPoint() { + return mAlluxioMountPoint; + } + + /** + * @return the mount table version + */ + public long getMountTableVersion() { + return mMountTableVersion; + } + /** * @param ufsPath the UFS path to set * @return the updated options object @@ -103,6 +119,24 @@ public LockBlockOptions setMaxUfsReadConcurrency(int maxUfsReadConcurrency) { return this; } + /** + * @param alluxioMountPoint the mount point path in Alluxio for this file + * @return the updated options object + */ + public LockBlockOptions setAlluxioMountPoint(String alluxioMountPoint) { + mAlluxioMountPoint = alluxioMountPoint; + return this; + } + + /** + * @param mountTableVersion the mount table version + * @return the updated options object + */ + public LockBlockOptions setMountTableVersion(long mountTableVersion) { + mMountTableVersion = mountTableVersion; + return this; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -112,24 +146,30 @@ public boolean equals(Object o) { return false; } LockBlockOptions that = (LockBlockOptions) o; - return Objects.equal(mUfsPath, that.mUfsPath) + return Objects.equal(mAlluxioMountPoint, that.mAlluxioMountPoint) + && Objects.equal(mUfsPath, that.mUfsPath) && Objects.equal(mOffset, that.mOffset) && Objects.equal(mBlockSize, that.mBlockSize) - && Objects.equal(mMaxUfsReadConcurrency, that.mMaxUfsReadConcurrency); + && Objects.equal(mMaxUfsReadConcurrency, that.mMaxUfsReadConcurrency) + && Objects.equal(mMountTableVersion, that.mMountTableVersion); } @Override public int hashCode() { - return Objects.hashCode(mUfsPath, mOffset, mBlockSize, mMaxUfsReadConcurrency); + return Objects.hashCode(mAlluxioMountPoint, mBlockSize, mMaxUfsReadConcurrency, + mMountTableVersion, mOffset, mUfsPath); } @Override public String toString() { return Objects.toStringHelper(this) + .add("alluxioMountPoint", mAlluxioMountPoint) .add("blockSize", mBlockSize) .add("maxUfsReadConcurrency", mMaxUfsReadConcurrency) + .add("mountTableVersion", mMountTableVersion) .add("offset", mOffset) - .add("ufsPath", mUfsPath).toString(); + .add("ufsPath", mUfsPath) + .toString(); } /** @@ -139,10 +179,12 @@ public String toString() { */ public LockBlockTOptions toThrift() { LockBlockTOptions options = new LockBlockTOptions(); - options.setUfsPath(mUfsPath); - options.setOffset(mOffset); + options.setAlluxioMountPoint(mAlluxioMountPoint); options.setBlockSize(mBlockSize); options.setMaxUfsReadConcurrency(mMaxUfsReadConcurrency); + options.setMountTableVersion(mMountTableVersion); + options.setOffset(mOffset); + options.setUfsPath(mUfsPath); return options; } } diff --git a/core/client/src/main/java/alluxio/client/block/stream/BlockInStream.java b/core/client/src/main/java/alluxio/client/block/stream/BlockInStream.java index 0e9dca0aa0b6..2e468d98bf13 100644 --- a/core/client/src/main/java/alluxio/client/block/stream/BlockInStream.java +++ b/core/client/src/main/java/alluxio/client/block/stream/BlockInStream.java @@ -143,6 +143,8 @@ public static BlockInStream createRemoteBlockInStream(long blockId, long blockSi * @param blockId the block ID * @param blockSize the block size * @param blockStart the position at which the block starts in the file + * @param alluxioMountPoint the mount point of the file in Alluxio + * @param mountTableVersion the version of Alluxio mount table * @param workerNetAddress the worker network address * @param options the options * @throws IOException if it fails to create an instance @@ -150,7 +152,8 @@ public static BlockInStream createRemoteBlockInStream(long blockId, long blockSi */ // TODO(peis): Use options idiom (ALLUXIO-2579). public static BlockInStream createUfsBlockInStream(FileSystemContext context, String ufsPath, - long blockId, long blockSize, long blockStart, + long blockId, long blockSize, long blockStart, String alluxioMountPoint, + long mountTableVersion, WorkerNetAddress workerNetAddress, InStreamOptions options) throws IOException { Closer closer = Closer.create(); try { @@ -158,7 +161,8 @@ public static BlockInStream createUfsBlockInStream(FileSystemContext context, St closer.register(context.createBlockWorkerClient(workerNetAddress)); LockBlockOptions lockBlockOptions = LockBlockOptions.defaults().setUfsPath(ufsPath).setOffset(blockStart) - .setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency()); + .setBlockSize(blockSize).setMaxUfsReadConcurrency(options.getMaxUfsReadConcurrency()) + .setAlluxioMountPoint(alluxioMountPoint).setMountTableVersion(mountTableVersion); LockBlockResult lockBlockResult = closer.register(blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions)).getResult(); diff --git a/core/client/src/main/java/alluxio/client/file/FileInStream.java b/core/client/src/main/java/alluxio/client/file/FileInStream.java index b78f032a9066..7f6da2fe4c56 100644 --- a/core/client/src/main/java/alluxio/client/file/FileInStream.java +++ b/core/client/src/main/java/alluxio/client/file/FileInStream.java @@ -347,7 +347,8 @@ protected BlockInStream createUnderStoreBlockInStream(long blockId, long blockSt .setBlockWorkerInfos(mBlockStore.getWorkerInfoList()).setBlockId(blockId) .setBlockSize(length)); return StreamFactory.createUfsBlockInStream(mContext, path, blockId, length, blockStart, - address, mInStreamOptions); + address, mStatus.getAlluxioMountPoint(), mStatus.getMountTableVersion(), + mInStreamOptions); } catch (AlluxioException e) { throw new IOException(e); } diff --git a/core/common/src/main/java/alluxio/client/file/URIStatus.java b/core/common/src/main/java/alluxio/client/file/URIStatus.java index ddb25043d151..ef78ec2776c5 100644 --- a/core/common/src/main/java/alluxio/client/file/URIStatus.java +++ b/core/common/src/main/java/alluxio/client/file/URIStatus.java @@ -206,6 +206,20 @@ public boolean isMountPoint() { return mInfo.isMountPoint(); } + /** + * @return the mount point path in Alluxio for this file + */ + public String getAlluxioMountPoint() { + return mInfo.getAlluxioMountPoint(); + } + + /** + * @return the mount table version + */ + public long getMountTableVersion() { + return mInfo.getMountTableVersion(); + } + /** * @return the list of file block descriptors */ diff --git a/core/common/src/main/java/alluxio/wire/FileInfo.java b/core/common/src/main/java/alluxio/wire/FileInfo.java index e1f57b08db82..f7bef74ffda0 100644 --- a/core/common/src/main/java/alluxio/wire/FileInfo.java +++ b/core/common/src/main/java/alluxio/wire/FileInfo.java @@ -51,6 +51,8 @@ public final class FileInfo implements Serializable { private String mPersistenceState = ""; private boolean mMountPoint; private ArrayList mFileBlockInfos = new ArrayList<>(); + private String mAlluxioMountPoint; + private long mMountTableVersion; /** * Creates a new instance of {@link FileInfo}. @@ -91,6 +93,8 @@ protected FileInfo(alluxio.thrift.FileInfo fileInfo) { mFileBlockInfos.add(new FileBlockInfo(fileBlockInfo)); } } + mAlluxioMountPoint = fileInfo.getAlluxioMountPoint(); + mMountTableVersion = fileInfo.getMountTableVersion(); } /** @@ -254,6 +258,20 @@ public List getFileBlockInfos() { return mFileBlockInfos; } + /** + * @return the mount point path in Alluxio for this file + */ + public String getAlluxioMountPoint() { + return mAlluxioMountPoint; + } + + /** + * @return the mount table version + */ + public long getMountTableVersion() { + return mMountTableVersion; + } + /** * @param fileId the file id to use * @return the file information @@ -468,6 +486,22 @@ public FileInfo setFileBlockInfos(List fileBlockInfos) { return this; } + /** + * @param alluxioMountPoint the mount point path in Alluxio for this file + * @return the file information + */ + public void setAlluxioMountPoint(String alluxioMountPoint) { + mAlluxioMountPoint = alluxioMountPoint; + } + + /** + * @param mountTableVersion the mount table version + * @return the file information + */ + public void setMountTableVersion(long mountTableVersion) { + mMountTableVersion = mountTableVersion; + } + /** * @return thrift representation of the file information */ @@ -481,7 +515,8 @@ protected alluxio.thrift.FileInfo toThrift() { new alluxio.thrift.FileInfo(mFileId, mName, mPath, mUfsPath, mLength, mBlockSizeBytes, mCreationTimeMs, mCompleted, mFolder, mPinned, mCacheable, mPersisted, mBlockIds, mInMemoryPercentage, mLastModificationTimeMs, mTtl, mOwner, mGroup, mMode, - mPersistenceState, mMountPoint, fileBlockInfos, ThriftUtils.toThrift(mTtlAction)); + mPersistenceState, mMountPoint, fileBlockInfos, ThriftUtils.toThrift(mTtlAction), + mAlluxioMountPoint, mMountTableVersion); return info; } @@ -503,7 +538,9 @@ public boolean equals(Object o) { && mLastModificationTimeMs == that.mLastModificationTimeMs && mTtl == that.mTtl && mOwner.equals(that.mOwner) && mGroup.equals(that.mGroup) && mMode == that.mMode && mPersistenceState.equals(that.mPersistenceState) && mMountPoint == that.mMountPoint - && mFileBlockInfos.equals(that.mFileBlockInfos) && mTtlAction == that.mTtlAction; + && mFileBlockInfos.equals(that.mFileBlockInfos) && mTtlAction == that.mTtlAction + && mAlluxioMountPoint.equals(that.mAlluxioMountPoint) + && mMountTableVersion == that.mMountTableVersion; } @Override @@ -524,6 +561,9 @@ public String toString() { .add("lastModificationTimesMs", mLastModificationTimeMs).add("ttl", mTtl) .add("ttlAction", mTtlAction).add("owner", mOwner).add("group", mGroup).add("mode", mMode) .add("persistenceState", mPersistenceState).add("mountPoint", mMountPoint) - .add("fileBlockInfos", mFileBlockInfos).toString(); + .add("fileBlockInfos", mFileBlockInfos) + .add("alluxioMountPoint", mAlluxioMountPoint) + .add("mountTableVersion", mMountTableVersion) + .toString(); } } diff --git a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java index 9448a5abbdcc..a9409073abda 100644 --- a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java +++ b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java @@ -688,10 +688,9 @@ private FileInfo getFileInfoInternal(LockedInodePath inodePath) throw new FileDoesNotExistException(e.getMessage(), e); } AlluxioURI resolvedUri = resolution.getUri(); - // Only set the UFS path if the path is nested under a mount point. - if (!uri.equals(resolvedUri)) { - fileInfo.setUfsPath(resolvedUri.toString()); - } + fileInfo.setUfsPath(resolvedUri.toString()); + fileInfo.setAlluxioMountPoint(resolution.getMountPoint().toString()); + fileInfo.setMountTableVersion(resolution.getVersion()); Metrics.FILE_INFOS_GOT.inc(); return fileInfo; } diff --git a/core/server/master/src/main/java/alluxio/master/file/meta/MountTable.java b/core/server/master/src/main/java/alluxio/master/file/meta/MountTable.java index 7e110520c1e9..3b0f1a1a3bb0 100644 --- a/core/server/master/src/main/java/alluxio/master/file/meta/MountTable.java +++ b/core/server/master/src/main/java/alluxio/master/file/meta/MountTable.java @@ -58,6 +58,13 @@ public final class MountTable implements JournalEntryIterable { @GuardedBy("mLock") private final Map mMountTable; + @GuardedBy("mLock") + private long mVersion; + + private static long getCurrentVersion() { + return System.currentTimeMillis(); + } + /** * Creates a new instance of {@link MountTable}. */ @@ -67,6 +74,7 @@ public MountTable() { mLock = new ReentrantReadWriteLock(); mReadLock = mLock.readLock(); mWriteLock = mLock.writeLock(); + mVersion = getCurrentVersion(); } @Override @@ -172,6 +180,7 @@ public void add(AlluxioURI alluxioUri, AlluxioURI ufsUri, MountOptions options) } } mMountTable.put(alluxioPath, new MountInfo(ufsUri, options)); + mVersion = getCurrentVersion(); } } @@ -186,6 +195,7 @@ public void clear() { if (mountInfo != null) { mMountTable.put(ROOT, mountInfo); } + mVersion = getCurrentVersion(); } } @@ -206,6 +216,7 @@ public boolean delete(AlluxioURI uri) { try (LockResource r = new LockResource(mWriteLock)) { if (mMountTable.containsKey(path)) { mMountTable.remove(path); + mVersion = getCurrentVersion(); return true; } LOG.warn("Mount point {} does not exist.", path); @@ -273,16 +284,14 @@ public Resolution resolve(AlluxioURI uri) throws InvalidPathException { LOG.debug("Resolving {}", path); // This will re-acquire the read lock, but that is allowed. String mountPoint = getMountPoint(uri); - if (mountPoint != null) { - MountInfo info = mMountTable.get(mountPoint); - AlluxioURI ufsUri = info.getUfsUri(); - // TODO(gpang): this ufs should probably be cached. - UnderFileSystem ufs = UnderFileSystem.Factory.getMountPoint(ufsUri.toString(), - info.getOptions().getProperties()); - AlluxioURI resolvedUri = ufs.resolveUri(ufsUri, path.substring(mountPoint.length())); - return new Resolution(resolvedUri, ufs, info.getOptions().isShared()); - } - return new Resolution(uri, null, false); + MountInfo info = mMountTable.get(mountPoint); + AlluxioURI ufsUri = info.getUfsUri(); + // TODO(gpang): this ufs should probably be cached. + UnderFileSystem ufs = UnderFileSystem.Factory.getMountPoint(ufsUri.toString(), + info.getOptions().getProperties()); + AlluxioURI resolvedUri = ufs.resolveUri(ufsUri, path.substring(mountPoint.length())); + return new Resolution(resolvedUri, ufs, info.getOptions().isShared(), + new AlluxioURI(mountPoint), mVersion); } } @@ -314,11 +323,16 @@ public final class Resolution { private final AlluxioURI mUri; private final UnderFileSystem mUfs; private final boolean mShared; + private final AlluxioURI mMountPoint; + private final long mVersion; - private Resolution(AlluxioURI uri, UnderFileSystem ufs, boolean shared) { + private Resolution(AlluxioURI uri, UnderFileSystem ufs, boolean shared, AlluxioURI mountPoint, + long version) { mUri = uri; mUfs = ufs; mShared = shared; + mMountPoint = mountPoint; + mVersion = version; } /** @@ -341,5 +355,19 @@ public UnderFileSystem getUfs() { public boolean getShared() { return mShared; } + + /** + * @return the mount point in Alluxio on the queried Uri + */ + public AlluxioURI getMountPoint() { + return mMountPoint; + } + + /** + * @return the version of mount table + */ + public long getVersion() { + return mVersion; + } } } diff --git a/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerService.java b/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerService.java index 4c9b878f61f9..474d8fff8326 100644 --- a/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerService.java +++ b/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerService.java @@ -14,6 +14,7 @@ import alluxio.Server; import alluxio.wire.WorkerNetAddress; import alluxio.worker.block.BlockWorker; +import alluxio.worker.file.FileDataManager; import java.net.InetSocketAddress; @@ -48,6 +49,11 @@ private Factory() {} // prevent instantiation */ BlockWorker getBlockWorker(); + /** + * @return the file data manager for this Alluxio worker + */ + FileDataManager getFileDataManager(); + /** * @return the worker's data service bind host (used by unit test only) */ diff --git a/core/server/worker/src/main/java/alluxio/worker/DefaultAlluxioWorker.java b/core/server/worker/src/main/java/alluxio/worker/DefaultAlluxioWorker.java index b883d6e6e6af..58c9b9e06055 100644 --- a/core/server/worker/src/main/java/alluxio/worker/DefaultAlluxioWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/DefaultAlluxioWorker.java @@ -28,6 +28,7 @@ import alluxio.worker.block.BlockWorker; import alluxio.worker.block.DefaultBlockWorker; import alluxio.worker.file.DefaultFileSystemWorker; +import alluxio.worker.file.FileDataManager; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -176,6 +177,11 @@ public BlockWorker getBlockWorker() { return mBlockWorker; } + @Override + public FileDataManager getFileDataManager() { + return mFileSystemWorker.getFileDataManager(); + } + @Override public InetSocketAddress getRpcAddress() { return mRpcAddress; diff --git a/core/server/worker/src/main/java/alluxio/worker/block/meta/UnderFileSystemBlockMeta.java b/core/server/worker/src/main/java/alluxio/worker/block/meta/UnderFileSystemBlockMeta.java index 222db9a2e7d7..94cf07d258c8 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/meta/UnderFileSystemBlockMeta.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/meta/UnderFileSystemBlockMeta.java @@ -24,6 +24,10 @@ public final class UnderFileSystemBlockMeta { private final long mOffset; /** The block size in bytes. */ private final long mBlockSize; + /** The mount point in Alluxio for the file that this block belonging to. */ + private final String mAlluxioMountPoint; + /** The mount table version. */ + private final long mMountTableVersion; /** * Creates an instance of {@link UnderFileSystemBlockMeta}. @@ -38,6 +42,8 @@ public UnderFileSystemBlockMeta(long sessionId, long blockId, OpenUfsBlockOption mUnderFileSystemPath = options.getUnderFileSystemPath(); mOffset = options.getOffset(); mBlockSize = options.getBlockSize(); + mAlluxioMountPoint = options.getAlluxioMountPoint(); + mMountTableVersion = options.getMountTableVersion(); } /** @@ -74,4 +80,18 @@ public long getOffset() { public long getBlockSize() { return mBlockSize; } + + /** + * @return the mount point in Alluxio for the file this block belonging to + */ + public String getAlluxioMountPoint() { + return mAlluxioMountPoint; + } + + /** + * @return the mount table version + */ + public long getMountTableVersion() { + return mMountTableVersion; + } } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/options/OpenUfsBlockOptions.java b/core/server/worker/src/main/java/alluxio/worker/block/options/OpenUfsBlockOptions.java index 118a76dc7175..16ba4239638a 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/options/OpenUfsBlockOptions.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/options/OpenUfsBlockOptions.java @@ -25,9 +25,12 @@ public final class OpenUfsBlockOptions { private final long mOffset; /** The block size in bytes. */ private final long mBlockSize; - /** The maximum concurrent UFS reader on the UFS block allowed when opening the block. */ private final int mMaxUfsReadConcurrency; + /** The mount point in Alluxio for the file that this block belonging to. */ + private final String mAlluxioMountPoint; + /** The mount table version. */ + private final long mMountTableVersion; /** * Creates an instance of {@link OpenUfsBlockOptions}. @@ -39,6 +42,8 @@ public OpenUfsBlockOptions(LockBlockTOptions options) { mOffset = options.getOffset(); mBlockSize = options.getBlockSize(); mMaxUfsReadConcurrency = options.getMaxUfsReadConcurrency(); + mAlluxioMountPoint = options.getAlluxioMountPoint(); + mMountTableVersion = options.getMountTableVersion(); } /** @@ -69,6 +74,20 @@ public int getMaxUfsReadConcurrency() { return mMaxUfsReadConcurrency; } + /** + * @return the mount point in Alluxio for the file this block belonging to + */ + public String getAlluxioMountPoint() { + return mAlluxioMountPoint; + } + + /** + * @return the mount table version + */ + public long getMountTableVersion() { + return mMountTableVersion; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -78,22 +97,27 @@ public boolean equals(Object o) { return false; } OpenUfsBlockOptions that = (OpenUfsBlockOptions) o; - return Objects.equal(mBlockSize, that.mBlockSize) + return Objects.equal(mAlluxioMountPoint, that.mAlluxioMountPoint) + && Objects.equal(mBlockSize, that.mBlockSize) && Objects.equal(mMaxUfsReadConcurrency, that.mMaxUfsReadConcurrency) + && Objects.equal(mMountTableVersion, that.mMountTableVersion) && Objects.equal(mOffset, that.mOffset) && Objects.equal(mUnderFileSystemPath, that.mUnderFileSystemPath); } @Override public int hashCode() { - return Objects.hashCode(mBlockSize, mMaxUfsReadConcurrency, mOffset, mUnderFileSystemPath); + return Objects.hashCode(mAlluxioMountPoint, mBlockSize, mMaxUfsReadConcurrency, + mMountTableVersion, mOffset, mUnderFileSystemPath); } @Override public String toString() { return Objects.toStringHelper(this) + .add("alluxioMountPoint", mAlluxioMountPoint) .add("blockSize", mBlockSize) .add("maxUfsReadConcurrency", mMaxUfsReadConcurrency) + .add("mountTableVersion", mMountTableVersion) .add("offset", mOffset) .add("underFileSystemPath", mUnderFileSystemPath) .toString(); diff --git a/core/server/worker/src/main/java/alluxio/worker/file/DefaultFileSystemWorker.java b/core/server/worker/src/main/java/alluxio/worker/file/DefaultFileSystemWorker.java index c182506c76f6..b4ad5c7eb4ae 100644 --- a/core/server/worker/src/main/java/alluxio/worker/file/DefaultFileSystemWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/file/DefaultFileSystemWorker.java @@ -103,4 +103,8 @@ public void stop() { // This needs to be shutdownNow because heartbeat threads will only stop when interrupted. getExecutorService().shutdownNow(); } + + public FileDataManager getFileDataManager() { + return mFileDataManager; + } } diff --git a/core/server/worker/src/main/java/alluxio/worker/file/FileDataManager.java b/core/server/worker/src/main/java/alluxio/worker/file/FileDataManager.java index e9ec67da9bfc..1e558cef3264 100644 --- a/core/server/worker/src/main/java/alluxio/worker/file/FileDataManager.java +++ b/core/server/worker/src/main/java/alluxio/worker/file/FileDataManager.java @@ -75,6 +75,15 @@ public final class FileDataManager { /** A per worker rate limiter to throttle async persistence. */ private final RateLimiter mPersistenceRateLimiter; + private final Object mUfsConfigurationLock = new Object(); + + /** Map from Alluxio mount point to the corresponding ufs configuration. */ + @GuardedBy("mUfsConfigurationLock") + private final Map> mUfsMap; + + @GuardedBy("mUfsConfigurationLock") + private long mUfsMapVersion; + /** * Creates a new instance of {@link FileDataManager}. * @@ -86,6 +95,38 @@ public FileDataManager(BlockWorker blockWorker, RateLimiter persistenceRateLimit mPersistingInProgressFiles = new HashMap<>(); mPersistedFiles = new HashSet<>(); mPersistenceRateLimiter = persistenceRateLimiter; + mUfsMap = new HashMap<>(); + mUfsMapVersion = 0; + } + + /** + * Gets the properties for the given Alluxio mount point. + * + * @param alluxioPath Uri for an Alluxio mount point + * @return the configuration of the UFS + */ + public Map getUfsProperties(String alluxioPath) { + synchronized (mUfsConfigurationLock) { + return getUfsProperties(alluxioPath, mUfsMapVersion); + } + } + + /** + * Gets the properties for the given Alluxio mount point and a specific version. + * + * @param alluxioPath Uri for an Alluxio mount point + * @param version version of the mount table associated with the mount point + * @return the configuration of the UFS + */ + public Map getUfsProperties(String alluxioPath, long version) { + // TODO fill me + synchronized (mUfsConfigurationLock) { + if (version >= mUfsMapVersion || !mUfsMap.containsKey(alluxioPath)) { + // local copy is stale + // call master APi and update map + } + return mUfsMap.get(alluxioPath); + } } /** @@ -157,9 +198,11 @@ private void addPersistedFile(long fileId) { */ private synchronized boolean fileExistsInUfs(long fileId) throws IOException { FileInfo fileInfo = mBlockWorker.getFileInfo(fileId); + String mountPoint = fileInfo.getAlluxioMountPoint(); String dstPath = fileInfo.getUfsPath(); - UnderFileSystem ufs = UnderFileSystem.Factory.get(dstPath); + UnderFileSystem ufs = + UnderFileSystem.Factory.getMountPoint(mountPoint, getUfsProperties(mountPoint)); return ufs.isFile(dstPath); } @@ -227,8 +270,10 @@ public void persistFile(long fileId, List blockIds) throws AlluxioExceptio } String dstPath = prepareUfsFilePath(fileId); - UnderFileSystem ufs = UnderFileSystem.Factory.get(dstPath); FileInfo fileInfo = mBlockWorker.getFileInfo(fileId); + String mountPoint = fileInfo.getAlluxioMountPoint(); + UnderFileSystem ufs = + UnderFileSystem.Factory.getMountPoint(mountPoint, getUfsProperties(mountPoint)); OutputStream outputStream = ufs.create(dstPath, CreateOptions.defaults() .setOwner(fileInfo.getOwner()).setGroup(fileInfo.getGroup()) .setMode(new Mode((short) fileInfo.getMode()))); @@ -301,7 +346,9 @@ private String prepareUfsFilePath(long fileId) throws AlluxioException, IOExcept FileSystem fs = FileSystem.Factory.get(); URIStatus status = fs.getStatus(alluxioPath); String ufsPath = status.getUfsPath(); - UnderFileSystem ufs = UnderFileSystem.Factory.get(ufsPath); + String mountPoint = fileInfo.getAlluxioMountPoint(); + UnderFileSystem ufs = + UnderFileSystem.Factory.getMountPoint(mountPoint, getUfsProperties(mountPoint)); UnderFileSystemUtils.prepareFilePath(alluxioPath, ufsPath, fs, ufs); return ufsPath; } diff --git a/core/server/worker/src/main/java/alluxio/worker/netty/DataServerUfsFileWriteHandler.java b/core/server/worker/src/main/java/alluxio/worker/netty/DataServerUfsFileWriteHandler.java index 2548d577e0a3..01697e8d0f81 100644 --- a/core/server/worker/src/main/java/alluxio/worker/netty/DataServerUfsFileWriteHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/netty/DataServerUfsFileWriteHandler.java @@ -17,6 +17,7 @@ import alluxio.security.authorization.Mode; import alluxio.underfs.UnderFileSystem; import alluxio.underfs.options.CreateOptions; +import alluxio.worker.file.FileDataManager; import com.codahale.metrics.Counter; import io.netty.buffer.ByteBuf; @@ -41,18 +42,20 @@ @NotThreadSafe final class DataServerUfsFileWriteHandler extends DataServerWriteHandler { private static final long UNUSED_SESSION_ID = -1; + private final FileDataManager mFileDataManager; private class FileWriteRequestInternal extends WriteRequestInternal { - private final String mPath; + private final String mUfsPath; private final UnderFileSystem mUnderFileSystem; private final OutputStream mOutputStream; FileWriteRequestInternal(Protocol.WriteRequest request) throws Exception { super(request.getId(), UNUSED_SESSION_ID); - mPath = request.getUfsPath(); - mUnderFileSystem = UnderFileSystem.Factory.get(mPath); + mUfsPath = request.getUfsPath(); + mUnderFileSystem = UnderFileSystem.Factory.getMountPoint(request.getAlluxioMountPoint(), + mFileDataManager.getUfsProperties(request.getAlluxioMountPoint())); mOutputStream = - mUnderFileSystem.create(mPath, CreateOptions.defaults().setOwner(request.getOwner()) + mUnderFileSystem.create(mUfsPath, CreateOptions.defaults().setOwner(request.getOwner()) .setGroup(request.getGroup()).setMode(new Mode((short) request.getMode()))); } @@ -65,7 +68,7 @@ public void close() throws IOException { void cancel() throws IOException { // TODO(calvin): Consider adding cancel to the ufs stream api. mOutputStream.close(); - mUnderFileSystem.deleteFile(mPath); + mUnderFileSystem.deleteFile(mUfsPath); } } @@ -73,9 +76,11 @@ void cancel() throws IOException { * Creates an instance of {@link DataServerUfsFileWriteHandler}. * * @param executorService the executor service to run {@link PacketWriter}s + * @param fileDataManager the file data manager */ - DataServerUfsFileWriteHandler(ExecutorService executorService) { + DataServerUfsFileWriteHandler(ExecutorService executorService, FileDataManager fileDataManager) { super(executorService); + mFileDataManager = fileDataManager; } @Override diff --git a/core/server/worker/src/main/java/alluxio/worker/netty/PipelineHandler.java b/core/server/worker/src/main/java/alluxio/worker/netty/PipelineHandler.java index 6537c86a670a..67f5a253f5ab 100644 --- a/core/server/worker/src/main/java/alluxio/worker/netty/PipelineHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/netty/PipelineHandler.java @@ -60,7 +60,7 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast("dataServerUfsBlockReadHandler", new DataServerUfsBlockReadHandler( NettyExecutors.UFS_BLOCK_READER_EXECUTOR, mWorker.getBlockWorker())); pipeline.addLast("dataServerUfsFileWriteHandler", new DataServerUfsFileWriteHandler( - NettyExecutors.FILE_WRITER_EXECUTOR)); + NettyExecutors.FILE_WRITER_EXECUTOR, mWorker.getFileDataManager())); // Unsupported Message Handler pipeline.addLast("dataServerUnsupportedMessageHandler", new