Skip to content

Commit

Permalink
Add support code for different HBase server versions and 0.90.
Browse files Browse the repository at this point in the history
The goal is to transparently support different HBase versions with
the same client.  Originally, this client didn't care what version
the server was, because for what it uses of the protocol, nothing
had changed since it was originally written, despite the RPC protocol
version being bumped up a few times.  Unfortunately, HBase isn't
very good at maintaining backwards compatibility, and sadly 0.90
has a minor change that breaks it.

Change all HBaseRPC implementations to declare whether or not they're
sensitive to the version of the remote server.  When HBase folks break
backwards compatibility by changing the on-wire format of an RPC, then
this RPC becomes sensitive to protocol version.  This unfortunately
happened in HBase 0.90 for the Get RPC (for no good reason IMO).
HBaseRPC implementations are given the server's version when they're
asked to serialize themselves, so they can adjust their behavior based
on the version of the server they're talking to.

Add a mechanism to automatically request the RegionServer's RPC protocol
version with no extra overhead most of the time.  We used to piggyback
the magic "hello" header in the first packet sent to the RegionServer,
along with the first RPC.  We now additionally piggyback a version
request too, so that the "hello" header, the 1st RPC and the version
request all go out in the same TCP packet (most of the time the 1st RPC
is small).
If a version-sensitive RPC attempts to go out before we know the version
of the server, we delay it until the version is received.

Change-Id: I130afd4305dfbe8bac4cfe4a50a06c6e239e266e
  • Loading branch information
tsuna committed Jan 30, 2011
1 parent d753df2 commit 6bebf78
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/AtomicIncrementRequest.java
Expand Up @@ -175,7 +175,7 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
final ChannelBuffer buf = newBuffer(predictSerializedSize());
buf.writeInt(6); // Number of parameters.

Expand Down
2 changes: 1 addition & 1 deletion src/DeleteRequest.java
Expand Up @@ -205,7 +205,7 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
final ChannelBuffer buf = newBuffer(predictSerializedSize());
buf.writeInt(2); // Number of parameters.

Expand Down
17 changes: 14 additions & 3 deletions src/GetRequest.java
Expand Up @@ -141,14 +141,18 @@ public GetRequest withRowLock(final RowLock lock) {
return this;
}

/**
 * Declares that the serialized form of this RPC depends on the RPC
 * protocol version of the server it is sent to.
 * @return always {@code true}.
 */
boolean versionSensitive() {
return true; // Sad. HBASE-3174 broke backwards compatibility!@#$%^ :(
}

