Skip to content

Commit

Permalink
Merge branch 'master' into cascading_lru
Browse files Browse the repository at this point in the history
  • Loading branch information
cc committed Jul 12, 2015
2 parents 9d11ce3 + 9af906e commit 20f13e8
Show file tree
Hide file tree
Showing 64 changed files with 1,723 additions and 508 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void close() throws IOException {
} catch (IOException ioe) {
if (mWriteType.isMustCache()) {
LOG.error(ioe.getMessage(), ioe);
throw new IOException("Fail to cache: " + mWriteType, ioe);
throw new IOException("Fail to cache: " + mWriteType + ", message: " + ioe.getMessage(),
ioe);
} else {
LOG.warn("Fail to cache for: ", ioe);
}
Expand Down Expand Up @@ -202,7 +203,7 @@ public void write(byte[] b, int off, int len) throws IOException {
} catch (IOException e) {
if (mWriteType.isMustCache()) {
LOG.error(e.getMessage(), e);
throw new IOException("Fail to cache: " + mWriteType, e);
throw new IOException("Fail to cache: " + mWriteType + ", message: " + e.getMessage(), e);
} else {
LOG.warn("Fail to cache for: ", e);
}
Expand Down Expand Up @@ -230,7 +231,7 @@ public void write(int b) throws IOException {
} catch (IOException e) {
if (mWriteType.isMustCache()) {
LOG.error(e.getMessage(), e);
throw new IOException("Fail to cache: " + mWriteType, e);
throw new IOException("Fail to cache: " + mWriteType + ", message: " + e.getMessage(), e);
} else {
LOG.warn("Fail to cache for: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ public final class NettyClient {
private static final EventLoopGroup WORKER_GROUP = NettyUtils.createEventLoop(CHANNEL_TYPE,
TACHYON_CONF.getInt(Constants.USER_NETTY_WORKER_THREADS, 0), "netty-client-worker-%d", true);

// The maximum number of seconds to wait for a response from the server.
public static final long TIMEOUT_SECOND = 1L;
// The maximum number of milliseconds to wait for a response from the server.
public static final long TIMEOUT_MS =
TACHYON_CONF.getInt(Constants.USER_NETTY_TIMEOUT_MS, 3000);

/**
* Creates and returns a new Netty client bootstrap for clients to connect to remote servers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,15 @@
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;

import tachyon.Constants;
import tachyon.client.RemoteBlockReader;
import tachyon.conf.TachyonConf;
import tachyon.network.ChannelType;
import tachyon.network.NettyUtils;
import tachyon.network.protocol.RPCBlockRequest;
import tachyon.network.protocol.RPCBlockResponse;
import tachyon.network.protocol.RPCErrorResponse;
import tachyon.network.protocol.RPCMessage;
import tachyon.network.protocol.RPCMessageDecoder;
import tachyon.network.protocol.RPCMessageEncoder;
import tachyon.network.protocol.RPCResponse;

/**
Expand Down Expand Up @@ -74,25 +64,28 @@ public ByteBuffer readRemoteBlock(String host, int port, long blockId, long offs
mHandler.addListener(listener);
channel.writeAndFlush(new RPCBlockRequest(blockId, offset, length));

RPCResponse response = listener.get(NettyClient.TIMEOUT_SECOND, TimeUnit.SECONDS);
RPCResponse response = listener.get(NettyClient.TIMEOUT_MS, TimeUnit.MILLISECONDS);
channel.close().sync();

if (response.getType() == RPCMessage.Type.RPC_BLOCK_RESPONSE) {
RPCBlockResponse blockResponse = (RPCBlockResponse) response;
LOG.info("Data " + blockId + " from remote machine " + address + " received");
switch (response.getType()) {
case RPC_BLOCK_RESPONSE:
RPCBlockResponse blockResponse = (RPCBlockResponse) response;
LOG.info("Data " + blockId + " from remote machine " + address + " received");

if (blockResponse.getBlockId() < 0) {
LOG.info("Data " + blockResponse.getBlockId() + " is not in remote machine.");
return null;
}
return blockResponse.getPayloadDataBuffer().getReadOnlyByteBuffer();
} else {
LOG.error("Unexpected response message type: " + response.getType() + " (expected: "
+ RPCMessage.Type.RPC_BLOCK_RESPONSE + ")");
RPCResponse.Status status = blockResponse.getStatus();
if (status == RPCResponse.Status.SUCCESS) {
return blockResponse.getPayloadDataBuffer().getReadOnlyByteBuffer();
}
throw new IOException(status.getMessage() + " response: " + blockResponse);
case RPC_ERROR_RESPONSE:
RPCErrorResponse error = (RPCErrorResponse) response;
throw new IOException(error.getStatus().getMessage());
default:
throw new IOException("Unexpected response message type: " + response.getType()
+ " (expected: " + RPCMessage.Type.RPC_BLOCK_RESPONSE + ")");
}
} catch (Exception e) {
LOG.error("exception in netty client: " + e + " message: " + e.getMessage());
throw new IOException(e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,15 @@
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;

import tachyon.Constants;
import tachyon.client.RemoteBlockWriter;
import tachyon.conf.TachyonConf;
import tachyon.network.ChannelType;
import tachyon.network.NettyUtils;
import tachyon.network.protocol.RPCBlockWriteRequest;
import tachyon.network.protocol.RPCBlockWriteResponse;
import tachyon.network.protocol.RPCErrorResponse;
import tachyon.network.protocol.RPCMessage;
import tachyon.network.protocol.RPCMessageDecoder;
import tachyon.network.protocol.RPCMessageEncoder;
import tachyon.network.protocol.RPCResponse;
import tachyon.network.protocol.databuffer.DataByteArrayChannel;

Expand Down Expand Up @@ -101,24 +91,30 @@ public void write(byte[] bytes, int offset, int length) throws IOException {
channel.writeAndFlush(new RPCBlockWriteRequest(mUserId, mBlockId, mWrittenBytes, length,
new DataByteArrayChannel(bytes, offset, length)));

RPCResponse response = listener.get(NettyClient.TIMEOUT_SECOND, TimeUnit.SECONDS);
RPCResponse response = listener.get(NettyClient.TIMEOUT_MS, TimeUnit.MILLISECONDS);
channel.close().sync();

if (response.getType() == RPCMessage.Type.RPC_BLOCK_WRITE_RESPONSE) {
RPCBlockWriteResponse resp = (RPCBlockWriteResponse) response;
LOG.info("status: {} from remote machine {} received", resp.getStatus(), mAddress);

if (!resp.getStatus()) {
throw new IOException("error writing blockId: " + mBlockId + ", userId: " + mUserId
+ ", address: " + mAddress);
}
mWrittenBytes += length;
} else {
LOG.error("Unexpected response message type: " + response.getType() + " (expected: "
+ RPCMessage.Type.RPC_BLOCK_WRITE_RESPONSE + ")");
switch (response.getType()) {
case RPC_BLOCK_WRITE_RESPONSE:
RPCBlockWriteResponse resp = (RPCBlockWriteResponse) response;
RPCResponse.Status status = resp.getStatus();
LOG.info("status: {} from remote machine {} received", status, mAddress);

if (status != RPCResponse.Status.SUCCESS) {
throw new IOException("error writing blockId: " + mBlockId + ", userId: " + mUserId
+ ", address: " + mAddress + ", message: " + status.getMessage());
}
mWrittenBytes += length;
break;
case RPC_ERROR_RESPONSE:
RPCErrorResponse error = (RPCErrorResponse) response;
throw new IOException(error.getStatus().getMessage());
default:
throw new IOException("Unexpected response message type: " + response.getType()
+ " (expected: " + RPCMessage.Type.RPC_BLOCK_WRITE_RESPONSE + ")");
}
} catch (Exception e) {
throw new IOException(e.getMessage());
throw new IOException(e);
} finally {
mHandler.removeListener(listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public ByteBuffer readRemoteBlock(String host, int port, long blockId, long offs
int numRead = recvMsg.recv(socketChannel);
if (numRead == -1) {
LOG.warn("Read nothing");
if (!recvMsg.isMessageReady()) {
// The stream has ended, but the message is not complete.
LOG.error("Response was not received completely.");
return null;
}
}
}
LOG.info("Data " + blockId + " from remote machine " + address + " received");
Expand Down
1 change: 1 addition & 0 deletions common/src/main/java/tachyon/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public class Constants {
public static final String USER_NETTY_WORKER_THREADS =
"tachyon.user.network.netty.worker.threads";
public static final String USER_NETTY_CHANNEL = "tachyon.user.network.netty.channel";
public static final String USER_NETTY_TIMEOUT_MS = "tachyon.user.network.netty.timeout.ms";
public static final String USER_REMOTE_READ_BUFFER_SIZE_BYTE =
"tachyon.user.remote.read.buffer.size.byte";
public static final String USER_DEFAULT_WRITE_TYPE = "tachyon.user.file.writetype.default";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public Type getType() {
* @return The decoded RPCBlockRequest object
*/
public static RPCBlockRequest decode(ByteBuf in) {
// TODO: remove this short when client also uses netty.
in.readShort();
long blockId = in.readLong();
long offset = in.readLong();
long length = in.readLong();
Expand All @@ -60,15 +58,12 @@ public static RPCBlockRequest decode(ByteBuf in) {

@Override
public int getEncodedLength() {
// TODO: adjust the length when client also uses netty.
// 3 longs (mBlockId, mOffset, mLength) + 1 short (DATA_SERVER_REQUEST_MESSAGE)
return Longs.BYTES * 3 + Shorts.BYTES;
// 3 longs (mBLockId, mOffset, mLength)
return Longs.BYTES * 3;
}

@Override
public void encode(ByteBuf out) {
// TODO: remove this short when client also uses netty.
out.writeShort(DataServerMessage.DATA_SERVER_REQUEST_MESSAGE);
out.writeLong(mBlockId);
out.writeLong(mOffset);
out.writeLong(mLength);
Expand Down
44 changes: 28 additions & 16 deletions common/src/main/java/tachyon/network/protocol/RPCBlockResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

import java.nio.ByteBuffer;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.google.common.primitives.Shorts;

import io.netty.buffer.ByteBuf;

import tachyon.network.protocol.databuffer.DataBuffer;
import tachyon.network.protocol.databuffer.DataByteBuffer;
import tachyon.worker.DataServerMessage;

/**
* This represents the response of a {@link RPCBlockRequest}.
Expand All @@ -34,28 +34,34 @@ public class RPCBlockResponse extends RPCResponse {
private final long mOffset;
private final long mLength;
private final DataBuffer mData;
private final Status mStatus;

public RPCBlockResponse(long blockId, long offset, long length, DataBuffer data) {
// TODO: rename this to RPCBlockReadResponse.
public RPCBlockResponse(long blockId, long offset, long length, DataBuffer data, Status status) {
mBlockId = blockId;
mOffset = offset;
mLength = length;
mData = data;
mStatus = status;
}

public Type getType() {
return Type.RPC_BLOCK_RESPONSE;
}

/**
* Creates a {@link RPCBlockResponse} that indicates an error for
* the given block.
* Creates a {@link RPCBlockResponse} object that indicates an error for the given
* {@link RPCBlockRequest}.
*
* @param blockId The Id of block requested
* @return the new error RPCBlockResponse created.
* @param request The {@link RPCBlockRequest} to generated the {@link RPCBlockResponse} for.
* @param status The {@link tachyon.network.protocol.RPCResponse.Status} for the response.
* @return The generated {@link RPCBlockResponse} object.
*/
// TODO: rename this to RPCBlockReadResponse.
public static RPCBlockResponse createErrorResponse(final long blockId) {
return new RPCBlockResponse(-blockId, 0, 0, null);
public static RPCBlockResponse createErrorResponse(final RPCBlockRequest request,
final Status status) {
Preconditions.checkArgument(status != Status.SUCCESS);
// The response has no payload, so length must be 0.
return new RPCBlockResponse(request.getBlockId(), request.getOffset(), 0, null, status);
}

/**
Expand All @@ -65,35 +71,32 @@ public static RPCBlockResponse createErrorResponse(final long blockId) {
* @return The decoded RPCBlockResponse object
*/
public static RPCBlockResponse decode(ByteBuf in) {
// TODO: remove this short when client also uses netty.
in.readShort();
long blockId = in.readLong();
long offset = in.readLong();
long length = in.readLong();
short status = in.readShort();
DataBuffer data = null;
if (length > 0) {
// TODO: look into accessing Netty ByteBuf directly, to avoid copying the data.
ByteBuffer buffer = ByteBuffer.allocate((int) length);
in.readBytes(buffer);
data = new DataByteBuffer(buffer, (int) length);
}
return new RPCBlockResponse(blockId, offset, length, data);
return new RPCBlockResponse(blockId, offset, length, data, Status.fromShort(status));
}

@Override
public int getEncodedLength() {
// TODO: adjust the length when client also uses netty.
// 3 longs (mBlockId, mOffset, mLength) + 1 short (DATA_SERVER_REQUEST_MESSAGE)
// 3 longs (mBLockId, mOffset, mLength) + 1 short (mStatus)
return Longs.BYTES * 3 + Shorts.BYTES;
}

@Override
public void encode(ByteBuf out) {
// TODO: remove this short when client also uses netty.
out.writeShort(DataServerMessage.DATA_SERVER_RESPONSE_MESSAGE);
out.writeLong(mBlockId);
out.writeLong(mOffset);
out.writeLong(mLength);
out.writeShort(mStatus.getId());
// The actual payload is not encoded here, since the RPCMessageEncoder will transfer it in a
// more efficient way.
}
Expand All @@ -103,6 +106,11 @@ public DataBuffer getPayloadDataBuffer() {
return mData;
}

@Override
public String toString() {
return "RPCBlockResponse(" + mBlockId + ", " + mOffset + ", " + mLength + ", " + mStatus + ")";
}

public long getBlockId() {
return mBlockId;
}
Expand All @@ -114,4 +122,8 @@ public long getLength() {
public long getOffset() {
return mOffset;
}

public Status getStatus() {
return mStatus;
}
}

0 comments on commit 20f13e8

Please sign in to comment.