Skip to content

Commit

Permalink
Update upstream callers to respect ClientOption
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhu committed Sep 18, 2015
1 parent 8f52c7a commit ce15d31
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
Expand Up @@ -112,7 +112,7 @@ public BlockInStream getInStream(long blockId) throws IOException {
* @return a BlockOutStream which can be used to write data to the block in a streaming fashion * @return a BlockOutStream which can be used to write data to the block in a streaming fashion
* @throws IOException if the block cannot be written * @throws IOException if the block cannot be written
*/ */
public BufferedBlockOutStream getOutStream(long blockId, long blockSize, NetAddress location) public BufferedBlockOutStream getOutStream(long blockId, long blockSize, String location)
throws IOException { throws IOException {
if (blockSize == -1) { if (blockSize == -1) {
BlockMasterClient blockMasterClient = mContext.acquireMasterClient(); BlockMasterClient blockMasterClient = mContext.acquireMasterClient();
Expand All @@ -132,12 +132,12 @@ public BufferedBlockOutStream getOutStream(long blockId, long blockSize, NetAddr
return new RemoteBlockOutStream(blockId, blockSize); return new RemoteBlockOutStream(blockId, blockSize);
} }
// Location is local. // Location is local.
if (NetworkAddressUtils.getLocalHostName(ClientContext.getConf()).equals(location.getHost())) { if (NetworkAddressUtils.getLocalHostName(ClientContext.getConf()).equals(location)) {
Preconditions.checkState(mContext.hasLocalWorker(), "Requested write location unavailable."); Preconditions.checkState(mContext.hasLocalWorker(), "Requested write location unavailable.");
return new LocalBlockOutStream(blockId, blockSize); return new LocalBlockOutStream(blockId, blockSize);
} }
// Location is specified and it is remote. // Location is specified and it is remote.
return new RemoteBlockOutStream(blockId, blockSize, location.getHost()); return new RemoteBlockOutStream(blockId, blockSize, location);
} }


/** /**
Expand Down
Expand Up @@ -23,13 +23,16 @@


import tachyon.annotation.PublicApi; import tachyon.annotation.PublicApi;
import tachyon.client.BoundedStream; import tachyon.client.BoundedStream;
import tachyon.client.ClientContext;
import tachyon.client.ClientOptions; import tachyon.client.ClientOptions;
import tachyon.client.Seekable; import tachyon.client.Seekable;
import tachyon.client.block.BlockInStream; import tachyon.client.block.BlockInStream;
import tachyon.client.block.BufferedBlockOutStream; import tachyon.client.block.BufferedBlockOutStream;
import tachyon.client.block.LocalBlockInStream; import tachyon.client.block.LocalBlockInStream;
import tachyon.master.block.BlockId; import tachyon.master.block.BlockId;
import tachyon.thrift.FileInfo; import tachyon.thrift.FileInfo;
import tachyon.thrift.NetAddress;
import tachyon.util.network.NetworkAddressUtils;


/** /**
* A streaming API to read a file. This API represents a file as a stream of bytes and provides * A streaming API to read a file. This API represents a file as a stream of bytes and provides
Expand Down Expand Up @@ -58,6 +61,8 @@ public final class FileInStream extends InputStream implements BoundedStream, Se


/** If the stream is closed, this can only go from false to true */ /** If the stream is closed, this can only go from false to true */
private boolean mClosed; private boolean mClosed;
/** The netaddress of the location that the file should write to, specified by client option */
private NetAddress mLocation;
/** Whether or not the current block should be cached. */ /** Whether or not the current block should be cached. */
private boolean mShouldCacheCurrentBlock; private boolean mShouldCacheCurrentBlock;
/** Current position of the stream */ /** Current position of the stream */
Expand All @@ -81,6 +86,7 @@ public FileInStream(FileInfo info, ClientOptions options) {
mContext = FileSystemContext.INSTANCE; mContext = FileSystemContext.INSTANCE;
mShouldCache = options.getTachyonStorageType().isStore(); mShouldCache = options.getTachyonStorageType().isStore();
mShouldCacheCurrentBlock = mShouldCache; mShouldCacheCurrentBlock = mShouldCache;
mLocation = options.getLocation();
mClosed = false; mClosed = false;
} }


Expand Down Expand Up @@ -215,7 +221,7 @@ private void checkAndAdvanceBlockInStream() throws IOException {
try { try {
// TODO(calvin): Specify the location to be local. // TODO(calvin): Specify the location to be local.
mCurrentCacheStream = mCurrentCacheStream =
mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, null); mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, mLocation.getHost());
} catch (IOException ioe) { } catch (IOException ioe) {
// TODO(yupeng): Maybe debug log here. // TODO(yupeng): Maybe debug log here.
mShouldCacheCurrentBlock = false; mShouldCacheCurrentBlock = false;
Expand Down Expand Up @@ -273,7 +279,7 @@ private void seekBlockInStream(long newPos) throws IOException {
if (mPos % mBlockSize == 0 && mShouldCacheCurrentBlock) { if (mPos % mBlockSize == 0 && mShouldCacheCurrentBlock) {
try { try {
mCurrentCacheStream = mCurrentCacheStream =
mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, null); mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, mLocation.getHost());
} catch (IOException ioe) { } catch (IOException ioe) {
// TODO(yupeng): Maybe debug log here. // TODO(yupeng): Maybe debug log here.
mShouldCacheCurrentBlock = false; mShouldCacheCurrentBlock = false;
Expand Down
Expand Up @@ -31,6 +31,7 @@
import tachyon.client.UnderStorageType; import tachyon.client.UnderStorageType;
import tachyon.client.block.BlockStoreContext; import tachyon.client.block.BlockStoreContext;
import tachyon.client.block.BufferedBlockOutStream; import tachyon.client.block.BufferedBlockOutStream;
import tachyon.thrift.NetAddress;
import tachyon.underfs.UnderFileSystem; import tachyon.underfs.UnderFileSystem;
import tachyon.util.io.PathUtils; import tachyon.util.io.PathUtils;
import tachyon.worker.WorkerClient; import tachyon.worker.WorkerClient;
Expand All @@ -55,6 +56,7 @@ public final class FileOutStream extends OutputStream implements Cancelable {


private boolean mCanceled; private boolean mCanceled;
private boolean mClosed; private boolean mClosed;
private NetAddress mLocation;
private boolean mShouldCacheCurrentBlock; private boolean mShouldCacheCurrentBlock;
private BufferedBlockOutStream mCurrentBlockOutStream; private BufferedBlockOutStream mCurrentBlockOutStream;
private List<BufferedBlockOutStream> mPreviousBlockOutStreams; private List<BufferedBlockOutStream> mPreviousBlockOutStreams;
Expand Down Expand Up @@ -88,6 +90,7 @@ public FileOutStream(long fileId, ClientOptions options) throws IOException {
} }
mClosed = false; mClosed = false;
mCanceled = false; mCanceled = false;
mLocation = options.getLocation();
mShouldCacheCurrentBlock = mTachyonStorageType.isStore(); mShouldCacheCurrentBlock = mTachyonStorageType.isStore();
} }


Expand Down Expand Up @@ -229,7 +232,7 @@ private void getNextBlock() throws IOException {


if (mTachyonStorageType.isStore()) { if (mTachyonStorageType.isStore()) {
mCurrentBlockOutStream = mCurrentBlockOutStream =
mContext.getTachyonBlockStore().getOutStream(getNextBlockId(), mBlockSize, null); mContext.getTachyonBlockStore().getOutStream(getNextBlockId(), mBlockSize, mLocation.getHost());
mShouldCacheCurrentBlock = true; mShouldCacheCurrentBlock = true;
} }
} }
Expand Down

0 comments on commit ce15d31

Please sign in to comment.