From ce15d31ce745e3f7640d23d44f6e03b274d3d011 Mon Sep 17 00:00:00 2001 From: David Zhu Date: Fri, 18 Sep 2015 15:28:51 -0700 Subject: [PATCH] Update upstream callers to respect ClientOption --- .../java/tachyon/client/block/TachyonBlockStore.java | 6 +++--- .../main/java/tachyon/client/file/FileInStream.java | 10 ++++++++-- .../main/java/tachyon/client/file/FileOutStream.java | 5 ++++- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/clients/unshaded/src/main/java/tachyon/client/block/TachyonBlockStore.java b/clients/unshaded/src/main/java/tachyon/client/block/TachyonBlockStore.java index 437b033da96e..17f0d75a1f14 100644 --- a/clients/unshaded/src/main/java/tachyon/client/block/TachyonBlockStore.java +++ b/clients/unshaded/src/main/java/tachyon/client/block/TachyonBlockStore.java @@ -112,7 +112,7 @@ public BlockInStream getInStream(long blockId) throws IOException { * @return a BlockOutStream which can be used to write data to the block in a streaming fashion * @throws IOException if the block cannot be written */ - public BufferedBlockOutStream getOutStream(long blockId, long blockSize, NetAddress location) + public BufferedBlockOutStream getOutStream(long blockId, long blockSize, String location) throws IOException { if (blockSize == -1) { BlockMasterClient blockMasterClient = mContext.acquireMasterClient(); @@ -132,12 +132,12 @@ public BufferedBlockOutStream getOutStream(long blockId, long blockSize, NetAddr return new RemoteBlockOutStream(blockId, blockSize); } // Location is local. - if (NetworkAddressUtils.getLocalHostName(ClientContext.getConf()).equals(location.getHost())) { + if (NetworkAddressUtils.getLocalHostName(ClientContext.getConf()).equals(location)) { Preconditions.checkState(mContext.hasLocalWorker(), "Requested write location unavailable."); return new LocalBlockOutStream(blockId, blockSize); } // Location is specified and it is remote. - return new RemoteBlockOutStream(blockId, blockSize, location.getHost()); + return new RemoteBlockOutStream(blockId, blockSize, location); } /** diff --git a/clients/unshaded/src/main/java/tachyon/client/file/FileInStream.java b/clients/unshaded/src/main/java/tachyon/client/file/FileInStream.java index c21d7ac86081..35fbc7aa65f7 100644 --- a/clients/unshaded/src/main/java/tachyon/client/file/FileInStream.java +++ b/clients/unshaded/src/main/java/tachyon/client/file/FileInStream.java @@ -23,6 +23,7 @@ import tachyon.annotation.PublicApi; import tachyon.client.BoundedStream; +import tachyon.client.ClientContext; import tachyon.client.ClientOptions; import tachyon.client.Seekable; import tachyon.client.block.BlockInStream; @@ -30,6 +31,8 @@ import tachyon.client.block.LocalBlockInStream; import tachyon.master.block.BlockId; import tachyon.thrift.FileInfo; +import tachyon.thrift.NetAddress; +import tachyon.util.network.NetworkAddressUtils; /** * A streaming API to read a file. This API represents a file as a stream of bytes and provides @@ -58,6 +61,8 @@ public final class FileInStream extends InputStream implements BoundedStream, Se /** If the stream is closed, this can only go from false to true */ private boolean mClosed; + /** The netaddress of the location that the file should write to, specified by client option */ + private NetAddress mLocation; /** Whether or not the current block should be cached. */ private boolean mShouldCacheCurrentBlock; /** Current position of the stream */ @@ -81,6 +86,7 @@ public FileInStream(FileInfo info, ClientOptions options) { mContext = FileSystemContext.INSTANCE; mShouldCache = options.getTachyonStorageType().isStore(); mShouldCacheCurrentBlock = mShouldCache; + mLocation = options.getLocation(); mClosed = false; } @@ -215,7 +221,7 @@ private void checkAndAdvanceBlockInStream() throws IOException { try { // TODO(calvin): Specify the location to be local. mCurrentCacheStream = - mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, null); + mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, mLocation.getHost()); } catch (IOException ioe) { // TODO(yupeng): Maybe debug log here. mShouldCacheCurrentBlock = false; @@ -273,7 +279,7 @@ private void seekBlockInStream(long newPos) throws IOException { if (mPos % mBlockSize == 0 && mShouldCacheCurrentBlock) { try { mCurrentCacheStream = - mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, null); + mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, mLocation.getHost()); } catch (IOException ioe) { // TODO(yupeng): Maybe debug log here. mShouldCacheCurrentBlock = false; diff --git a/clients/unshaded/src/main/java/tachyon/client/file/FileOutStream.java b/clients/unshaded/src/main/java/tachyon/client/file/FileOutStream.java index 0f5d7ef32744..10ba5139ebca 100644 --- a/clients/unshaded/src/main/java/tachyon/client/file/FileOutStream.java +++ b/clients/unshaded/src/main/java/tachyon/client/file/FileOutStream.java @@ -31,6 +31,7 @@ import tachyon.client.UnderStorageType; import tachyon.client.block.BlockStoreContext; import tachyon.client.block.BufferedBlockOutStream; +import tachyon.thrift.NetAddress; import tachyon.underfs.UnderFileSystem; import tachyon.util.io.PathUtils; import tachyon.worker.WorkerClient; @@ -55,6 +56,7 @@ public final class FileOutStream extends OutputStream implements Cancelable { private boolean mCanceled; private boolean mClosed; + private NetAddress mLocation; private boolean mShouldCacheCurrentBlock; private BufferedBlockOutStream mCurrentBlockOutStream; private List mPreviousBlockOutStreams; @@ -88,6 +90,7 @@ public FileOutStream(long fileId, ClientOptions options) throws IOException { } mClosed = false; mCanceled = false; + mLocation = options.getLocation(); mShouldCacheCurrentBlock = mTachyonStorageType.isStore(); } @@ -229,7 +232,7 @@ private void getNextBlock() throws IOException { if (mTachyonStorageType.isStore()) { mCurrentBlockOutStream = - mContext.getTachyonBlockStore().getOutStream(getNextBlockId(), mBlockSize, null); + mContext.getTachyonBlockStore().getOutStream(getNextBlockId(), mBlockSize, mLocation.getHost()); mShouldCacheCurrentBlock = true; } }