Skip to content

Commit

Permalink
Add block in stream factory.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed Aug 25, 2015
1 parent 71cdd3a commit 2170feb
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
Expand Up @@ -15,27 +15,32 @@


package tachyon.client.next.block; package tachyon.client.next.block;


import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;


import tachyon.client.next.ClientContext; import tachyon.client.next.ClientContext;
import tachyon.client.next.InStream; import tachyon.client.next.InStream;
import tachyon.thrift.FileBlockInfo;
import tachyon.thrift.NetAddress;
import tachyon.util.network.NetworkAddressUtils; import tachyon.util.network.NetworkAddressUtils;


/** /**
* Provides a stream API to read a block from Tachyon. An instance of this extending class can be * Provides a stream API to read a block from Tachyon. An instance extending this class can be
* obtained by calling {@link TachyonBS#getInStream}. Multiple BlockInStreams can be opened for a * obtained by calling {@link TachyonBS#getInStream}. Multiple BlockInStreams can be opened for a
* block. This class is not thread safe and should only be used by one thread. * block. This class is not thread safe and should only be used by one thread.
* *
* This class provides the same methods as a Java {@link InputStream} with an additional seek * This class provides the same methods as a Java {@link InputStream} with an additional seek
* method. * method.
*/ */
public abstract class BlockInStream extends InStream { public abstract class BlockInStream extends InStream {
public static BlockInStream get(long blockId, String location) { public static BlockInStream get(long blockId, long blockSize, NetAddress location)
if (NetworkAddressUtils.getLocalHostName(ClientContext.getConf()).equals(location)) { throws IOException {
String localHostname = NetworkAddressUtils.getLocalHostName(ClientContext.getConf());
if (location.getMHost().equals(localHostname)) {
return new LocalBlockInStream(blockId); return new LocalBlockInStream(blockId);
} else { } else {
return new RemoteBlockInStream(blockId); return new RemoteBlockInStream(blockId, blockSize, location);
} }
} }
} }
Expand Up @@ -40,7 +40,7 @@ public class LocalBlockInStream extends BlockInStream {


private boolean mClosed; private boolean mClosed;


public LocalBlockInStream(long blockId, ClientOptions options) throws IOException { public LocalBlockInStream(long blockId) throws IOException {
mBlockId = blockId; mBlockId = blockId;
mClosed = false; mClosed = false;
mContext = BSContext.INSTANCE; mContext = BSContext.INSTANCE;
Expand Down
Expand Up @@ -16,14 +16,14 @@
package tachyon.client.next.block; package tachyon.client.next.block;


import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;


import tachyon.client.RemoteBlockReader; import tachyon.client.RemoteBlockReader;
import tachyon.client.next.ClientContext; import tachyon.client.next.ClientContext;
import tachyon.client.next.ClientOptions; import tachyon.thrift.NetAddress;
import tachyon.thrift.FileBlockInfo;


/** /**
* This class provides a streaming API to read a block in Tachyon. The data will be transferred * This class provides a streaming API to read a block in Tachyon. The data will be transferred
Expand All @@ -33,20 +33,17 @@ public class RemoteBlockInStream extends BlockInStream {
private final long mBlockId; private final long mBlockId;
private final BSContext mContext; private final BSContext mContext;
private final long mBlockSize; private final long mBlockSize;
private final InetSocketAddress mLocation;


private long mPos; private long mPos;
private String mRemoteHost;
private int mRemotePort;


// TODO: Make sure there is a valid Tachyon location
// TODO: Modify the locking so the stream owns the lock instead of the data server // TODO: Modify the locking so the stream owns the lock instead of the data server
public RemoteBlockInStream(FileBlockInfo blockInfo, ClientOptions options) { public RemoteBlockInStream(long blockId, long blockSize, NetAddress location) {
mBlockId = blockInfo.getBlockId(); mBlockId = blockId;
mContext = BSContext.INSTANCE; mContext = BSContext.INSTANCE;
mBlockSize = blockInfo.getLength(); mBlockSize = blockSize;
// TODO: Clean this up // TODO: Validate these fields
mRemoteHost = blockInfo.getLocations().get(0).mHost; mLocation = new InetSocketAddress(location.getMHost(), location.getMSecondaryPort());
mRemotePort = blockInfo.getLocations().get(0).mPort;
} }


@Override @Override
Expand Down Expand Up @@ -85,7 +82,9 @@ public int read(byte[] b, int off, int len) throws IOException {
// TODO: Fix needing to recreate reader each time // TODO: Fix needing to recreate reader each time
RemoteBlockReader reader = RemoteBlockReader reader =
RemoteBlockReader.Factory.createRemoteBlockReader(ClientContext.getConf()); RemoteBlockReader.Factory.createRemoteBlockReader(ClientContext.getConf());
ByteBuffer data = reader.readRemoteBlock(mRemoteHost, mRemotePort, mBlockId, mPos, bytesLeft); ByteBuffer data =
reader.readRemoteBlock(mLocation.getHostName(), mLocation.getPort(), mBlockId, mPos,
bytesLeft);
int bytesToRead = Math.min(bytesLeft, data.remaining()); int bytesToRead = Math.min(bytesLeft, data.remaining());
data.get(b, off, bytesToRead); data.get(b, off, bytesToRead);
reader.close(); reader.close();
Expand Down

0 comments on commit 2170feb

Please sign in to comment.