Skip to content
Permalink
Browse files
Cleaned up the Message interface so that it's obvious and easy to use…
… Framed and Unframed encoding of the messages.

 


git-svn-id: https://svn.apache.org/repos/asf/activemq/sandbox/activemq-protobuf@702614 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
chirino committed Oct 7, 2008
1 parent 20f2163 commit 1a4381414b859784057f83b54f3e58aebd347d09
Showing 5 changed files with 258 additions and 164 deletions.
@@ -111,6 +111,6 @@ public void testMultipleFilesOption() throws Exception {
.setNestedEnum(MessageWithNoOuter.NestedEnum.BAZ)
.setForeignEnum(EnumWithNoOuter.BAR)
;
assertEquals(message.toString(), MessageWithNoOuter.parseFrom(message.toByteArray()).toString());
assertEquals(message.toString(), MessageWithNoOuter.parseUnframed(message.toUnframedByteArray()).toString());
}
}
@@ -28,10 +28,10 @@ public class WireFormatTest extends TestCase {
public void testSerialization() throws Exception {
TestAllTypes message = TestUtil.getAllSet();

byte[] rawBytes = message.toByteArray();
assertEquals(rawBytes.length, message.serializedSize());
byte[] rawBytes = message.toUnframedByteArray();
assertEquals(rawBytes.length, message.serializedSizeUnframed());

TestAllTypes message2 = TestAllTypes.parseFrom(rawBytes);
TestAllTypes message2 = TestAllTypes.parseUnframed(rawBytes);

TestUtil.assertAllFieldsSet(message2);
}
@@ -16,115 +16,98 @@
*/
package org.apache.activemq.protobuf;

import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.InvalidProtocolBufferException;

import static org.apache.activemq.protobuf.WireInfo.*;
import static org.apache.activemq.protobuf.WireInfo.WIRETYPE_END_GROUP;
import static org.apache.activemq.protobuf.WireInfo.WIRETYPE_LENGTH_DELIMITED;
import static org.apache.activemq.protobuf.WireInfo.WIRETYPE_START_GROUP;
import static org.apache.activemq.protobuf.WireInfo.makeTag;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.InvalidProtocolBufferException;

abstract public class BaseMessage<T> implements Message<T> {

protected int memoizedSerializedSize = -1;

static protected <T> void addAll(Iterable<T> values, Collection<? super T> list) {
if (values instanceof Collection) {
@SuppressWarnings("unsafe")
Collection<T> collection = (Collection<T>)values;
list.addAll(collection);
} else {
for (T value : values) {
list.add(value);
}
}
}

abstract public T clone() throws CloneNotSupportedException;

static protected void writeGroup(CodedOutputStream output, int tag, BaseMessage message) throws IOException {
output.writeTag(tag, WIRETYPE_START_GROUP);
message.writePartialTo(output);
output.writeTag(tag, WIRETYPE_END_GROUP);
}

static protected void writeMessage(CodedOutputStream output, int tag, BaseMessage message) throws IOException {
output.writeTag(tag, WIRETYPE_LENGTH_DELIMITED);
output.writeRawVarint32(message.serializedSize());
message.writePartialTo(output);
}
///////////////////////////////////////////////////////////////////
// Write related helpers.
///////////////////////////////////////////////////////////////////

static protected <T extends BaseMessage> T readGroup(CodedInputStream input, ExtensionRegistry extensionRegistry, int tag, T group) throws IOException {
group.mergeFrom(input, extensionRegistry);
input.checkLastTagWas(makeTag(tag, WIRETYPE_END_GROUP));
return group;
public void writeFramed(CodedOutputStream output) throws IOException {
output.writeRawVarint32(serializedSizeUnframed());
writeUnframed(output);
}

static protected <T extends BaseMessage> T readMessage(CodedInputStream input, ExtensionRegistry extensionRegistry, T message) throws IOException {
int length = input.readRawVarint32();
int oldLimit = input.pushLimit(length);
message.mergeFrom(input, extensionRegistry);
input.checkLastTagWas(0);
input.popLimit(oldLimit);
return message;
}

static protected int computeGroupSize(int tag, BaseMessage message) {
return CodedOutputStream.computeTagSize(tag) * 2 + message.serializedSize();
}

static protected int computeMessageSize(int tag, BaseMessage message) {
int t = message.serializedSize();
return CodedOutputStream.computeTagSize(tag) + CodedOutputStream.computeRawVarint32Size(t) + t;
}

public T mergeFrom(CodedInputStream input) throws IOException {
return mergeFrom(input, ExtensionRegistry.getEmptyRegistry());
}

public byte[] toByteArray() {
public byte[] toUnframedByteArray() {
try {
byte[] result = new byte[serializedSize()];
byte[] result = new byte[serializedSizeUnframed()];
CodedOutputStream output = CodedOutputStream.newInstance(result);
writePartialTo(output);
writeUnframed(output);
output.checkNoSpaceLeft();
return result;
} catch (IOException e) {
throw new RuntimeException("Serializing to a byte array threw an IOException " + "(should never happen).", e);
}
}

protected List<String> prefix(List<String> missingFields, String prefix) {
ArrayList<String> rc = new ArrayList<String>(missingFields.size());
for (String v : missingFields) {
rc.add(prefix+v);


public byte[] toFramedByteArray() {
try {
byte[] result = new byte[serializedSizeFramed()];
CodedOutputStream output = CodedOutputStream.newInstance(result);
writeFramed(output);
output.checkNoSpaceLeft();
return result;
} catch (IOException e) {
throw new RuntimeException("Serializing to a byte array threw an IOException " + "(should never happen).", e);
}
return rc;
}
}

public void writeFramed(OutputStream output) throws IOException {
CodedOutputStream codedOutput = CodedOutputStream.newInstance(output);
writeFramed(codedOutput);
codedOutput.flush();
}

public void writeTo(OutputStream output) throws IOException {
public void writeUnframed(OutputStream output) throws IOException {
CodedOutputStream codedOutput = CodedOutputStream.newInstance(output);
writeTo(codedOutput);
writeUnframed(codedOutput);
codedOutput.flush();
}

public void writeTo(CodedOutputStream output) throws java.io.IOException {
writePartialTo(output);
output.writeTag(0, WIRETYPE_END_GROUP);
public int serializedSizeFramed() {
int t = serializedSizeUnframed();
return CodedOutputStream.computeRawVarint32Size(t) + t;

}

///////////////////////////////////////////////////////////////////
// Read related helpers.
///////////////////////////////////////////////////////////////////

public T mergeFramed(CodedInputStream input) throws IOException {
int length = input.readRawVarint32();
int oldLimit = input.pushLimit(length);
T rc= mergeUnframed(input);
input.checkLastTagWas(0);
input.popLimit(oldLimit);
return rc;
}

public T mergeFrom(ByteString data) throws InvalidProtocolBufferException {
public T mergeUnframed(ByteString data) throws InvalidProtocolBufferException {
try {
CodedInputStream input = data.newCodedInput();
mergeFrom(input);
mergeUnframed(input);
input.checkLastTagWas(0);
return (T)this;
} catch (InvalidProtocolBufferException e) {
@@ -133,11 +116,11 @@ public T mergeFrom(ByteString data) throws InvalidProtocolBufferException {
throw new RuntimeException("Reading from a ByteString threw an IOException (should " + "never happen).", e);
}
}

public T mergeFrom(ByteString data, ExtensionRegistry extensionRegistry) throws InvalidProtocolBufferException {
public T mergeFramed(ByteString data) throws InvalidProtocolBufferException {
try {
CodedInputStream input = data.newCodedInput();
mergeFrom(input, extensionRegistry);
mergeFramed(input);
input.checkLastTagWas(0);
return (T)this;
} catch (InvalidProtocolBufferException e) {
@@ -147,10 +130,10 @@ public T mergeFrom(ByteString data, ExtensionRegistry extensionRegistry) throws
}
}

public T mergeFrom(byte[] data) throws InvalidProtocolBufferException {
public T mergeUnframed(byte[] data) throws InvalidProtocolBufferException {
try {
CodedInputStream input = CodedInputStream.newInstance(data);
mergeFrom(input);
mergeUnframed(input);
input.checkLastTagWas(0);
return (T)this;
} catch (InvalidProtocolBufferException e) {
@@ -159,11 +142,11 @@ public T mergeFrom(byte[] data) throws InvalidProtocolBufferException {
throw new RuntimeException("Reading from a byte array threw an IOException (should " + "never happen).", e);
}
}

public T mergeFrom(byte[] data, ExtensionRegistry extensionRegistry) throws InvalidProtocolBufferException {
public T mergeFramed(byte[] data) throws InvalidProtocolBufferException {
try {
CodedInputStream input = CodedInputStream.newInstance(data);
mergeFrom(input, extensionRegistry);
mergeFramed(input);
input.checkLastTagWas(0);
return (T)this;
} catch (InvalidProtocolBufferException e) {
@@ -173,16 +156,122 @@ public T mergeFrom(byte[] data, ExtensionRegistry extensionRegistry) throws Inva
}
}

public T mergeFrom(InputStream input) throws IOException {
public T mergeUnframed(InputStream input) throws IOException {
CodedInputStream codedInput = CodedInputStream.newInstance(input);
mergeFrom(codedInput);
mergeUnframed(codedInput);
return (T)this;
}

public T mergeFramed(InputStream input) throws IOException {
int length = readRawVarint32(input);
byte []data = new byte[length];
int pos = 0;
while( pos < length ) {
int r = input.read(data, pos, length-pos);
if( r < 0 ) {
throw new InvalidProtocolBufferException("Input stream ended before a full message frame could be read.");
}
pos+=r;
}
return mergeUnframed(data);
}

public T mergeFrom(InputStream input, ExtensionRegistry extensionRegistry) throws IOException {
CodedInputStream codedInput = CodedInputStream.newInstance(input);
mergeFrom(codedInput, extensionRegistry);
return (T)this;
///////////////////////////////////////////////////////////////////
// Internal implementation methods.
///////////////////////////////////////////////////////////////////
static protected <T> void addAll(Iterable<T> values, Collection<? super T> list) {
if (values instanceof Collection) {
@SuppressWarnings("unsafe")
Collection<T> collection = (Collection<T>)values;
list.addAll(collection);
} else {
for (T value : values) {
list.add(value);
}
}
}

static protected void writeGroup(CodedOutputStream output, int tag, BaseMessage message) throws IOException {
output.writeTag(tag, WIRETYPE_START_GROUP);
message.writeUnframed(output);
output.writeTag(tag, WIRETYPE_END_GROUP);
}

static protected <T extends BaseMessage> T readGroup(CodedInputStream input, int tag, T group) throws IOException {
group.mergeUnframed(input);
input.checkLastTagWas(makeTag(tag, WIRETYPE_END_GROUP));
return group;
}

static protected int computeGroupSize(int tag, BaseMessage message) {
return CodedOutputStream.computeTagSize(tag) * 2 + message.serializedSizeUnframed();
}


static protected void writeMessage(CodedOutputStream output, int tag, BaseMessage message) throws IOException {
output.writeTag(tag, WIRETYPE_LENGTH_DELIMITED);
message.writeFramed(output);
}

static protected int computeMessageSize(int tag, BaseMessage message) {
return CodedOutputStream.computeTagSize(tag) + message.serializedSizeFramed();
}

protected List<String> prefix(List<String> missingFields, String prefix) {
ArrayList<String> rc = new ArrayList<String>(missingFields.size());
for (String v : missingFields) {
rc.add(prefix+v);
}
return rc;
}


/**
* Read a raw Varint from the stream. If larger than 32 bits, discard the
* upper bits.
*/
static protected int readRawVarint32(InputStream is) throws IOException {
byte tmp = readRawByte(is);
if (tmp >= 0) {
return tmp;
}
int result = tmp & 0x7f;
if ((tmp = readRawByte(is)) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 0x7f) << 7;
if ((tmp = readRawByte(is)) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 0x7f) << 14;
if ((tmp = readRawByte(is)) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = readRawByte(is)) << 28;
if (tmp < 0) {
// Discard upper 32 bits.
for (int i = 0; i < 5; i++) {
if (readRawByte(is) >= 0) return result;
}
throw new InvalidProtocolBufferException(
"CodedInputStream encountered a malformed varint.");
}
}
}
}
return result;
}

static protected byte readRawByte(InputStream is) throws IOException {
int rc = is.read();
if( rc == -1 ) {
throw new InvalidProtocolBufferException(
"While parsing a protocol message, the input ended unexpectedly " +
"in the middle of a field. This could mean either than the " +
"input has been truncated or that an embedded message " +
"misreported its own length.");
}
return (byte) rc;
}
}

0 comments on commit 1a43814

Please sign in to comment.