Skip to content

Commit

Permalink
Add a message accept encoding header
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-mastrangelo committed Sep 17, 2015
1 parent c4bcf14 commit 4572b19
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 9 deletions.
18 changes: 9 additions & 9 deletions core/src/main/java/io/grpc/DecompressorRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package io.grpc;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -59,20 +60,16 @@ public final class DecompressorRegistry {
*
* @param d The decompressor to register
* @param advertised If true, the message encoding will be listed in the Accept-Encoding header.
* @throws IllegalArgumentException if another compressor by the same name is already registered.
*/
public static void register(Decompressor d, boolean advertised) {
INSTANCE.internalRegister(d, advertised);
}

@VisibleForTesting
void internalRegister(Decompressor d, boolean advertised) {
DecompressorInfo previousInfo = decompressors.putIfAbsent(
d.getMessageEncoding(), new DecompressorInfo(d, advertised));
if (previousInfo != null) {
throw new IllegalArgumentException(
"A decompressor was already registered: " + previousInfo.decompressor);
}
String encoding = d.getMessageEncoding();
checkArgument(!encoding.contains(","), "Comma is currently not allowed in message encoding");
decompressors.put(encoding, new DecompressorInfo(d, advertised));
}

/**
Expand All @@ -90,6 +87,9 @@ Set<String> internalGetKnownMessageEncodings() {
/**
* Provides a list of all message encodings that have decompressors available and should be
* advertised.
*
* <p>The specification doesn't say anything about ordering, or preference, so the returned codes
* can be arbitrary.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492")
public static Set<String> getAdvertisedMessageEncodings() {
Expand All @@ -98,7 +98,7 @@ public static Set<String> getAdvertisedMessageEncodings() {

@VisibleForTesting
Set<String> internalGetAdvertisedMessageEncodings() {
Set<String> advertisedDecompressors = new HashSet<String>();
Set<String> advertisedDecompressors = new HashSet<String>(decompressors.size());
for (Entry<String, DecompressorInfo> entry : decompressors.entrySet()) {
if (entry.getValue().advertised) {
advertisedDecompressors.add(entry.getKey());
Expand Down Expand Up @@ -132,7 +132,7 @@ Decompressor internalLookupDecompressor(String messageEncoding) {
DecompressorRegistry() {
decompressors = new ConcurrentHashMap<String, DecompressorInfo>();
Decompressor gzip = new Codec.Gzip();
// By default, Gzip
// By default, Gzip is not advertised
decompressors.put(gzip.getMessageEncoding(), new DecompressorInfo(gzip, false));
decompressors.put(
Codec.Identity.NONE.getMessageEncoding(), new DecompressorInfo(Codec.Identity.NONE, false));
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
Expand Down Expand Up @@ -147,6 +148,12 @@ public void start(Listener<RespT> observer, Metadata headers) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
}

headers.removeAll(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY);
// TODO: Maybe move registry to the channel to ease testing.
for (String encoding : DecompressorRegistry.getAdvertisedMessageEncodings()) {
headers.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, encoding);
}

try {
stream = transport.newStream(method, headers, listener);
} catch (IllegalStateException ex) {
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/grpc/internal/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public final class GrpcUtil {
public static final Metadata.Key<String> MESSAGE_ENCODING_KEY =
Metadata.Key.of(GrpcUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER);

/**
* {@link io.grpc.Metadata.Key} for the accepted message encodings header.
*/
public static final Metadata.Key<String> MESSAGE_ACCEPT_ENCODING_KEY =
Metadata.Key.of(GrpcUtil.MESSAGE_ACCEPT_ENCODING, Metadata.ASCII_STRING_MARSHALLER);

/**
* {@link io.grpc.Metadata.Key} for the :authority pseudo header.
*
Expand Down Expand Up @@ -118,6 +124,11 @@ public final class GrpcUtil {
*/
public static final String MESSAGE_ENCODING = "grpc-encoding";

/**
* The accepted message encodings (i.e. compression) that can be used in the stream.
*/
public static final String MESSAGE_ACCEPT_ENCODING = "grpc-accept-encoding";

/**
* The default maximum uncompressed size (in bytes) for inbound messages. Defaults to 100 MiB.
*/
Expand Down
128 changes: 128 additions & 0 deletions core/src/test/java/io/grpc/internal/ClientCallImplTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.grpc.internal;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;

import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Codec;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;

import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
* Test for {@link ClientCallImpl}.
*/
@RunWith(JUnit4.class)
public class ClientCallImplTest {

private final SerializingExecutor executor =
new SerializingExecutor(MoreExecutors.newDirectExecutorService());
private final ScheduledExecutorService deadlineCancellationExecutor =
Executors.newScheduledThreadPool(0);

@Before
public void setUp() {
DecompressorRegistry.register(new Codec.Gzip(), true);
}

@Test
public void advertisedEncodingsAreSent() {
MethodDescriptor<Void, Void> descriptor = MethodDescriptor.create(
MethodType.UNARY,
"service/method",
new TestMarshaller<Void>(),
new TestMarshaller<Void>());
final ClientTransport transport = mock(ClientTransport.class);
ClientTransportProvider provider = new ClientTransportProvider() {
@Override
public ClientTransport get() {
return transport;
}
};
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
descriptor,
executor,
CallOptions.DEFAULT,
provider,
deadlineCancellationExecutor);

call.start(new TestClientCallListener<Void>(), new Metadata());

ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(transport).newStream(
eq(descriptor), metadataCaptor.capture(), isA(ClientStreamListener.class));
Metadata actual = metadataCaptor.getValue();

Set<String> acceptedEncodings =
ImmutableSet.copyOf(actual.getAll(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY));
assertEquals(DecompressorRegistry.getAdvertisedMessageEncodings(), acceptedEncodings);
}

private static class TestMarshaller<T> implements Marshaller<T> {
@Override
public InputStream stream(T value) {
return null;
}

@Override
public T parse(InputStream stream) {
return null;
}
}

private static class TestClientCallListener<T> extends ClientCall.Listener<T> {
}
}

0 comments on commit 4572b19

Please sign in to comment.