diff --git a/api/src/main/java/io/grpc/InternalMetadata.java b/api/src/main/java/io/grpc/InternalMetadata.java index cfd6fa5c0e6..d011eb9e8c7 100644 --- a/api/src/main/java/io/grpc/InternalMetadata.java +++ b/api/src/main/java/io/grpc/InternalMetadata.java @@ -18,6 +18,7 @@ import com.google.common.io.BaseEncoding; import io.grpc.Metadata.AsciiMarshaller; +import io.grpc.Metadata.BinaryStreamMarshaller; import io.grpc.Metadata.Key; import java.nio.charset.Charset; @@ -82,4 +83,46 @@ public static byte[][] serialize(Metadata md) { public static int headerCount(Metadata md) { return md.headerCount(); } + + /** + * Serializes all metadata entries, leaving some values as {@link InputStream}s. + * + *

Produces serialized names and values interleaved. result[i*2] are names, while + * result[i*2+1] are values. + * + *

Names are byte arrays as described according to the {@link Metadata#serialize} + * method. Values are either byte arrays or {@link InputStream}s. + */ + @Internal + public static Object[] serializePartial(Metadata md) { + return md.serializePartial(); + } + + /** + * Creates a holder for a pre-parsed value read by the transport. + * + * @param marshaller The {@link Metadata#BinaryStreamMarshaller} associated with this value. + * @param value The value to store. + * @return an object holding the pre-parsed value for this key. + */ + @Internal + public static Object parsedValue(BinaryStreamMarshaller marshaller, T value) { + return new Metadata.LazyValue<>(marshaller, value); + } + + /** + * Creates a new {@link Metadata} instance from serialized data, + * with some values pre-parsed. Metadata will mutate the passed in array. + * + * @param usedNames The number of names used. + * @param namesAndValues An array of interleaved names and values, + * with each name (at even indices) represented as a byte array, + * and each value (at odd indices) represented as either a byte + * array or an object returned by the {@link #parsedValue} + * method. + */ + @Internal + public static Metadata newMetadataWithParsedValues(int usedNames, Object[] namesAndValues) { + return new Metadata(usedNames, namesAndValues); + } } diff --git a/api/src/main/java/io/grpc/Metadata.java b/api/src/main/java/io/grpc/Metadata.java index c7559d5d6b0..07446155432 100644 --- a/api/src/main/java/io/grpc/Metadata.java +++ b/api/src/main/java/io/grpc/Metadata.java @@ -23,6 +23,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.io.BaseEncoding; +import com.google.common.io.ByteStreams; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -118,27 +122,41 @@ public String parseAsciiString(String serialized) { * Constructor called by the transport layer when it receives binary metadata. Metadata will * mutate the passed in array. * - * @param usedNames the number of + * @param usedNames the number of names */ Metadata(int usedNames, byte[]... binaryValues) { - assert (binaryValues.length & 1) == 0 : "Odd number of key-value pairs " + binaryValues.length; + this(usedNames, (Object[]) binaryValues); + } + + /** + * Constructor called by the transport layer when it receives partially-parsed metadata. + * Metadata will mutate the passed in array. + * + * @param usedNames the number of names + * @param namesAndValues an array of interleaved names and values, with each name + * (at even indices) represented by a byte array, and values (at odd indices) as + * described by {@link InternalMetadata#newMetadataWithParsedValues}. + */ + Metadata(int usedNames, Object[] namesAndValues) { + assert (namesAndValues.length & 1) == 0 + : "Odd number of key-value pairs " + namesAndValues.length; size = usedNames; - namesAndValues = binaryValues; + this.namesAndValues = namesAndValues; } - private byte[][] namesAndValues; + private Object[] namesAndValues; // The unscaled number of headers present. private int size; private byte[] name(int i) { - return namesAndValues[i * 2]; + return (byte[]) namesAndValues[i * 2]; } private void name(int i, byte[] name) { namesAndValues[i * 2] = name; } - private byte[] value(int i) { + private Object value(int i) { return namesAndValues[i * 2 + 1]; } @@ -146,6 +164,41 @@ private void value(int i, byte[] value) { namesAndValues[i * 2 + 1] = value; } + private void value(int i, Object value) { + if (namesAndValues instanceof byte[][]) { + // Reallocate an array of Object. + expand(cap()); + } + namesAndValues[i * 2 + 1] = value; + } + + private byte[] valueAsBytes(int i) { + Object value = value(i); + if (value instanceof byte[]) { + return (byte[]) value; + } else { + return ((LazyValue) value).toBytes(); + } + } + + private Object valueAsBytesOrStream(int i) { + Object value = value(i); + if (value instanceof byte[]) { + return value; + } else { + return ((LazyValue) value).toStream(); + } + } + + private T valueAsT(int i, Key key) { + Object value = value(i); + if (value instanceof byte[]) { + return key.parseBytes((byte[]) value); + } else { + return ((LazyValue) value).toObject(key); + } + } + private int cap() { return namesAndValues != null ? namesAndValues.length : 0; } @@ -192,7 +245,7 @@ public boolean containsKey(Key key) { public T get(Key key) { for (int i = size - 1; i >= 0; i--) { if (bytesEqual(key.asciiName(), name(i))) { - return key.parseBytes(value(i)); + return valueAsT(i, key); } } return null; @@ -231,7 +284,7 @@ public boolean hasNext() { public T next() { if (hasNext()) { hasNext = false; - return key.parseBytes(value(idx++)); + return valueAsT(idx++, key); } throw new NoSuchElementException(); } @@ -288,7 +341,11 @@ public void put(Key key, T value) { Preconditions.checkNotNull(value, "value"); maybeExpand(); name(size, key.asciiName()); - value(size, key.toBytes(value)); + if (key.serializesToStreams()) { + value(size, LazyValue.create(key, value)); + } else { + value(size, key.toBytes(value)); + } size++; } @@ -300,7 +357,7 @@ private void maybeExpand() { // Expands to exactly the desired capacity. private void expand(int newCapacity) { - byte[][] newNamesAndValues = new byte[newCapacity][]; + Object[] newNamesAndValues = new Object[newCapacity]; if (!isEmpty()) { System.arraycopy(namesAndValues, 0, newNamesAndValues, 0, len()); } @@ -322,8 +379,7 @@ public boolean remove(Key key, T value) { if (!bytesEqual(key.asciiName(), name(i))) { continue; } - @SuppressWarnings("unchecked") - T stored = key.parseBytes(value(i)); + T stored = valueAsT(i, key); if (!value.equals(stored)) { continue; } @@ -333,7 +389,7 @@ public boolean remove(Key key, T value) { System.arraycopy(namesAndValues, readIdx, namesAndValues, writeIdx, readLen); size -= 1; name(size, null); - value(size, null); + value(size, (byte[]) null); return true; } return false; @@ -350,7 +406,7 @@ public Iterable removeAll(Key key) { for (; readIdx < size; readIdx++) { if (bytesEqual(key.asciiName(), name(readIdx))) { ret = ret != null ? ret : new ArrayList(); - ret.add(key.parseBytes(value(readIdx))); + ret.add(valueAsT(readIdx, key)); continue; } name(writeIdx, name(readIdx)); @@ -406,11 +462,36 @@ public void discardAll(Key key) { */ @Nullable byte[][] serialize() { - if (len() == cap()) { - return namesAndValues; - } byte[][] serialized = new byte[len()][]; - System.arraycopy(namesAndValues, 0, serialized, 0, len()); + if (namesAndValues instanceof byte[][]) { + System.arraycopy(namesAndValues, 0, serialized, 0, len()); + } else { + for (int i = 0; i < size; i++) { + serialized[i * 2] = name(i); + serialized[i * 2 + 1] = valueAsBytes(i); + } + } + return serialized; + } + + /** + * Serializes all metadata entries, leaving some values as {@link InputStream}s. + * + *

Produces serialized names and values interleaved. result[i*2] are names, while + * result[i*2+1] are values. + * + *

Names are byte arrays as described according to the {@link #serialize} + * method. Values are either byte arrays or {@link InputStream}s. + * + *

This method is intended for transport use only. + */ + @Nullable + Object[] serializePartial() { + Object[] serialized = new Object[len()]; + for (int i = 0; i < size; i++) { + serialized[i * 2] = name(i); + serialized[i * 2 + 1] = valueAsBytesOrStream(i); + } return serialized; } @@ -467,9 +548,9 @@ public String toString() { String headerName = new String(name(i), US_ASCII); sb.append(headerName).append('='); if (headerName.endsWith(BINARY_HEADER_SUFFIX)) { - sb.append(BASE64_ENCODING_OMIT_PADDING.encode(value(i))); + sb.append(BASE64_ENCODING_OMIT_PADDING.encode(valueAsBytes(i))); } else { - String headerValue = new String(value(i), US_ASCII); + String headerValue = new String(valueAsBytes(i), US_ASCII); sb.append(headerValue); } } @@ -532,6 +613,25 @@ public interface AsciiMarshaller { T parseAsciiString(String serialized); } + /** Marshaller for metadata values that are serialized to an InputStream. */ + public interface BinaryStreamMarshaller { + /** + * Serializes a metadata value to an {@link InputStream}. + * + * @param value to serialize + * @return serialized version of value + */ + InputStream toStream(T value); + + /** + * Parses a serialized metadata value from an {@link InputStream}. + * + * @param stream of metadata to parse + * @return a parsed instance of type T + */ + T parseStream(InputStream stream); + } + /** * Key for metadata entries. Allows for parsing and serialization of metadata. * @@ -579,6 +679,16 @@ public static Key of(String name, BinaryMarshaller marshaller) { return new BinaryKey<>(name, marshaller); } + /** + * Creates a key for a binary header, serializing to input streams. + * + * @param name Must contain only the valid key characters as defined in the class comment. Must + * end with {@link #BINARY_HEADER_SUFFIX}. + */ + public static Key of(String name, BinaryStreamMarshaller marshaller) { + return new LazyStreamBinaryKey<>(name, marshaller); + } + /** * Creates a key for an ASCII header. * @@ -601,6 +711,7 @@ static Key of(String name, boolean pseudo, TrustedAsciiMarshaller mars private final String name; private final byte[] nameBytes; + private final Object marshaller; private static BitSet generateValidTChars() { BitSet valid = new BitSet(0x7f); @@ -632,10 +743,11 @@ private static String validateName(String n, boolean pseudo) { return n; } - private Key(String name, boolean pseudo) { + private Key(String name, boolean pseudo, Object marshaller) { this.originalName = checkNotNull(name, "name"); this.name = validateName(this.originalName.toLowerCase(Locale.ROOT), pseudo); this.nameBytes = this.name.getBytes(US_ASCII); + this.marshaller = marshaller; } /** @@ -706,6 +818,28 @@ public String toString() { * @return a parsed instance of type T */ abstract T parseBytes(byte[] serialized); + + /** + * @return whether this key will be serialized to bytes lazily. + */ + boolean serializesToStreams() { + return false; + } + + /** + * Gets this keys (implementation-specific) marshaller, or null if the + * marshaller is not of the given type. + * + * @param marshallerClass The type we expect the marshaller to be. + * @return the marshaller object for this key, or null. + */ + @Nullable + final M getMarshaller(Class marshallerClass) { + if (marshallerClass.isInstance(marshaller)) { + return marshallerClass.cast(marshaller); + } + return null; + } } private static class BinaryKey extends Key { @@ -713,7 +847,7 @@ private static class BinaryKey extends Key { /** Keys have a name and a binary marshaller used for serialization. */ private BinaryKey(String name, BinaryMarshaller marshaller) { - super(name, false /* not pseudo */); + super(name, false /* not pseudo */, marshaller); checkArgument( name.endsWith(BINARY_HEADER_SUFFIX), "Binary header is named %s. It must end with %s", @@ -734,12 +868,93 @@ T parseBytes(byte[] serialized) { } } + /** A binary key for values which should be serialized lazily to {@Link InputStream}s. */ + private static class LazyStreamBinaryKey extends Key { + + private final BinaryStreamMarshaller marshaller; + + /** Keys have a name and a stream marshaller used for serialization. */ + private LazyStreamBinaryKey(String name, BinaryStreamMarshaller marshaller) { + super(name, false /* not pseudo */, marshaller); + checkArgument( + name.endsWith(BINARY_HEADER_SUFFIX), + "Binary header is named %s. It must end with %s", + name, + BINARY_HEADER_SUFFIX); + checkArgument(name.length() > BINARY_HEADER_SUFFIX.length(), "empty key name"); + this.marshaller = checkNotNull(marshaller, "marshaller is null"); + } + + @Override + byte[] toBytes(T value) { + return streamToBytes(marshaller.toStream(value)); + } + + @Override + T parseBytes(byte[] serialized) { + return marshaller.parseStream(new ByteArrayInputStream(serialized)); + } + + @Override + boolean serializesToStreams() { + return true; + } + } + + /** Internal holder for values which are serialized/de-serialized lazily. */ + static final class LazyValue { + private final BinaryStreamMarshaller marshaller; + private final T value; + private volatile byte[] serialized; + + static LazyValue create(Key key, T value) { + return new LazyValue<>(checkNotNull(getBinaryStreamMarshaller(key)), value); + } + + /** A value set by the application. */ + LazyValue(BinaryStreamMarshaller marshaller, T value) { + this.marshaller = marshaller; + this.value = value; + } + + InputStream toStream() { + return marshaller.toStream(value); + } + + byte[] toBytes() { + if (serialized == null) { + synchronized (this) { + if (serialized == null) { + serialized = streamToBytes(toStream()); + } + } + } + return serialized; + } + + T2 toObject(Key key) { + if (key.serializesToStreams()) { + BinaryStreamMarshaller marshaller = getBinaryStreamMarshaller(key); + if (marshaller != null) { + return marshaller.parseStream(toStream()); + } + } + return key.parseBytes(toBytes()); + } + + @Nullable + @SuppressWarnings("unchecked") + private static BinaryStreamMarshaller getBinaryStreamMarshaller(Key key) { + return (BinaryStreamMarshaller) key.getMarshaller(BinaryStreamMarshaller.class); + } + } + private static class AsciiKey extends Key { private final AsciiMarshaller marshaller; /** Keys have a name and an ASCII marshaller used for serialization. */ private AsciiKey(String name, boolean pseudo, AsciiMarshaller marshaller) { - super(name, pseudo); + super(name, pseudo, marshaller); Preconditions.checkArgument( !name.endsWith(BINARY_HEADER_SUFFIX), "ASCII header is named %s. Only binary headers may end with %s", @@ -764,7 +979,7 @@ private static final class TrustedAsciiKey extends Key { /** Keys have a name and an ASCII marshaller used for serialization. */ private TrustedAsciiKey(String name, boolean pseudo, TrustedAsciiMarshaller marshaller) { - super(name, pseudo); + super(name, pseudo, marshaller); Preconditions.checkArgument( !name.endsWith(BINARY_HEADER_SUFFIX), "ASCII header is named %s. Only binary headers may end with %s", @@ -808,4 +1023,12 @@ interface TrustedAsciiMarshaller { */ T parseAsciiString(byte[] serialized); } + + private static byte[] streamToBytes(InputStream stream) { + try { + return ByteStreams.toByteArray(stream); + } catch (IOException ioe) { + throw new RuntimeException("failure reading serialized stream", ioe); + } + } } diff --git a/api/src/test/java/io/grpc/MetadataTest.java b/api/src/test/java/io/grpc/MetadataTest.java index e2bdf667ae0..c9095a82d5a 100644 --- a/api/src/test/java/io/grpc/MetadataTest.java +++ b/api/src/test/java/io/grpc/MetadataTest.java @@ -22,13 +22,19 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.collect.Lists; +import com.google.common.io.ByteStreams; import io.grpc.Metadata.Key; import io.grpc.internal.GrpcUtil; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Iterator; import java.util.Locale; @@ -59,9 +65,59 @@ public Fish parseBytes(byte[] serialized) { } }; + private static class FishStreamMarsaller implements Metadata.BinaryStreamMarshaller { + @Override + public InputStream toStream(Fish fish) { + return new ByteArrayInputStream(FISH_MARSHALLER.toBytes(fish)); + } + + @Override + public Fish parseStream(InputStream stream) { + try { + return FISH_MARSHALLER.parseBytes(ByteStreams.toByteArray(stream)); + } catch (IOException ioe) { + throw new AssertionError(); + } + } + } + + private static final Metadata.BinaryStreamMarshaller FISH_STREAM_MARSHALLER = + new FishStreamMarsaller(); + + /** A pattern commonly used to avoid unnecessary serialization of immutable objects. */ + private static final class FakeFishStream extends InputStream { + final Fish fish; + + FakeFishStream(Fish fish) { + this.fish = fish; + } + + @Override + public int read() throws IOException { + throw new IOException("Not actually a stream"); + } + } + + private static final Metadata.BinaryStreamMarshaller IMMUTABLE_FISH_MARSHALLER = + new Metadata.BinaryStreamMarshaller() { + @Override + public InputStream toStream(Fish fish) { + return new FakeFishStream(fish); + } + + @Override + public Fish parseStream(InputStream stream) { + return ((FakeFishStream) stream).fish; + } + }; + private static final String LANCE = "lance"; private static final byte[] LANCE_BYTES = LANCE.getBytes(US_ASCII); private static final Metadata.Key KEY = Metadata.Key.of("test-bin", FISH_MARSHALLER); + private static final Metadata.Key KEY_STREAMED = + Key.of("streamed-bin", FISH_STREAM_MARSHALLER); + private static final Metadata.Key KEY_IMMUTABLE = + Key.of("immutable-bin", IMMUTABLE_FISH_MARSHALLER); @Test public void noPseudoHeaders() { @@ -334,6 +390,95 @@ public void invalidKeyName() { } } + @Test + public void streamedValue() { + Fish salmon = new Fish("salmon"); + Metadata h = new Metadata(); + h.put(KEY_STREAMED, salmon); + assertEquals(salmon, h.get(KEY_STREAMED)); + } + + @Test + public void streamedValueDifferentKey() { + Fish salmon = new Fish("salmon"); + Metadata h = new Metadata(); + h.put(KEY_STREAMED, salmon); + + // Get using a different key instance (but the same marshaller). + Fish fish = h.get(copyKey(KEY_STREAMED, FISH_STREAM_MARSHALLER)); + assertEquals(salmon, fish); + } + + @Test + public void streamedValueDifferentMarshaller() { + Fish salmon = new Fish("salmon"); + Metadata h = new Metadata(); + h.put(KEY_STREAMED, salmon); + + // Get using a different marshaller instance. + Fish fish = h.get(copyKey(KEY_STREAMED, new FishStreamMarsaller())); + assertEquals(salmon, fish); + } + + @Test + public void serializeParseMetadataWithStreams() { + Metadata h = new Metadata(); + Fish salmon = new Fish("salmon"); + h.put(KEY_STREAMED, salmon); + + Metadata parsed = new Metadata(h.serialize()); + assertEquals(salmon, parsed.get(KEY_STREAMED)); + } + + @Test + public void immutableMarshaller() { + Metadata h = new Metadata(KEY.asciiName(), LANCE_BYTES); + Fish salmon = new Fish("salmon"); + h.put(KEY_IMMUTABLE, salmon); + assertSame(salmon, h.get(KEY_IMMUTABLE)); + // Even though the key differs, the marshaller can chose to avoid serialization. + assertSame(salmon, h.get(copyKey(KEY_IMMUTABLE, IMMUTABLE_FISH_MARSHALLER))); + } + + @Test + public void partialSerialization() { + Metadata h = new Metadata(KEY.asciiName(), LANCE_BYTES); + Fish salmon = new Fish("salmon"); + h.put(KEY_STREAMED, salmon); + h.put(KEY_IMMUTABLE, salmon); + + Object[] serialized = InternalMetadata.serializePartial(h); + assertEquals(6, serialized.length); + assertEquals("test-bin", new String((byte[]) serialized[0], US_ASCII)); + assertArrayEquals(LANCE_BYTES, (byte[]) serialized[1]); + + assertEquals("streamed-bin", new String((byte[]) serialized[2], US_ASCII)); + assertEquals(salmon, FISH_STREAM_MARSHALLER.parseStream((InputStream) serialized[3])); + assertNotSame(salmon, FISH_STREAM_MARSHALLER.parseStream((InputStream) serialized[3])); + + assertEquals("immutable-bin", new String((byte[]) serialized[4], US_ASCII)); + assertSame(salmon, IMMUTABLE_FISH_MARSHALLER.parseStream((InputStream) serialized[5])); + } + + @Test + public void createFromPartial() { + Metadata h = new Metadata(KEY.asciiName(), LANCE_BYTES); + Fish salmon = new Fish("salmon"); + h.put(KEY_STREAMED, salmon); + h.put(KEY_IMMUTABLE, salmon); + + Fish anotherSalmon = new Fish("salmon"); + + Object[] partial = InternalMetadata.serializePartial(h); + partial[3] = InternalMetadata.parsedValue(FISH_STREAM_MARSHALLER, anotherSalmon); + partial[5] = InternalMetadata.parsedValue(IMMUTABLE_FISH_MARSHALLER, anotherSalmon); + + Metadata h2 = new Metadata(3, partial); + assertEquals(new Fish(LANCE), h2.get(KEY)); + assertEquals(anotherSalmon, h2.get(KEY_STREAMED)); + assertSame(anotherSalmon, h2.get(KEY_IMMUTABLE)); + } + private static final class Fish { private String name; @@ -366,4 +511,8 @@ public String toString() { return "Fish(" + name + ")"; } } + + private static Key copyKey(Key key, Metadata.BinaryStreamMarshaller marshaller) { + return Key.of(key.originalName(), marshaller); + } }