/**
* Predicts a lower bound on the serialized size of this RPC.
* This is to avoid using a dynamic buffer, to avoid re-sizing the buffer.
* Since we use a static buffer, if the prediction is wrong and turns out
* to be less than what we need, there will be an exception which will
* prevent the RPC from being serialized. That'd be a severe bug.
*/
private int predictSerializedSize() {
private int predictSerializedSize(final byte server_version) {
int size = 0;
size += 4; // int: Number of parameters.
size += 1; // byte: Type of the 1st parameter.
Expand All @@ -163,6 +167,9 @@ private int predictSerializedSize() {
size += 8; // long: Lock ID.
size += 4; // int: Max number of versions to return.
size += 1; // byte: Whether or not to use a filter.
if (server_version >= 26) { // New in 0.90 (because of HBASE-3174).
size += 1; // byte: Whether or not to cache the blocks read.
}
size += 8; // long: Minimum timestamp.
size += 8; // long: Maximum timestamp.
size += 1; // byte: Boolean: "all time".
Expand All @@ -181,8 +188,8 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
final ChannelBuffer buf = newBuffer(predictSerializedSize());
ChannelBuffer serialize(final byte server_version) {
final ChannelBuffer buf = newBuffer(predictSerializedSize(server_version));
buf.writeInt(2); // Number of parameters.

// 1st param: byte array containing region name
Expand All @@ -200,6 +207,10 @@ ChannelBuffer serialize() {
// writeByteArray(buf, filter name as byte array);
// write the filter itself

if (server_version >= 26) { // New in 0.90 (because of HBASE-3174).
buf.writeByte(0x01); // boolean (true): whether to cache the blocks.
}

// TimeRange
buf.writeLong(0); // Minimum timestamp.
buf.writeLong(Long.MAX_VALUE); // Maximum timestamp.
Expand Down
19 changes: 18 additions & 1 deletion src/HBaseRpc.java
Expand Up @@ -145,8 +145,15 @@ public abstract class HBaseRpc {
*
* Notice that this method is package-private, so only classes within this
* package can use this as a base class.
*
* @param server_version The RPC protocol version of the server this RPC is
* going to. If the version of the server is unknown, this will be -1. If
* this RPC cares a lot about the version of the server (due to backwards
* incompatible changes in the RPC serialization), the concrete class should
* override {@link #versionSensitive} to make sure it doesn't get -1, as the
* version is lazily fetched.
*/
abstract ChannelBuffer serialize();
abstract ChannelBuffer serialize(byte server_version);

/**
* Name of the method to invoke on the server side.
Expand Down Expand Up @@ -305,6 +312,16 @@ final boolean hasDeferred() {
return deferred != null;
}

/**
 * Is the encoding of this RPC sensitive to the RPC protocol version?
 * Override this method to return {@code true} in order to guarantee that
 * the version of the remote server this RPC is going to will be known by
 * the time this RPC gets serialized.
 * @return {@code false} unless overridden.
 */
boolean versionSensitive() {
return false;
}

public String toString() {
return "HBaseRpc(method=" + Bytes.pretty(method)
+ ", table=" + Bytes.pretty(table)
Expand Down
2 changes: 1 addition & 1 deletion src/MultiPutRequest.java
Expand Up @@ -200,7 +200,7 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
// Due to the wire format expected by HBase, we need to group all the
// edits by region, then by key, then by family. HBase does this by
// building a crazy map-of-map-of-map-of-list-of-edits, but this is
Expand Down
2 changes: 1 addition & 1 deletion src/PutRequest.java
Expand Up @@ -252,7 +252,7 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
final ChannelBuffer buf = newBuffer(predictSerializedSize());
buf.writeInt(2); // Number of parameters.

Expand Down
159 changes: 138 additions & 21 deletions src/RegionClient.java
Expand Up @@ -123,6 +123,20 @@ final class RegionClient extends ReplayingDecoder<VoidEnum> {
*/
private boolean dead = false;

/**
 * RPC protocol version used by this RegionServer.
 * -1 means unknown.  No synchronization is typically used to read / write
 * this value, as it's typically accessed after a volatile read or updated
 * before a volatile write on {@link #deferred_server_version}.
 */
private byte server_version = -1;

/**
 * Non-null while a lookup of the server's RPC protocol version is in
 * progress.
 */
private volatile Deferred<Long> deferred_server_version;

/**
* Multi-put request in which we accumulate buffered edits.
* Manipulating this reference requires synchronizing on `this'.
Expand Down Expand Up @@ -381,39 +395,91 @@ public void operationComplete(final ChannelFuture future) {
return d;
}

/**
 * Name of the {@code getProtocolVersion} RPC method, one byte per character.
 */
private static final byte[] GET_PROTOCOL_VERSION = new byte[] {
'g', 'e', 't',
'P', 'r', 'o', 't', 'o', 'c', 'o', 'l',
'V', 'e', 'r', 's', 'i', 'o', 'n'
};

/**
 * RPC that asks the server for its RPC protocol version.
 * Its serialized form does not depend on the server's version.
 */
final static class GetProtocolVersionRequest extends HBaseRpc {

  /** 1st RPC parameter: name of the protocol interface being queried. */
  private static final String PROTOCOL =
    "org.apache.hadoop.hbase.ipc.HRegionInterface";

  /**
   * 2nd RPC parameter.
   * NOTE(review): presumably the protocol version this client speaks --
   * confirm against the server-side signature of getProtocolVersion.
   */
  private static final long CLIENT_VERSION = 24;

  GetProtocolVersionRequest() {
    super(GET_PROTOCOL_VERSION);
  }

  /**
   * Serializes this request.
   * @param unused_server_version Ignored.
   * @return A new buffer containing the two parameters of the RPC.
   */
  ChannelBuffer serialize(final byte unused_server_version) {
    // num param + type 1 + string length + string + type 2 + long
    final int size = 4 + 1 + 1 + PROTOCOL.length() + 1 + 8;
    final ChannelBuffer out = newBuffer(size);
    out.writeInt(2);  // Number of parameters.
    writeHBaseString(out, PROTOCOL);      // 1st param.
    writeHBaseLong(out, CLIENT_VERSION);  // 2nd param.
    return out;
  }
}

/**
 * Returns a new {@link GetProtocolVersionRequest} ready to go.
 * The RPC returned already has the right callback set.
 * <p>
 * As a side effect, the {@code Deferred} of the new RPC is stored in
 * {@link #deferred_server_version}.  This method only builds the RPC, it
 * does not send it.
 */
@SuppressWarnings("unchecked")
private GetProtocolVersionRequest getProtocolVersionRequest() {
final GetProtocolVersionRequest rpc = new GetProtocolVersionRequest();
// NOTE(review): raw type, presumably because getDeferred() isn't
// parameterized with Long -- hence the @SuppressWarnings above.
final Deferred/*<Long>*/ version = rpc.getDeferred();
// Publish the pending lookup before attaching the callback.
deferred_server_version = version; // Volatile write.
version.addCallback(got_protocol_version);
return rpc;
}

/**
* Asks the server which RPC protocol version it's running.
* @return a Deferred {@link Long}.
*/
@SuppressWarnings("unchecked")
public Deferred<Long> getProtocolVersion() {
final HBaseRpc rpc = new HBaseRpc(Bytes.ISO88591("getProtocolVersion")) {
ChannelBuffer serialize() {
// num param + type 1 + string length + string + type 2 + long
final ChannelBuffer buf = newBuffer(4 + 1 + 1 + 44 + 1 + 8);
buf.writeInt(2); // Number of parameters.
// 1st param.
writeHBaseString(buf, "org.apache.hadoop.hbase.ipc.HRegionInterface");
// 2nd param.
writeHBaseLong(buf, 24);
return buf;
}
};
Deferred<Long> version = deferred_server_version; // Volatile read.
if (server_version != -1) {
return Deferred.fromResult((long) server_version);
}
// Non-atomic check-then-update is OK here. In case of a race and a
// thread overwrites this reference by creating another lookup, we just
// pay for an unnecessary network round-trip, not a big deal.
if (version != null) {
return version;
}

final Deferred<Long> d = rpc.getDeferred()
.addCallback(got_protocol_version);
final GetProtocolVersionRequest rpc = getProtocolVersionRequest();
sendRpc(rpc);
return d;
return (Deferred) rpc.getDeferred();
}

/** Singleton callback to handle responses of getProtocolVersion RPCs. */
private final Callback<Long, Object> got_protocol_version =
new Callback<Long, Object>() {
public Long call(final Object response) {
if (response instanceof Long) {
return (Long) response;
} else {
if (!(response instanceof Long)) {
throw new InvalidResponseException(Long.class, response);
}
final Long version = (Long) response;
final long v = version;
if (v < 0 || v > Byte.MAX_VALUE) {
throw new InvalidResponseException("getProtocolVersion returned a "
+ (v < 0 ? "negative" : "too large") + " value", version);
}
final byte prev_version = server_version;
server_version = (byte) v;
deferred_server_version = null; // Volatile write.
if (prev_version == -1) { // We're 1st to get the version.
if (LOG.isDebugEnabled()) {
LOG.debug(chan + " uses RPC protocol version " + server_version);
}
} else if (prev_version != server_version) {
LOG.error("WTF? We previously found that " + chan + " uses RPC"
+ " protocol version " + prev_version + " but now the "
+ " server claims to be using version " + server_version);
}
return (Long) response;
}
public String toString() {
return "type getProtocolVersion response";
Expand Down Expand Up @@ -486,7 +552,7 @@ final class GetClosestRowBefore extends HBaseRpc {
}

@Override
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
// region.length and row.length will use at most a 3-byte VLong.
// This is because VLong wastes 1 byte of meta-data + 2 bytes of
// payload. HBase's own KeyValue code uses a short to store the row
Expand Down Expand Up @@ -876,6 +942,22 @@ public void exceptionCaught(final ChannelHandlerContext ctx,
// Low-level encoding and decoding //
// ------------------------------- //

/**
 * Callback that hands an RPC back to {@code sendRpc} when it fires, and
 * then passes its argument through untouched to the next callback.
 */
final class RetryRpc<T> implements Callback<T, T> {
  /** The RPC to give to {@code sendRpc} when this callback is invoked. */
  private final HBaseRpc to_retry;

  /**
   * Constructor.
   * @param rpc The RPC to retry.
   */
  RetryRpc(final HBaseRpc rpc) {
    to_retry = rpc;
  }

  /**
   * Retries the RPC.
   * @param arg Whatever the previous callback produced.
   * @return {@code arg}, unchanged.
   */
  public T call(final T arg) {
    sendRpc(to_retry);
    return arg;
  }
}

/**
* Encodes an RPC and sends it downstream (to the wire).
* <p>
Expand All @@ -888,6 +970,10 @@ private ChannelBuffer encode(final HBaseRpc rpc) {
if (!rpc.hasDeferred()) {
throw new AssertionError("Should never happen! rpc=" + rpc);
}
if (rpc.versionSensitive() && server_version == -1) {
getProtocolVersion().addBoth(new RetryRpc<Long>(rpc));
return null;
}

// TODO(tsuna): Add rate-limiting here. We don't want to send more than
// N QPS to a given region server.
Expand All @@ -898,7 +984,7 @@ private ChannelBuffer encode(final HBaseRpc rpc) {
final int rpcid = this.rpcid.incrementAndGet();
ChannelBuffer payload;
try {
payload = rpc.serialize();
payload = rpc.serialize(server_version);
// We assume that payload has enough bytes at the beginning for us to
// "fill in the blanks" and put the RPC header. This is accounted for
// automatically by HBaseRpc#newBuffer. If someone creates their own
Expand Down Expand Up @@ -1342,7 +1428,21 @@ public void handleDownstream(final ChannelHandlerContext ctx,
final MessageEvent me = (MessageEvent) event;
final ChannelBuffer payload = (ChannelBuffer) me.getMessage();
final ChannelBuffer header = ChannelBuffers.wrappedBuffer(HELLO_HEADER);
final ChannelBuffer buf = ChannelBuffers.wrappedBuffer(header, payload);

// Piggyback a version request in the 1st packet, after the payload
// we were trying to send. This way we'll have the version handy
// pretty quickly. Since it's most likely going to fit in the same
// packet we send out, it adds ~zero overhead. But don't piggyback
// a version request if the payload is already a version request.
ChannelBuffer buf;
if (!isVersionRequest(payload)) {
final RegionClient client = ctx.getPipeline().get(RegionClient.class);
final ChannelBuffer version =
client.encode(client.getProtocolVersionRequest());
buf = ChannelBuffers.wrappedBuffer(header, payload, version);
} else {
buf = ChannelBuffers.wrappedBuffer(header, payload);
}
// We're going to send the header, so let's remove ourselves from the
// pipeline.
try {
Expand All @@ -1360,6 +1460,23 @@ public void handleDownstream(final ChannelHandlerContext ctx,
ctx.sendDownstream(event);
}

/**
 * Tells whether the given encoded RPC is a {@code getProtocolVersion} call.
 * The bytes that follow the header are compared with
 * {@code GET_PROTOCOL_VERSION}.
 * @param payload The encoded RPC to inspect.  It is only read through
 * absolute indexes.
 * @return {@code true} if the method name in the payload starts with
 * "getProtocolVersion", {@code false} otherwise (including when the
 * payload is too short to contain that name).
 */
private static boolean isVersionRequest(final ChannelBuffer payload) {
  // Header = 4+4+2, followed by method name.
  final int name_offset = 4 + 4 + 2;
  final byte[] expected = GET_PROTOCOL_VERSION;
  if (payload.readableBytes() < name_offset + expected.length) {
    return false;  // Too short to be a version request.
  }
  int matched = 0;
  while (matched < expected.length
         && payload.getByte(name_offset + matched) == expected[matched]) {
    matched++;
  }
  // No other RPC has a name that starts with "getProtocolVersion"
  // so a full match means this must be a version request.
  return matched == expected.length;
}

}

}
4 changes: 2 additions & 2 deletions src/RowLockRequest.java
Expand Up @@ -81,7 +81,7 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
final ChannelBuffer buf = newBuffer(predictSerializedSize());
buf.writeInt(2); // Number of parameters.

Expand Down Expand Up @@ -116,7 +116,7 @@ static final class ReleaseRequest extends HBaseRpc {
this.lock = lock;
}

ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
// num param + type 1 + region length + region + type 2 + long
final ChannelBuffer buf = newBuffer(4 + 1 + 3 + region.name().length
+ 1 + 8);
Expand Down
6 changes: 3 additions & 3 deletions src/Scanner.java
Expand Up @@ -804,7 +804,7 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
final ChannelBuffer buf = newBuffer(predictSerializedSize());
buf.writeInt(2); // Number of parameters.

Expand Down Expand Up @@ -876,7 +876,7 @@ public GetNextRowsRequest() {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
final ChannelBuffer buf = newBuffer(4 + 1 + 8 + 1 + 4);
buf.writeInt(2); // Number of parameters.
writeHBaseLong(buf, scanner_id);
Expand All @@ -901,7 +901,7 @@ public CloseScannerRequest(final long scanner_id) {
}

/** Serializes this request. */
ChannelBuffer serialize() {
ChannelBuffer serialize(final byte unused_server_version) {
final ChannelBuffer buf = newBuffer(4 + 1 + 8);
buf.writeInt(1); // Number of parameters.
writeHBaseLong(buf, scanner_id);
Expand Down

0 comments on commit 6bebf78

Please sign in to comment.