Skip to content

Commit

Permalink
Merge -c 1130503 from trunk to 1.5 branch. Fixes: AVRO-832.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/avro/branches/branch-1.5@1130505 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
cutting committed Jun 2, 2011
1 parent 47e7213 commit e82da45
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -31,6 +31,9 @@ Avro 1.5.2 (unreleased)

AVRO-809. Java: Fix reflect for classes that have no package. (cutting)

AVRO-832. Java: Fix RPC client to correctly perform schema
resolution on message responses. (cutting)

Avro 1.5.1 (3 May 2011)

NEW FEATURES
Expand Down
18 changes: 14 additions & 4 deletions lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
Expand Up @@ -150,15 +150,15 @@ public synchronized Object request(String messageName, Object request)
context.setResponseCallMeta(META_READER.read(null, in));

if (!in.readBoolean()) { // no error
Object response = readResponse(rm.getResponse(), in);
Object response = readResponse(rm.getResponse(), m.getResponse(), in);
context.setResponse(response);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientReceiveResponse(context);
}
return response;

} else {
Exception error = readError(rm.getErrors(), in);
Exception error = readError(rm.getErrors(), m.getErrors(), in);
context.setError(error);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientReceiveResponse(context);
Expand Down Expand Up @@ -275,12 +275,22 @@ public synchronized Protocol getRemote() throws IOException {
public abstract void writeRequest(Schema schema, Object request,
Encoder out) throws IOException;

@Deprecated // for compatibility in 1.5
public Object readResponse(Schema schema, Decoder in) throws IOException {
return readResponse(schema, schema, in);
}

/** Reads a response message. */
public abstract Object readResponse(Schema schema, Decoder in)
public abstract Object readResponse(Schema writer, Schema reader, Decoder in)
throws IOException;

@Deprecated // for compatibility in 1.5
public Object readError(Schema schema, Decoder in) throws IOException {
return readError(schema, schema, in);
}

/** Reads an error message. */
public abstract Exception readError(Schema schema, Decoder in)
public abstract Exception readError(Schema writer, Schema reader, Decoder in)
throws IOException;
}

Expand Up @@ -59,14 +59,15 @@ public void writeRequest(Schema schema, Object request, Encoder out)
}

@Override
public Object readResponse(Schema schema, Decoder in) throws IOException {
return new GenericDatumReader<Object>(schema).read(null, in);
public Object readResponse(Schema writer, Schema reader, Decoder in)
throws IOException {
return new GenericDatumReader<Object>(writer, reader).read(null, in);
}

@Override
public Exception readError(Schema schema, Decoder in)
public Exception readError(Schema writer, Schema reader, Decoder in)
throws IOException {
Object error = new GenericDatumReader<Object>(schema).read(null,in);
Object error = new GenericDatumReader<Object>(writer, reader).read(null,in);
if (error instanceof CharSequence)
return new AvroRuntimeException(error.toString()); // system error
return new AvroRemoteException(error);
Expand Down
Expand Up @@ -50,8 +50,8 @@ protected DatumWriter<Object> getDatumWriter(Schema schema) {
}

@Override
protected DatumReader<Object> getDatumReader(Schema schema) {
return new ReflectDatumReader<Object>(schema);
protected DatumReader<Object> getDatumReader(Schema writer, Schema reader) {
return new ReflectDatumReader<Object>(writer, reader);
}

/** Create a proxy instance whose methods invoke RPCs. */
Expand Down
Expand Up @@ -59,8 +59,13 @@ protected DatumWriter<Object> getDatumWriter(Schema schema) {
return new SpecificDatumWriter<Object>(schema);
}

@Deprecated // for compatibility in 1.5
protected DatumReader<Object> getDatumReader(Schema schema) {
return new SpecificDatumReader<Object>(schema);
return getDatumReader(schema, schema);
}

protected DatumReader<Object> getDatumReader(Schema writer, Schema reader) {
return new SpecificDatumReader<Object>(writer, reader);
}

@Override
Expand All @@ -73,14 +78,15 @@ public void writeRequest(Schema schema, Object request, Encoder out)
}

@Override
public Object readResponse(Schema schema, Decoder in) throws IOException {
return getDatumReader(schema).read(null, in);
public Object readResponse(Schema writer, Schema reader, Decoder in)
throws IOException {
return getDatumReader(writer, reader).read(null, in);
}

@Override
public Exception readError(Schema schema, Decoder in)
public Exception readError(Schema writer, Schema reader, Decoder in)
throws IOException {
Object value = getDatumReader(schema).read(null, in);
Object value = getDatumReader(writer, reader).read(null, in);
if (value instanceof Exception)
return (Exception)value;
return new AvroRuntimeException(value.toString());
Expand Down
Expand Up @@ -44,6 +44,8 @@
import java.util.List;
import java.util.Random;

import org.codehaus.jackson.node.BooleanNode;

public class TestProtocolGeneric {
private static final Logger LOG
= LoggerFactory.getLogger(TestProtocolGeneric.class);
Expand Down Expand Up @@ -211,6 +213,50 @@ public void testHandshake() throws IOException {
}
}

@Test
/** Construct and use a different protocol whose "echo" response has an extra
field to check that correct schema is used to parse response. */
public void testResponseChange() throws IOException {

List<Field> fields = new ArrayList<Field>();
for (Field f : PROTOCOL.getType("TestRecord").getFields())
fields.add(new Field(f.name(), f.schema(), null, null));
fields.add(new Field("extra", Schema.create(Schema.Type.BOOLEAN),
null, BooleanNode.TRUE));
Schema record =
Schema.createRecord("TestRecord", null, "org.apache.avro.test", false);
record.setFields(fields);

Protocol protocol = new Protocol("Simple", "org.apache.avro.test");
List<Field> params = new ArrayList<Field>();
params.add(new Field("record", record, null, null));

Protocol.Message message =
protocol.createMessage("echo", null, Schema.createRecord(params),
record,
Schema.createUnion(new ArrayList<Schema>()));
protocol.getMessages().put("echo", message);
Transceiver t
= new SocketTransceiver(new InetSocketAddress(server.getPort()));
try {
GenericRequestor r = new GenericRequestor(protocol, t);
GenericRecord args = new GenericData.Record(message.getRequest());
GenericRecord rec = new GenericData.Record(record);
rec.put("name", new Utf8("foo"));
rec.put("kind", new GenericData.EnumSymbol
(PROTOCOL.getType("Kind"), "BAR"));
rec.put("hash", new GenericData.Fixed
(PROTOCOL.getType("MD5"),
new byte[]{0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5}));
rec.put("extra", Boolean.TRUE);
args.put("record", rec);
GenericRecord response = (GenericRecord)r.request("echo", args);
assertEquals(rec, response);
} finally {
t.close();
}
}

@AfterClass
public static void testStopServer() throws IOException {
client.close();
Expand Down
Expand Up @@ -48,6 +48,7 @@ public void testStartServer() throws Exception {
}

@Override public void testHandshake() throws IOException {}
@Override public void testResponseChange() throws IOException {}

public interface ProtoInterface {
byte[] test(byte[] b);
Expand Down
Expand Up @@ -163,5 +163,6 @@ public void testWrongPassword() throws Exception {
}

@Override public void testHandshake() throws IOException {}
@Override public void testResponseChange() throws IOException {}

}

0 comments on commit e82da45

Please sign in to comment.